Skip to main content

Long Processes

Workflows execute multi-step business logic that survives server restarts. Sleep for 45 days. Wait for external events. Roll back on failure.

The Code

use forge::prelude::*;
use std::time::Duration;

/// Input payload for the `trial_expiration` workflow.
#[derive(Debug, Serialize, Deserialize)]
pub struct TrialInput {
// The user whose trial is being tracked.
pub user_id: Uuid,
// Address that receives the welcome and reminder emails.
pub email: String,
}

/// Seconds in one day; keeps the durable-sleep arithmetic readable.
const SECS_PER_DAY: u64 = 24 * 60 * 60;

/// Runs the 30-day trial lifecycle: send a welcome email, wait 14 days,
/// send a reminder, wait out the remaining 16 days, then report whether
/// the user converted to a paid subscription.
///
/// # Errors
///
/// Propagates any failure from the email steps or the subscription check.
//
// NOTE(review): the original timeout of "30d" exactly equaled the total
// durable sleep (14d + 16d), leaving no margin for the steps themselves
// to run; "31d" gives the final steps room to complete.
#[forge::workflow(timeout = "31d")]
pub async fn trial_expiration(ctx: &WorkflowContext, input: TrialInput) -> Result<bool> {
    // Step 1: greet the new trial user.
    ctx.step("send_welcome", || async {
        send_email(&input.email, "Welcome to your trial!").await
    })
    .run()
    .await?;

    // Step 2: durable sleep for 14 days — the timer survives restarts.
    ctx.sleep(Duration::from_secs(14 * SECS_PER_DAY)).await?;

    // Step 3: mid-trial reminder (16 days remain of the 30-day trial).
    ctx.step("send_reminder", || async {
        send_email(&input.email, "Your trial ends in 16 days").await
    })
    .run()
    .await?;

    // Step 4: sleep through the remainder of the trial.
    ctx.sleep(Duration::from_secs(16 * SECS_PER_DAY)).await?;

    // Step 5: did the user subscribe during the trial?
    let converted = ctx.step("check_subscription", || async {
        is_subscribed(input.user_id).await
    })
    .run()
    .await?;

    Ok(converted)
}

What Happens

Forge persists workflow state to PostgreSQL after each step. When the process restarts, workflows resume from where they left off. Completed steps replay from stored results. Durable timers survive indefinitely.

The macro validates your code at compile time. Using tokio::sleep() with durations over 100 seconds produces a compile error, guiding you toward ctx.sleep().

Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| version | u32 | 1 | Workflow version (for parallel deployments) |
| timeout | "duration" | "24h" | Maximum workflow lifetime |
| public | flag | - | Skip authentication |
| require_role("x") | string | - | Require specific role |
| deprecated | flag | - | Mark workflow as deprecated |

Duration formats: 30s, 5m, 1h, 7d, 30d

Patterns

Step with Timeout

/// Charges the customer's card and then ships the order, bounding each
/// external call with its own per-step timeout.
#[forge::workflow]
pub async fn process_order(ctx: &WorkflowContext, order: Order) -> Result<OrderResult> {
    // Payment first: abandon the attempt if the processor hangs past 30s.
    let charge = ctx
        .step("charge_card", || async { charge_card(&order.card, order.amount).await })
        .timeout(Duration::from_secs(30))
        .run()
        .await?;

    // Fulfillment gets a more generous 60s budget.
    ctx.step("fulfill", || async { ship_order(order.id).await })
        .timeout(Duration::from_secs(60))
        .run()
        .await?;

    Ok(OrderResult { payment_id: charge.id })
}

Step with Retry

/// Notifies the user by email (required, with retries) and Slack
/// (best-effort).
#[forge::workflow]
pub async fn send_notifications(ctx: &WorkflowContext, input: NotifyInput) -> Result<()> {
    // Email must land: up to 3 attempts with a fixed 5-second delay.
    let email_step = ctx.step("notify_email", || async {
        send_email(&input.email, &input.message).await
    });
    email_step.retry(3, Duration::from_secs(5)).run().await?;

    // Slack is best-effort: `.optional()` keeps the workflow alive
    // even if the post fails.
    let slack_step = ctx.step("notify_slack", || async {
        post_to_slack(&input.message).await
    });
    slack_step.optional().run().await?;

    Ok(())
}

Saga Pattern with Compensation

When a step fails, previously completed steps need to be undone. The saga pattern handles this without distributed transactions.

