Background Work
Process tasks asynchronously with automatic retries, priority queuing, and worker routing.
The Code
#[forge::job(
    priority = "high",
    retry(max_attempts = 5, backoff = "exponential"),
    worker_capability = "media"
)]
pub async fn transcode_video(ctx: &JobContext, args: TranscodeArgs) -> Result<Video> {
    ctx.progress(0, "Starting transcode")?;
    for (i, chunk) in args.chunks.iter().enumerate() {
        process_chunk(chunk).await?;
        ctx.progress(((i + 1) * 100 / args.chunks.len()) as u8, "Processing chunks")?;
        ctx.heartbeat().await?;
    }
    ctx.progress(100, "Complete")?;
    Ok(Video { url: args.output_url })
}
What Happens
Forge inserts the job into a PostgreSQL queue table within your mutation's transaction. Workers poll for jobs using FOR UPDATE SKIP LOCKED, preventing a thundering herd and ensuring each job is claimed by exactly one worker at a time. Each worker maintains a semaphore to limit concurrent jobs, creating backpressure when capacity is reached.
Jobs that exceed max_attempts move to the dead letter queue for manual inspection. Heartbeats prevent stale job recovery from reclaiming active long-running jobs.
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
name | "string" | function name | Override job type name |
priority | "level" | "normal" | Priority level (see below) |
timeout | "duration" | "1h" | Maximum execution time |
max_attempts | u32 | 3 | Maximum retry attempts |
backoff | "strategy" | "exponential" | Retry backoff strategy |
max_backoff | "duration" | "5m" | Maximum backoff delay |
worker_capability | "string" | none | Route to specific workers |
idempotent | flag | false | Enable deduplication |
idempotent(key = "field") | config | none | Custom idempotency key |
public | flag | false | Allow unauthenticated dispatch |
require_role("name") | string | none | Require role to dispatch |
ttl | "duration" | none | Auto-cleanup after completion (e.g., "7d") |
Priority Levels
| Level | Value | Use Case |
|---|---|---|
| background | 0 | Maintenance, cleanup, analytics |
| low | 25 | Batch processing, reports |
| normal | 50 | Standard async work |
| high | 75 | User-facing operations |
| critical | 100 | Payment processing, alerts |
Workers claim the highest-priority jobs first via ORDER BY priority DESC; lower-priority jobs yield to more urgent work.
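For example, a payment alert job might be declared at the highest level so it is claimed ahead of any queued background work (the job name and notify_on_call helper are illustrative):
#[forge::job(priority = "critical")]
pub async fn send_payment_alert(ctx: &JobContext, args: AlertArgs) -> Result<()> {
    // Claimed before any pending lower-priority jobs
    notify_on_call(&args).await
}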
Backoff Strategies
| Strategy | Delay Pattern | Example (attempts 1-4) |
|---|---|---|
| fixed | Same each time | 1s, 1s, 1s, 1s |
| linear | Increases linearly | 1s, 2s, 3s, 4s |
| exponential | Doubles each time | 1s, 2s, 4s, 8s |
All strategies cap at max_backoff (default 5 minutes).
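For reference, the delay before a given attempt works out roughly as in the sketch below; this is an illustration of the table, not Forge's internal code, and assumes a 1-second base delay:
use std::time::Duration;

fn retry_delay(strategy: &str, attempt: u32, max_backoff: Duration) -> Duration {
    let base = Duration::from_secs(1);
    let delay = match strategy {
        "fixed" => base,
        "linear" => base * attempt,
        // attempt 1 -> 1s, attempt 2 -> 2s, attempt 3 -> 4s, ...
        "exponential" => base * 2u32.saturating_pow(attempt.saturating_sub(1)),
        _ => base,
    };
    // All strategies cap at max_backoff
    delay.min(max_backoff)
}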
Retry Configuration
#[forge::job(retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"))]
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_attempts | u32 | 3 | Total execution attempts |
| backoff | "strategy" | "exponential" | Delay calculation method |
| max_backoff | "duration" | "5m" | Cap on retry delay |
Patterns
Dispatch from Mutation
Jobs are dispatched within the mutation's transaction. If the mutation fails, the job is never enqueued.
#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, order: NewOrder) -> Result<Order> {
    let order = sqlx::query_as("INSERT INTO orders ... RETURNING *")
        .bind(&order)
        .fetch_one(ctx.db())
        .await?;
    ctx.dispatch_job("send_order_confirmation", json!({
        "order_id": order.id,
        "email": order.email.clone(),
    })).await?;
    Ok(order)
}
Idempotent Jobs
Prevent duplicate processing with idempotency keys.
#[forge::job(idempotent(key = "payment_id"))]
pub async fn process_payment(ctx: &JobContext, args: PaymentArgs) -> Result<Receipt> {
// If a job with this payment_id already exists (pending/running/completed),
// the dispatch returns the existing job ID without creating a duplicate
charge_card(&args.payment_id, args.amount).await
}
Without a custom key, idempotent uses a hash of all arguments.
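A minimal sketch of the flag form, deduplicating on the full argument payload (the job name and sync_to_crm helper are placeholders):
#[forge::job(idempotent)]
pub async fn sync_profile(ctx: &JobContext, args: SyncArgs) -> Result<()> {
    // Two dispatches with identical args hash to the same idempotency key
    sync_to_crm(&args).await
}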
Progress Tracking
Report progress for long-running jobs. Progress updates also serve as heartbeats.
#[forge::job(timeout = "30m")]
pub async fn export_data(ctx: &JobContext, args: ExportArgs) -> Result<ExportResult> {
let rows = fetch_all_rows(&args.query).await?;
let total = rows.len();
for (i, row) in rows.iter().enumerate() {
write_row(&args.output, row).await?;
if i % 100 == 0 {
let percent = (i * 100 / total) as u8;
ctx.progress(percent, format!("Exported {} of {} rows", i, total))?;
}
}
ctx.progress(100, "Export complete")?;
Ok(ExportResult { path: args.output })
}
Cancellation & Compensation
Jobs can be cancelled after dispatch. Use ctx.check_cancelled() to exit early and define
a compensation handler for cleanup.
#[forge::job(compensate = "rollback_charge")]
pub async fn charge_card(ctx: &JobContext, args: ChargeArgs) -> Result<Receipt> {
ctx.check_cancelled().await?;
let receipt = charge(&args).await?;
// Save data generated during execution for compensation
ctx.save("receipt_id", json!(receipt.id)).await?;
Ok(receipt)
}
pub async fn rollback_charge(
ctx: &JobContext,
args: ChargeArgs,
reason: &str,
) -> Result<()> {
// Original args are available directly
tracing::info!("Rolling back charge {} due to: {}", args.charge_id, reason);
// Data generated during execution comes from saved()
let saved = ctx.saved().await;
if let Some(receipt_id) = saved.get("receipt_id") {
refund(receipt_id.as_str().unwrap()).await?;
}
Ok(())
}
The compensation handler receives the original args. Use save(key, value) for data generated
during execution that isn't in the original args (transaction IDs, receipts, temporary resources).
Worker Routing
Route jobs to workers with specific capabilities.
#[forge::job(worker_capability = "gpu")]
pub async fn train_model(ctx: &JobContext, args: TrainArgs) -> Result<Model> {
// Only workers configured with capabilities = ["gpu"] will claim this job
run_training(&args.dataset, &args.config).await
}
Configure worker capabilities in forge.toml:
[node]
worker_capabilities = ["gpu", "general"]
Jobs without a capability requirement run on any worker.
Heartbeat for Long Jobs
For jobs that run longer than the stale threshold (5 minutes), send periodic heartbeats to prevent reclamation.
#[forge::job(timeout = "2h")]
pub async fn batch_import(ctx: &JobContext, args: ImportArgs) -> Result<ImportResult> {
for file in args.files {
process_file(&file).await?;
ctx.heartbeat().await?;
}
Ok(ImportResult { imported: args.files.len() })
}
Retry-Aware Logic
Adjust behavior based on retry state.
#[forge::job(max_attempts = 5)]
pub async fn send_notification(ctx: &JobContext, args: NotifyArgs) -> Result<()> {
    if ctx.is_retry() {
        tracing::warn!(
            "Retry attempt {} of {} for notification {}",
            ctx.attempt,
            ctx.max_attempts,
            args.notification_id
        );
    }
    if ctx.is_last_attempt() {
        // Last chance - use backup provider
        send_via_backup(&args).await?;
    } else {
        send_via_primary(&args).await?;
    }
    Ok(())
}
Context Methods
| Method | Return Type | Description |
|---|---|---|
| ctx.db() | &PgPool | Database connection pool |
| ctx.http() | &Client | HTTP client for external calls |
| ctx.progress(percent, msg) | Result<()> | Report progress (0-100) |
| ctx.heartbeat() | Result<()> | Send heartbeat to prevent stale reclaim |
| ctx.save(key, value) | Result<()> | Save data for retries/compensation |
| ctx.saved() | Value | Get all saved data |
| ctx.set_saved(data) | Result<()> | Replace all saved data |
| ctx.is_cancel_requested() | Result<bool> | Check if cancellation requested |
| ctx.check_cancelled() | Result<()> | Return error if cancelled |
| ctx.is_retry() | bool | Check if this is a retry attempt |
| ctx.is_last_attempt() | bool | Check if this is the final attempt |
| ctx.env(key) | Option<String> | Get environment variable |
| ctx.env_require(key) | Result<String> | Get required env var |
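A sketch combining several of these methods; the job, the WEBHOOK_URL variable, and the payload field are illustrative, and the HTTP call assumes ctx.http() behaves like a reqwest client:
#[forge::job(timeout = "5m")]
pub async fn post_webhook(ctx: &JobContext, args: WebhookArgs) -> Result<()> {
    // Fail fast if required configuration is missing
    let endpoint = ctx.env_require("WEBHOOK_URL")?;
    ctx.check_cancelled().await?;
    let response = ctx.http().post(&endpoint).json(&args.payload).send().await?;
    // Persist data a retry or compensation handler might need
    ctx.save("status", json!(response.status().as_u16())).await?;
    ctx.progress(100, "Webhook delivered")?;
    Ok(())
}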
Context Fields
| Field | Type | Description |
|---|---|---|
| ctx.job_id | Uuid | Unique job identifier |
| ctx.job_type | String | Job type name |
| ctx.attempt | u32 | Current attempt (1-based) |
| ctx.max_attempts | u32 | Maximum allowed attempts |
| ctx.auth | AuthContext | Authentication context |
Under the Hood
SKIP LOCKED Pattern
Workers claim jobs with PostgreSQL's FOR UPDATE SKIP LOCKED:
WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $3, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *
Each worker claims a distinct batch of jobs without blocking the others. Adding workers does not add lock contention, so claiming stays cheap as the cluster grows. No thundering herd.
Semaphore-Based Backpressure
Each worker maintains a semaphore limiting concurrent jobs (default: 10). Workers only poll when permits are available:
let available = semaphore.available_permits();
if available == 0 {
    continue; // Skip polling, wait for a job to complete
}
let batch_size = available.min(config.batch_size);
let jobs = queue.claim(worker_id, capabilities, batch_size).await?;
Backpressure propagates naturally. Overloaded workers stop claiming. Jobs remain pending until capacity frees.
Stale Job Recovery
A background task runs every 60 seconds. Jobs claimed more than 5 minutes ago (with no heartbeat) are released back to pending:
UPDATE forge_jobs
SET status = 'pending', worker_id = NULL, claimed_at = NULL
WHERE status IN ('claimed', 'running')
  AND claimed_at < NOW() - INTERVAL '5 minutes'
Crashed workers' jobs are reclaimed automatically. Long-running jobs should call ctx.heartbeat() to prevent false reclamation.
Dead Letter Queue
Jobs exceeding max_attempts move to dead letter status:
UPDATE forge_jobs
SET status = 'dead_letter', last_error = $2, failed_at = NOW()
WHERE id = $1
Dead letter jobs remain in the database for inspection. They can be manually retried or deleted by querying the forge_jobs table.
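One possible maintenance helper, sketched with sqlx against the columns shown in the queries above; the function itself is not part of Forge, and a real version would likely also need to reset the attempt counter, whose column name is not documented here:
pub async fn requeue_dead_letter(pool: &sqlx::PgPool, job_id: uuid::Uuid) -> anyhow::Result<()> {
    // Push a dead-letter job back to pending so a worker can claim it again
    sqlx::query(
        "UPDATE forge_jobs
         SET status = 'pending', worker_id = NULL, claimed_at = NULL
         WHERE id = $1 AND status = 'dead_letter'",
    )
    .bind(job_id)
    .execute(pool)
    .await?;
    Ok(())
}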
Idempotency via Unique Constraint
Idempotent jobs check for existing records before insertion:
SELECT id FROM forge_jobs
WHERE idempotency_key = $1
AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
If found, the existing job ID returns without creating a duplicate. The unique constraint (job_type, idempotency_key) prevents race conditions.
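In practice, dispatching the same idempotent job twice yields a single record; the sketch below assumes dispatch_job returns the job's ID, which is not spelled out elsewhere on this page:
#[forge::mutation(transactional)]
pub async fn retry_payment(ctx: &MutationContext, payment_id: Uuid) -> Result<()> {
    let first = ctx.dispatch_job("process_payment", json!({ "payment_id": payment_id })).await?;
    let second = ctx.dispatch_job("process_payment", json!({ "payment_id": payment_id })).await?;
    // Same idempotency key ("payment_id"), so both calls refer to one job
    assert_eq!(first, second);
    Ok(())
}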
Priority Queue
Jobs are ordered by priority then scheduled time:
ORDER BY priority DESC, scheduled_at ASC
Critical jobs (priority 100) execute before background jobs (priority 0). Within a priority level, older jobs run first. Because workers always select the highest priority available, urgent jobs are never stuck behind lower-priority work.
Job Record TTL
Jobs with a ttl attribute have their records automatically cleaned up after completion:
#[forge::job(ttl = "7d")] // Records deleted 7 days after completion
pub async fn process_order(ctx: &JobContext, args: OrderArgs) -> Result<()> {
// ...
}
When a job completes, fails, or is cancelled, expires_at is set to completed_at + ttl. A background task deletes expired records every 60 seconds.
Jobs without ttl remain in the database indefinitely for auditing. Set ttl for high-volume jobs to prevent database bloat.
Testing
Use TestJobContext to test jobs in isolation.
#[test]
fn test_export_progress() {
    let ctx = TestJobContext::builder("export_data")
        .with_max_attempts(3)
        .build();
    ctx.progress(50, "Halfway").unwrap();
    ctx.progress(100, "Done").unwrap();
    let updates = ctx.progress_updates();
    assert_eq!(updates.len(), 2);
    assert_eq!(updates[0].percent, 50);
    assert_eq!(updates[1].percent, 100);
}
Retry Simulation
Test retry-aware logic:
#[test]
fn test_last_attempt_fallback() {
    let ctx = TestJobContext::builder("send_notification")
        .as_retry(3)
        .with_max_attempts(3)
        .build();
    assert!(ctx.is_retry());
    assert!(ctx.is_last_attempt());
}
Cancellation Testing
Test cancellation handling:
#[test]
fn test_job_handles_cancellation() {
    let ctx = TestJobContext::builder("process_payment")
        .with_cancellation_requested()
        .build();
    assert!(ctx.is_cancel_requested().unwrap());
    assert!(ctx.check_cancelled().is_err());
}

#[test]
fn test_mid_execution_cancellation() {
    let ctx = TestJobContext::builder("long_running_job").build();
    // Simulate work that saves data
    ctx.save("temp_file", json!("/tmp/upload.dat")).unwrap();
    // Request cancellation mid-execution
    ctx.request_cancellation();
    // Job detects cancellation
    assert!(ctx.check_cancelled().is_err());
    // Saved data accessible for compensation
    assert_eq!(ctx.saved()["temp_file"], "/tmp/upload.dat");
}
With HTTP Mocking
Mock external API calls:
#[test]
fn test_external_api_call() {
    let ctx = TestJobContext::builder("process_webhook")
        .mock_http_json("api.stripe.com/*", json!({ "status": "success" }))
        .build();
    // Job can now make HTTP calls to api.stripe.com
}
Asserting Job Dispatch
Verify jobs are dispatched from mutations:
#[tokio::test]
async fn test_order_dispatches_confirmation() {
    let ctx = TestMutationContext::authenticated(Uuid::new_v4());
    create_order(&ctx, new_order).await.unwrap();
    assert_job_dispatched!(ctx, "send_order_confirmation");
}