
# Attributes Reference

A reference for all attributes accepted by Forge's function-level macros.

## `#[forge::query]`

Read-only database operations with caching and subscriptions.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role (returns 403 if missing) |
| `consistent` | flag | `false` | Force reads from primary (skip replicas) |
| `cache` | `"duration"` | none | Cache TTL (`"30s"`, `"5m"`, `"1h"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `tables` | `["t1", "t2"]` | auto | Explicit table dependencies for subscriptions |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`). Requires a quoted value (e.g., `log = "debug"`); a bare `log` without a value is silently accepted but has no effect. |
| `unscoped` | flag | `false` | Opt out of compile-time scope checking. By default, private queries must filter by `user_id` or `owner_id` in SQL. This attribute disables that check for admin or shared-data queries. |
| `rate_limit(...)` | config | none | Rate limiting configuration |
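
The `log` and `unscoped` attributes compose with the others; here is a sketch of a typical combination (the `Tenant` type, the `tenants` table, and the handler body are illustrative, not from Forge's docs):

```rust
// Admin query over shared data: `unscoped` opts out of the
// user_id/owner_id scope check, and `log = "debug"` enables
// verbose logging for this query only.
#[forge::query(require_role("admin"), unscoped, log = "debug", cache = "30s")]
pub async fn list_all_tenants(ctx: &QueryContext) -> Result<Vec<Tenant>> {
    sqlx::query_as!(Tenant, "SELECT * FROM tenants ORDER BY created_at")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```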

### Rate Limit Config

```rust
rate_limit(requests = 100, per = "1m", key = "user")
```

| Parameter | Type | Description |
|---|---|---|
| `requests` | `u32` | Requests allowed per window |
| `per` | `"duration"` | Time window (`"30s"`, `"1m"`, `"1h"`) |
| `key` | string | Bucket key: `"user"`, `"ip"`, `"tenant"`, `"global"` |

### Example

```rust
#[forge::query(
    cache = "5m",
    timeout = 30,
    require_role("admin"),
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
    // ...
}
```

### Consistent Reads

When a query needs the latest data immediately after a mutation (read-after-write), use `consistent` to bypass replicas:

```rust
#[forge::query(consistent)]
pub async fn get_order_confirmation(ctx: &QueryContext, order_id: Uuid) -> Result<Order> {
    sqlx::query_as!(Order, "SELECT * FROM orders WHERE id = $1", order_id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
```

This always reads from the primary database, avoiding replication lag. Use it only when stale data would cause visible problems (confirmation screens, permission checks against just-written state). Most queries can tolerate eventual consistency and should use the default replica routing.


---

## `#[forge::model]`

Declares a struct as a database model. Must appear **before** `#[derive(...)]`.

```rust
#[forge::model]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct UserProfile {
    pub id: Uuid,
    pub display_name: String,
    pub created_at: DateTime<Utc>,
}
```

### Table Name

The table name is auto-pluralized from the struct name (converted to snake_case):

| Ending | Rule | Example |
|---|---|---|
| s, sh, ch, x, z | append "es" | `UserBatch` → `user_batches` |
| consonant + y | replace y with "ies" | `DeliveryEntry` → `delivery_entries` |
| everything else | append "s" | `UserProfile` → `user_profiles` |
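
The pluralization rules can be sketched in plain Rust; this mirrors the table above, not Forge's actual implementation (the input is assumed to be the already snake_cased struct name):

```rust
/// Illustrative re-implementation of the table-name pluralization rules.
fn pluralize(snake: &str) -> String {
    let is_vowel = |c: char| "aeiou".contains(c);
    let chars: Vec<char> = snake.chars().collect();
    match chars.as_slice() {
        // consonant + y -> replace y with "ies"
        [.., before, 'y'] if !is_vowel(*before) => {
            format!("{}ies", &snake[..snake.len() - 1])
        }
        // s, x, z endings -> append "es"
        [.., 's' | 'x' | 'z'] => format!("{}es", snake),
        // sh, ch endings -> append "es"
        [.., 's' | 'c', 'h'] => format!("{}es", snake),
        // everything else -> append "s"
        _ => format!("{}s", snake),
    }
}

fn main() {
    assert_eq!(pluralize("user_batch"), "user_batches");
    assert_eq!(pluralize("delivery_entry"), "delivery_entries");
    assert_eq!(pluralize("user_profile"), "user_profiles");
    println!("ok");
}
```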

Override with `#[table(name = "custom_name")]`:

```rust
#[forge::model]
#[table(name = "legacy_users")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct User {
    pub id: Uuid,
    pub email: String,
}
```

### Primary Key

The primary key is always `id`. This is hardcoded and cannot be overridden.

### Generated Code

The macro generates a `ModelMeta` trait implementation providing:

- `TABLE_NAME` — the resolved table name
- `table_def()` — table definition metadata
- `primary_key_field()` — returns `"id"`

Models are auto-registered via the `inventory` crate at compile time. No manual registration is needed.
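
Consuming the generated metadata might look like this sketch, assuming `ModelMeta` is importable from the prelude and `primary_key_field()` is an associated function (both names come from the list above; the resolved values follow the pluralization rules):

```rust
use forge::prelude::*;

fn main() {
    // TABLE_NAME is resolved at compile time from the struct name.
    assert_eq!(UserProfile::TABLE_NAME, "user_profiles");
    assert_eq!(UserProfile::primary_key_field(), "id");
}
```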


## `#[forge::forge_enum]`

Declares an enum for use as a database column type with automatic serialization.

```rust
#[forge::forge_enum]
pub enum TaskStatus {
    Pending,
    InProgress,
    Completed,
    Cancelled,
}
```

### Auto-Derived Traits

The macro automatically derives `Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `serde::Serialize`, and `serde::Deserialize` with `#[serde(rename_all = "snake_case")]`.

### SQL Representation

Variants are stored as snake_case strings in the database. For example, `InProgress` becomes `"in_progress"`.

The PostgreSQL type name is the snake_case form of the enum name (e.g., `TaskStatus` → `task_status`).

### Generated Implementations

- `Display` and `FromStr` using the snake_case form
- `sqlx::Type<Postgres>`, `sqlx::Encode`, and `sqlx::Decode` for direct use in queries
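
The variant-to-string convention can be illustrated with a small standalone conversion (a hypothetical helper, not the macro's generated code — Forge produces the equivalent through its `Display` impl):

```rust
/// Convert a CamelCase variant name to the snake_case form
/// stored in the database (illustrative only).
fn variant_to_snake_case(name: &str) -> String {
    let mut out = String::new();
    for (i, c) in name.chars().enumerate() {
        if c.is_ascii_uppercase() {
            if i > 0 {
                out.push('_'); // word boundary before each interior capital
            }
            out.push(c.to_ascii_lowercase());
        } else {
            out.push(c);
        }
    }
    out
}

fn main() {
    assert_eq!(variant_to_snake_case("InProgress"), "in_progress");
    assert_eq!(variant_to_snake_case("Pending"), "pending");
    assert_eq!(variant_to_snake_case("TaskStatus"), "task_status");
    println!("ok");
}
```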

## `#[forge::mutation]`

Transactional write operations.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role (returns 403 if missing) |
| `transactional` | flag | `false` | Wrap in a database transaction (required if dispatching jobs) |
| `timeout` | `u64` | none | Handler timeout in seconds; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `max_size` | `"size"` | none | Maximum upload size for this mutation (`"100mb"`, `"1gb"`). Overrides the global `max_body_size` setting. Supports `b`, `kb`, `mb`, `gb` suffixes (case-insensitive). |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`). Requires a quoted value (e.g., `log = "debug"`); a bare `log` without a value is silently accepted but has no effect. |
| `unscoped` | flag | `false` | Opt out of compile-time scope checking. By default, private mutations must filter by `user_id` or `owner_id` in SQL. This attribute disables that check for admin or shared-data mutations. |
| `rate_limit(...)` | config | none | Rate limiting configuration (same as query) |

### Compile-Time Enforcement

Mutations that call `dispatch_job()` or `start_workflow()` must use `transactional`. The macro scans the function body for these calls and emits a compile error if `transactional` is missing:


```text
error: Mutations that call `dispatch_job()` must use #[forge::mutation(transactional)]
```

### Example

```rust
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
    let order = insert_order(ctx.db(), &input).await?;
    ctx.dispatch_job("send_confirmation", json!({ "order_id": order.id })).await?;
    Ok(order)
}
```

If you omit `timeout`, outbound HTTP requests made through `ctx.http()` have no time limit by default. If you set it, Forge applies that timeout to both the handler and its outbound HTTP requests, unless an individual request overrides its own timeout.


## `#[forge::mcp_tool]`

Explicitly exposes a function as an MCP tool (Streamable HTTP endpoint).

| Attribute | Type | Default | Description |
|---|---|---|---|
| `name = "tool_name"` | string | fn name | Override the exposed MCP tool name |
| `title = "..."` | string | none | Human-friendly display title |
| `description = "..."` | string | none | Tool description for MCP clients |
| `public` | flag | `false` | Bypass authentication requirement |
| `require_role("name")` | string | none | Require specific role |
| `timeout` | `u64` | none | Timeout in seconds |
| `rate_limit(...)` | config | none | Rate limiting (`requests`, `per`, `key`) |
| `read_only` | flag | none | No side effects; safe to call speculatively |
| `destructive` | flag | none | Modifies external state irreversibly |
| `idempotent` | flag | none | Repeated calls with the same args produce the same result |
| `open_world` | flag | none | Interacts with entities beyond its dataset |

Tool names must be 1–128 characters, using only ASCII alphanumeric characters plus `_`, `-`, and `.`. Invalid names cause a compile error.

MCP tools participate in Forge auth by default: omit `public` to require a valid JWT, and add `require_role("...")` for role-gated tools.

### Example

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, JsonSchema)]
pub struct ExportToolOutput {
    pub job_id: uuid::Uuid,
}

#[forge::mcp_tool(
    name = "export_project",
    title = "Export Project",
    description = "Queues a background export job for a project",
    timeout = 15,
    rate_limit(requests = 30, per = "1m", key = "user"),
    idempotent
)]
pub async fn export_project(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
) -> Result<ExportToolOutput> {
    let user_id = ctx.user_id()?;
    let job_id = ctx
        .dispatch_job(
            "export_project",
            serde_json::json!({
                "user_id": user_id,
                "project_id": project_id,
                "format": "json"
            }),
        )
        .await?;
    Ok(ExportToolOutput { job_id })
}
```

### Parameter Metadata in `inputSchema`

MCP per-parameter metadata (descriptions, enums, defaults, constraints) is generated from JSON Schema:

1. **Preferred:** use a dedicated input struct with `#[derive(Deserialize, JsonSchema)]` and `#[schemars(...)]` field attributes.
2. **Inline parameters** are also supported: when Forge auto-generates the tool args struct, it propagates parameter `#[schemars(...)]` and `#[serde(...)]` attributes.

```rust
use forge::prelude::*;

#[derive(Debug, serde::Serialize, serde::Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExportFormat {
    Json,
    Csv,
}

#[forge::mcp_tool(name = "export_project")]
pub async fn export_project_tool(
    ctx: &McpToolContext,
    #[schemars(description = "Project UUID to export")]
    project_id: uuid::Uuid,
    #[serde(default)]
    dry_run: bool,
    #[schemars(description = "Output format")]
    format: ExportFormat,
) -> Result<ExportToolOutput> {
    // ...
}
```

## `#[forge::job]`

Background tasks with retries and worker routing.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Allow dispatch without authentication |
| `require_role("name")` | string | none | Require role to dispatch |
| `name` | `"string"` | fn name | Override job name |
| `timeout` | `"duration"` | `"1h"` | Job timeout (`"30s"`, `"10m"`, `"2h"`); if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `priority` | `"level"` | `"normal"` | `background`, `low`, `normal`, `high`, `critical` |
| `max_attempts` | `u32` | 3 | Maximum retry attempts |
| `backoff` | `"strategy"` | `"exponential"` | `fixed`, `linear`, `exponential` |
| `max_backoff` | `"duration"` | `"5m"` | Maximum backoff delay |
| `worker_capability` | `"string"` | none | Required worker capability (e.g., `"gpu"`, `"media"`) |
| `idempotent` | flag | `false` | Mark job as idempotent |
| `idempotent(key = "...")` | config | none | Idempotent with a custom key field |
| `compensate` | `"handler"` | none | Cancellation compensation handler |
| `ttl` | `"duration"` | none | Auto-cleanup after completion (e.g., `"7d"`) |

### Retry Configuration (Grouped)

```rust
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
```

Equivalent to specifying `max_attempts`, `backoff`, and `max_backoff` separately.

### Priority Levels

| Level | Description |
|---|---|
| `background` | Lowest priority; runs when idle |
| `low` | Below normal priority |
| `normal` | Default priority |
| `high` | Above normal priority |
| `critical` | Highest priority; preempts other work |

### Example

```rust
#[forge::job(
    priority = "high",
    timeout = "30m",
    retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
    worker_capability = "gpu",
    idempotent(key = "video_id"),
    compensate = "compensate_process_video",
    ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
    ctx.progress(0, "Starting transcode")?;
    ctx.check_cancelled().await?;

    let temp_file = create_temp_file().await?;
    ctx.save("temp_file", json!(temp_file.path())).await?;

    // ... processing ...

    ctx.progress(100, "Complete")?;
    Ok(output)
}

pub async fn compensate_process_video(
    ctx: &JobContext,
    input: VideoInput,
    reason: &str,
) -> Result<()> {
    // Original args are available directly
    tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);

    // Data generated during execution, retrieved via saved()
    let saved = ctx.saved().await;
    if let Some(path) = saved.get("temp_file") {
        delete_temp_file(path.as_str().unwrap()).await?;
    }
    Ok(())
}
```

## `#[forge::cron]`

Scheduled tasks with exactly-once execution.

```rust
#[forge::cron("schedule", ...attributes)]
```

| Attribute | Type | Default | Description |
|---|---|---|---|
| (first arg) | `"cron expr"` | required | Cron schedule (5-part: `"0 9 * * *"`) |
| `timezone` | `"string"` | `"UTC"` | IANA timezone (e.g., `"America/New_York"`) |
| `timeout` | `"duration"` | `"1h"` | Execution timeout; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `catch_up` | flag | `false` | Run missed executions after downtime |
| `catch_up_limit` | `u32` | 10 | Maximum catch-up runs |
| `group` | `"string"` | `"default"` | Sharded leader election group |

### Cron Expression

Standard 5-part cron format:

```text
┌───────────── minute (0-59)
│ ┌─────────── hour (0-23)
│ │ ┌───────── day of month (1-31)
│ │ │ ┌─────── month (1-12)
│ │ │ │ ┌───── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```

| Expression | Description |
|---|---|
| `0 9 * * *` | Daily at 9:00 AM |
| `*/15 * * * *` | Every 15 minutes |
| `0 0 1 * *` | First day of month at midnight |
| `0 0 * * 0` | Every Sunday at midnight |

### Example

```rust
#[forge::cron("0 9 * * *", timezone = "America/New_York", timeout = "30m", catch_up, catch_up_limit = 5)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    if ctx.is_catch_up {
        // Skip heavy operations for catch-up runs
        return Ok(());
    }
    // Generate and send the report
    Ok(())
}
```
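
A sketch of `group` (the schedules, names, and bodies are illustrative): giving a heavy schedule its own election group keeps it from sharing a leader with unrelated crons, assuming groups elect leaders independently:

```rust
// Heavy nightly batch in its own election group, so a slow run
// does not hold leadership over unrelated schedules.
#[forge::cron("0 2 * * *", group = "reporting", timeout = "2h")]
pub async fn nightly_rollup(ctx: &CronContext) -> Result<()> {
    // ...
    Ok(())
}

// Lightweight cron in the "default" group.
#[forge::cron("*/5 * * * *")]
pub async fn health_ping(ctx: &CronContext) -> Result<()> {
    // ...
    Ok(())
}
```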

## `#[forge::workflow]`

Multi-step durable processes with compensation.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `name` | `"string"` | fn name | Stable workflow identity. Two versions share the same name. |
| `version` | `"string"` | `"v1"` | Freeform version label (e.g., `"2026-05"`, `"v2"`, `"1.0.0"`). Paired with `name` to form the unique definition key. |
| `active` | flag | `true` | This version handles new runs. At most one active version per name. Default when neither `active` nor `deprecated` is set. |
| `deprecated` | flag | `false` | Keeps executing in-flight runs but rejects new starts. Cannot be combined with `active` (compile error). |
| `timeout` | `"duration"` | `"24h"` | Maximum workflow duration (`"1h"`, `"7d"`, `"30d"`); if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `public` | flag | `false` | Allow start without authentication |
| `require_role("name")` | string | none | Require role to start |

### Workflow Signatures

A signature is derived automatically at compile time from the workflow's persisted contract: step keys, wait keys, timeout, and input/output types. The runtime stamps each run with the version and signature. On resume, the handler must match the exact version and signature the run was started with. If you add, remove, or rename steps, the signature changes — deploy a new version for the updated handler and mark the old version deprecated until its in-flight runs drain.
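
Rolling out a changed step sequence under these rules might look like the following sketch (the function names, version labels, and step comments are illustrative):

```rust
// v1: deprecated — keeps executing in-flight runs, rejects new starts.
#[forge::workflow(name = "order_fulfillment", version = "v1", deprecated)]
pub async fn order_fulfillment_v1(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    // Original step sequence, left untouched so resumed runs still
    // match the version and signature they were started with.
    // ...
}

// v2: active (the default) — receives all new runs.
#[forge::workflow(name = "order_fulfillment", version = "v2")]
pub async fn order_fulfillment_v2(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    // Updated step sequence (e.g., with an added "fraud_check" step).
    // ...
}
```

Once the last v1 run completes, the deprecated handler can be deleted.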

### Compile-Time Enforcement

The macro detects `tokio::sleep()` calls with durations over 100 seconds and errors:

```text
error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
       Workflows require durable sleep that survives process restarts.
```

Short sleeps for polling loops are allowed.

### Example

```rust
#[forge::workflow(name = "order_fulfillment", version = "v1", timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    let payment = ctx.step("charge_card", || async {
        charge(&order.payment).await
    })
    .compensate(|result| async move {
        refund(&result.charge_id).await
    })
    .run()
    .await?
    .expect("charge_card is non-optional");

    ctx.sleep(Duration::from_secs(3 * 24 * 60 * 60)).await?; // Durable 3-day sleep

    ctx.step("ship_order", || async {
        ship(&order).await
    }).run().await?;

    Ok(FulfillmentResult { payment, shipped: true })
}
```

## `#[forge::daemon]`

Long-running singleton services with leader election.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `leader_elected` | `bool` | `true` | A single leader instance runs across the cluster |
| `restart_on_panic` | `bool` | `true` | Restart the daemon if it panics |
| `timeout` | `"duration"` | none | Default outbound HTTP timeout for `ctx.http()` |
| `restart_delay` | `"duration"` | `"5s"` | Delay before restart after failure |
| `startup_delay` | `"duration"` | `"0s"` | Delay before first execution |
| `max_restarts` | `u32` | unlimited | Maximum restart attempts |

### Leadership

When `leader_elected = true` (the default):

- Exactly one instance runs across all nodes
- Uses PostgreSQL advisory locks for election
- The lock is session-bound; automatic failover on connection drop
- Other nodes wait in standby

When `leader_elected = false`:

- The daemon runs on every node
- Instances run independently, without coordination

### Example

```rust
#[forge::daemon(
    leader_elected = true,
    restart_on_panic = true,
    restart_delay = "10s",
    startup_delay = "5s",
    max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
    loop {
        aggregate_metrics(ctx.db()).await?;

        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
```

## `#[forge::webhook]`

HTTP endpoints for external events with signature verification.

| Attribute | Type | Default | Description |
|---|---|---|---|
| `path` | `"string"` | required | URL path (e.g., `"/webhooks/stripe"`) |
| `timeout` | `"duration"` | `"30s"` | Request timeout; if explicitly set, it also becomes the default outbound HTTP timeout for `ctx.http()` |
| `signature` | `WebhookSignature::...` | none | Signature verification config |
| `idempotency` | `"source:key"` | none | Idempotency key source |
| `allow_unsigned` | flag | `false` | Accept requests without a valid signature |

### Signature Verification

```rust
signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
```

| Algorithm | Function |
|---|---|
| HMAC-SHA256 | `WebhookSignature::hmac_sha256(header, secret_env)` |
| HMAC-SHA1 | `WebhookSignature::hmac_sha1(header, secret_env)` |
| HMAC-SHA512 | `WebhookSignature::hmac_sha512(header, secret_env)` |

The secret is read from the environment variable at runtime.

### Idempotency

| Source | Format | Example |
|---|---|---|
| Header | `"header:Header-Name"` | `"header:X-Request-Id"` |
| Body | `"body:$.json.path"` | `"body:$.id"` |

Idempotency keys are stored for 24 hours by default. Duplicate requests return `200 OK` with `{"status":"already_processed"}` instead of re-running the handler.

### Common Integrations

```rust
// GitHub
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery"
)]

// Stripe
#[forge::webhook(
    path = "/webhooks/stripe",
    signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
    idempotency = "body:$.id"
)]

// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]
```

### Example

```rust
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery",
    timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
    let event_type = ctx.header("X-GitHub-Event").unwrap_or("unknown");

    match event_type {
        "push" => { ctx.dispatch_job("process_push", payload.clone()).await?; }
        "pull_request" => { ctx.dispatch_job("process_pr", payload.clone()).await?; }
        _ => {}
    }

    Ok(WebhookResult::Accepted)
}
```

## Duration Format

All duration attributes accept strings with these suffixes:

| Suffix | Unit | Example |
|---|---|---|
| `ms` | milliseconds | `"500ms"` |
| `s` | seconds | `"30s"` |
| `m` | minutes | `"5m"` |
| `h` | hours | `"2h"` |
| `d` | days | `"7d"` |

Raw numbers without a suffix are interpreted as seconds.
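
This grammar is small enough to sketch in plain Rust; the following is an illustrative parser, not Forge's actual implementation:

```rust
use std::time::Duration;

/// Parse "500ms", "30s", "5m", "2h", "7d"; bare numbers are seconds.
/// Illustrative sketch of the documented grammar only.
fn parse_duration(s: &str) -> Option<Duration> {
    // Check "ms" before "s" so "500ms" is not read as "500m" + s.
    let (num, mult_ms) = if let Some(n) = s.strip_suffix("ms") {
        (n, 1u64)
    } else if let Some(n) = s.strip_suffix('s') {
        (n, 1_000)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 60_000)
    } else if let Some(n) = s.strip_suffix('h') {
        (n, 3_600_000)
    } else if let Some(n) = s.strip_suffix('d') {
        (n, 86_400_000)
    } else {
        (s, 1_000) // raw number: seconds
    };
    let value: u64 = num.parse().ok()?;
    Some(Duration::from_millis(value * mult_ms))
}

fn main() {
    assert_eq!(parse_duration("500ms"), Some(Duration::from_millis(500)));
    assert_eq!(parse_duration("5m"), Some(Duration::from_secs(300)));
    assert_eq!(parse_duration("7d"), Some(Duration::from_secs(604_800)));
    assert_eq!(parse_duration("45"), Some(Duration::from_secs(45)));
    println!("ok");
}
```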