Write Data
Define mutations that change database state. Add the transactional attribute to wrap a mutation in a database transaction that rolls back on failure.
The Code
use forge::prelude::*;

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
    pub product_id: Uuid,
    pub quantity: i32,
}

#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let user_id = ctx.require_user_id()?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *")
            .bind(user_id)
            .bind(input.product_id)
            .bind(input.quantity)
    )
    .await
    .map_err(Into::into)
}
What Happens
By default, mutations execute directly against the database pool without a transaction wrapper. For single-statement mutations, this is fine since individual SQL statements are atomic.
For multi-statement mutations or when dispatching jobs/workflows, add #[forge::mutation(transactional)] to wrap everything in a database transaction. If the function returns Err or panics, the entire transaction rolls back.
The generated TypeScript client calls this mutation as a type-safe RPC.
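One common way to get the rollback-on-Err-or-panic behaviour described above is a guard that rolls back when it is dropped without a commit, which is how sqlx transactions behave. The sketch below models that idea with an in-memory store. Store, Tx, and run_transactional are hypothetical names for illustration, not Forge APIs, and whether the transactional attribute expands to anything similar is an assumption.

```rust
/// Hypothetical in-memory stand-in for a database table.
#[derive(Debug, Default)]
pub struct Store {
    pub rows: Vec<String>,
    pub rollbacks: u32,
}

/// Hypothetical transaction guard: writes are staged until `commit`.
pub struct Tx<'a> {
    store: &'a mut Store,
    staged: Vec<String>,
    committed: bool,
}

impl<'a> Tx<'a> {
    pub fn begin(store: &'a mut Store) -> Self {
        Tx { store, staged: Vec::new(), committed: false }
    }

    pub fn insert(&mut self, row: &str) {
        self.staged.push(row.to_string());
    }

    /// Applies all staged writes and marks the guard as committed.
    pub fn commit(mut self) {
        self.store.rows.append(&mut self.staged);
        self.committed = true;
    }
}

impl Drop for Tx<'_> {
    /// Runs on early return and during panic unwinding alike.
    fn drop(&mut self) {
        if !self.committed {
            self.staged.clear();
            self.store.rollbacks += 1;
        }
    }
}

/// Commits when `body` returns Ok; otherwise the guard is dropped uncommitted.
pub fn run_transactional<T, E>(
    store: &mut Store,
    body: impl FnOnce(&mut Tx<'_>) -> Result<T, E>,
) -> Result<T, E> {
    let mut tx = Tx::begin(store);
    let value = body(&mut tx)?; // Err returns here; dropping `tx` rolls back
    tx.commit();
    Ok(value)
}
```

Because the rollback lives in Drop, the function body needs no explicit error branch: returning early with ? and unwinding from a panic both discard the staged writes.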
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| public | flag | - | Skip authentication |
| require_role("x") | string | - | Require specific role |
| timeout | seconds | from [function] config | Request timeout (overrides global timeout_secs) |
| rate_limit(...) | - | - | Token bucket rate limiting |
| transactional | flag | - | Wrap in database transaction with outbox pattern for jobs/workflows |
| log | string | - | Log level override |
Patterns
Validation with Early Return
#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
    if title.trim().is_empty() {
        return Err(ForgeError::Validation("Title cannot be empty".into()));
    }

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
            .bind(title.trim())
    )
    .await
    .map_err(Into::into)
}
Partial Updates
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
    pub id: Uuid,
    pub email: Option<String>,
    pub name: Option<String>,
}

#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
    ctx.db().fetch_one(
        sqlx::query_as(
            "UPDATE users SET
                email = COALESCE($2, email),
                name = COALESCE($3, name),
                updated_at = NOW()
             WHERE id = $1 RETURNING *"
        )
        .bind(input.id)
        .bind(input.email)
        .bind(input.name)
    )
    .await
    .map_err(Into::into)
}
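The COALESCE($n, column) pattern above treats a NULL parameter as "leave this column alone". The following sketch mirrors that merge rule in plain Rust to make the semantics explicit. UserRow, UserPatch, and apply_patch are hypothetical names used only for illustration, and the nullable name column is an assumption about the schema.

```rust
/// Hypothetical row shape, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct UserRow {
    pub email: String,
    pub name: Option<String>,
}

/// Hypothetical patch mirroring the optional fields of `UpdateUserInput`.
#[derive(Debug, Default)]
pub struct UserPatch {
    pub email: Option<String>,
    pub name: Option<String>,
}

/// Mirrors `col = COALESCE($n, col)`: a `None` parameter keeps the stored value.
pub fn apply_patch(current: &UserRow, patch: UserPatch) -> UserRow {
    UserRow {
        email: patch.email.unwrap_or_else(|| current.email.clone()),
        name: patch.name.or_else(|| current.name.clone()),
    }
}
```

One consequence of this rule is that a client cannot clear a nullable column back to NULL through such a mutation, because None already means "no change". If clearing is needed, a separate flag or a nested Option is one way to distinguish the two cases.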
Dispatch Jobs from Mutations
When a mutation needs to trigger background work, use transactional to ensure atomicity:
#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let order = ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
    ).await?;

    ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
    ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;

    Ok(order)
}
The transactional attribute enables the outbox pattern: jobs are buffered in memory during the mutation and only inserted into the database after the transaction commits successfully. If the mutation fails, no jobs are dispatched.
Runtime enforcement: If your mutation calls dispatch_job() or start_workflow() without transactional, you get an error at runtime:
error: Mutations that call `dispatch_job()` or `start_workflow()` must use
#[forge::mutation(transactional)] to ensure atomicity. Without it,
jobs may be dispatched but database changes rolled back on error.
Start Workflows from Mutations
#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
    ctx.db().execute(
        sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
            .bind(user_id)
    ).await?;

    let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;
    Ok(workflow_id)
}
Call External APIs
#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
    // Stripe's API expects form-encoded request bodies, not JSON.
    let response = ctx.http()
        .post("https://api.stripe.com/v1/charges")
        .bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
        .form(&[
            ("amount", input.amount.to_string()),
            ("currency", "usd".to_string()),
            ("source", input.token.to_string()),
        ])
        .send()
        .await?;

    let charge: StripeCharge = response.json().await?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
            .bind(charge.id)
    )
    .await
    .map_err(Into::into)
}
Role-Based Access
#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
    let result = ctx.db().execute(
        sqlx::query("DELETE FROM users WHERE id = $1")
            .bind(user_id)
    ).await?;

    Ok(result.rows_affected() > 0)
}
Rate-Limited Mutations
#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
    let user_id = ctx.require_user_id()?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
            .bind(user_id)
            .bind(input.content)
    )
    .await
    .map_err(Into::into)
}
Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), user_action (per user per function), global (all requests).
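To make the token bucket behaviour behind rate_limit(requests = 10, per = "1m", key = "user") concrete, here is a minimal sketch with one bucket per key. It is not Forge's implementation: TokenBucket, RateLimiter, the millisecond clock passed in by the caller, and key strings such as "user:alice" are all assumptions chosen for illustration. Integer arithmetic is used so that refill amounts are exact.

```rust
use std::collections::HashMap;

/// Hypothetical token bucket. Credit is stored in units of
/// 1/window_ms of a request so that all arithmetic stays in integers.
pub struct TokenBucket {
    requests: u64,
    window_ms: u64,
    credit: u64,
    last_ms: u64,
}

impl TokenBucket {
    /// Starts full, so a burst of `requests` calls is allowed immediately.
    pub fn new(requests: u64, window_ms: u64, now_ms: u64) -> Self {
        TokenBucket { requests, window_ms, credit: requests * window_ms, last_ms: now_ms }
    }

    pub fn try_acquire(&mut self, now_ms: u64) -> bool {
        let elapsed = now_ms.saturating_sub(self.last_ms);
        let capacity = self.requests * self.window_ms;
        // Each elapsed millisecond earns `requests` units of credit, up to capacity.
        self.credit = self
            .credit
            .saturating_add(elapsed.saturating_mul(self.requests))
            .min(capacity);
        self.last_ms = self.last_ms.max(now_ms);
        // One request costs `window_ms` units of credit.
        if self.credit >= self.window_ms {
            self.credit -= self.window_ms;
            true
        } else {
            false
        }
    }
}

/// Hypothetical limiter holding one bucket per key (user, IP, tenant, ...).
pub struct RateLimiter {
    requests: u64,
    window_ms: u64,
    buckets: HashMap<String, TokenBucket>,
}

impl RateLimiter {
    pub fn new(requests: u64, window_ms: u64) -> Self {
        RateLimiter { requests, window_ms, buckets: HashMap::new() }
    }

    /// Returns true if the request identified by `key` is allowed at `now_ms`.
    pub fn check(&mut self, key: &str, now_ms: u64) -> bool {
        let (requests, window_ms) = (self.requests, self.window_ms);
        self.buckets
            .entry(key.to_string())
            .or_insert_with(|| TokenBucket::new(requests, window_ms, now_ms))
            .try_acquire(now_ms)
    }
}
```

With 10 requests per 60,000 ms, a key can burst 10 calls at once and then earns one further call every 6 seconds. Under this model a global key would share a single bucket across all callers, while a user key isolates each user.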
Context Methods
| Method | Return Type | Description |
|---|---|---|
| ctx.db() | DbConn<'_> | Database connection (pool or transaction handle) |
| ctx.pool() | &PgPool | Direct pool access for non-transactional operations |
| ctx.http() | &reqwest::Client | HTTP client for external APIs |
| ctx.require_user_id() | Result<Uuid> | Authenticated user ID or error |
| ctx.require_subject() | Result<&str> | Raw subject claim (Firebase, Clerk) |
| ctx.dispatch_job(name, args) | Result<Uuid> | Dispatch background job |
| ctx.cancel_job(job_id, reason) | Result<bool> | Request cancellation for a job (reason is Option<String>) |
| ctx.issue_token(claims) | Result<String> | Issue HMAC-signed JWT token (requires HMAC auth config) |
| ctx.start_workflow(name, input) | Result<Uuid> | Start workflow |
| ctx.dispatch_job_with_context(name, args, context) | Result<Uuid> | Dispatch job with additional JSON context |
| ctx.http_with_circuit_breaker() | &CircuitBreakerClient | HTTP client with circuit breaker |
| ctx.env(key) | Option<String> | Environment variable |
| ctx.env_or(key, default) | String | Environment variable with default |
| ctx.env_require(key) | Result<String> | Required environment variable |
| ctx.env_parse::<T>(key) | Result<T> | Parse environment variable |
| ctx.env_parse_or(key, default) | Result<T> | Parse environment variable with default |
| ctx.env_contains(key) | bool | Check if environment variable is set |
Context Fields
| Field | Type | Description |
|---|---|---|
| ctx.auth | AuthContext | Authentication context |
| ctx.auth.user_id() | Option<Uuid> | User ID if authenticated |
| ctx.auth.has_role(role) | bool | Check role membership |
| ctx.auth.roles() | &[String] | All roles |
| ctx.auth.claim(key) | Option<&Value> | Custom JWT claim |
| ctx.request | RequestMetadata | Request metadata |
| ctx.request.request_id | Uuid | Unique request ID |
| ctx.request.trace_id | String | Distributed trace ID |
| ctx.request.client_ip | Option<String> | Client IP address |
Under the Hood
Transactional Outbox Pattern
When you use #[forge::mutation(transactional)] with job dispatch:
-- Step 1: Your mutation runs inside a transaction
BEGIN;
INSERT INTO orders (product_id, user_id) VALUES (...);
COMMIT;
-- Step 2: Only after successful commit, buffered jobs are inserted
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');
If the mutation fails, the transaction rolls back and no jobs are inserted. Jobs are buffered in memory during the transaction and flushed to the database only after a successful commit. This avoids orphaned jobs without requiring distributed transactions.
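The two steps above can be sketched in plain Rust as a buffer that is flushed only after the commit step. This is an illustration of the described ordering, not Forge's internals: FakeDb, TxContext, PendingJob, and run_mutation are hypothetical names, and the in-memory vectors stand in for the orders and forge_jobs tables.

```rust
/// Hypothetical job record buffered during a mutation.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingJob {
    pub name: String,
    pub args: String,
}

/// Hypothetical stand-in for the `orders` and `forge_jobs` tables.
#[derive(Debug, Default)]
pub struct FakeDb {
    pub orders: Vec<String>,
    pub forge_jobs: Vec<PendingJob>,
}

/// Hypothetical mutation context: writes and jobs are held in memory.
#[derive(Debug, Default)]
pub struct TxContext {
    staged_orders: Vec<String>,
    outbox: Vec<PendingJob>,
}

impl TxContext {
    pub fn insert_order(&mut self, order_id: &str) {
        self.staged_orders.push(order_id.to_string());
    }

    /// Buffers the job; nothing reaches the database yet.
    pub fn dispatch_job(&mut self, name: &str, args: &str) {
        self.outbox.push(PendingJob { name: name.to_string(), args: args.to_string() });
    }
}

pub fn run_mutation<E>(
    db: &mut FakeDb,
    body: impl FnOnce(&mut TxContext) -> Result<(), E>,
) -> Result<(), E> {
    let mut ctx = TxContext::default();
    // On Err, `ctx` is dropped: staged writes and buffered jobs both vanish.
    body(&mut ctx)?;
    // Step 1: commit the mutation's own writes.
    db.orders.append(&mut ctx.staged_orders);
    // Step 2: only after the commit, flush buffered jobs in dispatch order.
    db.forge_jobs.append(&mut ctx.outbox);
    Ok(())
}
```

In this model a mutation that dispatches a job and then returns an error leaves both tables untouched, which matches the "no jobs are inserted" guarantee described above.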
Circuit Breaker for HTTP
The ctx.http() method returns a raw reqwest::Client for external calls. For circuit breaker protection, use ctx.http_with_circuit_breaker(), which tracks failure rates per host. After repeated failures (default: 5), requests fail fast instead of waiting for timeouts. This prevents cascading failures when downstream services are unhealthy.
The circuit breaker has three states:
- Closed: Normal operation, requests pass through
- Open: After threshold failures, requests fail immediately
- Half-Open: After backoff timeout, limited requests allowed to test recovery
Configuration uses exponential backoff (30s base, 1.5x multiplier, 10min max) to gradually retry unhealthy services.
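The three states and the backoff schedule can be modelled as a small state machine. The sketch below is one possible reading of the description, not Forge's CircuitBreakerClient: the type and method names are hypothetical, failures are counted consecutively, Half-Open admits a single probe request, and the 1.5x multiplier is applied with integer math on milliseconds. All of those details are assumptions.

```rust
const FAILURE_THRESHOLD: u32 = 5;
const BASE_BACKOFF_MS: u64 = 30_000; // 30s base
const MAX_BACKOFF_MS: u64 = 600_000; // 10min max

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed,
    Open { until_ms: u64 },
    HalfOpen,
}

/// Hypothetical per-host breaker; the caller supplies the clock in milliseconds.
pub struct CircuitBreaker {
    pub state: State,
    consecutive_failures: u32,
    trips: u32, // times the breaker has opened since the last success
}

impl CircuitBreaker {
    pub fn new() -> Self {
        CircuitBreaker { state: State::Closed, consecutive_failures: 0, trips: 0 }
    }

    /// Backoff after `trips` earlier openings: 30s, 45s, 67.5s, ... capped at 10min.
    pub fn backoff_ms(trips: u32) -> u64 {
        let mut backoff = BASE_BACKOFF_MS;
        for _ in 0..trips {
            backoff = (backoff * 3 / 2).min(MAX_BACKOFF_MS);
        }
        backoff
    }

    /// Returns true if a request may be sent at `now_ms`.
    pub fn allow(&mut self, now_ms: u64) -> bool {
        let state = self.state;
        match state {
            State::Closed => true,
            State::HalfOpen => false, // a probe is already in flight
            State::Open { until_ms } if now_ms >= until_ms => {
                self.state = State::HalfOpen; // let one probe through
                true
            }
            State::Open { .. } => false, // fail fast
        }
    }

    pub fn record_success(&mut self) {
        self.state = State::Closed;
        self.consecutive_failures = 0;
        self.trips = 0;
    }

    pub fn record_failure(&mut self, now_ms: u64) {
        let state = self.state;
        match state {
            State::HalfOpen => self.open(now_ms), // probe failed: back off longer
            State::Closed => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= FAILURE_THRESHOLD {
                    self.open(now_ms);
                }
            }
            State::Open { .. } => {}
        }
    }

    fn open(&mut self, now_ms: u64) {
        let wait = Self::backoff_ms(self.trips);
        self.trips += 1;
        self.consecutive_failures = 0;
        self.state = State::Open { until_ms: now_ms + wait };
    }
}
```

Under these assumptions the first opening lasts 30 seconds, a failed probe extends the next wait to 45 seconds, and a successful probe closes the breaker and resets the schedule.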
Write-Ahead Logging
All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.
Testing
#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_create_order_dispatches_job() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .build();

        create_order(&ctx, CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        }).await.unwrap();

        ctx.job_dispatch().assert_dispatched("process_order");
    }

    #[tokio::test]
    async fn test_mutation_with_mocked_http() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();

        let result = charge_card(&ctx, ChargeInput { ... }).await;

        ctx.http().assert_called("https://api.stripe.com/*");
    }

    #[tokio::test]
    async fn test_unauthorized_mutation() {
        let ctx = TestMutationContext::builder().build(); // No user

        // Build a valid input so the only failure cause is the missing user.
        let input = CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        };
        let result = create_order(&ctx, input).await;

        assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
    }
}
Test context builder methods:
- .as_user(uuid): Authenticate as user
- .with_role("admin"): Add role
- .with_claim("org_id", value): Add JWT claim
- .with_pool(pool): Use real database
- .mock_http(pattern, handler): Mock HTTP responses
- .mock_http_json(pattern, value): Mock JSON response
- .with_env(key, value): Mock environment variable