Track Progress
Track job and workflow progress in the UI with reactive stores that subscribe to status updates via SSE.
The Code
import { trackExportUsers } from "$lib/forge/api";
const job = trackExportUsers({ userId: "abc-123" });
// Reactive state
$: if ($job.status === "completed") {
downloadFile($job.output);
}
The store starts the job, subscribes to status updates, and exposes unsubscribe() so you can stop local updates when the component unmounts.
What Happens
Calling trackExportUsers() does three things:
- Starts the job by calling the backend function
- Registers for SSE updates with the gateway
- Returns a reactive store that updates as the job progresses
The subscription uses the same SSE connection as query subscriptions. When your component unmounts, Svelte detaches the store subscription for that component automatically.
Job State
interface JobState<TOutput> {
jobId: string;
status: JobStatus;
progress: number | null; // 0-100
message: string | null; // "Processing row 500..."
output: TOutput | null; // Result when completed
error: string | null;
}
type JobStatus =
| "pending"
| "claimed"
| "running"
| "completed"
| "retry"
| "failed"
| "dead_letter"
| "cancel_requested"
| "cancelled"
| "not_found";
| Field | Description |
|---|---|
jobId | Unique job identifier |
status | Current job status |
progress | Percentage complete (0-100), reported by ctx.progress() |
message | Status message from ctx.progress(percent, message) |
output | Job return value when status is completed |
error | Error message when status is failed, dead_letter, or cancelled |
Job Statuses
| Status | Description |
|---|---|
pending | Job is queued and waiting to be picked up by a worker |
claimed | A worker has claimed the job but hasn't started executing yet |
running | Job handler is executing |
completed | Job finished successfully; output contains the return value |
retry | Job failed with a retryable error and will be re-queued |
failed | Job failed and has no retries left |
dead_letter | Permanently failed after exhausting all retries |
cancel_requested | Cancellation requested while the job is running; the handler should call ctx.check_cancelled() and stop gracefully |
cancelled | Job was successfully cancelled |
not_found | No job exists with the given ID (client-only status) |
Workflow State
Workflows expose additional fields for step-by-step tracking:
interface WorkflowState<TOutput> {
workflowId: string;
status: WorkflowStatus;
step: string | null; // Current step name
waitingFor: string | null; // Reserved for future event details; currently null
steps: WorkflowStepState[]; // All steps with status
output: TOutput | null;
error: string | null;
}
interface WorkflowStepState {
name: string;
status:
| "pending"
| "running"
| "completed"
| "failed"
| "compensated"
| "skipped";
error: string | null;
}
type WorkflowStatus =
| "created"
| "running"
| "waiting"
| "completed"
| "compensating"
| "compensated"
| "failed"
| "not_found";
Patterns
Progress Bar
<script>
import { trackExportUsers } from "$lib/forge/api";
const job = trackExportUsers({ format: "csv" });
</script>
{#if $job.status === "running"}
<div class="progress-bar">
<div class="fill" style="width: {$job.progress ?? 0}%"></div>
<span>{$job.message ?? "Processing..."}</span>
</div>
{:else if $job.status === "completed"}
<a href={$job.output.downloadUrl}>Download</a>
{:else if $job.status === "failed"}
<p class="error">{$job.error}</p>
{/if}
Workflow Stepper
<script>
import { trackAccountVerification } from "$lib/forge/api";
const workflow = trackAccountVerification({ userId });
</script>
<ol class="steps">
{#each $workflow.steps as step}
<li class:current={step.name === $workflow.step} class:done={step.status === "completed"}>
{step.name}
{#if step.status === "failed"}
<span class="error">{step.error}</span>
{/if}
</li>
{/each}
</ol>
Handle All States
<script>
import { trackSendReport } from "$lib/forge/api";
let job = null;
function startExport() {
job = trackSendReport({ reportId: "q4-2024" });
}
</script>
<button onclick={startExport} disabled={job && $job.status === "running"}>
Generate Report
</button>
{#if job}
{#if $job.status === "pending" || $job.status === "claimed"}
<p>Waiting to start...</p>
{:else if $job.status === "running"}
<p>Processing: {$job.progress ?? 0}%</p>
{:else if $job.status === "retry"}
<p>Retrying after temporary failure...</p>
{:else if $job.status === "completed"}
<p>Done! Result: {JSON.stringify($job.output)}</p>
{:else if $job.status === "failed"}
<p>Failed: {$job.error}</p>
{:else if $job.status === "dead_letter"}
<p>Permanently failed after max retries</p>
{:else if $job.status === "cancel_requested"}
<p>Cancellation requested...</p>
{:else if $job.status === "cancelled"}
<p>Job cancelled</p>
{/if}
{/if}
Manual Unsubscribe
For stores created outside component lifecycle:
const job = trackExportUsers({ userId });
// Later, clean up manually
job.unsubscribe();
Store Interface
JobStore
interface JobStore<TOutput> {
subscribe: (
run: (value: JobState<TOutput> & { loading: boolean }) => void,
) => () => void;
unsubscribe: () => void;
}
WorkflowStore
interface WorkflowStore<TOutput> {
subscribe: (
run: (value: WorkflowState<TOutput> & { loading: boolean }) => void,
) => () => void;
unsubscribe: () => void;
}
Both stores include a loading: boolean field that is true until the initial job/workflow is created.
Generated Functions
For each job:
#[forge::job]
pub async fn export_users(ctx: &JobContext, format: String) -> Result<ExportResult> {
// ...
}
Forge generates:
export const trackExportUsers = (args: { format: string }) =>
createJobStore<{ format: string }, ExportResult>("export_users", args);
For workflows, the pattern is identical:
#[forge::workflow]
pub async fn account_verification(ctx: &WorkflowContext, user_id: Uuid) -> Result<()> {
// ...
}
Generates:
export const trackAccountVerification = (args: { userId: string }) =>
createWorkflowStore<{ userId: string }, void>("account_verification", args);
RPC Endpoints
Individual function calls
Individual functions can also be called via POST /_api/rpc/{function_name} with the args as the JSON body. Useful for server-to-server calls and curl testing:
curl -X POST https://example.com/_api/rpc/export_users \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{ "format": "csv" }'
Authentication is enforced identically to the standard /rpc endpoint.
Batch calls
POST to /_api/rpc/batch with an array of call objects to reduce round-trips:
[
{ "fn": "get_user", "args": { "id": "abc-123" } },
{ "fn": "get_team", "args": { "team_id": "t-456" } }
]
The server executes queries in parallel and mutations serially. The generated client does not yet expose a typed batch() helper.
Under the Hood
SSE transport: Job updates arrive over the same SSE connection used for query subscriptions. The client registers for job-specific updates via /_api/subscribe-job:
await client._registerJob(clientSubId, jobId);
Connection lifecycle: If the SSE connection drops, the client reconnects with exponential backoff. Query subscriptions are re-registered automatically; recreate job and workflow stores after reconnect if you need fresh tracking on the new session.
Automatic cleanup: Svelte's store contract means the unsubscribe callback runs when the last subscriber unsubscribes. For job and workflow stores, unsubscribe() stops local updates for that store instance.
Server-side updates: Progress updates are pushed via PostgreSQL LISTEN/NOTIFY. The gateway receives notifications and broadcasts to relevant SSE connections:
NOTIFY forge_job_updates, '{"job_id": "...", "status": "running", "progress": 50}'