/// Books a flight, hotel, and car as a saga: each step registers a
/// compensation handler, and a later failure rolls earlier bookings
/// back in reverse order.
#[forge::workflow]
pub async fn book_trip(ctx: &WorkflowContext, trip: TripRequest) -> Result<Booking> {
    let flight = ctx
        .step("book_flight", || async { book_flight(&trip).await })
        .compensate(|booked| async move { cancel_flight(&booked.confirmation_id).await })
        .run()
        .await?;

    let hotel = ctx
        .step("book_hotel", || async { book_hotel(&trip).await })
        .compensate(|booked| async move { cancel_hotel(&booked.reservation_id).await })
        .run()
        .await?;

    // Should this final step fail, Forge unwinds in reverse order:
    // cancel_hotel first, then cancel_flight.
    let car = ctx
        .step("book_car", || async { book_car(&trip).await })
        .compensate(|booked| async move { cancel_car(&booked.rental_id).await })
        .run()
        .await?;

    Ok(Booking { flight, hotel, car })
}

If book_car fails, Forge runs compensation handlers in reverse order: first cancel_hotel, then cancel_flight. Each step's result is passed to its compensation handler.

Wait for External Events

/// Asks an approver for a decision, waits up to 7 days for the
/// `approval_decision` event, then executes or rejects the request.
#[forge::workflow(timeout = "7d")]
pub async fn approval_workflow(ctx: &WorkflowContext, request: ApprovalRequest) -> Result<bool> {
    ctx.step("notify_approver", || async {
        send_approval_request(&request.approver_email, request.id).await
    })
    .run()
    .await?;

    // Block (durably) until an external system delivers the decision,
    // or give up after seven days.
    let week = Duration::from_secs(7 * 24 * 60 * 60);
    let decision: ApprovalDecision = ctx
        .wait_for_event("approval_decision", Some(week))
        .await?;

    // Act on the outcome inside a step so the result is persisted.
    ctx.step("process_decision", || async {
        match decision.approved {
            true => execute_request(request.id).await,
            false => reject_request(request.id, &decision.reason).await,
        }
    })
    .run()
    .await?;

    Ok(decision.approved)
}

External systems send events using the workflow run ID for correlation:

// From a webhook or another service
//
// Delivers an external event by inserting a row into the events table;
// the waiting workflow is matched via correlation_id (the workflow run
// ID) and the event name. Presumably Forge's listener/poller picks the
// row up and resumes the run — TODO confirm against framework docs.
sqlx::query(
"INSERT INTO forge_workflow_events (event_name, correlation_id, payload)
VALUES ($1, $2, $3)"
)
.bind("approval_decision")
.bind(workflow_run_id.to_string())
.bind(json!({ "approved": true }))
.execute(&pool)
.await?;

Parallel Step Execution

Run independent steps concurrently:

/// Provisions a new user's account, billing profile, and cloud
/// resources concurrently; the resource step registers a cleanup
/// compensation handler.
#[forge::workflow]
pub async fn onboard_user(ctx: &WorkflowContext, user: NewUser) -> Result<OnboardingResult> {
    // All three steps run at the same time; if any fails, compensation
    // runs for whichever steps completed.
    let results = ctx
        .parallel()
        .step("create_account", || async { create_user_account(&user).await })
        .step("setup_billing", || async { create_billing_profile(&user).await })
        .step_with_compensate(
            "provision_resources",
            || async { provision_cloud_resources(&user).await },
            |provisioned| async move { cleanup_resources(&provisioned).await },
        )
        .run()
        .await?;

    // Pull each step's typed result out of the batch by name.
    let account: Account = results.get("create_account")?;
    let billing: BillingProfile = results.get("setup_billing")?;
    let resources: CloudResources = results.get("provision_resources")?;

    Ok(OnboardingResult { account, billing, resources })
}

Parallel steps execute concurrently. If any step fails, compensation runs for all completed steps.

Deterministic Time for Replay

Use workflow_time() instead of Utc::now() for timestamps that need to be consistent across replays:

/// Creates an invoice stamped with a replay-stable timestamp.
#[forge::workflow]
pub async fn create_invoice(ctx: &WorkflowContext, order: Order) -> Result<Invoice> {
    // workflow_time() is captured once and replayed as-is on resume,
    // so the invoice date never shifts across restarts.
    let stamped_at = ctx.workflow_time();

    ctx.step("generate_invoice", || async {
        Invoice::create(order.id, stamped_at).await
    })
    .run()
    .await
}

