Track Progress
Show job and workflow progress in your UI. Forge generates reactive stores that subscribe to status updates via SSE.
The Code
import { trackExportUsers$ } from "$lib/forge/api";
const job = trackExportUsers$({ userId: "abc-123" });
// Reactive state
$: if ($job.status === "completed") {
downloadFile($job.output);
}
The store starts the job, subscribes to status updates, and unsubscribes when the component unmounts.
What Happens
Calling trackExportUsers$() does three things:
- Starts the job by calling the backend function
- Registers for SSE updates with the gateway
- Returns a reactive store that updates as the job progresses
The subscription uses the same SSE connection as query subscriptions. When your component unmounts (Svelte's $: reactive block cleanup), the store automatically unsubscribes.
Job State
interface JobState<TOutput> {
jobId: string;
status: JobStatus;
progress: number | null; // 0-100
message: string | null; // "Processing row 500..."
output: TOutput | null; // Result when completed
error: string | null;
}
type JobStatus =
| "pending"
| "claimed"
| "running"
| "completed"
| "retry"
| "failed"
| "dead_letter"
| "cancel_requested"
| "cancelled"
| "not_found";
| Field | Description |
|---|---|
jobId | Unique job identifier |
status | Current job status |
progress | Percentage complete (0-100), reported by ctx.progress() |
message | Status message from ctx.progress(percent, message) |
output | Job return value when status is completed |
error | Error message when status is failed, dead_letter, or cancelled |
Workflow State
Workflows expose additional fields for step-by-step tracking:
interface WorkflowState<TOutput> {
workflowId: string;
status: WorkflowStatus;
step: string | null; // Current step name
waitingFor: string | null; // Event being awaited
steps: WorkflowStepState[]; // All steps with status
output: TOutput | null;
error: string | null;
}
interface WorkflowStepState {
name: string;
status: "pending" | "running" | "completed" | "failed" | "compensated" | "skipped";
error: string | null;
}
type WorkflowStatus =
| "created"
| "running"
| "waiting"
| "completed"
| "compensating"
| "compensated"
| "failed"
| "not_found";
Patterns
Progress Bar
<script>
import { trackExportUsers$ } from "$lib/forge/api";
const job = trackExportUsers$({ format: "csv" });
</script>
{#if $job.status === "running"}
<div class="progress-bar">
<div class="fill" style="width: {$job.progress ?? 0}%"></div>
<span>{$job.message ?? "Processing..."}</span>
</div>
{:else if $job.status === "completed"}
<a href={$job.output.downloadUrl}>Download</a>
{:else if $job.status === "failed"}
<p class="error">{$job.error}</p>
{/if}
Workflow Stepper
<script>
import { trackAccountVerification$ } from "$lib/forge/api";
const workflow = trackAccountVerification$({ userId });
</script>
<ol class="steps">
{#each $workflow.steps as step}
<li class:current={step.name === $workflow.step} class:done={step.status === "completed"}>
{step.name}
{#if step.status === "failed"}
<span class="error">{step.error}</span>
{/if}
</li>
{/each}
</ol>
Handle All States
<script>
import { trackSendReport$ } from "$lib/forge/api";
let job = null;
function startExport() {
job = trackSendReport$({ reportId: "q4-2024" });
}
</script>
<button onclick={startExport} disabled={job && $job.status === "running"}>
Generate Report
</button>
{#if job}
{#if $job.status === "pending" || $job.status === "claimed"}
<p>Waiting to start...</p>
{:else if $job.status === "running"}
<p>Processing: {$job.progress ?? 0}%</p>
{:else if $job.status === "retry"}
<p>Retrying after temporary failure...</p>
{:else if $job.status === "completed"}
<p>Done! Result: {JSON.stringify($job.output)}</p>
{:else if $job.status === "failed"}
<p>Failed: {$job.error}</p>
{:else if $job.status === "dead_letter"}
<p>Permanently failed after max retries</p>
{:else if $job.status === "cancel_requested"}
<p>Cancellation requested...</p>
{:else if $job.status === "cancelled"}
<p>Job cancelled</p>
{/if}
{/if}
Manual Unsubscribe
For stores created outside component lifecycle:
const job = trackExportUsers$({ userId });
// Later, clean up manually
job.unsubscribe();
Store Interface
JobStore
interface JobStore<TOutput> {
subscribe: (run: (value: JobState<TOutput> & { loading: boolean }) => void) => () => void;
unsubscribe: () => void;
}
WorkflowStore
interface WorkflowStore<TOutput> {
subscribe: (run: (value: WorkflowState<TOutput> & { loading: boolean }) => void) => () => void;
unsubscribe: () => void;
}
Both stores include a loading: boolean field that is true until the initial job/workflow is created.
Generated Functions
For each job:
#[forge::job]
pub async fn export_users(ctx: &JobContext, format: String) -> Result<ExportResult> {
// ...
}
Forge generates:
export const trackExportUsers$ = (args: { format: string }) =>
createJobStore<{ format: string }, ExportResult>("export_users", args);
For workflows, the pattern is identical:
#[forge::workflow]
pub async fn account_verification(ctx: &WorkflowContext, user_id: Uuid) -> Result<()> {
// ...
}
Generates:
export const trackAccountVerification$ = (args: { userId: string }) =>
createWorkflowStore<{ userId: string }, void>("account_verification", args);
Under the Hood
SSE transport: Job updates arrive over the same SSE connection used for query subscriptions. The client registers for job-specific updates via /subscribe-job:
await client._registerJob(clientSubId, jobId);
Connection lifecycle: If the SSE connection drops, the client reconnects with exponential backoff. Job subscriptions are re-registered automatically on reconnection.
Automatic cleanup: Svelte's store contract means the unsubscribe callback runs when the last subscriber unsubscribes. For $job syntax, this happens when the component is destroyed.
Server-side updates: Progress updates are pushed via PostgreSQL LISTEN/NOTIFY. The gateway receives notifications and broadcasts to relevant SSE connections:
NOTIFY forge_job_updates, '{"job_id": "...", "status": "running", "progress_percent": 50}'