Worker Pools

Route jobs to specialized workers based on capability requirements.

The Code

#[forge::job(worker_capability = "gpu")]
pub async fn train_model(ctx: &JobContext, args: TrainArgs) -> Result<Model> {
    let dataset = load_dataset(&args.dataset_path).await?;
    let model = run_training(&dataset, &args.hyperparams).await?;

    ctx.progress(100, "Training complete")?;
    Ok(model)
}

# forge.toml on GPU node
[node]
roles = ["worker"]
worker_capabilities = ["gpu", "general"]

What Happens

When a job specifies worker_capability = "gpu", Forge stores that requirement in the job record. Workers query for jobs matching their configured capabilities using PostgreSQL array operators. Jobs without a capability requirement run on any worker. Workers with multiple capabilities can claim jobs targeting any of them.
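
The matching rule described above can be sketched as a small predicate. This is an illustrative in-memory model, not Forge's implementation: the function name `can_claim` and its signature are assumptions, and the real check happens in SQL.

```rust
/// Returns true when a worker advertising `worker_caps` may claim a job
/// whose requirement is `job_cap` (`None` means no requirement).
/// Hypothetical helper for illustration; not part of the Forge API.
fn can_claim(worker_caps: &[&str], job_cap: Option<&str>) -> bool {
    match job_cap {
        // Jobs without a capability requirement run on any worker.
        None => true,
        // Otherwise the requirement must be one of the worker's capabilities.
        Some(required) => worker_caps.iter().any(|cap| *cap == required),
    }
}
```

For example, `can_claim(&["gpu", "general"], Some("gpu"))` is true, while `can_claim(&["media"], Some("gpu"))` is false.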

Configuration

Job Attribute

#[forge::job(worker_capability = "media")]
pub async fn transcode_video(ctx: &JobContext, args: TranscodeArgs) -> Result<Video> {
    // Only workers with "media" capability will execute this
    todo!() // placeholder so the example compiles; the job body is omitted
}

Node Configuration

[node]
worker_capabilities = ["media", "general"]

Field                Type      Default      Description
worker_capabilities  string[]  ["general"]  Capabilities this node advertises

Patterns

Specialized GPU Pool

Run ML inference on dedicated GPU instances.

#[forge::job(worker_capability = "gpu", priority = "high")]
pub async fn generate_embeddings(ctx: &JobContext, args: EmbeddingArgs) -> Result<Vec<f32>> {
    let model = load_model(&args.model_name).await?;
    let embeddings = model.embed(&args.text).await?;
    Ok(embeddings)
}

GPU worker node:

[node]
roles = ["worker"]
worker_capabilities = ["gpu"]

[worker]
max_concurrent_jobs = 4 # Limited by GPU memory

Media Processing Pool

Dedicate nodes to CPU-intensive transcoding.

#[forge::job(worker_capability = "media", timeout = "30m")]
pub async fn process_video(ctx: &JobContext, args: VideoArgs) -> Result<ProcessedVideo> {
    ctx.progress(0, "Downloading source")?;
    let source = download(&args.source_url).await?;

    ctx.progress(25, "Transcoding")?;
    let output = ffmpeg_transcode(&source, &args.format).await?;

    ctx.progress(75, "Uploading result")?;
    upload(&output, &args.destination).await?;

    ctx.progress(100, "Complete")?;
    Ok(ProcessedVideo { url: args.destination })
}

Media worker node:

[node]
roles = ["worker"]
worker_capabilities = ["media"]

[worker]
max_concurrent_jobs = 2 # Limit parallel transcodes
job_timeout_secs = 3600 # 1 hour max

High-Memory Analytics

Route memory-intensive batch jobs to larger instances.

#[forge::job(worker_capability = "analytics", timeout = "2h")]
pub async fn aggregate_metrics(ctx: &JobContext, args: AggregateArgs) -> Result<Report> {
    let data = load_full_dataset(&args.date_range).await?; // May be 50GB+

    let report = compute_aggregations(&data).await?;
    ctx.heartbeat().await?;

    Ok(report)
}

Multi-Capability Workers

A single node can serve multiple pools.

[node]
worker_capabilities = ["general", "media", "gpu"]

This worker claims jobs targeting general, media, or gpu. Useful for smaller deployments where dedicated pools are not yet needed.

Fallback to General Pool

Jobs without worker_capability run on any worker, including specialized ones.

#[forge::job]  // No capability specified
pub async fn send_email(ctx: &JobContext, args: EmailArgs) -> Result<()> {
    // Runs on any worker: general, media, gpu, etc.
    send(&args.to, &args.subject, &args.body).await
}

Under the Hood

Capability-Based Routing

Workers claim jobs by passing their capabilities as a PostgreSQL array:

WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $3, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *

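The = ANY($1) comparison matches jobs whose worker_capability appears in the worker's capability array, and the IS NULL branch lets jobs without a requirement match any worker. FOR UPDATE SKIP LOCKED makes concurrent claimers skip rows that another worker has already locked instead of waiting on them, so workers in every pool can poll at the same time without blocking each other or claiming the same job.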
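
To make the query's filter, ordering, and limit easier to follow, here is a minimal in-memory analogue. It is a sketch under stated assumptions: the `Job` struct, the `Status` enum, and the `pending` and `claim` helpers are hypothetical, timestamps are plain integers, and row locking (SKIP LOCKED) is not modeled at all.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Pending,
    Claimed,
}

// Hypothetical in-memory stand-in for a row of forge_jobs.
#[derive(Debug, Clone)]
struct Job {
    id: u32,
    status: Status,
    scheduled_at: u64, // seconds since an arbitrary epoch
    priority: i32,
    worker_capability: Option<String>,
    worker_id: Option<u32>,
}

/// Convenience constructor for a pending, unclaimed job.
fn pending(id: u32, priority: i32, scheduled_at: u64, cap: Option<&str>) -> Job {
    Job {
        id,
        status: Status::Pending,
        scheduled_at,
        priority,
        worker_capability: cap.map(String::from),
        worker_id: None,
    }
}

/// Filters, orders, limits, then marks jobs as claimed; returns claimed ids.
fn claim(jobs: &mut [Job], caps: &[&str], limit: usize, worker_id: u32, now: u64) -> Vec<u32> {
    // WHERE status = 'pending' AND scheduled_at <= NOW()
    //   AND (worker_capability = ANY($1) OR worker_capability IS NULL)
    let mut eligible: Vec<usize> = jobs
        .iter()
        .enumerate()
        .filter(|(_, j)| j.status == Status::Pending && j.scheduled_at <= now)
        .filter(|(_, j)| match &j.worker_capability {
            None => true,
            Some(c) => caps.iter().any(|cap| *cap == c.as_str()),
        })
        .map(|(i, _)| i)
        .collect();

    // ORDER BY priority DESC, scheduled_at ASC
    eligible.sort_by(|&a, &b| {
        jobs[b]
            .priority
            .cmp(&jobs[a].priority)
            .then(jobs[a].scheduled_at.cmp(&jobs[b].scheduled_at))
    });

    // LIMIT $2, then SET status = 'claimed', worker_id = $3
    eligible.truncate(limit);
    for &i in &eligible {
        jobs[i].status = Status::Claimed;
        jobs[i].worker_id = Some(worker_id);
    }
    eligible.iter().map(|&i| jobs[i].id).collect()
}
```

With a queue holding a low-priority gpu job, a high-priority job with no requirement, a media job, and a gpu job scheduled in the future, a worker advertising only gpu claims the unrestricted job first, then the due gpu job, and leaves the other two pending.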

Bulkhead Isolation

Each capability pool operates independently:

  • GPU workers only claim GPU jobs (plus jobs with no capability requirement)
  • Media workers only claim media jobs (plus jobs with no capability requirement)
  • A surge in media jobs does not affect GPU job processing

This isolation prevents one capability-specific workload from starving another. No configuration is required beyond capability assignment.
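
The isolation can be illustrated by counting how many queued jobs each kind of worker is eligible to claim before and after a surge. This is a toy model: the queue is just a list of capability requirements, and `claimable_count` is a hypothetical helper, not a Forge function.

```rust
/// Counts the queued jobs a worker with `caps` is eligible to claim.
/// Each queue entry is a job's capability requirement (`None` = no requirement).
fn claimable_count(queue: &[Option<&str>], caps: &[&str]) -> usize {
    queue
        .iter()
        .filter(|job_cap| match **job_cap {
            // Unrestricted jobs are visible to every worker.
            None => true,
            // Restricted jobs are visible only to workers with that capability.
            Some(required) => caps.iter().any(|cap| *cap == required),
        })
        .count()
}
```

Starting from three gpu jobs and two unrestricted jobs, a gpu-only worker sees five claimable jobs. Appending 1000 media jobs leaves that count at five, while a media-only worker now sees 1002. The same model also shows the limit of the isolation: jobs with no capability requirement are visible to every pool, so a surge of those is shared by all workers.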

Backpressure via Semaphore

Each worker maintains a semaphore limiting concurrent jobs:

let available = semaphore.available_permits();
if available == 0 {
    continue; // At capacity, skip polling
}

let batch_size = available.min(config.batch_size);
let jobs = queue.claim(worker_id, &capabilities, batch_size).await?;

Workers only poll when they have capacity. Overloaded workers stop claiming. Jobs queue until capacity frees. This backpressure propagates naturally without coordination between nodes.
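
The capacity check can be isolated into a pure function, which makes the backpressure rule easy to reason about. The sketch below uses a single-threaded permit counter as a stand-in for the async semaphore in the snippet above; `Permits` and `next_batch_size` are hypothetical names introduced only for illustration.

```rust
/// Single-threaded stand-in for a counting semaphore (illustration only).
struct Permits {
    max: usize,
    in_use: usize,
}

impl Permits {
    fn new(max: usize) -> Self {
        Permits { max, in_use: 0 }
    }

    /// Number of jobs this worker could still start.
    fn available(&self) -> usize {
        self.max - self.in_use
    }

    /// Takes up to `n` permits and returns how many were actually taken.
    fn acquire(&mut self, n: usize) -> usize {
        let taken = n.min(self.available());
        self.in_use += taken;
        taken
    }

    /// Returns `n` permits when jobs finish.
    fn release(&mut self, n: usize) {
        self.in_use = self.in_use.saturating_sub(n);
    }
}

/// Decides how many jobs to request on this poll, or `None` to skip polling.
fn next_batch_size(available_permits: usize, configured_batch: usize) -> Option<usize> {
    if available_permits == 0 {
        return None; // at capacity: do not query the queue at all
    }
    // Never request more jobs than there are free permits.
    Some(available_permits.min(configured_batch))
}
```

A worker with four permits and a configured batch size of ten requests four jobs, requests nothing while all four are running, and requests one as soon as a single job finishes.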

Priority Queue per Capability

Within each capability pool, jobs are ordered by priority:

ORDER BY priority DESC, scheduled_at ASC

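A critical GPU job is claimed before a background GPU job that is still pending. The ordering applies at claim time, among the jobs a given worker is eligible for. Workers whose capabilities do not overlap never rank each other's capability-specific jobs, though jobs without a capability requirement are ordered alongside whatever else each worker can claim.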

Scaling by Capability

Add workers to specific pools as demand grows:

# Scale GPU pool
FORGE_NODE_CAPABILITIES=gpu forge serve

# Scale media pool
FORGE_NODE_CAPABILITIES=media forge serve

# Scale general pool
FORGE_NODE_CAPABILITIES=general forge serve

Environment variables override forge.toml for container orchestration. Kubernetes can scale deployments per capability using separate node pools or taints/tolerations.
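
One way to picture the override order is a small resolver that prefers the environment value, then the file value, then the documented default of ["general"]. This is a sketch, not Forge's loader: `resolve_capabilities` is a hypothetical function, and both the comma-separated format for multiple capabilities and the treatment of an empty variable as unset are assumptions.

```rust
/// Resolves the capabilities a node advertises.
/// `env_value` stands in for the FORGE_NODE_CAPABILITIES variable.
/// ASSUMPTION: multiple capabilities are comma-separated, and an empty
/// value is treated the same as an unset variable.
fn resolve_capabilities(env_value: Option<&str>, file_caps: &[&str]) -> Vec<String> {
    let from_env: Vec<String> = env_value
        .unwrap_or("")
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect();

    if !from_env.is_empty() {
        return from_env; // the environment overrides forge.toml
    }
    if !file_caps.is_empty() {
        return file_caps.iter().map(|s| s.to_string()).collect();
    }
    vec!["general".to_string()] // documented default
}
```

In a real process the first argument could come from `std::env::var("FORGE_NODE_CAPABILITIES").ok().as_deref()`, with the second taken from the parsed configuration file.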

Testing

Test jobs execute without capability routing by default.

#[tokio::test]
async fn test_gpu_job() {
    let ctx = TestJobContext::builder("train_model")
        .build();

    let args = TrainArgs {
        dataset_path: "test_data".into(),
        hyperparams: default_params(),
    };

    let result = train_model(&ctx, args).await;
    assert!(result.is_ok());
}

Capability routing is a deployment concern. Tests verify job logic independent of which worker runs it.