When a workflow resumes after restart, workflow_time() returns the original execution time, not the current time. This ensures idempotent step behavior.

Low-Level Step API

For full control, use the manual step tracking API:

/// Demonstrates the low-level step API: manual completion checks,
/// start/complete records, and cached-result retrieval for resumption.
///
/// # Errors
///
/// Returns an error if `process_data` fails or its result cannot be
/// serialized for persistence.
#[forge::workflow]
pub async fn custom_workflow(ctx: &WorkflowContext, input: Input) -> Result<Output> {
    // On resumption, a completed step replays its stored result
    // instead of re-executing.
    let result = if ctx.is_step_completed("process_data") {
        // The step is recorded as completed, so a missing cached result
        // is a framework invariant violation — state that instead of a
        // bare unwrap.
        ctx.get_step_result::<ProcessResult>("process_data")
            .expect("completed step 'process_data' must have a stored result")
    } else {
        ctx.record_step_start("process_data");

        let result = process_data(&input).await?;

        // Persist before returning so a crash after this point replays
        // the cached value rather than re-running the step.
        ctx.record_step_complete("process_data", serde_json::to_value(&result)?);
        result
    };

    Ok(Output { result })
}

Version Migration

Deploy workflow changes safely with versioning:

// Old version continues running existing workflows
//
// `deprecated` keeps this registered so in-flight version-1 runs can
// finish; presumably new runs route to the highest non-deprecated
// version — TODO confirm against framework docs.
#[forge::workflow(version = 1, deprecated)]
pub async fn user_signup_v1(ctx: &WorkflowContext, user: User) -> Result<()> {
// Original implementation
Ok(())
}

// New version handles new workflows
//
// Same function signature as v1, registered under version = 2 so both
// implementations can be deployed side by side.
#[forge::workflow(version = 2)]
pub async fn user_signup(ctx: &WorkflowContext, user: User) -> Result<()> {
// Updated implementation with new steps
Ok(())
}

Context Methods

| Method | Return Type | Description |
|---|---|---|
| ctx.step(name, fn) | StepRunner | Create a step with fluent configuration |
| ctx.parallel() | ParallelBuilder | Create parallel step executor |
| ctx.sleep(duration) | Result<()> | Durable sleep (survives restarts) |
| ctx.sleep_until(datetime) | Result<()> | Sleep until specific time |
| ctx.wait_for_event(name, timeout) | Result<T> | Wait for external event |
| ctx.workflow_time() | DateTime<Utc> | Deterministic time for replay |
| ctx.is_resumed() | bool | Check if workflow resumed from suspension |
| ctx.is_step_completed(name) | bool | Check if step already completed |
| ctx.get_step_result(name) | Option<T> | Get cached step result |
| ctx.record_step_start(name) | () | Mark step as started |
| ctx.record_step_complete(name, value) | () | Mark step as completed |
| ctx.db() | &PgPool | Database connection pool |
| ctx.http() | &Client | HTTP client |
| ctx.env(key) | Option<String> | Environment variable |

Step Configuration

The fluent API configures step behavior:

| Method | Description |
|---|---|
| .timeout(duration) | Cancel step if it exceeds duration |
| .retry(count, delay) | Retry failed steps with fixed delay |
| .optional() | Continue workflow even if step fails |
| .compensate(fn) | Register rollback handler |
| .run() | Execute the step |

Context Fields

| Field | Type | Description |
|---|---|---|
| ctx.run_id | Uuid | Unique workflow run identifier |
| ctx.workflow_name | String | Workflow function name |
| ctx.version | u32 | Workflow version |
| ctx.started_at | DateTime<Utc> | When workflow started |
| ctx.auth | AuthContext | Authentication context |

Under the Hood

Event Sourcing for Workflows

Workflow state is reconstructed from persisted step events:

-- Replay query: load every recorded step for one run, oldest first,
-- so the executor can rebuild workflow state from step events.
SELECT step_name, status, result, started_at, completed_at
FROM forge_workflow_steps
WHERE workflow_run_id = $1
ORDER BY started_at ASC

When a workflow resumes, completed steps return cached results without re-executing. The workflow function runs from the beginning, but ctx.step().run() short-circuits for completed steps.

Saga Pattern with Compensation

