Multiple Nodes
Configure multi-node deployments with leader election, work distribution, and failure recovery.
The Code
# forge.toml
[cluster]
discovery = "postgres"
heartbeat_interval = "5s"
dead_threshold = "15s"
[node]
roles = ["gateway", "worker", "scheduler", "function"]
worker_capabilities = ["general"]
// Same application code runs on every node
#[forge::job]
pub async fn process_order(ctx: &JobContext, args: OrderArgs) -> Result<Order> {
    // Any worker in the cluster can claim this job
    charge_card(&args.payment).await?;
    fulfill_order(&args.order_id).await
}

#[forge::cron("0 * * * *")]
pub async fn hourly_report(ctx: &CronContext) -> Result<()> {
    // Only the scheduler leader runs this
    generate_and_email_report(ctx.db()).await
}

#[forge::daemon]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    // Single instance across the cluster via advisory lock
    stream_prices(ctx).await
}
What Happens
Deploy the same binary to multiple servers. Each node registers itself in the forge_nodes table and begins sending heartbeats. Workers claim jobs using FOR UPDATE SKIP LOCKED. One node becomes the scheduler leader via PostgreSQL advisory lock. If a node stops sending heartbeats for 15 seconds, it is marked dead and its jobs are reclaimed.
Coordination relies on PostgreSQL advisory locks and tables rather than Raft, Paxos, or Zookeeper.
Node Roles
| Role | Description |
|---|---|
| gateway | HTTP endpoints for client requests |
| function | Executes queries and mutations |
| worker | Processes background jobs |
| scheduler | Runs cron jobs (leader-only) |
A single node can have all roles (default) or specialize:
# API servers: handle HTTP, execute functions, no background work
[node]
roles = ["gateway", "function"]
# Workers: process jobs, no HTTP traffic
[node]
roles = ["worker"]
# Scheduler: only one needed, runs crons
[node]
roles = ["scheduler"]
Role specialization is optional. Most deployments run all roles on every node.
Cluster Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| heartbeat_interval | string | "5s" | Heartbeat frequency |
| dead_threshold | string | "15s" | Time until a node is marked dead |
Discovery
Nodes register in the forge_nodes table. Additional infrastructure is not required.
[cluster]
discovery = "postgres"
Every node queries the same database. They find each other automatically.
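A minimal sketch of what that registration amounts to, assuming sqlx; the roles column and its type are assumptions about the forge_nodes schema (id, status, and last_heartbeat appear in queries later on this page):
use sqlx::PgPool;
use uuid::Uuid;

// Columns beyond id, status, and last_heartbeat are assumptions about the
// forge_nodes schema.
pub async fn register_node(pool: &PgPool, node_id: Uuid, roles: Vec<String>) -> sqlx::Result<()> {
    sqlx::query(
        "INSERT INTO forge_nodes (id, roles, status, last_heartbeat)
         VALUES ($1, $2, 'joining', NOW())",
    )
    .bind(node_id)
    .bind(&roles)
    .execute(pool)
    .await?;
    Ok(())
}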
Leader Election
Leader election uses PostgreSQL advisory locks instead of distributed consensus.
SELECT pg_try_advisory_lock(0x464F52470001)
The lock ID is a hash of the leader role. One node acquires the lock; on every other node the call returns false, and they simply wait and retry. This keeps quorum management and split-brain handling out of the application layer.
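A sketch of that acquisition from application code, assuming sqlx (the runtime does this internally; the function name is illustrative). The important detail is that the lock belongs to the session, so a dedicated connection has to stay open for as long as leadership is held:
use sqlx::{PgConnection, Row};

// Lock ID for the scheduler role, matching the table below.
const SCHEDULER_LOCK_ID: i64 = 0x464F52470001;

/// Attempt to take scheduler leadership. The connection must remain open
/// while leadership is held: the advisory lock belongs to this session and
/// is released the moment the session ends.
pub async fn try_become_scheduler_leader(conn: &mut PgConnection) -> sqlx::Result<bool> {
    let row = sqlx::query("SELECT pg_try_advisory_lock($1) AS acquired")
        .bind(SCHEDULER_LOCK_ID)
        .fetch_one(conn)
        .await?;
    row.try_get("acquired")
}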
Leader Roles
| Role | Lock ID | Purpose |
|---|---|---|
| scheduler | 0x464F52470001 | Cron triggering, job assignment |
| metrics_aggregator | 0x464F52470002 | Metrics collection (planned, not yet active) |
| log_compactor | 0x464F52470003 | Log compaction (planned, not yet active) |
Each role has an independent lock. Different nodes can lead different roles.
The forge_leaders table records which node currently holds each leader role. Query it to see the active scheduler without inspecting pg_locks directly.
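For a quick look from application code, a hedged sketch assuming sqlx and that forge_leaders exposes role and node_id columns (verify against the actual schema):
use sqlx::PgPool;
use uuid::Uuid;

/// Column names (role, node_id) are assumptions about the forge_leaders schema.
pub async fn current_leaders(pool: &PgPool) -> sqlx::Result<Vec<(String, Uuid)>> {
    sqlx::query_as("SELECT role, node_id FROM forge_leaders")
        .fetch_all(pool)
        .await
}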
Automatic Failover
Advisory locks are session-bound. When a database connection closes, the lock releases automatically.
Node A: acquires lock → running → [crash]
↓
PostgreSQL detects connection drop
↓
Node B: waiting → acquires lock → running
Failover happens within the leader check interval (default 5 seconds for a stable cluster). The heartbeat interval adapts when the cluster is stable, scaling up to 60 seconds between heartbeats, so the effective dead threshold adjusts accordingly.
Split-Brain Fencing
Advisory locks prevent most split-brain scenarios, but network partitions can create brief windows where two nodes both believe they are leader. Forge uses fencing tokens to guard against this.
Every leader election increments a current_term counter in the database. Before executing any leader-exclusive write (cron scheduling, stale job reclaim, workflow recovery), the leader checks that its local term still matches the database term. If another node won a newer election, the stale leader's writes are rejected.
This means leader-exclusive operations are safe even during a partition: the fencing check is an atomic compare-and-set against PostgreSQL, which is the single source of truth. You don't need to add your own distributed locking on top.
If you implement custom daemons that run in leader mode, follow the same pattern: read current_term when you acquire leadership, and verify it before any state-mutating database write.
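As a concrete illustration of that pattern, the sketch below collapses the term check and the leader-exclusive write into one statement. It assumes sqlx, that the term counter lives in a forge_leaders table with role and current_term columns, and that LeaderTerm is your own struct holding the term read at election time; the UPDATE body is a simplified stale-job reclaim.
use sqlx::PgPool;

/// Term captured when this node won the election (your own bookkeeping).
pub struct LeaderTerm {
    role: String,
    term: i64,
}

/// Perform a leader-exclusive write only if our term is still current.
/// The EXISTS clause makes the fencing check and the write one atomic
/// statement; if a newer leader has been elected, zero rows are updated.
pub async fn fenced_stale_job_reclaim(pool: &PgPool, lease: &LeaderTerm) -> sqlx::Result<u64> {
    let result = sqlx::query(
        r#"
        UPDATE forge_jobs
        SET status = 'pending', worker_id = NULL
        WHERE status = 'claimed'
          AND claimed_at < NOW() - INTERVAL '5 minutes'
          -- fencing check: abort if a newer leader holds a higher term
          AND EXISTS (
              SELECT 1 FROM forge_leaders
              WHERE role = $1 AND current_term = $2
          )
        "#,
    )
    .bind(lease.role.as_str())
    .bind(lease.term)
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}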
Work Distribution
Workers claim jobs using the competing consumers pattern with FOR UPDATE SKIP LOCKED:
WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($2) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $3
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $1, claimed_at = NOW(),
    attempts = attempts + 1
WHERE id IN (SELECT id FROM claimable)
RETURNING *
Each worker claims a distinct batch of jobs with no coordination beyond the row locks themselves: locked rows are skipped rather than waited on, so there is no thundering-herd contention and the cost of a claim does not grow with cluster size. Add workers and throughput increases roughly linearly.
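A worker loop built on the JobQueue::claim call used in the Testing section might look like the sketch below; the batch size, poll interval, and execute_job dispatch helper are assumptions.
use std::time::Duration;
use uuid::Uuid;

// JobQueue::claim matches the call shape used in the Testing section below;
// execute_job is a hypothetical dispatch helper.
pub async fn worker_loop(queue: JobQueue, worker_id: Uuid) -> anyhow::Result<()> {
    let capabilities = vec!["general".to_string()];
    loop {
        // Claim up to 10 jobs. SKIP LOCKED means we never block on rows
        // another worker is claiming at the same instant.
        let jobs = queue.claim(worker_id, &capabilities, 10).await?;
        if jobs.is_empty() {
            // Nothing pending: back off briefly before polling again.
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        }
        for job in jobs {
            execute_job(job).await?;
        }
    }
}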
Heartbeat and Failure Detection
Nodes send heartbeats every 5 seconds:
UPDATE forge_nodes
SET last_heartbeat = NOW()
WHERE id = $1
Nodes missing heartbeats past the dead threshold are marked dead. The effective threshold is the greater of the configured dead_threshold (default "15s") and 3x the current adaptive heartbeat interval:
UPDATE forge_nodes
SET status = 'dead'
WHERE status = 'active'
AND last_heartbeat < NOW() - make_interval(secs => $1)
-- $1 = max(dead_threshold seconds, 3 * current_adaptive_interval)
Dead node detection triggers:
- Jobs from that worker are released back to pending
- Leadership locks are released (connection closed)
- Node is removed from load balancer rotation
The 15-second threshold distinguishes slow nodes from crashed nodes. A node under load might miss one heartbeat but not three.
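A sketch of the heartbeat loop with that adaptive behavior, assuming sqlx; cluster_is_stable and the exact backoff policy (double while stable, cap at 60 seconds, reset on any change) are illustrative assumptions rather than the runtime's actual algorithm.
use std::time::Duration;
use sqlx::PgPool;
use uuid::Uuid;

// cluster_is_stable() is a hypothetical helper returning whether membership
// has changed recently.
pub async fn heartbeat_loop(pool: PgPool, node_id: Uuid) -> sqlx::Result<()> {
    let mut interval = Duration::from_secs(5); // configured heartbeat_interval
    loop {
        sqlx::query("UPDATE forge_nodes SET last_heartbeat = NOW() WHERE id = $1")
            .bind(node_id)
            .execute(&pool)
            .await?;

        if cluster_is_stable(&pool).await? {
            // Stable cluster: stretch the interval, capped at 60 seconds.
            interval = (interval * 2).min(Duration::from_secs(60));
        } else {
            // Any membership change: fall back to the configured 5 seconds.
            interval = Duration::from_secs(5);
        }
        tokio::time::sleep(interval).await;
    }
}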
Stale Job Recovery
Claimed jobs that haven't started within 5 minutes and running jobs with no recent heartbeat are released:
UPDATE forge_jobs
SET status = 'pending', worker_id = NULL, claimed_at = NULL,
started_at = NULL, last_heartbeat = NULL
WHERE
(status = 'claimed' AND claimed_at < NOW() - INTERVAL '5 minutes')
OR
(status = 'running'
AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - INTERVAL '5 minutes')
This runs every 60 seconds. The runtime sends automatic heartbeats every 30 seconds for running jobs, so only genuinely stalled jobs get reclaimed.
A worker crash at 0:00 means the job is reclaimed by 0:05 at worst. Another worker picks it up immediately.
Graceful Shutdown
On SIGTERM, nodes drain cleanly:
1. Stop accepting new work → status = 'draining'
2. Wait for in-flight jobs → up to 30 seconds
3. Release leadership locks → pg_advisory_unlock()
4. Deregister from cluster → DELETE FROM forge_nodes
As long as the drain steps above are respected, rolling deploys complete without dropped requests.
// Daemons receive shutdown signal
loop {
    tokio::select! {
        msg = work() => handle(msg).await?,
        _ = ctx.shutdown_signal() => break,
    }
}
// Function returns, lock releases, another node takes over
Drain Timeout
In-flight requests have 30 seconds to complete. After timeout, the node deregisters anyway. Jobs that were mid-execution will be reclaimed by stale job recovery.
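The runtime performs the drain itself, but the same sequence sketched by hand looks roughly like this (assuming sqlx; wait_for_inflight_jobs is a hypothetical helper, and the advisory unlock only takes effect on the session that actually holds the locks):
use std::time::Duration;
use sqlx::PgPool;
use uuid::Uuid;

pub async fn drain_and_deregister(pool: &PgPool, node_id: Uuid) -> sqlx::Result<()> {
    // 1. Stop accepting new work.
    sqlx::query("UPDATE forge_nodes SET status = 'draining' WHERE id = $1")
        .bind(node_id)
        .execute(pool)
        .await?;

    // 2. Give in-flight jobs up to the 30-second drain timeout.
    let _ = tokio::time::timeout(
        Duration::from_secs(30),
        wait_for_inflight_jobs(pool, node_id), // hypothetical helper
    )
    .await;

    // 3. Release leadership locks (must run on the session holding them).
    sqlx::query("SELECT pg_advisory_unlock_all()")
        .execute(pool)
        .await?;

    // 4. Deregister from the cluster.
    sqlx::query("DELETE FROM forge_nodes WHERE id = $1")
        .bind(node_id)
        .execute(pool)
        .await?;
    Ok(())
}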
Node Status Lifecycle
Joining → Active → Draining → (deregistered)
             ↓
           Dead
| Status | Accepts Work | Description |
|---|---|---|
| joining | No | Node starting up |
| active | Yes | Healthy and processing |
| draining | No | Shutting down gracefully |
| dead | No | Missed heartbeat threshold |
Only active nodes receive traffic and claim jobs.
Under the Hood
Cryptographically Random Node IDs
Each node generates a UUID v4 on startup:
pub struct NodeId(pub Uuid);

impl NodeId {
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}
Coordination is not required for node IDs, and collisions are unlikely in practice (2^122 possible values).
Session-Bound Advisory Locks
PostgreSQL advisory locks tie to the database session, not a lease timer:
- Lock held = connection alive
- Lock released = connection closed
- Explicit refresh is not required
- Clock skew issues are avoided because locks are connection-scoped
The database connection is the heartbeat: if the process crashes, the OS closes the TCP connection and PostgreSQL releases the lock.
Load Metrics
Nodes report load with each heartbeat:
UPDATE forge_nodes
SET current_connections = $2,
current_jobs = $3,
cpu_usage = $4,
memory_usage = $5,
last_heartbeat = NOW()
WHERE id = $1
These metrics are collected today and reserved for load-aware routing in future versions.
Competing Consumers Pattern
Multiple workers safely pull from the same job queue:
- Worker queries for pending jobs with `FOR UPDATE SKIP LOCKED`
- Row locks are acquired on the selected jobs
- Status is updated to `claimed` atomically
- Other workers skip the locked rows and grab different jobs
This avoids race conditions and duplicate processing without distributed coordination.
Multi-Node Caveats
Rate Limiting
Two backends ship out of the box, selected via `mode` in the `[rate_limit]` section of forge.toml:
- `mode = "hybrid"` (default): `key = "user"` and `key = "ip"` use a per-node DashMap token bucket for low-latency DDoS protection. A user can make up to `requests × node_count` per window. `key = "global"` still uses a shared PostgreSQL counter. This is the right choice for traffic shaping and abuse mitigation.
- `mode = "strict"`: every check hits the shared `forge_rate_limits` table. Counts are cluster-wide and exact. Use this when limits are billing-grade or quotas you contractually have to enforce. Slightly higher per-request latency; the table is `UNLOGGED` so the write cost is small, but it does add a round trip.
Both implement the same RateLimiterBackend trait, so SDK consumers can plug in their own backend if needed.
Workflow Drain on Boot
When you remove a deprecated workflow version's code and redeploy, the new binary won't have a handler for any in-flight runs of that version. The runtime detects this at boot, logs a warning per stranded (name, version) group, and flips /_api/ready to 503 with workflows: false until the stranded runs clear. The exact drain count is intentionally absent from the public probe payload — load balancers don't need it, and an unauthenticated probe shouldn't leak deployment state. Read the warning logs or query forge_workflow_runs directly when investigating.
Operators clear stuck runs directly in Postgres (no admin HTTP surface):
UPDATE forge_workflow_runs
SET status = 'retired_unresumable', resolution_reason = '...'
WHERE workflow_name = '...' AND workflow_version = '...'
AND status NOT IN ('completed','compensated','failed',
'retired_unresumable','cancelled_by_operator');
Within five seconds the next readiness probe re-runs the check and flips back to 200. See the workflow migration guide for the full deprecate→drain→remove flow.
OAuth State
OAuth 2.1 CSRF tokens and session state are stored in-memory on the node that initiated the flow. If a load balancer routes the OAuth callback to a different node, the CSRF check fails. For multi-instance deployments using MCP OAuth, configure sticky sessions for /_api/oauth/* paths or dedicate a single gateway node for MCP traffic.
Patterns
Minimal Cluster
Two nodes, all roles. One becomes scheduler leader.
# Both nodes
[cluster]
discovery = "postgres"
[node]
roles = ["gateway", "worker", "scheduler", "function"]
Separated Concerns
API servers handle HTTP. Workers process jobs. One scheduler.
# API nodes (3x)
[node]
roles = ["gateway", "function"]
# Worker nodes (5x)
[node]
roles = ["worker"]
worker_capabilities = ["general"]
# Scheduler node (1x, with backup)
[node]
roles = ["scheduler"]
Specialized Workers
Route jobs to appropriate hardware.
# GPU workers
[node]
roles = ["worker"]
worker_capabilities = ["gpu", "ml"]
# Media workers
[node]
roles = ["worker"]
worker_capabilities = ["media", "transcode"]
# General workers
[node]
roles = ["worker"]
worker_capabilities = ["general"]
Jobs with worker_capability = "gpu" only run on GPU workers.
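The attribute syntax for pinning a job to a capability is not shown above; the form below is an assumption for illustration (render_scene, RenderArgs, and run_on_gpu are placeholders), but the routing it depicts matches this configuration: only workers advertising the gpu capability will claim the job.
// Hypothetical attribute syntax: check your forge version for how a job
// declares its required capability.
#[forge::job(capability = "gpu")]
pub async fn render_scene(ctx: &JobContext, args: RenderArgs) -> Result<Output> {
    // Only workers whose worker_capabilities include "gpu" will claim this job
    run_on_gpu(&args).await
}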
Blue-Green Deploy
- Spin up new nodes (they register, start claiming jobs)
- Set old nodes to draining (`status = 'draining'`)
- Wait for in-flight work to complete
- Terminate old nodes
New nodes begin handling traffic before old nodes finish draining, so rolling updates proceed without dropped traffic.
Health Check
/_api/health is a liveness probe that always returns 200 with the current version. /_api/ready is the readiness probe that checks database connectivity and reactor status, returning 503 when the node is not ready.
# Kubernetes probes
livenessProbe:
  httpGet:
    path: /_api/health
    port: 8080
readinessProbe:
  httpGet:
    path: /_api/ready
    port: 8080
Scaling Guidelines
| Nodes | Jobs/sec | Notes |
|---|---|---|
| 1 | ~100 | Single node, all roles |
| 3 | ~300 | Minimal redundancy |
| 10 | ~1,000 | Production scale |
| 50+ | ~5,000+ | Separate worker pools |
Throughput scales linearly with workers. The database becomes the bottleneck around 100 nodes; at that point, consider read replicas and connection pooling.
Testing
Test clustering behavior with multiple context instances:
#[tokio::test]
async fn test_job_claimed_once() {
    let pool = TestDatabase::from_env().await.unwrap();

    // Simulate two workers
    let worker1 = Uuid::new_v4();
    let worker2 = Uuid::new_v4();

    // Enqueue a job
    let queue = JobQueue::new(pool.clone());
    let job = JobRecord::new("test_job", json!({}), JobPriority::Normal, 3);
    queue.enqueue(job).await.unwrap();

    // Both workers try to claim
    let claimed1 = queue.claim(worker1, &["general".into()], 1).await.unwrap();
    let claimed2 = queue.claim(worker2, &["general".into()], 1).await.unwrap();

    // Only one succeeds
    assert_eq!(claimed1.len() + claimed2.len(), 1);
}
Leader Election Test
#[tokio::test]
async fn test_single_leader() {
    let pool = TestDatabase::from_env().await.unwrap();

    let election1 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );
    let election2 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );

    // First acquires
    assert!(election1.try_become_leader().await.unwrap());

    // Second fails
    assert!(!election2.try_become_leader().await.unwrap());

    // Release and the second can acquire
    election1.release_leadership().await.unwrap();
    assert!(election2.try_become_leader().await.unwrap());
}