
Background Work

Process tasks asynchronously with automatic retries, priority queuing, and worker routing.

The Code

#[forge::job(
    priority = "high",
    retry(max_attempts = 5, backoff = "exponential"),
    worker_capability = "media"
)]
pub async fn transcode_video(ctx: &JobContext, args: TranscodeArgs) -> Result<Video> {
    ctx.progress(0, "Starting transcode")?;

    for (i, chunk) in args.chunks.iter().enumerate() {
        process_chunk(chunk).await?;
        ctx.progress(((i + 1) * 100 / args.chunks.len()) as u8, "Processing chunks")?;
        ctx.heartbeat().await?;
    }

    ctx.progress(100, "Complete")?;
    Ok(Video { url: args.output_url })
}

What Happens

Forge inserts the job into a PostgreSQL queue table within your mutation's transaction. Workers poll for jobs using FOR UPDATE SKIP LOCKED, so they never contend for the same rows and each pending job is claimed by exactly one worker at a time. Each worker maintains a semaphore to limit concurrent jobs, creating backpressure when capacity is reached.

Jobs that exceed max_attempts move to the dead letter queue for manual inspection. Heartbeats prevent stale job recovery from reclaiming active long-running jobs.

Attributes

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| name | "string" | function name | Override job type name |
| priority | "level" | "normal" | Priority level (see below) |
| timeout | "duration" | "1h" | Maximum execution time |
| max_attempts | u32 | 3 | Maximum retry attempts |
| backoff | "strategy" | "exponential" | Retry backoff strategy |
| max_backoff | "duration" | "5m" | Maximum backoff delay |
| worker_capability | "string" | none | Route to specific workers |
| idempotent | flag | false | Enable deduplication |
| idempotent(key = "field") | config | none | Custom idempotency key |
| public | flag | false | Allow unauthenticated dispatch |
| require_role("name") | string | none | Require role to dispatch |
| ttl | "duration" | none | Auto-cleanup after completion (e.g., "7d") |

Priority Levels

| Level | Value | Use Case |
| --- | --- | --- |
| background | 0 | Maintenance, cleanup, analytics |
| low | 25 | Batch processing, reports |
| normal | 50 | Standard async work |
| high | 75 | User-facing operations |
| critical | 100 | Payment processing, alerts |

Workers claim the highest-priority jobs first via ORDER BY priority DESC; lower-priority jobs yield to critical work.
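For example, a payment capture job could be pinned to the critical level so it is claimed ahead of any queued batch or maintenance work. This is a sketch: the job name, argument type, and gateway helper below are illustrative, not part of Forge.

#[forge::job(priority = "critical", retry(max_attempts = 3, backoff = "exponential"))]
pub async fn capture_payment(ctx: &JobContext, args: CaptureArgs) -> Result<Receipt> {
    // Claimed before any pending normal-, low-, or background-priority jobs
    charge_gateway(&args).await
}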

Backoff Strategies

| Strategy | Delay Pattern | Example (attempts 1-4) |
| --- | --- | --- |
| fixed | Same each time | 1s, 1s, 1s, 1s |
| linear | Increases linearly | 1s, 2s, 3s, 4s |
| exponential | Doubles each time | 1s, 2s, 4s, 8s |

All strategies cap at max_backoff (default 5 minutes).
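As a sketch of the arithmetic only (this is not Forge's internal code, and the function name is made up; the strategy strings match the table above), the delay before a given attempt could be computed like this, with the cap applied last:

use std::time::Duration;

// Illustrative only: assumes a 1-second base delay and 1-based attempt numbers.
fn retry_delay(strategy: &str, attempt: u32, max_backoff: Duration) -> Duration {
    let base = Duration::from_secs(1);
    let delay = match strategy {
        "fixed" => base,                                          // 1s, 1s, 1s, ...
        "linear" => base * attempt,                               // 1s, 2s, 3s, ...
        _ => base * 2u32.pow(attempt.saturating_sub(1).min(20)),  // exponential: 1s, 2s, 4s, ...
    };
    delay.min(max_backoff) // every strategy caps at max_backoff
}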

Retry Configuration

#[forge::job(retry(max_attempts = 5, backoff = "exponential", max_backoff = "10m"))]

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_attempts | u32 | 3 | Total execution attempts |
| backoff | "strategy" | "exponential" | Delay calculation method |
| max_backoff | "duration" | "5m" | Cap on retry delay |

Patterns

Dispatch from Mutation

Jobs are dispatched within the mutation's transaction. If the mutation fails, the job is never enqueued.

#[forge::mutation(transactional)]
pub async fn create_order(ctx: &MutationContext, order: NewOrder) -> Result<Order> {
    let order = sqlx::query_as("INSERT INTO orders ... RETURNING *")
        .bind(&order)
        .fetch_one(ctx.db())
        .await?;

    ctx.dispatch_job("send_order_confirmation", json!({
        "order_id": order.id,
        "email": order.email.clone(),
    })).await?;

    Ok(order)
}

Idempotent Jobs

Prevent duplicate processing with idempotency keys.

#[forge::job(idempotent(key = "payment_id"))]
pub async fn process_payment(ctx: &JobContext, args: PaymentArgs) -> Result<Receipt> {
    // If a job with this payment_id already exists (pending/running/completed),
    // the dispatch returns the existing job ID without creating a duplicate
    charge_card(&args.payment_id, args.amount).await
}

Without a custom key, idempotent uses a hash of all arguments.
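For example, marking a job idempotent without a key dedupes on the hash of the full argument set. The job name, argument type, and helper in this sketch are illustrative:

#[forge::job(idempotent)]
pub async fn sync_profile(ctx: &JobContext, args: SyncArgs) -> Result<()> {
    // Dispatching this twice with identical SyncArgs while one copy is still
    // pending or running returns the existing job ID instead of enqueueing another
    push_profile_to_crm(&args).await
}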

Progress Tracking

Report progress for long-running jobs. Progress updates also serve as heartbeats.

#[forge::job(timeout = "30m")]
pub async fn export_data(ctx: &JobContext, args: ExportArgs) -> Result<ExportResult> {
    let rows = fetch_all_rows(&args.query).await?;
    let total = rows.len();

    for (i, row) in rows.iter().enumerate() {
        write_row(&args.output, row).await?;

        if i % 100 == 0 {
            let percent = (i * 100 / total) as u8;
            ctx.progress(percent, format!("Exported {} of {} rows", i, total))?;
        }
    }

    ctx.progress(100, "Export complete")?;
    Ok(ExportResult { path: args.output })
}

Cancellation & Compensation

Jobs can be cancelled after dispatch. Use ctx.check_cancelled() to exit early and define a compensation handler for cleanup.

#[forge::job(compensate = "rollback_charge")]
pub async fn charge_card(ctx: &JobContext, args: ChargeArgs) -> Result<Receipt> {
    ctx.check_cancelled().await?;

    let receipt = charge(&args).await?;

    // Save data generated during execution for compensation
    ctx.save("receipt_id", json!(receipt.id)).await?;

    Ok(receipt)
}

pub async fn rollback_charge(
    ctx: &JobContext,
    args: ChargeArgs,
    reason: &str,
) -> Result<()> {
    // Original args are available directly
    tracing::info!("Rolling back charge {} due to: {}", args.charge_id, reason);

    // Data generated during execution comes from saved()
    let saved = ctx.saved().await;
    if let Some(receipt_id) = saved.get("receipt_id") {
        refund(receipt_id.as_str().unwrap()).await?;
    }
    Ok(())
}

The compensation handler receives the original args. Use save(key, value) for data generated during execution that isn't in the original args (transaction IDs, receipts, temporary resources).

Worker Routing

Route jobs to workers with specific capabilities.

#[forge::job(worker_capability = "gpu")]
pub async fn train_model(ctx: &JobContext, args: TrainArgs) -> Result<Model> {
    // Only workers configured with capabilities = ["gpu"] will claim this job
    run_training(&args.dataset, &args.config).await
}

Configure worker capabilities in forge.toml:

[node]
worker_capabilities = ["gpu", "general"]

Jobs without a capability requirement run on any worker.

Heartbeat for Long Jobs

For jobs that run longer than the stale threshold (5 minutes), send periodic heartbeats to prevent reclamation.

#[forge::job(timeout = "2h")]
pub async fn batch_import(ctx: &JobContext, args: ImportArgs) -> Result<ImportResult> {
    for file in &args.files {
        process_file(file).await?;
        ctx.heartbeat().await?;
    }
    Ok(ImportResult { imported: args.files.len() })
}

Retry-Aware Logic

Adjust behavior based on retry state.

#[forge::job(max_attempts = 5)]
pub async fn send_notification(ctx: &JobContext, args: NotifyArgs) -> Result<()> {
    if ctx.is_retry() {
        tracing::warn!(
            "Retry attempt {} of {} for notification {}",
            ctx.attempt,
            ctx.max_attempts,
            args.notification_id
        );
    }

    if ctx.is_last_attempt() {
        // Last chance - use backup provider
        send_via_backup(&args).await?;
    } else {
        send_via_primary(&args).await?;
    }

    Ok(())
}

Context Methods

| Method | Return Type | Description |
| --- | --- | --- |
| ctx.db() | &PgPool | Database connection pool |
| ctx.http() | &Client | HTTP client for external calls |
| ctx.progress(percent, msg) | Result<()> | Report progress (0-100) |
| ctx.heartbeat() | Result<()> | Send heartbeat to prevent stale reclaim |
| ctx.save(key, value) | Result<()> | Save data for retries/compensation |
| ctx.saved() | Value | Get all saved data |
| ctx.set_saved(data) | Result<()> | Replace all saved data |
| ctx.is_cancel_requested() | Result<bool> | Check if cancellation was requested |
| ctx.check_cancelled() | Result<()> | Return an error if cancelled |
| ctx.is_retry() | bool | Check if this is a retry attempt |
| ctx.is_last_attempt() | bool | Check if this is the final attempt |
| ctx.env(key) | Option<String> | Get environment variable |
| ctx.env_require(key) | Result<String> | Get required environment variable |
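A sketch of how several of these fit together inside a handler; the job name, argument type, environment variable names, and call_partner helper are made up for illustration:

#[forge::job(timeout = "10m")]
pub async fn call_partner_api(ctx: &JobContext, args: PartnerArgs) -> Result<()> {
    // Pull configuration from the environment
    let api_key = ctx.env_require("PARTNER_API_KEY")?;
    let region = ctx.env("PARTNER_REGION").unwrap_or_else(|| "us".into());

    // Exit early if cancellation was requested after dispatch
    ctx.check_cancelled().await?;

    // Stash intermediate state so retries and compensation can see it
    ctx.save("region", json!(region)).await?;

    call_partner(ctx.http(), &api_key, &args).await
}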

Context Fields

| Field | Type | Description |
| --- | --- | --- |
| ctx.job_id | Uuid | Unique job identifier |
| ctx.job_type | String | Job type name |
| ctx.attempt | u32 | Current attempt (1-based) |
| ctx.max_attempts | u32 | Maximum allowed attempts |
| ctx.auth | AuthContext | Authentication context |
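These are plain fields on the context, so they can be read directly, for example in structured logging. A sketch (the field labels passed to tracing are arbitrary):

tracing::info!(
    job_id = %ctx.job_id,
    job_type = %ctx.job_type,
    attempt = ctx.attempt,
    max_attempts = ctx.max_attempts,
    "starting job"
);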

Under the Hood

SKIP LOCKED Pattern

Workers claim jobs with PostgreSQL's FOR UPDATE SKIP LOCKED:

WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $3, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *

Each worker grabs different jobs without blocking: locked rows are skipped rather than waited on, so claiming does not slow down as workers are added. No thundering herd.

Semaphore-Based Backpressure

Each worker maintains a semaphore limiting concurrent jobs (default: 10). Workers only poll when permits are available:

let available = semaphore.available_permits();
if available == 0 {
    continue; // Skip polling, wait for a job to complete
}

let batch_size = available.min(config.batch_size);
let jobs = queue.claim(worker_id, capabilities, batch_size).await?;

Backpressure propagates naturally. Overloaded workers stop claiming. Jobs remain pending until capacity frees up.

Stale Job Recovery

A background task runs every 60 seconds. Jobs claimed more than 5 minutes ago (with no heartbeat) are released back to pending:

UPDATE forge_jobs
SET status = 'pending', worker_id = NULL, claimed_at = NULL
WHERE status IN ('claimed', 'running')
AND claimed_at < NOW() - INTERVAL '5 minutes'

Crashed workers' jobs are reclaimed automatically. Long-running jobs should call ctx.heartbeat() to prevent false reclamation.

Dead Letter Queue

Jobs exceeding max_attempts move to dead letter status:

UPDATE forge_jobs
SET status = 'dead_letter', last_error = $2, failed_at = NOW()
WHERE id = $1

Dead letter jobs remain in the database for inspection. They can be manually retried or deleted by querying the forge_jobs table.
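For instance, an operational script could requeue a dead-letter job by resetting its status. This is a sketch against the columns shown on this page; whether an attempt counter also needs resetting depends on the schema, which isn't documented here.

// Sketch: requeue a single dead-letter job using sqlx.
async fn requeue_dead_letter(pool: &sqlx::PgPool, job_id: uuid::Uuid) -> Result<(), sqlx::Error> {
    sqlx::query(
        "UPDATE forge_jobs
         SET status = 'pending', worker_id = NULL, claimed_at = NULL, failed_at = NULL
         WHERE id = $1 AND status = 'dead_letter'",
    )
    .bind(job_id)
    .execute(pool)
    .await?;
    Ok(())
}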

Idempotency via Unique Constraint

Idempotent jobs check for existing records before insertion:

SELECT id FROM forge_jobs
WHERE idempotency_key = $1
AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')

If a match is found, dispatch returns the existing job ID without creating a duplicate. A unique constraint on (job_type, idempotency_key) prevents race conditions.

Priority Queue

Jobs are ordered by priority then scheduled time:

ORDER BY priority DESC, scheduled_at ASC

Critical jobs (priority 100) execute before background jobs (priority 0). Within a priority level, older jobs run first. Priority inversion is prevented by always selecting the highest-priority job available.

Job Record TTL

Jobs with a ttl attribute have their records automatically cleaned up after completion:

#[forge::job(ttl = "7d")]  // Records deleted 7 days after completion
pub async fn process_order(ctx: &JobContext, args: OrderArgs) -> Result<()> {
    // ...
}

When a job completes, fails, or is cancelled, expires_at is set to completed_at + ttl. A background task deletes expired records every 60 seconds.

Jobs without ttl remain in the database indefinitely for auditing. Set ttl for high-volume jobs to prevent database bloat.
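A sketch of what that cleanup pass could look like; the query and function are illustrative, and expires_at is the only column assumed beyond those shown elsewhere on this page (it is described above):

// Illustrative cleanup pass run on an interval by a background task.
async fn delete_expired_jobs(pool: &sqlx::PgPool) -> Result<u64, sqlx::Error> {
    let result = sqlx::query(
        "DELETE FROM forge_jobs WHERE expires_at IS NOT NULL AND expires_at < NOW()",
    )
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}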

Testing

Use TestJobContext to test jobs in isolation.

#[test]
fn test_export_progress() {
    let ctx = TestJobContext::builder("export_data")
        .with_max_attempts(3)
        .build();

    ctx.progress(50, "Halfway").unwrap();
    ctx.progress(100, "Done").unwrap();

    let updates = ctx.progress_updates();
    assert_eq!(updates.len(), 2);
    assert_eq!(updates[0].percent, 50);
    assert_eq!(updates[1].percent, 100);
}

Retry Simulation

Test retry-aware logic:

#[test]
fn test_last_attempt_fallback() {
    let ctx = TestJobContext::builder("send_notification")
        .as_retry(3)
        .with_max_attempts(3)
        .build();

    assert!(ctx.is_retry());
    assert!(ctx.is_last_attempt());
}

Cancellation Testing

Test cancellation handling:

#[test]
fn test_job_handles_cancellation() {
    let ctx = TestJobContext::builder("process_payment")
        .with_cancellation_requested()
        .build();

    assert!(ctx.is_cancel_requested().unwrap());
    assert!(ctx.check_cancelled().is_err());
}

#[test]
fn test_mid_execution_cancellation() {
    let ctx = TestJobContext::builder("long_running_job").build();

    // Simulate work that saves data
    ctx.save("temp_file", json!("/tmp/upload.dat")).unwrap();

    // Request cancellation mid-execution
    ctx.request_cancellation();

    // Job detects cancellation
    assert!(ctx.check_cancelled().is_err());

    // Saved data accessible for compensation
    assert_eq!(ctx.saved()["temp_file"], "/tmp/upload.dat");
}

With HTTP Mocking

Mock external API calls:

#[test]
fn test_external_api_call() {
    let ctx = TestJobContext::builder("process_webhook")
        .mock_http_json("api.stripe.com/*", json!({ "status": "success" }))
        .build();

    // Job can now make HTTP calls to api.stripe.com
}

Asserting Job Dispatch

Verify jobs are dispatched from mutations:

#[tokio::test]
async fn test_order_dispatches_confirmation() {
    let ctx = TestMutationContext::authenticated(Uuid::new_v4());

    create_order(&ctx, new_order).await.unwrap();

    assert_job_dispatched!(ctx, "send_order_confirmation");
}