Skip to main content

Testing

Test queries, mutations, jobs, workflows, crons, daemons, and webhooks with isolated contexts, HTTP mocking, and real PostgreSQL.

The Code

#[tokio::test]
async fn test_admin_can_delete_user() {
let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_env("AUDIT_LOG_URL", "https://audit.example.com")
.mock_http_json("audit.example.com/*", json!({ "logged": true }))
.build();

let result = delete_user(&ctx, DeleteUserArgs { user_id: target_id }).await;

assert_ok!(result);
assert_job_dispatched!(ctx, "cleanup_user_data");
ctx.http().assert_called("audit.example.com/*");
}

What Happens

Each test context builder creates an isolated environment with mocked authentication, configurable environment variables, HTTP interception, and dispatch recording. Tests run against real PostgreSQL when needed via embedded or isolated databases. No global state pollution between tests.

Test Contexts

Every Forge function type has a corresponding test context:

Function TypeTest ContextBuilder
QueryTestQueryContextTestQueryContext::builder()
MutationTestMutationContextTestMutationContext::builder()
JobTestJobContextTestJobContext::builder("job_name")
CronTestCronContextTestCronContext::builder("cron_name")
WorkflowTestWorkflowContextTestWorkflowContext::builder("workflow_name")
DaemonTestDaemonContextTestDaemonContext::builder("daemon_name")
WebhookTestWebhookContextTestWebhookContext::builder("webhook_name")

Builder Pattern

All test contexts follow the same builder pattern with shared methods for authentication, environment, HTTP mocking, and database access.

Authentication

// UUID-based auth (default for most apps)
let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_role("admin")
.with_role("manager")
.with_claim("org_id", json!("org-123"))
.with_claim("tier", json!("premium"))
.build();

// Non-UUID auth (Firebase, Clerk, Auth0)
let ctx = TestMutationContext::builder()
.as_subject("firebase-user-id-abc123")
.with_role("user")
.build();

// Unauthenticated
let ctx = TestQueryContext::minimal();
assert!(!ctx.auth.is_authenticated());

Environment Variables

let ctx = TestJobContext::builder("send_email")
.with_env("SMTP_HOST", "localhost")
.with_env("SMTP_PORT", "587")
.with_env("SMTP_USERNAME", "test@example.com")
.build();

// Inside the job function
let host = ctx.env_require("SMTP_HOST")?; // "localhost"
let port: u16 = ctx.env_parse("SMTP_PORT")?; // 587
let timeout = ctx.env_or("SMTP_TIMEOUT", "30"); // "30" (default)

// Verify access in tests
ctx.env_mock().assert_accessed("SMTP_HOST");
ctx.env_mock().assert_not_accessed("UNUSED_VAR");

Multi-Tenant Testing

let tenant_id = Uuid::new_v4();

let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_tenant(tenant_id)
.with_claim("tenant_id", json!(tenant_id.to_string()))
.build();

assert_eq!(ctx.tenant_id(), Some(tenant_id));

HTTP Mocking

Mock external HTTP calls with pattern matching. Requests are recorded for verification.

Pattern Matching

let ctx = TestMutationContext::builder()
// Wildcard at end
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
// Wildcard in middle
.mock_http_json("api.example.com/*/users", json!([]))
// Exact match
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();

Custom Handlers

let ctx = TestJobContext::builder("process_payment")
.mock_http("api.stripe.com/v1/charges", |req| {
// Inspect the request
let amount = req.body["amount"].as_i64().unwrap_or(0);

if amount > 10000 {
MockResponse::error(400, "Amount exceeds limit")
} else {
MockResponse::json(json!({
"id": "ch_123",
"amount": amount,
"status": "succeeded"
}))
}
})
.build();

Request Verification

