Master Background Processing

There are three ways to run work outside the request cycle: background jobs for one-off tasks, cron schedules for recurring tasks, and durable workflows for multi-step processes. All three survive server restarts and integrate with Forge's type-safe frontend bindings.

What You'll Learn

  • Dispatch and track background jobs with progress reporting
  • Schedule recurring work with cron expressions
  • Build multi-step workflows with durable sleep and step-level recovery

Part 1: Background Jobs

When to Use

Use a job when you have long-running work -- data exports, sending emails, image processing -- that should not block an API response. The caller gets a job ID immediately and can poll for progress.

Define a Job

use crate::schema::{User, UserRole};
use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportInput {
    pub format: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportOutput {
    pub count: usize,
    pub data: String,
    pub format: String,
}

#[forge::job(
    timeout = "5m",
    priority = "low",
    retry(max_attempts = 3, backoff = "exponential"),
    idempotent,
    public
)]
pub async fn export_users(ctx: &JobContext, input: ExportInput) -> Result<ExportOutput> {
    // Job body
}

Here is what each attribute does:

Attribute | Purpose
timeout = "5m" | Kill the job if it runs longer than five minutes
priority = "low" | Schedule behind normal-priority jobs in the queue
retry(max_attempts = 3, backoff = "exponential") | Retry up to 3 times with exponential backoff on failure
idempotent | Safe to re-execute -- the runtime may deduplicate calls
public | Expose to frontend bindings (omit for internal-only jobs)
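
To make the retry attribute concrete, here is a minimal sketch of how an exponential backoff schedule can be computed. It is not Forge's implementation: the function names, the base delay, and the cap are all assumptions chosen for illustration, and Forge's real defaults may differ.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based), doubling on each retry.
/// `base` and `cap` are illustrative assumptions, not Forge defaults.
pub fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // attempt 1 -> base, attempt 2 -> 2 * base, attempt 3 -> 4 * base, ...
    let exponent = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;
    // Fall back to the cap if the multiplication overflows.
    base.checked_mul(factor).unwrap_or(cap).min(cap)
}

/// Delays for each retry permitted by `max_attempts`.
pub fn retry_schedule(max_attempts: u32, base: Duration, cap: Duration) -> Vec<Duration> {
    (1..=max_attempts)
        .map(|attempt| backoff_delay(attempt, base, cap))
        .collect()
}
```

With a one-second base and a 60-second cap, retry_schedule(3, ...) yields delays of 1, 2, and 4 seconds. The cap keeps a long retry chain from waiting indefinitely.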

Track Progress

Inside a job you have access to ctx.progress() and ctx.heartbeat():

#[forge::job(timeout = "5m", priority = "low", retry(max_attempts = 3, backoff = "exponential"), idempotent, public)]
pub async fn export_users(ctx: &JobContext, input: ExportInput) -> Result<ExportOutput> {
    if ctx.is_retry() {
        tracing::warn!(attempt = ctx.attempt, "Retrying export job");
    }

    ctx.progress(0, "Initializing export...");
    // ... do some work ...
    ctx.heartbeat().await?;

    ctx.progress(20, "Fetching users...");
    let users: Vec<User> = sqlx::query_as!(User, "SELECT ... FROM users")
        .fetch_all(ctx.db())
        .await?;

    ctx.heartbeat().await?;
    ctx.progress(50, format!("Processing {} users...", users.len()));
    // ... format data ...

    ctx.progress(100, "Export complete");
    Ok(ExportOutput { count: users.len(), data, format: input.format })
}

  • ctx.progress(percent, message) -- report 0-100 progress to any listening frontend client.
  • ctx.heartbeat() -- signal that the job is still alive, preventing timeout during long I/O waits.
  • ctx.is_retry() / ctx.attempt -- detect retries so you can log or adjust behavior.
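
One way to picture what a heartbeat buys you is a liveness window that is reset on every beat. The sketch below is a standalone model, not Forge's runtime: the Liveness type, its fields, and the 30-second window used in the usage note are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Tracks when a job last signalled that it is alive.
/// Hypothetical model; Forge's actual bookkeeping is not documented here.
pub struct Liveness {
    last_beat: Instant,
    stale_after: Duration,
}

impl Liveness {
    pub fn new(now: Instant, stale_after: Duration) -> Self {
        Self { last_beat: now, stale_after }
    }

    /// Called by the job during long waits to reset the window.
    pub fn heartbeat(&mut self, now: Instant) {
        self.last_beat = now;
    }

    /// A supervisor would treat the job as stalled once this returns true.
    pub fn is_stale(&self, now: Instant) -> bool {
        now.duration_since(self.last_beat) > self.stale_after
    }
}
```

With a 30-second window, a job that beats at the 25-second mark is still considered alive at 40 seconds, whereas a silent job would already be flagged as stalled.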

Dispatch from a Mutation

To kick off a job from your API layer, dispatch it inside a transactional mutation. The job is enqueued atomically with any other database writes in the same transaction:

#[forge::mutation(transactional, public)]
pub async fn start_export(ctx: &MutationContext, input: ExportInput) -> Result<Uuid> {
    let job_id = ctx.dispatch_job::<ExportUsersJob>(&input).await?;
    Ok(job_id)
}

The mutation returns the job_id immediately. If the transaction rolls back, the job is never enqueued.
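
The atomic enqueue described above can be modelled in memory. This is a simplified sketch under stated assumptions: Store, Tx, and their methods are hypothetical stand-ins for a database and its transaction, not Forge types. The point is only that staged jobs become visible on commit and vanish on rollback.

```rust
/// In-memory stand-in for a database holding rows and a job queue.
#[derive(Default)]
pub struct Store {
    pub rows: Vec<String>,
    pub queue: Vec<String>,
}

/// A transaction that stages writes until it is committed.
pub struct Tx<'a> {
    store: &'a mut Store,
    rows: Vec<String>,
    jobs: Vec<String>,
}

impl Store {
    pub fn begin(&mut self) -> Tx<'_> {
        Tx { store: self, rows: Vec::new(), jobs: Vec::new() }
    }
}

impl<'a> Tx<'a> {
    pub fn insert_row(&mut self, row: &str) {
        self.rows.push(row.to_string());
    }

    pub fn dispatch_job(&mut self, job: &str) {
        self.jobs.push(job.to_string());
    }

    /// Rows and jobs become visible together.
    pub fn commit(self) {
        let Tx { store, rows, jobs } = self;
        store.rows.extend(rows);
        store.queue.extend(jobs);
    }

    /// Staged rows and jobs are dropped together.
    pub fn rollback(self) {}
}
```

Because the job sits in the same staging area as the rows, there is no window in which a worker could pick up a job whose supporting data was never written.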

Track in the Frontend

Forge generates type-safe tracking functions for every public job. Call the tracker and it dispatches the job, then polls for progress automatically.

<script lang="ts">
  import { trackExportUsers } from "$lib/forge";

  let exportJobStore = $state<ReturnType<typeof trackExportUsers> | null>(null);

  function startExportJob() {
    exportJobStore = trackExportUsers({ format: "csv" });
  }
</script>

{#if exportJobStore && $exportJobStore}
  <div class="progress-bar">
    <div class="fill" style="width:{$exportJobStore.progress || 0}%"></div>
  </div>
  <p>{$exportJobStore.progress || 0}% - {$exportJobStore.message || $exportJobStore.status}</p>
  {#if ["completed", "failed"].includes($exportJobStore.status)}
    <button onclick={startExportJob}>Run Again</button>
  {/if}
{:else}
  <button onclick={startExportJob}>Start Export</button>
{/if}

trackExportUsers returns a Svelte store. Subscribe with $exportJobStore to reactively render progress, status, and messages.

Part 2: Scheduled Tasks (Crons)

When to Use

Use a cron for recurring work: cleaning up stale data, syncing from external APIs, periodic health checks. Forge manages scheduling, execution, and logging.

Define a Cron

use forge::prelude::*;

#[forge::cron("* * * * *", timezone = "UTC")]
pub async fn iss_location(ctx: &CronContext) -> Result<()> {
    ctx.log.info("Fetching ISS location", serde_json::json!({"run_id": ctx.run_id.to_string()}));

    let response = ctx.http()
        .get("http://api.open-notify.org/iss-now.json")
        .send()
        .await
        .map_err(|e| ForgeError::Internal(format!("HTTP request failed: {}", e)))?;

    if !response.status().is_success() {
        ctx.log.error("Failed to fetch ISS location",
            serde_json::json!({"status": response.status().as_u16()}));
        return Err(ForgeError::Internal("Failed to fetch ISS location".into()));
    }

    // IssApiResponse is a Deserialize struct for the API payload (definition not shown).
    let data: IssApiResponse = response.json().await
        .map_err(|e| ForgeError::Deserialization(format!("Failed to parse: {}", e)))?;

    // ... build `snapshot` (latitude, longitude, timestamp) from `data`; conversion not shown ...

    sqlx::query!(
        "INSERT INTO iss_location (id, latitude, longitude, api_timestamp, created_at) \
         VALUES (gen_random_uuid(), $1, $2, to_timestamp($3), NOW())",
        snapshot.latitude, snapshot.longitude, snapshot.timestamp as f64
    )
    .execute(ctx.db())
    .await?;

    Ok(())
}

The cron expression "* * * * *" means every minute. The context provides:

  • ctx.http() -- a pre-configured HTTP client for external calls.
  • ctx.db() -- the database pool.
  • ctx.log -- structured logging with .info(), .warn(), .error(), .debug().
  • ctx.run_id -- unique ID for this particular run.

Cron Expression Syntax

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, SUN-SAT)
│ │ │ │ │
* * * * *

Common examples:

Expression | Meaning
* * * * * | Every minute
0 * * * * | Every hour on the hour
0 9 * * MON-FRI | Weekdays at 9 AM
0 0 1 * * | First of each month at midnight
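
To see how the five fields are evaluated, here is a small standalone matcher. It is a simplified sketch, not Forge's parser: it handles only `*`, single numbers, ranges, and comma lists, it does not understand names such as MON-FRI or step values, and it requires every field to match (classic cron treats restricted day-of-month and day-of-week fields as alternatives). The function names are hypothetical.

```rust
/// Returns true if `value` satisfies one cron field such as "*", "5", "1-5" or "0,30".
pub fn field_matches(field: &str, value: u32) -> bool {
    field.split(',').any(|part| {
        if part == "*" {
            return true;
        }
        match part.split_once('-') {
            // Range such as "1-5" (inclusive on both ends).
            Some((lo, hi)) => match (lo.parse::<u32>(), hi.parse::<u32>()) {
                (Ok(lo), Ok(hi)) => lo <= value && value <= hi,
                _ => false,
            },
            // Single number such as "9"; unparseable parts never match.
            None => part.parse::<u32>().map_or(false, |n| n == value),
        }
    })
}

/// Checks a five-field expression against
/// [minute, hour, day of month, month, day of week].
pub fn cron_matches(expr: &str, time: [u32; 5]) -> bool {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    fields.len() == 5
        && fields
            .iter()
            .zip(time.iter())
            .all(|(field, value)| field_matches(field, *value))
}
```

For example, "0 9 * * 1-5" (the numeric form of the weekday expression above) matches 09:00 on a Monday, where the day-of-week value is 1, but not on a Sunday, where it is 0.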

Timezone Support

By default crons run in UTC. Specify a timezone with the timezone attribute:

#[forge::cron("0 9 * * MON-FRI", timezone = "America/New_York")]
pub async fn send_daily_digest(ctx: &CronContext) -> Result<()> {
    // Runs at 9 AM Eastern, Monday through Friday
}

Catch-Up Missed Runs

If the server was down and missed scheduled runs, enable catch_up to replay them on restart:

#[forge::cron("0 * * * *", catch_up, catch_up_limit = 3)]
pub async fn hourly_sync(ctx: &CronContext) -> Result<()> {
    // If the server missed 5 hours, only the last 3 will replay
}

catch_up_limit caps how many missed runs are replayed, preventing a thundering herd after a long outage.
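
The effect of the limit can be sketched with plain timestamps. The helpers below are hypothetical and use Unix seconds for simplicity; they illustrate the "keep only the most recent N missed runs" behaviour described above rather than Forge's scheduler.

```rust
/// Hourly run times (Unix seconds) that fell after `down_from`
/// and up to and including `up_at`.
pub fn missed_hourly_runs(down_from: u64, up_at: u64) -> Vec<u64> {
    // First hour boundary strictly after the moment the server went down.
    let first = (down_from / 3600 + 1) * 3600;
    (first..=up_at).step_by(3600).collect()
}

/// Keeps at most `limit` of the most recent missed runs (input is oldest first).
pub fn runs_to_replay(missed: &[u64], limit: usize) -> &[u64] {
    let skip = missed.len().saturating_sub(limit);
    &missed[skip..]
}
```

If the server was down from time 0 until hour 5, five hourly runs were missed, and with a limit of 3 only the runs for hours 3, 4, and 5 are replayed.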

Part 3: Durable Workflows

When to Use

Use a workflow when you have a multi-step process that must survive server restarts: user onboarding, payment processing, verification flows. Each step is tracked individually, so after a restart the workflow resumes from the last completed step rather than starting over.

Define a Workflow

use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationInput {
    pub account_id: String,
    pub email: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationOutput {
    pub verified: bool,
    pub token: String,
}

#[forge::workflow(version = 1, timeout = "24h", public)]
pub async fn account_verification(
    ctx: &WorkflowContext,
    input: VerificationInput,
) -> Result<VerificationOutput> {
    if ctx.is_resumed() {
        tracing::info!(workflow_id = %ctx.run_id, "Resuming verification workflow");
    }
    // Steps follow...
}

Attribute | Purpose
version = 1 | Schema version -- increment when step logic changes to avoid conflicts with in-flight workflows
timeout = "24h" | Maximum wall-clock time for the entire workflow
public | Expose to frontend bindings

Steps with Tracking

Each step is guarded by is_step_completed so it can be skipped on resume. Results are persisted with record_step_complete so downstream steps can retrieve them:

// Step 1: Generate token
let token = if ctx.is_step_completed("generate_token") {
    ctx.get_step_result::<String>("generate_token")
        .unwrap_or_else(|| format!("verify_{}", Uuid::new_v4()))
} else {
    ctx.record_step_start("generate_token");
    let token = format!("verify_{}", Uuid::new_v4());
    ctx.record_step_complete("generate_token", serde_json::json!(token));
    token
};

// Step 2: Store token
if !ctx.is_step_completed("store_token") {
    ctx.record_step_start("store_token");
    // ... persist to database ...
    ctx.record_step_complete("store_token", serde_json::json!({"stored": true}));
}

// Step 3: Send email
if !ctx.is_step_completed("send_email") {
    ctx.record_step_start("send_email");
    // ... send verification email ...
    ctx.record_step_complete("send_email", serde_json::json!({"sent": true}));
}

The pattern for every step is:

  1. Check -- ctx.is_step_completed("name") returns true if this step already ran.
  2. Record start -- ctx.record_step_start("name") marks the step as in-progress.
  3. Do work -- your actual business logic.
  4. Record complete -- ctx.record_step_complete("name", result) persists the output.
  5. Retrieve on resume -- ctx.get_step_result::<T>("name") deserializes the saved result.
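
The five-part pattern above can be folded into a single helper. The sketch below is a standalone in-memory model, assuming string results and a hypothetical StepLog type; its method names echo the context methods for readability, but it is not Forge's WorkflowContext and the run_step helper is an invention for illustration.

```rust
use std::collections::HashMap;

/// In-memory record of completed steps and their (string) results.
#[derive(Default)]
pub struct StepLog {
    completed: HashMap<String, String>,
}

impl StepLog {
    pub fn is_step_completed(&self, name: &str) -> bool {
        self.completed.contains_key(name)
    }

    pub fn get_step_result(&self, name: &str) -> Option<&String> {
        self.completed.get(name)
    }

    /// Runs `work` only if the step has no saved result;
    /// otherwise returns the saved result without calling `work`.
    pub fn run_step<F: FnOnce() -> String>(&mut self, name: &str, work: F) -> String {
        if let Some(saved) = self.completed.get(name) {
            return saved.clone();
        }
        let result = work();
        self.completed.insert(name.to_string(), result.clone());
        result
    }
}
```

Calling run_step twice with the same name executes the closure once. The second call returns the stored value, which is why a resumed workflow sees the same token it generated before the restart.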

Durable Sleep

Workflows can sleep for hours or days. Unlike tokio::time::sleep, a durable sleep survives server restarts -- the runtime persists the wake time and resumes the workflow when it elapses:

// Step 4: Wait for user to click the verification link
if !ctx.is_step_completed("wait_period") {
    ctx.record_step_start("wait_period");
    ctx.sleep(Duration::from_secs(86400)).await?; // 24 hours
    ctx.record_step_complete_async("wait_period", serde_json::json!({"waited": true})).await;
}

When the sleep elapses, the workflow resumes at the next step, even if the server restarted several times in the meantime.
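
The key idea behind a durable sleep is storing an absolute wake time rather than a relative duration. The sketch below illustrates that idea with a hypothetical DurableSleep type and Unix-second timestamps; how Forge persists and schedules the wake-up is not covered here.

```rust
use std::time::Duration;

/// The only state that needs to be persisted: an absolute wake time.
pub struct DurableSleep {
    pub wake_at_unix: u64,
}

impl DurableSleep {
    /// Computed once, when the sleep begins.
    pub fn start(now_unix: u64, duration: Duration) -> Self {
        Self { wake_at_unix: now_unix + duration.as_secs() }
    }

    /// After a restart, only the remaining time needs to be waited out.
    pub fn remaining(&self, now_unix: u64) -> Duration {
        Duration::from_secs(self.wake_at_unix.saturating_sub(now_unix))
    }

    pub fn is_elapsed(&self, now_unix: u64) -> bool {
        now_unix >= self.wake_at_unix
    }
}
```

A process that restarts one hour into a 24-hour sleep reloads the wake time and waits the remaining 23 hours. An in-process timer would instead be lost at the restart.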

Track in the Frontend

Like jobs, public workflows get generated frontend bindings that track step-level progress.

<script lang="ts">
  import { trackAccountVerification } from "$lib/forge";

  let workflowStore = $state<ReturnType<typeof trackAccountVerification> | null>(null);

  function startVerificationWorkflow() {
    workflowStore = trackAccountVerification({
      account_id: "user-123",
      email: "user@example.com",
    });
  }
</script>

{#if workflowStore && $workflowStore}
  <div class="steps">
    {#each $workflowStore.steps as step (step.name)}
      <div class="step {step.status}">
        <span>{step.name}</span>
      </div>
    {/each}
  </div>
  {#if ["completed", "failed", "compensated"].includes($workflowStore.status)}
    <button onclick={startVerificationWorkflow}>Run Again</button>
  {/if}
{:else}
  <button onclick={startVerificationWorkflow}>Start Workflow</button>
{/if}

Each step in $workflowStore.steps has a name and status ("pending", "running", "completed", "failed"), giving you fine-grained UI control.

Choosing the Right Tool

Feature | Job | Cron | Workflow
Triggered by | Code dispatch | Schedule | Code dispatch
Retry | Automatic (configurable) | Next scheduled run | Per-step resume
Survives restart | Yes | Yes | Yes (step-level)
Progress tracking | Yes (0-100%) | No | Yes (per step)
Durable sleep | No | N/A | Yes
Frontend bindings | trackXxx / use_xxx | Query latest result | trackXxx / use_xxx
Best for | One-off tasks | Recurring tasks | Multi-step processes

Rules of thumb:

  • If it runs once and you need to track progress, use a job.
  • If it runs on a schedule, use a cron.
  • If it has multiple steps that must all complete (or compensate), use a workflow.