Skip to main content

Write Data

Define mutations that change database state. Add the transactional attribute for transaction wrapping and rollback guarantees.

The Code

use forge::prelude::*;

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
pub product_id: Uuid,
pub quantity: i32,
}

#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
let user_id = ctx.require_user_id()?;

ctx.db().fetch_one(
sqlx::query_as("INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *")
.bind(user_id)
.bind(input.product_id)
.bind(input.quantity)
)
.await
.map_err(Into::into)
}

What Happens

By default, mutations execute directly against the database pool without a transaction wrapper. For single-statement mutations, this is fine since individual SQL statements are atomic.

For multi-statement mutations or when dispatching jobs/workflows, add #[forge::mutation(transactional)] to wrap everything in a database transaction. If the function returns Err or panics, the entire transaction rolls back.

The generated TypeScript client calls this mutation as a type-safe RPC.

Attributes

AttributeTypeDefaultDescription
publicflag-Skip authentication
require_role("x")string-Require specific role
timeoutsecondsfrom [function] configRequest timeout (overrides global timeout_secs)
rate_limit(...)--Token bucket rate limiting
transactionalflag-Wrap in database transaction with outbox pattern for jobs/workflows
logstring-Log level override

Patterns

Validation with Early Return

#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
if title.trim().is_empty() {
return Err(ForgeError::Validation("Title cannot be empty".into()));
}

ctx.db().fetch_one(
sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
.bind(title.trim())
)
.await
.map_err(Into::into)
}

Partial Updates

#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
pub id: Uuid,
pub email: Option<String>,
pub name: Option<String>,
}

#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
ctx.db().fetch_one(
sqlx::query_as(
"UPDATE users SET
email = COALESCE($2, email),
name = COALESCE($3, name),
updated_at = NOW()
WHERE id = $1 RETURNING *"
)
.bind(input.id)
.bind(input.email)
.bind(input.name)
)
.await
.map_err(Into::into)
}

Dispatch Jobs from Mutations

When a mutation needs to trigger background work, use transactional to ensure atomicity:

#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
let order = ctx.db().fetch_one(
sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
).await?;

ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;

Ok(order)
}

The transactional attribute enables the outbox pattern: jobs are buffered in memory during the mutation and only inserted into the database after the transaction commits successfully. If the mutation fails, no jobs are dispatched.

Runtime enforcement: If your mutation calls dispatch_job() or start_workflow() without transactional, you get an error at runtime:

error: Mutations that call `dispatch_job()` or `start_workflow()` must use
#[forge::mutation(transactional)] to ensure atomicity. Without it,
jobs may be dispatched but database changes rolled back on error.

Start Workflows from Mutations

#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
ctx.db().execute(
sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
.bind(user_id)
).await?;

let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;

Ok(workflow_id)
}

Call External APIs

#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
let response = ctx.http()
.post("https://api.stripe.com/v1/charges")
.bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
.json(&json!({
"amount": input.amount,
"currency": "usd",
"source": input.token,
}))
.send()
.await?;

let charge: StripeCharge = response.json().await?;

ctx.db().fetch_one(
sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
.bind(charge.id)
)
.await
.map_err(Into::into)
}

Role-Based Access

#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
let result = ctx.db().execute(
sqlx::query("DELETE FROM users WHERE id = $1")
.bind(user_id)
).await?;

Ok(result.rows_affected() > 0)
}

Rate-Limited Mutations

#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
let user_id = ctx.require_user_id()?;

ctx.db().fetch_one(
sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
.bind(user_id)
.bind(input.content)
)
.await
.map_err(Into::into)
}

Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), user_action (per user per function), global (all requests).

Context Methods

MethodReturn TypeDescription
ctx.db()DbConn<'_>Database connection (pool or transaction handle)
ctx.pool()&PgPoolDirect pool access for non-transactional operations
ctx.http()&reqwest::ClientHTTP client for external APIs
ctx.require_user_id()Result<Uuid>Authenticated user ID or error
ctx.require_subject()Result<&str>Raw subject claim (Firebase, Clerk)
ctx.dispatch_job(name, args)Result<Uuid>Dispatch background job
ctx.cancel_job(job_id, reason)Result<bool>Request cancellation for a job (reason is Option<String>)
ctx.issue_token(claims)Result<String>Issue HMAC-signed JWT token (requires HMAC auth config)
ctx.start_workflow(name, input)Result<Uuid>Start workflow
ctx.dispatch_job_with_context(name, args, context)Result<Uuid>Dispatch job with additional JSON context
ctx.http_with_circuit_breaker()&CircuitBreakerClientHTTP client with circuit breaker
ctx.env(key)Option<String>Environment variable
ctx.env_or(key, default)StringEnvironment variable with default
ctx.env_require(key)Result<String>Required environment variable
ctx.env_parse::<T>(key)Result<T>Parse environment variable
ctx.env_parse_or(key, default)Result<T>Parse environment variable with default
ctx.env_contains(key)boolCheck if environment variable is set

Context Fields

FieldTypeDescription
ctx.authAuthContextAuthentication context
ctx.auth.user_id()Option<Uuid>User ID if authenticated
ctx.auth.has_role(role)boolCheck role membership
ctx.auth.roles()&[String]All roles
ctx.auth.claim(key)Option<&Value>Custom JWT claim
ctx.requestRequestMetadataRequest metadata
ctx.request.request_idUuidUnique request ID
ctx.request.trace_idStringDistributed trace ID
ctx.request.client_ipOption<String>Client IP address

Under the Hood

Transactional Outbox Pattern

When you use #[forge::mutation(transactional)] with job dispatch:

-- Step 1: Your mutation runs inside a transaction
BEGIN;
INSERT INTO orders (product_id, user_id) VALUES (...);
COMMIT;

-- Step 2: Only after successful commit, buffered jobs are inserted
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');

If the mutation fails, the transaction rolls back and no jobs are inserted. Jobs are buffered in memory during the transaction and flushed to the database only after a successful commit. This avoids orphaned jobs without requiring distributed transactions.

Circuit Breaker for HTTP

The ctx.http() method returns a raw reqwest::Client for external calls. For circuit breaker protection, use ctx.http_with_circuit_breaker() which tracks failure rates per host. After repeated failures (default: 5), requests fail fast rather than waiting for timeouts. This prevents cascade failures when downstream services are unhealthy.

The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests fail immediately
  • Half-Open: After backoff timeout, limited requests allowed to test recovery

Configuration uses exponential backoff (30s base, 1.5x multiplier, 10min max) to gradually retry unhealthy services.

Write-Ahead Logging

All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.

Testing

#[cfg(test)]
mod tests {
use super::*;
use forge::testing::*;

#[tokio::test]
async fn test_create_order_dispatches_job() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.build();

create_order(&ctx, CreateOrderInput {
product_id: Uuid::new_v4(),
quantity: 1,
}).await.unwrap();

ctx.job_dispatch().assert_dispatched("process_order");
}

#[tokio::test]
async fn test_mutation_with_mocked_http() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.mock_http_json("https://api.stripe.com/*", json!({
"id": "ch_123",
"status": "succeeded"
}))
.build();

let result = charge_card(&ctx, ChargeInput { ... }).await;

ctx.http().assert_called("https://api.stripe.com/*");
}

#[tokio::test]
async fn test_unauthorized_mutation() {
let ctx = TestMutationContext::builder().build(); // No user

let result = create_order(&ctx, input).await;

assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
}
}

Test context builder methods:

  • .as_user(uuid) - Authenticate as user
  • .with_role("admin") - Add role
  • .with_claim("org_id", value) - Add JWT claim
  • .with_pool(pool) - Use real database
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable