Testing
Use test contexts for queries, mutations, jobs, workflows, crons, daemons, and webhooks with isolated state, HTTP mocking, and PostgreSQL.
The Code
#[tokio::test]
async fn test_admin_can_delete_user() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_env("AUDIT_LOG_URL", "https://audit.example.com")
.mock_http_json("audit.example.com/*", json!({ "logged": true }))
.build();
let target_id = Uuid::new_v4(); // ID of the user to delete
let result = delete_user(&ctx, DeleteUserArgs { user_id: target_id }).await;
assert_ok!(result);
assert_job_dispatched!(ctx, "cleanup_user_data");
ctx.http().assert_called("audit.example.com/*");
}
What Happens
Each test context builder creates an isolated environment with mocked authentication, configurable environment variables, HTTP interception, and dispatch recording. Tests run against real PostgreSQL when needed via testcontainers or an explicit TEST_DATABASE_URL. This avoids global state pollution between tests.
Test Contexts
Every Forge function type has a corresponding test context:
| Function Type | Test Context | Builder |
|---|---|---|
| Query | TestQueryContext | TestQueryContext::builder() |
| Mutation | TestMutationContext | TestMutationContext::builder() |
| Job | TestJobContext | TestJobContext::builder("job_name") |
| Cron | TestCronContext | TestCronContext::builder("cron_name") |
| Workflow | TestWorkflowContext | TestWorkflowContext::builder("workflow_name") |
| Daemon | TestDaemonContext | TestDaemonContext::builder("daemon_name") |
| Webhook | TestWebhookContext | TestWebhookContext::builder("webhook_name") |
Builder Pattern
All test contexts follow the same builder pattern with shared methods for authentication, environment, and database access. HTTP mocking is available on all contexts except TestQueryContext.
Authentication
// UUID-based auth (default for most apps)
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_role("manager")
.with_claim("org_id", json!("org-123"))
.with_claim("tier", json!("premium"))
.build();
// Non-UUID auth (Firebase, Clerk, Auth0)
let ctx = TestMutationContext::builder()
.as_subject("firebase-user-id-abc123")
.with_role("user")
.build();
// Unauthenticated
let ctx = TestQueryContext::minimal();
assert!(!ctx.auth.is_authenticated());
Environment Variables
let ctx = TestJobContext::builder("send_email")
.with_env("SMTP_HOST", "localhost")
.with_env("SMTP_PORT", "587")
.with_env("SMTP_USERNAME", "test@example.com")
.build();
// Inside the job function
let host = ctx.env_require("SMTP_HOST")?; // "localhost"
let port: u16 = ctx.env_parse("SMTP_PORT")?; // 587
let timeout = ctx.env_or("SMTP_TIMEOUT", "30"); // "30" (default)
// Verify access in tests
ctx.env_mock().assert_accessed("SMTP_HOST");
ctx.env_mock().assert_not_accessed("UNUSED_VAR");
Multi-Tenant Testing
let tenant_id = Uuid::new_v4();
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_tenant(tenant_id)
.with_claim("tenant_id", json!(tenant_id.to_string()))
.build();
assert_eq!(ctx.tenant_id(), Some(tenant_id));
HTTP Mocking
Mock external HTTP calls with pattern matching. Requests are recorded for verification.
Pattern Matching
let ctx = TestMutationContext::builder()
// Wildcard at end
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
// Wildcard in middle
.mock_http_json("api.example.com/*/users", json!([]))
// Exact match
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();
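The exact matching rules are not spelled out here. The following is a minimal sketch of one plausible reading, assuming `*` matches any (possibly empty) run of characters and everything else must match literally. `pattern_matches` is a hypothetical helper for illustration, not Forge's implementation.

```rust
/// Hypothetical matcher (an assumption, not Forge's code):
/// `*` matches any run of characters, all other characters match literally.
fn pattern_matches(pattern: &str, url: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();
    // No wildcard present: require an exact match.
    if parts.len() == 1 {
        return pattern == url;
    }
    let first = parts[0];
    let last = parts[parts.len() - 1];
    // Text before the first `*` must be a prefix, text after the last `*`
    // must be a suffix, and the two must not overlap.
    if url.len() < first.len() + last.len()
        || !url.starts_with(first)
        || !url.ends_with(last)
    {
        return false;
    }
    // Literals between wildcards must appear in order in the remaining text.
    let mut rest = &url[first.len()..url.len() - last.len()];
    for part in &parts[1..parts.len() - 1] {
        match rest.find(*part) {
            Some(idx) => rest = &rest[idx + part.len()..],
            None => return false,
        }
    }
    true
}

#[test]
fn wildcard_patterns_match_as_expected() {
    assert!(pattern_matches("api.stripe.com/*", "api.stripe.com/v1/charges"));
    assert!(pattern_matches("api.example.com/*/users", "api.example.com/v2/users"));
    assert!(!pattern_matches("api.example.com/health", "api.example.com/healthz"));
}
```

Under this reading, a trailing wildcard also matches an empty remainder, so `api.stripe.com/*` would match `api.stripe.com/` itself.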
Custom Handlers
let ctx = TestJobContext::builder("process_payment")
.mock_http("api.stripe.com/v1/charges", |req| {
// Inspect the request
let amount = req.body["amount"].as_i64().unwrap_or(0);
if amount > 10000 {
MockResponse::error(400, "Amount exceeds limit")
} else {
MockResponse::json(json!({
"id": "ch_123",
"amount": amount,
"status": "succeeded"
}))
}
})
.build();
Request Verification
#[tokio::test]
async fn test_payment_calls_stripe() {
let ctx = TestMutationContext::builder()
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
.build();
process_payment(&ctx, args).await.unwrap();
// Verify calls were made
ctx.http().assert_called("api.stripe.com/*");
ctx.http().assert_called_times("api.stripe.com/*", 1);
ctx.http().assert_not_called("api.paypal.com/*");
// Verify request body
ctx.http().assert_called_with_body("api.stripe.com/*", |body| {
body["amount"] == 1000 && body["currency"] == "usd"
});
// Get all recorded requests
let requests = ctx.http().requests_to("api.stripe.com/*");
assert_eq!(requests[0].method, "POST");
}
Response Helpers
// Success responses
MockResponse::json(json!({ "data": "value" })) // 200 with JSON
MockResponse::ok() // 200 with empty JSON
// Error responses
MockResponse::error(400, "Bad request")
MockResponse::not_found("Resource not found")
MockResponse::unauthorized("Invalid token")
MockResponse::internal_error("Server error")
Job and Workflow Dispatch
Test contexts for mutations and webhooks record dispatched jobs and workflows.
Job Dispatch Assertions
#[tokio::test]
async fn test_order_dispatches_jobs() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());
create_order(&ctx, order_args).await.unwrap();
// Verify job was dispatched
assert_job_dispatched!(ctx, "send_confirmation_email");
// Verify with argument predicate
assert_job_dispatched!(ctx, "send_confirmation_email", |args| {
args["order_id"] == "ord_123"
});
// Verify job was NOT dispatched
assert_job_not_dispatched!(ctx, "send_sms");
// Verify dispatch count
ctx.job_dispatch().assert_dispatch_count("send_confirmation_email", 1);
// Get all dispatched jobs
let jobs = ctx.job_dispatch().dispatched_jobs();
let email_jobs = ctx.job_dispatch().jobs_of_type("send_confirmation_email");
}
Workflow Dispatch Assertions
#[tokio::test]
async fn test_signup_starts_onboarding() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());
signup(&ctx, signup_args).await.unwrap();
// Verify workflow was started
assert_workflow_started!(ctx, "user_onboarding");
// Verify with input predicate
assert_workflow_started!(ctx, "user_onboarding", |input| {
input["user_id"].is_string() && input["plan"] == "premium"
});
// Verify workflow was NOT started
assert_workflow_not_started!(ctx, "enterprise_onboarding");
// Get started workflows
let workflows = ctx.workflow_dispatch().workflows_named("user_onboarding");
}
Webhook Job Dispatch
#[tokio::test]
async fn test_github_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_idempotency_key("delivery-abc123")
.build();
handle_github_webhook(&ctx, payload).await.unwrap();
assert_job_dispatched!(ctx, "process_push_event");
}
Database Testing
Two ways to get a PostgreSQL connection in tests:
| Method | Feature flag | Env var needed | What happens |
|---|---|---|---|
| Testcontainers | testcontainers | No | Starts a Docker container per TestDatabase |
| Explicit URL | None | TEST_DATABASE_URL | Connects to whatever you point it at |
Both go through TestDatabase::from_env(). If TEST_DATABASE_URL is set it always wins, regardless of features.
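The precedence described above can be sketched as a small decision function. `DbSource` and `resolve_db_source` are hypothetical names, and the error returned when neither option is available is an assumption; the text does not say how Forge behaves in that case.

```rust
/// Hypothetical description of where the test database comes from.
#[derive(Debug, PartialEq)]
enum DbSource {
    /// Connect to the URL taken from TEST_DATABASE_URL.
    ExplicitUrl(String),
    /// Start a throwaway container (testcontainers feature).
    Container,
}

/// Sketch of the precedence: an explicit URL always wins,
/// the container is only used when no URL is set.
fn resolve_db_source(
    env_url: Option<&str>,
    testcontainers_enabled: bool,
) -> Result<DbSource, String> {
    match env_url {
        Some(url) => Ok(DbSource::ExplicitUrl(url.to_string())),
        None if testcontainers_enabled => Ok(DbSource::Container),
        // Assumption: with neither option available, setup fails.
        None => Err("set TEST_DATABASE_URL or enable the testcontainers feature".to_string()),
    }
}

#[test]
fn explicit_url_takes_precedence() {
    let source = resolve_db_source(Some("postgres://localhost/test_db"), true);
    assert_eq!(
        source,
        Ok(DbSource::ExplicitUrl("postgres://localhost/test_db".to_string()))
    );
}
```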
Testcontainers
Spins up a throwaway PG container via Docker. No local PG install needed.
# Cargo.toml
[dev-dependencies]
forge = { version = "0.7.1", features = ["testcontainers"] }
#[tokio::test]
async fn test_with_container() {
// Starts a PG container automatically (requires Docker)
let db = TestDatabase::from_env().await.unwrap();
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
let users = list_users(&ctx).await.unwrap();
}
The container lives as long as the TestDatabase. When it drops, the container stops.
To use an external PG instead, set TEST_DATABASE_URL and the container is skipped entirely:
TEST_DATABASE_URL=postgres://localhost/test_db cargo test --features testcontainers
Explicit Database URL
No feature flags. Set TEST_DATABASE_URL and go.
#[tokio::test]
async fn test_with_explicit_db() {
let db = TestDatabase::from_url("postgres://localhost/test_db").await.unwrap();
// or from TEST_DATABASE_URL env var
let db = TestDatabase::from_env().await.unwrap();
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
}
Isolated Databases
Each test gets its own database, which prevents data collisions between tests running in parallel.
#[tokio::test]
async fn test_user_creation() {
// One-liner: creates isolated DB, runs internal SQL, and applies migrations
let db = IsolatedTestDb::setup(
"test_user_creation",
&forge::get_internal_sql(),
Path::new("migrations"),
).await.unwrap();
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
create_user(&ctx, user_args).await.unwrap();
// Optional: explicit cleanup (otherwise orphaned DBs are cleaned on next run)
db.cleanup().await.unwrap();
}
For more control over the setup steps, use the manual approach:
#[tokio::test]
async fn test_with_custom_setup() {
let base = TestDatabase::from_env().await.unwrap();
let db = base.isolated("test_custom_setup").await.unwrap();
db.run_sql(r#"
CREATE TABLE users (
id UUID PRIMARY KEY,
email TEXT NOT NULL UNIQUE
);
"#).await.unwrap();
db.migrate(Path::new("migrations")).await.unwrap();
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
create_user(&ctx, user_args).await.unwrap();
db.cleanup().await.unwrap();
}
Testing Jobs
#[test]
fn test_export_job_progress() {
let ctx = TestJobContext::builder("export_data")
.with_job_id(Uuid::new_v4())
.with_max_attempts(3)
.as_user(Uuid::new_v4())
.with_env("EXPORT_BUCKET", "test-bucket")
.mock_http_json("storage.googleapis.com/*", json!({ "uploaded": true }))
.build();
// Simulate progress updates
ctx.progress(25, "Fetching data").unwrap();
ctx.progress(50, "Processing rows").unwrap();
ctx.progress(100, "Upload complete").unwrap();
// Verify progress was recorded
let updates = ctx.progress_updates();
assert_eq!(updates.len(), 3);
assert_eq!(updates[0].percent, 25);
assert_eq!(updates[0].message, "Fetching data");
assert_eq!(updates[2].percent, 100);
}
Retry Simulation
#[test]
fn test_job_retry_behavior() {
// First attempt
let ctx = TestJobContext::builder("send_email")
.with_max_attempts(5)
.build();
assert!(!ctx.is_retry());
assert_eq!(ctx.attempt, 1);
assert!(!ctx.is_last_attempt());
// Third attempt (retry)
let ctx = TestJobContext::builder("send_email")
.as_retry(3)
.with_max_attempts(5)
.build();
assert!(ctx.is_retry());
assert_eq!(ctx.attempt, 3);
assert!(!ctx.is_last_attempt());
// Last attempt
let ctx = TestJobContext::builder("send_email")
.as_last_attempt()
.build();
assert!(ctx.is_retry());
assert!(ctx.is_last_attempt());
}
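The assertions above are consistent with a simple model of attempt counting. The sketch below assumes attempts are 1-based, that a retry is any attempt after the first, and that the last attempt is the one where the counter reaches `max_attempts`. `AttemptInfo` is a hypothetical type, not part of Forge.

```rust
/// Hypothetical model of job attempt bookkeeping (an assumption).
struct AttemptInfo {
    attempt: u32,      // 1-based attempt counter
    max_attempts: u32, // total attempts allowed
}

impl AttemptInfo {
    /// Any attempt after the first is a retry.
    fn is_retry(&self) -> bool {
        self.attempt > 1
    }

    /// The last attempt is reached when the counter hits the limit.
    fn is_last_attempt(&self) -> bool {
        self.attempt >= self.max_attempts
    }
}

#[test]
fn attempt_flags_follow_the_counter() {
    let third_of_five = AttemptInfo { attempt: 3, max_attempts: 5 };
    assert!(third_of_five.is_retry());
    assert!(!third_of_five.is_last_attempt());
}
```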
Heartbeat
#[tokio::test]
async fn test_long_running_job() {
let ctx = TestJobContext::builder("batch_import").build();
// Heartbeat is a no-op in tests but verifies the call compiles
ctx.heartbeat().await.unwrap();
}
Testing Crons
#[test]
fn test_cron_execution() {
let ctx = TestCronContext::builder("daily_cleanup")
.with_run_id(Uuid::new_v4())
.with_timezone("America/New_York")
.with_env("RETENTION_DAYS", "30")
.build();
assert_eq!(ctx.cron_name, "daily_cleanup");
assert_eq!(ctx.timezone, "America/New_York");
assert!(!ctx.is_catch_up);
// Structured logging
ctx.log.info("Starting cleanup");
ctx.log.info_with("Deleted records", json!({ "count": 150 }));
let entries = ctx.log.entries();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].level, "info");
}
Lateness Detection
#[test]
fn test_late_cron_execution() {
let scheduled = Utc::now() - Duration::minutes(5);
let ctx = TestCronContext::builder("hourly_sync")
.scheduled_at(scheduled)
.build();
// More than 1 minute late
assert!(ctx.is_late());
assert!(ctx.delay() >= Duration::minutes(4));
}
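A minimal sketch of how the lateness check might work, assuming the one-minute threshold mentioned in the comment above and timestamps expressed as whole seconds. `LATE_THRESHOLD`, `delay`, and `is_late` here are free-standing illustrations, not Forge's methods.

```rust
use std::time::Duration;

/// Assumed threshold: a run counts as late once it is more than one minute behind.
const LATE_THRESHOLD: Duration = Duration::from_secs(60);

/// Time elapsed between the scheduled time and now (zero if the run is early).
/// Both arguments are seconds since an arbitrary common epoch.
fn delay(scheduled_secs: u64, now_secs: u64) -> Duration {
    Duration::from_secs(now_secs.saturating_sub(scheduled_secs))
}

/// A run is late when its delay strictly exceeds the threshold.
fn is_late(scheduled_secs: u64, now_secs: u64) -> bool {
    delay(scheduled_secs, now_secs) > LATE_THRESHOLD
}

#[test]
fn five_minutes_behind_is_late() {
    assert!(is_late(1_000, 1_300));
    assert!(!is_late(1_000, 1_030));
}
```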
Catch-Up Runs
#[test]
fn test_catch_up_cron() {
let ctx = TestCronContext::builder("daily_report")
.as_catch_up()
.scheduled_at(Utc::now() - Duration::hours(25))
.build();
assert!(ctx.is_catch_up);
assert!(ctx.is_late());
}
Testing Workflows
#[test]
fn test_workflow_execution() {
let ctx = TestWorkflowContext::builder("order_fulfillment")
.with_run_id(Uuid::new_v4())
.with_version(2)
.as_user(Uuid::new_v4())
.with_tenant(Uuid::new_v4())
.build();
assert_eq!(ctx.workflow_name, "order_fulfillment");
assert_eq!(ctx.version, 2);
assert!(!ctx.is_resumed());
}
Step Tracking
#[test]
fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("user_onboarding").build();
// Record step execution
ctx.record_step_start("validate_email");
ctx.record_step_complete("validate_email", json!({ "valid": true }));
ctx.record_step_start("create_account");
ctx.record_step_complete("create_account", json!({ "user_id": "u_123" }));
// Verify steps
assert!(ctx.is_step_completed("validate_email"));
assert!(ctx.is_step_completed("create_account"));
assert!(!ctx.is_step_completed("send_welcome"));
// Get step result
let result: Option<serde_json::Value> = ctx.get_step_result("create_account");
assert_eq!(result.unwrap()["user_id"], "u_123");
// Verify order
let completed = ctx.completed_step_names();
assert_eq!(completed, vec!["validate_email", "create_account"]);
}
Resume Simulation
Test workflow resumption after crash or sleep.
#[test]
fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("payment_flow")
.as_resumed()
.with_completed_step("validate_card", json!({ "valid": true }))
.with_completed_step("reserve_funds", json!({ "hold_id": "h_123" }))
.build();
assert!(ctx.is_resumed());
assert!(ctx.is_step_completed("validate_card"));
assert!(ctx.is_step_completed("reserve_funds"));
assert!(!ctx.is_step_completed("capture_payment"));
// Resume from where we left off
let hold_id: String = ctx.get_step_result::<serde_json::Value>("reserve_funds")
.and_then(|v| v["hold_id"].as_str().map(String::from))
.unwrap();
assert_eq!(hold_id, "h_123");
}
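To illustrate why pre-populated completed steps matter on resume, here is a sketch of a replay-style step log. It assumes a resumed workflow skips any step that already has a stored result and returns that result instead. `StepLog` and `run_step` are hypothetical and store results as plain strings for brevity.

```rust
/// Hypothetical step log: completed steps kept in completion order.
#[derive(Default)]
struct StepLog {
    completed: Vec<(String, String)>, // (step name, serialized result)
}

impl StepLog {
    fn is_step_completed(&self, name: &str) -> bool {
        self.get_step_result(name).is_some()
    }

    fn get_step_result(&self, name: &str) -> Option<&str> {
        self.completed
            .iter()
            .find(|(n, _)| n.as_str() == name)
            .map(|(_, r)| r.as_str())
    }

    /// Runs `f` only if the step has not completed yet;
    /// otherwise returns the stored result without calling `f`.
    fn run_step(&mut self, name: &str, f: impl FnOnce() -> String) -> String {
        if let Some(result) = self.get_step_result(name) {
            return result.to_string();
        }
        let result = f();
        self.completed.push((name.to_string(), result.clone()));
        result
    }
}

#[test]
fn completed_steps_are_not_rerun() {
    let mut log = StepLog::default();
    log.completed.push(("reserve_funds".to_string(), "h_123".to_string()));
    let hold_id = log.run_step("reserve_funds", || "h_999".to_string());
    assert_eq!(hold_id, "h_123");
}
```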
Deterministic Time
#[test]
fn test_workflow_deterministic_time() {
let fixed_time = Utc.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap();
let ctx = TestWorkflowContext::builder("trial_expiration")
.with_workflow_time(fixed_time)
.build();
// workflow_time() returns the same value on every call
// even across replays, enabling deterministic execution
assert_eq!(ctx.workflow_time(), fixed_time);
}
Durable Sleep
#[tokio::test]
async fn test_workflow_sleep() {
let ctx = TestWorkflowContext::builder("reminder_flow").build();
assert!(!ctx.sleep_called());
// Sleep is a no-op in tests but records intent
ctx.sleep(Duration::from_secs(86400)).await.unwrap();
assert!(ctx.sleep_called());
}
Testing Daemons
#[test]
fn test_daemon_creation() {
let ctx = TestDaemonContext::builder("heartbeat_daemon")
.with_instance_id(Uuid::new_v4())
.with_env("HEARTBEAT_INTERVAL", "5")
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();
assert_eq!(ctx.daemon_name, "heartbeat_daemon");
assert!(!ctx.is_shutdown_requested());
}
Shutdown Signal Control
#[tokio::test]
async fn test_daemon_shutdown() {
let ctx = TestDaemonContext::builder("worker_daemon").build();
// Initially not shutdown
assert!(!ctx.is_shutdown_requested());
// Trigger shutdown from test
ctx.request_shutdown();
// Daemon sees shutdown
assert!(ctx.is_shutdown_requested());
}
Graceful Shutdown Pattern
#[tokio::test]
async fn test_daemon_graceful_shutdown() {
let ctx = TestDaemonContext::builder("queue_processor").build();
// Spawn shutdown trigger
let shutdown_tx = ctx.shutdown_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = shutdown_tx.send(true);
});
// Wait for shutdown signal (with timeout)
tokio::time::timeout(
Duration::from_millis(200),
ctx.shutdown_signal(),
).await.expect("Shutdown should complete");
assert!(ctx.is_shutdown_requested());
}
Testing Webhooks
#[tokio::test]
async fn test_stripe_webhook() {
let ctx = TestWebhookContext::builder("stripe_webhook")
.with_header("Stripe-Signature", "t=123,v1=abc...")
.with_idempotency_key("evt_123")
.with_env("STRIPE_WEBHOOK_SECRET", "whsec_test")
.build();
assert_eq!(ctx.webhook_name, "stripe_webhook");
assert_eq!(ctx.idempotency_key, Some("evt_123".to_string()));
assert_eq!(ctx.header("Stripe-Signature"), Some("t=123,v1=abc..."));
}
Case-Insensitive Headers
#[test]
fn test_webhook_headers() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_header("Content-Type", "application/json")
.build();
// Headers are case-insensitive
assert_eq!(ctx.header("x-github-event"), Some("push"));
assert_eq!(ctx.header("X-GITHUB-EVENT"), Some("push"));
assert_eq!(ctx.header("content-type"), Some("application/json"));
}
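Case-insensitive lookup is commonly implemented by normalizing header names on both insert and read. The sketch below shows that approach with a hypothetical `Headers` type; whether Forge stores headers this way is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical header store that lowercases names on the way in and out.
#[derive(Default)]
struct Headers {
    inner: HashMap<String, String>,
}

impl Headers {
    fn insert(&mut self, name: &str, value: &str) {
        // Header names are ASCII, so ASCII lowercasing is sufficient.
        self.inner.insert(name.to_ascii_lowercase(), value.to_string());
    }

    fn get(&self, name: &str) -> Option<&str> {
        self.inner
            .get(&name.to_ascii_lowercase())
            .map(String::as_str)
    }
}

#[test]
fn lookup_ignores_case() {
    let mut headers = Headers::default();
    headers.insert("X-GitHub-Event", "push");
    assert_eq!(headers.get("x-github-event"), Some("push"));
    assert_eq!(headers.get("X-GITHUB-EVENT"), Some("push"));
}
```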
Webhook Job Dispatch
#[tokio::test]
async fn test_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("payment_webhook").build();
ctx.dispatch_job("process_payment", json!({
"payment_id": "pay_123",
"amount": 1000
})).await.unwrap();
ctx.job_dispatch().assert_dispatched("process_payment");
ctx.job_dispatch().assert_dispatched_with("process_payment", |args| {
args["amount"] == 1000
});
}
Assertion Macros
Forge provides ergonomic assertion macros for common patterns.
Result Assertions
use forge::prelude::*;
// Assert success
let result = some_operation();
assert_ok!(result);
assert_ok!(result, "Operation should succeed");
// Assert failure
let result: Result<(), ForgeError> = Err(ForgeError::NotFound("user".into()));
assert_err!(result);
// Assert specific error variant
assert_err_variant!(result, ForgeError::NotFound(_));
Dispatch Assertions
// Jobs
assert_job_dispatched!(ctx, "send_email");
assert_job_dispatched!(ctx, "send_email", |args| args["to"] == "test@example.com");
assert_job_not_dispatched!(ctx, "send_sms");
// Workflows
assert_workflow_started!(ctx, "onboarding");
assert_workflow_started!(ctx, "onboarding", |input| input["plan"] == "premium");
assert_workflow_not_started!(ctx, "enterprise_setup");
// HTTP
assert_http_called!(ctx, "api.stripe.com/*");
assert_http_not_called!(ctx, "api.paypal.com/*");
JSON Matching
Partial matching for JSON values.
use forge::testing::assert_json_matches;
let actual = json!({
"id": 123,
"name": "Test",
"nested": { "foo": "bar" },
"extra": "ignored"
});
// Only specified fields must match
assert!(assert_json_matches(&actual, &json!({ "id": 123 })));
assert!(assert_json_matches(&actual, &json!({ "name": "Test" })));
assert!(assert_json_matches(&actual, &json!({ "nested": { "foo": "bar" } })));
// Non-matches
assert!(!assert_json_matches(&actual, &json!({ "id": 456 })));
assert!(!assert_json_matches(&actual, &json!({ "missing": true })));
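The partial-matching rule can be sketched without any dependencies by using a small hand-rolled JSON type. This is an illustration only: `Json`, `obj`, and `matches_partial` are hypothetical, and comparing arrays and scalars by exact equality is an assumption about the intended semantics.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON value (hypothetical, for illustration).
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Arr(Vec<Json>),
    Obj(BTreeMap<String, Json>),
}

/// Convenience constructor for objects.
fn obj(pairs: Vec<(&str, Json)>) -> Json {
    Json::Obj(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

/// Every key in `expected` must exist in `actual` with a matching value;
/// extra keys in `actual` are ignored. Non-objects are compared exactly.
fn matches_partial(actual: &Json, expected: &Json) -> bool {
    match (actual, expected) {
        (Json::Obj(a), Json::Obj(e)) => e
            .iter()
            .all(|(k, ev)| a.get(k).map_or(false, |av| matches_partial(av, ev))),
        _ => actual == expected,
    }
}

#[test]
fn extra_fields_are_ignored() {
    let actual = obj(vec![
        ("id", Json::Num(123.0)),
        ("extra", Json::Str("ignored".to_string())),
    ]);
    assert!(matches_partial(&actual, &obj(vec![("id", Json::Num(123.0))])));
    assert!(!matches_partial(&actual, &obj(vec![("missing", Json::Bool(true))])));
}
```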
Under the Hood
Test Isolation
Each test context is fully independent. Authentication, environment variables, HTTP mocks, and dispatch recorders are scoped to that instance. This avoids global state mutation.
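As an illustration of instance-scoped state, the sketch below models an environment mock that owns its variables and its access log, so two instances never observe each other. `EnvMock` here is a hypothetical stand-in written for this example, not Forge's actual type.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

/// Hypothetical per-context environment mock (not Forge's actual type).
struct EnvMock {
    vars: HashMap<String, String>,
    accessed: Mutex<HashSet<String>>,
}

impl EnvMock {
    fn new(vars: &[(&str, &str)]) -> Self {
        EnvMock {
            vars: vars
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            accessed: Mutex::new(HashSet::new()),
        }
    }

    /// Looks up a variable and records that it was read,
    /// whether or not a value exists for it.
    fn get(&self, key: &str) -> Option<&str> {
        self.accessed.lock().unwrap().insert(key.to_string());
        self.vars.get(key).map(String::as_str)
    }

    fn was_accessed(&self, key: &str) -> bool {
        self.accessed.lock().unwrap().contains(key)
    }
}

#[test]
fn instances_do_not_share_state() {
    let a = EnvMock::new(&[("SMTP_HOST", "localhost")]);
    let b = EnvMock::new(&[("SMTP_HOST", "mail.internal")]);
    assert_eq!(a.get("SMTP_HOST"), Some("localhost"));
    assert!(!b.was_accessed("SMTP_HOST"));
}
```

Because nothing is read from or written to the process environment, tests built this way can run in parallel without interfering with each other.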
Deterministic Execution
Test contexts provide deterministic alternatives to non-deterministic operations:
- workflow_time() returns fixed time instead of wall clock
- HTTP mocks return controlled responses
- Environment comes from test configuration, not system
- Job/workflow dispatch records instead of executing
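The last point, recording dispatches instead of executing them, can be sketched as follows. `JobRecorder` is a hypothetical type that stores arguments as plain strings; it only illustrates the idea and does not reflect Forge's internals.

```rust
use std::sync::Mutex;

/// Hypothetical recorder: dispatching a job only appends to a list.
#[derive(Default)]
struct JobRecorder {
    dispatched: Mutex<Vec<(String, String)>>, // (job name, serialized args)
}

impl JobRecorder {
    /// Records the dispatch; nothing is executed.
    fn dispatch(&self, job: &str, args: &str) {
        self.dispatched
            .lock()
            .unwrap()
            .push((job.to_string(), args.to_string()));
    }

    fn dispatch_count(&self, job: &str) -> usize {
        self.dispatched
            .lock()
            .unwrap()
            .iter()
            .filter(|(name, _)| name.as_str() == job)
            .count()
    }

    /// True if any recorded dispatch of `job` has args accepted by `pred`.
    fn was_dispatched_with(&self, job: &str, pred: impl Fn(&str) -> bool) -> bool {
        self.dispatched
            .lock()
            .unwrap()
            .iter()
            .any(|(name, args)| name.as_str() == job && pred(args.as_str()))
    }
}

#[test]
fn dispatches_are_recorded_not_run() {
    let recorder = JobRecorder::default();
    recorder.dispatch("send_confirmation_email", "order_id=ord_123");
    assert_eq!(recorder.dispatch_count("send_confirmation_email"), 1);
    assert_eq!(recorder.dispatch_count("send_sms"), 0);
}
```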
Real PostgreSQL
All database test modes (testcontainers, explicit URL) run against real PostgreSQL. Tests exercise real SQL with real constraints, indexes, and behavior.
Transactional Rollback
Isolated databases are created fresh per test. Manual cleanup or transaction rollback tricks are not required; each test starts with a clean slate.
Production Parity
Test contexts mirror production contexts. The same methods (ctx.db(), ctx.db_conn(), ctx.http(), ctx.env()) work identically, so function code exercised through a test context runs against the production context without modification.