# Attributes Reference

All macro attributes for Forge function decorators.

## `#[forge::query]`

Read-only database operations with caching and subscriptions.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role (returns 403 if missing) |
| `consistent` | flag | `false` | Force reads from primary (skip replicas) |
| `cache` | `"duration"` | none | Cache TTL (`"30s"`, `"5m"`, `"1h"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `tables` | `["t1", "t2"]` | auto | Explicit table dependencies for subscriptions |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`) |
| `rate_limit(...)` | config | none | Rate limiting configuration |

### Rate Limit Config

```rust
rate_limit(requests = 100, per = "1m", key = "user")
```

| Parameter | Type | Description |
|-----------|------|-------------|
| `requests` | `u32` | Requests allowed per window |
| `per` | `"duration"` | Time window (`"30s"`, `"1m"`, `"1h"`) |
| `key` | `string` | Bucket key: `"user"`, `"ip"`, `"tenant"`, `"global"` |

### Example

```rust
#[forge::query(
    cache = "5m",
    timeout = 30,
    require_role("admin"),
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
    // ...
}
```

### Consistent Reads

When a query needs the latest data immediately after a mutation (read-after-write), use consistent to bypass replicas:

```rust
#[forge::query(consistent)]
pub async fn get_order_confirmation(ctx: &QueryContext, order_id: Uuid) -> Result<Order> {
    sqlx::query_as!(Order, "SELECT * FROM orders WHERE id = $1", order_id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
```

This always reads from the primary database, avoiding replication lag. Only use when stale data would cause visible problems (confirmation screens, permission checks against just-written state). Most queries can tolerate eventual consistency and should use the default replica routing.


---

## `#[forge::mutation]`

Transactional write operations.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role (returns 403 if missing) |
| `transactional` | flag | `false` | Wrap in database transaction (required if dispatching jobs) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`) |
| `rate_limit(...)` | config | none | Rate limiting configuration (same as query) |

### Runtime Enforcement

Mutations that call `dispatch_job()` or `start_workflow()` must use `transactional`. You get an error at runtime if you forget:

```text
error: Mutations that call dispatch_job() must use #[forge::mutation(transactional)]
```


### Example

```rust
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
let order = insert_order(ctx.db(), &input).await?;
ctx.dispatch_job("send_confirmation", json!({ "order_id": order.id })).await?;
Ok(order)
}
```

---

## `#[forge::mcp_tool]`

Explicitly exposes a function as an MCP tool (Streamable HTTP endpoint).

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `name = "tool_name"` | `string` | fn name | Override exposed MCP tool name |
| `title = "..."` | `string` | none | Human-friendly display title |
| `description = "..."` | `string` | none | Tool description for MCP clients |
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role |
| `timeout` | `u64` | none | Timeout in seconds |
| `rate_limit(...)` | config | none | Rate limiting (requests, per, key) |
| `read_only` | flag | none | No side effects, safe to call speculatively |
| `destructive` | flag | none | Modifies external state irreversibly |
| `idempotent` | flag | none | Repeated calls with same args produce same result |
| `open_world` | flag | none | Interacts with entities beyond its dataset |

### Example

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, JsonSchema)]
pub struct ExportToolOutput {
    pub job_id: uuid::Uuid,
}

#[forge::mcp_tool(
    name = "export_project",
    title = "Export Project",
    description = "Queues a background export job for a project",
    timeout = 15,
    rate_limit(requests = 30, per = "1m", key = "user"),
    idempotent
)]
pub async fn export_project(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
) -> Result<ExportToolOutput> {
    let user_id = ctx.require_user_id()?;
    let job_id = ctx
        .dispatch_job(
            "export_project",
            serde_json::json!({
                "user_id": user_id,
                "project_id": project_id,
                "format": "json"
            }),
        )
        .await?;
    Ok(ExportToolOutput { job_id })
}
```

### Parameter Metadata in `inputSchema`

MCP per-parameter metadata (descriptions, enums, defaults, constraints) is generated from JSON Schema:

1. Preferred: use a dedicated input struct with `#[derive(Deserialize, JsonSchema)]` and `#[schemars(...)]` field attributes.
2. Inline parameters are also supported: when Forge auto-generates the tool args struct, it propagates parameter `#[schemars(...)]` and `#[serde(...)]` attributes.

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, serde::Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExportFormat {
    Json,
    Csv,
}

