Attributes Reference
All macro attributes for Forge function decorators.
#[forge::query]
Read-only database operations with caching and subscriptions.
| Attribute | Type | Default | Description |
|---|---|---|---|
public | flag | false | Bypass authentication requirement |
require_role("name") | string | none | Require specific role (returns 403 if missing) |
cache | "duration" | none | Cache TTL ("30s", "5m", "1h") |
timeout | u64 | none | Statement timeout in seconds |
tables | ["t1", "t2"] | auto | Explicit table dependencies for subscriptions |
log | "level" | none | Log level (trace, debug, info, warn, error, off) |
rate_limit(...) | config | none | Rate limiting configuration |
Rate Limit Config
rate_limit(requests = 100, per = "1m", key = "user")
| Parameter | Type | Description |
|---|---|---|
requests | u32 | Requests allowed per window |
per | "duration" | Time window ("30s", "1m", "1h") |
key | string | Bucket key: "user", "ip", "tenant", "global" |
Example
#[forge::query(
cache = "5m",
timeout = 30,
require_role("admin"),
rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_stats(ctx: &QueryContext) -> Result<Stats> {
// ...
}
#[forge::mutation]
Transactional write operations.
| Attribute | Type | Default | Description |
|---|---|---|---|
public | flag | false | Bypass authentication requirement |
require_role("name") | string | none | Require specific role (returns 403 if missing) |
transactional | flag | false | Wrap in database transaction (required if dispatching jobs) |
timeout | u64 | none | Statement timeout in seconds |
log | "level" | none | Log level (trace, debug, info, warn, error, off) |
rate_limit(...) | config | none | Rate limiting configuration (same as query) |
Compile-Time Enforcement
Mutations that call dispatch_job() or start_workflow() must use transactional. The macro will error if you forget:
error: Mutations that call `dispatch_job()` must use #[forge::mutation(transactional)]
Example
#[forge::mutation(transactional, require_role("admin"))]
pub async fn create_order(ctx: &MutationContext, input: OrderInput) -> Result<Order> {
let order = insert_order(ctx.db(), &input).await?;
ctx.dispatch_job("send_confirmation", json!({ "order_id": order.id })).await?;
Ok(order)
}
#[forge::job]
Background tasks with retries and worker routing.
| Attribute | Type | Default | Description |
|---|---|---|---|
public | flag | false | Allow dispatch without authentication |
require_role("name") | string | none | Require role to dispatch |
name | "string" | fn name | Override job name |
timeout | "duration" | "1h" | Job timeout ("30s", "10m", "2h") |
priority | "level" | "normal" | background, low, normal, high, critical |
max_attempts | u32 | 3 | Maximum retry attempts |
backoff | "strategy" | "exponential" | fixed, linear, exponential |
max_backoff | "duration" | "5m" | Maximum backoff delay |
worker_capability | "string" | none | Required worker capability (e.g., "gpu", "media") |
idempotent | flag | false | Mark job as idempotent |
idempotent(key = "...") | config | none | Idempotent with custom key field |
compensate | "handler" | none | Cancellation compensation handler |
ttl | "duration" | none | Auto-cleanup after completion (e.g., "7d") |
Retry Configuration (Grouped)
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m")
Equivalent to specifying max_attempts, backoff, and max_backoff separately.
Priority Levels
| Level | Description |
|---|---|
background | Lowest priority, runs when idle |
low | Below normal priority |
normal | Default priority |
high | Above normal priority |
critical | Highest priority, preempts other work |
Example
#[forge::job(
priority = "high",
timeout = "30m",
retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"),
worker_capability = "gpu",
idempotent(key = "video_id"),
compensate = "compensate_process_video",
ttl = "7d"
)]
pub async fn process_video(ctx: &JobContext, input: VideoInput) -> Result<VideoOutput> {
ctx.progress(0, "Starting transcode")?;
ctx.check_cancelled().await?;
let temp_file = create_temp_file().await?;
ctx.save("temp_file", json!(temp_file.path())).await?;
// ... processing ...
ctx.progress(100, "Complete")?;
Ok(output)
}
pub async fn compensate_process_video(
ctx: &JobContext,
input: VideoInput,
reason: &str,
) -> Result<()> {
// Original args available directly
tracing::warn!("Cleaning up video {} due to: {}", input.video_id, reason);
// Data generated during execution from saved()
let saved = ctx.saved().await;
if let Some(path) = saved.get("temp_file") {
delete_temp_file(path.as_str().unwrap()).await?;
}
Ok(())
}
#[forge::cron]
Scheduled tasks with exactly-once execution.
#[forge::cron("schedule", ...attributes)]
| Attribute | Type | Default | Description |
|---|---|---|---|
| (first arg) | "cron expr" | required | Cron schedule (5-part: "0 9 * * *") |
timezone | "string" | "UTC" | IANA timezone (e.g., "America/New_York") |
timeout | "duration" | "1h" | Execution timeout |
catch_up | flag | false | Run missed executions after downtime |
catch_up_limit | u32 | 10 | Maximum catch-up runs |
Cron Expression
Standard 5-part cron format:
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
| Expression | Description |
|---|---|
0 9 * * * | Daily at 9:00 AM |
*/15 * * * * | Every 15 minutes |
0 0 1 * * | First day of month at midnight |
0 0 * * 0 | Every Sunday at midnight |
Example
#[forge::cron("0 9 * * *", timezone = "America/New_York", timeout = "30m", catch_up, catch_up_limit = 5)]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
if ctx.is_catch_up {
// Skip heavy operations for catch-up runs
return Ok(());
}
// Generate and send report
Ok(())
}
#[forge::workflow]
Multi-step durable processes with compensation.
| Attribute | Type | Default | Description |
|---|---|---|---|
public | flag | false | Allow start without authentication |
require_role("name") | string | none | Require role to start |
version | u32 | 1 | Workflow version (increment for breaking changes) |
timeout | "duration" | "24h" | Maximum workflow duration ("1h", "7d", "30d") |
deprecated | flag | false | Mark workflow as deprecated |
Duration Format
| Suffix | Unit |
|---|---|
s | seconds |
m | minutes |
h | hours |
d | days |
Compile-Time Enforcement
The macro detects tokio::sleep() calls with durations over 100 seconds and errors:
error: Use `ctx.sleep()` instead of `tokio::sleep()` for long sleeps in workflows.
Workflows require durable sleep that survives process restarts.
Short sleeps for polling loops are allowed.
Example
#[forge::workflow(version = 2, timeout = "7d", require_role("admin"))]
pub async fn order_fulfillment(ctx: &WorkflowContext, order: Order) -> Result<FulfillmentResult> {
let payment = ctx.step("charge_card", || async {
charge(&order.payment).await
})
.compensate(|result| async move {
refund(&result.charge_id).await
})
.run()
.await?;
ctx.sleep(Duration::from_days(3)).await?; // Durable sleep
ctx.step("ship_order", || async {
ship(&order).await
}).run().await?;
Ok(FulfillmentResult { payment, shipped: true })
}
#[forge::daemon]
Long-running singleton services with leader election.
| Attribute | Type | Default | Description |
|---|---|---|---|
leader_elected | bool | true | Only one instance runs across cluster |
restart_on_panic | bool | true | Restart daemon if it panics |
restart_delay | "duration" | "5s" | Delay before restart after failure |
startup_delay | "duration" | "0s" | Delay before first execution |
max_restarts | u32 | unlimited | Maximum restart attempts |
Leadership
When leader_elected = true (default):
- Exactly one instance runs across all nodes
- Uses PostgreSQL advisory locks for election
- Lock is session-bound; automatic failover on connection drop
- Other nodes wait in standby
When leader_elected = false:
- Daemon runs on every node
- No coordination between instances
Example
#[forge::daemon(
leader_elected = true,
restart_on_panic = true,
restart_delay = "10s",
startup_delay = "5s",
max_restarts = 10
)]
pub async fn metrics_aggregator(ctx: &DaemonContext) -> Result<()> {
loop {
aggregate_metrics(ctx.db()).await?;
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
_ = ctx.shutdown_signal() => break,
}
}
Ok(())
}
#[forge::webhook]
HTTP endpoints for external events with signature verification.
| Attribute | Type | Default | Description |
|---|---|---|---|
path | "string" | required | URL path (e.g., "/webhooks/stripe") |
timeout | "duration" | "30s" | Request timeout |
signature | WebhookSignature::... | none | Signature verification config |
idempotency | "source:key" | none | Idempotency key source |
Signature Verification
signature = WebhookSignature::hmac_sha256("Header-Name", "SECRET_ENV_VAR")
| Algorithm | Function |
|---|---|
| HMAC-SHA256 | WebhookSignature::hmac_sha256(header, secret_env) |
| HMAC-SHA1 | WebhookSignature::hmac_sha1(header, secret_env) |
| HMAC-SHA512 | WebhookSignature::hmac_sha512(header, secret_env) |
The secret is read from the environment variable at runtime.
Idempotency
| Source | Format | Example |
|---|---|---|
| Header | "header:Header-Name" | "header:X-Request-Id" |
| Body | "body:$.json.path" | "body:$.id" |
Idempotency keys are stored for 24 hours by default. Duplicate requests return the cached response.
Common Integrations
// GitHub
#[forge::webhook(
path = "/webhooks/github",
signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
idempotency = "header:X-GitHub-Delivery"
)]
// Stripe
#[forge::webhook(
path = "/webhooks/stripe",
signature = WebhookSignature::hmac_sha256("Stripe-Signature", "STRIPE_WEBHOOK_SECRET"),
idempotency = "body:$.id"
)]
// Generic
#[forge::webhook(path = "/webhooks/external", timeout = "60s")]
Example
#[forge::webhook(
path = "/webhooks/github",
signature = WebhookSignature::hmac_sha256("X-Hub-Signature-256", "GITHUB_SECRET"),
idempotency = "header:X-GitHub-Delivery",
timeout = "30s"
)]
pub async fn github_events(ctx: &WebhookContext, payload: Value) -> Result<WebhookResult> {
let event_type = ctx.header("X-GitHub-Event").unwrap_or("unknown");
match event_type {
"push" => { ctx.dispatch_job("process_push", payload.clone()).await?; }
"pull_request" => { ctx.dispatch_job("process_pr", payload.clone()).await?; }
_ => {}
}
Ok(WebhookResult::Accepted)
}
Duration Format
All duration attributes accept strings with these suffixes:
| Suffix | Unit | Example |
|---|---|---|
ms | milliseconds | "500ms" |
s | seconds | "30s" |
m | minutes | "5m" |
h | hours | "2h" |
d | days | "7d" |
Raw numbers without suffix are interpreted as seconds.