# Attributes Reference

All macro attributes for Forge function decorators.
## `#[forge::query]`

Read-only database operations with caching and subscriptions.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role (returns 403 if missing) |
| `consistent` | flag | `false` | Force reads from the primary (skip replicas) |
| `cache` | `"duration"` | none | Cache TTL (`"30s"`, `"5m"`, `"1h"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `tables` | `["t1", "t2"]` | auto | Explicit table dependencies for subscriptions |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`). Requires a quoted value (e.g., `log = "debug"`); a bare `log` without a value is silently accepted but has no effect. |
| `unscoped` | flag | `false` | Opt out of compile-time scope checking. By default, private queries must filter by `user_id` or `owner_id` in SQL. This attribute disables that check for admin or shared-data queries. |
| `rate_limit(...)` | config | none | Rate limiting configuration |
### Rate Limit Config

```rust
rate_limit(requests = 100, per = "1m", key = "user")
```

| Parameter | Type | Description |
|---|---|---|
| `requests` | `u32` | Requests allowed per window |
| `per` | `"duration"` | Time window (`"30s"`, `"1m"`, `"1h"`) |
| `key` | string | Bucket key: `"user"`, `"ip"`, `"tenant"`, `"global"` |
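The configuration above can be pictured as a counter per bucket key that resets each window. The sketch below is an illustrative fixed-window model only: Forge's actual limiter (window style, storage backend, jitter) is not specified in this reference, and the type and method names are hypothetical.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical fixed-window rate limiter keyed by bucket
/// (e.g. a user id, client IP, or tenant id).
pub struct FixedWindowLimiter {
    requests: u32,
    per: Duration,
    // key -> (window start, count seen in that window)
    windows: HashMap<String, (Instant, u32)>,
}

impl FixedWindowLimiter {
    pub fn new(requests: u32, per: Duration) -> Self {
        Self { requests, per, windows: HashMap::new() }
    }

    /// Returns true if the request is allowed in the current window.
    pub fn check(&mut self, key: &str) -> bool {
        let now = Instant::now();
        let entry = self.windows.entry(key.to_string()).or_insert((now, 0));
        if now.duration_since(entry.0) >= self.per {
            *entry = (now, 0); // window expired: start a fresh one
        }
        if entry.1 < self.requests {
            entry.1 += 1;
            true
        } else {
            false
        }
    }
}
```

With `requests = 2, per = "1m"`, a third call for the same key inside the window is rejected while other keys are unaffected.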
### Example

```rust
#[forge::query(
    cache = "5m",
    timeout = 30,
    require_role("admin"),
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
    // ...
}
```
### Consistent Reads

When a query needs the latest data immediately after a mutation (read-after-write), use `consistent` to bypass replicas:

```rust
#[forge::query(consistent)]
pub async fn get_order_confirmation(ctx: &QueryContext, order_id: Uuid) -> Result<Order> {
    sqlx::query_as!(Order, "SELECT * FROM orders WHERE id = $1", order_id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
```

This always reads from the primary database, avoiding replication lag. Use it only when stale data would cause visible problems (confirmation screens, permission checks against just-written state). Most queries can tolerate eventual consistency and should use the default replica routing.
---
## `#[forge::model]`
Declares a struct as a database model. Must appear **before** `#[derive(...)]`.
```rust
#[forge::model]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct UserProfile {
    pub id: Uuid,
    pub display_name: String,
    pub created_at: DateTime<Utc>,
}
```
### Table Name

The table name is auto-pluralized from the struct name (converted to snake_case):

| Ending | Rule | Example |
|---|---|---|
| `s`, `sh`, `ch`, `x`, `z` | append `"es"` | `UserBatch` → `user_batches` |
| consonant + `y` | replace `y` with `"ies"` | `DeliveryEntry` → `delivery_entries` |
| everything else | append `"s"` | `UserProfile` → `user_profiles` |
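The rules in the table can be sketched as plain functions. This is an illustrative reimplementation of the documented behavior, not the macro's actual code; the helper names are made up.

```rust
/// CamelCase -> snake_case, e.g. "UserBatch" -> "user_batch".
pub fn to_snake_case(name: &str) -> String {
    let mut out = String::new();
    for (i, c) in name.chars().enumerate() {
        if c.is_uppercase() {
            if i != 0 {
                out.push('_');
            }
            out.extend(c.to_lowercase());
        } else {
            out.push(c);
        }
    }
    out
}

/// Apply the documented pluralization rules to a snake_case name.
pub fn pluralize(snake: &str) -> String {
    let last = snake.chars().last().unwrap_or(' ');
    if snake.ends_with('s') || snake.ends_with("sh") || snake.ends_with("ch")
        || last == 'x' || last == 'z'
    {
        format!("{snake}es")
    } else if last == 'y'
        && !matches!(snake.chars().rev().nth(1), Some('a' | 'e' | 'i' | 'o' | 'u'))
    {
        // consonant + y: replace y with "ies"
        format!("{}ies", &snake[..snake.len() - 1])
    } else {
        format!("{snake}s")
    }
}

/// Struct name -> table name, e.g. "UserProfile" -> "user_profiles".
pub fn table_name(struct_name: &str) -> String {
    pluralize(&to_snake_case(struct_name))
}
```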
Override with `#[table(name = "custom_name")]`:

```rust
#[forge::model]
#[table(name = "legacy_users")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct User {
    pub id: Uuid,
    pub email: String,
}
```
### Primary Key

The primary key is always `id`. This is hardcoded and cannot be overridden.

### Generated Code

The macro generates a `ModelMeta` trait implementation providing:

- `TABLE_NAME` — the resolved table name
- `table_def()` — table definition metadata
- `primary_key_field()` — returns `"id"`

Models are auto-registered via the `inventory` crate at compile time. No manual registration is needed.
---

## `#[forge::forge_enum]`

Declares an enum for use as a database column type with automatic serialization.

```rust
#[forge::forge_enum]
pub enum TaskStatus {
    Pending,
    InProgress,
    Completed,
    Cancelled,
}
```
### Auto-Derived Traits

The macro automatically derives `Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `serde::Serialize`, and `serde::Deserialize` with `#[serde(rename_all = "snake_case")]`.

### SQL Representation

Variants are stored as snake_case strings in the database. For example, `InProgress` becomes `"in_progress"`.

The PostgreSQL type name is the snake_case form of the enum name (e.g., `TaskStatus` → `"task_status"`).
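The variant-to-string mapping described above can be written out by hand for the `TaskStatus` example. This is only an illustration of the documented convention; the real macro generates different (sqlx-integrated) code.

```rust
/// Hand-written sketch of the snake_case mapping forge_enum is
/// described as generating for TaskStatus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Pending,
    InProgress,
    Completed,
    Cancelled,
}

impl TaskStatus {
    /// snake_case form stored in the database column.
    pub fn as_str(self) -> &'static str {
        match self {
            TaskStatus::Pending => "pending",
            TaskStatus::InProgress => "in_progress",
            TaskStatus::Completed => "completed",
            TaskStatus::Cancelled => "cancelled",
        }
    }

    /// Inverse mapping used when decoding a column value.
    pub fn from_snake(s: &str) -> Option<Self> {
        Some(match s {
            "pending" => TaskStatus::Pending,
            "in_progress" => TaskStatus::InProgress,
            "completed" => TaskStatus::Completed,
            "cancelled" => TaskStatus::Cancelled,
            _ => return None,
        })
    }
}
```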
### Generated Implementations

- `Display` and `FromStr` using the snake_case form
- `sqlx::Type<Postgres>`, `sqlx::Encode`, `sqlx::Decode` for direct use in queries
---

## `#[forge::mutation]`

Transactional write operations.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role (returns 403 if missing) |
| `transactional` | flag | `false` | Wrap in a database transaction (required if dispatching jobs) |
| `timeout` | `u64` | none | Handler timeout in seconds; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `max_size` | `"size"` | none | Maximum upload size for this mutation (`"100mb"`, `"1gb"`). Overrides the global `max_body_size` setting. Supports `b`, `kb`, `mb`, `gb` suffixes (case-insensitive). |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`). Requires a quoted value (e.g., `log = "debug"`); a bare `log` without a value is silently accepted but has no effect. |
| `unscoped` | flag | `false` | Opt out of compile-time scope checking. By default, private queries must filter by `user_id` or `owner_id` in SQL. This attribute disables that check for admin or shared-data queries. |
| `rate_limit(...)` | config | none | Rate limiting configuration (same as query) |
### Compile-Time Enforcement

Mutations that call `dispatch_job()` or `start_workflow()` must use `transactional`. The macro scans the function body for these calls and emits a compile error if `transactional` is missing:

```
error: Mutations that call `dispatch_job()` must use #[forge::mutation(transactional)]
```
### Example

```rust
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
    let order = insert_order(ctx.db(), &input).await?;
    ctx.dispatch_job("send_confirmation", json!({ "order_id": order.id })).await?;
    Ok(order)
}
```

If you omit `timeout`, outbound HTTP requests made through `ctx.http()` stay unlimited by default. If you set it, Forge applies that timeout to both the handler and its outbound HTTP requests unless a request overrides its own timeout.
---

## `#[forge::mcp_tool]`

Explicitly exposes a function as an MCP tool (Streamable HTTP endpoint).

| Attribute | Type | Default | Description |
|---|---|---|---|
| `name = "tool_name"` | string | fn name | Override the exposed MCP tool name |
| `title = "..."` | string | none | Human-friendly display title |
| `description = "..."` | string | none | Tool description for MCP clients |
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role |
| `timeout` | `u64` | none | Timeout in seconds |
| `rate_limit(...)` | config | none | Rate limiting (`requests`, `per`, `key`) |
| `read_only` | flag | none | No side effects; safe to call speculatively |
| `destructive` | flag | none | Modifies external state irreversibly |
| `idempotent` | flag | none | Repeated calls with the same args produce the same result |
| `open_world` | flag | none | Interacts with entities beyond its dataset |
Tool names must be 1–128 characters, using only ASCII alphanumeric characters plus `_`, `-`, and `.`. Invalid names cause a compile error.

MCP tools participate in Forge auth by default: omit `public` to require a valid JWT, and add `require_role("...")` for role-gated tools.
### Example

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, JsonSchema)]
pub struct ExportToolOutput {
    pub job_id: uuid::Uuid,
}

#[forge::mcp_tool(
    name = "export_project",
    title = "Export Project",
    description = "Queues a background export job for a project",
    timeout = 15,
    rate_limit(requests = 30, per = "1m", key = "user"),
    idempotent
)]
pub async fn export_project(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
) -> Result<ExportToolOutput> {
    let user_id = ctx.user_id()?;
    let job_id = ctx
        .dispatch_job(
            "export_project",
            serde_json::json!({
                "user_id": user_id,
                "project_id": project_id,
                "format": "json"
            }),
        )
        .await?;
    Ok(ExportToolOutput { job_id })
}
```
### Parameter Metadata in `inputSchema`

MCP per-parameter metadata (descriptions, enums, defaults, constraints) is generated from JSON Schema:

- Preferred: use a dedicated input struct with `#[derive(Deserialize, JsonSchema)]` and `#[schemars(...)]` field attributes.
- Inline parameters are also supported: when Forge auto-generates the tool args struct, it propagates parameter `#[schemars(...)]` and `#[serde(...)]` attributes.
```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, serde::Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExportFormat {
    Json,
    Csv,
}

#[forge::mcp_tool(name = "export_project")]
pub async fn export_project_tool(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
    #[serde(default)]
    dry_run: bool,
    #[schemars(description = "Output format")]
    format: ExportFormat,
) -> Result<ExportToolOutput> {
    // ...
}
```
---

## `#[forge::job]`

Background tasks with retries and worker routing.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Allow dispatch without authentication |
| `require_role("name")` | string | none | Require role to dispatch |
| `name` | `"string"` | fn name | Override job name |
| `timeout` | `"duration"` | `"1h"` | Job timeout (`"30s"`, `"10m"`, `"2h"`); if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `priority` | `"level"` | `"normal"` | `background`, `low`, `normal`, `high`, `critical` |
| `max_attempts` | `u32` | 3 | Maximum retry attempts |
| `backoff` | `"strategy"` | `"exponential"` | `fixed`, `linear`, `exponential` |
| `max_backoff` | `"duration"` | `"5m"` | Maximum backoff delay |
| `worker_capability` | `"string"` | none | Required worker capability (e.g., `"gpu"`, `"media"`) |
| `idempotent` | flag | `false` | Mark job as idempotent |
| `idempotent(key = "...")` | config | none | Idempotent with custom key field |
| `compensate` | `"handler"` | none | Cancellation compensation handler |
| `ttl` | `"duration"` | none | Auto-cleanup after completion (e.g., `"7d"`) |
### Retry Configuration (Grouped)

```rust
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
```

Equivalent to specifying `max_attempts`, `backoff`, and `max_backoff` separately.
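The three backoff strategies can be sketched as a pure function of the attempt number. This is an illustrative model: the base delay (assumed 1s here) and any jitter Forge applies are not specified in this reference.

```rust
use std::time::Duration;

/// The three documented retry backoff strategies.
pub enum Backoff {
    Fixed,
    Linear,
    Exponential,
}

/// Delay before retry `attempt` (1-based), capped at `max_backoff`.
pub fn retry_delay(strategy: &Backoff, attempt: u32, base: Duration, max_backoff: Duration) -> Duration {
    let delay = match strategy {
        Backoff::Fixed => base,                                      // base, base, base, ...
        Backoff::Linear => base * attempt,                           // base, 2*base, 3*base, ...
        Backoff::Exponential => base * 2u32.saturating_pow(attempt.saturating_sub(1)), // base, 2*base, 4*base, ...
    };
    delay.min(max_backoff)
}
```

With `max_backoff = "10m"`, exponential delays grow until they hit the 10-minute cap and stay there for subsequent attempts.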
### Priority Levels

| Level | Description |
|---|---|
| `background` | Lowest priority; runs when idle |
| `low` | Below normal priority |
| `normal` | Default priority |
| `high` | Above normal priority |
| `critical` | Highest priority; preempts other work |
### Example

```rust
#[forge::job(
    priority = "high",
    timeout = "30m",
    retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
    worker_capability = "gpu",
    idempotent(key = "video_id"),
    compensate = "compensate_process_video",
    ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
    ctx.progress(0, "Starting transcode")?;
    ctx.check_cancelled().await?;
    let temp_file = create_temp_file().await?;
    ctx.save("temp_file", json!(temp_file.path())).await?;
    // ... processing ...
    ctx.progress(100, "Complete")?;
    Ok(output)
}

pub async fn compensate_process_video(
    ctx: &JobContext,
    input: VideoInput,
    reason: &str,
) -> Result<()> {
    // Original args are available directly
    tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);
    // Data generated during execution comes from saved()
    let saved = ctx.saved().await;
    if let Some(path) = saved.get("temp_file") {
        delete_temp_file(path.as_str().unwrap()).await?;
    }
    Ok(())
}
```
---

## `#[forge::cron]`

Scheduled tasks with exactly-once execution.

```rust
#[forge::cron("schedule", ...attributes)]
```

| Attribute | Type | Default | Description |
|---|---|---|---|
| (first arg) | `"cron expr"` | required | Cron schedule (5-part: `"0 9 * * *"`) |
| `timezone` | `"string"` | `"UTC"` | IANA timezone (e.g., `"America/New_York"`) |
| `timeout` | `"duration"` | `"1h"` | Execution timeout; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `catch_up` | flag | `false` | Run missed executions after downtime |
| `catch_up_limit` | `u32` | 10 | Maximum catch-up runs |
| `group` | `"string"` | `"default"` | Sharded leader-election group |
### Cron Expression

Standard 5-part cron format:

```
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```
| Expression | Description |
|---|---|
| `0 9 * * *` | Daily at 9:00 AM |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 1 * *` | First day of the month at midnight |
| `0 0 * * 0` | Every Sunday at midnight |
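To make the expression semantics concrete, matching a single cron field can be sketched in a few lines. This is a simplified illustration covering only `*`, `*/n`, plain numbers, and comma lists; real cron parsers (including, presumably, Forge's) also handle ranges like `1-5`.

```rust
/// Does one cron field (e.g. the minute field) match a value?
/// Supports "*", "*/n" steps, plain numbers, and comma lists.
pub fn field_matches(field: &str, value: u32) -> bool {
    field.split(',').any(|part| {
        if part == "*" {
            true
        } else if let Some(step) = part.strip_prefix("*/") {
            // "*/15" matches 0, 15, 30, 45, ...
            step.parse::<u32>().map_or(false, |n| n != 0 && value % n == 0)
        } else {
            part.parse::<u32>().map_or(false, |n| n == value)
        }
    })
}
```

A full schedule like `"*/15 * * * *"` fires when all five fields match the current minute, hour, day of month, month, and day of week.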
### Example

```rust
#[forge::cron("0 9 * * *", timezone = "America/New_York", timeout = "30m", catch_up, catch_up_limit = 5)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    if ctx.is_catch_up {
        // Skip heavy operations for catch-up runs
        return Ok(());
    }
    // Generate and send the report
    Ok(())
}
```
---

## `#[forge::workflow]`

Multi-step durable processes with compensation.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `name` | `"string"` | fn name | Stable workflow identity. Two versions share the same name. |
| `version` | `"string"` | `"v1"` | Freeform version label (e.g., `"2026-05"`, `"v2"`, `"1.0.0"`). Paired with `name` to form the unique definition key. |
| `active` | flag | `true` | This version handles new runs. At most one active version per name. Default when neither `active` nor `deprecated` is set. |
| `deprecated` | flag | `false` | Keeps executing in-flight runs but rejects new starts. Cannot be combined with `active` (compile error). |
| `timeout` | `"duration"` | `"24h"` | Maximum workflow duration (`"1h"`, `"7d"`, `"30d"`); if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `public` | flag | `false` | Allow start without authentication |
| `require_role("name")` | string | none | Require role to start |
### Workflow Signatures

A signature is derived automatically at compile time from the workflow's persisted contract: step keys, wait keys, timeout, and input/output types. The runtime stamps each run with the version and signature. On resume, the handler must match the exact version and signature the run was started with. If you add, remove, or rename steps, the signature changes — deploy a new version for the updated handler and mark the old version deprecated until its in-flight runs drain.
### Compile-Time Enforcement

The macro detects `tokio::sleep()` calls with durations over 100 seconds and errors:

```
error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
       Workflows require durable sleep that survives process restarts.
       Short sleeps for polling loops are allowed.
```
### Example

```rust
#[forge::workflow(name = "order_fulfillment", version = "v1", timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    let payment = ctx
        .step("charge_card", || async {
            charge(&order.payment).await
        })
        .compensate(|result| async move {
            refund(&result.charge_id).await
        })
        .run()
        .await?
        .expect("charge_card is non-optional");

    ctx.sleep(Duration::from_secs(3 * 24 * 60 * 60)).await?; // Durable 3-day sleep

    ctx.step("ship_order", || async {
        ship(&order).await
    }).run().await?;

    Ok(FulfillmentResult { payment, shipped: true })
}
```
---

## `#[forge::daemon]`

Long-running singleton services with leader election.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `leader_elected` | bool | `true` | A single leader instance runs across the cluster |
| `restart_on_panic` | bool | `true` | Restart the daemon if it panics |
| `timeout` | `"duration"` | none | Default outbound HTTP timeout for `ctx.http()` |
| `restart_delay` | `"duration"` | `"5s"` | Delay before restart after failure |
| `startup_delay` | `"duration"` | `"0s"` | Delay before first execution |
| `max_restarts` | `u32` | unlimited | Maximum restart attempts |
### Leadership

When `leader_elected = true` (the default):

- Exactly one instance runs across all nodes
- Uses PostgreSQL advisory locks for election
- The lock is session-bound; failover is automatic on connection drop
- Other nodes wait in standby

When `leader_elected = false`:

- The daemon runs on every node
- Instances run independently without coordination
### Example

```rust
#[forge::daemon(
    leader_elected = true,
    restart_on_panic = true,
    restart_delay = "10s",
    startup_delay = "5s",
    max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
    loop {
        aggregate_metrics(ctx.db()).await?;
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
```
---

## `#[forge::webhook]`

HTTP endpoints for external events with signature verification.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `path` | `"string"` | required | URL path (e.g., `"/webhooks/stripe"`) |
| `timeout` | `"duration"` | `"30s"` | Request timeout; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `signature` | `WebhookSignature::...` | none | Signature verification config |
| `idempotency` | `"source:key"` | none | Idempotency key source |
| `allow_unsigned` | flag | `false` | Accept requests without a valid signature |
### Signature Verification

```rust
signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
```

| Algorithm | Function |
|---|---|
| HMAC-SHA256 | `WebhookSignature::hmac_sha256(header, secret_env)` |
| HMAC-SHA1 | `WebhookSignature::hmac_sha1(header, secret_env)` |
| HMAC-SHA512 | `WebhookSignature::hmac_sha512(header, secret_env)` |

The secret is read from the environment variable at runtime.
### Idempotency

| Source | Format | Example |
|---|---|---|
| Header | `"header:Header-Name"` | `"header:X-Request-Id"` |
| Body | `"body:$.json.path"` | `"body:$.id"` |

Idempotency keys are stored for 24 hours by default. Duplicate requests return `200 OK` with `{"status":"already_processed"}` instead of re-running the handler.
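Parsing the `"source:key"` attribute value splits at the first colon and dispatches on the documented source names. The sketch below illustrates that grammar only; the enum and function names are hypothetical, not Forge's API.

```rust
/// Hypothetical parsed form of the "source:key" idempotency attribute.
#[derive(Debug, PartialEq)]
pub enum IdempotencySource {
    Header(String),   // e.g. "header:X-Request-Id"
    BodyPath(String), // e.g. "body:$.id" (JSON path into the payload)
}

/// Split "source:key" at the first colon and dispatch on the source.
pub fn parse_idempotency(spec: &str) -> Option<IdempotencySource> {
    let (source, key) = spec.split_once(':')?;
    match source {
        "header" => Some(IdempotencySource::Header(key.to_string())),
        "body" => Some(IdempotencySource::BodyPath(key.to_string())),
        _ => None,
    }
}
```

Note that splitting at the *first* colon keeps paths like `$.data.object:id` intact in the key part.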
### Common Integrations

```rust
// GitHub
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery"
)]

// Stripe
#[forge::webhook(
    path = "/webhooks/stripe",
    signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
    idempotency = "body:$.id"
)]

// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]
```
### Example

```rust
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery",
    timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
    let event_type = ctx.header("X-GitHub-Event").unwrap_or("unknown");
    match event_type {
        "push" => { ctx.dispatch_job("process_push", payload.clone()).await?; }
        "pull_request" => { ctx.dispatch_job("process_pr", payload.clone()).await?; }
        _ => {}
    }
    Ok(WebhookResult::Accepted)
}
```
---

## Duration Format

All duration attributes accept strings with these suffixes:

| Suffix | Unit | Example |
|---|---|---|
| `ms` | milliseconds | `"500ms"` |
| `s` | seconds | `"30s"` |
| `m` | minutes | `"5m"` |
| `h` | hours | `"2h"` |
| `d` | days | `"7d"` |

Raw numbers without a suffix are interpreted as seconds.
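The grammar above is small enough to sketch in a few lines. This is an illustrative parser implementing exactly the stated rules (suffix table plus bare-number-as-seconds), not Forge's actual implementation; real parsers may also reject whitespace or negative values differently.

```rust
use std::time::Duration;

/// Parse "500ms", "30s", "5m", "2h", "7d", or a bare number of seconds.
pub fn parse_duration(s: &str) -> Option<Duration> {
    // Check "ms" before "s" so "500ms" is not misread as 500 "m"-prefixed seconds.
    let (num, mult_ms) = if let Some(n) = s.strip_suffix("ms") {
        (n, 1u64)
    } else if let Some(n) = s.strip_suffix('s') {
        (n, 1_000)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 60_000)
    } else if let Some(n) = s.strip_suffix('h') {
        (n, 3_600_000)
    } else if let Some(n) = s.strip_suffix('d') {
        (n, 86_400_000)
    } else {
        (s, 1_000) // raw number: seconds
    };
    num.parse::<u64>().ok().map(|n| Duration::from_millis(n * mult_ms))
}
```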