#[tokio::test]
async fn test_payment_calls_stripe() {
let ctx = TestMutationContext::builder()
.mock_http_json("api.stripe.com/*", json!({ "id": "ch_123" }))
.build();

process_payment(&ctx, args).await.unwrap();

// Verify calls were made
ctx.http().assert_called("api.stripe.com/*");
ctx.http().assert_called_times("api.stripe.com/*", 1);
ctx.http().assert_not_called("api.paypal.com/*");

// Verify request body
ctx.http().assert_called_with_body("api.stripe.com/*", |body| {
body["amount"] == 1000 && body["currency"] == "usd"
});

// Get all recorded requests
let requests = ctx.http().requests_to("api.stripe.com/*");
assert_eq!(requests[0].method, "POST");
}

Response Helpers

// Success responses
MockResponse::json(json!({ "data": "value" })) // 200 with JSON
MockResponse::ok() // 200 with empty JSON

// Error responses
MockResponse::error(400, "Bad request")
MockResponse::not_found("Resource not found")
MockResponse::unauthorized("Invalid token")
MockResponse::internal_error("Server error")

Job and Workflow Dispatch

Test contexts for mutations and webhooks record dispatched jobs and workflows.

Job Dispatch Assertions

#[tokio::test]
async fn test_order_dispatches_jobs() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());

create_order(&ctx, order_args).await.unwrap();

// Verify job was dispatched
assert_job_dispatched!(ctx, "send_confirmation_email");

// Verify with argument predicate
assert_job_dispatched!(ctx, "send_confirmation_email", |args| {
args["order_id"] == "ord_123"
});

// Verify job was NOT dispatched
assert_job_not_dispatched!(ctx, "send_sms");

// Verify dispatch count
ctx.job_dispatch().assert_dispatch_count("send_confirmation_email", 1);

// Get all dispatched jobs
let jobs = ctx.job_dispatch().dispatched_jobs();
let email_jobs = ctx.job_dispatch().jobs_of_type("send_confirmation_email");
}

Workflow Dispatch Assertions

#[tokio::test]
async fn test_signup_starts_onboarding() {
let ctx = TestMutationContext::authenticated(Uuid::new_v4());

signup(&ctx, signup_args).await.unwrap();

// Verify workflow was started
assert_workflow_started!(ctx, "user_onboarding");

// Verify with input predicate
assert_workflow_started!(ctx, "user_onboarding", |input| {
input["user_id"].is_string() && input["plan"] == "premium"
});

// Verify workflow was NOT started
assert_workflow_not_started!(ctx, "enterprise_onboarding");

// Get started workflows
let workflows = ctx.workflow_dispatch().workflows_named("user_onboarding");
}

Webhook Job Dispatch

#[tokio::test]
async fn test_github_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_idempotency_key("delivery-abc123")
.build();

handle_github_webhook(&ctx, payload).await.unwrap();

assert_job_dispatched!(ctx, "process_push_event");
}

Database Testing

Embedded PostgreSQL

For zero-config database tests, use embedded PostgreSQL. Requires the embedded-db feature.

# Cargo.toml
[dev-dependencies]
forge = { version = "0.1", features = ["embedded-db"] }
#[tokio::test]
async fn test_with_embedded_db() {
// Downloads and starts PostgreSQL automatically (cached after first run)
let db = TestDatabase::embedded().await.unwrap();

let ctx = TestQueryContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();

// Test with real database
let users = list_users(&ctx).await.unwrap();
}

Explicit Database URL

#[tokio::test]
async fn test_with_explicit_db() {
let db = TestDatabase::from_url("postgres://localhost/test_db").await.unwrap();
// or from TEST_DATABASE_URL env var
let db = TestDatabase::from_env().await.unwrap();

let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();
}

Isolated Databases

Each test gets its own database. Prevents data collision between parallel tests.