#[forge::mcp_tool(name = "export_project")]
pub async fn export_project_tool(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
    #[serde(default)]
    dry_run: bool,
    #[schemars(description = "Output format")]
    format: ExportFormat,
) -> Result<ExportToolOutput> {
    // ...
}
```

---

## `#[forge::job]`

Background tasks with retries and worker routing.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Allow dispatch without authentication |
| `require_role("name")` | `string` | none | Require role to dispatch |
| `name` | `"string"` | fn name | Override job name |
| `timeout` | `"duration"` | `"1h"` | Job timeout (`"30s"`, `"10m"`, `"2h"`) |
| `priority` | `"level"` | `"normal"` | `background`, `low`, `normal`, `high`, `critical` |
| `max_attempts` | `u32` | `3` | Maximum retry attempts |
| `backoff` | `"strategy"` | `"exponential"` | `fixed`, `linear`, `exponential` |
| `max_backoff` | `"duration"` | `"5m"` | Maximum backoff delay |
| `worker_capability` | `"string"` | none | Required worker capability (e.g., `"gpu"`, `"media"`) |
| `idempotent` | flag | `false` | Mark job as idempotent |
| `idempotent(key = "...")` | config | none | Idempotent with custom key field |
| `compensate` | `"handler"` | none | Cancellation compensation handler |
| `ttl` | `"duration"` | none | Auto-cleanup after completion (e.g., `"7d"`) |

### Retry Configuration (Grouped)

```rust
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
```

Equivalent to specifying `max_attempts`, `backoff`, and `max_backoff` separately.

### Priority Levels

| Level | Description |
|-------|-------------|
| `background` | Lowest priority, runs when idle |
| `low` | Below normal priority |
| `normal` | Default priority |
| `high` | Above normal priority |
| `critical` | Highest priority, preempts other work |

### Example

```rust
#[forge::job(
    priority = "high",
    timeout = "30m",
    retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
    worker_capability = "gpu",
    idempotent(key = "video_id"),
    compensate = "compensate_process_video",
    ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
    ctx.progress(0, "Starting transcode")?;
    ctx.check_cancelled().await?;

    let temp_file = create_temp_file().await?;
    ctx.save("temp_file", json!(temp_file.path())).await?;

    // ... processing ...

    ctx.progress(100, "Complete")?;
    Ok(output)
}

pub async fn compensate_process_video(
    ctx: &JobContext,
    input: VideoInput,
    reason: &str,
) -> Result<()> {
    // Original args available directly
    tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);

    // Data generated during execution from saved()
    let saved = ctx.saved().await;
    if let Some(path) = saved.get("temp_file") {
        delete_temp_file(path.as_str().unwrap()).await?;
    }
    Ok(())
}
```

---

## `#[forge::cron]`

Scheduled tasks with exactly-once execution.

```rust
#[forge::cron("schedule", ...attributes)]
```

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| (first arg) | `"cron expr"` | required | Cron schedule (5-part: `"0 9 * * *"`) |
| `timezone` | `"string"` | `"UTC"` | IANA timezone (e.g., `"America/New_York"`) |
| `timeout` | `"duration"` | `"1h"` | Execution timeout |
| `catch_up` | flag | `false` | Run missed executions after downtime |
| `catch_up_limit` | `u32` | `10` | Maximum catch-up runs |
| `group` | `"string"` | `"default"` | Sharded leader election group |

### Cron Expression

Standard 5-part cron format:

```text
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```

| Expression | Description |
|------------|-------------|
| `0 9 * * *` | Daily at 9:00 AM |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 1 * *` | First day of month at midnight |
| `0 0 * * 0` | Every Sunday at midnight |

### Example

```rust
#[forge::cron("0 9 * * *", timezone = "America/New_York", timeout = "30m", catch_up, catch_up_limit = 5)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    if ctx.is_catch_up {
        // Skip heavy operations for catch-up runs
        return Ok(());
    }
    // Generate and send report
    Ok(())
}
```

---

## `#[forge::workflow]`

Multi-step durable processes with compensation.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Allow start without authentication |
| `require_role("name")` | `string` | none | Require role to start |
| `version` | `u32` | `1` | Workflow version (increment for breaking changes) |
| `timeout` | `"duration"` | `"24h"` | Maximum workflow duration (`"1h"`, `"7d"`, `"30d"`) |
| `deprecated` | flag | `false` | Mark workflow as deprecated |

