
Persistent Services

Run always-on background processes with automatic leader election and crash recovery.

The Code

#[forge::daemon]
pub async fn trade_stream(ctx: &DaemonContext) -> Result<()> {
    let (mut ws, _) = connect_async("wss://stream.binance.com/ws/btcusdt@trade").await?;

    loop {
        tokio::select! {
            msg = ws.next() => {
                if let Some(Ok(Message::Text(text))) = msg {
                    let trade: Trade = serde_json::from_str(&text)?;
                    save_trade(ctx.db(), &trade).await?;
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

What Happens

The macro transforms your function into a singleton service that:

  • Acquires leadership via PostgreSQL advisory lock before running
  • Restarts automatically if the function panics or returns an error
  • Releases the lock on graceful shutdown, allowing another node to take over
  • Sends heartbeats to track liveness

Only one instance runs across your entire cluster. Other nodes wait in standby, ready to take over within seconds if the leader dies.

Attributes

Attribute         Type        Default    Description
leader_elected    bool        true       Acquire advisory lock before running
restart_on_panic  bool        true       Restart if function panics or errors
restart_delay     "duration"  "5s"       Wait time before restart
startup_delay     "duration"  "0s"       Wait time before first execution
max_restarts      u32         unlimited  Stop after N restarts

Duration format: "500ms", "5s", "10m", "1h"
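
Attributes can be combined freely; the values below are illustrative, not recommended settings.

#[forge::daemon(
    leader_elected = true,
    restart_on_panic = true,
    restart_delay = "10s",
    startup_delay = "5s",
    max_restarts = 20
)]
pub async fn example_daemon(_ctx: &DaemonContext) -> Result<()> {
    // Daemon body as in the patterns below.
    Ok(())
}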

Patterns

WebSocket Consumer

Stream data from external sources into your database.

#[forge::daemon(restart_on_panic = true, restart_delay = "5s")]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    let url = ctx.env_require("PRICE_FEED_URL")?;
    let (mut ws, _) = connect_async(&url).await?;

    loop {
        tokio::select! {
            msg = ws.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        let price: PriceUpdate = serde_json::from_str(&text)?;
                        sqlx::query("INSERT INTO prices (symbol, price, ts) VALUES ($1, $2, NOW())")
                            .bind(&price.symbol)
                            .bind(price.price)
                            .execute(ctx.db())
                            .await?;
                    }
                    Some(Ok(Message::Close(_))) => break,
                    Some(Err(e)) => return Err(e.into()),
                    None => break,
                    _ => {}
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Polling Service

Check external systems on a regular interval.

#[forge::daemon(restart_delay = "30s")]
pub async fn sync_inventory(ctx: &DaemonContext) -> Result<()> {
    let api_key = ctx.env_require("INVENTORY_API_KEY")?;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                let stock = ctx.http()
                    .get("https://api.warehouse.com/stock")
                    .bearer_auth(&api_key)
                    .send()
                    .await?
                    .json::<Vec<StockLevel>>()
                    .await?;

                for item in stock {
                    sqlx::query("UPDATE products SET stock = $1 WHERE sku = $2")
                        .bind(item.quantity)
                        .bind(&item.sku)
                        .execute(ctx.db())
                        .await?;
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Long-Running Heartbeat

Send periodic signals to keep external integrations from timing out.

#[forge::daemon]
pub async fn keep_session_alive(ctx: &DaemonContext) -> Result<()> {
    let session_id = ctx.env_require("SESSION_ID")?;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                ctx.http()
                    .post("https://api.partner.com/heartbeat")
                    .json(&json!({ "session_id": session_id }))
                    .send()
                    .await?;

                ctx.heartbeat().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Non-Leader Daemon

Run on every node, not as a singleton. Useful for node-local work.

#[forge::daemon(leader_elected = false)]
pub async fn local_cache_warmer(ctx: &DaemonContext) -> Result<()> {
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(300)) => {
                warm_local_cache().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Limited Restarts

Stop permanently after repeated failures.

#[forge::daemon(max_restarts = 5, restart_delay = "10s")]
pub async fn flaky_integration(ctx: &DaemonContext) -> Result<()> {
    let client = FlakyExternalClient::connect().await?;

    loop {
        tokio::select! {
            event = client.next_event() => {
                process_event(ctx.db(), event?).await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

After 5 restarts, the daemon stops with status failed. Check the forge_daemons table for the last error.
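
To inspect a stopped daemon programmatically, a one-off query against that table works. The column names used below (name, status, last_error) are assumptions for illustration; the actual forge_daemons schema isn't documented on this page.

// Sketch only: adjust the column names to the actual forge_daemons schema.
async fn last_daemon_error(
    pool: &sqlx::PgPool,
    daemon: &str,
) -> anyhow::Result<(String, Option<String>)> {
    let (status, last_error): (String, Option<String>) =
        sqlx::query_as("SELECT status, last_error FROM forge_daemons WHERE name = $1")
            .bind(daemon)
            .fetch_one(pool)
            .await?;
    Ok((status, last_error))
}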

Startup Delay

Wait for dependent services to initialize.

#[forge::daemon(startup_delay = "30s")]
pub async fn depends_on_other_services(ctx: &DaemonContext) -> Result<()> {
    // Runs 30 seconds after application startup
    loop {
        tokio::select! {
            _ = do_work() => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}

Context Methods

Method                 Return Type       Description
ctx.db()               &PgPool           Database connection pool
ctx.http()             &reqwest::Client  HTTP client
ctx.shutdown_signal()  Future<()>        Resolves when shutdown requested
ctx.heartbeat()        Result<()>        Update last heartbeat timestamp
ctx.env(key)           Option<String>    Environment variable
ctx.env_require(key)   Result<String>    Required environment variable

Context Fields

Field            Type    Description
ctx.daemon_name  String  Daemon identifier
ctx.instance_id  Uuid    Unique ID for this execution
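
A minimal daemon that exercises most of this context surface, purely as a usage sketch (the HEALTH_URL and API_KEY names and the example URL are illustrative):

#[forge::daemon]
pub async fn context_tour(ctx: &DaemonContext) -> Result<()> {
    // Fields identify the daemon and this particular execution.
    println!("{} starting (instance {})", ctx.daemon_name, ctx.instance_id);

    // env() is optional; env_require() fails fast when the variable is missing.
    let endpoint = ctx.env("HEALTH_URL").unwrap_or_else(|| "https://example.com/health".into());
    let api_key = ctx.env_require("API_KEY")?;

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                // Pooled database handle and shared HTTP client from the context.
                sqlx::query("SELECT 1").execute(ctx.db()).await?;
                ctx.http().get(&endpoint).bearer_auth(&api_key).send().await?;

                // Update the last heartbeat timestamp.
                ctx.heartbeat().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}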

Under the Hood

Advisory Lock-Based Leadership

Leader election uses pg_try_advisory_lock():

SELECT pg_try_advisory_lock(hash('daemon_name'))

The lock ID is a hash of the daemon name. One node acquires the lock; on every other node the call returns false, so they wait. No Raft, no Paxos, no distributed consensus protocol.

Session-Bound Locks

PostgreSQL advisory locks are tied to the database connection. The lock is held for the connection lifetime:

  • Connection alive = lock held
  • Connection drops = lock released automatically
  • No explicit lease refresh needed
  • No heartbeat-based timeout coordination

If a node crashes, PostgreSQL detects the dead connection and releases the lock. Another node acquires it within 5 seconds (the standby poll interval).
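
Conceptually, each node runs a try-lock-then-poll loop like the sketch below. This is an illustration of the idea, not the macro's generated code, and the lock-ID derivation (DefaultHasher) is an assumption for the sketch.

// Illustration only: try the advisory lock, otherwise stand by and retry.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

async fn wait_for_leadership(
    pool: &sqlx::PgPool,
    daemon_name: &str,
) -> anyhow::Result<sqlx::pool::PoolConnection<sqlx::Postgres>> {
    let mut hasher = DefaultHasher::new();
    daemon_name.hash(&mut hasher);
    let lock_id = hasher.finish() as i64;

    // Advisory locks are session-bound, so take a dedicated connection
    // and keep it alive for as long as this node is the leader.
    let mut conn = pool.acquire().await?;

    loop {
        let (acquired,): (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
            .bind(lock_id)
            .fetch_one(&mut *conn)
            .await?;

        if acquired {
            // This node is now the leader; the caller runs the daemon body
            // while holding on to `conn`.
            return Ok(conn);
        }

        // Standby: try again after the 5-second poll interval.
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
}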

Automatic Failover

Node A: acquires lock → running → [crash]

Node B: waiting → waiting → acquires lock → running

No manual intervention. No external service discovery. The database handles coordination.

Graceful Shutdown

On SIGTERM:

  1. ctx.shutdown_signal() resolves
  2. Daemon breaks out of its loop
  3. Function returns Ok(())
  4. Lock is explicitly released via pg_advisory_unlock()
  5. Node deregisters from cluster

No zombie locks. No 5-minute wait for dead connection detection.
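
Step 4 boils down to a single statement issued on the same session that acquired the lock; a minimal sketch, assuming the lock_id from the election sketch above:

// Sketch: explicit release on graceful shutdown, on the session that holds the lock.
async fn release_leadership(
    conn: &mut sqlx::PgConnection,
    lock_id: i64,
) -> anyhow::Result<()> {
    sqlx::query("SELECT pg_advisory_unlock($1)")
        .bind(lock_id)
        .execute(conn)
        .await?;
    Ok(())
}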

Restart Behavior

When a daemon panics or returns an error:

  1. Error is logged and recorded in forge_daemons table
  2. Restart counter increments
  3. If max_restarts reached, daemon stops with failed status
  4. Otherwise, wait for the configured restart_delay
  5. Acquire leadership again (another node may have taken over)
  6. Execute daemon

The restart delay prevents tight crash loops from overwhelming logs and the database.
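
Conceptually, the restart policy is a small supervision loop wrapped around the daemon body. The sketch below illustrates that policy (using anyhow for the error type); it is not the code the macro generates, and panic handling is omitted.

// Conceptual sketch of the restart policy, error path only.
async fn supervise<F, Fut>(
    mut body: F,
    max_restarts: Option<u32>,
    restart_delay: std::time::Duration,
) where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut restarts = 0u32;
    loop {
        match body().await {
            // Graceful return: stop without restarting.
            Ok(()) => break,
            Err(e) => {
                eprintln!("daemon failed: {e}");
                restarts += 1;
                if let Some(max) = max_restarts {
                    if restarts >= max {
                        // Recorded with status failed in forge_daemons.
                        break;
                    }
                }
                // Back off before re-acquiring leadership and running again;
                // another node may have taken over in the meantime.
                tokio::time::sleep(restart_delay).await;
            }
        }
    }
}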

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_daemon_shutdown() {
        let ctx = TestDaemonContext::builder("price_feed")
            .with_env("PRICE_FEED_URL", "wss://test.example.com")
            .build();

        // Spawn daemon on a cloned handle so the original can still request shutdown
        // (assumes the test context shares its state across clones)
        let daemon_ctx = ctx.clone();
        let handle = tokio::spawn(async move {
            price_feed(&daemon_ctx).await
        });

        // Trigger shutdown
        ctx.request_shutdown();

        // Daemon should exit cleanly
        let result = handle.await.unwrap();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_daemon_with_mocked_http() {
        let ctx = TestDaemonContext::builder("sync_inventory")
            .with_env("INVENTORY_API_KEY", "test_key")
            .mock_http_json("https://api.warehouse.com/*", json!([
                { "sku": "ABC", "quantity": 100 }
            ]))
            .build();

        // Run one iteration then shutdown (again using a clone for the background task)
        let shutdown_ctx = ctx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            shutdown_ctx.request_shutdown();
        });

        sync_inventory(&ctx).await.unwrap();

        ctx.http().assert_called("https://api.warehouse.com/stock");
    }

    #[test]
    fn test_shutdown_signal_control() {
        let ctx = TestDaemonContext::builder("test").build();

        assert!(!ctx.is_shutdown_requested());
        ctx.request_shutdown();
        assert!(ctx.is_shutdown_requested());
    }
}

Test context builder methods:

  • .with_pool(pool) - Use real database
  • .with_instance_id(uuid) - Set specific instance ID
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable
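
A short sketch of the remaining builder options; the connection string is an assumption, and a real test would go on to run the daemon against that pool.

// Sketch only: wiring a real database pool and a fixed instance ID into the test context.
#[tokio::test]
async fn test_sync_inventory_against_real_db() {
    let pool = sqlx::PgPool::connect("postgres://localhost/forge_test")
        .await
        .unwrap();

    let ctx = TestDaemonContext::builder("sync_inventory")
        .with_pool(pool)
        .with_instance_id(uuid::Uuid::new_v4())
        .with_env("INVENTORY_API_KEY", "test_key")
        .build();

    // Placeholder: drive the daemon and assert against real rows here.
    let _ = ctx;
}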