# Long Processes
Workflows execute multi-step business logic that survives server restarts. Sleep for 45 days. Wait for external events. Roll back on failure.
## The Code

```rust
use forge::prelude::*;
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize)]
pub struct TrialInput {
    pub user_id: Uuid,
    pub email: String,
}

#[forge::workflow(timeout = "30d")]
pub async fn trial_expiration(ctx: &WorkflowContext, input: TrialInput) -> Result<bool> {
    // Step 1: Send welcome email
    ctx.step("send_welcome", || async {
        send_email(&input.email, "Welcome to your trial!").await
    })
    .run()
    .await?;

    // Step 2: Sleep for 14 days (survives restarts)
    ctx.sleep(Duration::from_secs(14 * 24 * 60 * 60)).await?;

    // Step 3: Send reminder
    ctx.step("send_reminder", || async {
        send_email(&input.email, "Your trial ends in 16 days").await
    })
    .run()
    .await?;

    // Step 4: Wait for trial end
    ctx.sleep(Duration::from_secs(16 * 24 * 60 * 60)).await?;

    // Step 5: Expire or convert
    let converted = ctx.step("check_subscription", || async {
        is_subscribed(input.user_id).await
    })
    .run()
    .await?;

    Ok(converted)
}
```
## What Happens
Forge persists workflow state to PostgreSQL after each step. When the process restarts, workflows resume from where they left off. Completed steps replay from stored results. Durable timers survive indefinitely.
The macro validates your code at compile time. Using `tokio::sleep()` with durations over 100 seconds produces a compile error, guiding you toward `ctx.sleep()`.
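A minimal sketch of the distinction, based on the rule above (the workflow name and message are illustrative; `send_email` mirrors the earlier example):

```rust
#[forge::workflow(timeout = "30d")]
pub async fn reminder_only(ctx: &WorkflowContext, email: String) -> Result<()> {
    // Rejected at compile time: a tokio sleep longer than 100 seconds inside a workflow.
    // tokio::time::sleep(Duration::from_secs(14 * 24 * 60 * 60)).await;

    // Accepted: durable sleep that persists its wake time and survives restarts.
    ctx.sleep(Duration::from_secs(14 * 24 * 60 * 60)).await?;

    ctx.step("send_reminder", || async {
        send_email(&email, "Your trial ends soon").await
    })
    .run()
    .await?;

    Ok(())
}
```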
## Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| `version` | u32 | 1 | Workflow version (for parallel deployments) |
| `timeout` | "duration" | "24h" | Maximum workflow lifetime |
| `public` | flag | - | Skip authentication |
| `require_role("x")` | string | - | Require specific role |
| `deprecated` | flag | - | Mark workflow as deprecated |

Duration formats: `30s`, `5m`, `1h`, `7d`, `30d`
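A hedged sketch combining several attributes from the table; the workflow name, `RenewInput`, and the placement of `require_role` alongside the other attributes are assumptions:

```rust
// Illustrative only: version, timeout, and require_role come from the table
// above; renew_subscription and RenewInput are hypothetical.
#[forge::workflow(version = 2, timeout = "7d", require_role("billing_admin"))]
pub async fn renew_subscription(ctx: &WorkflowContext, input: RenewInput) -> Result<()> {
    ctx.step("charge_renewal", || async {
        charge_card(&input.card, input.amount).await
    })
    .retry(3, Duration::from_secs(5))
    .run()
    .await?;

    Ok(())
}
```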
## Patterns

### Step with Timeout

```rust
#[forge::workflow]
pub async fn process_order(ctx: &WorkflowContext, order: Order) -> Result<OrderResult> {
    let payment = ctx.step("charge_card", || async {
        charge_card(&order.card, order.amount).await
    })
    .timeout(Duration::from_secs(30))
    .run()
    .await?;

    ctx.step("fulfill", || async {
        ship_order(order.id).await
    })
    .timeout(Duration::from_secs(60))
    .run()
    .await?;

    Ok(OrderResult { payment_id: payment.id })
}
```
### Step with Retry

```rust
#[forge::workflow]
pub async fn send_notifications(ctx: &WorkflowContext, input: NotifyInput) -> Result<()> {
    ctx.step("notify_email", || async {
        send_email(&input.email, &input.message).await
    })
    .retry(3, Duration::from_secs(5)) // 3 attempts, 5 second delay
    .run()
    .await?;

    ctx.step("notify_slack", || async {
        post_to_slack(&input.message).await
    })
    .optional() // Failure won't stop workflow
    .run()
    .await?;

    Ok(())
}
```
### Saga Pattern with Compensation

When a step fails, previously completed steps need to be undone. The saga pattern handles this without distributed transactions.

```rust
#[forge::workflow]
pub async fn book_trip(ctx: &WorkflowContext, trip: TripRequest) -> Result<Booking> {
    let flight = ctx.step("book_flight", || async {
        book_flight(&trip).await
    })
    .compensate(|flight| async move {
        cancel_flight(&flight.confirmation_id).await
    })
    .run()
    .await?;

    let hotel = ctx.step("book_hotel", || async {
        book_hotel(&trip).await
    })
    .compensate(|hotel| async move {
        cancel_hotel(&hotel.reservation_id).await
    })
    .run()
    .await?;

    let car = ctx.step("book_car", || async {
        book_car(&trip).await // If this fails...
    })
    .compensate(|car| async move {
        cancel_car(&car.rental_id).await
    })
    .run()
    .await?; // ...compensation runs in reverse: hotel, then flight

    Ok(Booking { flight, hotel, car })
}
```
If `book_car` fails, Forge runs compensation handlers in reverse order: first `cancel_hotel`, then `cancel_flight`. Each step's result is passed to its compensation handler.
### Wait for External Events

```rust
#[forge::workflow(timeout = "7d")]
pub async fn approval_workflow(ctx: &WorkflowContext, request: ApprovalRequest) -> Result<bool> {
    ctx.step("notify_approver", || async {
        send_approval_request(&request.approver_email, request.id).await
    })
    .run()
    .await?;

    // Wait up to 7 days for approval event
    let decision: ApprovalDecision = ctx.wait_for_event(
        "approval_decision",
        Some(Duration::from_secs(7 * 24 * 60 * 60)),
    ).await?;

    ctx.step("process_decision", || async {
        if decision.approved {
            execute_request(request.id).await
        } else {
            reject_request(request.id, &decision.reason).await
        }
    })
    .run()
    .await?;

    Ok(decision.approved)
}
```
External systems send events using the workflow run ID for correlation:
```rust
// From a webhook or another service
sqlx::query(
    "INSERT INTO forge_workflow_events (event_name, correlation_id, payload)
     VALUES ($1, $2, $3)"
)
.bind("approval_decision")
.bind(workflow_run_id.to_string())
.bind(json!({ "approved": true }))
.execute(&pool)
.await?;
```
### Parallel Step Execution

Run independent steps concurrently:

```rust
#[forge::workflow]
pub async fn onboard_user(ctx: &WorkflowContext, user: NewUser) -> Result<OnboardingResult> {
    let results = ctx.parallel()
        .step("create_account", || async {
            create_user_account(&user).await
        })
        .step("setup_billing", || async {
            create_billing_profile(&user).await
        })
        .step_with_compensate(
            "provision_resources",
            || async { provision_cloud_resources(&user).await },
            |resources| async move { cleanup_resources(&resources).await }
        )
        .run()
        .await?;

    let account: Account = results.get("create_account")?;
    let billing: BillingProfile = results.get("setup_billing")?;
    let resources: CloudResources = results.get("provision_resources")?;

    Ok(OnboardingResult { account, billing, resources })
}
```
Parallel steps execute concurrently. If any step fails, compensation runs for all completed steps.
### Deterministic Time for Replay

Use `ctx.workflow_time()` instead of `Utc::now()` for timestamps that need to be consistent across replays:

```rust
#[forge::workflow]
pub async fn create_invoice(ctx: &WorkflowContext, order: Order) -> Result<Invoice> {
    let invoice_date = ctx.workflow_time(); // Consistent across replays

    ctx.step("generate_invoice", || async {
        Invoice::create(order.id, invoice_date).await
    })
    .run()
    .await
}
```

When a workflow resumes after restart, `workflow_time()` returns the original execution time, not the current time. This ensures idempotent step behavior.
### Low-Level Step API

For full control, use the manual step tracking API:

```rust
#[forge::workflow]
pub async fn custom_workflow(ctx: &WorkflowContext, input: Input) -> Result<Output> {
    // Check if step already completed (for resumption)
    let result = if ctx.is_step_completed("process_data") {
        ctx.get_step_result::<ProcessResult>("process_data").unwrap()
    } else {
        ctx.record_step_start("process_data");
        let result = process_data(&input).await?;
        ctx.record_step_complete("process_data", serde_json::to_value(&result)?);
        result
    };

    Ok(Output { result })
}
```
### Version Migration

Deploy workflow changes safely with versioning:

```rust
// Old version continues running existing workflows
#[forge::workflow(version = 1, deprecated)]
pub async fn user_signup_v1(ctx: &WorkflowContext, user: User) -> Result<()> {
    // Original implementation
    Ok(())
}

// New version handles new workflows
#[forge::workflow(version = 2)]
pub async fn user_signup(ctx: &WorkflowContext, user: User) -> Result<()> {
    // Updated implementation with new steps
    Ok(())
}
```
## Context Methods

| Method | Return Type | Description |
|---|---|---|
| `ctx.step(name, fn)` | `StepRunner` | Create a step with fluent configuration |
| `ctx.parallel()` | `ParallelBuilder` | Create parallel step executor |
| `ctx.sleep(duration)` | `Result<()>` | Durable sleep (survives restarts) |
| `ctx.sleep_until(datetime)` | `Result<()>` | Sleep until specific time |
| `ctx.wait_for_event(name, timeout)` | `Result<T>` | Wait for external event |
| `ctx.workflow_time()` | `DateTime<Utc>` | Deterministic time for replay |
| `ctx.is_resumed()` | `bool` | Check if workflow resumed from suspension |
| `ctx.is_step_completed(name)` | `bool` | Check if step already completed |
| `ctx.get_step_result(name)` | `Option<T>` | Get cached step result |
| `ctx.record_step_start(name)` | `()` | Mark step as started |
| `ctx.record_step_complete(name, value)` | `()` | Mark step as completed |
| `ctx.db()` | `&PgPool` | Database connection pool |
| `ctx.http()` | `&Client` | HTTP client |
| `ctx.env(key)` | `Option<String>` | Environment variable |
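A hedged sketch exercising methods from the table that the earlier patterns don't show (`sleep_until`, `db()`, `env()`); the workflow name, `ReportInput`, `publish_report`, and the query are illustrative assumptions:

```rust
// Illustrative only: sleep_until, db(), and env() come from the table above;
// ReportInput, publish_report, and the orders query are hypothetical.
#[forge::workflow(timeout = "30d")]
pub async fn monthly_report(ctx: &WorkflowContext, input: ReportInput) -> Result<()> {
    // Durable sleep until an absolute timestamp rather than a relative duration.
    ctx.sleep_until(input.period_end).await?;

    // Environment lookup through the workflow context.
    let api_base = ctx.env("REPORTS_API_URL").unwrap_or_default();

    ctx.step("generate_report", || async {
        // Direct database access through the workflow's connection pool.
        let orders: Vec<(Uuid,)> =
            sqlx::query_as("SELECT id FROM orders WHERE created_at >= $1")
                .bind(input.period_start)
                .fetch_all(ctx.db())
                .await?;
        publish_report(&api_base, orders.len()).await
    })
    .run()
    .await?;

    Ok(())
}
```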
## Step Configuration

The fluent API configures step behavior:

| Method | Description |
|---|---|
| `.timeout(duration)` | Cancel step if it exceeds duration |
| `.retry(count, delay)` | Retry failed steps with fixed delay |
| `.optional()` | Continue workflow even if step fails |
| `.compensate(fn)` | Register rollback handler |
| `.run()` | Execute the step |
## Context Fields

| Field | Type | Description |
|---|---|---|
| `ctx.run_id` | `Uuid` | Unique workflow run identifier |
| `ctx.workflow_name` | `String` | Workflow function name |
| `ctx.version` | `u32` | Workflow version |
| `ctx.started_at` | `DateTime<Utc>` | When workflow started |
| `ctx.auth` | `AuthContext` | Authentication context |
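A minimal sketch reading these fields for structured logging at the start of a run; the `tracing` crate is an assumed logging choice, and `audited_workflow` / `AuditInput` are placeholders:

```rust
// Illustrative only: field access matches the table above; tracing is an
// assumption, and the workflow body is a placeholder.
#[forge::workflow]
pub async fn audited_workflow(ctx: &WorkflowContext, input: AuditInput) -> Result<()> {
    tracing::info!(
        run_id = %ctx.run_id,
        workflow = %ctx.workflow_name,
        version = ctx.version,
        started_at = %ctx.started_at,
        "starting workflow run"
    );

    ctx.step("process", || async { process(&input).await })
        .run()
        .await?;

    Ok(())
}
```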
## Under the Hood

### Event Sourcing for Workflows

Workflow state is reconstructed from persisted step events:

```sql
SELECT step_name, status, result, started_at, completed_at
FROM forge_workflow_steps
WHERE workflow_run_id = $1
ORDER BY started_at ASC
```

When a workflow resumes, completed steps return cached results without re-executing. The workflow function runs from the beginning, but `ctx.step().run()` short-circuits for completed steps.
### Saga Pattern with Compensation

Compensation handlers run in reverse order on failure:

```
Step 1: book_flight → completed → compensation registered
Step 2: book_hotel  → completed → compensation registered
Step 3: book_car    → FAILED

Compensation (reverse order):
  - cancel_hotel(hotel_result)
  - cancel_flight(flight_result)
```
No distributed transactions. No two-phase commit. Each step is independently recoverable.
### Transactional Outbox Pattern

Step events persist to the database before NOTIFY is sent:

```sql
BEGIN;
INSERT INTO forge_workflow_steps (workflow_run_id, step_name, status, result)
VALUES ($1, $2, 'completed', $3);
COMMIT;

NOTIFY forge_workflow_events, 'workflow_run_id';
```
If NOTIFY fails, a background poller finds pending events. Delivery is guaranteed even with network partitions.
### Optimistic Concurrency Control

Workflow state updates include version checks:

```sql
UPDATE forge_workflow_runs
SET status = $2, version = version + 1
WHERE id = $1 AND version = $3
```
Concurrent updates to the same workflow fail fast rather than corrupt state. The executor retries with fresh state on conflict.
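A minimal sketch of that retry-on-conflict loop, assuming direct access to the `forge_workflow_runs` table shown above; the function name, status type, and `i64` version column are assumptions:

```rust
// Illustrative only: the table and columns come from the UPDATE above;
// everything else (function name, status as &str, i64 version) is assumed.
async fn update_run_status(
    pool: &sqlx::PgPool,
    run_id: uuid::Uuid,
    new_status: &str,
) -> Result<(), sqlx::Error> {
    loop {
        // Read the version this executor believes is current...
        let (current_version,): (i64,) =
            sqlx::query_as("SELECT version FROM forge_workflow_runs WHERE id = $1")
                .bind(run_id)
                .fetch_one(pool)
                .await?;

        // ...and apply the update only if nobody bumped it in the meantime.
        let updated = sqlx::query(
            "UPDATE forge_workflow_runs
             SET status = $2, version = version + 1
             WHERE id = $1 AND version = $3",
        )
        .bind(run_id)
        .bind(new_status)
        .bind(current_version)
        .execute(pool)
        .await?;

        if updated.rows_affected() > 0 {
            return Ok(()); // This executor won the race.
        }
        // rows_affected == 0: a concurrent update changed the version;
        // loop again with fresh state, as the executor does on conflict.
    }
}
```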
### Durable Timers

`ctx.sleep()` persists wake time to the database:

```sql
UPDATE forge_workflow_runs
SET status = 'waiting', wake_at = $2
WHERE id = $1
```

A background scheduler queries for expired timers and resumes workflows:

```sql
SELECT id FROM forge_workflow_runs
WHERE status = 'waiting' AND wake_at <= NOW()
FOR UPDATE SKIP LOCKED
```
Timers survive process restarts, server migrations, and cluster rebalancing.
## Testing

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_workflow_steps() {
        let ctx = TestWorkflowContext::builder("trial_expiration")
            .as_user(Uuid::new_v4())
            .build();

        let input = TrialInput {
            user_id: Uuid::new_v4(),
            email: "test@example.com".into(),
        };

        let result = trial_expiration(&ctx, input).await;

        assert!(result.is_ok());
        assert!(ctx.is_step_completed("send_welcome"));
        assert!(ctx.sleep_called());
    }

    #[tokio::test]
    async fn test_workflow_resume() {
        let ctx = TestWorkflowContext::builder("trial_expiration")
            .as_resumed()
            .with_completed_step("send_welcome", json!({"sent": true}))
            .with_completed_step("send_reminder", json!({"sent": true}))
            .build();

        // Workflow resumes from where it left off
        assert!(ctx.is_step_completed("send_welcome"));
        assert!(ctx.is_step_completed("send_reminder"));
    }

    #[tokio::test]
    async fn test_deterministic_time() {
        let fixed_time = chrono::Utc::now();
        let ctx = TestWorkflowContext::builder("create_invoice")
            .with_workflow_time(fixed_time)
            .build();

        assert_eq!(ctx.workflow_time(), fixed_time);
    }

    #[tokio::test]
    async fn test_with_mocked_http() {
        let ctx = TestWorkflowContext::builder("process_order")
            .mock_http_json("https://api.stripe.com/*", json!({
                "id": "ch_123",
                "status": "succeeded"
            }))
            .build();

        // HTTP calls are mocked
        ctx.http().assert_called("https://api.stripe.com/*");
    }
}
```
Test context builder methods:
- `.as_resumed()` - Simulate resumed workflow
- `.with_completed_step(name, result)` - Pre-populate step results
- `.with_workflow_time(datetime)` - Set deterministic time
- `.as_user(uuid)` - Authenticate as user
- `.with_role("admin")` - Add role
- `.mock_http(pattern, handler)` - Mock HTTP responses
- `.with_env(key, value)` - Mock environment variable
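A hedged sketch combining builders the tests above don't cover (`with_role`, `with_env`); the workflow name and assertion are illustrative:

```rust
// Illustrative only: with_role and with_env come from the builder list above;
// "provision_tenant" and the env key/value are placeholders.
#[tokio::test]
async fn test_admin_with_env() {
    let ctx = TestWorkflowContext::builder("provision_tenant")
        .as_user(Uuid::new_v4())
        .with_role("admin")
        .with_env("REGION", "us-east-1")
        .build();

    assert_eq!(ctx.env("REGION").as_deref(), Some("us-east-1"));
}
```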