Write Data
Mutations change your database state. Add the transactional attribute for automatic transaction wrapping and rollback guarantees.
The Code
use forge::prelude::*;
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
pub product_id: Uuid,
pub quantity: i32,
}
#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
let user_id = ctx.require_user_id()?;
sqlx::query_as(
"INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *"
)
.bind(user_id)
.bind(input.product_id)
.bind(input.quantity)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
What Happens
By default, mutations execute directly against the database pool without a transaction wrapper. For single-statement mutations, this is fine since individual SQL statements are atomic.
For multi-statement mutations or when dispatching jobs/workflows, add #[forge::mutation(transactional)] to wrap everything in a database transaction. If the function returns Err or panics, the entire transaction rolls back.
The generated TypeScript client calls this mutation as a type-safe RPC.
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
public | flag | - | Skip authentication |
require_role("x") | string | - | Require specific role |
timeout | seconds | 30 | Request timeout |
rate_limit(...) | - | - | Token bucket rate limiting |
transactional | flag | - | Wrap in database transaction with outbox pattern for jobs/workflows |
log | string | - | Log level override |
Patterns
Validation with Early Return
#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
if title.trim().is_empty() {
return Err(ForgeError::Validation("Title cannot be empty".into()));
}
sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
.bind(title.trim())
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Partial Updates
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
pub id: Uuid,
pub email: Option<String>,
pub name: Option<String>,
}
#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
sqlx::query_as(
"UPDATE users SET
email = COALESCE($2, email),
name = COALESCE($3, name),
updated_at = NOW()
WHERE id = $1 RETURNING *"
)
.bind(input.id)
.bind(input.email)
.bind(input.name)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Dispatch Jobs from Mutations
When a mutation needs to trigger background work, use transactional to ensure atomicity:
#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
let order = sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
.fetch_one(ctx.db())
.await?;
ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;
Ok(order)
}
The transactional attribute enables the outbox pattern: jobs are inserted into the same transaction as your database changes. If the mutation fails, no jobs are dispatched.
Compile-time enforcement: If your mutation calls dispatch_job() or start_workflow() without transactional, the macro produces a compile error:
error: Mutations that call `dispatch_job()` or `start_workflow()` must use
#[forge::mutation(transactional)] to ensure atomicity. Without it,
jobs may be dispatched but database changes rolled back on error.
Start Workflows from Mutations
#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
.bind(user_id)
.execute(ctx.db())
.await?;
let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;
Ok(workflow_id)
}
Call External APIs
#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
let response = ctx.http()
.post("https://api.stripe.com/v1/charges")
.bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
.json(&json!({
"amount": input.amount,
"currency": "usd",
"source": input.token,
}))
.send()
.await?;
let charge: StripeCharge = response.json().await?;
sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
.bind(charge.id)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Role-Based Access
#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
let result = sqlx::query("DELETE FROM users WHERE id = $1")
.bind(user_id)
.execute(ctx.db())
.await?;
Ok(result.rows_affected() > 0)
}
Rate-Limited Mutations
#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
let user_id = ctx.require_user_id()?;
sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
.bind(user_id)
.bind(input.content)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), global (all requests).
Context Methods
| Method | Return Type | Description |
|---|---|---|
ctx.db() | DbConn<'_> | Database connection (pool or transaction handle) |
ctx.http() | &reqwest::Client | HTTP client for external APIs |
ctx.require_user_id() | Result<Uuid> | Authenticated user ID or error |
ctx.require_subject() | Result<&str> | Raw subject claim (Firebase, Clerk) |
ctx.dispatch_job(name, args) | Result<Uuid> | Dispatch background job |
ctx.start_workflow(name, input) | Result<Uuid> | Start workflow |
ctx.env(key) | Option<String> | Environment variable |
ctx.env_require(key) | Result<String> | Required environment variable |
Context Fields
| Field | Type | Description |
|---|---|---|
ctx.auth | AuthContext | Authentication context |
ctx.auth.user_id() | Option<Uuid> | User ID if authenticated |
ctx.auth.has_role(role) | bool | Check role membership |
ctx.auth.roles() | &[String] | All roles |
ctx.auth.claim(key) | Option<&Value> | Custom JWT claim |
ctx.request | RequestMetadata | Request metadata |
ctx.request.request_id | Uuid | Unique request ID |
ctx.request.trace_id | String | Distributed trace ID |
ctx.request.client_ip | Option<String> | Client IP address |
Under the Hood
Transactional Outbox Pattern
When you use #[forge::mutation(transactional)] with job dispatch:
BEGIN;
-- Your mutation SQL
INSERT INTO orders (product_id, user_id) VALUES (...);
-- Job dispatch (same transaction)
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');
COMMIT;
If the mutation fails anywhere, both the order and the job are rolled back. No orphaned jobs, no missing data. The job worker picks up pending jobs after commit.
This pattern guarantees exactly-once semantics for the mutation + job combination without requiring distributed transactions.
Circuit Breaker for HTTP
The HTTP client (ctx.http()) includes circuit breaker protection that tracks failure rates per host. After repeated failures (default: 5), requests fail fast rather than waiting for timeouts. This prevents cascade failures when downstream services are unhealthy.
The circuit breaker has three states:
- Closed: Normal operation, requests pass through
- Open: After threshold failures, requests fail immediately
- Half-Open: After backoff timeout, limited requests allowed to test recovery
Configuration uses exponential backoff (30s base, 1.5x multiplier, 10min max) to gradually retry unhealthy services.
Write-Ahead Logging
All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.
Testing
#[cfg(test)]
mod tests {
use super::*;
use forge::testing::*;
#[tokio::test]
async fn test_create_order_dispatches_job() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.build();
create_order(&ctx, CreateOrderInput {
product_id: Uuid::new_v4(),
quantity: 1,
}).await.unwrap();
ctx.job_dispatch().assert_dispatched("process_order");
}
#[tokio::test]
async fn test_mutation_with_mocked_http() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.mock_http_json("https://api.stripe.com/*", json!({
"id": "ch_123",
"status": "succeeded"
}))
.build();
let result = charge_card(&ctx, ChargeInput { ... }).await;
ctx.http().assert_called("https://api.stripe.com/*");
}
#[tokio::test]
async fn test_unauthorized_mutation() {
let ctx = TestMutationContext::builder().build(); // No user
let result = create_order(&ctx, input).await;
assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
}
}
Test context builder methods:
.as_user(uuid)- Authenticate as user.with_role("admin")- Add role.with_claim("org_id", value)- Add JWT claim.with_pool(pool)- Use real database.mock_http(pattern, handler)- Mock HTTP responses.mock_http_json(pattern, value)- Mock JSON response.with_env(key, value)- Mock environment variable