
Long Processes

Define workflows with durable state, long sleeps, event waits, and compensation handlers.

The Code

use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize)]
pub struct TrialInput {
    pub user_id: Uuid,
    pub email: String,
}

#[forge::workflow(timeout = "30d")]
pub async fn trial_expiration(ctx: &WorkflowContext, input: TrialInput) -> Result<bool> {
    // Step 1: Send welcome email
    ctx.step("send_welcome", || async {
        send_email(&input.email, "Welcome to your trial!").await
    })
    .run()
    .await?; // Result<Option<T>>: ? unwraps the Result, leaving the Option

    // Step 2: Sleep for 14 days (survives restarts)
    ctx.sleep(Duration::from_secs(14 * 24 * 60 * 60)).await?;

    // Step 3: Send reminder
    ctx.step("send_reminder", || async {
        send_email(&input.email, "Your trial ends in 16 days").await
    })
    .run()
    .await?;

    // Step 4: Wait for trial end
    ctx.sleep(Duration::from_secs(16 * 24 * 60 * 60)).await?;

    // Step 5: Expire or convert
    let converted = ctx.step("check_subscription", || async {
        is_subscribed(input.user_id).await
    })
    .run()
    .await?
    .expect("non-optional step always returns Some");

    Ok(converted)
}

What Happens

Forge persists workflow state to PostgreSQL after each step. When the process restarts, workflows resume where they left off: completed steps replay from stored results, and durable timers survive for however long they have left to run.

The macro validates your code at compile time. Using tokio::sleep() with durations over 100 seconds produces a compile error, guiding you toward ctx.sleep().

Attributes

  • name ("string", default: the fn name): Stable workflow identity. Two versions share the same name.
  • version ("string", default "v1"): Freeform version label ("2026-05", "v2", "1.0.0"). Paired with name to form the unique definition key.
  • status ("active" | "deprecated" | "staging", default "active"): active handles new runs (at most one per name); deprecated drains in-flight runs and rejects new starts; staging is registered but invisible to dispatch, useful for pre-deploy verification.
  • timeout ("duration", default "24h"): Maximum workflow lifetime.
  • public (flag, default false): Skip authentication.
  • require_role("x") (string, no default): Require a specific role.

Duration formats: 30s, 5m, 1h, 7d, 30d
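This grammar (an integer followed by a unit suffix) is small enough to sketch in a few lines. The parser below is a standalone illustration of the format, not Forge's actual implementation:

```rust
use std::time::Duration;

/// Parse a duration literal like "30s", "5m", "1h", "7d".
/// A sketch of the attribute grammar above, not Forge's parser.
fn parse_duration(s: &str) -> Option<Duration> {
    // Split off the single-character unit suffix.
    let (num, unit) = s.split_at(s.len().checked_sub(1)?);
    let n: u64 = num.parse().ok()?;
    let secs = match unit {
        "s" => n,
        "m" => n * 60,
        "h" => n * 3600,
        "d" => n * 86_400,
        _ => return None,
    };
    Some(Duration::from_secs(secs))
}
```

Anything that does not match the grammar (missing suffix, unknown unit) yields None, which is where a macro would report a compile error.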

Signature Stability

Forge derives a 64-bit FNV-1a signature from each workflow definition's persisted contract: the name, version, step keys, wait keys, timeout, and input/output types. This signature is recorded when a workflow run starts and checked on every resume.

If the signature changes between deploys, in-flight runs are blocked rather than resumed with potentially incompatible logic. The /_api/ready health endpoint reports unhealthy when blocked runs exist, which prevents rolling deploys from completing until the issue is resolved.

What changes the signature:

  • Adding, removing, or renaming steps (ctx.step("name", ...))
  • Adding, removing, or changing wait-for-event keys (ctx.wait_for_event("key", ...))
  • Changing the workflow timeout
  • Changing input or output types

What does not change the signature:

  • Modifying logic inside a step (the step name is the same)
  • Changing compensation handlers
  • Updating log messages or non-persisted behavior
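The 64-bit FNV-1a hash itself is standard; how Forge serializes the contract fields before hashing is internal, so the canonical string below is an illustrative assumption. The sketch shows why any change to a step key, version, or timeout changes the signature:

```rust
/// Standard 64-bit FNV-1a hash.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Hypothetical contract serialization: fields joined with a separator
/// that cannot appear in identifiers. Forge's exact layout is internal.
fn contract_signature(name: &str, version: &str, step_keys: &[&str], timeout: &str) -> u64 {
    let canonical = format!("{name}\x1f{version}\x1f{}\x1f{timeout}", step_keys.join(","));
    fnv1a_64(canonical.as_bytes())
}
```

Changing only the body of a step leaves every input to the hash untouched, which is exactly why step logic can change freely while renaming a step blocks in-flight runs.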

When you need to change the contract: create a new version. Set the old version to deprecated so it drains in-flight runs while the new version handles new starts. Once all old runs complete, remove the deprecated version.

#[forge::workflow(name = "onboard_user", version = "v2", status = "active")]
pub async fn onboard_user_v2(ctx: &WorkflowContext, input: OnboardInput) -> Result<Value> {
    // New step structure
}

#[forge::workflow(name = "onboard_user", version = "v1", status = "deprecated")]
pub async fn onboard_user_v1(ctx: &WorkflowContext, input: OnboardInput) -> Result<Value> {
    // Old step structure, kept until all v1 runs complete
}

Patterns

Ownership Tracking

Workflows automatically set owner_subject from ctx.auth.principal_id() at dispatch time.

#[forge::mutation(transactional)]
pub async fn begin_onboarding(ctx: &MutationContext, user: NewUser) -> Result<Uuid> {
    // owner_subject set automatically from the authenticated principal
    ctx.start_workflow("onboard_user", user).await
}

// Query workflows belonging to the current user
#[forge::query]
pub async fn my_workflows(ctx: &QueryContext) -> Result<Vec<WorkflowSummary>> {
    let subject = ctx.auth.require_subject()?;

    sqlx::query_as("SELECT id, workflow_name, status FROM forge_workflow_runs WHERE owner_subject = $1 ORDER BY created_at DESC")
        .bind(subject)
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

Ownership persists across restarts and suspensions.

Step with Timeout

#[forge::workflow]
pub async fn process_order(ctx: &WorkflowContext, order: Order) -> Result<OrderResult> {
    let payment = ctx.step("charge_card", || async {
        charge_card(&order.card, order.amount).await
    })
    .timeout(Duration::from_secs(30))
    .run()
    .await?
    .expect("charge_card is non-optional");

    ctx.step("fulfill", || async {
        ship_order(order.id).await
    })
    .timeout(Duration::from_secs(60))
    .run()
    .await?;

    Ok(OrderResult { payment_id: payment.id })
}

Step with Retry

#[forge::workflow]
pub async fn send_notifications(ctx: &WorkflowContext, input: NotifyInput) -> Result<()> {
    ctx.step("notify_email", || async {
        send_email(&input.email, &input.message).await
    })
    .retry(3, Duration::from_secs(5)) // 3 retries (4 total attempts), 5 second delay
    .run()
    .await?;

    ctx.step("notify_slack", || async {
        post_to_slack(&input.message).await
    })
    .optional() // Failure won't stop the workflow
    .run()
    .await?;

    Ok(())
}

Saga Pattern with Compensation

When a step fails, previously completed steps need to be undone. The saga pattern coordinates compensating actions instead of relying on distributed transactions.

#[forge::workflow]
pub async fn book_trip(ctx: &WorkflowContext, trip: TripRequest) -> Result<Booking> {
    let flight = ctx.step("book_flight", || async {
        book_flight(&trip).await
    })
    .compensate(|flight| async move {
        cancel_flight(&flight.confirmation_id).await
    })
    .run()
    .await?
    .expect("book_flight is non-optional");

    let hotel = ctx.step("book_hotel", || async {
        book_hotel(&trip).await
    })
    .compensate(|hotel| async move {
        cancel_hotel(&hotel.reservation_id).await
    })
    .run()
    .await?
    .expect("book_hotel is non-optional");

    let car = ctx.step("book_car", || async {
        book_car(&trip).await // If this fails...
    })
    .compensate(|car| async move {
        cancel_car(&car.rental_id).await
    })
    .run()
    .await? // ...compensation runs in reverse: hotel, then flight
    .expect("book_car is non-optional");

    Ok(Booking { flight, hotel, car })
}

If book_car fails, Forge runs compensation handlers in reverse order: first cancel_hotel, then cancel_flight. Each step's result is passed to its compensation handler.

Compensation handlers live in memory. If the process restarts before compensation completes, the workflow is marked as failed rather than silently skipping handlers. This fail-closed behavior prevents silent data inconsistency.
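The reverse-order guarantee can be modeled as a stack: each completed step pushes its handler, and failure pops them newest-first. This synchronous sketch only illustrates the LIFO ordering; real Forge handlers are async closures holding each step's result:

```rust
/// In-memory model of the compensation stack. Completed steps register
/// an undo handler; on failure, handlers run newest-first (LIFO).
struct Saga {
    compensations: Vec<(String, Box<dyn FnOnce()>)>,
}

impl Saga {
    fn new() -> Self {
        Self { compensations: Vec::new() }
    }

    fn completed(&mut self, step: &str, undo: impl FnOnce() + 'static) {
        self.compensations.push((step.to_string(), Box::new(undo)));
    }

    /// Run registered handlers in reverse registration order,
    /// returning the order in which they fired.
    fn compensate(mut self) -> Vec<String> {
        let mut order = Vec::new();
        while let Some((step, undo)) = self.compensations.pop() {
            undo();
            order.push(step);
        }
        order
    }
}
```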

Wait for External Events

#[forge::workflow(timeout = "7d")]
pub async fn approval_workflow(ctx: &WorkflowContext, request: ApprovalRequest) -> Result<bool> {
    ctx.step("notify_approver", || async {
        send_approval_request(&request.approver_email, request.id).await
    })
    .run()
    .await?;

    // Wait up to 7 days for the approval event
    let decision: ApprovalDecision = ctx.wait_for_event(
        "approval_decision",
        Some(Duration::from_secs(7 * 24 * 60 * 60)),
    ).await?;

    ctx.step("process_decision", || async {
        if decision.approved {
            execute_request(request.id).await
        } else {
            reject_request(request.id, &decision.reason).await
        }
    })
    .run()
    .await?;

    Ok(decision.approved)
}

External systems send events using the workflow run ID for correlation:

// From a webhook or another service
sqlx::query(
"INSERT INTO forge_workflow_events (event_name, correlation_id, payload)
VALUES ($1, $2, $3)"
)
.bind("approval_decision")
.bind(workflow_run_id.to_string())
.bind(json!({ "approved": true }))
.execute(&pool)
.await?;

Parallel Step Execution

Run independent steps concurrently:

#[forge::workflow]
pub async fn onboard_user(ctx: &WorkflowContext, user: NewUser) -> Result<OnboardingResult> {
    let results = ctx.parallel()
        .step("create_account", || async {
            create_user_account(&user).await
        })
        .step("setup_billing", || async {
            create_billing_profile(&user).await
        })
        .step_with_compensate(
            "provision_resources",
            || async { provision_cloud_resources(&user).await },
            |resources| async move { cleanup_resources(&resources).await },
        )
        .run()
        .await?;

    let account: Account = results.get("create_account")?;
    let billing: BillingProfile = results.get("setup_billing")?;
    let resources: CloudResources = results.get("provision_resources")?;

    Ok(OnboardingResult { account, billing, resources })
}

Parallel steps execute concurrently. If any step fails, compensation runs for all completed steps.

Deterministic Time for Replay

Use ctx.workflow_time() instead of Utc::now() for timestamps that need to be consistent across replays:

#[forge::workflow]
pub async fn create_invoice(ctx: &WorkflowContext, order: Order) -> Result<Invoice> {
    let invoice_date = ctx.workflow_time(); // Consistent across replays

    let invoice = ctx.step("generate_invoice", || async {
        Invoice::create(order.id, invoice_date).await
    })
    .run()
    .await?
    .expect("generate_invoice is non-optional");

    Ok(invoice)
}

When a workflow resumes after restart, workflow_time() returns the original execution time, not the current time. This ensures idempotent step behavior.
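That contract can be modeled as first-write-wins: the first observed timestamp is persisted with the run, and every later call returns it. A minimal sketch (using a plain integer instead of DateTime<Utc> to stay self-contained):

```rust
/// Replay-stable clock: the first observed value is stored and every
/// subsequent call returns it, modeling the workflow_time() contract.
struct WorkflowClock {
    persisted: Option<u64>, // Forge persists a DateTime<Utc> with the run row
}

impl WorkflowClock {
    fn workflow_time(&mut self, now: u64) -> u64 {
        // get_or_insert stores `now` only if nothing was persisted yet.
        *self.persisted.get_or_insert(now)
    }
}
```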

Low-Level Step API

For full control, use the manual step tracking API:

#[forge::workflow]
pub async fn custom_workflow(ctx: &WorkflowContext, input: Input) -> Result<Output> {
    // Check if the step already completed (for resumption)
    let result = if ctx.is_step_completed("process_data") {
        ctx.get_step_result::<ProcessResult>("process_data").unwrap()
    } else {
        ctx.record_step_start("process_data");

        let result = process_data(&input).await?;

        ctx.record_step_complete("process_data", serde_json::to_value(&result)?);
        result
    };

    Ok(Output { result })
}

Version Migration

Deploy workflow changes safely with versioning:

// Old version drains in-flight runs, rejects new starts
#[forge::workflow(name = "user_signup", version = "v1", status = "deprecated")]
pub async fn user_signup_v1(ctx: &WorkflowContext, user: User) -> Result<()> {
// Original implementation
Ok(())
}

// New version handles new workflows
#[forge::workflow(name = "user_signup", version = "v2", status = "active")]
pub async fn user_signup(ctx: &WorkflowContext, user: User) -> Result<()> {
// Updated implementation with new steps
Ok(())
}

The version and signature are recorded when a workflow is started. On resume, the runtime loads the handler matching the exact version and signature, so in-flight workflows always run against the code they started with. Mark old versions status = "deprecated" to prevent new starts while existing runs finish. Keep deprecated handlers deployed until all their in-flight runs complete.

Migrating a workflow safely

The lifecycle is deprecate → drain → remove:

  1. Bump version and add the new handler. Mark the old one status = "deprecated" so it stops accepting new starts but keeps draining.
  2. Deploy. Both versions ship in the same binary. Existing runs of the old version keep going on the old code; new dispatches go to the new version.
  3. Wait until the old version has zero in-flight runs (query forge_workflow_runs filtering by workflow_name, workflow_version, and a non-terminal status).
  4. Delete the old handler code and redeploy.

The runtime defends step 4: at boot it scans forge_workflow_runs for non-terminal runs whose (name, version) tuple is no longer in the registry. If any are found, it logs a warning per stranded group and flips /_api/ready to 503 with workflows: false. Boot still succeeds, so the node stays up but the load balancer rotates traffic away. The exact count is logged but not exposed on the public probe (the response stays minimal so unauthenticated probes don't leak deployment state).

To unblock the runtime, an operator clears the stuck rows directly in Postgres:

UPDATE forge_workflow_runs
SET status = 'retired_unresumable',
    resolution_reason = 'drained for v1 removal'
WHERE workflow_name = 'user_signup'
  AND workflow_version = 'v1'
  AND status NOT IN (
    'completed', 'compensated', 'failed',
    'retired_unresumable', 'cancelled_by_operator'
  );

Use cancelled_by_operator if you want the row to read as a cancellation rather than a retirement. Within five seconds, the next /_api/ready probe re-runs the drain check and flips back to 200. There is no admin HTTP route for this; the database is the operator interface.

Context Methods

  • ctx.step(name, fn) -> StepRunner: Create a step with fluent configuration
  • ctx.parallel() -> ParallelBuilder: Create parallel step executor
  • ctx.sleep(duration) -> Result<()>: Durable sleep (survives restarts)
  • ctx.sleep_until(datetime) -> Result<()>: Sleep until a specific time
  • ctx.wait_for_event(name, timeout) -> Result<T>: Wait for an external event
  • ctx.workflow_time() -> DateTime<Utc>: Deterministic time for replay
  • ctx.is_resumed() -> bool: Check if the workflow resumed from suspension
  • ctx.is_step_completed(name) -> bool: Check if a step already completed
  • ctx.get_step_result(name) -> Option<T>: Get a cached step result
  • ctx.record_step_start(name) -> (): Mark a step as started
  • ctx.record_step_complete(name, value) -> (): Mark a step as completed
  • ctx.elapsed() -> chrono::Duration: Time since the workflow started
  • ctx.db() -> &PgPool: Database connection pool
  • ctx.db_conn() -> DbConn<'_>: Database connection as DbConn for shared helpers
  • ctx.http() -> &Client: HTTP client
  • ctx.env(key) -> Option<String>: Environment variable
  • ctx.env_or(key, default) -> String: Environment variable with default
  • ctx.env_require(key) -> Result<String>: Required environment variable
  • ctx.env_parse::<T>(key) -> Result<T>: Parse environment variable
  • ctx.env_parse_or(key, default) -> Result<T>: Parse environment variable with default
  • ctx.env_contains(key) -> bool: Check if an environment variable is set

Step Configuration

The fluent API configures step behavior:

  • .timeout(duration): Cancel the step if it exceeds the duration
  • .retry(max_retries, delay): Retry failed steps (N retries = N+1 total attempts)
  • .optional(): Continue the workflow even if the step fails
  • .compensate(fn): Register a rollback handler
  • .run(): Execute the step
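The retry arithmetic (N retries = N+1 total attempts) is easy to get wrong, so here it is as a standalone helper, without the delay or async machinery of the real StepRunner:

```rust
/// Retry a fallible operation: `max_retries` retries after the first
/// attempt, so max_retries = 3 allows up to 4 total attempts.
fn with_retries<T, E>(max_retries: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempts > max_retries => return Err(e), // budget exhausted
            Err(_) => continue, // a real runner would sleep `delay` here
        }
    }
}
```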

Context Fields

  • ctx.run_id (Uuid): Unique workflow run identifier
  • ctx.workflow_name (String): Workflow function name
  • ctx.version (String): Workflow version label (see Attributes)
  • ctx.started_at (DateTime<Utc>): When the workflow started
  • ctx.auth (AuthContext): Authentication context

Under the Hood

Event Sourcing for Workflows

Workflow state is reconstructed from persisted step events:

SELECT step_name, status, result, started_at, completed_at
FROM forge_workflow_steps
WHERE workflow_run_id = $1
ORDER BY started_at ASC

When a workflow resumes, completed steps return cached results without re-executing. The workflow function runs from the beginning, but ctx.step().run() short-circuits for completed steps.
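That short-circuit is effectively memoization keyed by step name. An in-memory sketch of the behavior (Forge reads cached results from forge_workflow_steps; plain strings stand in for the persisted JSON):

```rust
use std::collections::HashMap;

/// In-memory model of step replay: a completed step returns its cached
/// result instead of re-running the closure.
struct Replay {
    cache: HashMap<String, String>, // step name -> persisted result
    executions: u32,                // counts closures actually run
}

impl Replay {
    fn step(&mut self, name: &str, run: impl FnOnce() -> String) -> String {
        if let Some(cached) = self.cache.get(name) {
            return cached.clone(); // short-circuit: no re-execution
        }
        self.executions += 1;
        let result = run();
        self.cache.insert(name.to_string(), result.clone());
        result
    }
}
```

On resume, the workflow function still runs top to bottom, but every step whose name is in the cache is a lookup rather than a side effect.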

Saga Pattern with Compensation

Compensation handlers run in reverse order on failure:

Step 1: book_flight → completed → compensation registered
Step 2: book_hotel → completed → compensation registered
Step 3: book_car → FAILED

Compensation (reverse order):
- cancel_hotel(hotel_result)
- cancel_flight(flight_result)

Workflow steps avoid distributed transactions and two-phase commit; each step is independently recoverable.

Transactional Outbox Pattern

Step events persist to the database before NOTIFY is sent:

BEGIN;
INSERT INTO forge_workflow_steps (workflow_run_id, step_name, status, result)
VALUES ($1, $2, 'completed', $3);
COMMIT;
NOTIFY forge_workflow_events, 'workflow_run_id';

If NOTIFY fails, a background poller finds pending events. Delivery is guaranteed even with network partitions.

Optimistic Concurrency Control

Workflow state updates include version checks:

UPDATE forge_workflow_runs
SET status = $2, version = version + 1
WHERE id = $1 AND version = $3

Concurrent updates to the same workflow fail fast rather than corrupt state. The executor retries with fresh state on conflict.
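The same compare-and-swap can be modeled in memory: an update applies only when the caller's expected version matches the stored one, and a mismatch means re-read and retry. A sketch of the check, not Forge's executor:

```rust
/// In-memory model of the optimistic version check on a run row.
struct RunRow {
    status: String,
    version: u64,
}

/// Apply the update only if the caller read the current version;
/// returns false on conflict so the caller can re-read and retry.
fn try_update(row: &mut RunRow, expected_version: u64, new_status: &str) -> bool {
    if row.version != expected_version {
        return false; // someone else updated the row first
    }
    row.status = new_status.to_string();
    row.version += 1;
    true
}
```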

Durable Timers

ctx.sleep() persists wake time to the database:

UPDATE forge_workflow_runs
SET status = 'waiting', wake_at = $2
WHERE id = $1

A background scheduler queries for expired timers and resumes workflows:

SELECT id FROM forge_workflow_runs
WHERE status = 'waiting' AND wake_at <= NOW()
FOR UPDATE SKIP LOCKED

Timers survive process restarts, server migrations, and cluster rebalancing.

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_workflow_steps() {
        let ctx = TestWorkflowContext::builder("trial_expiration")
            .as_user(Uuid::new_v4())
            .build();

        let input = TrialInput {
            user_id: Uuid::new_v4(),
            email: "test@example.com".into(),
        };

        let result = trial_expiration(&ctx, input).await;

        assert!(result.is_ok());
        assert!(ctx.is_step_completed("send_welcome"));
        assert!(ctx.sleep_called());
    }

    #[tokio::test]
    async fn test_workflow_resume() {
        let ctx = TestWorkflowContext::builder("trial_expiration")
            .as_resumed()
            .with_completed_step("send_welcome", json!({"sent": true}))
            .with_completed_step("send_reminder", json!({"sent": true}))
            .build();

        // Workflow resumes from where it left off
        assert!(ctx.is_step_completed("send_welcome"));
        assert!(ctx.is_step_completed("send_reminder"));
    }

    #[tokio::test]
    async fn test_deterministic_time() {
        let fixed_time = chrono::Utc::now();
        let ctx = TestWorkflowContext::builder("create_invoice")
            .with_workflow_time(fixed_time)
            .build();

        assert_eq!(ctx.workflow_time(), fixed_time);
    }

    #[tokio::test]
    async fn test_with_mocked_http() {
        let ctx = TestWorkflowContext::builder("process_order")
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();

        // HTTP calls are mocked
        ctx.http().assert_called("https://api.stripe.com/*");
    }
}

Test context builder methods:

  • .as_resumed() - Simulate resumed workflow
  • .with_completed_step(name, result) - Pre-populate step results
  • .with_workflow_time(datetime) - Set deterministic time
  • .as_user(uuid) - Authenticate as user
  • .with_role("admin") - Add role
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .with_env(key, value) - Mock environment variable