Skip to main content

Attributes Reference

All macro attributes for Forge function decorators.

#[forge::query]

Read-only database operations with caching and subscriptions.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| public | flag | false | Bypass authentication requirement |
| require_role("name") | string | none | Require specific role (returns 403 if missing) |
| cache | "duration" | none | Cache TTL ("30s", "5m", "1h") |
| timeout | u64 | none | Statement timeout in seconds |
| tables | ["t1", "t2"] | auto | Explicit table dependencies for subscriptions |
| log | "level" | none | Log level (trace, debug, info, warn, error, off) |
| rate_limit(...) | config | none | Rate limiting configuration |

Rate Limit Config

rate_limit(requests = 100, per = "1m", key = "user")
| Parameter | Type | Description |
| --- | --- | --- |
| requests | u32 | Requests allowed per window |
| per | "duration" | Time window ("30s", "1m", "1h") |
| key | string | Bucket key: "user", "ip", "tenant", "global" |

Example

/// Example query combining several attributes: results are cached for five
/// minutes, the statement timeout is 30 seconds, callers must hold the
/// "admin" role, and requests are rate limited to 100 per minute in a
/// per-user bucket.
#[forge::query(
cache = "5m",
timeout = 30,
require_role("admin"),
rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
// ...
}

#[forge::mutation]

Transactional write operations.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| public | flag | false | Bypass authentication requirement |
| require_role("name") | string | none | Require specific role (returns 403 if missing) |
| transactional | flag | false | Wrap in database transaction (required if dispatching jobs) |
| timeout | u64 | none | Statement timeout in seconds |
| log | "level" | none | Log level (trace, debug, info, warn, error, off) |
| rate_limit(...) | config | none | Rate limiting configuration (same as query) |

Compile-Time Enforcement

Mutations that call dispatch_job() or start_workflow() must use transactional. The macro will error if you forget:

error: Mutations that call `dispatch_job()` must use #[forge::mutation(transactional)]

Example

/// Example mutation: inserts the order, then queues its confirmation job.
///
/// `transactional` is set because the body calls `dispatch_job()`.
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
    let created = insert_order(ctx.db(), &input).await?;

    // Build the job payload up front so the dispatch reads as a single step.
    let confirmation = json!({ "order_id": created.id });
    ctx.dispatch_job("send_confirmation", confirmation).await?;

    Ok(created)
}

#[forge::job]

Background tasks with retries and worker routing.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| public | flag | false | Allow dispatch without authentication |
| require_role("name") | string | none | Require role to dispatch |
| name | "string" | fn name | Override job name |
| timeout | "duration" | "1h" | Job timeout ("30s", "10m", "2h") |
| priority | "level" | "normal" | background, low, normal, high, critical |
| max_attempts | u32 | 3 | Maximum retry attempts |
| backoff | "strategy" | "exponential" | fixed, linear, exponential |
| max_backoff | "duration" | "5m" | Maximum backoff delay |
| worker_capability | "string" | none | Required worker capability (e.g., "gpu", "media") |
| idempotent | flag | false | Mark job as idempotent |
| idempotent(key = "...") | config | none | Idempotent with custom key field |
| compensate | "handler" | none | Cancellation compensation handler |
| ttl | "duration" | none | Auto-cleanup after completion (e.g., "7d") |

Retry Configuration (Grouped)

retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")

Equivalent to specifying max_attempts, backoff, and max_backoff separately.

Priority Levels

| Level | Description |
| --- | --- |
| background | Lowest priority, runs when idle |
| low | Below normal priority |
| normal | Default priority |
| high | Above normal priority |
| critical | Highest priority, preempts other work |

Example

