Master Background Processing
Three ways to run work outside the request cycle: background jobs for one-off tasks, cron schedules for recurring tasks, and durable workflows for multi-step processes. Each survives server restarts and integrates with Forge's type-safe frontend bindings.
What You'll Learn
- Dispatch and track background jobs with progress reporting
- Schedule recurring work with cron expressions
- Build multi-step workflows with durable sleep and step-level recovery
Part 1: Background Jobs
When to Use
Use a job when you have long-running work -- data exports, sending emails, image processing -- that should not block an API response. The caller gets a job ID immediately and can poll for progress.
Define a Job
```rust
use crate::schema::{User, UserRole};
use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportInput {
    pub format: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportOutput {
    pub count: usize,
    pub data: String,
    pub format: String,
}

#[forge::job(
    timeout = "5m",
    priority = "low",
    retry(max_attempts = 3, backoff = "exponential"),
    idempotent,
    public
)]
pub async fn export_users(ctx: &JobContext, input: ExportInput) -> Result<ExportOutput> {
    // Job body
}
```
Here is what each attribute does:
| Attribute | Purpose |
|---|---|
| `timeout = "5m"` | Kill the job if it runs longer than five minutes |
| `priority = "low"` | Schedule behind normal-priority jobs in the queue |
| `retry(max_attempts = 3, backoff = "exponential")` | Retry up to 3 times with exponential backoff on failure |
| `idempotent` | Safe to re-execute -- the runtime may deduplicate calls |
| `public` | Expose to frontend bindings (omit for internal-only jobs) |
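The retry schedule itself isn't spelled out above. As a rough sketch of what `backoff = "exponential"` implies (the one-second base and sixty-second cap here are assumptions for illustration, not Forge's documented defaults), the wait doubles after each failed attempt:

```rust
use std::time::Duration;

// Illustrative exponential-backoff schedule: first retry waits `base`,
// and each subsequent retry doubles the wait, up to `cap`.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    let exp = attempt.saturating_sub(1).min(16); // bound the shift to avoid overflow
    base.saturating_mul(1u32 << exp).min(cap)
}

fn main() {
    for attempt in 1..=3 {
        println!(
            "retry {} after {:?}",
            attempt,
            backoff_delay(attempt, Duration::from_secs(1), Duration::from_secs(60))
        );
    }
}
```

With `max_attempts = 3`, this schedule gives waits of 1s, 2s, and 4s before giving up.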
Track Progress
Inside a job you have access to `ctx.progress()` and `ctx.heartbeat()`:
```rust
#[forge::job(timeout = "5m", priority = "low", retry(max_attempts = 3, backoff = "exponential"), idempotent, public)]
pub async fn export_users(ctx: &JobContext, input: ExportInput) -> Result<ExportOutput> {
    if ctx.is_retry() {
        tracing::warn!(attempt = ctx.attempt, "Retrying export job");
    }
    ctx.progress(0, "Initializing export...");
    // ... do some work ...
    ctx.heartbeat().await?;

    ctx.progress(20, "Fetching users...");
    let users: Vec<User> = sqlx::query_as!(User, "SELECT ... FROM users")
        .fetch_all(ctx.db())
        .await?;
    ctx.heartbeat().await?;

    ctx.progress(50, format!("Processing {} users...", users.len()));
    let data = format!("exported {} users", users.len()); // illustrative -- format the rows here

    ctx.progress(100, "Export complete");
    Ok(ExportOutput { count: users.len(), data, format: input.format })
}
```
- `ctx.progress(percent, message)` -- report 0-100 progress to any listening frontend client.
- `ctx.heartbeat()` -- signal that the job is still alive, preventing timeout during long I/O waits.
- `ctx.is_retry()` / `ctx.attempt` -- detect retries so you can log or adjust behavior.
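Since progress is reported on a 0-100 scale, batch work has to be mapped onto that scale yourself. A minimal sketch (the phase boundaries here are arbitrary, mirroring the 0/20/50/100 checkpoints in the example above):

```rust
// Map batch progress onto a sub-range of the 0-100 progress scale,
// so each phase of the job owns its own slice of the bar.
fn batch_percent(done: usize, total: usize, range_start: u8, range_end: u8) -> u8 {
    if total == 0 {
        return range_end; // nothing to do counts as phase complete
    }
    let span = (range_end - range_start) as usize;
    range_start + ((done * span) / total) as u8
}

fn main() {
    // The "processing" phase occupies 50..=100 of the bar.
    for done in [0usize, 5, 10] {
        println!("{done}/10 -> {}%", batch_percent(done, 10, 50, 100));
    }
}
```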
Dispatch from a Mutation
To kick off a job from your API layer, dispatch it inside a transactional mutation. The job is enqueued atomically with any other database writes in the same transaction:
```rust
#[forge::mutation(transactional, public)]
pub async fn start_export(ctx: &MutationContext, input: ExportInput) -> Result<Uuid> {
    let job_id = ctx.dispatch_job::<ExportUsersJob>(&input).await?;
    Ok(job_id)
}
```
The mutation returns the `job_id` immediately. If the transaction rolls back, the job is never enqueued.
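A common way to get this guarantee (an assumption about the mechanics, not Forge's documented internals) is an outbox-style buffer: dispatches accumulate alongside the transaction and reach the queue only on commit. A toy model of that behavior:

```rust
use std::collections::VecDeque;

// Sketch of atomic enqueue: jobs dispatched inside a transaction are
// buffered, and only pushed to the real queue when the transaction commits.
#[derive(Default)]
struct Queue {
    jobs: VecDeque<String>,
}

struct Tx<'a> {
    queue: &'a mut Queue,
    pending_jobs: Vec<String>,
}

impl<'a> Tx<'a> {
    fn dispatch_job(&mut self, name: &str) {
        self.pending_jobs.push(name.to_string());
    }
    fn commit(self) {
        self.queue.jobs.extend(self.pending_jobs);
    }
    fn rollback(self) {
        // pending jobs are dropped with the transaction
    }
}

fn main() {
    let mut q = Queue::default();

    let mut tx = Tx { queue: &mut q, pending_jobs: vec![] };
    tx.dispatch_job("export_users");
    tx.rollback(); // transaction failed: job never enqueued
    assert!(q.jobs.is_empty());

    let mut tx = Tx { queue: &mut q, pending_jobs: vec![] };
    tx.dispatch_job("export_users");
    tx.commit(); // transaction succeeded: job visible to workers
    println!("enqueued: {:?}", q.jobs);
}
```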
Track in the Frontend
Forge generates type-safe tracking functions for every public job. Call the tracker and it dispatches the job, then polls for progress automatically.
**SvelteKit**
```svelte
<script lang="ts">
  import { trackExportUsers } from "$lib/forge";

  let exportJobStore = $state<ReturnType<typeof trackExportUsers> | null>(null);

  function startExportJob() {
    exportJobStore = trackExportUsers({ format: "csv" });
  }
</script>

{#if exportJobStore && $exportJobStore}
  <div class="progress-bar">
    <div class="fill" style="width:{$exportJobStore.progress || 0}%"></div>
  </div>
  <p>{$exportJobStore.progress || 0}% - {$exportJobStore.message || $exportJobStore.status}</p>
  {#if ["completed", "failed"].includes($exportJobStore.status)}
    <button onclick={startExportJob}>Run Again</button>
  {/if}
{:else}
  <button onclick={startExportJob}>Start Export</button>
{/if}
```
`trackExportUsers` returns a Svelte store. Subscribe with `$exportJobStore` to reactively render progress, status, and messages.
**Dioxus**

```rust
use dioxus::prelude::*;
use forge_dioxus::JobStatus;
use crate::forge::{ExportInput, use_export_users};

#[component]
fn ExportRun(on_restart: EventHandler<MouseEvent>) -> Element {
    let job = use_export_users(ExportInput::new("csv"));
    let progress = job.state.progress.unwrap_or(0.0).clamp(0.0, 100.0);
    let message = job.state.message.clone()
        .unwrap_or_else(|| format!("{:?}", job.state.status).to_lowercase());
    let can_restart = matches!(
        job.state.status, JobStatus::Completed | JobStatus::Failed
    );

    rsx! {
        div { class: "progress-bar",
            div { class: "fill", style: "width: {progress}%;" }
        }
        p { "{progress:.0}% - {message}" }
        if can_restart {
            button { onclick: move |e| on_restart.call(e), "Run Again" }
        }
    }
}
```
`use_export_users` is a generated hook. It dispatches the job on mount and re-renders as progress updates arrive.
Part 2: Scheduled Tasks (Crons)
When to Use
Use a cron for recurring work: cleaning up stale data, syncing from external APIs, periodic health checks. Forge manages scheduling, execution, and logging.
Define a Cron
```rust
use forge::prelude::*;

#[forge::cron("* * * * *", timezone = "UTC")]
pub async fn iss_location(ctx: &CronContext) -> Result<()> {
    ctx.log.info("Fetching ISS location", serde_json::json!({"run_id": ctx.run_id.to_string()}));

    let response = ctx.http()
        .get("http://api.open-notify.org/iss-now.json")
        .send()
        .await
        .map_err(|e| ForgeError::Internal(format!("HTTP request failed: {}", e)))?;

    if !response.status().is_success() {
        ctx.log.error("Failed to fetch ISS location",
            serde_json::json!({"status": response.status().as_u16()}));
        return Err(ForgeError::Internal("Failed to fetch ISS location".into()));
    }

    let snapshot: IssApiResponse = response.json().await
        .map_err(|e| ForgeError::Deserialization(format!("Failed to parse: {}", e)))?;

    sqlx::query!(
        "INSERT INTO iss_location (id, latitude, longitude, api_timestamp, created_at) \
         VALUES (gen_random_uuid(), $1, $2, to_timestamp($3), NOW())",
        snapshot.latitude, snapshot.longitude, snapshot.timestamp as f64
    )
    .execute(ctx.db())
    .await?;

    Ok(())
}
```
The cron expression `* * * * *` means every minute. The context provides:

- `ctx.http()` -- a pre-configured HTTP client for external calls.
- `ctx.db()` -- the database pool.
- `ctx.log` -- structured logging with `.info()`, `.warn()`, `.error()`, `.debug()`.
- `ctx.run_id` -- unique ID for this particular run.
Cron Expression Syntax
```text
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, SUN-SAT)
│ │ │ │ │
* * * * *
```
Common examples:
| Expression | Meaning |
|---|---|
| `* * * * *` | Every minute |
| `0 * * * *` | Every hour on the hour |
| `0 9 * * MON-FRI` | Weekdays at 9 AM |
| `0 0 1 * *` | First of each month at midnight |
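To make the semantics concrete, here is a toy matcher for just the subset of syntax used in this table (`*`, plain numbers, and ranges like `MON-FRI`). Real cron implementations also support lists (`1,15`) and step values (`*/5`), which this sketch omits:

```rust
// Map a day-of-week name (SUN..SAT) or numeric string to 0-6.
fn dow_num(s: &str) -> Option<u32> {
    const NAMES: [&str; 7] = ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"];
    NAMES.iter().position(|n| *n == s).map(|i| i as u32).or_else(|| s.parse().ok())
}

// Match one cron field: `*`, a single number, or an inclusive range `A-B`.
fn field_matches(field: &str, value: u32, named: bool) -> bool {
    let parse = |s: &str| if named { dow_num(s) } else { s.parse().ok() };
    if field == "*" {
        true
    } else if let Some((lo, hi)) = field.split_once('-') {
        match (parse(lo), parse(hi)) {
            (Some(lo), Some(hi)) => (lo..=hi).contains(&value),
            _ => false,
        }
    } else {
        parse(field) == Some(value)
    }
}

/// `time` is (minute, hour, day-of-month, month, day-of-week with 0 = SUN).
fn cron_matches(expr: &str, time: (u32, u32, u32, u32, u32)) -> bool {
    let f: Vec<&str> = expr.split_whitespace().collect();
    f.len() == 5
        && field_matches(f[0], time.0, false)
        && field_matches(f[1], time.1, false)
        && field_matches(f[2], time.2, false)
        && field_matches(f[3], time.3, false)
        && field_matches(f[4], time.4, true)
}

fn main() {
    // Monday, June 3rd, 09:00:
    let monday_9am = (0, 9, 3, 6, 1);
    assert!(cron_matches("* * * * *", monday_9am));
    assert!(cron_matches("0 9 * * MON-FRI", monday_9am));
    assert!(!cron_matches("0 0 1 * *", monday_9am));
    println!("all expressions behaved as the table describes");
}
```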
Timezone Support
By default crons run in UTC. Specify a timezone with the `timezone` attribute:
```rust
#[forge::cron("0 9 * * MON-FRI", timezone = "America/New_York")]
pub async fn send_daily_digest(ctx: &CronContext) -> Result<()> {
    // Runs at 9 AM Eastern, Monday through Friday
    Ok(())
}
```
Catch-Up Missed Runs
If the server was down and missed scheduled runs, enable `catch_up` to replay them on restart:
```rust
#[forge::cron("0 * * * *", catch_up, catch_up_limit = 3)]
pub async fn hourly_sync(ctx: &CronContext) -> Result<()> {
    // If the server missed 5 hours, only the last 3 will replay
    Ok(())
}
```
`catch_up_limit` caps how many missed runs are replayed, preventing a thundering herd after a long outage.
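The cap can be sketched as follows (assumed semantics matching the comment above, not necessarily Forge's exact algorithm): enumerate the hourly slots missed since the last run, then keep only the most recent `limit`:

```rust
// Given an hourly schedule, list the missed run slots between the last
// completed run and now, keeping only the most recent `limit`.
fn missed_runs(last_run_hour: u64, now_hour: u64, limit: usize) -> Vec<u64> {
    let missed: Vec<u64> = (last_run_hour + 1..=now_hour).collect();
    let skip = missed.len().saturating_sub(limit);
    missed[skip..].to_vec()
}

fn main() {
    // Server was down for 5 hourly runs; catch_up_limit = 3 replays the last 3.
    println!("{:?}", missed_runs(100, 105, 3)); // [103, 104, 105]
}
```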
Part 3: Durable Workflows
When to Use
Use a workflow when you have a multi-step process that must survive server restarts: user onboarding, payment processing, verification flows. Each step is tracked individually, so after a restart the workflow resumes from the last completed step rather than starting over.
Define a Workflow
```rust
use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationInput {
    pub account_id: String,
    pub email: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationOutput {
    pub verified: bool,
    pub token: String,
}

#[forge::workflow(version = 1, timeout = "24h", public)]
pub async fn account_verification(
    ctx: &WorkflowContext,
    input: VerificationInput,
) -> Result<VerificationOutput> {
    if ctx.is_resumed() {
        tracing::info!(workflow_id = %ctx.run_id, "Resuming verification workflow");
    }
    // Steps follow...
}
```
| Attribute | Purpose |
|---|---|
| `version = 1` | Schema version -- increment when step logic changes to avoid conflicts with in-flight workflows |
| `timeout = "24h"` | Maximum wall-clock time for the entire workflow |
| `public` | Expose to frontend bindings |
Steps with Tracking
Each step is guarded by `is_step_completed` so it can be skipped on resume. Results are persisted with `record_step_complete` so downstream steps can retrieve them:
```rust
// Step 1: Generate token
let token = if ctx.is_step_completed("generate_token") {
    ctx.get_step_result::<String>("generate_token")
        .unwrap_or_else(|| format!("verify_{}", Uuid::new_v4()))
} else {
    ctx.record_step_start("generate_token");
    let token = format!("verify_{}", Uuid::new_v4());
    ctx.record_step_complete("generate_token", serde_json::json!(token));
    token
};

// Step 2: Store token
if !ctx.is_step_completed("store_token") {
    ctx.record_step_start("store_token");
    // ... persist to database ...
    ctx.record_step_complete("store_token", serde_json::json!({"stored": true}));
}

// Step 3: Send email
if !ctx.is_step_completed("send_email") {
    ctx.record_step_start("send_email");
    // ... send verification email ...
    ctx.record_step_complete("send_email", serde_json::json!({"sent": true}));
}
```
The pattern for every step is:

- Check -- `ctx.is_step_completed("name")` returns `true` if this step already ran.
- Record start -- `ctx.record_step_start("name")` marks the step as in-progress.
- Do work -- your actual business logic.
- Record complete -- `ctx.record_step_complete("name", result)` persists the output.
- Retrieve on resume -- `ctx.get_step_result::<T>("name")` deserializes the saved result.
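The calls above can be modeled with a simple checkpoint store. This sketch uses an in-memory map where the real runtime uses durable storage, so it only illustrates the skip-on-resume behavior, not Forge's actual implementation:

```rust
use std::collections::HashMap;

// Toy checkpoint store: completed step names mapped to their saved results.
#[derive(Default)]
struct StepStore {
    completed: HashMap<String, String>,
}

impl StepStore {
    fn is_step_completed(&self, name: &str) -> bool {
        self.completed.contains_key(name)
    }
    fn record_step_complete(&mut self, name: &str, result: &str) {
        self.completed.insert(name.to_string(), result.to_string());
    }
    fn get_step_result(&self, name: &str) -> Option<&String> {
        self.completed.get(name)
    }
}

// One guarded step, exactly like the snippet above: the body runs at
// most once, and resumes reuse the persisted result.
fn run_workflow(store: &mut StepStore, executions: &mut u32) -> String {
    if !store.is_step_completed("generate_token") {
        *executions += 1; // the side effect happens only on the first run
        store.record_step_complete("generate_token", "verify_abc123");
    }
    store.get_step_result("generate_token").unwrap().clone()
}

fn main() {
    let mut store = StepStore::default();
    let mut executions = 0;
    let first = run_workflow(&mut store, &mut executions);
    let second = run_workflow(&mut store, &mut executions); // simulated resume
    assert_eq!(first, second);
    assert_eq!(executions, 1); // step body did not re-run
    println!("resumed without re-executing: {first}");
}
```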
Durable Sleep
Workflows can sleep for hours or days. Unlike `tokio::time::sleep`, a durable sleep survives server restarts -- the runtime persists the wake time and resumes the workflow when it elapses:
```rust
// Step 4: Wait for user to click the verification link
if !ctx.is_step_completed("wait_period") {
    ctx.record_step_start("wait_period");
    ctx.sleep(Duration::from_secs(86400)).await?; // 24 hours
    ctx.record_step_complete_async("wait_period", serde_json::json!({"waited": true})).await;
}
```
After the sleep, even if the server restarted multiple times, the workflow picks up at the next step rather than re-running steps 1 through 4.
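One way to implement such a sleep (assumed mechanics, not Forge's internals) is to persist an absolute wake time when the sleep starts, then recompute the remaining wait after every restart instead of restarting the full duration:

```rust
// Durable-sleep arithmetic: the wake time is persisted once, and each
// restart only waits for whatever time remains.
fn remaining_sleep(wake_at_secs: u64, now_secs: u64) -> u64 {
    wake_at_secs.saturating_sub(now_secs)
}

fn main() {
    let started = 1_000_000;
    let wake_at = started + 86_400; // a 24-hour durable sleep

    // Server restarts 20 hours in: only 4 hours remain.
    println!("{}s left", remaining_sleep(wake_at, started + 72_000));

    // If the wake time has already passed, the workflow resumes immediately.
    println!("{}s left", remaining_sleep(wake_at, started + 90_000));
}
```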
Track in the Frontend
Like jobs, public workflows get generated frontend bindings that track step-level progress.
**SvelteKit**
```svelte
<script lang="ts">
  import { trackAccountVerification } from "$lib/forge";

  let workflowStore = $state<ReturnType<typeof trackAccountVerification> | null>(null);

  function startVerificationWorkflow() {
    workflowStore = trackAccountVerification({
      account_id: "user-123",
      email: "user@example.com",
    });
  }
</script>

{#if workflowStore && $workflowStore}
  <div class="steps">
    {#each $workflowStore.steps as step (step.name)}
      <div class="step {step.status}">
        <span>{step.name}</span>
      </div>
    {/each}
  </div>
  {#if ["completed", "failed", "compensated"].includes($workflowStore.status)}
    <button onclick={startVerificationWorkflow}>Run Again</button>
  {/if}
{:else}
  <button onclick={startVerificationWorkflow}>Start Workflow</button>
{/if}
```
Each step in `$workflowStore.steps` has a `name` and `status` (`"pending"`, `"running"`, `"completed"`, `"failed"`), giving you fine-grained UI control.
**Dioxus**

```rust
use dioxus::prelude::*;
use forge_dioxus::WorkflowStatus;
use crate::forge::{VerificationInput, use_account_verification};

#[component]
fn VerificationRun(
    account_id: String,
    email: String,
    on_restart: EventHandler<MouseEvent>,
) -> Element {
    let wf = use_account_verification(VerificationInput::new(account_id, email));
    let can_restart = matches!(
        wf.state.status,
        WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Compensated
    );

    rsx! {
        div { class: "steps",
            for step in wf.state.steps.iter() {
                div { key: "{step.name}", class: "step {step.status}",
                    span { "{step.name}" }
                }
            }
        }
        if can_restart {
            button { onclick: move |e| on_restart.call(e), "Run Again" }
        }
    }
}
```
`use_account_verification` dispatches the workflow on mount and streams step updates back to the component.
Choosing the Right Tool
| Feature | Job | Cron | Workflow |
|---|---|---|---|
| Triggered by | Code dispatch | Schedule | Code dispatch |
| Retry | Automatic (configurable) | Next scheduled run | Per-step resume |
| Survives restart | Yes | Yes | Yes (step-level) |
| Progress tracking | Yes (0-100%) | No | Yes (per step) |
| Durable sleep | No | N/A | Yes |
| Frontend bindings | `trackXxx` / `use_xxx` | Query latest result | `trackXxx` / `use_xxx` |
| Best for | One-off tasks | Recurring tasks | Multi-step processes |
Rules of thumb:
- If it runs once and you need to track progress, use a job.
- If it runs on a schedule, use a cron.
- If it has multiple steps that must all complete (or compensate), use a workflow.