Skip to main content

Track Progress

Show job and workflow progress in your UI. Forge generates reactive stores that subscribe to status updates via SSE.

The Code

import { trackExportUsers$ } from "$lib/forge/api";

const job = trackExportUsers$({ userId: "abc-123" });

// Reactive state
$: if ($job.status === "completed") {
downloadFile($job.output);
}

The store starts the job, subscribes to status updates, and unsubscribes when the component unmounts.

What Happens

Calling trackExportUsers$() does three things:

  1. Starts the job by calling the backend function
  2. Registers for SSE updates with the gateway
  3. Returns a reactive store that updates as the job progresses

The subscription uses the same SSE connection as query subscriptions. When your component unmounts (Svelte's $: reactive block cleanup), the store automatically unsubscribes.

Job State

interface JobState<TOutput> {
jobId: string;
status: JobStatus;
progress: number | null; // 0-100
message: string | null; // "Processing row 500..."
output: TOutput | null; // Result when completed
error: string | null;
}

type JobStatus =
| "pending"
| "claimed"
| "running"
| "completed"
| "retry"
| "failed"
| "dead_letter"
| "cancel_requested"
| "cancelled"
| "not_found";
FieldDescription
jobIdUnique job identifier
statusCurrent job status
progressPercentage complete (0-100), reported by ctx.progress()
messageStatus message from ctx.progress(percent, message)
outputJob return value when status is completed
errorError message when status is failed, dead_letter, or cancelled

Workflow State

Workflows expose additional fields for step-by-step tracking:

interface WorkflowState<TOutput> {
workflowId: string;
status: WorkflowStatus;
step: string | null; // Current step name
waitingFor: string | null; // Event being awaited
steps: WorkflowStepState[]; // All steps with status
output: TOutput | null;
error: string | null;
}

interface WorkflowStepState {
name: string;
status: "pending" | "running" | "completed" | "failed" | "compensated" | "skipped";
error: string | null;
}

type WorkflowStatus =
| "created"
| "running"
| "waiting"
| "completed"
| "compensating"
| "compensated"
| "failed"
| "not_found";

Patterns

Progress Bar

<script>
import { trackExportUsers$ } from "$lib/forge/api";

const job = trackExportUsers$({ format: "csv" });
</script>

{#if $job.status === "running"}
<div class="progress-bar">
<div class="fill" style="width: {$job.progress ?? 0}%"></div>
<span>{$job.message ?? "Processing..."}</span>
</div>
{:else if $job.status === "completed"}
<a href={$job.output.downloadUrl}>Download</a>
{:else if $job.status === "failed"}
<p class="error">{$job.error}</p>
{/if}

Workflow Stepper

<script>
import { trackAccountVerification$ } from "$lib/forge/api";

const workflow = trackAccountVerification$({ userId });
</script>

<ol class="steps">
{#each $workflow.steps as step}
<li class:current={step.name === $workflow.step} class:done={step.status === "completed"}>
{step.name}
{#if step.status === "failed"}
<span class="error">{step.error}</span>
{/if}
</li>
{/each}
</ol>

Handle All States

<script>
import { trackSendReport$ } from "$lib/forge/api";

let job = null;

function startExport() {
job = trackSendReport$({ reportId: "q4-2024" });
}
</script>

<button onclick={startExport} disabled={job && $job.status === "running"}>
Generate Report
</button>

{#if job}
{#if $job.status === "pending" || $job.status === "claimed"}
<p>Waiting to start...</p>
{:else if $job.status === "running"}
<p>Processing: {$job.progress ?? 0}%</p>
{:else if $job.status === "retry"}
<p>Retrying after temporary failure...</p>
{:else if $job.status === "completed"}
<p>Done! Result: {JSON.stringify($job.output)}</p>
{:else if $job.status === "failed"}
<p>Failed: {$job.error}</p>
{:else if $job.status === "dead_letter"}
<p>Permanently failed after max retries</p>
{:else if $job.status === "cancel_requested"}
<p>Cancellation requested...</p>
{:else if $job.status === "cancelled"}
<p>Job cancelled</p>
{/if}
{/if}

Manual Unsubscribe

For stores created outside component lifecycle:

const job = trackExportUsers$({ userId });

// Later, clean up manually
job.unsubscribe();

Store Interface

JobStore

interface JobStore<TOutput> {
subscribe: (run: (value: JobState<TOutput> & { loading: boolean }) => void) => () => void;
unsubscribe: () => void;
}

WorkflowStore

interface WorkflowStore<TOutput> {
subscribe: (run: (value: WorkflowState<TOutput> & { loading: boolean }) => void) => () => void;
unsubscribe: () => void;
}

Both stores include a loading: boolean field that is true until the initial job/workflow is created.

Generated Functions

For each job:

#[forge::job]
pub async fn export_users(ctx: &JobContext, format: String) -> Result<ExportResult> {
// ...
}

Forge generates:

export const trackExportUsers$ = (args: { format: string }) =>
createJobStore<{ format: string }, ExportResult>("export_users", args);

For workflows, the pattern is identical:

#[forge::workflow]
pub async fn account_verification(ctx: &WorkflowContext, user_id: Uuid) -> Result<()> {
// ...
}

Generates:

export const trackAccountVerification$ = (args: { userId: string }) =>
createWorkflowStore<{ userId: string }, void>("account_verification", args);

Under the Hood

SSE transport: Job updates arrive over the same SSE connection used for query subscriptions. The client registers for job-specific updates via /subscribe-job:

await client._registerJob(clientSubId, jobId);

Connection lifecycle: If the SSE connection drops, the client reconnects with exponential backoff. Job subscriptions are re-registered automatically on reconnection.

Automatic cleanup: Svelte's store contract means the unsubscribe callback runs when the last subscriber unsubscribes. For $job syntax, this happens when the component is destroyed.

Server-side updates: Progress updates are pushed via PostgreSQL LISTEN/NOTIFY. The gateway receives notifications and broadcasts to relevant SSE connections:

NOTIFY forge_job_updates, '{"job_id": "...", "status": "running", "progress_percent": 50}'