Compensation handlers run in reverse order on failure:

Step 1: book_flight → completed → compensation registered
Step 2: book_hotel → completed → compensation registered
Step 3: book_car → FAILED

Compensation (reverse order):
- cancel_hotel(hotel_result)
- cancel_flight(flight_result)

No distributed transactions. No two-phase commit. Each step is independently recoverable.

Transactional Outbox Pattern

Step events persist to the database before NOTIFY is sent:

-- Outbox pattern: commit the step record first, then notify listeners;
-- if the NOTIFY is lost, the background poller still finds the row.
BEGIN;
INSERT INTO forge_workflow_steps (workflow_run_id, step_name, status, result)
VALUES ($1, $2, 'completed', $3);
COMMIT;
NOTIFY forge_workflow_events, 'workflow_run_id';

If NOTIFY fails, a background poller finds pending events. Delivery is guaranteed even with network partitions.

Optimistic Concurrency Control

Workflow state updates include version checks:

-- Optimistic concurrency: the update only applies when the caller's
-- version ($3) is still current; zero rows affected signals a conflict.
UPDATE forge_workflow_runs
SET status = $2, version = version + 1
WHERE id = $1 AND version = $3

Concurrent updates to the same workflow fail fast rather than corrupt state. The executor retries with fresh state on conflict.

Durable Timers

ctx.sleep() persists wake time to the database:

-- Durable timer: park the run and persist its wake-up time so the
-- timer survives process restarts.
UPDATE forge_workflow_runs
SET status = 'waiting', wake_at = $2
WHERE id = $1

A background scheduler queries for expired timers and resumes workflows:

-- Scheduler poll: claim due timers; SKIP LOCKED lets multiple
-- scheduler instances divide the rows without blocking one another.
SELECT id FROM forge_workflow_runs
WHERE status = 'waiting' AND wake_at <= NOW()
FOR UPDATE SKIP LOCKED

Timers survive process restarts, server migrations, and cluster rebalancing.

Testing

#[cfg(test)]
mod tests {
use super::*;
use forge::testing::*;

// Drives trial_expiration against a test context and checks that the
// first step completed and a durable sleep was requested.
// NOTE(review): result.is_ok() implies the test context skips or
// fast-forwards ctx.sleep() — confirm against forge::testing docs.
#[tokio::test]
async fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("trial_expiration")
.as_user(Uuid::new_v4())
.build();

let input = TrialInput {
user_id: Uuid::new_v4(),
email: "test@example.com".into(),
};

let result = trial_expiration(&ctx, input).await;

assert!(result.is_ok());
assert!(ctx.is_step_completed("send_welcome"));
assert!(ctx.sleep_called());
}

// Pre-populates completed steps to simulate a resumed run; those
// steps report as done without re-executing.
#[tokio::test]
async fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("trial_expiration")
.as_resumed()
.with_completed_step("send_welcome", json!({"sent": true}))
.with_completed_step("send_reminder", json!({"sent": true}))
.build();

// Workflow resumes from where it left off
assert!(ctx.is_step_completed("send_welcome"));
assert!(ctx.is_step_completed("send_reminder"));
}

// workflow_time() must echo back the injected fixed timestamp,
// mirroring replay-stable time in production.
#[tokio::test]
async fn test_deterministic_time() {
let fixed_time = chrono::Utc::now();
let ctx = TestWorkflowContext::builder("create_invoice")
.with_workflow_time(fixed_time)
.build();

assert_eq!(ctx.workflow_time(), fixed_time);
}

// Mocks outbound HTTP by URL pattern and asserts the call was made.
// NOTE(review): nothing invokes the workflow before the assertion —
// confirm assert_called can pass without a run, or add the call.
#[tokio::test]
async fn test_with_mocked_http() {
let ctx = TestWorkflowContext::builder("process_order")
.mock_http_json("https://api.stripe.com/*", json!({
"id": "ch_123",
"status": "succeeded"
}))
.build();

// HTTP calls are mocked
ctx.http().assert_called("https://api.stripe.com/*");
}
}

Test context builder methods:

  • .as_resumed() - Simulate resumed workflow
  • .with_completed_step(name, result) - Pre-populate step results
  • .with_workflow_time(datetime) - Set deterministic time
  • .as_user(uuid) - Authenticate as user
  • .with_role("admin") - Add role
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .with_env(key, value) - Mock environment variable