#[tokio::test]
async fn test_user_creation() {
let base = TestDatabase::embedded().await.unwrap();
let db = base.isolated("test_user_creation").await.unwrap();

// Run migrations
db.migrate(Path::new("migrations")).await.unwrap();

// Or run setup SQL directly
db.run_sql(r#"
CREATE TABLE users (
id UUID PRIMARY KEY,
email TEXT NOT NULL UNIQUE
);
"#).await.unwrap();

let ctx = TestMutationContext::builder()
.as_user(Uuid::new_v4())
.with_pool(db.pool().clone())
.build();

// Test in isolation
create_user(&ctx, user_args).await.unwrap();

// Optional: explicit cleanup (otherwise orphaned DBs are cleaned on next run)
db.cleanup().await.unwrap();
}

Testing Jobs

#[test]
fn test_export_job_progress() {
let ctx = TestJobContext::builder("export_data")
.with_job_id(Uuid::new_v4())
.with_max_attempts(3)
.as_user(Uuid::new_v4())
.with_env("EXPORT_BUCKET", "test-bucket")
.mock_http_json("storage.googleapis.com/*", json!({ "uploaded": true }))
.build();

// Simulate progress updates
ctx.progress(25, "Fetching data").unwrap();
ctx.progress(50, "Processing rows").unwrap();
ctx.progress(100, "Upload complete").unwrap();

// Verify progress was recorded
let updates = ctx.progress_updates();
assert_eq!(updates.len(), 3);
assert_eq!(updates[0].percent, 25);
assert_eq!(updates[0].message, "Fetching data");
assert_eq!(updates[2].percent, 100);
}

Retry Simulation

#[test]
fn test_job_retry_behavior() {
// First attempt
let ctx = TestJobContext::builder("send_email")
.with_max_attempts(5)
.build();

assert!(!ctx.is_retry());
assert_eq!(ctx.attempt, 1);
assert!(!ctx.is_last_attempt());

// Third attempt (retry)
let ctx = TestJobContext::builder("send_email")
.as_retry(3)
.with_max_attempts(5)
.build();

assert!(ctx.is_retry());
assert_eq!(ctx.attempt, 3);
assert!(!ctx.is_last_attempt());

// Last attempt
let ctx = TestJobContext::builder("send_email")
.as_last_attempt()
.build();

assert!(ctx.is_retry());
assert!(ctx.is_last_attempt());
}

Heartbeat

#[tokio::test]
async fn test_long_running_job() {
let ctx = TestJobContext::builder("batch_import").build();

// Heartbeat is a no-op in tests but verifies the call compiles
ctx.heartbeat().await.unwrap();
}

Testing Crons

#[test]
fn test_cron_execution() {
let ctx = TestCronContext::builder("daily_cleanup")
.with_run_id(Uuid::new_v4())
.with_timezone("America/New_York")
.with_env("RETENTION_DAYS", "30")
.build();

assert_eq!(ctx.cron_name, "daily_cleanup");
assert_eq!(ctx.timezone, "America/New_York");
assert!(!ctx.is_catch_up);

// Structured logging
ctx.log.info("Starting cleanup");
ctx.log.info_with("Deleted records", json!({ "count": 150 }));

let entries = ctx.log.entries();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].level, "info");
}

Lateness Detection

#[test]
fn test_late_cron_execution() {
let scheduled = Utc::now() - Duration::minutes(5);

let ctx = TestCronContext::builder("hourly_sync")
.scheduled_at(scheduled)
.build();

// More than 1 minute late
assert!(ctx.is_late());
assert!(ctx.delay() >= Duration::minutes(4));
}

Catch-Up Runs

#[test]
fn test_catch_up_cron() {
let ctx = TestCronContext::builder("daily_report")
.as_catch_up()
.scheduled_at(Utc::now() - Duration::hours(25))
.build();

assert!(ctx.is_catch_up);
assert!(ctx.is_late());
}

Testing Workflows

#[test]
fn test_workflow_execution() {
let ctx = TestWorkflowContext::builder("order_fulfillment")
.with_run_id(Uuid::new_v4())
.with_version(2)
.as_user(Uuid::new_v4())
.with_tenant(Uuid::new_v4())
.build();

assert_eq!(ctx.workflow_name, "order_fulfillment");
assert_eq!(ctx.version, 2);
assert!(!ctx.is_resumed());
}

Step Tracking

