# Attributes Reference

All macro attributes for Forge function decorators.
---

## `#[forge::query]`

Read-only database operations with caching and subscriptions.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role (returns 403 if missing) |
| `consistent` | flag | `false` | Force reads from primary (skip replicas) |
| `cache` | `"duration"` | none | Cache TTL (`"30s"`, `"5m"`, `"1h"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `tables` | `["t1", "t2"]` | auto | Explicit table dependencies for subscriptions |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`) |
| `rate_limit(...)` | config | none | Rate limiting configuration |
### Rate Limit Config

```rust
rate_limit(requests = 100, per = "1m", key = "user")
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `requests` | `u32` | Requests allowed per window |
| `per` | `"duration"` | Time window (`"30s"`, `"1m"`, `"1h"`) |
| `key` | `string` | Bucket key: `"user"`, `"ip"`, `"tenant"`, `"global"` |
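The `requests`/`per`/`key` semantics can be pictured as a per-key counting window. Below is a fixed-window sketch in plain Rust; it only illustrates the parameters and is not Forge's actual limiter, whose algorithm is unspecified here and is presumably shared across nodes.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Fixed-window rate limiter sketch: `limit` requests per `window`,
/// bucketed by key (e.g., a user id). Illustrative only.
struct RateLimiter {
    limit: u32,
    window: Duration,
    buckets: HashMap<String, (Instant, u32)>, // key -> (window start, count)
}

impl RateLimiter {
    fn new(limit: u32, window: Duration) -> Self {
        Self { limit, window, buckets: HashMap::new() }
    }

    /// Returns true if the request is allowed, false if rate-limited.
    fn allow(&mut self, key: &str) -> bool {
        let now = Instant::now();
        let entry = self.buckets.entry(key.to_string()).or_insert((now, 0));
        if now.duration_since(entry.0) >= self.window {
            *entry = (now, 0); // window elapsed: start a fresh bucket
        }
        if entry.1 < self.limit {
            entry.1 += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    // Like rate_limit(requests = 2, per = "1m", key = "user"), with a tiny limit for demo.
    let mut limiter = RateLimiter::new(2, Duration::from_secs(60));
    assert!(limiter.allow("user-1"));
    assert!(limiter.allow("user-1"));
    assert!(!limiter.allow("user-1")); // third request in the window is rejected
    assert!(limiter.allow("user-2"));  // separate bucket per key
    println!("ok");
}
```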
### Example

```rust
#[forge::query(
    cache = "5m",
    timeout = 30,
    require_role("admin"),
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
    // ...
}
```
### Consistent Reads

When a query needs the latest data immediately after a mutation (read-after-write), use `consistent` to bypass replicas:

```rust
#[forge::query(consistent)]
pub async fn get_order_confirmation(ctx: &QueryContext, order_id: Uuid) -> Result<Order> {
    sqlx::query_as!(Order, "SELECT * FROM orders WHERE id = $1", order_id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
```
This always reads from the primary database, avoiding replication lag. Only use when stale data would cause visible problems (confirmation screens, permission checks against just-written state). Most queries can tolerate eventual consistency and should use the default replica routing.
---
## `#[forge::mutation]`
Transactional write operations.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role (returns 403 if missing) |
| `transactional` | flag | `false` | Wrap in database transaction (required if dispatching jobs) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`) |
| `rate_limit(...)` | config | none | Rate limiting configuration (same as query) |
### Runtime Enforcement
Mutations that call `dispatch_job()` or `start_workflow()` must use `transactional`. You get an error at runtime if you forget:

```
error: Mutations that call dispatch_job() must use #[forge::mutation(transactional)]
```
### Example
```rust
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
    let order = insert_order(ctx.db(), &input).await?;
    ctx.dispatch_job("send_confirmation", json!({ "order_id": order.id })).await?;
    Ok(order)
}
```

---

## `#[forge::mcp_tool]`

Explicitly exposes a function as an MCP tool (Streamable HTTP endpoint).
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `name = "tool_name"` | `string` | fn name | Override exposed MCP tool name |
| `title = "..."` | `string` | none | Human-friendly display title |
| `description = "..."` | `string` | none | Tool description for MCP clients |
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | `string` | none | Require specific role |
| `timeout` | `u64` | none | Timeout in seconds |
| `rate_limit(...)` | config | none | Rate limiting (`requests`, `per`, `key`) |
| `read_only` | flag | none | No side effects, safe to call speculatively |
| `destructive` | flag | none | Modifies external state irreversibly |
| `idempotent` | flag | none | Repeated calls with same args produce same result |
| `open_world` | flag | none | Interacts with entities beyond its dataset |
### Example

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, JsonSchema)]
pub struct ExportToolOutput {
    pub job_id: uuid::Uuid,
}

#[forge::mcp_tool(
    name = "export_project",
    title = "Export Project",
    description = "Queues a background export job for a project",
    timeout = 15,
    rate_limit(requests = 30, per = "1m", key = "user"),
    idempotent
)]
pub async fn export_project(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
) -> Result<ExportToolOutput> {
    let user_id = ctx.require_user_id()?;
    let job_id = ctx
        .dispatch_job(
            "export_project",
            serde_json::json!({
                "user_id": user_id,
                "project_id": project_id,
                "format": "json"
            }),
        )
        .await?;
    Ok(ExportToolOutput { job_id })
}
```
### Parameter Metadata in `inputSchema`

MCP per-parameter metadata (descriptions, enums, defaults, constraints) is generated from JSON Schema:

- Preferred: use a dedicated input struct with `#[derive(Deserialize, JsonSchema)]` and `#[schemars(...)]` field attributes.
- Inline parameters are also supported: when Forge auto-generates the tool args struct, it propagates parameter `#[schemars(...)]` and `#[serde(...)]` attributes.
```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, serde::Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExportFormat {
    Json,
    Csv,
}

#[forge::mcp_tool(name = "export_project")]
pub async fn export_project_tool(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
    #[serde(default)]
    dry_run: bool,
    #[schemars(description = "Output format")]
    format: ExportFormat,
) -> Result<ExportToolOutput> {
    // ...
}
```
---

## `#[forge::job]`

Background tasks with retries and worker routing.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Allow dispatch without authentication |
| `require_role("name")` | `string` | none | Require role to dispatch |
| `name` | `"string"` | fn name | Override job name |
| `timeout` | `"duration"` | `"1h"` | Job timeout (`"30s"`, `"10m"`, `"2h"`) |
| `priority` | `"level"` | `"normal"` | `background`, `low`, `normal`, `high`, `critical` |
| `max_attempts` | `u32` | `3` | Maximum retry attempts |
| `backoff` | `"strategy"` | `"exponential"` | `fixed`, `linear`, `exponential` |
| `max_backoff` | `"duration"` | `"5m"` | Maximum backoff delay |
| `worker_capability` | `"string"` | none | Required worker capability (e.g., `"gpu"`, `"media"`) |
| `idempotent` | flag | `false` | Mark job as idempotent |
| `idempotent(key = "...")` | config | none | Idempotent with custom key field |
| `compensate` | `"handler"` | none | Cancellation compensation handler |
| `ttl` | `"duration"` | none | Auto-cleanup after completion (e.g., `"7d"`) |
### Retry Configuration (Grouped)

```rust
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
```

Equivalent to specifying `max_attempts`, `backoff`, and `max_backoff` separately.
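To see what such a retry config implies, here is a sketch of an exponential backoff schedule capped by `max_backoff`. The 1-second base delay and doubling factor are assumptions for illustration; the base delay Forge actually uses is not documented here.

```rust
use std::time::Duration;

/// Delay before retry `attempt` (1-based), assuming exponential backoff
/// that doubles a 1s base delay and is capped at `max_backoff`.
/// The base and doubling rule are illustrative assumptions.
fn retry_delay(attempt: u32, max_backoff: Duration) -> Duration {
    let base = Duration::from_secs(1);
    let exp = base * 2u32.saturating_pow(attempt.saturating_sub(1));
    exp.min(max_backoff)
}

fn main() {
    // retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
    let max_backoff = Duration::from_secs(10 * 60);
    for attempt in 1..=5 {
        // 1s, 2s, 4s, 8s, 16s under these assumptions
        println!("attempt {attempt}: wait {:?}", retry_delay(attempt, max_backoff));
    }
    // With enough attempts the cap takes over: 2^11 s would exceed 10m.
    assert_eq!(retry_delay(12, max_backoff), max_backoff);
}
```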
### Priority Levels

| Level | Description |
|-------|-------------|
| `background` | Lowest priority, runs when idle |
| `low` | Below normal priority |
| `normal` | Default priority |
| `high` | Above normal priority |
| `critical` | Highest priority, preempts other work |
### Example

```rust
#[forge::job(
    priority = "high",
    timeout = "30m",
    retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
    worker_capability = "gpu",
    idempotent(key = "video_id"),
    compensate = "compensate_process_video",
    ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
    ctx.progress(0, "Starting transcode")?;
    ctx.check_cancelled().await?;
    let temp_file = create_temp_file().await?;
    ctx.save("temp_file", json!(temp_file.path())).await?;
    // ... processing ...
    ctx.progress(100, "Complete")?;
    Ok(output)
}

pub async fn compensate_process_video(
    ctx: &JobContext,
    input: VideoInput,
    reason: &str,
) -> Result<()> {
    // Original args available directly
    tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);
    // Data generated during execution, from saved()
    let saved = ctx.saved().await;
    if let Some(path) = saved.get("temp_file") {
        delete_temp_file(path.as_str().unwrap()).await?;
    }
    Ok(())
}
```
---

## `#[forge::cron]`

Scheduled tasks with exactly-once execution.

```rust
#[forge::cron("schedule", ...attributes)]
```
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| (first arg) | `"cron expr"` | required | Cron schedule (5-part: `"0 9 * * *"`) |
| `timezone` | `"string"` | `"UTC"` | IANA timezone (e.g., `"America/New_York"`) |
| `timeout` | `"duration"` | `"1h"` | Execution timeout |
| `catch_up` | flag | `false` | Run missed executions after downtime |
| `catch_up_limit` | `u32` | `10` | Maximum catch-up runs |
| `group` | `"string"` | `"default"` | Sharded leader election group |
### Cron Expression

Standard 5-part cron format:

```
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```
| Expression | Description |
|------------|-------------|
| `0 9 * * *` | Daily at 9:00 AM |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 1 * *` | First day of month at midnight |
| `0 0 * * 0` | Every Sunday at midnight |
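For intuition, a minimal matcher for the subset of the syntax used above (`*`, `*/n`, and plain numbers; ranges, lists, and names omitted) fits in a few lines. This is a sketch, not Forge's scheduler:

```rust
/// Check whether one cron field (`*`, `*/n`, or a plain number) matches a value.
fn field_matches(field: &str, value: u32) -> bool {
    if field == "*" {
        true
    } else if let Some(step) = field.strip_prefix("*/") {
        step.parse::<u32>().map_or(false, |n| n != 0 && value % n == 0)
    } else {
        field.parse::<u32>().map_or(false, |n| n == value)
    }
}

/// Match a 5-part expression against (minute, hour, day-of-month, month, day-of-week).
fn cron_matches(expr: &str, t: (u32, u32, u32, u32, u32)) -> bool {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != 5 {
        return false;
    }
    let values = [t.0, t.1, t.2, t.3, t.4];
    fields.iter().zip(values).all(|(f, v)| field_matches(f, v))
}

fn main() {
    // "0 9 * * *" fires daily at 09:00, regardless of day fields.
    assert!(cron_matches("0 9 * * *", (0, 9, 15, 6, 3)));
    // "*/15 * * * *" fires at minutes 0, 15, 30, 45.
    assert!(cron_matches("*/15 * * * *", (30, 14, 1, 1, 1)));
    assert!(!cron_matches("*/15 * * * *", (7, 14, 1, 1, 1)));
    println!("ok");
}
```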
### Example

```rust
#[forge::cron("0 9 * * *", timezone = "America/New_York", timeout = "30m", catch_up, catch_up_limit = 5)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    if ctx.is_catch_up {
        // Skip heavy operations for catch-up runs
        return Ok(());
    }
    // Generate and send report
    Ok(())
}
```
---

## `#[forge::workflow]`

Multi-step durable processes with compensation.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `public` | flag | `false` | Allow start without authentication |
| `require_role("name")` | `string` | none | Require role to start |
| `version` | `u32` | `1` | Workflow version (increment for breaking changes) |
| `timeout` | `"duration"` | `"24h"` | Maximum workflow duration (`"1h"`, `"7d"`, `"30d"`) |
| `deprecated` | flag | `false` | Mark workflow as deprecated |
### Duration Format

| Suffix | Unit |
|--------|------|
| `s` | seconds |
| `m` | minutes |
| `h` | hours |
| `d` | days |
### Compile-Time Enforcement

The macro detects `tokio::sleep()` calls with durations over 100 seconds and errors:

```
error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
       Workflows require durable sleep that survives process restarts.
       Short sleeps for polling loops are allowed.
```
### Example

```rust
#[forge::workflow(version = 2, timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    let payment = ctx.step("charge_card", || async {
        charge(&order.payment).await
    })
    .compensate(|result| async move {
        refund(&result.charge_id).await
    })
    .run()
    .await?
    .expect("charge_card is non-optional");

    ctx.sleep(Duration::from_secs(3 * 24 * 3600)).await?; // Durable 3-day sleep

    ctx.step("ship_order", || async {
        ship(&order).await
    }).run().await?;

    Ok(FulfillmentResult { payment, shipped: true })
}
```
---

## `#[forge::daemon]`

Long-running singleton services with leader election.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `leader_elected` | `bool` | `true` | Single leader instance runs across cluster |
| `restart_on_panic` | `bool` | `true` | Restart daemon if it panics |
| `restart_delay` | `"duration"` | `"5s"` | Delay before restart after failure |
| `startup_delay` | `"duration"` | `"0s"` | Delay before first execution |
| `max_restarts` | `u32` | unlimited | Maximum restart attempts |
### Leadership

When `leader_elected = true` (default):

- Exactly one instance runs across all nodes
- Uses PostgreSQL advisory locks for election
- Lock is session-bound; automatic failover on connection drop
- Other nodes wait in standby

When `leader_elected = false`:

- Daemon runs on every node
- Instances run independently without coordination
### Example

```rust
#[forge::daemon(
    leader_elected = true,
    restart_on_panic = true,
    restart_delay = "10s",
    startup_delay = "5s",
    max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
    loop {
        aggregate_metrics(ctx.db()).await?;
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
```
---

## `#[forge::webhook]`

HTTP endpoints for external events with signature verification.
| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `path` | `"string"` | required | URL path (e.g., `"/webhooks/stripe"`) |
| `timeout` | `"duration"` | `"30s"` | Request timeout |
| `signature` | `WebhookSignature::...` | none | Signature verification config |
| `idempotency` | `"source:key"` | none | Idempotency key source |
| `allow_unsigned` | flag | `false` | Accept requests without valid signature |
### Signature Verification

```rust
signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
```
| Algorithm | Function |
|---|---|
| HMAC-SHA256 | WebhookSignature::hmac_sha256(header, secret_env) |
| HMAC-SHA1 | WebhookSignature::hmac_sha1(header, secret_env) |
| HMAC-SHA512 | WebhookSignature::hmac_sha512(header, secret_env) |
The secret is read from the environment variable at runtime.
### Idempotency

| Source | Format | Example |
|--------|--------|---------|
| Header | `"header:Header-Name"` | `"header:X-Request-Id"` |
| Body | `"body:$.json.path"` | `"body:$.id"` |

Idempotency keys are stored for 24 hours by default. Duplicate requests return `200 OK` with `{"status":"already_processed"}` instead of re-running the handler.
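The dedup behavior can be pictured as a TTL-keyed set. The in-memory sketch below is illustrative only; Forge's real store must be persistent and shared so duplicates are caught across nodes and restarts.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory sketch of webhook idempotency: remember each key for a TTL
/// and report duplicates within that window.
struct IdempotencyStore {
    ttl: Duration,
    seen: HashMap<String, Instant>,
}

impl IdempotencyStore {
    fn new(ttl: Duration) -> Self {
        Self { ttl, seen: HashMap::new() }
    }

    /// Returns true if the key is new (handler should run),
    /// false if it is a duplicate within the TTL (reply "already_processed").
    fn check_and_record(&mut self, key: &str) -> bool {
        let now = Instant::now();
        let duplicate = self
            .seen
            .get(key)
            .map_or(false, |&at| now.duration_since(at) < self.ttl);
        if duplicate {
            false
        } else {
            self.seen.insert(key.to_string(), now);
            true
        }
    }
}

fn main() {
    // 24h TTL, as the default described above.
    let mut store = IdempotencyStore::new(Duration::from_secs(24 * 3600));
    assert!(store.check_and_record("delivery-123"));  // first delivery: run handler
    assert!(!store.check_and_record("delivery-123")); // retry: already_processed
    println!("ok");
}
```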
### Common Integrations

```rust
// GitHub
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery"
)]

// Stripe
#[forge::webhook(
    path = "/webhooks/stripe",
    signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
    idempotency = "body:$.id"
)]

// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]
```
### Example

```rust
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery",
    timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
    let event_type = ctx.header("X-GitHub-Event").unwrap_or("unknown");
    match event_type {
        "push" => { ctx.dispatch_job("process_push", payload.clone()).await?; }
        "pull_request" => { ctx.dispatch_job("process_pr", payload.clone()).await?; }
        _ => {}
    }
    Ok(WebhookResult::Accepted)
}
```
---

## Duration Format

All duration attributes accept strings with these suffixes:

| Suffix | Unit | Example |
|--------|------|---------|
| `ms` | milliseconds | `"500ms"` |
| `s` | seconds | `"30s"` |
| `m` | minutes | `"5m"` |
| `h` | hours | `"2h"` |
| `d` | days | `"7d"` |

Raw numbers without a suffix are interpreted as seconds.
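These parsing rules can be sketched in a few lines of plain Rust. This is an illustration of the documented rules, not Forge's actual parser:

```rust
use std::time::Duration;

/// Parse a duration string per the table above: ms/s/m/h/d suffixes,
/// bare numbers interpreted as seconds.
fn parse_duration(s: &str) -> Option<Duration> {
    // Check "ms" before "s", since "500ms" also ends in 's'.
    let (num, mult_ms) = if let Some(n) = s.strip_suffix("ms") {
        (n, 1u64)
    } else if let Some(n) = s.strip_suffix('s') {
        (n, 1_000)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 60_000)
    } else if let Some(n) = s.strip_suffix('h') {
        (n, 3_600_000)
    } else if let Some(n) = s.strip_suffix('d') {
        (n, 86_400_000)
    } else {
        (s, 1_000) // bare number: seconds
    };
    num.parse::<u64>().ok().map(|n| Duration::from_millis(n * mult_ms))
}

fn main() {
    assert_eq!(parse_duration("500ms"), Some(Duration::from_millis(500)));
    assert_eq!(parse_duration("5m"), Some(Duration::from_secs(300)));
    assert_eq!(parse_duration("7d"), Some(Duration::from_secs(604_800)));
    assert_eq!(parse_duration("30"), Some(Duration::from_secs(30)));
    assert_eq!(parse_duration("abc"), None);
    println!("ok");
}
```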