Track Progress

Track job and workflow progress in the UI with reactive stores that subscribe to status updates via SSE.

The Code

import { trackExportUsers } from "$lib/forge/api";

const job = trackExportUsers({ format: "csv" });

// React when the job completes (downloadFile is your own helper)
$: if ($job.status === "completed") {
  downloadFile($job.output);
}

The store starts the job, subscribes to status updates, and exposes unsubscribe() so you can stop local updates when the component unmounts.

What Happens

Calling trackExportUsers() does three things:

  1. Starts the job by calling the backend function
  2. Registers for SSE updates with the gateway
  3. Returns a reactive store that updates as the job progresses

The subscription uses the same SSE connection as query subscriptions. When your component unmounts, Svelte detaches the store subscription for that component automatically.
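
The three steps above can be sketched as a small store that follows the Svelte store contract. This is an illustration only, not Forge's implementation: createTrackedStore, startJob, and registerUpdates are hypothetical names, and the SSE registration is replaced by a callback-based stand-in so the example runs on its own.

```typescript
// Illustrative sketch only: every name here is hypothetical, not Forge's API.
type Status = "pending" | "running" | "completed" | "failed";

interface TrackedState {
  jobId: string | null;
  status: Status;
  progress: number | null;
  loading: boolean;
}

type Listener = (state: TrackedState) => void;
type Update = Partial<Omit<TrackedState, "loading">>;

function createTrackedStore(
  startJob: () => Promise<string>,
  registerUpdates: (jobId: string, onUpdate: (u: Update) => void) => () => void,
) {
  let state: TrackedState = { jobId: null, status: "pending", progress: null, loading: true };
  const listeners = new Set<Listener>();
  let stopUpdates: (() => void) | null = null;
  let stopped = false;

  const set = (next: TrackedState): void => {
    state = next;
    listeners.forEach((fn) => fn(state));
  };

  // Step 1: start the job. Step 2: register for updates once the ID is known.
  const ready = startJob().then((jobId) => {
    if (stopped) return;
    set({ ...state, jobId, loading: false });
    stopUpdates = registerUpdates(jobId, (u) => {
      if (!stopped) set({ ...state, ...u });
    });
  });

  // Step 3: return an object that satisfies the Svelte store contract.
  return {
    subscribe(run: Listener): () => void {
      listeners.add(run);
      run(state); // the contract requires an immediate call with the current value
      return () => {
        listeners.delete(run);
      };
    },
    unsubscribe(): void {
      stopped = true;
      stopUpdates?.();
      listeners.clear();
    },
    ready, // exposed only so the example can wait for startup
  };
}
```

Because subscribe() calls its callback immediately, a component sees a loading state first and the real job state once the start call resolves.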

Job State

interface JobState<TOutput> {
  jobId: string;
  status: JobStatus;
  progress: number | null; // 0-100
  message: string | null; // "Processing row 500..."
  output: TOutput | null; // Result when completed
  error: string | null;
}

type JobStatus =
  | "pending"
  | "claimed"
  | "running"
  | "completed"
  | "retry"
  | "failed"
  | "dead_letter"
  | "cancel_requested"
  | "cancelled"
  | "not_found";

| Field | Description |
| --- | --- |
| jobId | Unique job identifier |
| status | Current job status |
| progress | Percentage complete (0-100), reported by ctx.progress() |
| message | Status message from ctx.progress(percent, message) |
| output | Job return value when status is completed |
| error | Error message when status is failed, dead_letter, or cancelled |

Job Statuses

| Status | Description |
| --- | --- |
| pending | Job is queued and waiting to be picked up by a worker |
| claimed | A worker has claimed the job but hasn't started executing yet |
| running | Job handler is executing |
| completed | Job finished successfully; output contains the return value |
| retry | Job failed with a retryable error and will be re-queued |
| failed | Job failed and has no retries left |
| dead_letter | Permanently failed after exhausting all retries |
| cancel_requested | Cancellation requested while the job is running; the handler should call ctx.check_cancelled() and stop gracefully |
| cancelled | Job was successfully cancelled |
| not_found | No job exists with the given ID (client-only status) |
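
UI code often only needs to know whether a job can still change. A minimal sketch of that grouping follows; isTerminal and TERMINAL_STATUSES are hypothetical names, and the grouping itself is an assumption read off the descriptions above (in particular, the page does not say whether failed can still move to dead_letter).

```typescript
type JobStatus =
  | "pending" | "claimed" | "running" | "completed" | "retry"
  | "failed" | "dead_letter" | "cancel_requested" | "cancelled" | "not_found";

// Assumption: a job in one of these statuses will not change again.
const TERMINAL_STATUSES: ReadonlySet<JobStatus> = new Set<JobStatus>([
  "completed",
  "failed",
  "dead_letter",
  "cancelled",
  "not_found",
]);

// Hypothetical helper: true when no further updates are expected.
function isTerminal(status: JobStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}
```

A component could use this to decide when to hide a spinner or re-enable a button, instead of listing every status inline.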

Workflow State

Workflows expose additional fields for step-by-step tracking:

interface WorkflowState<TOutput> {
  workflowId: string;
  status: WorkflowStatus;
  step: string | null; // Current step name
  waitingFor: string | null; // Reserved for future event details; currently null
  steps: WorkflowStepState[]; // All steps with status
  output: TOutput | null;
  error: string | null;
}

interface WorkflowStepState {
  name: string;
  status:
    | "pending"
    | "running"
    | "completed"
    | "failed"
    | "compensated"
    | "skipped";
  error: string | null;
}

type WorkflowStatus =
  | "created"
  | "running"
  | "waiting"
  | "completed"
  | "compensating"
  | "compensated"
  | "failed"
  | "not_found";
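
Workflows have no progress field, but the steps array is enough to derive one. The sketch below uses a hypothetical helper, summarizeSteps, and assumes that skipped steps count as finished; adjust that rule to your own workflow.

```typescript
interface WorkflowStepState {
  name: string;
  status: "pending" | "running" | "completed" | "failed" | "compensated" | "skipped";
  error: string | null;
}

interface StepSummary {
  done: number;
  total: number;
  percent: number;
}

// Hypothetical helper: counts finished steps and converts the ratio to a percentage.
// Assumption: "skipped" steps count as finished.
function summarizeSteps(steps: WorkflowStepState[]): StepSummary {
  const total = steps.length;
  const done = steps.filter(
    (s) => s.status === "completed" || s.status === "skipped",
  ).length;
  const percent = total === 0 ? 0 : Math.round((done / total) * 100);
  return { done, total, percent };
}
```

In a component this could be called as summarizeSteps($workflow.steps) to drive a progress bar next to the stepper.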

Patterns

Progress Bar

<script>
  import { trackExportUsers } from "$lib/forge/api";

  const job = trackExportUsers({ format: "csv" });
</script>

{#if $job.status === "running"}
  <div class="progress-bar">
    <div class="fill" style="width: {$job.progress ?? 0}%"></div>
    <span>{$job.message ?? "Processing..."}</span>
  </div>
{:else if $job.status === "completed"}
  <a href={$job.output.downloadUrl}>Download</a>
{:else if $job.status === "failed"}
  <p class="error">{$job.error}</p>
{/if}

Workflow Stepper

<script>
  import { trackAccountVerification } from "$lib/forge/api";

  // Assumed here to be passed in by the parent component
  export let userId;

  const workflow = trackAccountVerification({ userId });
</script>

<ol class="steps">
  {#each $workflow.steps as step}
    <li class:current={step.name === $workflow.step} class:done={step.status === "completed"}>
      {step.name}
      {#if step.status === "failed"}
        <span class="error">{step.error}</span>
      {/if}
    </li>
  {/each}
</ol>

Handle All States

<script>
  import { trackSendReport } from "$lib/forge/api";

  let job = null;

  function startReport() {
    job = trackSendReport({ reportId: "q4-2024" });
  }
</script>

<button onclick={startReport} disabled={job && $job.status === "running"}>
  Generate Report
</button>

{#if job}
  {#if $job.status === "pending" || $job.status === "claimed"}
    <p>Waiting to start...</p>
  {:else if $job.status === "running"}
    <p>Processing: {$job.progress ?? 0}%</p>
  {:else if $job.status === "retry"}
    <p>Retrying after temporary failure...</p>
  {:else if $job.status === "completed"}
    <p>Done! Result: {JSON.stringify($job.output)}</p>
  {:else if $job.status === "failed"}
    <p>Failed: {$job.error}</p>
  {:else if $job.status === "dead_letter"}
    <p>Permanently failed after max retries</p>
  {:else if $job.status === "cancel_requested"}
    <p>Cancellation requested...</p>
  {:else if $job.status === "cancelled"}
    <p>Job cancelled</p>
  {:else if $job.status === "not_found"}
    <p>Job not found</p>
  {/if}
{/if}

Manual Unsubscribe

For stores created outside a component's lifecycle, call unsubscribe() yourself when you no longer need updates:

const job = trackExportUsers({ format: "csv" });

// Later, clean up manually
job.unsubscribe();

Store Interface

JobStore

interface JobStore<TOutput> {
  subscribe: (
    run: (value: JobState<TOutput> & { loading: boolean }) => void,
  ) => () => void;
  unsubscribe: () => void;
}

WorkflowStore

interface WorkflowStore<TOutput> {
  subscribe: (
    run: (value: WorkflowState<TOutput> & { loading: boolean }) => void,
  ) => () => void;
  unsubscribe: () => void;
}

The value emitted by both stores includes a loading: boolean field, which is true until the initial job or workflow has been created.
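
Outside a component there is no $job shorthand, so the store has to be read through subscribe(). The sketch below shows one way to wait for a final state using only the JobStore interface above. onSettled and SETTLED are hypothetical names, and the list of final statuses is an assumption. Note that a store may call the callback synchronously during subscribe(), before the returned stop function is available.

```typescript
// Types repeated from this page so the sketch is self-contained.
type JobStatus =
  | "pending" | "claimed" | "running" | "completed" | "retry"
  | "failed" | "dead_letter" | "cancel_requested" | "cancelled" | "not_found";

interface JobState<TOutput> {
  jobId: string;
  status: JobStatus;
  progress: number | null;
  message: string | null;
  output: TOutput | null;
  error: string | null;
}

interface JobStore<TOutput> {
  subscribe: (
    run: (value: JobState<TOutput> & { loading: boolean }) => void,
  ) => () => void;
  unsubscribe: () => void;
}

// Assumption: no further updates arrive once one of these statuses is reached.
const SETTLED: readonly JobStatus[] = [
  "completed", "failed", "dead_letter", "cancelled", "not_found",
];

// Hypothetical helper: calls `done` once with the first settled state,
// then detaches its own subscription. Returns a function to stop early.
function onSettled<TOutput>(
  store: JobStore<TOutput>,
  done: (state: JobState<TOutput>) => void,
): () => void {
  let settled = false;
  let stop: (() => void) | undefined;
  stop = store.subscribe((value) => {
    if (settled || value.loading || !SETTLED.includes(value.status)) return;
    settled = true;
    done(value);
    stop?.(); // still undefined if subscribe() invoked us synchronously
  });
  if (settled) stop(); // covers the synchronous case
  return stop;
}
```

This only detaches the helper's own subscription; calling the store's unsubscribe() remains a separate decision for the code that created the store.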

Generated Functions

For each job:

#[forge::job]
pub async fn export_users(ctx: &JobContext, format: String) -> Result<ExportResult> {
    // ...
}

Forge generates:

export const trackExportUsers = (args: { format: string }) =>
  createJobStore<{ format: string }, ExportResult>("export_users", args);

For workflows, the pattern is identical:

#[forge::workflow]
pub async fn account_verification(ctx: &WorkflowContext, user_id: Uuid) -> Result<()> {
    // ...
}

Generates:

export const trackAccountVerification = (args: { userId: string }) =>
  createWorkflowStore<{ userId: string }, void>("account_verification", args);
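
The naming pattern in the two examples above (export_users becomes trackExportUsers, user_id becomes userId) can be sketched as follows. The helpers snakeToCamel and trackFunctionName are hypothetical and inferred from those examples only; Forge's generator may treat edge cases such as acronyms or leading underscores differently.

```typescript
// Hypothetical helper: converts a snake_case identifier to camelCase.
function snakeToCamel(name: string): string {
  return name.replace(/_([a-z0-9])/g, (_match, ch: string) => ch.toUpperCase());
}

// Hypothetical helper: derives the name of the generated tracking function.
function trackFunctionName(rustName: string): string {
  const camel = snakeToCamel(rustName);
  return "track" + camel.charAt(0).toUpperCase() + camel.slice(1);
}

console.log(trackFunctionName("export_users")); // prints "trackExportUsers"
console.log(snakeToCamel("user_id")); // prints "userId"
```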

RPC Endpoints

Individual function calls

Individual functions can also be called via POST /_api/rpc/{function_name} with the arguments as the JSON body. This is useful for server-to-server calls and for testing with curl:

curl -X POST https://example.com/_api/rpc/export_users \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{ "format": "csv" }'

Authentication is enforced identically to the standard /rpc endpoint.

Batch calls

POST to /_api/rpc/batch with an array of call objects to reduce round-trips:

[
  { "fn": "get_user", "args": { "id": "abc-123" } },
  { "fn": "get_team", "args": { "team_id": "t-456" } }
]

The server executes queries in parallel and mutations serially. The generated client does not yet expose a typed batch() helper.
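
Until a typed helper exists, a thin wrapper can build the batch request by hand. The sketch below is an assumption-laden example rather than part of the generated client: buildBatchRequest, callBatch, and FetchLike are hypothetical names, the bearer header mirrors the curl example above, and the response is returned as unknown because its shape is not documented on this page.

```typescript
interface BatchCall {
  fn: string;
  args: Record<string, unknown>;
}

interface BatchRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Hypothetical helper: builds the URL and request options for a batch call.
function buildBatchRequest(baseUrl: string, token: string, calls: BatchCall[]): BatchRequest {
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/_api/rpc/batch`, // tolerate trailing slashes
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${token}`,
      },
      body: JSON.stringify(calls),
    },
  };
}

// Minimal fetch shape so a fake can be injected in tests.
type FetchLike = (
  url: string,
  init: BatchRequest["init"],
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

// Hypothetical helper: sends the batch and returns the parsed body untyped.
async function callBatch(
  fetchImpl: FetchLike,
  baseUrl: string,
  token: string,
  calls: BatchCall[],
): Promise<unknown> {
  const { url, init } = buildBatchRequest(baseUrl, token, calls);
  const response = await fetchImpl(url, init);
  if (!response.ok) {
    throw new Error(`Batch request failed with HTTP ${response.status}`);
  }
  return response.json();
}
```

Passing the fetch implementation as a parameter keeps the helper testable; in a browser or a recent Node.js runtime the global fetch should fit this shape.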

Under the Hood

SSE transport: Job updates arrive over the same SSE connection used for query subscriptions. The client registers for job-specific updates via /_api/subscribe-job:

await client._registerJob(clientSubId, jobId);

Connection lifecycle: If the SSE connection drops, the client reconnects with exponential backoff. Query subscriptions are re-registered automatically; recreate job and workflow stores after reconnect if you need fresh tracking on the new session.
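
Exponential backoff means the wait doubles after each failed attempt, up to a ceiling. The following sketch shows the arithmetic only; backoffDelayMs is a hypothetical function, and the 1-second base and 30-second cap are placeholder values, not the Forge client's actual settings.

```typescript
// Hypothetical defaults: the client's real base delay and cap are not documented here.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  const safeAttempt = Math.max(0, Math.floor(attempt));
  // Double the delay on every attempt, but never exceed the cap.
  return Math.min(capMs, baseMs * 2 ** safeAttempt);
}

const delays = [0, 1, 2, 3, 4, 5].map((attempt) => backoffDelayMs(attempt));
console.log(delays); // prints [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

Real clients often add random jitter to each delay so that many browsers do not reconnect at the same instant.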

Automatic cleanup: Svelte's store contract means the unsubscribe callback runs when the last subscriber unsubscribes. For job and workflow stores, unsubscribe() stops local updates for that store instance.

Server-side updates: Progress updates are pushed via PostgreSQL LISTEN/NOTIFY. The gateway receives notifications and broadcasts to relevant SSE connections:

NOTIFY forge_job_updates, '{"job_id": "...", "status": "running", "progress": 50}'
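
A payload like the one above has to be parsed and validated before it is forwarded to clients. The sketch below covers only the three fields visible in the sample (job_id, status, progress); parseJobNotification and JobNotification are hypothetical names, and the real gateway may carry additional fields.

```typescript
interface JobNotification {
  jobId: string;
  status: string;
  progress: number | null;
}

// Hypothetical helper: returns null for malformed payloads instead of throwing.
function parseJobNotification(payload: string): JobNotification | null {
  let raw: unknown;
  try {
    raw = JSON.parse(payload);
  } catch {
    return null; // not valid JSON
  }
  if (typeof raw !== "object" || raw === null) return null;

  const record = raw as Record<string, unknown>;
  const jobId = record["job_id"];
  const status = record["status"];
  const progress = record["progress"];

  // job_id and status are treated as required; progress is optional.
  if (typeof jobId !== "string" || typeof status !== "string") return null;
  return { jobId, status, progress: typeof progress === "number" ? progress : null };
}
```

The snake_case job_id is mapped to jobId here to match the JobState field naming used on the client.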