Write Data

Define mutations that change database state. Mutations run inside a database transaction by default and roll back if they return an error; the transactional attribute controls this behavior.

The Code

use forge::prelude::*;

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
    pub product_id: Uuid,
    pub quantity: i32,
}

#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let user_id = ctx.require_user_id()?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *")
            .bind(user_id)
            .bind(input.product_id)
            .bind(input.quantity)
    )
    .await
    .map_err(Into::into)
}

What Happens

By default, mutations execute inside a database transaction. If the function returns Err, the entire transaction rolls back automatically.

To opt out of the transaction wrapper — for example, when running a single idempotent statement where the overhead is unnecessary — add transactional = false: #[forge::mutation(transactional = false)].

The generated TypeScript client calls this mutation as a type-safe RPC.

Attributes

Attribute | Type | Default | Description
public | flag | - | Skip authentication
require_role("x") | string | - | Require specific role
timeout | seconds | from [function] config | Request timeout (overrides global timeout_secs)
rate_limit(...) | - | - | Token bucket rate limiting
transactional | bool | true | Wrap in database transaction with outbox pattern for jobs/workflows. Set transactional = false to opt out.
log | string | - | Log level override

Patterns

Validation with Early Return

#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
    if title.trim().is_empty() {
        return Err(ForgeError::Validation("Title cannot be empty".into()));
    }

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
            .bind(title.trim())
    )
    .await
    .map_err(Into::into)
}

Partial Updates

#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
    pub id: Uuid,
    pub email: Option<String>,
    pub name: Option<String>,
}

#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
    ctx.db().fetch_one(
        sqlx::query_as(
            "UPDATE users SET
                email = COALESCE($2, email),
                name = COALESCE($3, name),
                updated_at = NOW()
            WHERE id = $1 RETURNING *"
        )
        .bind(input.id)
        .bind(input.email)
        .bind(input.name)
    )
    .await
    .map_err(Into::into)
}
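
The COALESCE pattern treats a missing field as "keep the current value": a None binds as NULL, and COALESCE then falls back to the existing column. As a rough illustration only, the same merge rule can be written in plain Rust. The User and UpdateUserInput types below are simplified, hypothetical stand-ins, not the real Forge or application types.

```rust
/// Simplified stand-in for a stored row (hypothetical, for illustration).
#[derive(Debug, Clone, PartialEq)]
pub struct User {
    pub email: String,
    pub name: String,
}

/// Simplified stand-in for the mutation input: None means "leave unchanged".
#[derive(Debug, Default)]
pub struct UpdateUserInput {
    pub email: Option<String>,
    pub name: Option<String>,
}

/// Mirrors `col = COALESCE($n, col)`: take the new value if present,
/// otherwise keep what is already stored.
pub fn apply_update(current: User, input: UpdateUserInput) -> User {
    User {
        email: input.email.unwrap_or(current.email),
        name: input.name.unwrap_or(current.name),
    }
}
```

One consequence of this rule is that a column cannot be cleared to NULL through this pattern, because NULL always means "keep". If a field must be clearable, it needs a separate representation for "set to NULL".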

Dispatch Jobs from Mutations

When a mutation needs to trigger background work, keep it transactional (the default, written out explicitly here) so the database write and the job dispatch succeed or fail together:

#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let order = ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
    ).await?;

    ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
    ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;

    Ok(order)
}

The transactional attribute enables the outbox pattern: jobs are buffered in memory during the mutation and only inserted into the database after the transaction commits successfully. If the mutation fails, no jobs are dispatched.

Compile-time enforcement: If your mutation calls dispatch_job() or start_workflow() with transactional = false, you get an error at compile time:

error: Mutations that call `dispatch_job()` or `start_workflow()` must use
the transactional outbox pattern. Remove `transactional = false` or
avoid dispatching jobs/workflows from a non-transactional mutation.

Start Workflows from Mutations

#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
    ctx.db().execute(
        sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
            .bind(user_id)
    ).await?;

    let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;

    Ok(workflow_id)
}

Call External APIs

#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
    let response = ctx.http()
        .post("https://api.stripe.com/v1/charges")
        .bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
        .json(&json!({
            "amount": input.amount,
            "currency": "usd",
            "source": input.token,
        }))
        .send()
        .await?;

    let charge: StripeCharge = response.json().await?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
            .bind(charge.id)
    )
    .await
    .map_err(Into::into)
}

Role-Based Access

#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
    let result = ctx.db().execute(
        sqlx::query("DELETE FROM users WHERE id = $1")
            .bind(user_id)
    ).await?;

    Ok(result.rows_affected() > 0)
}

Rate-Limited Mutations

#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
    let user_id = ctx.require_user_id()?;

    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
            .bind(user_id)
            .bind(input.content)
    )
    .await
    .map_err(Into::into)
}

Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), user_action (per user per function), global (all requests).
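
To make the token bucket idea concrete, here is a minimal, self-contained sketch of a keyed token bucket. It is not Forge's implementation: TokenBucket, KeyedLimiter, and the explicit now_secs clock parameter are hypothetical names chosen for illustration. It assumes that requests sets the bucket capacity and that the bucket refills evenly over the per window.

```rust
use std::collections::HashMap;

/// One bucket per rate limit key (hypothetical type, not Forge's).
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,         // maximum burst, e.g. `requests = 10`
    tokens: f64,           // tokens currently available
    refill_per_sec: f64,   // capacity spread evenly over the window
    last_refill_secs: f64, // time of the previous refill
}

impl TokenBucket {
    pub fn new(requests: u32, per_secs: f64, now_secs: f64) -> Self {
        let capacity = f64::from(requests);
        Self {
            capacity,
            tokens: capacity,
            refill_per_sec: capacity / per_secs,
            last_refill_secs: now_secs,
        }
    }

    /// Refills based on elapsed time, then consumes one token if available.
    pub fn try_acquire(&mut self, now_secs: f64) -> bool {
        let elapsed = (now_secs - self.last_refill_secs).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill_secs = now_secs;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

/// Keeps an independent bucket for each key, such as a user ID or client IP.
pub struct KeyedLimiter {
    requests: u32,
    per_secs: f64,
    buckets: HashMap<String, TokenBucket>,
}

impl KeyedLimiter {
    pub fn new(requests: u32, per_secs: f64) -> Self {
        Self { requests, per_secs, buckets: HashMap::new() }
    }

    pub fn allow(&mut self, key: &str, now_secs: f64) -> bool {
        let (requests, per_secs) = (self.requests, self.per_secs);
        self.buckets
            .entry(key.to_string())
            .or_insert_with(|| TokenBucket::new(requests, per_secs, now_secs))
            .try_acquire(now_secs)
    }
}
```

In this sketch, the choice of key decides who shares a bucket: keying by user gives every user a separate allowance, while a single fixed key behaves like the global option.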

Context Methods

Method | Return Type | Description
ctx.db() | DbConn<'_> | Database connection (pool or transaction handle)
ctx.pool() | &PgPool | Direct pool access for non-transactional operations
ctx.http() | HttpClient | Circuit-breaker-backed HTTP client for external calls
ctx.raw_http() | &reqwest::Client | Raw HTTP client, bypasses circuit breaker
ctx.require_user_id() | Result<Uuid> | Authenticated user ID or error
ctx.require_subject() | Result<&str> | Raw subject claim (Firebase, Clerk)
ctx.dispatch_job(name, args) | Result<Uuid> | Dispatch background job
ctx.cancel_job(job_id, reason) | Result<bool> | Request cancellation for a job (reason is Option<String>)
ctx.issue_token(claims) | Result<String> | Issue HMAC-signed JWT token (requires HMAC auth config)
ctx.start_workflow(name, input) | Result<Uuid> | Start workflow
ctx.env(key) | Option<String> | Environment variable
ctx.env_or(key, default) | String | Environment variable with default
ctx.env_require(key) | Result<String> | Required environment variable
ctx.env_parse::<T>(key) | Result<T> | Parse environment variable
ctx.env_parse_or(key, default) | Result<T> | Parse environment variable with default
ctx.env_contains(key) | bool | Check if environment variable is set

Context Fields

Field | Type | Description
ctx.auth | AuthContext | Authentication context
ctx.auth.user_id() | Option<Uuid> | User ID if authenticated
ctx.auth.has_role(role) | bool | Check role membership
ctx.auth.roles() | &[String] | All roles
ctx.auth.claim(key) | Option<&Value> | Custom JWT claim
ctx.request | RequestMetadata | Request metadata
ctx.request.request_id | Uuid | Unique request ID
ctx.request.trace_id | String | Distributed trace ID
ctx.request.client_ip | Option<String> | Client IP address

Under the Hood

Transactional Outbox Pattern

When you use #[forge::mutation(transactional)] with job dispatch:

-- Step 1: Your mutation runs inside a transaction
BEGIN;
INSERT INTO orders (product_id, user_id) VALUES (...);
COMMIT;

-- Step 2: Only after successful commit, buffered jobs are inserted
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');

If the mutation fails, the transaction rolls back and no jobs are inserted. Jobs are buffered in memory during the transaction and flushed to the database only after a successful commit. This avoids orphaned jobs without requiring distributed transactions.
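
The buffering behavior described above can be sketched with in-memory stand-ins. This is an illustration of the idea only, not Forge's internals: FakeDb, MutationScope, and run_mutation are hypothetical names, and plain vectors play the role of the orders and forge_jobs tables.

```rust
/// In-memory stand-in for the database (hypothetical, for illustration).
#[derive(Debug, Default)]
pub struct FakeDb {
    pub orders: Vec<String>,
    pub jobs: Vec<String>,
}

/// Everything a mutation stages while it runs: rows and buffered jobs.
#[derive(Debug, Default)]
pub struct MutationScope {
    staged_rows: Vec<String>,
    buffered_jobs: Vec<String>,
}

impl MutationScope {
    /// Stages a row; it is not visible in the database until commit.
    pub fn insert_order(&mut self, order: &str) {
        self.staged_rows.push(order.to_string());
    }

    /// Buffers a job in memory instead of writing it immediately.
    pub fn dispatch_job(&mut self, name: &str) {
        self.buffered_jobs.push(name.to_string());
    }
}

/// Runs `body`; on Ok, commits the staged rows and then flushes the jobs.
/// On Err, the scope is dropped, so neither rows nor jobs reach the database.
pub fn run_mutation<T, E>(
    db: &mut FakeDb,
    body: impl FnOnce(&mut MutationScope) -> Result<T, E>,
) -> Result<T, E> {
    let mut scope = MutationScope::default();
    let value = body(&mut scope)?; // early return here models the rollback
    db.orders.append(&mut scope.staged_rows); // models COMMIT
    db.jobs.append(&mut scope.buffered_jobs); // jobs are written only after commit
    Ok(value)
}
```

A mutation body that stages an order, dispatches a job, and then returns Err leaves both vectors in FakeDb untouched, which corresponds to the "no orphaned jobs" property.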

Circuit Breaker for HTTP

ctx.http() returns an HttpClient backed by the circuit breaker, which tracks failure rates per host. After repeated failures (default: 5), requests fail fast rather than waiting for timeouts. This prevents cascade failures when downstream services are unhealthy. Use ctx.raw_http() only when you need streaming responses or a custom redirect policy — it bypasses the circuit breaker.

The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests fail immediately
  • Half-Open: After backoff timeout, limited requests allowed to test recovery
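
The three states can be modeled as a small state machine. The sketch below is a simplified illustration under stated assumptions, not Forge's HttpClient: CircuitBreaker and its methods are hypothetical names, time is passed in explicitly as whole seconds, the open timeout is fixed, and the half-open state lets every request through until a result is recorded. A per-host setup would keep one such value per host.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Closed,
    Open { until_secs: u64 },
    HalfOpen,
}

/// Minimal breaker for a single host (hypothetical type, for illustration).
pub struct CircuitBreaker {
    state: State,
    failures: u32,
    threshold: u32, // failures before opening, e.g. 5
    open_secs: u64, // how long to stay open before probing
}

impl CircuitBreaker {
    pub fn new(threshold: u32, open_secs: u64) -> Self {
        Self { state: State::Closed, failures: 0, threshold, open_secs }
    }

    pub fn state(&self) -> State {
        self.state
    }

    /// Returns whether a request may be sent now.
    /// Moves Open to HalfOpen once the open period has elapsed.
    pub fn allow(&mut self, now_secs: u64) -> bool {
        if let State::Open { until_secs } = self.state {
            if now_secs < until_secs {
                return false; // fail fast while open
            }
            self.state = State::HalfOpen;
        }
        true
    }

    /// A success closes the breaker and resets the failure count.
    pub fn record_success(&mut self) {
        self.failures = 0;
        self.state = State::Closed;
    }

    /// A failure in HalfOpen, or reaching the threshold, opens the breaker.
    pub fn record_failure(&mut self, now_secs: u64) {
        self.failures += 1;
        if self.state == State::HalfOpen || self.failures >= self.threshold {
            self.state = State::Open { until_secs: now_secs + self.open_secs };
        }
    }
}
```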

The breaker uses exponential backoff (30s base, 1.5x multiplier, 10min max) between recovery attempts, so a service that stays unhealthy is retried less and less often.
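
As a worked example of those numbers, the function below computes the wait for a given attempt. It assumes the delay is the base multiplied by 1.5 once per consecutive reopen and then capped at the maximum. How Forge counts attempts internally is not stated here, so backoff_secs and its parameter are hypothetical.

```rust
/// Backoff delay in seconds for the given number of consecutive reopens.
/// Assumed formula: min(30 * 1.5^n, 600).
pub fn backoff_secs(consecutive_reopens: u32) -> f64 {
    const BASE_SECS: f64 = 30.0;
    const MULTIPLIER: f64 = 1.5;
    const MAX_SECS: f64 = 600.0; // 10 minutes

    // Clamp the exponent so the cast to i32 cannot wrap for huge inputs.
    let exponent = consecutive_reopens.min(64) as i32;
    (BASE_SECS * MULTIPLIER.powi(exponent)).min(MAX_SECS)
}
```

Under this formula the delays start at 30s, 45s, 67.5s, and roughly 101s, and the 10 minute cap is reached at the ninth step (n = 8), since 30 * 1.5^8 is about 769s.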

Write-Ahead Logging

All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_create_order_dispatches_job() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .build();

        create_order(&ctx, CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        }).await.unwrap();

        ctx.job_dispatch().assert_dispatched("process_order");
    }

    #[tokio::test]
    async fn test_mutation_with_mocked_http() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();

        let result = charge_card(&ctx, ChargeInput { ... }).await;

        ctx.http().assert_called("https://api.stripe.com/*");
    }

    #[tokio::test]
    async fn test_unauthorized_mutation() {
        let ctx = TestMutationContext::builder().build(); // No user

        // Build the input explicitly; the mutation should fail before using it.
        let input = CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        };
        let result = create_order(&ctx, input).await;

        assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
    }
}

Test context builder methods:

  • .as_user(uuid) - Authenticate as user
  • .with_role("admin") - Add role
  • .with_claim("org_id", value) - Add JWT claim
  • .with_pool(pool) - Use real database
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable