Long Processes
Define workflows with durable state, long sleeps, event waits, and compensation handlers.
The Code
use forge::prelude::*;
use std::time::Duration;
/// Input payload for the `trial_expiration` workflow.
#[derive(Debug, Serialize, Deserialize)]
pub struct TrialInput {
/// Identifier of the user whose trial is being tracked.
pub user_id: Uuid,
/// Address that receives the welcome and reminder emails.
pub email: String,
}
/// Drives a user's trial lifecycle: welcome email, mid-trial reminder,
/// then a final subscription check at the 30-day mark.
///
/// Returns `Ok(true)` if the user converted to a paid subscription.
#[forge::workflow(timeout = "30d")]
pub async fn trial_expiration(ctx: &WorkflowContext, input: TrialInput) -> Result<bool> {
    const DAY_SECS: u64 = 24 * 60 * 60;

    // Greet the user as soon as the trial starts.
    ctx.step("send_welcome", || async {
        send_email(&input.email, "Welcome to your trial!").await
    })
    .run()
    .await?; // run() yields Result<Option<T>>; ? strips the Result layer

    // Durable 14-day pause — persisted, so a process restart does not reset it.
    ctx.sleep(Duration::from_secs(14 * DAY_SECS)).await?;

    // Nudge the user partway through the trial window.
    ctx.step("send_reminder", || async {
        send_email(&input.email, "Your trial ends in 16 days").await
    })
    .run()
    .await?;

    // Pause for the remainder of the 30-day trial.
    ctx.sleep(Duration::from_secs(16 * DAY_SECS)).await?;

    // Decide the outcome: did the user subscribe during the trial?
    let converted = ctx
        .step("check_subscription", || async { is_subscribed(input.user_id).await })
        .run()
        .await?
        .expect("non-optional step always returns Some");

    Ok(converted)
}
What Happens
Forge persists workflow state to PostgreSQL after each step. When the process restarts, workflows resume from where they left off. Completed steps replay from stored results. Durable timers survive indefinitely.
The macro validates your code at compile time. Using tokio::sleep() with durations over 100 seconds produces a compile error, guiding you toward ctx.sleep().
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
version | u32 | 1 | Workflow version (for parallel deployments) |
timeout | "duration" | "24h" | Maximum workflow lifetime |
public | flag | - | Skip authentication |
require_role("x") | string | - | Require specific role |
deprecated | flag | - | Mark workflow as deprecated |
Duration formats: 30s, 5m, 1h, 7d, 30d
Patterns
Ownership Tracking
Workflows automatically set owner_subject from ctx.auth.principal_id() at dispatch time.
/// Kicks off the onboarding workflow for a new user.
///
/// Returns the workflow run ID so the caller can track progress.
#[forge::mutation(transactional)]
pub async fn begin_onboarding(ctx: &MutationContext, user: NewUser) -> Result<Uuid> {
    // owner_subject is recorded automatically from the authenticated
    // principal at dispatch time.
    let run_id = ctx.start_workflow("onboard_user", user).await?;
    Ok(run_id)
}
// Query workflows belonging to the current user
/// Lists workflow runs owned by the calling user, newest first.
#[forge::query]
pub async fn my_workflows(ctx: &QueryContext) -> Result<Vec<WorkflowSummary>> {
    // Requires an authenticated subject; only that subject's rows are visible.
    let subject = ctx.auth.require_subject()?;
    let rows = sqlx::query_as(
        "SELECT id, workflow_name, status FROM forge_workflow_runs WHERE owner_subject = $1 ORDER BY created_at DESC",
    )
    .bind(subject)
    .fetch_all(ctx.db())
    .await?;
    Ok(rows)
}
Ownership persists across restarts and suspensions.
Step with Timeout
/// Charges the customer's card and then ships the order, with per-step
/// timeouts so a hung downstream service cannot stall the run.
#[forge::workflow]
pub async fn process_order(ctx: &WorkflowContext, order: Order) -> Result<OrderResult> {
    // Payment is bounded at 30 seconds.
    let charge = ctx
        .step("charge_card", || async {
            charge_card(&order.card, order.amount).await
        })
        .timeout(Duration::from_secs(30))
        .run()
        .await?
        .expect("charge_card is non-optional");

    // Fulfillment gets a more generous 60-second budget.
    ctx.step("fulfill", || async { ship_order(order.id).await })
        .timeout(Duration::from_secs(60))
        .run()
        .await?;

    Ok(OrderResult { payment_id: charge.id })
}
Step with Retry
/// Notifies the user by email (required) and Slack (best-effort).
#[forge::workflow]
pub async fn send_notifications(ctx: &WorkflowContext, input: NotifyInput) -> Result<()> {
    // Email must land: retry up to 3 times, pausing 5 seconds between attempts.
    ctx.step("notify_email", || async {
        send_email(&input.email, &input.message).await
    })
    .retry(3, Duration::from_secs(5))
    .run()
    .await?;

    // Slack is best-effort: optional() lets the workflow finish even if it fails.
    ctx.step("notify_slack", || async { post_to_slack(&input.message).await })
        .optional()
        .run()
        .await?;

    Ok(())
}
Saga Pattern with Compensation
When a step fails, previously completed steps need to be undone. The saga pattern coordinates compensation instead of distributed transactions.
/// Books flight, hotel, and car as a saga: each successful booking
/// registers an undo handler, and a later failure unwinds the completed
/// steps in reverse order.
#[forge::workflow]
pub async fn book_trip(ctx: &WorkflowContext, trip: TripRequest) -> Result<Booking> {
    let flight = ctx
        .step("book_flight", || async { book_flight(&trip).await })
        .compensate(|f| async move { cancel_flight(&f.confirmation_id).await })
        .run()
        .await?
        .expect("book_flight is non-optional");

    let hotel = ctx
        .step("book_hotel", || async { book_hotel(&trip).await })
        .compensate(|h| async move { cancel_hotel(&h.reservation_id).await })
        .run()
        .await?
        .expect("book_hotel is non-optional");

    // Should this step fail, the registered handlers run in reverse:
    // cancel_hotel first, then cancel_flight.
    let car = ctx
        .step("book_car", || async { book_car(&trip).await })
        .compensate(|c| async move { cancel_car(&c.rental_id).await })
        .run()
        .await?
        .expect("book_car is non-optional");

    Ok(Booking { flight, hotel, car })
}
If book_car fails, Forge runs compensation handlers in reverse order: first cancel_hotel, then cancel_flight. Each step's result is passed to its compensation handler.
Compensation handlers live in memory. If the process restarts before compensation completes, the workflow is marked as failed rather than silently skipping handlers. This fail-closed behavior prevents silent data inconsistency.
Wait for External Events
/// Asks an approver to review `request`, suspends until their decision
/// arrives as an external event, then executes or rejects accordingly.
///
/// Returns `Ok(true)` when the request was approved.
#[forge::workflow(timeout = "7d")]
pub async fn approval_workflow(ctx: &WorkflowContext, request: ApprovalRequest) -> Result<bool> {
    // Tell the approver there is something to review.
    ctx.step("notify_approver", || async {
        send_approval_request(&request.approver_email, request.id).await
    })
    .run()
    .await?;

    // Suspend until an "approval_decision" event arrives, waiting at most 7 days.
    let seven_days = Duration::from_secs(7 * 24 * 60 * 60);
    let decision: ApprovalDecision = ctx
        .wait_for_event("approval_decision", Some(seven_days))
        .await?;

    // Act on whichever way the decision went.
    ctx.step("process_decision", || async {
        if decision.approved {
            execute_request(request.id).await
        } else {
            reject_request(request.id, &decision.reason).await
        }
    })
    .run()
    .await?;

    Ok(decision.approved)
}
External systems send events using the workflow run ID for correlation:
// From a webhook or another service. The correlation_id must be the
// workflow run ID so the runtime can route the event to the suspended
// workflow waiting on it.
sqlx::query(
"INSERT INTO forge_workflow_events (event_name, correlation_id, payload)
VALUES ($1, $2, $3)"
)
// Matches the name passed to ctx.wait_for_event.
.bind("approval_decision")
// NOTE(review): assumes `workflow_run_id` is the Uuid of the waiting run.
.bind(workflow_run_id.to_string())
// Payload is deserialized into the type the waiting workflow expects.
.bind(json!({ "approved": true }))
.execute(&pool)
.await?;
Parallel Step Execution
Run independent steps concurrently:
/// Provisions a new user's account, billing profile, and cloud resources
/// concurrently; a failure in any step compensates all completed steps.
#[forge::workflow]
pub async fn onboard_user(ctx: &WorkflowContext, user: NewUser) -> Result<OnboardingResult> {
    // The three provisioning steps are independent, so run them in parallel.
    let batch = ctx
        .parallel()
        .step("create_account", || async { create_user_account(&user).await })
        .step("setup_billing", || async { create_billing_profile(&user).await })
        .step_with_compensate(
            "provision_resources",
            || async { provision_cloud_resources(&user).await },
            |r| async move { cleanup_resources(&r).await },
        )
        .run()
        .await?;

    // Pull each step's typed result out of the batch.
    let account: Account = batch.get("create_account")?;
    let billing: BillingProfile = batch.get("setup_billing")?;
    let resources: CloudResources = batch.get("provision_resources")?;

    Ok(OnboardingResult { account, billing, resources })
}
Parallel steps execute concurrently. If any step fails, compensation runs for all completed steps.
Deterministic Time for Replay
Use workflow_time() instead of Utc::now() for timestamps that need to be consistent across replays:
/// Generates an invoice for `order`, dated with the workflow's
/// replay-stable timestamp.
#[forge::workflow]
pub async fn create_invoice(ctx: &WorkflowContext, order: Order) -> Result<Invoice> {
    // workflow_time() is consistent across replays, so a restart does not
    // change the invoice date to the current wall clock.
    let invoice_date = ctx.workflow_time();
    let invoice = ctx
        .step("generate_invoice", || async {
            Invoice::create(order.id, invoice_date).await
        })
        .run()
        .await?
        .expect("generate_invoice is non-optional");
    // Fix: the step chain ends in `Invoice` (expect() on the Option), but
    // the function returns Result<Invoice> — the value must be wrapped in Ok.
    Ok(invoice)
}
When a workflow resumes after restart, workflow_time() returns the original execution time, not the current time. This ensures idempotent step behavior.
Low-Level Step API
For full control, use the manual step tracking API:
/// Demonstrates the manual step-tracking API: the workflow consults the
/// step log itself instead of going through ctx.step().run().
#[forge::workflow]
pub async fn custom_workflow(ctx: &WorkflowContext, input: Input) -> Result<Output> {
    let result = if !ctx.is_step_completed("process_data") {
        // First execution: record the start, do the work, persist the result.
        ctx.record_step_start("process_data");
        let fresh = process_data(&input).await?;
        ctx.record_step_complete("process_data", serde_json::to_value(&fresh)?);
        fresh
    } else {
        // Resumption: the step already ran, so reuse its stored result.
        ctx.get_step_result::<ProcessResult>("process_data").unwrap()
    };
    Ok(Output { result })
}
Version Migration
Deploy workflow changes safely with versioning:
// Old version continues running existing workflows.
// `deprecated` blocks new starts while in-flight v1 runs finish.
#[forge::workflow(version = 1, deprecated)]
pub async fn user_signup_v1(ctx: &WorkflowContext, user: User) -> Result<()> {
// Original implementation
Ok(())
}
// New version handles new workflows. Runs started before this deploy keep
// executing v1, because the version is recorded at workflow start.
#[forge::workflow(version = 2)]
pub async fn user_signup(ctx: &WorkflowContext, user: User) -> Result<()> {
// Updated implementation with new steps
Ok(())
}
The version is recorded when a workflow is started. On resume, the runtime loads the handler matching the recorded version, so in-flight workflows always run against the code they started with. Mark old versions deprecated to prevent new starts while existing runs finish.
Context Methods
| Method | Return Type | Description |
|---|---|---|
ctx.step(name, fn) | StepRunner | Create a step with fluent configuration |
ctx.parallel() | ParallelBuilder | Create parallel step executor |
ctx.sleep(duration) | Result<()> | Durable sleep (survives restarts) |
ctx.sleep_until(datetime) | Result<()> | Sleep until specific time |
ctx.wait_for_event(name, timeout) | Result<T> | Wait for external event |
ctx.workflow_time() | DateTime<Utc> | Deterministic time for replay |
ctx.is_resumed() | bool | Check if workflow resumed from suspension |
ctx.is_step_completed(name) | bool | Check if step already completed |
ctx.get_step_result(name) | Option<T> | Get cached step result |
ctx.record_step_start(name) | () | Mark step as started |
ctx.record_step_complete(name, value) | () | Mark step as completed |
ctx.elapsed() | chrono::Duration | Time since workflow started |
ctx.db() | &PgPool | Database connection pool |
ctx.db_conn() | DbConn<'_> | Database connection as DbConn for shared helpers |
ctx.http() | &Client | HTTP client |
ctx.env(key) | Option<String> | Environment variable |
ctx.env_or(key, default) | String | Environment variable with default |
ctx.env_require(key) | Result<String> | Required environment variable |
ctx.env_parse::<T>(key) | Result<T> | Parse environment variable |
ctx.env_parse_or(key, default) | Result<T> | Parse environment variable with default |
ctx.env_contains(key) | bool | Check if environment variable is set |
Step Configuration
The fluent API configures step behavior:
| Method | Description |
|---|---|
.timeout(duration) | Cancel step if it exceeds duration |
.retry(max_retries, delay) | Retry failed steps (N retries = N+1 total attempts) |
.optional() | Continue workflow even if step fails |
.compensate(fn) | Register rollback handler |
.run() | Execute the step |
Context Fields
| Field | Type | Description |
|---|---|---|
ctx.run_id | Uuid | Unique workflow run identifier |
ctx.workflow_name | String | Workflow function name |
ctx.version | u32 | Workflow version |
ctx.started_at | DateTime<Utc> | When workflow started |
ctx.auth | AuthContext | Authentication context |
Under the Hood
Event Sourcing for Workflows
Workflow state is reconstructed from persisted step events:
SELECT step_name, status, result, started_at, completed_at
FROM forge_workflow_steps
WHERE workflow_run_id = $1
ORDER BY started_at ASC
When a workflow resumes, completed steps return cached results without re-executing. The workflow function runs from the beginning, but ctx.step().run() short-circuits for completed steps.
Saga Pattern with Compensation
Compensation handlers run in reverse order on failure:
Step 1: book_flight → completed → compensation registered
Step 2: book_hotel → completed → compensation registered
Step 3: book_car → FAILED
Compensation (reverse order):
- cancel_hotel(hotel_result)
- cancel_flight(flight_result)
Workflow steps avoid distributed transactions and two-phase commit; each step is independently recoverable.
Transactional Outbox Pattern
Step events persist to the database before NOTIFY is sent:
BEGIN;
INSERT INTO forge_workflow_steps (workflow_run_id, step_name, status, result)
VALUES ($1, $2, 'completed', $3);
COMMIT;
NOTIFY forge_workflow_events, 'workflow_run_id';
If NOTIFY fails, a background poller finds pending events. Delivery is guaranteed even with network partitions.
Optimistic Concurrency Control
Workflow state updates include version checks:
UPDATE forge_workflow_runs
SET status = $2, version = version + 1
WHERE id = $1 AND version = $3
Concurrent updates to the same workflow fail fast rather than corrupt state. The executor retries with fresh state on conflict.
Durable Timers
ctx.sleep() persists wake time to the database:
UPDATE forge_workflow_runs
SET status = 'waiting', wake_at = $2
WHERE id = $1
A background scheduler queries for expired timers and resumes workflows:
SELECT id FROM forge_workflow_runs
WHERE status = 'waiting' AND wake_at <= NOW()
FOR UPDATE SKIP LOCKED
Timers survive process restarts, server migrations, and cluster rebalancing.
Testing
// Workflow tests run against TestWorkflowContext instead of a real runtime.
#[cfg(test)]
mod tests {
use super::*;
use forge::testing::*;
// Happy path: the workflow runs to completion against the test context,
// and step completion is observable afterwards.
#[tokio::test]
async fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("trial_expiration")
.as_user(Uuid::new_v4())
.build();
let input = TrialInput {
user_id: Uuid::new_v4(),
email: "test@example.com".into(),
};
let result = trial_expiration(&ctx, input).await;
assert!(result.is_ok());
assert!(ctx.is_step_completed("send_welcome"));
// sleep_called() shows a durable sleep was requested; presumably the test
// context records rather than performs it — confirm in forge::testing.
assert!(ctx.sleep_called());
}
// Resumption: pre-populated step results are visible as completed steps.
#[tokio::test]
async fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("trial_expiration")
.as_resumed()
.with_completed_step("send_welcome", json!({"sent": true}))
.with_completed_step("send_reminder", json!({"sent": true}))
.build();
// Workflow resumes from where it left off
assert!(ctx.is_step_completed("send_welcome"));
assert!(ctx.is_step_completed("send_reminder"));
}
// workflow_time() must echo the value injected via the builder.
#[tokio::test]
async fn test_deterministic_time() {
let fixed_time = chrono::Utc::now();
let ctx = TestWorkflowContext::builder("create_invoice")
.with_workflow_time(fixed_time)
.build();
assert_eq!(ctx.workflow_time(), fixed_time);
}
// Outbound HTTP is mocked by URL pattern and can be asserted on.
#[tokio::test]
async fn test_with_mocked_http() {
let ctx = TestWorkflowContext::builder("process_order")
.mock_http_json("https://api.stripe.com/*", json!({
"id": "ch_123",
"status": "succeeded"
}))
.build();
// HTTP calls are mocked
ctx.http().assert_called("https://api.stripe.com/*");
}
}
Test context builder methods:
- .as_resumed() — Simulate resumed workflow
- .with_completed_step(name, result) — Pre-populate step results
- .with_workflow_time(datetime) — Set deterministic time
- .as_user(uuid) — Authenticate as user
- .with_role("admin") — Add role
- .mock_http(pattern, handler) — Mock HTTP responses
- .with_env(key, value) — Mock environment variable