Testing
Test queries, mutations, jobs, workflows, crons, daemons, and webhooks with isolated contexts, HTTP mocking, and real PostgreSQL.
The Code
#[tokio::test]
async fn test_admin_can_delete_user() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_env("AUDIT_LOG_URL", "https://audit.example.com")
.mock_http_json("audit.example.com/*", json!({ "logged": true }))
.build();
let result = delete_user(&ctx, DeleteUserArgs { user_id: target_id }).await;
assert_ok!(result);
assert_job_dispatched!(ctx, "cleanup_user_data");
ctx.http().assert_called("audit.example.com/*");
}
What Happens
Each test context builder creates an isolated environment with mocked authentication, configurable environment variables, HTTP interception, and dispatch recording. Tests run against real PostgreSQL when needed via embedded or isolated databases. No global state pollution between tests.
Test Contexts
Every Forge function type has a corresponding test context:
| Function Type | Test Context | Builder |
|---|---|---|
| Query | TestQueryContext | TestQueryContext::builder() |
| Mutation | TestMutationContext | TestMutationContext::builder() |
| Job | TestJobContext | TestJobContext::builder("job_name") |
| Cron | TestCronContext | TestCronContext::builder("cron_name") |
| Workflow | TestWorkflowContext | TestWorkflowContext::builder("workflow_name") |
| Daemon | TestDaemonContext | TestDaemonContext::builder("daemon_name") |
| Webhook | TestWebhookContext | TestWebhookContext::builder("webhook_name") |
Builder Pattern
All test contexts follow the same builder pattern with shared methods for authentication, environment, HTTP mocking, and database access.
Authentication
// UUID-based auth (default for most apps)
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_role("manager")
.with_claim("org_id", json!("org-123"))
.with_claim("tier", json!("premium"))
.build();
// Non-UUID auth (Firebase, Clerk, Auth0)
let ctx = TestMutationContext::builder()
.as_subject("firebase-user-id-abc123")
.with_role("user")
.build();
// Unauthenticated
let ctx = TestQueryContext::minimal();
assert!(!ctx.auth.is_authenticated());
Environment Variables
let ctx = TestJobContext::builder("send_email")
.with_env("SMTP_HOST", "localhost")
.with_env("SMTP_PORT", "587")
.with_env("SMTP_USERNAME", "test@example.com")
.build();
// Inside the job function
let host = ctx.env_require("SMTP_HOST")?; // "localhost"
let port: u16 = ctx.env_parse("SMTP_PORT")?; // 587
let timeout = ctx.env_or("SMTP_TIMEOUT", "30"); // "30" (default)
// Verify access in tests
ctx.env_mock().assert_accessed("SMTP_HOST");
ctx.env_mock().assert_not_accessed("UNUSED_VAR");
Multi-Tenant Testing
let tenant_id = Uuid::new_v4();
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_tenant(tenant_id)
.with_claim("tenant_id", json!(tenant_id.to_string()))
.build();
assert_eq!(ctx.tenant_id(), Some(tenant_id));
HTTP Mocking
Mock external HTTP calls with pattern matching. Requests are recorded for verification.
Pattern Matching
let ctx = TestMutationContext::builder()
// Wildcard at end
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
// Wildcard in middle
.mock_http_json("api.example.com/*/users", json!([]))
// Exact match
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();
Custom Handlers
let ctx = TestJobContext::builder("process_payment")
.mock_http("api.stripe.com/v1/charges", |req| {
// Inspect the request
let amount = req.body["amount"].as_i64().unwrap_or(0);
if amount > 10000 {
MockResponse::error(400, "Amount exceeds limit")
} else {
MockResponse::json(json!({
"id": "ch_123",
"amount": amount,
"status": "succeeded"
}))
}
})
.build();
Request Verification
#[tokio::test]
async fn test_payment_calls_stripe() {
let ctx = TestMutationContext::builder()
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
.build();
process_payment(&ctx, args).await.unwrap();
// Verify calls were made
ctx.http().assert_called("api.stripe.com/*");
ctx.http().assert_called_times("api.stripe.com/*", 1);
ctx.http().assert_not_called("api.paypal.com/*");
// Verify request body
ctx.http().assert_called_with_body("api.stripe.com/*", |body| {
body["amount"] == 1000 && body["currency"] == "usd"
});
// Get all recorded requests
let requests = ctx.http().requests_to("api.stripe.com/*");
assert_eq!(requests[0].method, "POST");
}
Response Helpers
// Success responses
MockResponse::json(json!({ "data": "value" })) // 200 with JSON
MockResponse::ok() // 200 with empty JSON
// Error responses
MockResponse::error(400, "Bad request")
MockResponse::not_found("Resource not found")
MockResponse::unauthorized("Invalid token")
MockResponse::internal_error("Server error")
Job and Workflow Dispatch
Test contexts for mutations and webhooks record dispatched jobs and workflows.
Job Dispatch Assertions
#[tokio::test]
async fn test_order_dispatches_jobs() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());
create_order(&ctx, order_args).await.unwrap();
// Verify job was dispatched
assert_job_dispatched!(ctx, "send_confirmation_email");
// Verify with argument predicate
assert_job_dispatched!(ctx, "send_confirmation_email", |args| {
args["order_id"] == "ord_123"
});
// Verify job was NOT dispatched
assert_job_not_dispatched!(ctx, "send_sms");
// Verify dispatch count
ctx.job_dispatch().assert_dispatch_count("send_confirmation_email", 1);
// Get all dispatched jobs
let jobs = ctx.job_dispatch().dispatched_jobs();
let email_jobs = ctx.job_dispatch().jobs_of_type("send_confirmation_email");
}
Workflow Dispatch Assertions
#[tokio::test]
async fn test_signup_starts_onboarding() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());
signup(&ctx, signup_args).await.unwrap();
// Verify workflow was started
assert_workflow_started!(ctx, "user_onboarding");
// Verify with input predicate
assert_workflow_started!(ctx, "user_onboarding", |input| {
input["user_id"].is_string() && input["plan"] == "premium"
});
// Verify workflow was NOT started
assert_workflow_not_started!(ctx, "enterprise_onboarding");
// Get started workflows
let workflows = ctx.workflow_dispatch().workflows_named("user_onboarding");
}
Webhook Job Dispatch
#[tokio::test]
async fn test_github_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_idempotency_key("delivery-abc123")
.build();
handle_github_webhook(&ctx, payload).await.unwrap();
assert_job_dispatched!(ctx, "process_push_event");
}
Database Testing
Embedded PostgreSQL
For zero-config database tests, use embedded PostgreSQL. Requires the embedded-db feature.
# Cargo.toml
[dev-dependencies]
forge = { version = "0.1", features = ["embedded-db"] }
#[tokio::test]
async fn test_with_embedded_db() {
// Downloads and starts PostgreSQL automatically (cached after first run)
let db = TestDatabase::embedded().await.unwrap();
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
// Test with real database
let users = list_users(&ctx).await.unwrap();
}
Explicit Database URL
#[tokio::test]
async fn test_with_explicit_db() {
let db = TestDatabase::from_url("postgres://localhost/test_db").await.unwrap();
// or from TEST_DATABASE_URL env var
let db = TestDatabase::from_env().await.unwrap();
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
}
Isolated Databases
Each test gets its own database. Prevents data collision between parallel tests.
#[tokio::test]
async fn test_user_creation() {
let base = TestDatabase::embedded().await.unwrap();
let db = base.isolated("test_user_creation").await.unwrap();
// Run migrations
db.migrate(Path::new("migrations")).await.unwrap();
// Or run setup SQL directly
db.run_sql(r#"
CREATE TABLE users (
id UUID PRIMARY KEY,
email TEXT NOT NULL UNIQUE
);
"#).await.unwrap();
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
// Test in isolation
create_user(&ctx, user_args).await.unwrap();
// Optional: explicit cleanup (otherwise orphaned DBs are cleaned on next run)
db.cleanup().await.unwrap();
}
Testing Jobs
#[test]
fn test_export_job_progress() {
let ctx = TestJobContext::builder("export_data")
.with_job_id(Uuid::new_v4())
.with_max_attempts(3)
.as_user(Uuid::new_v4())
.with_env("EXPORT_BUCKET", "test-bucket")
.mock_http_json("storage.googleapis.com/*", json!({ "uploaded": true }))
.build();
// Simulate progress updates
ctx.progress(25, "Fetching data").unwrap();
ctx.progress(50, "Processing rows").unwrap();
ctx.progress(100, "Upload complete").unwrap();
// Verify progress was recorded
let updates = ctx.progress_updates();
assert_eq!(updates.len(), 3);
assert_eq!(updates[0].percent, 25);
assert_eq!(updates[0].message, "Fetching data");
assert_eq!(updates[2].percent, 100);
}
Retry Simulation
#[test]
fn test_job_retry_behavior() {
// First attempt
let ctx = TestJobContext::builder("send_email")
.with_max_attempts(5)
.build();
assert!(!ctx.is_retry());
assert_eq!(ctx.attempt, 1);
assert!(!ctx.is_last_attempt());
// Third attempt (retry)
let ctx = TestJobContext::builder("send_email")
.as_retry(3)
.with_max_attempts(5)
.build();
assert!(ctx.is_retry());
assert_eq!(ctx.attempt, 3);
assert!(!ctx.is_last_attempt());
// Last attempt
let ctx = TestJobContext::builder("send_email")
.as_last_attempt()
.build();
assert!(ctx.is_retry());
assert!(ctx.is_last_attempt());
}
Heartbeat
#[tokio::test]
async fn test_long_running_job() {
let ctx = TestJobContext::builder("batch_import").build();
// Heartbeat is a no-op in tests but verifies the call compiles
ctx.heartbeat().await.unwrap();
}
Testing Crons
#[test]
fn test_cron_execution() {
let ctx = TestCronContext::builder("daily_cleanup")
.with_run_id(Uuid::new_v4())
.with_timezone("America/New_York")
.with_env("RETENTION_DAYS", "30")
.build();
assert_eq!(ctx.cron_name, "daily_cleanup");
assert_eq!(ctx.timezone, "America/New_York");
assert!(!ctx.is_catch_up);
// Structured logging
ctx.log.info("Starting cleanup");
ctx.log.info_with("Deleted records", json!({ "count": 150 }));
let entries = ctx.log.entries();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].level, "info");
}
Lateness Detection
#[test]
fn test_late_cron_execution() {
let scheduled = Utc::now() - Duration::minutes(5);
let ctx = TestCronContext::builder("hourly_sync")
.scheduled_at(scheduled)
.build();
// More than 1 minute late
assert!(ctx.is_late());
assert!(ctx.delay() >= Duration::minutes(4));
}
Catch-Up Runs
#[test]
fn test_catch_up_cron() {
let ctx = TestCronContext::builder("daily_report")
.as_catch_up()
.scheduled_at(Utc::now() - Duration::hours(25))
.build();
assert!(ctx.is_catch_up);
assert!(ctx.is_late());
}
Testing Workflows
#[test]
fn test_workflow_execution() {
let ctx = TestWorkflowContext::builder("order_fulfillment")
.with_run_id(Uuid::new_v4())
.with_version(2)
.as_user(Uuid::new_v4())
.with_tenant(Uuid::new_v4())
.build();
assert_eq!(ctx.workflow_name, "order_fulfillment");
assert_eq!(ctx.version, 2);
assert!(!ctx.is_resumed());
}
Step Tracking
#[test]
fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("user_onboarding").build();
// Record step execution
ctx.record_step_start("validate_email");
ctx.record_step_complete("validate_email", json!({ "valid": true }));
ctx.record_step_start("create_account");
ctx.record_step_complete("create_account", json!({ "user_id": "u_123" }));
// Verify steps
assert!(ctx.is_step_completed("validate_email"));
assert!(ctx.is_step_completed("create_account"));
assert!(!ctx.is_step_completed("send_welcome"));
// Get step result
let result: Option<serde_json::Value> = ctx.get_step_result("create_account");
assert_eq!(result.unwrap()["user_id"], "u_123");
// Verify order
let completed = ctx.completed_step_names();
assert_eq!(completed, vec!["validate_email", "create_account"]);
}
Resume Simulation
Test workflow resumption after crash or sleep.
#[test]
fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("payment_flow")
.as_resumed()
.with_completed_step("validate_card", json!({ "valid": true }))
.with_completed_step("reserve_funds", json!({ "hold_id": "h_123" }))
.build();
assert!(ctx.is_resumed());
assert!(ctx.is_step_completed("validate_card"));
assert!(ctx.is_step_completed("reserve_funds"));
assert!(!ctx.is_step_completed("capture_payment"));
// Resume from where we left off
let hold_id: String = ctx.get_step_result::<serde_json::Value>("reserve_funds")
.and_then(|v| v["hold_id"].as_str().map(String::from))
.unwrap();
assert_eq!(hold_id, "h_123");
}
Deterministic Time
#[test]
fn test_workflow_deterministic_time() {
let fixed_time = Utc.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap();
let ctx = TestWorkflowContext::builder("trial_expiration")
.with_workflow_time(fixed_time)
.build();
// workflow_time() returns the same value on every call
// even across replays, enabling deterministic execution
assert_eq!(ctx.workflow_time(), fixed_time);
}
Durable Sleep
#[tokio::test]
async fn test_workflow_sleep() {
let ctx = TestWorkflowContext::builder("reminder_flow").build();
assert!(!ctx.sleep_called());
// Sleep is a no-op in tests but records intent
ctx.sleep(Duration::from_secs(86400)).await.unwrap();
assert!(ctx.sleep_called());
}
Testing Daemons
#[test]
fn test_daemon_creation() {
let ctx = TestDaemonContext::builder("heartbeat_daemon")
.with_instance_id(Uuid::new_v4())
.with_env("HEARTBEAT_INTERVAL", "5")
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();
assert_eq!(ctx.daemon_name, "heartbeat_daemon");
assert!(!ctx.is_shutdown_requested());
}
Shutdown Signal Control
#[tokio::test]
async fn test_daemon_shutdown() {
let ctx = TestDaemonContext::builder("worker_daemon").build();
// Initially not shutdown
assert!(!ctx.is_shutdown_requested());
// Trigger shutdown from test
ctx.request_shutdown();
// Daemon sees shutdown
assert!(ctx.is_shutdown_requested());
}
Graceful Shutdown Pattern
#[tokio::test]
async fn test_daemon_graceful_shutdown() {
let ctx = TestDaemonContext::builder("queue_processor").build();
// Spawn shutdown trigger
let shutdown_tx = ctx.shutdown_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = shutdown_tx.send(true);
});
// Wait for shutdown signal (with timeout)
tokio::time::timeout(
Duration::from_millis(200),
ctx.shutdown_signal(),
).await.expect("Shutdown should complete");
assert!(ctx.is_shutdown_requested());
}
Testing Webhooks
#[tokio::test]
async fn test_stripe_webhook() {
let ctx = TestWebhookContext::builder("stripe_webhook")
.with_header("Stripe-Signature", "t=123,v1=abc...")
.with_idempotency_key("evt_123")
.with_env("STRIPE_WEBHOOK_SECRET", "whsec_test")
.build();
assert_eq!(ctx.webhook_name, "stripe_webhook");
assert_eq!(ctx.idempotency_key, Some("evt_123".to_string()));
assert_eq!(ctx.header("Stripe-Signature"), Some("t=123,v1=abc..."));
}
Case-Insensitive Headers
#[test]
fn test_webhook_headers() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_header("Content-Type", "application/json")
.build();
// Headers are case-insensitive
assert_eq!(ctx.header("x-github-event"), Some("push"));
assert_eq!(ctx.header("X-GITHUB-EVENT"), Some("push"));
assert_eq!(ctx.header("content-type"), Some("application/json"));
}
Webhook Job Dispatch
#[tokio::test]
async fn test_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("payment_webhook").build();
ctx.dispatch_job("process_payment", json!({
"payment_id": "pay_123",
"amount": 1000
})).await.unwrap();
ctx.job_dispatch().assert_dispatched("process_payment");
ctx.job_dispatch().assert_dispatched_with("process_payment", |args| {
args["amount"] == 1000
});
}
Assertion Macros
Forge provides ergonomic assertion macros for common patterns.
Result Assertions
use forge::prelude::*;
// Assert success
let result = some_operation();
assert_ok!(result);
assert_ok!(result, "Operation should succeed");
// Assert failure
let result: Result<(), ForgeError> = Err(ForgeError::NotFound("user".into()));
assert_err!(result);
// Assert specific error variant
assert_err_variant!(result, ForgeError::NotFound(_));
Dispatch Assertions
// Jobs
assert_job_dispatched!(ctx, "send_email");
assert_job_dispatched!(ctx, "send_email", |args| args["to"] == "test@example.com");
assert_job_not_dispatched!(ctx, "send_sms");
// Workflows
assert_workflow_started!(ctx, "onboarding");
assert_workflow_started!(ctx, "onboarding", |input| input["plan"] == "premium");
assert_workflow_not_started!(ctx, "enterprise_setup");
// HTTP
assert_http_called!(ctx, "api.stripe.com/*");
assert_http_not_called!(ctx, "api.paypal.com/*");
JSON Matching
Partial matching for JSON values.
use forge::testing::assert_json_matches;
let actual = json!({
"id": 123,
"name": "Test",
"nested": { "foo": "bar" },
"extra": "ignored"
});
// Only specified fields must match
assert!(assert_json_matches(&actual, &json!({ "id": 123 })));
assert!(assert_json_matches(&actual, &json!({ "name": "Test" })));
assert!(assert_json_matches(&actual, &json!({ "nested": { "foo": "bar" } })));
// Non-matches
assert!(!assert_json_matches(&actual, &json!({ "id": 456 })));
assert!(!assert_json_matches(&actual, &json!({ "missing": true })));
Under the Hood
Test Isolation
Each test context is fully independent. Authentication, environment variables, HTTP mocks, and dispatch recorders are scoped to that instance. No global state mutation.
Deterministic Execution
Test contexts provide deterministic alternatives to non-deterministic operations:
workflow_time()returns fixed time instead of wall clock- HTTP mocks return controlled responses
- Environment comes from test configuration, not system
- Job/workflow dispatch records instead of executing
Embedded PostgreSQL
TestDatabase::embedded() downloads and starts a real PostgreSQL instance. The binary is cached after first download. Tests run against real SQL with real constraints, indexes, and behavior.
Transactional Rollback
Isolated databases are created fresh per test. No need for manual cleanup or transaction rollback tricks. Each test starts with a clean slate.
Production Parity
Test contexts mirror production contexts. The same methods (ctx.db(), ctx.http(), ctx.env()) work identically. Tests written against test contexts run against production contexts without modification.