Context objects provide access to databases, authentication, environment variables, and dispatch capabilities within Forge handlers.
AuthContext
Authentication state for the current request. Available via the auth field on the contexts that list one below (QueryContext, MutationContext, JobContext, CronContext, and WorkflowContext).
Fields
| Field | Type | Description |
|---|---|---|
| - | - | Fields are private; use methods below |
Methods
| Method | Signature | Description |
|---|---|---|
| is_authenticated | fn is_authenticated(&self) -> bool | Check if user is authenticated |
| user_id | fn user_id(&self) -> Option<Uuid> | Get user ID (if UUID-based auth) |
| require_user_id | fn require_user_id(&self) -> Result<Uuid> | Get user ID or return 401 error |
| subject | fn subject(&self) -> Option<&str> | Get raw subject claim (any format) |
| require_subject | fn require_subject(&self) -> Result<&str> | Get subject or return 401 error |
| has_role | fn has_role(&self, role: &str) -> bool | Check if user has role |
| require_role | fn require_role(&self, role: &str) -> Result<()> | Require role or return 403 error |
| claim | fn claim(&self, key: &str) -> Option<&Value> | Get custom JWT claim |
| roles | fn roles(&self) -> &[String] | Get all user roles |
Constructors (Testing)
| Constructor | Signature | Description |
|---|---|---|
| unauthenticated | fn unauthenticated() -> Self | Create unauthenticated context |
| authenticated | fn authenticated(user_id: Uuid, roles: Vec<String>, claims: HashMap<String, Value>) -> Self | Create authenticated context with UUID |
| authenticated_without_uuid | fn authenticated_without_uuid(roles: Vec<String>, claims: HashMap<String, Value>) -> Self | Create authenticated context for non-UUID providers (Firebase, Clerk) |
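The require_* methods above lend themselves to an "authenticate, then authorize" guard at the top of a handler. The sketch below illustrates that ordering with a stand-in AuthContext so it compiles with the standard library alone. The AuthError enum, the with_subject helper, and the delete_report handler are hypothetical names for this example and are not part of Forge; the real type uses Uuid, claims, and the framework's own error type.

```rust
// Stand-in for AuthContext: same method names as the tables above,
// simplified so the sketch needs nothing beyond std.
#[derive(Debug, PartialEq)]
enum AuthError {
    Unauthorized, // corresponds to the 401 case
    Forbidden,    // corresponds to the 403 case
}

struct AuthContext {
    subject: Option<String>,
    roles: Vec<String>,
}

impl AuthContext {
    fn unauthenticated() -> Self {
        Self { subject: None, roles: Vec::new() }
    }

    // Hypothetical test helper; the documented constructors take a Uuid and claims.
    fn with_subject(subject: &str, roles: &[&str]) -> Self {
        Self {
            subject: Some(subject.to_string()),
            roles: roles.iter().map(|r| r.to_string()).collect(),
        }
    }

    fn require_subject(&self) -> Result<&str, AuthError> {
        self.subject.as_deref().ok_or(AuthError::Unauthorized)
    }

    fn has_role(&self, role: &str) -> bool {
        self.roles.iter().any(|r| r == role)
    }

    fn require_role(&self, role: &str) -> Result<(), AuthError> {
        if self.has_role(role) { Ok(()) } else { Err(AuthError::Forbidden) }
    }
}

// Handler-style function: check identity first, then the role.
fn delete_report(auth: &AuthContext) -> Result<String, AuthError> {
    let subject = auth.require_subject()?;
    auth.require_role("admin")?;
    Ok(format!("report deleted by {}", subject))
}
```

Checking the subject before the role means an anonymous caller sees the 401-style error rather than a 403-style one, which matches the split between require_subject and require_role in the table.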
QueryContext
Read-only database access for query handlers.
Fields
| Field | Type | Description |
|---|---|---|
| auth | AuthContext | Authentication context |
| request | RequestMetadata | Request metadata (trace ID, client IP, timestamp) |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| require_user_id | fn require_user_id(&self) -> Result<Uuid> | Get user ID or return error |
| require_subject | fn require_subject(&self) -> Result<&str> | Get subject or return error |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
MutationContext
Transactional database access with HTTP client and job/workflow dispatch.
Fields
| Field | Type | Description |
|---|---|---|
| auth | AuthContext | Authentication context |
| request | RequestMetadata | Request metadata (trace ID, client IP, timestamp) |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| require_user_id | fn require_user_id(&self) -> Result<Uuid> | Get user ID or return error |
| require_subject | fn require_subject(&self) -> Result<&str> | Get subject or return error |
| dispatch_job | async fn dispatch_job<T: Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> | Dispatch a background job |
| start_workflow | async fn start_workflow<T: Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> | Start a workflow |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
JobContext
Background job execution context with progress reporting and heartbeats.
Fields
| Field | Type | Description |
|---|---|---|
| job_id | Uuid | Unique job identifier |
| job_type | String | Job type name |
| attempt | u32 | Current attempt number (1-based) |
| max_attempts | u32 | Maximum allowed attempts |
| auth | AuthContext | Authentication context |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| progress | fn progress(&self, percentage: u8, message: impl Into<String>) -> Result<()> | Report progress (0-100, clamped) |
| heartbeat | async fn heartbeat(&self) -> Result<()> | Send heartbeat to prevent stale reclaim |
| save | async fn save(&self, key: &str, value: Value) -> Result<()> | Save data for retries/compensation |
| saved | async fn saved(&self) -> Value | Get all saved job data |
| set_saved | async fn set_saved(&self, data: Value) -> Result<()> | Replace all saved data |
| is_cancel_requested | async fn is_cancel_requested(&self) -> Result<bool> | Check if cancellation was requested |
| check_cancelled | async fn check_cancelled(&self) -> Result<()> | Return error if cancellation requested |
| is_retry | fn is_retry(&self) -> bool | Check if this is a retry attempt |
| is_last_attempt | fn is_last_attempt(&self) -> bool | Check if this is the final attempt |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
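The attempt counters and the clamped progress percentage can be illustrated in isolation. The sketch below uses a stand-in struct with only the attempt and max_attempts fields. The bodies of is_retry and is_last_attempt are assumptions inferred from the descriptions above (1-based attempts), and clamp_progress, OnFailure, and on_failure are hypothetical helpers, not Forge APIs.

```rust
// Stand-in carrying only the attempt counters from JobContext.
struct JobAttempt {
    attempt: u32,      // 1-based, as in the fields table
    max_attempts: u32,
}

impl JobAttempt {
    // Assumed definitions; the source only describes these in words.
    fn is_retry(&self) -> bool {
        self.attempt > 1
    }

    fn is_last_attempt(&self) -> bool {
        self.attempt >= self.max_attempts
    }
}

// Hypothetical helper: turn "done out of total" into a percentage
// clamped to the 0-100 range that progress() expects.
fn clamp_progress(done: usize, total: usize) -> u8 {
    if total == 0 {
        return 100; // nothing to do counts as finished in this sketch
    }
    ((done * 100) / total).min(100) as u8
}

#[derive(Debug, PartialEq)]
enum OnFailure {
    Retry,
    GiveUp,
}

// Decide what a failing job should do based on the attempt counters.
fn on_failure(job: &JobAttempt) -> OnFailure {
    if job.is_last_attempt() {
        OnFailure::GiveUp
    } else {
        OnFailure::Retry
    }
}
```

A job might use the last-attempt check to send an alert or run cleanup only once, instead of on every failed attempt.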
CronContext
Scheduled task execution context with timing information.
Fields
| Field | Type | Description |
|---|---|---|
| run_id | Uuid | Unique run identifier |
| cron_name | String | Cron job name |
| scheduled_time | DateTime<Utc> | When the cron was supposed to run |
| execution_time | DateTime<Utc> | Actual execution start time |
| timezone | String | Configured timezone |
| is_catch_up | bool | Whether this is a catch-up run after downtime |
| auth | AuthContext | Authentication context |
| log | CronLog | Structured logger |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| delay | fn delay(&self) -> Duration | Time between scheduled and actual execution |
| is_late | fn is_late(&self) -> bool | Check if delay exceeds 1 minute |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
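How delay, is_late, and is_catch_up relate can be shown with plain numbers. This sketch replaces DateTime<Utc> with Unix-second integers so it needs only the standard library. The subtraction used for delay is an assumption based on the description above, and should_send_reminder is a hypothetical policy function, not part of Forge.

```rust
use std::time::Duration;

// Stand-in with Unix-second timestamps in place of DateTime<Utc>.
struct CronTiming {
    scheduled_secs: u64,
    execution_secs: u64,
    is_catch_up: bool,
}

impl CronTiming {
    // Assumed: delay is execution time minus scheduled time, floored at zero.
    fn delay(&self) -> Duration {
        Duration::from_secs(self.execution_secs.saturating_sub(self.scheduled_secs))
    }

    // Mirrors the documented threshold of one minute.
    fn is_late(&self) -> bool {
        self.delay() > Duration::from_secs(60)
    }
}

// Hypothetical policy: skip time-sensitive work on catch-up or late runs.
fn should_send_reminder(timing: &CronTiming) -> bool {
    !timing.is_catch_up && !timing.is_late()
}
```

Work that is safe to repeat (for example, a cleanup sweep) can ignore both flags, while user-facing notifications usually should not fire for a run that was scheduled hours ago.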
CronLog Methods
| Method | Signature | Description |
|---|---|---|
| info | fn info(&self, message: &str, data: Value) | Log info with structured data |
| warn | fn warn(&self, message: &str, data: Value) | Log warning with structured data |
| error | fn error(&self, message: &str, data: Value) | Log error with structured data |
| debug | fn debug(&self, message: &str, data: Value) | Log debug with structured data |
WorkflowContext
Durable workflow execution with step tracking, sleep, events, and compensation.
Fields
| Field | Type | Description |
|---|---|---|
| run_id | Uuid | Unique workflow run identifier |
| workflow_name | String | Workflow name |
| version | u32 | Workflow version |
| started_at | DateTime<Utc> | When the workflow started |
| auth | AuthContext | Authentication context |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| workflow_time | fn workflow_time(&self) -> DateTime<Utc> | Deterministic time (consistent across replays) |
| elapsed | fn elapsed(&self) -> Duration | Time since workflow started |
| is_resumed | fn is_resumed(&self) -> bool | Check if this is a resumed execution |
| tenant_id | fn tenant_id(&self) -> Option<Uuid> | Get tenant ID for multi-tenancy |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
Step Execution
| Method | Signature | Description |
|---|---|---|
| step | fn step<T, F, Fut>(&self, name: impl Into<String>, f: F) -> StepRunner<T> | Create a step with fluent API |
| parallel | fn parallel(&self) -> ParallelBuilder | Create parallel step builder |
Durable Operations
| Method | Signature | Description |
|---|---|---|
| sleep | async fn sleep(&self, duration: Duration) -> Result<()> | Durable sleep (survives restarts) |
| sleep_until | async fn sleep_until(&self, wake_at: DateTime<Utc>) -> Result<()> | Sleep until specific time |
| wait_for_event | async fn wait_for_event<T: DeserializeOwned>(&self, event_name: &str, timeout: Option<Duration>) -> Result<T> | Wait for external event |
Step State
| Method | Signature | Description |
|---|---|---|
| is_step_completed | fn is_step_completed(&self, name: &str) -> bool | Check if step completed |
| is_step_started | fn is_step_started(&self, name: &str) -> bool | Check if step started (running, completed, or failed) |
| get_step_result | fn get_step_result<T: DeserializeOwned>(&self, name: &str) -> Option<T> | Get completed step result |
| get_step_state | fn get_step_state(&self, name: &str) -> Option<StepState> | Get full step state |
| all_step_states | fn all_step_states(&self) -> HashMap<String, StepState> | Get all step states |
| completed_steps_reversed | fn completed_steps_reversed(&self) -> Vec<String> | Get completed steps in reverse order |
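One common use of recorded step state is to avoid repeating a side effect when a workflow is replayed. The sketch below shows that idea with a stand-in StepLog that stores results as strings in a HashMap. The run_once method is a hypothetical helper for this example; how Forge itself stores and replays step results is not described in this reference.

```rust
use std::collections::HashMap;

// Stand-in for the step-state side of WorkflowContext.
struct StepLog {
    results: HashMap<String, String>,
}

impl StepLog {
    fn new() -> Self {
        Self { results: HashMap::new() }
    }

    fn is_step_completed(&self, name: &str) -> bool {
        self.results.contains_key(name)
    }

    fn get_step_result(&self, name: &str) -> Option<String> {
        self.results.get(name).cloned()
    }

    // Hypothetical helper: run `f` only if the step has no recorded result,
    // otherwise return the recorded result without calling `f`.
    fn run_once<F: FnOnce() -> String>(&mut self, name: &str, f: F) -> String {
        if let Some(previous) = self.get_step_result(name) {
            return previous;
        }
        let output = f();
        self.results.insert(name.to_string(), output.clone());
        output
    }
}
```

Calling run_once twice with the same step name executes the closure once; the second call returns the stored value, which is the behavior a replayed workflow needs for steps such as charging a card.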
Compensation
| Method | Signature | Description |
|---|---|---|
| register_compensation | fn register_compensation(&self, step_name: &str, handler: CompensationHandler) | Register rollback handler |
| get_compensation_handler | fn get_compensation_handler(&self, step_name: &str) -> Option<CompensationHandler> | Get compensation handler |
| has_compensation | fn has_compensation(&self, step_name: &str) -> bool | Check if step has compensation |
| run_compensation | async fn run_compensation(&self) -> Vec<(String, bool)> | Run all compensations in reverse |
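Running compensations in reverse means the most recently completed step is rolled back first, and one failing handler does not stop the rest. The sketch below models that with synchronous closures and a stand-in registry. The Handler alias, the Compensations type, and the step names are hypothetical; the real CompensationHandler is asynchronous and its exact shape is not shown in this reference.

```rust
// Hypothetical synchronous stand-in for CompensationHandler.
type Handler = Box<dyn Fn() -> Result<(), String>>;

// Stand-in registry: handlers are stored in registration order.
struct Compensations {
    handlers: Vec<(String, Handler)>,
}

impl Compensations {
    fn new() -> Self {
        Self { handlers: Vec::new() }
    }

    fn register(&mut self, step_name: &str, handler: Handler) {
        self.handlers.push((step_name.to_string(), handler));
    }

    // Run handlers newest-first and report (step name, succeeded) for each.
    fn run_all(&self) -> Vec<(String, bool)> {
        self.handlers
            .iter()
            .rev()
            .map(|(name, handler)| (name.clone(), handler().is_ok()))
            .collect()
    }
}

// Two steps registered in order; the second one's rollback fails.
fn demo() -> Vec<(String, bool)> {
    let mut compensations = Compensations::new();
    compensations.register(
        "reserve_stock",
        Box::new(|| -> Result<(), String> { Ok(()) }),
    );
    compensations.register(
        "charge_card",
        Box::new(|| -> Result<(), String> { Err("refund failed".to_string()) }),
    );
    compensations.run_all()
}
```

The returned pairs mirror the Vec<(String, bool)> shape of run_compensation, so a caller can log or alert on the steps whose rollback did not succeed.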
StepRunner (Fluent API)
Created via ctx.step(name, || async { ... }).
| Method | Signature | Description |
|---|---|---|
| compensate | fn compensate<F, Fut>(self, f: F) -> Self | Set rollback handler |
| timeout | fn timeout(self, duration: Duration) -> Self | Set step timeout |
| optional | fn optional(self) -> Self | Mark step as optional (failure continues workflow) |
| run | async fn run(self) -> Result<T> | Execute the step |
ParallelBuilder
Created via ctx.parallel().
| Method | Signature | Description |
|---|---|---|
| step | fn step<T, F, Fut>(self, name: &str, handler: F) -> Self | Add parallel step |
| step_with_compensate | fn step_with_compensate<T, F, Fut, C, CFut>(self, name: &str, handler: F, compensate: C) -> Self | Add parallel step with compensation |
| run | async fn run(self) -> Result<ParallelResults> | Execute all steps in parallel |
ParallelResults
Returned by ParallelBuilder::run().
| Method | Signature | Description |
|---|---|---|
| get | fn get<T: DeserializeOwned>(&self, step_name: &str) -> Result<T> | Get typed result by step name |
| contains | fn contains(&self, step_name: &str) -> bool | Check if result exists |
| len | fn len(&self) -> usize | Number of results |
| is_empty | fn is_empty(&self) -> bool | Check if empty |
| iter | fn iter(&self) -> impl Iterator<Item = (&String, &Value)> | Iterate over results |
DaemonContext
Long-running service context with shutdown signaling and heartbeats.
Fields
| Field | Type | Description |
|---|---|---|
| daemon_name | String | Daemon name |
| instance_id | Uuid | Unique instance identifier |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| is_shutdown_requested | fn is_shutdown_requested(&self) -> bool | Check if shutdown was requested |
| shutdown_signal | async fn shutdown_signal(&self) | Wait for shutdown signal |
| heartbeat | async fn heartbeat(&self) -> Result<()> | Send heartbeat to indicate daemon is alive |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
Graceful Shutdown Pattern
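    // do_work() is a placeholder for the daemon's own unit of work.
    loop {
        tokio::select! {
            _ = do_work() => {}
            // Leave the loop as soon as shutdown is signaled.
            _ = ctx.shutdown_signal() => break,
        }
    }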
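When the work is a sequence of short synchronous batches, polling is_shutdown_requested between batches is an alternative to select!. The sketch below shows that polling variant with a stand-in handle built on an AtomicBool. DaemonHandle, run_until_shutdown, and the stop_after parameter (which simulates a signal arriving mid-run) are hypothetical and exist only for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for the shutdown side of DaemonContext.
struct DaemonHandle {
    shutdown: Arc<AtomicBool>,
}

impl DaemonHandle {
    fn new() -> Self {
        Self { shutdown: Arc::new(AtomicBool::new(false)) }
    }

    fn is_shutdown_requested(&self) -> bool {
        self.shutdown.load(Ordering::SeqCst)
    }
}

// Process up to `batches` batches, checking the flag before each one.
// `stop_after` simulates a shutdown request arriving after that many batches.
// Returns how many batches were processed.
fn run_until_shutdown(ctx: &DaemonHandle, batches: u32, stop_after: u32) -> u32 {
    let mut processed = 0;
    for _ in 0..batches {
        if ctx.is_shutdown_requested() {
            break; // finish cleanly between batches
        }
        processed += 1; // stands in for one batch of real work
        if processed == stop_after {
            ctx.shutdown.store(true, Ordering::SeqCst); // simulated signal
        }
    }
    processed
}
```

Polling only reacts between batches, so a batch in progress always completes; select! on shutdown_signal is the better fit when a single unit of work can run for a long time.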
WebhookContext
Incoming webhook handler context with headers, job dispatch, and idempotency.
Fields
| Field | Type | Description |
|---|---|---|
| webhook_name | String | Webhook handler name |
| request_id | String | Unique request identifier |
| idempotency_key | Option<String> | Extracted idempotency key (if configured) |
Methods
| Method | Signature | Description |
|---|---|---|
| db | fn db(&self) -> &PgPool | Get database connection pool |
| http | fn http(&self) -> &Client | Get HTTP client for external calls |
| header | fn header(&self, name: &str) -> Option<&str> | Get request header (case-insensitive) |
| headers | fn headers(&self) -> &HashMap<String, String> | Get all headers |
| dispatch_job | async fn dispatch_job<T: Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> | Dispatch background job for async processing |
| env | fn env(&self, key: &str) -> Option<String> | Get environment variable |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get required env var |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
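Case-insensitive header lookup can be sketched by normalizing names to lowercase when they are stored and again when they are looked up. The Headers type below is a stand-in, and lowercase normalization is an assumption about how such a lookup could work, not a statement about Forge's internals. The header names "Idempotency-Key" and "X-Delivery-Id" and the idempotency_key function are hypothetical examples.

```rust
use std::collections::HashMap;

// Stand-in for the header side of WebhookContext.
struct Headers {
    inner: HashMap<String, String>,
}

impl Headers {
    // Store header names in lowercase so lookups can ignore case.
    fn from_pairs(pairs: &[(&str, &str)]) -> Self {
        let inner = pairs
            .iter()
            .map(|(k, v)| (k.to_ascii_lowercase(), v.to_string()))
            .collect();
        Self { inner }
    }

    fn header(&self, name: &str) -> Option<&str> {
        self.inner.get(&name.to_ascii_lowercase()).map(|v| v.as_str())
    }
}

// Hypothetical policy: prefer an explicit idempotency header,
// then fall back to a provider delivery ID.
fn idempotency_key(headers: &Headers) -> Option<String> {
    headers
        .header("Idempotency-Key")
        .or_else(|| headers.header("X-Delivery-Id"))
        .map(str::to_string)
}
```

A handler could use such a key to detect a redelivered webhook before dispatching a background job for it.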
RequestMetadata
Metadata about the incoming request. Available on QueryContext and MutationContext.
Fields
| Field | Type | Description |
|---|---|---|
| request_id | Uuid | Unique request identifier |
| trace_id | String | Distributed tracing ID |
| client_ip | Option<String> | Client IP address |
| user_agent | Option<String> | User agent string |
| timestamp | DateTime<Utc> | Request timestamp |
Environment Variables
All contexts implement EnvAccess for type-safe environment variable access.
| Method | Signature | Description |
|---|---|---|
| env | fn env(&self, key: &str) -> Option<String> | Get env var, returns None if unset |
| env_or | fn env_or(&self, key: &str, default: &str) -> String | Get env var with default value |
| env_require | fn env_require(&self, key: &str) -> Result<String> | Get env var, returns error if unset |
| env_parse | fn env_parse<T: FromStr>(&self, key: &str) -> Result<T> | Parse env var to type |
| env_parse_or | fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T> | Parse env var with default |
| env_contains | fn env_contains(&self, key: &str) -> bool | Check if env var is set |
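    // Option<String>: None if the variable is unset
    let timeout = ctx.env("API_TIMEOUT");
    // String: falls back to "30" if unset
    let timeout = ctx.env_or("API_TIMEOUT", "30");
    // Result<String>: error if unset
    let api_key = ctx.env_require("API_KEY")?;
    // Result<T>: parsed via FromStr
    let port: u16 = ctx.env_parse("PORT")?;
    let max_retries: u32 = ctx.env_parse_or("MAX_RETRIES", 3)?;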
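To make the difference between the accessors concrete, the sketch below reimplements them over an in-memory map. FakeCtx and ctx_with are hypothetical stand-ins, and errors are plain strings rather than the framework's error type. The behavior of env_parse_or (default when unset, error when set but unparsable) is an assumption drawn from its Result<T> return type; the reference does not spell it out.

```rust
use std::collections::HashMap;
use std::str::FromStr;

// Stand-in context backed by a map instead of the process environment.
struct FakeCtx {
    vars: HashMap<String, String>,
}

impl FakeCtx {
    fn env(&self, key: &str) -> Option<String> {
        self.vars.get(key).cloned()
    }

    fn env_or(&self, key: &str, default: &str) -> String {
        self.env(key).unwrap_or_else(|| default.to_string())
    }

    fn env_require(&self, key: &str) -> Result<String, String> {
        self.env(key).ok_or_else(|| format!("missing env var {}", key))
    }

    fn env_parse<T: FromStr>(&self, key: &str) -> Result<T, String> {
        self.env_require(key)?
            .parse::<T>()
            .map_err(|_| format!("invalid value for {}", key))
    }

    // Assumed semantics: unset -> default, set but unparsable -> error.
    fn env_parse_or<T: FromStr>(&self, key: &str, default: T) -> Result<T, String> {
        match self.env(key) {
            None => Ok(default),
            Some(raw) => raw
                .parse::<T>()
                .map_err(|_| format!("invalid value for {}", key)),
        }
    }
}

// Hypothetical helper to build a context from literal pairs.
fn ctx_with(pairs: &[(&str, &str)]) -> FakeCtx {
    FakeCtx {
        vars: pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect(),
    }
}
```

Under these assumptions a malformed value surfaces as an error instead of silently falling back to the default, which is the reason env_parse_or would return a Result at all.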
StepState
Workflow step state (returned by get_step_state).
Fields
| Field | Type | Description |
|---|---|---|
| name | String | Step name |
| status | StepStatus | Current status |
| result | Option<Value> | Serialized result (if completed) |
| error | Option<String> | Error message (if failed) |
| started_at | Option<DateTime<Utc>> | When step started |
| completed_at | Option<DateTime<Utc>> | When step completed |
StepStatus
| Variant | Description |
|---|---|
| Pending | Step not yet started |
| Running | Step currently executing |
| Completed | Step finished successfully |
| Failed | Step failed with error |
| Compensated | Step rolled back via compensation |