/// Example job: reports progress, checks for cancellation, and records the
/// temp file it creates under the `"temp_file"` key via `ctx.save`.
#[forge::job(
    priority = "high",
    timeout = "30m",
    retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
    worker_capability = "gpu",
    idempotent(key = "video_id"),
    compensate = "compensate_process_video",
    ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
    ctx.progress(0, "Starting transcode")?;
    ctx.check_cancelled().await?;

    // Persist the scratch file's path before doing any work on it.
    let scratch = create_temp_file().await?;
    let scratch_path = json!(scratch.path());
    ctx.save("temp_file", scratch_path).await?;

    // ... processing ...

    ctx.progress(100, "Complete")?;
    Ok(output)
}

/// Compensation handler named by `compensate = "compensate_process_video"`.
///
/// Receives the job's original `input` and the cancellation `reason`, then
/// deletes the temp file recorded under the `"temp_file"` key, if there is one.
///
/// # Errors
///
/// Propagates any error returned by `delete_temp_file`.
pub async fn compensate_process_video(
    ctx: &JobContext,
    input: VideoInput,
    reason: &str,
) -> Result<()> {
    // Original args available directly
    tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);

    // Data generated during execution from saved()
    let saved = ctx.saved().await;
    match saved.get("temp_file").map(|value| value.as_str()) {
        Some(Some(path)) => {
            delete_temp_file(path).await?;
        }
        // A non-string entry must not panic the cleanup path: report and skip.
        Some(None) => {
            tracing::warn!(
                "Saved temp_file for video {} is not a string; skipping cleanup",
                input.video_id
            );
        }
        // Nothing was saved, so there is nothing to clean up.
        None => {}
    }
    Ok(())
}

#[forge::cron]

Scheduled tasks with exactly-once execution.

#[forge::cron("schedule", ...attributes)]
| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| (first arg) | "cron expr" | required | Cron schedule (5-part: "0 9 * * *") |
| timezone | "string" | "UTC" | IANA timezone (e.g., "America/New_York") |
| timeout | "duration" | "1h" | Execution timeout |
| catch_up | flag | false | Run missed executions after downtime |
| catch_up_limit | u32 | 10 | Maximum catch-up runs |

Cron Expression

Standard 5-part cron format:

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
| Expression | Description |
| --- | --- |
| 0 9 * * * | Daily at 9:00 AM |
| */15 * * * * | Every 15 minutes |
| 0 0 1 * * | First day of month at midnight |
| 0 0 * * 0 | Every Sunday at midnight |

Example

/// Example cron task scheduled for 9:00 AM New York time with catch-up
/// enabled. Catch-up runs return immediately without doing the report work.
#[forge::cron(
    "0 9 * * *",
    timezone = "America/New_York",
    timeout = "30m",
    catch_up,
    catch_up_limit = 5
)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    // Skip heavy operations for catch-up runs
    if !ctx.is_catch_up {
        // Generate and send report
    }
    Ok(())
}

#[forge::workflow]

Multi-step durable processes with compensation.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| public | flag | false | Allow start without authentication |
| require_role("name") | string | none | Require role to start |
| version | u32 | 1 | Workflow version (increment for breaking changes) |
| timeout | "duration" | "24h" | Maximum workflow duration ("1h", "7d", "30d") |
| deprecated | flag | false | Mark workflow as deprecated |

Duration Format

| Suffix | Unit |
| --- | --- |
| s | seconds |
| m | minutes |
| h | hours |
| d | days |

Compile-Time Enforcement

The macro detects tokio::time::sleep() calls with durations over 100 seconds and fails compilation with:

error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
Workflows require durable sleep that survives process restarts.

Short sleeps for polling loops are allowed.

Example

/// Example workflow: charges the card (with a refund registered as the
/// step's compensation), sleeps durably for three days, then ships the order.
///
/// # Errors
///
/// Returns the first error produced by a step or by the durable sleep.
#[forge::workflow(version = 2, timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
    let payment = ctx.step("charge_card", || async {
        charge(&order.payment).await
    })
    .compensate(|result| async move {
        refund(&result.charge_id).await
    })
    .run()
    .await?;

    // Three days, spelled in seconds: `Duration::from_days` is not available
    // on stable Rust's `std::time::Duration`.
    let shipping_delay = Duration::from_secs(3 * 24 * 60 * 60);
    ctx.sleep(shipping_delay).await?; // Durable sleep

    ctx.step("ship_order", || async {
        ship(&order).await
    }).run().await?;

    Ok(FulfillmentResult { payment, shipped: true })
}