#[test]
fn test_workflow_steps() {
let ctx = TestWorkflowContext::builder("user_onboarding").build();

// Record step execution
ctx.record_step_start("validate_email");
ctx.record_step_complete("validate_email", json!({ "valid": true }));

ctx.record_step_start("create_account");
ctx.record_step_complete("create_account", json!({ "user_id": "u_123" }));

// Verify steps
assert!(ctx.is_step_completed("validate_email"));
assert!(ctx.is_step_completed("create_account"));
assert!(!ctx.is_step_completed("send_welcome"));

// Get step result
let result: Option<serde_json::Value> = ctx.get_step_result("create_account");
assert_eq!(result.unwrap()["user_id"], "u_123");

// Verify order
let completed = ctx.completed_step_names();
assert_eq!(completed, vec!["validate_email", "create_account"]);
}

Resume Simulation

Test workflow resumption after crash or sleep.

#[test]
fn test_workflow_resume() {
let ctx = TestWorkflowContext::builder("payment_flow")
.as_resumed()
.with_completed_step("validate_card", json!({ "valid": true }))
.with_completed_step("reserve_funds", json!({ "hold_id": "h_123" }))
.build();

assert!(ctx.is_resumed());
assert!(ctx.is_step_completed("validate_card"));
assert!(ctx.is_step_completed("reserve_funds"));
assert!(!ctx.is_step_completed("capture_payment"));

// Resume from where we left off
let hold_id: String = ctx.get_step_result::<serde_json::Value>("reserve_funds")
.and_then(|v| v["hold_id"].as_str().map(String::from))
.unwrap();
assert_eq!(hold_id, "h_123");
}

Deterministic Time

#[test]
fn test_workflow_deterministic_time() {
let fixed_time = Utc.with_ymd_and_hms(2024, 1, 15, 10, 30, 0).unwrap();

let ctx = TestWorkflowContext::builder("trial_expiration")
.with_workflow_time(fixed_time)
.build();

// workflow_time() returns the same value on every call
// even across replays, enabling deterministic execution
assert_eq!(ctx.workflow_time(), fixed_time);
}

Durable Sleep

#[tokio::test]
async fn test_workflow_sleep() {
let ctx = TestWorkflowContext::builder("reminder_flow").build();

assert!(!ctx.sleep_called());

// Sleep is a no-op in tests but records intent
ctx.sleep(Duration::from_secs(86400)).await.unwrap();

assert!(ctx.sleep_called());
}

Testing Daemons

#[test]
fn test_daemon_creation() {
let ctx = TestDaemonContext::builder("heartbeat_daemon")
.with_instance_id(Uuid::new_v4())
.with_env("HEARTBEAT_INTERVAL", "5")
.mock_http_json("api.example.com/health", json!({ "status": "ok" }))
.build();

assert_eq!(ctx.daemon_name, "heartbeat_daemon");
assert!(!ctx.is_shutdown_requested());
}

Shutdown Signal Control

#[tokio::test]
async fn test_daemon_shutdown() {
let ctx = TestDaemonContext::builder("worker_daemon").build();

// Initially not shutdown
assert!(!ctx.is_shutdown_requested());

// Trigger shutdown from test
ctx.request_shutdown();

// Daemon sees shutdown
assert!(ctx.is_shutdown_requested());
}

Graceful Shutdown Pattern

#[tokio::test]
async fn test_daemon_graceful_shutdown() {
let ctx = TestDaemonContext::builder("queue_processor").build();

// Spawn shutdown trigger
let shutdown_tx = ctx.shutdown_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = shutdown_tx.send(true);
});

// Wait for shutdown signal (with timeout)
tokio::time::timeout(
Duration::from_millis(200),
ctx.shutdown_signal(),
).await.expect("Shutdown should complete");

assert!(ctx.is_shutdown_requested());
}

Testing Webhooks

