Write Data
Define mutations that change database state. Mutations run inside a database transaction by default, so a failed mutation rolls back automatically; the transactional attribute controls this behavior.
The Code
use forge::prelude::*;
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateOrderInput {
    pub product_id: Uuid,
    pub quantity: i32,
}
#[forge::mutation]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let user_id = ctx.require_user_id()?;
    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (user_id, product_id, quantity) VALUES ($1, $2, $3) RETURNING *")
            .bind(user_id)
            .bind(input.product_id)
            .bind(input.quantity)
    )
    .await
    .map_err(Into::into)
}
What Happens
By default, mutations execute inside a database transaction. If the function returns Err, the entire transaction rolls back automatically.
To opt out of the transaction wrapper — for example, when running a single idempotent statement where the overhead is unnecessary — add transactional = false: #[forge::mutation(transactional = false)].
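To make the commit-or-roll-back rule concrete, here is a minimal in-memory sketch. It is not Forge's implementation: `FakeDb` and `run_in_transaction` are hypothetical names, and a cloned vector stands in for a real PostgreSQL transaction.

```rust
/// Hypothetical in-memory stand-in for a database table.
#[derive(Debug, Default)]
pub struct FakeDb {
    pub rows: Vec<String>,
}

/// Runs `body` against a staged copy of the rows and keeps the writes only
/// when `body` returns `Ok`, mirroring commit-on-Ok / rollback-on-Err.
pub fn run_in_transaction<T, E>(
    db: &mut FakeDb,
    body: impl FnOnce(&mut Vec<String>) -> Result<T, E>,
) -> Result<T, E> {
    // "BEGIN": work on a private copy of the current state.
    let mut staged = db.rows.clone();
    // On Err the `?` returns early and `staged` is dropped: "ROLLBACK".
    let value = body(&mut staged)?;
    // "COMMIT": publish the staged writes.
    db.rows = staged;
    Ok(value)
}
```

A closure that pushes a row and returns `Ok` leaves the row in `db.rows`; one that pushes a row and then returns `Err` leaves `db.rows` unchanged.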
The generated TypeScript client calls this mutation as a type-safe RPC.
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| public | flag | - | Skip authentication |
| require_role("x") | string | - | Require specific role |
| timeout | seconds | from [function] config | Request timeout (overrides global timeout_secs) |
| rate_limit(...) | - | - | Token bucket rate limiting |
| transactional | bool | true | Wrap in database transaction with outbox pattern for jobs/workflows. Set transactional = false to opt out. |
| log | string | - | Log level override |
Patterns
Validation with Early Return
#[forge::mutation(public)]
pub async fn create_todo(ctx: &MutationContext, title: String) -> Result<Todo> {
    // Reject blank titles before touching the database.
    if title.trim().is_empty() {
        return Err(ForgeError::Validation("Title cannot be empty".into()));
    }
    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO todos (title) VALUES ($1) RETURNING *")
            .bind(title.trim())
    )
    .await
    .map_err(Into::into)
}
Partial Updates
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateUserInput {
    pub id: Uuid,
    pub email: Option<String>,
    pub name: Option<String>,
}
#[forge::mutation]
pub async fn update_user(ctx: &MutationContext, input: UpdateUserInput) -> Result<User> {
    // A None field binds as NULL, so COALESCE keeps the existing column value.
    ctx.db().fetch_one(
        sqlx::query_as(
            "UPDATE users SET
                email = COALESCE($2, email),
                name = COALESCE($3, name),
                updated_at = NOW()
            WHERE id = $1 RETURNING *"
        )
        .bind(input.id)
        .bind(input.email)
        .bind(input.name)
    )
    .await
    .map_err(Into::into)
}
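The COALESCE pattern above keeps the current column value whenever the bound parameter is NULL. The same merge rule can be sketched in plain Rust. `UserRow`, `coalesce`, and `apply_update` are hypothetical names used only for illustration, and the sketch assumes both columns are NOT NULL.

```rust
/// Hypothetical row type; assumes both columns are NOT NULL.
#[derive(Debug, Clone, PartialEq)]
pub struct UserRow {
    pub email: String,
    pub name: String,
}

/// Mirrors SQL `COALESCE(new, current)`: take the new value when present,
/// otherwise keep the current one.
pub fn coalesce(new: Option<String>, current: String) -> String {
    new.unwrap_or(current)
}

/// Applies a partial update: `None` fields leave the row untouched.
pub fn apply_update(row: UserRow, email: Option<String>, name: Option<String>) -> UserRow {
    UserRow {
        email: coalesce(email, row.email),
        name: coalesce(name, row.name),
    }
}
```

One consequence of this design is that a NULL parameter always means "keep the current value", so the pattern cannot be used to reset a nullable column to NULL.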
Dispatch Jobs from Mutations
When a mutation needs to trigger background work, keep it transactional (the default, written out explicitly here) so that jobs are dispatched only if the mutation commits:
#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, input: CreateOrderInput) -> Result<Order> {
    let order = ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO orders (...) VALUES (...) RETURNING *")
    ).await?;
    // Jobs are buffered here and inserted only after the transaction commits.
    ctx.dispatch_job("process_order", json!({ "order_id": order.id })).await?;
    ctx.dispatch_job("send_confirmation", json!({ "email": order.email })).await?;
    Ok(order)
}
The transactional attribute enables the outbox pattern: jobs are buffered in memory during the mutation and only inserted into the database after the transaction commits successfully. If the mutation fails, no jobs are dispatched.
Compile-time enforcement: If your mutation calls dispatch_job() or start_workflow() with transactional = false, you get an error at compile time:
error: Mutations that call `dispatch_job()` or `start_workflow()` must use
the transactional outbox pattern. Remove `transactional = false` or
avoid dispatching jobs/workflows from a non-transactional mutation.
Start Workflows from Mutations
#[forge::mutation(transactional)]
pub async fn start_trial(ctx: &MutationContext, user_id: Uuid) -> Result<Uuid> {
    ctx.db().execute(
        sqlx::query("UPDATE users SET trial_started = NOW() WHERE id = $1")
            .bind(user_id)
    ).await?;
    let workflow_id = ctx.start_workflow("trial_flow", json!({ "user_id": user_id })).await?;
    Ok(workflow_id)
}
Call External APIs
#[forge::mutation]
pub async fn charge_card(ctx: &MutationContext, input: ChargeInput) -> Result<Payment> {
    // ctx.http() goes through the circuit breaker.
    let response = ctx.http()
        .post("https://api.stripe.com/v1/charges")
        .bearer_auth(ctx.env_require("STRIPE_SECRET_KEY")?)
        .json(&json!({
            "amount": input.amount,
            "currency": "usd",
            "source": input.token,
        }))
        .send()
        .await?;
    let charge: StripeCharge = response.json().await?;
    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO payments (...) VALUES (...) RETURNING *")
            .bind(charge.id)
    )
    .await
    .map_err(Into::into)
}
Role-Based Access
#[forge::mutation(require_role("admin"))]
pub async fn delete_user(ctx: &MutationContext, user_id: Uuid) -> Result<bool> {
    let result = ctx.db().execute(
        sqlx::query("DELETE FROM users WHERE id = $1")
            .bind(user_id)
    ).await?;
    // true if a row was actually deleted
    Ok(result.rows_affected() > 0)
}
Rate-Limited Mutations
#[forge::mutation(rate_limit(requests = 10, per = "1m", key = "user"))]
pub async fn submit_comment(ctx: &MutationContext, input: CommentInput) -> Result<Comment> {
    let user_id = ctx.require_user_id()?;
    ctx.db().fetch_one(
        sqlx::query_as("INSERT INTO comments (user_id, content) VALUES ($1, $2) RETURNING *")
            .bind(user_id)
            .bind(input.content)
    )
    .await
    .map_err(Into::into)
}
Rate limit keys: user (per authenticated user), ip (per client IP), tenant (per tenant claim), user_action (per user per function), global (all requests).
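The Attributes table describes rate_limit as token bucket rate limiting. As a rough illustration of that technique (not Forge's actual limiter), the sketch below models one bucket for `requests = 10, per = "1m"`. `TokenBucket` and its methods are hypothetical, and time is passed in explicitly as seconds so the behavior is deterministic.

```rust
/// Hypothetical token bucket; `now` is a timestamp in seconds supplied by
/// the caller, which keeps the sketch deterministic.
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: f64,
}

impl TokenBucket {
    /// Allows `requests` per `per_secs` seconds; the bucket starts full.
    pub fn new(requests: u32, per_secs: f64, now: f64) -> Self {
        let capacity = f64::from(requests);
        Self {
            capacity,
            tokens: capacity,
            refill_per_sec: capacity / per_secs,
            last_refill: now,
        }
    }

    /// Refills for the elapsed time (capped at capacity), then tries to
    /// spend one token. Returns false when the caller should be rejected.
    pub fn try_acquire(&mut self, now: f64) -> bool {
        let elapsed = (now - self.last_refill).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

Under this model, a limiter would presumably keep one bucket per key value (one per user for `key = "user"`, a single shared bucket for `key = "global"`); how Forge stores its buckets is not covered here.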
Context Methods
| Method | Return Type | Description |
|---|---|---|
| ctx.db() | DbConn<'_> | Database connection (pool or transaction handle) |
| ctx.pool() | &PgPool | Direct pool access for non-transactional operations |
| ctx.http() | HttpClient | Circuit-breaker-backed HTTP client for external calls |
| ctx.raw_http() | &reqwest::Client | Raw HTTP client, bypasses circuit breaker |
| ctx.require_user_id() | Result<Uuid> | Authenticated user ID or error |
| ctx.require_subject() | Result<&str> | Raw subject claim (Firebase, Clerk) |
| ctx.dispatch_job(name, args) | Result<Uuid> | Dispatch background job |
| ctx.cancel_job(job_id, reason) | Result<bool> | Request cancellation for a job (reason is Option<String>) |
| ctx.issue_token(claims) | Result<String> | Issue HMAC-signed JWT token (requires HMAC auth config) |
| ctx.start_workflow(name, input) | Result<Uuid> | Start workflow |
| ctx.env(key) | Option<String> | Environment variable |
| ctx.env_or(key, default) | String | Environment variable with default |
| ctx.env_require(key) | Result<String> | Required environment variable |
| ctx.env_parse::<T>(key) | Result<T> | Parse environment variable |
| ctx.env_parse_or(key, default) | Result<T> | Parse environment variable with default |
| ctx.env_contains(key) | bool | Check if environment variable is set |
Context Fields
| Field | Type | Description |
|---|---|---|
| ctx.auth | AuthContext | Authentication context |
| ctx.auth.user_id() | Option<Uuid> | User ID if authenticated |
| ctx.auth.has_role(role) | bool | Check role membership |
| ctx.auth.roles() | &[String] | All roles |
| ctx.auth.claim(key) | Option<&Value> | Custom JWT claim |
| ctx.request | RequestMetadata | Request metadata |
| ctx.request.request_id | Uuid | Unique request ID |
| ctx.request.trace_id | String | Distributed trace ID |
| ctx.request.client_ip | Option<String> | Client IP address |
Under the Hood
Transactional Outbox Pattern
When you use #[forge::mutation(transactional)] with job dispatch:
-- Step 1: Your mutation runs inside a transaction
BEGIN;
INSERT INTO orders (product_id, user_id) VALUES (...);
COMMIT;
-- Step 2: Only after successful commit, buffered jobs are inserted
INSERT INTO forge_jobs (name, args, status) VALUES ('process_order', '{"order_id": "..."}', 'pending');
If the mutation fails, the transaction rolls back and no jobs are inserted. Jobs are buffered in memory during the transaction and flushed to the database only after a successful commit. This avoids orphaned jobs without requiring distributed transactions.
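The buffering behavior described above can be sketched with a small in-memory type. This is an illustration under assumptions, not Forge's internals: `Job`, `Outbox`, `dispatch`, and `finish` are hypothetical names, and job arguments are kept as plain strings.

```rust
/// Hypothetical job record for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Job {
    pub name: String,
    pub args: String,
}

/// Buffers jobs in memory during a mutation and releases them only once
/// the transaction outcome is known.
#[derive(Debug, Default)]
pub struct Outbox {
    buffered: Vec<Job>,
}

impl Outbox {
    /// Records a job without writing anything to the database yet.
    pub fn dispatch(&mut self, name: &str, args: &str) {
        self.buffered.push(Job {
            name: name.to_string(),
            args: args.to_string(),
        });
    }

    /// Returns the jobs to insert: all buffered jobs after a successful
    /// commit, none after a failure.
    pub fn finish<T, E>(self, outcome: &Result<T, E>) -> Vec<Job> {
        match outcome {
            Ok(_) => self.buffered,
            Err(_) => Vec::new(),
        }
    }
}
```

Because `finish` consumes the outbox, the buffered jobs cannot be flushed twice for the same mutation.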
Circuit Breaker for HTTP
ctx.http() returns an HttpClient backed by the circuit breaker, which tracks failure rates per host. After repeated failures (default: 5), requests fail fast rather than waiting for timeouts. This prevents cascade failures when downstream services are unhealthy. Use ctx.raw_http() only when you need streaming responses or a custom redirect policy — it bypasses the circuit breaker.
The circuit breaker has three states:
- Closed: Normal operation, requests pass through
- Open: After threshold failures, requests fail immediately
- Half-Open: After backoff timeout, limited requests allowed to test recovery
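The three states can be modeled as a small state machine. The sketch below is a simplification under stated assumptions: it counts consecutive failures rather than a failure rate, uses a fixed open interval, and lets every request through while half-open. `Breaker`, `State`, and the method names are hypothetical, not Forge's API.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical breaker for one host; `now` is a timestamp in seconds.
#[derive(Debug)]
pub struct Breaker {
    state: State,
    consecutive_failures: u32,
    threshold: u32,
    opened_at: f64,
    open_for: f64,
}

impl Breaker {
    pub fn new(threshold: u32, open_for_secs: f64) -> Self {
        Self {
            state: State::Closed,
            consecutive_failures: 0,
            threshold,
            opened_at: 0.0,
            open_for: open_for_secs,
        }
    }

    /// Whether a request may be sent at time `now`.
    pub fn allow(&mut self, now: f64) -> bool {
        // Open -> Half-Open once the open interval has elapsed.
        if self.state == State::Open && now - self.opened_at >= self.open_for {
            self.state = State::HalfOpen;
        }
        self.state != State::Open
    }

    /// Any success closes the breaker and clears the failure count.
    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.state = State::Closed;
    }

    /// A failed probe in Half-Open, or reaching the threshold, opens it.
    pub fn record_failure(&mut self, now: f64) {
        self.consecutive_failures += 1;
        if self.state == State::HalfOpen || self.consecutive_failures >= self.threshold {
            self.state = State::Open;
            self.opened_at = now;
        }
    }

    pub fn state(&self) -> State {
        self.state
    }
}
```

With `Breaker::new(5, 30.0)`, four failures leave the breaker closed, the fifth opens it, and `allow` starts returning true again 30 seconds after it opened.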
Configuration uses exponential backoff (30s base, 1.5x multiplier, 10min max) to gradually retry unhealthy services.
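The backoff schedule can be worked out from those three numbers. The function below is a sketch that assumes the delay is base × multiplier^attempt with a 0-based attempt counter, capped at the maximum; `backoff_delay` is a hypothetical helper, and how Forge counts attempts is an assumption.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based), assuming
/// delay = 30s * 1.5^attempt, capped at 10 minutes.
pub fn backoff_delay(attempt: u32) -> Duration {
    const BASE_SECS: f64 = 30.0;
    const MULTIPLIER: f64 = 1.5;
    const MAX_SECS: f64 = 600.0;
    // Clamp the exponent so the cast to i32 cannot wrap; the cap below
    // makes larger exponents irrelevant anyway.
    let exponent = attempt.min(64) as i32;
    let secs = BASE_SECS * MULTIPLIER.powi(exponent);
    Duration::from_secs_f64(secs.min(MAX_SECS))
}
```

Under these assumptions the first delays are 30s, 45s, and 67.5s, and the 10-minute cap is reached at the ninth delay (attempt 8), where the uncapped value would exceed 600 seconds.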
Write-Ahead Logging
All mutations go through PostgreSQL's WAL (Write-Ahead Log). Committed transactions survive crashes. Forge inherits PostgreSQL's battle-tested durability guarantees without adding its own persistence layer.
Testing
#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;
    #[tokio::test]
    async fn test_create_order_dispatches_job() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .build();
        create_order(&ctx, CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        }).await.unwrap();
        ctx.job_dispatch().assert_dispatched("process_order");
    }
    #[tokio::test]
    async fn test_mutation_with_mocked_http() {
        let ctx = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();
        let result = charge_card(&ctx, ChargeInput { ... }).await;
        ctx.http().assert_called("https://api.stripe.com/*");
    }
    #[tokio::test]
    async fn test_unauthorized_mutation() {
        let ctx = TestMutationContext::builder().build(); // No user
        // Any well-formed input will do; the user check comes first.
        let input = CreateOrderInput {
            product_id: Uuid::new_v4(),
            quantity: 1,
        };
        let result = create_order(&ctx, input).await;
        assert!(matches!(result, Err(ForgeError::Unauthorized(_))));
    }
}
Test context builder methods:
- .as_user(uuid) - Authenticate as user
- .with_role("admin") - Add role
- .with_claim("org_id", value) - Add JWT claim
- .with_pool(pool) - Use real database
- .mock_http(pattern, handler) - Mock HTTP responses
- .mock_http_json(pattern, value) - Mock JSON response
- .with_env(key, value) - Mock environment variable