### Duration Format

| Suffix | Unit |
|--------|------|
| `s` | seconds |
| `m` | minutes |
| `h` | hours |
| `d` | days |

### Compile-Time Enforcement

The macro detects `tokio::sleep()` calls with durations over 100 seconds and errors:

```text
error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
       Workflows require durable sleep that survives process restarts.
```

Short sleeps for polling loops are allowed.

### Example

```rust
#[forge::workflow(version = 2, timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    let payment = ctx.step("charge_card", || async {
        charge(&order.payment).await
    })
    .compensate(|result| async move {
        refund(&result.charge_id).await
    })
    .run()
    .await?
    .expect("charge_card is non-optional");

    ctx.sleep(Duration::from_days(3)).await?; // Durable sleep

    ctx.step("ship_order", || async {
        ship(&order).await
    }).run().await?;

    Ok(FulfillmentResult { payment, shipped: true })
}
```

---

## `#[forge::daemon]`

Long-running singleton services with leader election.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `leader_elected` | `bool` | `true` | Single leader instance runs across cluster |
| `restart_on_panic` | `bool` | `true` | Restart daemon if it panics |
| `restart_delay` | `"duration"` | `"5s"` | Delay before restart after failure |
| `startup_delay` | `"duration"` | `"0s"` | Delay before first execution |
| `max_restarts` | `u32` | unlimited | Maximum restart attempts |

### Leadership

When leader_elected = true (default):

  • Exactly one instance runs across all nodes
  • Uses PostgreSQL advisory locks for election
  • Lock is session-bound; automatic failover on connection drop
  • Other nodes wait in standby

When leader_elected = false:

  • Daemon runs on every node
  • Instances run independently without coordination

### Example

```rust
#[forge::daemon(
    leader_elected = true,
    restart_on_panic = true,
    restart_delay = "10s",
    startup_delay = "5s",
    max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
    loop {
        aggregate_metrics(ctx.db()).await?;

        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
```

---

## `#[forge::webhook]`

HTTP endpoints for external events with signature verification.

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `path` | `"string"` | required | URL path (e.g., `"/webhooks/stripe"`) |
| `timeout` | `"duration"` | `"30s"` | Request timeout |
| `signature` | `WebhookSignature::...` | none | Signature verification config |
| `idempotency` | `"source:key"` | none | Idempotency key source |
| `allow_unsigned` | flag | `false` | Accept requests without valid signature |

### Signature Verification

```rust
signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
```

| Algorithm | Function |
|-----------|----------|
| HMAC-SHA256 | `WebhookSignature::hmac_sha256(header, secret_env)` |
| HMAC-SHA1 | `WebhookSignature::hmac_sha1(header, secret_env)` |
| HMAC-SHA512 | `WebhookSignature::hmac_sha512(header, secret_env)` |

The secret is read from the environment variable at runtime.

### Idempotency

| Source | Format | Example |
|--------|--------|---------|
| Header | `"header:Header-Name"` | `"header:X-Request-Id"` |
| Body | `"body:$.json.path"` | `"body:$.id"` |

Idempotency keys are stored for 24 hours by default. Duplicate requests return 200 OK with {"status":"already_processed"} instead of re-running the handler.

### Common Integrations

```rust
// GitHub
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery"
)]

// Stripe
#[forge::webhook(
    path = "/webhooks/stripe",
    signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
    idempotency = "body:$.id"
)]

// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]
```

### Example

```rust
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery",
    timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
    let event_type = ctx.header("X-GitHub-Event").unwrap_or("unknown");

    match event_type {
        "push" => { ctx.dispatch_job("process_push", payload.clone()).await?; }
        "pull_request" => { ctx.dispatch_job("process_pr", payload.clone()).await?; }
        _ => {}
    }

    Ok(WebhookResult::Accepted)
}
```

---

## Duration Format

All duration attributes accept strings with these suffixes:

| Suffix | Unit | Example |
|--------|------|---------|
| `ms` | milliseconds | `"500ms"` |
| `s` | seconds | `"30s"` |
| `m` | minutes | `"5m"` |
| `h` | hours | `"2h"` |
| `d` | days | `"7d"` |

Raw numbers without suffix are interpreted as seconds.