
Multiple Nodes

Run Forge across multiple servers with automatic leader election, work distribution, and failure recovery.

The Code

# forge.toml
[cluster]
discovery = "postgres"
heartbeat_interval_secs = 5
dead_threshold_secs = 15

[node]
roles = ["gateway", "worker", "scheduler", "function"]
worker_capabilities = ["general"]

// Same application code runs on every node
#[forge::job]
pub async fn process_order(ctx: &JobContext, args: OrderArgs) -> Result<Order> {
    // Any worker in the cluster can claim this job
    charge_card(&args.payment).await?;
    fulfill_order(&args.order_id).await
}

#[forge::cron("0 * * * *")]
pub async fn hourly_report(ctx: &CronContext) -> Result<()> {
    // Only the scheduler leader runs this
    generate_and_email_report(ctx.db()).await
}

#[forge::daemon]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    // Single instance across cluster via advisory lock
    stream_prices(ctx).await
}

What Happens

Deploy the same binary to multiple servers. Each node registers itself in the forge_nodes table and begins sending heartbeats. Workers claim jobs using FOR UPDATE SKIP LOCKED. One node becomes the scheduler leader via PostgreSQL advisory lock. If a node stops sending heartbeats for 15 seconds, it is marked dead and its jobs are reclaimed.

No Raft. No Paxos. No Zookeeper. PostgreSQL provides the coordination.

Node Roles

Role         Description
gateway      HTTP endpoints for client requests
function     Executes queries and mutations
worker       Processes background jobs
scheduler    Runs cron jobs (leader-only)

A single node can have all roles (default) or specialize:

# API servers: handle HTTP, execute functions, no background work
[node]
roles = ["gateway", "function"]

# Workers: process jobs, no HTTP traffic
[node]
roles = ["worker"]

# Scheduler: only one needed, runs crons
[node]
roles = ["scheduler"]

Role specialization is optional. Most deployments run all roles on every node.

Cluster Configuration

Parameter                  Type    Default    Description
heartbeat_interval_secs    u64     5          Heartbeat frequency
dead_threshold_secs        u64     15         Time until node marked dead

Discovery

Nodes register in the forge_nodes table. No additional infrastructure required.

[cluster]
discovery = "postgres"

Every node queries the same database. They find each other automatically.
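
The registration itself is a plain insert into that table. Below is a minimal sketch, assuming sqlx; the roles column name is an illustrative assumption, while id, status, and last_heartbeat appear in queries elsewhere on this page.

use sqlx::PgPool;
use uuid::Uuid;

// Sketch only: register this node at startup. The `roles` column is an
// assumption for illustration; it begins life as 'joining' and flips to
// 'active' once the node is ready.
async fn register_node(pool: &PgPool, node_id: Uuid, roles: &[String]) -> sqlx::Result<()> {
    sqlx::query(
        "INSERT INTO forge_nodes (id, roles, status, last_heartbeat)
         VALUES ($1, $2, 'joining', NOW())",
    )
    .bind(node_id)
    .bind(roles)
    .execute(pool)
    .await?;
    Ok(())
}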

Leader Election

Leader election uses PostgreSQL advisory locks instead of distributed consensus.

SELECT pg_try_advisory_lock(0x464F52470001)

The lock ID is a hash of the leader role. One node acquires the lock. Others return false and wait. No quorum, no split-brain, no network partition drama.
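
In application code the check is a single query on a dedicated connection. The following is a sketch assuming sqlx, not the actual Forge internals; the lock ID is the scheduler value from the table below.

use sqlx::{PgConnection, Row};

// Sketch: try to take the scheduler leadership lock. The lock stays held
// for as long as this dedicated connection stays open.
async fn try_acquire_scheduler_lock(conn: &mut PgConnection) -> sqlx::Result<bool> {
    let row = sqlx::query("SELECT pg_try_advisory_lock($1) AS acquired")
        .bind(0x464F52470001_i64)
        .fetch_one(conn)
        .await?;
    Ok(row.get("acquired"))
}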

Leader Roles

Role                  Lock ID           Purpose
scheduler             0x464F52470001    Cron triggering, job assignment
metrics_aggregator    0x464F52470002    Metrics collection
log_compactor         0x464F52470003    Log compaction

Each role has an independent lock. Different nodes can lead different roles.

Automatic Failover

Advisory locks are session-bound. When a database connection closes, the lock releases automatically.

Node A: acquires lock → running → [crash]

PostgreSQL detects connection drop

Node B: waiting → acquires lock → running

Failover happens within the poll interval (5 seconds). No manual intervention.
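
A standby node simply retries on that interval. Here is a sketch building on the try_acquire_scheduler_lock sketch above; the poll interval matches the 5 seconds quoted here.

use std::time::Duration;

// Sketch of a standby's poll loop. When the former leader's connection
// drops, the next attempt succeeds and this node takes over.
async fn wait_for_scheduler_leadership(conn: &mut sqlx::PgConnection) -> sqlx::Result<()> {
    while !try_acquire_scheduler_lock(conn).await? {
        tokio::time::sleep(Duration::from_secs(5)).await; // poll interval
    }
    // Now the leader: keep `conn` open for as long as leadership is held.
    Ok(())
}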

Work Distribution

Workers claim jobs using the competing consumers pattern with FOR UPDATE SKIP LOCKED:

WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $3, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *

Each worker grabs different jobs. No coordination service, no lock contention, no thundering herd: each worker simply skips rows another worker has already locked, so claim cost stays flat regardless of cluster size.

Add workers and throughput increases roughly linearly, until the database becomes the bottleneck (see Scaling Guidelines).
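
Using the JobQueue type from the Testing section below, a worker's claim loop might look like the following sketch; the batch size, backoff, and error type are illustrative, and dispatching the claimed jobs to their #[forge::job] handlers is omitted.

use std::time::Duration;
use uuid::Uuid;

// Sketch of a worker claim loop. `JobQueue::claim` is the same call used
// in the Testing section below.
async fn worker_loop(queue: JobQueue, worker_id: Uuid) -> anyhow::Result<()> {
    let capabilities = vec!["general".to_string()];
    loop {
        let claimed = queue.claim(worker_id, &capabilities, 10).await?;
        if claimed.is_empty() {
            // Nothing pending: back off briefly before polling again.
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        }
        for _job in claimed {
            // Dispatch to the matching #[forge::job] handler here.
        }
    }
}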

Heartbeat and Failure Detection

Nodes send heartbeats every 5 seconds:

UPDATE forge_nodes
SET last_heartbeat = NOW()
WHERE id = $1

Nodes missing 3 consecutive heartbeats (15 seconds) are marked dead:

UPDATE forge_nodes
SET status = 'dead'
WHERE status = 'active'
  AND last_heartbeat < NOW() - INTERVAL '15 seconds'

Dead node detection triggers:

  • Jobs from that worker are released back to pending
  • Leadership locks are released (connection closed)
  • Node is removed from load balancer rotation

The 15-second threshold distinguishes slow nodes from crashed nodes. A node under load might miss one heartbeat but not three.
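
The sending side is a small periodic task. Below is a sketch assuming sqlx and tokio; the interval matches heartbeat_interval_secs in the cluster configuration above.

use std::time::Duration;
use sqlx::PgPool;
use uuid::Uuid;

// Sketch of the heartbeat task: refresh last_heartbeat every 5 seconds.
async fn heartbeat_loop(pool: PgPool, node_id: Uuid) -> sqlx::Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        ticker.tick().await;
        sqlx::query("UPDATE forge_nodes SET last_heartbeat = NOW() WHERE id = $1")
            .bind(node_id)
            .execute(&pool)
            .await?;
    }
}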

Stale Job Recovery

Jobs claimed more than 5 minutes ago with no heartbeat are released:

UPDATE forge_jobs
SET status = 'pending', worker_id = NULL, claimed_at = NULL
WHERE status IN ('claimed', 'running')
  AND claimed_at < NOW() - INTERVAL '5 minutes'

This runs every 60 seconds. Long-running jobs should call ctx.heartbeat() to prevent false reclamation.

A worker crash right after claiming a job means the job is back in pending within roughly six minutes at worst (the 5-minute threshold plus up to one 60-second sweep). Another worker picks it up immediately.
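
For example, a long-running job can heartbeat between batches. In this sketch the argument type and per-chunk work are illustrative names, and ctx.heartbeat() is assumed to be async and fallible.

// Sketch: heartbeat between chunks so the 5-minute stale threshold never
// trips mid-job. `ExportArgs` and `export_chunk` are illustrative.
#[forge::job]
pub async fn export_large_dataset(ctx: &JobContext, args: ExportArgs) -> Result<()> {
    for chunk_id in args.chunk_ids {
        export_chunk(&chunk_id).await?;
        ctx.heartbeat().await?; // refresh the claim before the next chunk
    }
    Ok(())
}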

Graceful Shutdown

On SIGTERM, nodes drain cleanly:

1. Stop accepting new work   → status = 'draining'
2. Wait for in-flight jobs   → up to 30 seconds
3. Release leadership locks  → pg_advisory_unlock()
4. Deregister from cluster   → DELETE FROM forge_nodes

Zero dropped requests during rolling deploys.

// Daemons receive shutdown signal
loop {
    tokio::select! {
        msg = work() => handle(msg).await?,
        _ = ctx.shutdown_signal() => break,
    }
}
// Function returns, lock releases, another node takes over

Drain Timeout

In-flight requests have 30 seconds to complete. After timeout, the node deregisters anyway. Jobs that were mid-execution will be reclaimed by stale job recovery.

Node Status Lifecycle

Joining → Active → Draining → (deregistered)
             │
             ▼
            Dead

Status      Accepts Work    Description
joining     No              Node starting up
active      Yes             Healthy and processing
draining    No              Shutting down gracefully
dead        No              Missed heartbeat threshold

Only active nodes receive traffic and claim jobs.

Under the Hood

Cryptographically Random Node IDs

Each node generates a UUID v4 on startup:

pub struct NodeId(pub Uuid);

impl NodeId {
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}

No coordination needed. No ID collision in practice (2^122 possible values).

Session-Bound Advisory Locks

PostgreSQL advisory locks tie to the database session, not a lease timer:

  • Lock held = connection alive
  • Lock released = connection closed
  • No explicit refresh needed
  • No clock skew issues

The database connection is the heartbeat. If the process crashes, the OS closes the TCP connection and PostgreSQL releases the lock.
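
Explicit release is still available for clean handoffs, as in step 3 of graceful shutdown above. A sketch assuming sqlx:

// Sketch: explicit release via pg_advisory_unlock. Dropping the
// connection would release the lock just the same.
async fn release_scheduler_lock(conn: &mut sqlx::PgConnection) -> sqlx::Result<()> {
    sqlx::query("SELECT pg_advisory_unlock($1)")
        .bind(0x464F52470001_i64)
        .execute(conn)
        .await?;
    Ok(())
}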

Load Metrics

Nodes report load with each heartbeat:

UPDATE forge_nodes
SET current_connections = $2,
    current_jobs = $3,
    cpu_usage = $4,
    memory_usage = $5,
    last_heartbeat = NOW()
WHERE id = $1

Available for load-aware routing in future versions.

Competing Consumers Pattern

Multiple workers safely pull from the same job queue:

  1. Worker queries for pending jobs with FOR UPDATE SKIP LOCKED
  2. Row lock acquired on selected jobs
  3. Status updated to claimed atomically
  4. Other workers skip locked rows, grab different jobs

No race conditions. No duplicate processing. No distributed coordination.

Patterns

Minimal Cluster

Two nodes, all roles. One becomes scheduler leader.

# Both nodes
[cluster]
discovery = "postgres"

[node]
roles = ["gateway", "worker", "scheduler", "function"]

Separated Concerns

API servers handle HTTP. Workers process jobs. One scheduler.

# API nodes (3x)
[node]
roles = ["gateway", "function"]

# Worker nodes (5x)
[node]
roles = ["worker"]
worker_capabilities = ["general"]

# Scheduler node (1x, with backup)
[node]
roles = ["scheduler"]

Specialized Workers

Route jobs to appropriate hardware.

# GPU workers
[node]
roles = ["worker"]
worker_capabilities = ["gpu", "ml"]

# Media workers
[node]
roles = ["worker"]
worker_capabilities = ["media", "transcode"]

# General workers
[node]
roles = ["worker"]
worker_capabilities = ["general"]

Jobs with worker_capability = "gpu" only run on GPU workers.

Blue-Green Deploy

  1. Spin up new nodes (they register, start claiming jobs)
  2. Set old nodes to draining (status = 'draining')
  3. Wait for in-flight work to complete
  4. Terminate old nodes

New nodes handle traffic before old nodes finish draining. Zero downtime.
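
Step 2 can be done by sending SIGTERM to the old nodes or by flipping their status directly. A sketch of the direct update, assuming sqlx:

use sqlx::PgPool;
use uuid::Uuid;

// Sketch: mark the old deployment's nodes as draining so they stop
// claiming new work while finishing in-flight jobs.
async fn drain_nodes(pool: &PgPool, old_node_ids: &[Uuid]) -> sqlx::Result<u64> {
    let result = sqlx::query("UPDATE forge_nodes SET status = 'draining' WHERE id = ANY($1)")
        .bind(old_node_ids)
        .execute(pool)
        .await?;
    Ok(result.rows_affected())
}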

Health Check

/health returns 200 if node is active, 503 if draining or dead.

# Kubernetes readiness probe
readinessProbe:
  httpGet:
    path: /health
    port: 8080
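
Forge serves /health itself; the sketch below only illustrates how the lifecycle states map onto readiness responses. Treating joining as not ready is an assumption consistent with "only active nodes receive traffic."

// Illustrative sketch only: node status to readiness-probe status code.
enum NodeStatus { Joining, Active, Draining, Dead }

fn health_status_code(status: &NodeStatus) -> u16 {
    match status {
        NodeStatus::Active => 200, // healthy: stays in rotation
        _ => 503,                  // removed from load balancer rotation
    }
}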

Scaling Guidelines

Nodes    Jobs/sec    Notes
1        ~100        Single node, all roles
3        ~300        Minimal redundancy
10       ~1,000      Production scale
50+      ~5,000+     Separate worker pools

Throughput scales roughly linearly with workers. The database becomes the bottleneck around 100 nodes; at that point, consider read replicas and connection pooling.

Testing

Test clustering behavior by simulating multiple nodes against a single embedded database:

#[tokio::test]
async fn test_job_claimed_once() {
    let pool = TestDatabase::embedded().await;

    // Simulate two workers
    let worker1 = Uuid::new_v4();
    let worker2 = Uuid::new_v4();

    // Enqueue a job
    let queue = JobQueue::new(pool.clone());
    let job = JobRecord::new("test_job", json!({}), JobPriority::Normal, 3);
    queue.enqueue(job).await.unwrap();

    // Both workers try to claim
    let claimed1 = queue.claim(worker1, &["general".into()], 1).await.unwrap();
    let claimed2 = queue.claim(worker2, &["general".into()], 1).await.unwrap();

    // Only one succeeds
    assert_eq!(claimed1.len() + claimed2.len(), 1);
}

Leader Election Test

#[tokio::test]
async fn test_single_leader() {
    let pool = TestDatabase::embedded().await;

    let election1 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );

    let election2 = LeaderElection::new(
        pool.clone(),
        NodeId::new(),
        LeaderRole::Scheduler,
        LeaderConfig::default(),
    );

    // First acquires
    assert!(election1.try_become_leader().await.unwrap());

    // Second fails
    assert!(!election2.try_become_leader().await.unwrap());

    // Release and second can acquire
    election1.release_leadership().await.unwrap();
    assert!(election2.try_become_leader().await.unwrap());
}