#[forge::daemon]

Long-running singleton services with leader election.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| leader_elected | bool | true | Only one instance runs across cluster |
| restart_on_panic | bool | true | Restart daemon if it panics |
| restart_delay | "duration" | "5s" | Delay before restart after failure |
| startup_delay | "duration" | "0s" | Delay before first execution |
| max_restarts | u32 | unlimited | Maximum restart attempts |

Leadership

When leader_elected = true (default):

  • Exactly one instance runs across all nodes
  • Uses PostgreSQL advisory locks for election
  • Lock is session-bound; automatic failover on connection drop
  • Other nodes wait in standby

When leader_elected = false:

  • Daemon runs on every node
  • No coordination between instances

Example

#[forge::daemon(
leader_elected = true,
restart_on_panic = true,
restart_delay = "10s",
startup_delay = "5s",
max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
loop {
aggregate_metrics(ctx.db()).await?;

tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
_ = ctx.shutdown_signal() => break,
}
}
Ok(())
}

#[forge::webhook]

HTTP endpoints for external events with signature verification.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| path | "string" | required | URL path (e.g., "/webhooks/stripe") |
| timeout | "duration" | "30s" | Request timeout |
| signature | WebhookSignature::... | none | Signature verification config |
| idempotency | "source:key" | none | Idempotency key source |

Signature Verification

signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
| Algorithm | Function |
| --- | --- |
| HMAC-SHA256 | WebhookSignature::hmac_sha256(header, secret_env) |
| HMAC-SHA1 | WebhookSignature::hmac_sha1(header, secret_env) |
| HMAC-SHA512 | WebhookSignature::hmac_sha512(header, secret_env) |

The secret is read from the environment variable at runtime.

Idempotency

| Source | Format | Example |
| --- | --- | --- |
| Header | "header:Header-Name" | "header:X-Request-Id" |
| Body | "body:$.json.path" | "body:$.id" |

Idempotency keys are stored for 24 hours by default. Duplicate requests return the cached response.

Common Integrations

// GitHub
#[forge::webhook(
path = "/webhooks/github",
signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
idempotency = "header:X-GitHub-Delivery"
)]

// Stripe
#[forge::webhook(
path = "/webhooks/stripe",
signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
idempotency = "body:$.id"
)]

// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]

Example

/// Example webhook handler for GitHub deliveries.
///
/// Reads the `X-GitHub-Event` header and dispatches `process_push` or
/// `process_pr` with the payload; any other event type is accepted without
/// dispatching a job.
///
/// # Errors
///
/// Propagates any error returned by `dispatch_job`.
#[forge::webhook(
    path = "/webhooks/github",
    signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
    idempotency = "header:X-GitHub-Delivery",
    timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
    // Choose the job first so the payload is moved into exactly one dispatch
    // call instead of being cloned per arm.
    let job = match ctx.header("X-GitHub-Event").unwrap_or("unknown") {
        "push" => Some("process_push"),
        "pull_request" => Some("process_pr"),
        _ => None,
    };

    if let Some(job) = job {
        ctx.dispatch_job(job, payload).await?;
    }

    Ok(WebhookResult::Accepted)
}

Duration Format

All duration attributes accept strings with these suffixes:

| Suffix | Unit | Example |
| --- | --- | --- |
| ms | milliseconds | "500ms" |
| s | seconds | "30s" |
| m | minutes | "5m" |
| h | hours | "2h" |
| d | days | "7d" |

Raw numbers without suffix are interpreted as seconds.