#[tokio::test]
async fn test_stripe_webhook() {
let ctx = TestWebhookContext::builder("stripe_webhook")
.with_header("Stripe-Signature", "t=123,v1=abc...")
.with_idempotency_key("evt_123")
.with_env("STRIPE_WEBHOOK_SECRET", "whsec_test")
.build();

assert_eq!(ctx.webhook_name, "stripe_webhook");
assert_eq!(ctx.idempotency_key, Some("evt_123".to_string()));
assert_eq!(ctx.header("Stripe-Signature"), Some("t=123,v1=abc..."));
}

Case-Insensitive Headers

#[test]
fn test_webhook_headers() {
let ctx = TestWebhookContext::builder("github_webhook")
.with_header("X-GitHub-Event", "push")
.with_header("Content-Type", "application/json")
.build();

// Headers are case-insensitive
assert_eq!(ctx.header("x-github-event"), Some("push"));
assert_eq!(ctx.header("X-GITHUB-EVENT"), Some("push"));
assert_eq!(ctx.header("content-type"), Some("application/json"));
}

Webhook Job Dispatch

#[tokio::test]
async fn test_webhook_dispatches_job() {
let ctx = TestWebhookContext::builder("payment_webhook").build();

ctx.dispatch_job("process_payment", json!({
"payment_id": "pay_123",
"amount": 1000
})).await.unwrap();

ctx.job_dispatch().assert_dispatched("process_payment");
ctx.job_dispatch().assert_dispatched_with("process_payment", |args| {
args["amount"] == 1000
});
}

Assertion Macros

Forge provides ergonomic assertion macros for common patterns.

Result Assertions

use forge::prelude::*;

// Assert success
let result = some_operation();
assert_ok!(result);
assert_ok!(result, "Operation should succeed");

// Assert failure
let result: Result<(), ForgeError> = Err(ForgeError::NotFound("user".into()));
assert_err!(result);

// Assert specific error variant
assert_err_variant!(result, ForgeError::NotFound(_));

Dispatch Assertions

// Jobs
assert_job_dispatched!(ctx, "send_email");
assert_job_dispatched!(ctx, "send_email", |args| args["to"] == "test@example.com");
assert_job_not_dispatched!(ctx, "send_sms");

// Workflows
assert_workflow_started!(ctx, "onboarding");
assert_workflow_started!(ctx, "onboarding", |input| input["plan"] == "premium");
assert_workflow_not_started!(ctx, "enterprise_setup");

// HTTP
assert_http_called!(ctx, "api.stripe.com/*");
assert_http_not_called!(ctx, "api.paypal.com/*");

JSON Matching

Partial matching for JSON values.

use forge::testing::assert_json_matches;

let actual = json!({
"id": 123,
"name": "Test",
"nested": { "foo": "bar" },
"extra": "ignored"
});

// Only specified fields must match
assert!(assert_json_matches(&actual, &json!({ "id": 123 })));
assert!(assert_json_matches(&actual, &json!({ "name": "Test" })));
assert!(assert_json_matches(&actual, &json!({ "nested": { "foo": "bar" } })));

// Non-matches
assert!(!assert_json_matches(&actual, &json!({ "id": 456 })));
assert!(!assert_json_matches(&actual, &json!({ "missing": true })));

Under the Hood

Test Isolation

Each test context is fully independent. Authentication, environment variables, HTTP mocks, and dispatch recorders are scoped to that instance. No global state mutation.

Deterministic Execution

Test contexts provide deterministic alternatives to non-deterministic operations:

  • workflow_time() returns fixed time instead of wall clock
  • HTTP mocks return controlled responses
  • Environment comes from test configuration, not system
  • Job/workflow dispatch records instead of executing

Embedded PostgreSQL

TestDatabase::embedded() downloads and starts a real PostgreSQL instance. The binary is cached after first download. Tests run against real SQL with real constraints, indexes, and behavior.

Transactional Rollback

Isolated databases are created fresh per test. No need for manual cleanup or transaction rollback tricks. Each test starts with a clean slate.

Production Parity

Test contexts mirror production contexts. The same methods (ctx.db(), ctx.http(), ctx.env()) work identically. Tests written against test contexts run against production contexts without modification.