Testing

Use test contexts for queries, mutations, jobs, workflows, crons, daemons, and webhooks with isolated state, HTTP mocking, and PostgreSQL.

The Code

#[tokio::test]
async fn test_admin_can_delete_user() {
    // ID of the user being deleted (any UUID works for this test)
    let target_id = Uuid::new_v4();

    let ctx = TestMutationContext::builder()
        .as_user(Uuid::new_v4())
        .with_role("admin")
        .with_env("AUDIT_LOG_URL", "https://audit.example.com")
        .mock_http_json("audit.example.com/*", json!({ "logged": true }))
        .build();

    let result = delete_user(&ctx, DeleteUserArgs { user_id: target_id }).await;

    assert_ok!(result);
    assert_job_dispatched!(ctx, "cleanup_user_data");
    ctx.http().assert_called("audit.example.com/*");
}

What Happens

Each test context builder creates an isolated environment with mocked authentication, configurable environment variables, HTTP interception, and dispatch recording. Because this state lives on the context rather than in process globals, tests do not pollute one another. When a test needs a database, it runs against real PostgreSQL via testcontainers or an explicit TEST_DATABASE_URL.

Test Contexts

Every Forge function type has a corresponding test context:

Function Type | Test Context | Builder
Query | TestQueryContext | TestQueryContext::builder()
Mutation | TestMutationContext | TestMutationContext::builder()
Job | TestJobContext | TestJobContext::builder("job_name")
Cron | TestCronContext | TestCronContext::builder("cron_name")
Workflow | TestWorkflowContext | TestWorkflowContext::builder("workflow_name")
Daemon | TestDaemonContext | TestDaemonContext::builder("daemon_name")
Webhook | TestWebhookContext | TestWebhookContext::builder("webhook_name")

Builder Pattern

All test contexts follow the same builder pattern with shared methods for authentication, environment, and database access. HTTP mocking is available on all contexts except TestQueryContext.

Authentication

// UUID-based auth (default for most apps)
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_role("manager")
.with_claim("org_id", json!("org-123"))
.with_claim("tier", json!("premium"))
.build();

// Non-UUID auth (Firebase, Clerk, Auth0)
let ctx = TestMutationContext::builder()
.as_subject("firebase-user-id-abc123")
.with_role("user")
.build();

// Unauthenticated
let ctx = TestQueryContext::minimal();
assert!(!ctx.auth.is_authenticated());

Environment Variables

let ctx = TestJobContext::builder("send_email")
.with_env("SMTP_HOST", "localhost")
.with_env("SMTP_PORT", "587")
.with_env("SMTP_USERNAME", "test@example.com")
.build();

// Inside the job function
let host = ctx.env_require("SMTP_HOST")?; // "localhost"
let port: u16 = ctx.env_parse("SMTP_PORT")?; // 587
let timeout = ctx.env_or("SMTP_TIMEOUT", "30"); // "30" (default)

// Verify access in tests
ctx.env_mock().assert_accessed("SMTP_HOST");
ctx.env_mock().assert_not_accessed("UNUSED_VAR");
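
To make the access tracking concrete, here is a minimal std-only sketch of how a per-context environment mock could record reads. The type `MockEnv` and the methods `get`, `get_or`, and `was_accessed` are hypothetical names chosen for illustration; they are not Forge's API, and the real implementation may differ.

```rust
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a per-context environment mock (not Forge's type).
struct MockEnv {
    vars: HashMap<String, String>,
    // Interior mutability lets reads be recorded through a shared reference.
    accessed: RefCell<HashSet<String>>,
}

impl MockEnv {
    fn new(vars: &[(&str, &str)]) -> Self {
        MockEnv {
            vars: vars
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            accessed: RefCell::new(HashSet::new()),
        }
    }

    /// Look up a variable and record the read, even if the variable is unset.
    fn get(&self, key: &str) -> Option<&str> {
        self.accessed.borrow_mut().insert(key.to_string());
        self.vars.get(key).map(String::as_str)
    }

    /// Fall back to a default when the variable is unset.
    fn get_or(&self, key: &str, default: &str) -> String {
        self.get(key).unwrap_or(default).to_string()
    }

    /// True if `key` was read at least once through this mock.
    fn was_accessed(&self, key: &str) -> bool {
        self.accessed.borrow().contains(key)
    }
}
```

Because the map and the access log both live on the instance, two tests holding separate mocks cannot observe each other's variables or reads.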

Multi-Tenant Testing

let tenant_id = Uuid::new_v4();

let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_tenant(tenant_id)
.with_claim("tenant_id", json!(tenant_id.to_string()))
.build();

assert_eq!(ctx.tenant_id(), Some(tenant_id));

HTTP Mocking

Mock external HTTP calls with pattern matching. Requests are recorded for verification.

Pattern Matching

let ctx = TestMutationContext::builder()
// Wildcard at end
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
// Wildcard in middle
.mock_http_json("api.example.com/*/users", json!([]))
// Exact match
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();
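
The source does not show how patterns are matched. As a rough mental model only, the sketch below treats `*` as "any run of characters, including none" and assumes the request URL has already had its scheme stripped. The function name `pattern_matches` is hypothetical, and Forge's actual matcher may handle schemes, query strings, or path segments differently.

```rust
/// Hypothetical glob matcher: `*` matches any run of characters, including none.
/// Illustration only; this is not Forge's implementation.
fn pattern_matches(pattern: &str, target: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();

    // No wildcard in the pattern: require an exact match.
    if parts.len() == 1 {
        return pattern == target;
    }

    // The text before the first `*` must be a prefix of the target.
    let first = parts[0];
    if !target.starts_with(first) {
        return false;
    }
    let mut rest = &target[first.len()..];

    // Each literal between two wildcards must appear, in order.
    for part in &parts[1..parts.len() - 1] {
        match rest.find(*part) {
            Some(idx) => rest = &rest[idx + part.len()..],
            None => return false,
        }
    }

    // The text after the last `*` must be a suffix of what remains.
    rest.ends_with(parts[parts.len() - 1])
}
```

Under this model, "api.example.com/*/users" accepts "api.example.com/v1/users" and rejects "api.example.com/v1/orders", while a pattern without a wildcard only accepts an identical string.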

Custom Handlers

let ctx = TestJobContext::builder("process_payment")
    .mock_http("api.stripe.com/v1/charges", |req| {
        // Inspect the request
        let amount = req.body["amount"].as_i64().unwrap_or(0);

        if amount > 10000 {
            MockResponse::error(400, "Amount exceeds limit")
        } else {
            MockResponse::json(json!({
                "id": "ch_123",
                "amount": amount,
                "status": "succeeded"
            }))
        }
    })
    .build();

Request Verification

#[tokio::test]
async fn test_payment_calls_stripe() {
let ctx = TestMutationContext::builder()
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
.build();

process_payment(&ctx, args).await.unwrap();

// Verify calls were made
ctx.http().assert_called("api.stripe.com/*");
ctx.http().assert_called_times("api.stripe.com/*", 1);
ctx.http().assert_not_called("api.paypal.com/*");

// Verify request body
ctx.http().assert_called_with_body("api.stripe.com/*", |body| {
body["amount"] == 1000 && body["currency"] == "usd"
});

// Get all recorded requests
let requests = ctx.http().requests_to("api.stripe.com/*");
assert_eq!(requests[0].method, "POST");
}

Response Helpers

// Success responses
MockResponse::json(json!({ "data": "value" })) // 200 with JSON
MockResponse::ok() // 200 with empty JSON

// Error responses
MockResponse::error(400, "Bad request")
MockResponse::not_found("Resource not found")
MockResponse::unauthorized("Invalid token")
MockResponse::internal_error("Server error")

Job and Workflow Dispatch

Test contexts for mutations and webhooks record dispatched jobs and workflows.

Job Dispatch Assertions

#[tokio::test]
async fn test_order_dispatches_jobs() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());

create_order(&ctx, order_args).await.unwrap();

// Verify job was dispatched
assert_job_dispatched!(ctx, "send_confirmation_email");

// Verify with argument predicate
assert_job_dispatched!(ctx, "send_confirmation_email", |args| {
args["order_id"] == "ord_123"
});

// Verify job was NOT dispatched
assert_job_not_dispatched!(ctx, "send_sms");

// Verify dispatch count
ctx.job_dispatch().assert_dispatch_count("send_confirmation_email", 1);

// Get all dispatched jobs
let jobs = ctx.job_dispatch().dispatched_jobs();
let email_jobs = ctx.job_dispatch().jobs_of_type("send_confirmation_email");
}
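
The idea behind these assertions is that dispatch is recorded rather than executed. A minimal std-only sketch of such a recorder follows; `DispatchRecorder` and its methods are hypothetical names, and arguments are kept as plain strings here to avoid external crates, whereas the examples above use JSON values.

```rust
use std::cell::RefCell;

/// Hypothetical recorder: stores each dispatch instead of running the job.
#[derive(Default)]
struct DispatchRecorder {
    // (job name, serialized arguments) in dispatch order.
    jobs: RefCell<Vec<(String, String)>>,
}

impl DispatchRecorder {
    /// Called where production code would enqueue the job.
    fn dispatch(&self, name: &str, args: &str) {
        self.jobs
            .borrow_mut()
            .push((name.to_string(), args.to_string()));
    }

    /// Number of recorded dispatches for one job name.
    fn count(&self, name: &str) -> usize {
        self.jobs
            .borrow()
            .iter()
            .filter(|(n, _)| n.as_str() == name)
            .count()
    }

    /// True if any dispatch of `name` has arguments accepted by `pred`.
    fn dispatched_with(&self, name: &str, pred: impl Fn(&str) -> bool) -> bool {
        self.jobs
            .borrow()
            .iter()
            .any(|(n, a)| n.as_str() == name && pred(a.as_str()))
    }
}
```

An "assert dispatched" check then reduces to `count(name) > 0`, and the predicate form reduces to `dispatched_with(name, pred)`.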

Workflow Dispatch Assertions

#[tokio::test]
async fn test_signup_starts_onboarding() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());

signup(&ctx, signup_args).await.unwrap();

// Verify workflow was started
assert_workflow_started!(ctx, "user_onboarding");

// Verify with input predicate
assert_workflow_started!(ctx, "user_onboarding", |input| {
input["user_id"].is_string() && input["plan"] == "premium"
});

// Verify workflow was NOT started
assert_workflow_not_started!(ctx, "enterprise_onboarding");

// Get started workflows
let workflows = ctx.workflow_dispatch().workflows_named("user_onboarding");
}

Webhook Job Dispatch

#[tokio::test]
async fn test_github_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_idempotency_key("delivery-abc123")
.build();

handle_github_webhook(&ctx, payload).await.unwrap();

assert_job_dispatched!(ctx, "process_push_event");
}

Database Testing

Two ways to get a PostgreSQL connection in tests:

Method | Feature flag | Env var needed | What happens
Testcontainers | testcontainers | No | Starts a Docker container per TestDatabase
Explicit URL | None | TEST_DATABASE_URL | Connects to whatever you point it at

Both go through TestDatabase::from_env(). If TEST_DATABASE_URL is set it always wins, regardless of features.

Testcontainers

Spins up a throwaway PG container via Docker. No local PG install needed.

# Cargo.toml
[dev-dependencies]
forge = { version = "0.7.1", features = ["testcontainers"] }

#[tokio::test]
async fn test_with_container() {
    // Starts a PG container automatically (requires Docker)
    let db = TestDatabase::from_env().await.unwrap();

    let ctx = TestQueryContext::builder()
        .as_user(Uuid::new_v4())
        .with_pool(db.pool().clone())
        .build();

    let users = list_users(&ctx).await.unwrap();
}

The container lives as long as the TestDatabase. When it drops, the container stops.

To use an external PG instead, set TEST_DATABASE_URL and the container is skipped entirely:

TEST_DATABASE_URL=postgres://localhost/test_db cargo test --features testcontainers

Explicit Database URL

No feature flags. Set TEST_DATABASE_URL and go.

#[tokio::test]
async fn test_with_explicit_db() {
let db = TestDatabase::from_url("postgres://localhost/test_db").await.unwrap();
// or from TEST_DATABASE_URL env var
let db = TestDatabase::from_env().await.unwrap();

let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
}

Isolated Databases

Each test gets its own database. Prevents data collision between parallel tests.

#[tokio::test]
async fn test_user_creation() {
// One-liner: creates isolated DB, runs internal SQL, and applies migrations
let db = IsolatedTestDb::setup(
"test_user_creation",
&forge::get_internal_sql(),
Path::new("migrations"),
).await.unwrap();

let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();

create_user(&ctx, user_args).await.unwrap();

// Optional: explicit cleanup (otherwise orphaned DBs are cleaned on next run)
db.cleanup().await.unwrap();
}

For more control over the setup steps, use the manual approach:

#[tokio::test]
async fn test_with_custom_setup() {
    let base = TestDatabase::from_env().await.unwrap();
    let db = base.isolated("test_custom_setup").await.unwrap();

    db.run_sql(r#"
        CREATE TABLE users (
            id UUID PRIMARY KEY,
            email TEXT NOT NULL UNIQUE
        );
    "#).await.unwrap();
    db.migrate(Path::new("migrations")).await.unwrap();

    let ctx = TestMutationContext::builder()
        .as_user(Uuid::new_v4())
        .with_pool(db.pool().clone())
        .build();

    create_user(&ctx, user_args).await.unwrap();
    db.cleanup().await.unwrap();
}

Testing Jobs

#[test]
fn test_export_job_progress() {
let ctx = TestJobContext::builder("export_data")
.with_job_id(Uuid::new_v4())
.with_max_attempts(3)
.as_user(Uuid::new_v4())
.with_env("EXPORT_BUCKET", "test-bucket")
.mock_http_json("storage.googleapis.com/*", json!({ "uploaded": true }))
.build();

// Simulate progress updates
ctx.progress(25, "Fetching data").unwrap();
ctx.progress(50, "Processing rows").unwrap();
ctx.progress(100, "Upload complete").unwrap();

// Verify progress was recorded
let updates = ctx.progress_updates();
assert_eq!(updates.len(), 3);
assert_eq!(updates[0].percent, 25);
assert_eq!(updates[0].message, "Fetching data");
assert_eq!(updates[2].percent, 100);
}

Retry Simulation

#[test]
fn test_job_retry_behavior() {
// First attempt
let ctx = TestJobContext::builder("send_email")
.with_max_attempts(5)
.build();

assert!(!ctx.is_retry());
assert_eq!(ctx.attempt, 1);
assert!(!ctx.is_last_attempt());

// Third attempt (retry)
let ctx = TestJobContext::builder("send_email")
.as_retry(3)
.with_max_attempts(5)
.build();

assert!(ctx.is_retry());
assert_eq!(ctx.attempt, 3);
assert!(!ctx.is_last_attempt());

// Last attempt
let ctx = TestJobContext::builder("send_email")
.as_last_attempt()
.build();

assert!(ctx.is_retry());
assert!(ctx.is_last_attempt());
}
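
The assertions above are consistent with a simple rule: a run is a retry when its 1-based attempt number is greater than one, and it is the last attempt when that number reaches the maximum. The sketch below encodes that rule. The struct `Attempt` is hypothetical, and the exact comparison Forge uses is an assumption inferred from the example values.

```rust
/// Hypothetical model of the attempt counters on a job context.
struct Attempt {
    attempt: u32,      // 1-based attempt number
    max_attempts: u32, // total attempts allowed
}

impl Attempt {
    /// Any attempt after the first is a retry.
    fn is_retry(&self) -> bool {
        self.attempt > 1
    }

    /// The last attempt is the one that reaches the allowed maximum.
    fn is_last_attempt(&self) -> bool {
        self.attempt >= self.max_attempts
    }
}
```

This is useful when a job should behave differently on its final try, for example by alerting instead of returning a retryable error.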

Heartbeat

#[tokio::test]
async fn test_long_running_job() {
let ctx = TestJobContext::builder("batch_import").build();

// Heartbeat is a no-op in tests but verifies the call compiles
ctx.heartbeat().await.unwrap();
}

Testing Crons

#[test]
fn test_cron_execution() {
let ctx = TestCronContext::builder("daily_cleanup")
.with_run_id(Uuid::new_v4())
.with_timezone("America/New_York")
.with_env("RETENTION_DAYS", "30")
.build();

assert_eq!(ctx.cron_name, "daily_cleanup");
assert_eq!(ctx.timezone, "America/New_York");
assert!(!ctx.is_catch_up);

// Structured logging
ctx.log.info("Starting cleanup");
ctx.log.info_with("Deleted records", json!({ "count": 150 }));

let entries = ctx.log.entries();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].level, "info");
}

Lateness Detection

#[test]
fn test_late_cron_execution() {
let scheduled = Utc::now() - Duration::minutes(5);

let ctx = TestCronContext::builder("hourly_sync")
.scheduled_at(scheduled)
.build();

// More than 1 minute late
assert!(ctx.is_late());
assert!(ctx.delay() >= Duration::minutes(4));
}
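
For intuition, lateness can be modeled as the gap between the scheduled time and the time the run actually starts. The sketch below uses `std::time` instead of chrono and takes the one-minute threshold from the comment in the example above. `CronRun` and its fields are hypothetical, and the threshold Forge applies internally is an assumption.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical model of a cron run's timing (not Forge's type).
struct CronRun {
    scheduled: SystemTime, // when the run was supposed to start
    now: SystemTime,       // when it actually started
}

impl CronRun {
    /// How far behind schedule the run is; zero if it started early.
    fn delay(&self) -> Duration {
        self.now
            .duration_since(self.scheduled)
            .unwrap_or(Duration::ZERO)
    }

    /// Assumed rule: late means more than one minute behind schedule.
    fn is_late(&self) -> bool {
        self.delay() > Duration::from_secs(60)
    }
}
```

Passing both timestamps in explicitly keeps the check deterministic, which is the same reason the test builder accepts `scheduled_at`.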

Catch-Up Runs

#[test]
fn test_catch_up_cron() {
let ctx = TestCronContext::builder("daily_report")
.as_catch_up()
.scheduled_at(Utc::now() - Duration::hours(25))
.build();

assert!(ctx.is_catch_up);
assert!(ctx.is_late());
}

Testing Workflows

#[test]
fn test_workflow_execution() {
let ctx = TestWorkflowContext::builder("order_fulfillment")
.with_run_id(Uuid::new_v4())
.with_version(2)
.as_user(Uuid::new_v4())
.with_tenant(Uuid::new_v4())
.build();

assert_eq!(ctx.workflow_name, "order_fulfillment");
assert_eq!(ctx.version, 2);
assert!(!ctx.is_resumed());
}

Step Tracking

#[test]
fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("user_onboarding").build();

// Record step execution
ctx.record_step_start("validate_email");
ctx.record_step_complete("validate_email", json!({ "valid": true }));

ctx.record_step_start("create_account");
ctx.record_step_complete("create_account", json!({ "user_id": "u_123" }));

// Verify steps
assert!(ctx.is_step_completed("validate_email"));
assert!(ctx.is_step_completed("create_account"));
assert!(!ctx.is_step_completed("send_welcome"));

// Get step result
let result: Option<serde_json::Value> = ctx.get_step_result("create_account");
assert_eq!(result.unwrap()["user_id"], "u_123");

// Verify order
let completed = ctx.completed_step_names();
assert_eq!(completed, vec!["validate_email", "create_account"]);
}

Resume Simulation

Test workflow resumption after crash or sleep.

#[test]
fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("payment_flow")
.as_resumed()
.with_completed_step("validate_card", json!({ "valid": true }))
.with_completed_step("reserve_funds", json!({ "hold_id": "h_123" }))
.build();

assert!(ctx.is_resumed());
assert!(ctx.is_step_completed("validate_card"));
assert!(ctx.is_step_completed("reserve_funds"));
assert!(!ctx.is_step_completed("capture_payment"));

// Resume from where we left off
let hold_id: String = ctx.get_step_result::<serde_json::Value>("reserve_funds")
.and_then(|v| v["hold_id"].as_str().map(String::from))
.unwrap();
assert_eq!(hold_id, "h_123");
}
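
Resumption relies on replaying recorded step results instead of re-running the step. A minimal std-only sketch of that idea follows. `StepLog`, `result_of`, and `run_step` are hypothetical names, results are stored as strings rather than JSON, and this is not a description of Forge's workflow engine.

```rust
/// Hypothetical step log: completed steps and their results, in order.
#[derive(Default)]
struct StepLog {
    completed: Vec<(String, String)>,
}

impl StepLog {
    /// Recorded result for a step, if it already completed.
    fn result_of(&self, name: &str) -> Option<&str> {
        self.completed
            .iter()
            .find(|(n, _)| n.as_str() == name)
            .map(|(_, r)| r.as_str())
    }

    /// Run `step` only if `name` has no recorded result; otherwise replay it.
    fn run_step(&mut self, name: &str, step: impl FnOnce() -> String) -> String {
        if let Some(saved) = self.result_of(name) {
            return saved.to_string();
        }
        let result = step();
        self.completed.push((name.to_string(), result.clone()));
        result
    }
}
```

A resumed run pre-populates the log, so side effects such as reserving funds are not repeated, while steps that never completed run normally.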

Deterministic Time

#[test]
fn test_workflow_deterministic_time() {
let fixed_time = Utc.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap();

let ctx = TestWorkflowContext::builder("trial_expiration")
.with_workflow_time(fixed_time)
.build();

// workflow_time() returns the same value on every call
// even across replays, enabling deterministic execution
assert_eq!(ctx.workflow_time(), fixed_time);
}

Durable Sleep

#[tokio::test]
async fn test_workflow_sleep() {
let ctx = TestWorkflowContext::builder("reminder_flow").build();

assert!(!ctx.sleep_called());

// Sleep is a no-op in tests but records intent
ctx.sleep(Duration::from_secs(86400)).await.unwrap();

assert!(ctx.sleep_called());
}

Testing Daemons

#[test]
fn test_daemon_creation() {
let ctx = TestDaemonContext::builder("heartbeat_daemon")
.with_instance_id(Uuid::new_v4())
.with_env("HEARTBEAT_INTERVAL", "5")
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();

assert_eq!(ctx.daemon_name, "heartbeat_daemon");
assert!(!ctx.is_shutdown_requested());
}

Shutdown Signal Control

#[tokio::test]
async fn test_daemon_shutdown() {
let ctx = TestDaemonContext::builder("worker_daemon").build();

// Initially not shutdown
assert!(!ctx.is_shutdown_requested());

// Trigger shutdown from test
ctx.request_shutdown();

// Daemon sees shutdown
assert!(ctx.is_shutdown_requested());
}

Graceful Shutdown Pattern

#[tokio::test]
async fn test_daemon_graceful_shutdown() {
    let ctx = TestDaemonContext::builder("queue_processor").build();

    // Spawn shutdown trigger
    let shutdown_tx = ctx.shutdown_tx.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = shutdown_tx.send(true);
    });

    // Wait for shutdown signal (with timeout)
    tokio::time::timeout(
        Duration::from_millis(200),
        ctx.shutdown_signal(),
    ).await.expect("Shutdown should complete");

    assert!(ctx.is_shutdown_requested());
}
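
On the daemon side, graceful shutdown usually means finishing the current unit of work and then checking the signal before starting the next one. The sketch below shows that loop shape using a plain `AtomicBool` in place of the channel used above. The function `run_until_shutdown` is hypothetical, and swapping the channel for a flag is a simplification for a std-only example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical daemon loop: run one unit of work per iteration
/// and stop once the shutdown flag is observed.
fn run_until_shutdown(shutdown: &AtomicBool, mut work: impl FnMut(u32)) -> u32 {
    let mut iterations = 0;
    while !shutdown.load(Ordering::SeqCst) {
        iterations += 1;
        // The current unit of work always completes before the next check.
        work(iterations);
    }
    iterations
}
```

A test can drive this loop by setting the flag from inside the work closure or from another thread, then asserting on how many iterations ran.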

Testing Webhooks

#[tokio::test]
async fn test_stripe_webhook() {
let ctx = TestWebhookContext::builder("stripe_webhook")
.with_header("Stripe-Signature", "t=123,v1=abc...")
.with_idempotency_key("evt_123")
.with_env("STRIPE_WEBHOOK_SECRET", "whsec_test")
.build();

assert_eq!(ctx.webhook_name, "stripe_webhook");
assert_eq!(ctx.idempotency_key, Some("evt_123".to_string()));
assert_eq!(ctx.header("Stripe-Signature"), Some("t=123,v1=abc..."));
}

Case-Insensitive Headers

#[test]
fn test_webhook_headers() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_header("Content-Type", "application/json")
.build();

// Headers are case-insensitive
assert_eq!(ctx.header("x-github-event"), Some("push"));
assert_eq!(ctx.header("X-GITHUB-EVENT"), Some("push"));
assert_eq!(ctx.header("content-type"), Some("application/json"));
}
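
One common way to get case-insensitive lookups is to normalize header names to lowercase on both insert and read. The sketch below shows that approach with a hypothetical `Headers` type; whether Forge normalizes names this way or compares them differently is not stated in the source.

```rust
use std::collections::HashMap;

/// Hypothetical header store with case-insensitive names (not Forge's type).
#[derive(Default)]
struct Headers {
    inner: HashMap<String, String>,
}

impl Headers {
    /// Store the value under the lowercased header name.
    fn insert(&mut self, name: &str, value: &str) {
        self.inner
            .insert(name.to_ascii_lowercase(), value.to_string());
    }

    /// Look up a header using the same lowercase normalization.
    fn get(&self, name: &str) -> Option<&str> {
        self.inner
            .get(&name.to_ascii_lowercase())
            .map(String::as_str)
    }
}
```

Only the name is normalized; header values keep their original casing.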

Webhook Job Dispatch

#[tokio::test]
async fn test_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("payment_webhook").build();

ctx.dispatch_job("process_payment", json!({
"payment_id": "pay_123",
"amount": 1000
})).await.unwrap();

ctx.job_dispatch().assert_dispatched("process_payment");
ctx.job_dispatch().assert_dispatched_with("process_payment", |args| {
args["amount"] == 1000
});
}

Assertion Macros

Forge provides ergonomic assertion macros for common patterns.

Result Assertions

use forge::prelude::*;

// Assert success
let result = some_operation();
assert_ok!(result);
assert_ok!(result, "Operation should succeed");

// Assert failure
let result: Result<(), ForgeError> = Err(ForgeError::NotFound("user".into()));
assert_err!(result);

// Assert specific error variant
assert_err_variant!(result, ForgeError::NotFound(_));

Dispatch Assertions

// Jobs
assert_job_dispatched!(ctx, "send_email");
assert_job_dispatched!(ctx, "send_email", |args| args["to"] == "test@example.com");
assert_job_not_dispatched!(ctx, "send_sms");

// Workflows
assert_workflow_started!(ctx, "onboarding");
assert_workflow_started!(ctx, "onboarding", |input| input["plan"] == "premium");
assert_workflow_not_started!(ctx, "enterprise_setup");

// HTTP
assert_http_called!(ctx, "api.stripe.com/*");
assert_http_not_called!(ctx, "api.paypal.com/*");

JSON Matching

Partial matching for JSON values.

use forge::testing::assert_json_matches;

let actual = json!({
"id": 123,
"name": "Test",
"nested": { "foo": "bar" },
"extra": "ignored"
});

// Only specified fields must match
assert!(assert_json_matches(&actual, &json!({ "id": 123 })));
assert!(assert_json_matches(&actual, &json!({ "name": "Test" })));
assert!(assert_json_matches(&actual, &json!({ "nested": { "foo": "bar" } })));

// Non-matches
assert!(!assert_json_matches(&actual, &json!({ "id": 456 })));
assert!(!assert_json_matches(&actual, &json!({ "missing": true })));
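
The matching rule implied by these examples is: for objects, every key in the expected value must exist in the actual value and match recursively; anything else must be equal. The sketch below implements that rule over a tiny hand-rolled `Json` enum so it runs without external crates. The enum, `partial_match`, and the `obj` helper are hypothetical, arrays are omitted, and how the real function treats arrays is not covered by the source.

```rust
use std::collections::BTreeMap;

/// Tiny stand-in for a JSON value (illustration only, arrays omitted).
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Num(f64),
    Str(String),
    Bool(bool),
    Obj(BTreeMap<String, Json>),
}

/// True if every field in `expected` is present in `actual` with a matching value.
/// Extra fields in `actual` are ignored.
fn partial_match(actual: &Json, expected: &Json) -> bool {
    match (actual, expected) {
        // Objects: recurse into each expected key.
        (Json::Obj(a), Json::Obj(e)) => e
            .iter()
            .all(|(key, want)| a.get(key).map_or(false, |got| partial_match(got, want))),
        // Everything else: plain equality.
        _ => actual == expected,
    }
}

/// Helper to build an object from key/value pairs.
fn obj(pairs: Vec<(&str, Json)>) -> Json {
    Json::Obj(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
```

The asymmetry is deliberate: the expected value acts as a pattern, so tests keep passing when a response gains new fields.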

Under the Hood

Test Isolation

Each test context is fully independent. Authentication, environment variables, HTTP mocks, and dispatch recorders are scoped to that instance. This avoids global state mutation.

Deterministic Execution

Test contexts provide deterministic alternatives to non-deterministic operations:

  • workflow_time() returns fixed time instead of wall clock
  • HTTP mocks return controlled responses
  • Environment comes from test configuration, not system
  • Job/workflow dispatch records instead of executing

Real PostgreSQL

All database test modes (testcontainers, explicit URL) run against real PostgreSQL. Tests exercise real SQL with real constraints, indexes, and behavior.

Transactional Rollback

Isolated databases are created fresh per test. Manual cleanup or transaction rollback tricks are not required; each test starts with a clean slate.

Production Parity

Test contexts mirror production contexts. The same methods (ctx.db(), ctx.db_conn(), ctx.http(), ctx.env()) work identically, so functions exercised through test contexts run against production contexts without modification.