
Write Data

Mutations change your database state. Add the transactional attribute for automatic transaction wrapping and rollback guarantees.

The Code

use forge::prelude::*;

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
    pub product_id: Uuid,
    pub quantity: i32,
}

#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let user_id = ctx.require_user_id()?;

    sqlx::query_as(
        "INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *"
    )
    .bind(user_id)
    .bind(input.product_id)
    .bind(input.quantity)
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}

What Happens

By default, mutations execute directly against the database pool without a transaction wrapper. For single-statement mutations, this is fine since individual SQL statements are atomic.

For multi-statement mutations or when dispatching jobs/workflows, add #[forge::mutation(transactional)] to wrap everything in a database transaction. If the function returns Err or panics, the entire transaction rolls back.

The generated TypeScript client calls this mutation as a type-safe RPC.

Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| public | flag | - | Skip authentication |
| require_role("x") | string | - | Require specific role |
| timeout | seconds | 30 | Request timeout |
| rate_limit(...) | - | - | Token bucket rate limiting |
| transactional | flag | - | Wrap in database transaction with outbox pattern for jobs/workflows |
| log | string | - | Log level override |

Patterns

Validation with Early Return

#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
    if title.trim().is_empty() {
        return Err(ForgeError::Validation("Title cannot be empty".into()));
    }

    sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
        .bind(title.trim())
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}

Partial Updates

#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
    pub id: Uuid,
    pub email: Option<String>,
    pub name: Option<String>,
}

#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
    sqlx::query_as(
        "UPDATE users SET
            email = COALESCE($2, email),
            name = COALESCE($3, name),
            updated_at = NOW()
        WHERE id = $1 RETURNING *"
    )
    .bind(input.id)
    .bind(input.email)
    .bind(input.name)
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}
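
The COALESCE pattern above keeps the current column value whenever the bound parameter is NULL, which is what a None field becomes when bound. The merge rule can be sketched in plain Rust as follows. UserRow and apply_update are hypothetical names used only for illustration; they are not part of Forge or sqlx.

```rust
/// Hypothetical in-memory stand-in for a row in `users` (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct UserRow {
    pub email: String,
    pub name: String,
}

/// Mirrors `col = COALESCE($n, col)`: a `Some` value overwrites the column,
/// while `None` (bound as SQL NULL) leaves the current value in place.
pub fn apply_update(row: &UserRow, email: Option<String>, name: Option<String>) -> UserRow {
    UserRow {
        email: email.unwrap_or_else(|| row.email.clone()),
        name: name.unwrap_or_else(|| row.name.clone()),
    }
}
```

One consequence of this rule: because NULL means "leave unchanged", the pattern cannot be used to set a nullable column back to NULL.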

Dispatch Jobs from Mutations

When a mutation needs to trigger background work, use transactional to ensure atomicity:

#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    // The explicit type lets the compiler resolve `order.id` and `order.email` below.
    let order: Order = sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
        .fetch_one(ctx.db())
        .await?;

    ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
    ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;

    Ok(order)
}

The transactional attribute enables the outbox pattern: jobs are inserted into the same transaction as your database changes. If the mutation fails, no jobs are dispatched.

Compile-time enforcement: If your mutation calls dispatch_job() or start_workflow() without transactional, the macro produces a compile error:

error: Mutations that call `dispatch_job()` or `start_workflow()` must use
#[forge::mutation(transactional)] to ensure atomicity. Without it,
jobs may be dispatched but database changes rolled back on error.

Start Workflows from Mutations

#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
    sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
        .bind(user_id)
        .execute(ctx.db())
        .await?;

    let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;

    Ok(workflow_id)
}

Call External APIs

#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
    let response = ctx.http()
        .post("https://api.stripe.com/v1/charges")
        .bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
        .json(&json!({
            "amount": input.amount,
            "currency": "usd",
            "source": input.token,
        }))
        .send()
        .await?;

    let charge: StripeCharge = response.json().await?;

    sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
        .bind(charge.id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}

Role-Based Access

#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
    let result = sqlx::query("DELETE FROM users WHERE id = $1")
        .bind(user_id)
        .execute(ctx.db())
        .await?;

    Ok(result.rows_affected() > 0)
}

Rate-Limited Mutations

#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
    let user_id = ctx.require_user_id()?;

    sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
        .bind(user_id)
        .bind(input.content)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}

Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), global (all requests).
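
To make the token bucket idea concrete, here is a minimal std-only sketch of a keyed limiter. It is not Forge's implementation: RateLimiter, Bucket, and try_acquire are hypothetical names, and the assumption that requests = 10, per = "1m" maps to a capacity of 10 tokens refilled at 10 tokens per 60 seconds is mine. Time is passed in explicitly so the behaviour is deterministic.

```rust
use std::collections::HashMap;

/// One bucket per key: holds at most `capacity` tokens.
#[derive(Debug)]
struct Bucket {
    tokens: f64,
    last_refill_secs: f64,
}

/// Hypothetical keyed token-bucket limiter (illustration only).
pub struct RateLimiter {
    capacity: f64,
    refill_per_sec: f64,
    buckets: HashMap<String, Bucket>,
}

impl RateLimiter {
    /// Allows `requests` per `per_secs` seconds, e.g. 10 per 60.0.
    pub fn new(requests: u32, per_secs: f64) -> Self {
        Self {
            capacity: requests as f64,
            refill_per_sec: requests as f64 / per_secs,
            buckets: HashMap::new(),
        }
    }

    /// Returns true if the request for `key` may proceed at time `now_secs`.
    pub fn try_acquire(&mut self, key: &str, now_secs: f64) -> bool {
        let capacity = self.capacity;
        let refill_per_sec = self.refill_per_sec;
        // A key seen for the first time starts with a full bucket.
        let bucket = self.buckets.entry(key.to_string()).or_insert(Bucket {
            tokens: capacity,
            last_refill_secs: now_secs,
        });
        // Refill in proportion to elapsed time, capped at capacity.
        let elapsed = (now_secs - bucket.last_refill_secs).max(0.0);
        bucket.tokens = (bucket.tokens + elapsed * refill_per_sec).min(capacity);
        bucket.last_refill_secs = now_secs;
        // Spend one token per request if one is available.
        if bucket.tokens >= 1.0 {
            bucket.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

In this sketch the key string plays the role of the user, ip, tenant, or global key: each distinct key gets its own bucket, so one noisy user does not consume another user's allowance.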

Context Methods

| Method | Return Type | Description |
|---|---|---|
| ctx.db() | DbConn<'_> | Database connection (pool or transaction handle) |
| ctx.http() | &reqwest::Client | HTTP client for external APIs |
| ctx.require_user_id() | Result<Uuid> | Authenticated user ID or error |
| ctx.require_subject() | Result<&str> | Raw subject claim (Firebase, Clerk) |
| ctx.dispatch_job(name, args) | Result<Uuid> | Dispatch background job |
| ctx.start_workflow(name, input) | Result<Uuid> | Start workflow |
| ctx.env(key) | Option<String> | Environment variable |
| ctx.env_require(key) | Result<String> | Required environment variable |

Context Fields

| Field | Type | Description |
|---|---|---|
| ctx.auth | AuthContext | Authentication context |
| ctx.auth.user_id() | Option<Uuid> | User ID if authenticated |
| ctx.auth.has_role(role) | bool | Check role membership |
| ctx.auth.roles() | &[String] | All roles |
| ctx.auth.claim(key) | Option<&Value> | Custom JWT claim |
| ctx.request | RequestMetadata | Request metadata |
| ctx.request.request_id | Uuid | Unique request ID |
| ctx.request.trace_id | String | Distributed trace ID |
| ctx.request.client_ip | Option<String> | Client IP address |

Under the Hood

Transactional Outbox Pattern

When you use #[forge::mutation(transactional)] with job dispatch:

BEGIN;
-- Your mutation SQL
INSERT INTO orders (product_id, user_id) VALUES (...);

-- Job dispatch (same transaction)
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');
COMMIT;

If the mutation fails anywhere, both the order and the job are rolled back. No orphaned jobs, no missing data. The job worker picks up pending jobs after commit.

This pattern guarantees that the mutation's database changes and its job records commit or roll back together, so each committed mutation enqueues its jobs exactly once, without requiring distributed transactions.
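
The all-or-nothing behaviour can be illustrated with a small in-memory model. This is only a sketch of the idea, not how Forge or PostgreSQL implement it: Store, Tx, and transactional are hypothetical names, and a real transaction is replaced by a buffer that is copied into the store only on success.

```rust
/// Hypothetical in-memory "database" with an orders table and a jobs table.
#[derive(Debug, Default)]
pub struct Store {
    pub orders: Vec<String>,
    pub jobs: Vec<String>,
}

/// Buffered writes that become visible only on commit.
#[derive(Debug, Default)]
pub struct Tx {
    orders: Vec<String>,
    jobs: Vec<String>,
}

impl Tx {
    pub fn insert_order(&mut self, order: &str) {
        self.orders.push(order.to_string());
    }

    /// Dispatching a job is just another buffered write in the same transaction.
    pub fn dispatch_job(&mut self, name: &str) {
        self.jobs.push(name.to_string());
    }
}

/// Runs `body` against a fresh transaction: commit on Ok, discard on Err.
pub fn transactional<F>(store: &mut Store, body: F) -> Result<(), String>
where
    F: FnOnce(&mut Tx) -> Result<(), String>,
{
    let mut tx = Tx::default();
    // On Err we return early; the buffered order and job are dropped together.
    body(&mut tx)?;
    // Commit: the order and its job become visible at the same time.
    store.orders.append(&mut tx.orders);
    store.jobs.append(&mut tx.jobs);
    Ok(())
}
```

Calling transactional with a closure that inserts an order, dispatches a job, and then returns Err leaves both store.orders and store.jobs unchanged, which is the property the outbox pattern relies on.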

Circuit Breaker for HTTP

The HTTP client (ctx.http()) includes circuit breaker protection that tracks failure rates per host. After repeated failures (default: 5), requests fail fast rather than waiting for timeouts. This prevents cascade failures when downstream services are unhealthy.

The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests fail immediately
  • Half-Open: After backoff timeout, limited requests allowed to test recovery
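
The three states can be sketched as a small state machine. This is an illustration under simplifying assumptions, not Forge's implementation: CircuitBreaker and its methods are hypothetical names, the sketch counts consecutive failures rather than a failure rate, it uses a fixed backoff, and it lets every request through in Half-Open instead of a limited number.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical single-host circuit breaker (illustration only).
pub struct CircuitBreaker {
    state: State,
    consecutive_failures: u32,
    threshold: u32,
    backoff_secs: f64,
    opened_at_secs: f64,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, backoff_secs: f64) -> Self {
        Self {
            state: State::Closed,
            consecutive_failures: 0,
            threshold,
            backoff_secs,
            opened_at_secs: 0.0,
        }
    }

    pub fn state(&self) -> State {
        self.state
    }

    /// Decides whether a request may be sent at `now_secs`.
    /// Moves Open -> HalfOpen once the backoff period has elapsed.
    pub fn allow(&mut self, now_secs: f64) -> bool {
        if self.state == State::Open && now_secs - self.opened_at_secs >= self.backoff_secs {
            self.state = State::HalfOpen;
        }
        self.state != State::Open
    }

    /// A success closes the circuit and resets the failure count.
    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.state = State::Closed;
    }

    /// A failed probe in HalfOpen reopens immediately;
    /// in Closed the circuit opens once the threshold is reached.
    pub fn record_failure(&mut self, now_secs: f64) {
        self.consecutive_failures += 1;
        if self.state == State::HalfOpen || self.consecutive_failures >= self.threshold {
            self.state = State::Open;
            self.opened_at_secs = now_secs;
        }
    }
}
```

With a threshold of 5, four failures leave the breaker Closed, the fifth opens it, and the first allow call after the backoff period moves it to Half-Open so a probe request can test recovery.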

Configuration uses exponential backoff (30s base, 1.5x multiplier, 10min max) to gradually retry unhealthy services.
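
As a worked example of those numbers, the delay schedule can be computed as below. The function name backoff_delay and the convention that attempt 0 uses the 30s base are assumptions for illustration; the source only states the base, multiplier, and cap.

```rust
use std::time::Duration;

/// Delay before the next recovery probe: base * multiplier^attempt, capped at max.
/// `attempt` counts how many times in a row the circuit has reopened (0-based, assumed).
pub fn backoff_delay(attempt: u32) -> Duration {
    const BASE_SECS: f64 = 30.0;
    const MULTIPLIER: f64 = 1.5;
    const MAX_SECS: f64 = 600.0; // 10 minutes

    // Clamp the exponent so very large attempt counts stay finite before capping.
    let exponent = attempt.min(64) as i32;
    let secs = (BASE_SECS * MULTIPLIER.powi(exponent)).min(MAX_SECS);
    Duration::from_secs_f64(secs)
}
```

Under these assumptions the schedule runs 30s, 45s, 67.5s, 101.25s and so on, and reaches the 10-minute cap at attempt 8, since 30 × 1.5^7 is about 512.6s and 30 × 1.5^8 is about 768.9s.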

Write-Ahead Logging

All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_create_order_dispatches_job() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .build();

        create_order(&ctx, CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        }).await.unwrap();

        ctx.job_dispatch().assert_dispatched("process_order");
    }

    #[tokio::test]
    async fn test_mutation_with_mocked_http() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();

        let result = charge_card(&ctx, ChargeInput { ... }).await;

        ctx.http().assert_called("https://api.stripe.com/*");
    }

    #[tokio::test]
    async fn test_unauthorized_mutation() {
        let ctx = TestMutationContext::builder().build(); // No user

        // Any valid input works here; the call should fail before touching the database.
        let input = CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        };

        let result = create_order(&ctx, input).await;

        assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
    }
}

Test context builder methods:

  • .as_user(uuid) - Authenticate as user
  • .with_role("admin") - Add role
  • .with_claim("org_id", value) - Add JWT claim
  • .with_pool(pool) - Use real database
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable