
Persistent Services

Define daemon services with leader election and crash recovery.

The Code

#[forge::daemon]
pub async fn trade_stream(ctx: &DaemonContext) -> Result<()> {
    let (mut ws, _) = connect_async("wss://stream.binance.com/ws/btcusdt@trade").await?;

    loop {
        tokio::select! {
            msg = ws.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        let trade: Trade = serde_json::from_str(&text)?;
                        save_trade(ctx.db(), &trade).await?;
                    }
                    Some(Err(e)) => return Err(e.into()),
                    None => break, // stream closed
                    _ => {}
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

What Happens

The macro transforms your function into a singleton service that:

  • Acquires leadership via PostgreSQL advisory lock before running
  • Restarts automatically if the function panics or returns an error
  • Releases the lock on graceful shutdown, allowing another node to take over
  • Sends heartbeats to track liveness

A single instance runs across the cluster; other nodes remain in standby and can take over within seconds if the leader stops.

Attributes

Attribute        | Type       | Default   | Description
-----------------|------------|-----------|--------------------------------------
leader_elected   | bool       | true      | Acquire advisory lock before running
restart_on_panic | bool       | true      | Restart if function panics or errors
restart_delay    | "duration" | "5s"      | Wait time before restart
startup_delay    | "duration" | "0s"      | Wait time before first execution
max_restarts     | u32        | unlimited | Stop after N restarts

Duration format: "500ms", "5s", "10m", "1h"
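These strings are plain number-plus-unit pairs. As a self-contained sketch of how such a format could map to a std Duration (illustrative only — forge's actual parser is internal and may accept additional forms):

```rust
use std::time::Duration;

// Parse a "number + unit" duration string like "500ms", "5s", "10m", "1h".
// Returns None for anything outside that shape.
fn parse_duration(s: &str) -> Option<Duration> {
    // Index of the first non-digit character separates number from unit.
    let split = s.find(|c: char| !c.is_ascii_digit())?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(n)),
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n * 60)),
        "h" => Some(Duration::from_secs(n * 3600)),
        _ => None,
    }
}
```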

Patterns

WebSocket Consumer

Stream data from external sources into your database.

#[forge::daemon(restart_on_panic = true, restart_delay = "5s")]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    let url = ctx.env_require("PRICE_FEED_URL")?;
    let (mut ws, _) = connect_async(&url).await?;

    loop {
        tokio::select! {
            msg = ws.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        let price: PriceUpdate = serde_json::from_str(&text)?;
                        sqlx::query("INSERT INTO prices (symbol, price, ts) VALUES ($1, $2, NOW())")
                            .bind(&price.symbol)
                            .bind(price.price)
                            .execute(ctx.db())
                            .await?;
                    }
                    Some(Ok(Message::Close(_))) => break,
                    Some(Err(e)) => return Err(e.into()),
                    None => break,
                    _ => {}
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Polling Service

Check external systems on a regular interval.

#[forge::daemon(restart_delay = "30s")]
pub async fn sync_inventory(ctx: &DaemonContext) -> Result<()> {
    let api_key = ctx.env_require("INVENTORY_API_KEY")?;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                let stock = ctx.http()
                    .get("https://api.warehouse.com/stock")
                    .bearer_auth(&api_key)
                    .send()
                    .await?
                    .json::<Vec<StockLevel>>()
                    .await?;

                for item in stock {
                    sqlx::query("UPDATE products SET stock = $1 WHERE sku = $2")
                        .bind(item.quantity)
                        .bind(&item.sku)
                        .execute(ctx.db())
                        .await?;
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Long-Running Heartbeat

Send periodic signals to prevent timeout on external integrations.

#[forge::daemon]
pub async fn keep_session_alive(ctx: &DaemonContext) -> Result<()> {
    let session_id = ctx.env_require("SESSION_ID")?;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                ctx.http()
                    .post("https://api.partner.com/heartbeat")
                    .json(&json!({ "session_id": session_id }))
                    .send()
                    .await?;

                ctx.heartbeat().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Non-Leader Daemon

Run on every node, not as a singleton. Useful for node-local work.

#[forge::daemon(leader_elected = false)]
pub async fn local_cache_warmer(ctx: &DaemonContext) -> Result<()> {
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(300)) => {
                warm_local_cache().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Limited Restarts

Stop permanently after repeated failures.

#[forge::daemon(max_restarts = 5, restart_delay = "10s")]
pub async fn flaky_integration(ctx: &DaemonContext) -> Result<()> {
    let mut client = FlakyExternalClient::connect().await?;

    loop {
        tokio::select! {
            event = client.next_event() => {
                process_event(ctx.db(), event?).await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

After 5 restarts, the daemon stops with status failed. Check the forge_daemons table for the last error.

Startup Delay

Wait for dependent services to initialize.

#[forge::daemon(startup_delay = "30s")]
pub async fn depends_on_other_services(ctx: &DaemonContext) -> Result<()> {
    // Runs 30 seconds after application startup
    loop {
        tokio::select! {
            _ = do_work() => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Context Methods

Method                          | Return Type      | Description
--------------------------------|------------------|----------------------------------------
ctx.db()                        | &PgPool          | Database connection pool
ctx.db_conn()                   | DbConn<'_>       | Database connection as DbConn for shared helpers
ctx.http()                      | &reqwest::Client | HTTP client
ctx.dispatch_job(name, args)    | Result<Uuid>     | Dispatch a background job
ctx.start_workflow(name, input) | Result<Uuid>     | Start a workflow
ctx.shutdown_signal()           | Future<()>       | Resolves when shutdown requested
ctx.is_shutdown_requested()     | bool             | Check if shutdown has been requested
ctx.heartbeat()                 | Result<()>       | Update last heartbeat timestamp
ctx.env(key)                    | Option<String>   | Environment variable
ctx.env_or(key, default)        | String           | Environment variable with default
ctx.env_require(key)            | Result<String>   | Required environment variable
ctx.env_parse::<T>(key)         | Result<T>        | Parse environment variable
ctx.env_parse_or(key, default)  | T                | Parse environment variable with default
ctx.env_contains(key)           | bool             | Check if environment variable is set
ctx.trace_id()                  | String           | Current trace identifier
ctx.span()                      | &Span            | Current tracing span
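The env_parse_or helper follows the usual parse-with-fallback shape. A self-contained sketch of that semantic (an assumption about forge's behavior on parse failure — the stand-in below takes the raw value as a parameter, while the real method reads the process environment):

```rust
use std::str::FromStr;

// Stand-in for ctx.env_parse_or: parse a raw environment value, falling
// back to the default when the variable is missing or fails to parse.
fn env_parse_or<T: FromStr>(raw: Option<&str>, default: T) -> T {
    raw.and_then(|v| v.parse::<T>().ok()).unwrap_or(default)
}
```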

Context Fields

Field           | Type   | Description
----------------|--------|------------------------------
ctx.daemon_name | String | Daemon identifier
ctx.instance_id | Uuid   | Unique ID for this execution

Under the Hood

Advisory Lock-Based Leadership

Leader election uses pg_try_advisory_lock():

SELECT pg_try_advisory_lock(hashtext('daemon_name'))

The lock ID is a hash of the daemon name. One node acquires the lock, others return false and wait. Coordination uses PostgreSQL advisory locks rather than distributed consensus protocols.
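The exact hash is internal to forge; what matters is that the same daemon name always maps to the same 64-bit lock key. A sketch of that property using FNV-1a (an assumption for illustration — forge may hash differently):

```rust
// Derive a stable 64-bit advisory-lock key from a daemon name.
// FNV-1a is used here purely for illustration; forge's actual hash
// function is an internal detail.
fn advisory_lock_key(daemon_name: &str) -> i64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a offset basis
    for byte in daemon_name.bytes() {
        hash ^= byte as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash as i64
}
```

The resulting key would then be bound as the bigint argument to pg_try_advisory_lock($1).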

Session-Bound Locks

PostgreSQL advisory locks are tied to the database connection. The lock is held for the connection lifetime:

  • Connection alive = lock held
  • Connection drops = lock released automatically
  • Explicit lease refresh is not required
  • Heartbeat-based timeout coordination is not required

If a node crashes, PostgreSQL detects the dead connection and releases the lock. Another node acquires it within 5 seconds (the standby poll interval).

Automatic Failover

Node A: acquires lock → running → [crash]

Node B: waiting → waiting → acquires lock → running

Failover occurs without manual intervention. Coordination relies on the database instead of an external service discovery layer.

Graceful Shutdown

On SIGTERM:

  1. ctx.shutdown_signal() resolves
  2. Daemon breaks out of its loop
  3. Function returns Ok(())
  4. Lock is explicitly released via pg_advisory_unlock()
  5. Node deregisters from cluster

Because the unlock is explicit, a standby can take over immediately; and even if it never runs, advisory locks clear when the connection drops, so there are no zombie locks or long waits for dead-connection detection.

Restart Behavior

When a daemon panics or returns an error:

  1. Error is logged and recorded in forge_daemons table
  2. Restart counter increments
  3. If max_restarts reached, daemon stops with failed status
  4. Otherwise, wait for the configured restart_delay
  5. Acquire leadership again (another node may have taken over)
  6. Execute daemon

The restart delay prevents tight crash loops from overwhelming logs and the database.
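The decision in steps 3 and 4 reduces to a small predicate. A sketch (the names are illustrative, not forge's actual internals):

```rust
// Whether the supervisor should restart the daemon after a failure,
// given how many restarts have already happened.
// max_restarts = None models the default "unlimited" setting.
fn should_restart(restarts_so_far: u32, max_restarts: Option<u32>) -> bool {
    match max_restarts {
        Some(max) => restarts_so_far < max,
        None => true,
    }
}
```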

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_daemon_shutdown() {
        let ctx = TestDaemonContext::builder("price_feed")
            .with_env("PRICE_FEED_URL", "wss://test.example.com")
            .build();

        // Clone before moving ctx into the task, so the test can still
        // signal shutdown afterwards
        let shutdown_ctx = ctx.clone();

        // Spawn daemon
        let handle = tokio::spawn(async move {
            price_feed(&ctx).await
        });

        // Trigger shutdown
        shutdown_ctx.request_shutdown();

        // Daemon should exit cleanly
        let result = handle.await.unwrap();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_daemon_with_mocked_http() {
        let ctx = TestDaemonContext::builder("sync_inventory")
            .with_env("INVENTORY_API_KEY", "test_key")
            .mock_http_json("https://api.warehouse.com/*", json!([
                { "sku": "ABC", "quantity": 100 }
            ]))
            .build();

        // Run one iteration then shutdown; clone so the spawned task
        // and the daemon call below can both use the context
        let shutdown_ctx = ctx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            shutdown_ctx.request_shutdown();
        });

        sync_inventory(&ctx).await.unwrap();

        ctx.http().assert_called("https://api.warehouse.com/stock");
    }

    #[test]
    fn test_shutdown_signal_control() {
        let ctx = TestDaemonContext::builder("test").build();

        assert!(!ctx.is_shutdown_requested());
        ctx.request_shutdown();
        assert!(ctx.is_shutdown_requested());
    }
}

Test context builder methods:

  • .with_pool(pool) - Use real database
  • .with_instance_id(uuid) - Set specific instance ID
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable