Multiple Nodes
Run Forge across multiple servers with automatic leader election, work distribution, and failure recovery.
The Code
# forge.toml
[cluster]
discovery = "postgres"
heartbeat_interval_secs = 5
dead_threshold_secs = 15
[node]
roles = ["gateway", "worker", "scheduler", "function"]
worker_capabilities = ["general"]
// Same application code runs on every node
#[forge::job]
pub async fn process_order(ctx: &JobContext, args: OrderArgs) -> Result<Order> {
    // Any worker in the cluster can claim this job
    charge_card(&args.payment).await?;
    fulfill_order(&args.order_id).await
}

#[forge::cron("0 * * * *")]
pub async fn hourly_report(ctx: &CronContext) -> Result<()> {
    // Only the scheduler leader runs this
    generate_and_email_report(ctx.db()).await
}

#[forge::daemon]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    // Single instance across cluster via advisory lock
    stream_prices(ctx).await
}
What Happens
Deploy the same binary to multiple servers. Each node registers itself in the forge_nodes table and begins sending heartbeats. Workers claim jobs using FOR UPDATE SKIP LOCKED. One node becomes the scheduler leader via PostgreSQL advisory lock. If a node stops sending heartbeats for 15 seconds, it is marked dead and its jobs are reclaimed.
No Raft. No Paxos. No ZooKeeper. PostgreSQL provides the coordination.
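Registration itself is nothing more than a row in forge_nodes plus the heartbeat UPDATE shown later on this page. A minimal sketch with sqlx; only id, status, and last_heartbeat are documented columns, and the roles array column is an assumption:
use sqlx::PgPool;
use uuid::Uuid;

// Sketch of node registration. The `roles` column is an assumption;
// the other columns and the 'joining' status appear elsewhere on this page.
async fn register_node(pool: &PgPool, node_id: Uuid, roles: Vec<String>) -> sqlx::Result<()> {
    sqlx::query(
        "INSERT INTO forge_nodes (id, roles, status, last_heartbeat)
         VALUES ($1, $2, 'joining', NOW())",
    )
    .bind(node_id)
    .bind(roles)
    .execute(pool)
    .await?;
    Ok(())
}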
Node Roles
| Role | Description |
|---|---|
| gateway | HTTP endpoints for client requests |
| function | Executes queries and mutations |
| worker | Processes background jobs |
| scheduler | Runs cron jobs (leader-only) |
A single node can have all roles (default) or specialize:
# API servers: handle HTTP, execute functions, no background work
[node]
roles = ["gateway", "function"]
# Workers: process jobs, no HTTP traffic
[node]
roles = ["worker"]
# Scheduler: only one needed, runs crons
[node]
roles = ["scheduler"]
Role specialization is optional. Most deployments run all roles on every node.
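Conceptually, startup just reads the role list and spawns the matching subsystems. A simplified sketch; the function names are placeholders for illustration, not Forge APIs:
// Placeholder subsystems, named for illustration only.
async fn run_http_gateway() {}
async fn run_function_executor() {}
async fn run_job_workers() {}
async fn run_scheduler_election() {}

// Start only the subsystems this node's roles ask for.
async fn start_node(roles: &[String]) {
    if roles.iter().any(|r| r == "gateway") {
        tokio::spawn(run_http_gateway());
    }
    if roles.iter().any(|r| r == "function") {
        tokio::spawn(run_function_executor());
    }
    if roles.iter().any(|r| r == "worker") {
        tokio::spawn(run_job_workers());
    }
    if roles.iter().any(|r| r == "scheduler") {
        tokio::spawn(run_scheduler_election());
    }
}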
Cluster Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| heartbeat_interval_secs | u64 | 5 | Heartbeat frequency |
| dead_threshold_secs | u64 | 15 | Time until node marked dead |
Discovery
Nodes register in the forge_nodes table. No additional infrastructure required.
[cluster]
discovery = "postgres"
Every node queries the same database. They find each other automatically.
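Discovery is then an ordinary query against that table. A sketch using only the documented columns; the cutoff mirrors dead_threshold_secs:
use sqlx::PgPool;
use uuid::Uuid;

// List the peers currently considered alive.
async fn live_nodes(pool: &PgPool) -> sqlx::Result<Vec<Uuid>> {
    sqlx::query_scalar(
        "SELECT id FROM forge_nodes
         WHERE status = 'active'
           AND last_heartbeat > NOW() - INTERVAL '15 seconds'",
    )
    .fetch_all(pool)
    .await
}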
Leader Election
Leader election uses PostgreSQL advisory locks instead of distributed consensus.
SELECT pg_try_advisory_lock(0x464F52470001)
The lock ID is a hash of the leader role. One node acquires the lock. Others return false and wait. No quorum, no split-brain, no network partition drama.
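In application code that check is a single round trip. A minimal sketch, not the internals of Forge's LeaderElection, using the scheduler lock ID shown above:
use sqlx::PgConnection;

// Try to take the scheduler lock on this session. Returns true if the
// lock is now held, false if another node's session already holds it.
async fn try_acquire_scheduler_lock(conn: &mut PgConnection) -> sqlx::Result<bool> {
    const SCHEDULER_LOCK_ID: i64 = 0x464F_5247_0001;
    sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
        .bind(SCHEDULER_LOCK_ID)
        .fetch_one(conn)
        .await
}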
Leader Roles
| Role | Lock ID | Purpose |
|---|---|---|
| scheduler | 0x464F52470001 | Cron triggering, job assignment |
| metrics_aggregator | 0x464F52470002 | Metrics collection |
| log_compactor | 0x464F52470003 | Log compaction |
Each role has an independent lock. Different nodes can lead different roles.
Automatic Failover
Advisory locks are session-bound. When a database connection closes, the lock releases automatically.
Node A: acquires lock → running → [crash]
                                      ↓
                    PostgreSQL detects connection drop
                                      ↓
Node B: waiting → acquires lock → running
Failover happens within the poll interval (5 seconds). No manual intervention.
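A standby node is just a loop around that call. A sketch built on the try_acquire_scheduler_lock helper above, with run_scheduler standing in for the leader-only work:
use std::time::Duration;
use sqlx::PgPool;

// Placeholder for the leader-only work (cron triggering, job assignment).
async fn run_scheduler(_conn: &mut sqlx::PgConnection) -> sqlx::Result<()> {
    Ok(())
}

// Poll every 5 seconds until leadership is acquired. Crashing or returning
// from run_scheduler drops the connection, which releases the lock, so a
// standby node takes over within one poll interval.
async fn scheduler_leader_loop(pool: PgPool) -> sqlx::Result<()> {
    loop {
        // Dedicated session: the advisory lock lives exactly as long as this connection.
        let mut conn = pool.acquire().await?;
        if try_acquire_scheduler_lock(&mut *conn).await? {
            run_scheduler(&mut *conn).await?;
        }
        drop(conn);
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}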
Work Distribution
Workers claim jobs using the competing consumers pattern with FOR UPDATE SKIP LOCKED:
WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $3, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *
Each worker grabs different jobs. No cross-node coordination. No lock contention. No thundering herd. The cost of a claim stays flat as the cluster grows.
Add workers and throughput increases linearly.
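On the worker side this is a plain poll loop around that claim. A sketch using the same JobQueue::claim call that appears in the Testing section below; execute_job and the 500 ms backoff are illustrative placeholders:
use std::time::Duration;
use uuid::Uuid;

// Claim up to 10 jobs at a time and run them; back off briefly when the
// queue is empty. execute_job is a placeholder for running the job body.
async fn worker_loop(queue: JobQueue, worker_id: Uuid, capabilities: Vec<String>) {
    loop {
        match queue.claim(worker_id, &capabilities, 10).await {
            Ok(jobs) if !jobs.is_empty() => {
                for job in jobs {
                    execute_job(job).await;
                }
            }
            // Nothing claimable, or a transient database error.
            _ => tokio::time::sleep(Duration::from_millis(500)).await,
        }
    }
}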
Heartbeat and Failure Detection
Nodes send heartbeats every 5 seconds:
UPDATE forge_nodes
SET last_heartbeat = NOW()
WHERE id = $1
Nodes missing 3 consecutive heartbeats (15 seconds) are marked dead:
UPDATE forge_nodes
SET status = 'dead'
WHERE status = 'active'
AND last_heartbeat < NOW() - INTERVAL '15 seconds'
Dead node detection triggers:
- Jobs from that worker are released back to pending
- Leadership locks are released (connection closed)
- Node is removed from load balancer rotation
The 15-second threshold distinguishes slow nodes from crashed nodes. A node under load might miss one heartbeat but not three.
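The sending side is a small periodic task. A sketch using only the documented UPDATE, with the interval mirroring heartbeat_interval_secs:
use std::time::Duration;
use sqlx::PgPool;
use uuid::Uuid;

// Refresh last_heartbeat every 5 seconds until the task is dropped.
async fn heartbeat_task(pool: PgPool, node_id: Uuid) -> sqlx::Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        ticker.tick().await;
        sqlx::query("UPDATE forge_nodes SET last_heartbeat = NOW() WHERE id = $1")
            .bind(node_id)
            .execute(&pool)
            .await?;
    }
}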
Stale Job Recovery
Jobs claimed more than 5 minutes ago with no heartbeat are released:
UPDATE forge_jobs
SET status = 'pending', worker_id = NULL, claimed_at = NULL
WHERE status IN ('claimed', 'running')
AND claimed_at < NOW() - INTERVAL '5 minutes'
This runs every 60 seconds. Long-running jobs should call ctx.heartbeat() to prevent false reclamation.
A worker that crashes at 0:00 has its job back in the pending queue by about 0:06 at worst (the 5-minute claim threshold plus up to one 60-second sweep). Another worker picks it up immediately.
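For work that legitimately runs longer than five minutes, calling ctx.heartbeat() inside the loop keeps the claim fresh. A sketch, assuming heartbeat() is async and fallible; the job name, argument type, and index_batch helper are made up for illustration:
#[forge::job]
pub async fn rebuild_search_index(ctx: &JobContext, args: IndexArgs) -> Result<()> {
    for batch in args.batches {
        index_batch(&batch).await?;
        // Refresh the claim so the 5-minute stale-job sweep skips this job
        ctx.heartbeat().await?;
    }
    Ok(())
}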
Graceful Shutdown
On SIGTERM, nodes drain cleanly:
1. Stop accepting new work → status = 'draining'
2. Wait for in-flight jobs → up to 30 seconds
3. Release leadership locks → pg_advisory_unlock()
4. Deregister from cluster → DELETE FROM forge_nodes
Zero dropped requests during rolling deploys.
// Daemons receive shutdown signal
loop {
    tokio::select! {
        msg = work() => handle(msg).await?,
        _ = ctx.shutdown_signal() => break,
    }
}
// Function returns, lock releases, another node takes over
Drain Timeout
In-flight requests have 30 seconds to complete. After timeout, the node deregisters anyway. Jobs that were mid-execution will be reclaimed by stale job recovery.
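A sketch of how the SIGTERM path maps onto the tables above; steps 2 and 3 (the 30-second wait and the lock release) are elided, and the error type is illustrative:
use sqlx::PgPool;
use tokio::signal::unix::{signal, SignalKind};
use uuid::Uuid;

// Mark the node as draining on SIGTERM, then deregister.
async fn shutdown_on_sigterm(pool: PgPool, node_id: Uuid) -> Result<(), Box<dyn std::error::Error>> {
    let mut sigterm = signal(SignalKind::terminate())?;
    sigterm.recv().await;

    // Step 1: stop accepting new work
    sqlx::query("UPDATE forge_nodes SET status = 'draining' WHERE id = $1")
        .bind(node_id)
        .execute(&pool)
        .await?;

    // Steps 2-3 elided: drain in-flight jobs (up to 30s), release advisory locks

    // Step 4: deregister from the cluster
    sqlx::query("DELETE FROM forge_nodes WHERE id = $1")
        .bind(node_id)
        .execute(&pool)
        .await?;
    Ok(())
}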
Node Status Lifecycle
Joining → Active → Draining → (deregistered)
             ↓
            Dead
| Status | Accepts Work | Description |
|---|---|---|
| joining | No | Node starting up |
| active | Yes | Healthy and processing |
| draining | No | Shutting down gracefully |
| dead | No | Missed heartbeat threshold |
Only active nodes receive traffic and claim jobs.
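In code the lifecycle is a small state enum. A sketch, not necessarily Forge's exact type:
// Node lifecycle states from the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Joining,
    Active,
    Draining,
    Dead,
}

impl NodeStatus {
    // Only active nodes receive traffic and claim jobs.
    pub fn accepts_work(self) -> bool {
        matches!(self, NodeStatus::Active)
    }
}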
Under the Hood
Cryptographically Random Node IDs
Each node generates a UUID v4 on startup:
pub struct NodeId(pub Uuid);

impl NodeId {
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}
No coordination needed. No ID collision in practice (2^122 possible values).
Session-Bound Advisory Locks
PostgreSQL advisory locks tie to the database session, not a lease timer:
- Lock held = connection alive
- Lock released = connection closed
- No explicit refresh needed
- No clock skew issues
The database connection is the heartbeat. If the process crashes, the OS closes the TCP connection and PostgreSQL releases the lock.
Load Metrics
Nodes report load with each heartbeat:
UPDATE forge_nodes
SET current_connections = $2,
    current_jobs = $3,
    cpu_usage = $4,
    memory_usage = $5,
    last_heartbeat = NOW()
WHERE id = $1
Available for load-aware routing in future versions.
Competing Consumers Pattern
Multiple workers safely pull from the same job queue:
- Worker queries for pending jobs with FOR UPDATE SKIP LOCKED
- Row lock acquired on selected jobs
- Status updated to claimed atomically
- Other workers skip locked rows, grab different jobs
No race conditions. No duplicate processing. No distributed coordination.
Patterns
Minimal Cluster
Two nodes, all roles. One becomes scheduler leader.
# Both nodes
[cluster]
discovery = "postgres"
[node]
roles = ["gateway", "worker", "scheduler", "function"]
Separated Concerns
API servers handle HTTP. Workers process jobs. One scheduler.
# API nodes (3x)
[node]
roles = ["gateway", "function"]
# Worker nodes (5x)
[node]
roles = ["worker"]
worker_capabilities = ["general"]
# Scheduler node (1x, with backup)
[node]
roles = ["scheduler"]
Specialized Workers
Route jobs to appropriate hardware.
# GPU workers
[node]
roles = ["worker"]
worker_capabilities = ["gpu", "ml"]
# Media workers
[node]
roles = ["worker"]
worker_capabilities = ["media", "transcode"]
# General workers
[node]
roles = ["worker"]
worker_capabilities = ["general"]
Jobs with worker_capability = "gpu" only run on GPU workers.
Blue-Green Deploy
- Spin up new nodes (they register, start claiming jobs)
- Set old nodes to draining (status = 'draining')
- Wait for in-flight work to complete
- Terminate old nodes
New nodes handle traffic before old nodes finish draining. Zero downtime.
Health Check
/health returns 200 if node is active, 503 if draining or dead.
# Kubernetes readiness probe
readinessProbe:
  httpGet:
    path: /health
    port: 8080
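On the node itself, the handler reduces to a mapping from lifecycle status to response code. A sketch that reuses the NodeStatus enum sketched earlier; treating joining as not-ready is an inference from the lifecycle table, and the HTTP framework wiring is omitted:
// /health: 200 only while active; joining, draining, and dead nodes
// report 503 so the load balancer stops sending traffic.
fn health_response_code(status: NodeStatus) -> u16 {
    match status {
        NodeStatus::Active => 200,
        NodeStatus::Joining | NodeStatus::Draining | NodeStatus::Dead => 503,
    }
}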
Scaling Guidelines
| Nodes | Jobs/sec | Notes |
|---|---|---|
| 1 | ~100 | Single node, all roles |
| 3 | ~300 | Minimal redundancy |
| 10 | ~1,000 | Production scale |
| 50+ | ~5,000+ | Separate worker pools |
Throughput scales linearly with workers until the database becomes the bottleneck, at around 100 nodes. At that point, consider read replicas and connection pooling.
Testing
Test clustering behavior with multiple context instances:
#[tokio::test]
async fn test_job_claimed_once() {
    let pool = TestDatabase::embedded().await;

    // Simulate two workers
    let worker1 = Uuid::new_v4();
    let worker2 = Uuid::new_v4();

    // Enqueue a job
    let queue = JobQueue::new(pool.clone());
    let job = JobRecord::new("test_job", json!({}), JobPriority::Normal, 3);
    queue.enqueue(job).await.unwrap();

    // Both workers try to claim
    let claimed1 = queue.claim(worker1, &["general".into()], 1).await.unwrap();
    let claimed2 = queue.claim(worker2, &["general".into()], 1).await.unwrap();

    // Only one succeeds
    assert_eq!(claimed1.len() + claimed2.len(), 1);
}
Leader Election Test
#[tokio::test]
async fn test_single_leader() {
    let pool = TestDatabase::embedded().await;

    let election1 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );
    let election2 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );

    // First acquires
    assert!(election1.try_become_leader().await.unwrap());

    // Second fails
    assert!(!election2.try_become_leader().await.unwrap());

    // Release and second can acquire
    election1.release_leadership().await.unwrap();
    assert!(election2.try_become_leader().await.unwrap());
}