Worker Pools
Define worker capability routing for specialized job execution.
The Code
#[forge::job(worker_capability = "gpu")]
pub async fn train_model(ctx: &JobContext, args: TrainArgs) -> Result<Model> {
    let dataset = load_dataset(&args.dataset_path).await?;
    let model = run_training(&dataset, &args.hyperparams).await?;
    ctx.progress(100, "Training complete")?;
    Ok(model)
}
# forge.toml on GPU node
[node]
roles = ["worker"]
worker_capabilities = ["gpu", "general"]
What Happens
When a job specifies worker_capability = "gpu", Forge stores that requirement in the job record. Workers query for jobs matching their configured capabilities using PostgreSQL array operators. Jobs without a capability requirement run on any worker. Workers with multiple capabilities can claim jobs targeting any of them.
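The matching rule can be modeled as a small pure function. This is an illustrative sketch, not Forge's actual code; the real check happens in SQL on the job record, and the names here (matches_worker, the capability slices) are made up for the example:

```rust
/// Illustrative sketch of the routing rule: a job with no capability
/// requirement matches any worker; otherwise the worker must advertise
/// the job's required capability. Names are hypothetical, not Forge's.
fn matches_worker(required: Option<&str>, worker_caps: &[&str]) -> bool {
    match required {
        None => true,                            // no requirement: any worker
        Some(cap) => worker_caps.contains(&cap), // capability must be advertised
    }
}

fn main() {
    let gpu_node = ["gpu", "general"];
    assert!(matches_worker(Some("gpu"), &gpu_node));    // gpu job on gpu node
    assert!(matches_worker(None, &gpu_node));           // untagged job runs anywhere
    assert!(!matches_worker(Some("media"), &gpu_node)); // media job is skipped
}
```

An untagged job matches every worker, which is what makes the general fallback pool work.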
Configuration
Job Attribute
#[forge::job(worker_capability = "media")]
pub async fn transcode_video(ctx: &JobContext, args: TranscodeArgs) -> Result<Video> {
    // Only workers with the "media" capability will execute this
    todo!()
}
Node Configuration
[node]
worker_capabilities = ["media", "general"]
| Field | Type | Default | Description |
|---|---|---|---|
| worker_capabilities | string[] | ["general"] | Capabilities this node advertises |
Patterns
Specialized GPU Pool
Run ML inference on dedicated GPU instances.
#[forge::job(worker_capability = "gpu", priority = "high")]
pub async fn generate_embeddings(ctx: &JobContext, args: EmbeddingArgs) -> Result<Vec<f32>> {
    let model = load_model(&args.model_name).await?;
    let embeddings = model.embed(&args.text).await?;
    Ok(embeddings)
}
GPU worker node:
[node]
roles = ["worker"]
worker_capabilities = ["gpu"]
[worker]
max_concurrent_jobs = 4 # Limited by GPU memory
Media Processing Pool
Dedicate nodes to CPU-intensive transcoding.
#[forge::job(worker_capability = "media", timeout = "30m")]
pub async fn process_video(ctx: &JobContext, args: VideoArgs) -> Result<ProcessedVideo> {
    ctx.progress(0, "Downloading source")?;
    let source = download(&args.source_url).await?;
    ctx.progress(25, "Transcoding")?;
    let output = ffmpeg_transcode(&source, &args.format).await?;
    ctx.progress(75, "Uploading result")?;
    upload(&output, &args.destination).await?;
    ctx.progress(100, "Complete")?;
    Ok(ProcessedVideo { url: args.destination })
}
Media worker node:
[node]
roles = ["worker"]
worker_capabilities = ["media"]
[worker]
max_concurrent_jobs = 2 # Limit parallel transcodes
job_timeout_secs = 3600 # 1 hour max
High-Memory Analytics
Route memory-intensive batch jobs to larger instances.
#[forge::job(worker_capability = "analytics", timeout = "2h")]
pub async fn aggregate_metrics(ctx: &JobContext, args: AggregateArgs) -> Result<Report> {
    let data = load_full_dataset(&args.date_range).await?; // May be 50GB+
    ctx.heartbeat().await?; // Signal liveness before the long aggregation
    let report = compute_aggregations(&data).await?;
    Ok(report)
}
Multi-Capability Workers
A single node can serve multiple pools.
[node]
worker_capabilities = ["general", "media", "gpu"]
This worker claims jobs targeting general, media, or gpu. Useful for smaller deployments where dedicated pools are not yet needed.
Fallback to General Pool
Jobs without worker_capability run on any worker, including specialized ones.
#[forge::job] // Capability not specified
pub async fn send_email(ctx: &JobContext, args: EmailArgs) -> Result<()> {
    // Runs on any worker: general, media, gpu, etc.
    send(&args.to, &args.subject, &args.body).await
}
Under the Hood
Capability-Based Routing
Workers claim jobs by passing their capabilities as a PostgreSQL array:
WITH claimable AS (
    SELECT id
    FROM forge_jobs
    WHERE status = 'pending'
      AND scheduled_at <= NOW()
      AND (worker_capability = ANY($2) OR worker_capability IS NULL)
    ORDER BY priority DESC, scheduled_at ASC
    LIMIT $3
    FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET status = 'claimed', worker_id = $1, claimed_at = NOW()
WHERE id IN (SELECT id FROM claimable)
RETURNING *
The ANY($2) operator matches jobs whose worker_capability appears in the worker's capability array; $1 binds the worker's id in the UPDATE clause. Jobs with worker_capability IS NULL match any worker. FOR UPDATE SKIP LOCKED lets concurrent workers pass over rows already claimed in another transaction, reducing thundering-herd contention across capability pools.
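To see how the capability filter, the ordering, and the batch limit interact, the claim step can be simulated over an in-memory queue. The Job struct and claim function below are illustrative stand-ins, not Forge types; the authoritative logic is the SQL above, and the concurrency handling (FOR UPDATE SKIP LOCKED) is omitted:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Job {
    id: u32,
    capability: Option<&'static str>, // stands in for the worker_capability column
    priority: i32,
    scheduled_at: i64, // epoch seconds, stands in for the timestamp column
}

/// Mirror of the claim query: filter by capability, order by
/// priority DESC then scheduled_at ASC, and take at most `limit` jobs.
fn claim(pending: &[Job], caps: &[&str], limit: usize) -> Vec<Job> {
    let mut claimable: Vec<Job> = pending
        .iter()
        .filter(|j| j.capability.map_or(true, |c| caps.contains(&c)))
        .cloned()
        .collect();
    claimable.sort_by(|a, b| {
        b.priority
            .cmp(&a.priority)
            .then(a.scheduled_at.cmp(&b.scheduled_at))
    });
    claimable.truncate(limit);
    claimable
}

fn main() {
    let pending = vec![
        Job { id: 1, capability: Some("gpu"), priority: 0, scheduled_at: 100 },
        Job { id: 2, capability: Some("media"), priority: 9, scheduled_at: 50 },
        Job { id: 3, capability: None, priority: 5, scheduled_at: 75 },
        Job { id: 4, capability: Some("gpu"), priority: 5, scheduled_at: 60 },
    ];
    // A gpu worker sees the untagged job and both gpu jobs, never the media job.
    let ids: Vec<u32> = claim(&pending, &["gpu"], 2).iter().map(|j| j.id).collect();
    assert_eq!(ids, vec![4, 3]); // priority tie at 5 broken by earlier scheduled_at
}
```

Note that the high-priority media job never outranks the gpu jobs here: a gpu worker filters it out before ordering ever applies.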
Bulkhead Isolation
Each capability pool operates independently:
- GPU workers claim only GPU jobs (plus jobs with no capability requirement)
- Media workers claim only media jobs (plus jobs with no capability requirement)
- A surge in media jobs does not affect GPU job processing
This isolation prevents one workload from starving another, and it requires no configuration beyond assigning capabilities.
Backpressure via Semaphore
Each worker maintains a semaphore limiting concurrent jobs:
let available = semaphore.available_permits();
if available == 0 {
    continue; // At capacity, skip polling
}
let batch_size = available.min(config.batch_size);
let jobs = queue.claim(worker_id, &capabilities, batch_size).await?;
Workers only poll when they have capacity. Overloaded workers stop claiming. Jobs queue until capacity frees. This backpressure propagates naturally without coordination between nodes.
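The capacity check reduces to one small function. This sketch uses a plain integer in place of the semaphore's available permits; batch_to_claim is a hypothetical name for illustration, not part of Forge:

```rust
/// Sketch of the backpressure rule: poll only when permits are free, and
/// never claim more jobs than the smaller of the free permits and the
/// configured batch size. `available_permits` stands in for the
/// worker's semaphore state.
fn batch_to_claim(available_permits: usize, configured_batch: usize) -> Option<usize> {
    if available_permits == 0 {
        None // at capacity: skip this poll cycle entirely
    } else {
        Some(available_permits.min(configured_batch))
    }
}

fn main() {
    assert_eq!(batch_to_claim(0, 10), None);    // saturated worker stops claiming
    assert_eq!(batch_to_claim(3, 10), Some(3)); // free capacity is the limit
    assert_eq!(batch_to_claim(8, 5), Some(5));  // configured batch is the limit
}
```

Returning None models the skip-polling branch: a saturated worker issues no claim query at all, so backlog stays in the database rather than piling up on the node.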
Priority Queue per Capability
Within each capability pool, jobs are ordered by priority:
ORDER BY priority DESC, scheduled_at ASC
A high-priority GPU job is claimed before a background GPU job, though running jobs are never preempted. Priority ordering applies within each capability pool; cross-capability comparison never happens, because a worker only orders the jobs it is eligible to claim.
Scaling by Capability
Add workers to specific pools as demand grows. Configure capabilities in each node's forge.toml:
# gpu-worker.toml
[node]
roles = ["worker"]
worker_capabilities = ["gpu"]
# media-worker.toml
[node]
roles = ["worker"]
worker_capabilities = ["media"]
# general-worker.toml
[node]
roles = ["worker"]
worker_capabilities = ["general"]
# Start a worker deployment with gpu-worker.toml mounted or copied as forge.toml
./my-app
The runtime binary does not provide a built-in --config flag, so each worker reads the forge.toml in its working directory. On Kubernetes, scale each capability independently with separate deployments or node pools (taints and tolerations as needed), each mounting its own forge.toml via a ConfigMap.
Testing
By default, tests call job functions directly, bypassing capability routing.
#[tokio::test]
async fn test_gpu_job() {
    let ctx = TestJobContext::builder("train_model").build();
    let args = TrainArgs {
        dataset_path: "test_data".into(),
        hyperparams: default_params(),
    };
    let result = train_model(&ctx, args).await;
    assert!(result.is_ok());
}
Capability routing is a deployment concern. Tests verify job logic independent of which worker runs it.