Persistent Services
Run always-on background processes with automatic leader election and crash recovery.
The Code
#[forge::daemon]
pub async fn trade_stream(ctx: &DaemonContext) -> Result<()> {
    let (mut ws, _) = connect_async("wss://stream.binance.com/ws/btcusdt@trade").await?;
    loop {
        tokio::select! {
            msg = ws.next() => {
                if let Some(Ok(Message::Text(text))) = msg {
                    let trade: Trade = serde_json::from_str(&text)?;
                    save_trade(ctx.db(), &trade).await?;
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
What Happens
The macro transforms your function into a singleton service that:
- Acquires leadership via PostgreSQL advisory lock before running
- Restarts automatically if the function panics or returns an error
- Releases the lock on graceful shutdown, allowing another node to take over
- Sends heartbeats to track liveness
Only one instance runs across your entire cluster. Other nodes wait in standby, ready to take over within seconds if the leader dies.
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| leader_elected | bool | true | Acquire advisory lock before running |
| restart_on_panic | bool | true | Restart if function panics or errors |
| restart_delay | "duration" | "5s" | Wait time before restart |
| startup_delay | "duration" | "0s" | Wait time before first execution |
| max_restarts | u32 | unlimited | Stop after N restarts |
Duration format: "500ms", "5s", "10m", "1h"
Patterns
WebSocket Consumer
Stream data from external sources into your database.
#[forge::daemon(restart_on_panic = true, restart_delay = "5s")]
pub async fn price_feed(ctx: &DaemonContext) -> Result<()> {
    let url = ctx.env_require("PRICE_FEED_URL")?;
    let (mut ws, _) = connect_async(&url).await?;
    loop {
        tokio::select! {
            msg = ws.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        let price: PriceUpdate = serde_json::from_str(&text)?;
                        sqlx::query("INSERT INTO prices (symbol, price, ts) VALUES ($1, $2, NOW())")
                            .bind(&price.symbol)
                            .bind(price.price)
                            .execute(ctx.db())
                            .await?;
                    }
                    Some(Ok(Message::Close(_))) => break,
                    Some(Err(e)) => return Err(e.into()),
                    None => break,
                    _ => {}
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
Polling Service
Check external systems on a regular interval.
#[forge::daemon(restart_delay = "30s")]
pub async fn sync_inventory(ctx: &DaemonContext) -> Result<()> {
    let api_key = ctx.env_require("INVENTORY_API_KEY")?;
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                let stock = ctx.http()
                    .get("https://api.warehouse.com/stock")
                    .bearer_auth(&api_key)
                    .send()
                    .await?
                    .json::<Vec<StockLevel>>()
                    .await?;
                for item in stock {
                    sqlx::query("UPDATE products SET stock = $1 WHERE sku = $2")
                        .bind(item.quantity)
                        .bind(&item.sku)
                        .execute(ctx.db())
                        .await?;
                }
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
Long-Running Heartbeat
Send periodic signals so external integrations don't time out.
#[forge::daemon]
pub async fn keep_session_alive(ctx: &DaemonContext) -> Result<()> {
    let session_id = ctx.env_require("SESSION_ID")?;
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                ctx.http()
                    .post("https://api.partner.com/heartbeat")
                    .json(&json!({ "session_id": session_id }))
                    .send()
                    .await?;
                ctx.heartbeat().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
Non-Leader Daemon
Run on every node, not as a singleton. Useful for node-local work.
#[forge::daemon(leader_elected = false)]
pub async fn local_cache_warmer(ctx: &DaemonContext) -> Result<()> {
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(300)) => {
                warm_local_cache().await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
Limited Restarts
Stop permanently after repeated failures.
#[forge::daemon(max_restarts = 5, restart_delay = "10s")]
pub async fn flaky_integration(ctx: &DaemonContext) -> Result<()> {
    let client = FlakyExternalClient::connect().await?;
    loop {
        tokio::select! {
            event = client.next_event() => {
                process_event(ctx.db(), event?).await?;
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
After 5 restarts, the daemon stops with status failed. Check the forge_daemons table for the last error.
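For example, a small helper to read that error back out, assuming a last_error column on forge_daemons (the column name here is a guess; check the actual schema):

use sqlx::PgPool;

// Fetch the most recent error recorded for a daemon.
// The last_error column name is an assumption about the forge_daemons schema.
async fn last_daemon_error(pool: &PgPool, name: &str) -> sqlx::Result<Option<String>> {
    let (last_error,): (Option<String>,) = sqlx::query_as(
        "SELECT last_error FROM forge_daemons WHERE name = $1",
    )
    .bind(name)
    .fetch_one(pool)
    .await?;
    Ok(last_error)
}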
Startup Delay
Wait for dependent services to initialize.
#[forge::daemon(startup_delay = "30s")]
pub async fn depends_on_other_services(ctx: &DaemonContext) -> Result<()> {
    // Runs 30 seconds after application startup
    loop {
        tokio::select! {
            _ = do_work() => {}
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}
Context Methods
| Method | Return Type | Description |
|---|---|---|
| ctx.db() | &PgPool | Database connection pool |
| ctx.http() | &reqwest::Client | HTTP client |
| ctx.shutdown_signal() | Future<()> | Resolves when shutdown requested |
| ctx.heartbeat() | Result<()> | Update last heartbeat timestamp |
| ctx.env(key) | Option<String> | Environment variable |
| ctx.env_require(key) | Result<String> | Required environment variable |
Context Fields
| Field | Type | Description |
|---|---|---|
| ctx.daemon_name | String | Daemon identifier |
| ctx.instance_id | Uuid | Unique ID for this execution |
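As a small illustration (the daemon name, interval variable, and tracing calls below are illustrative, not part of the framework), a daemon can combine these to tag its own logs and report liveness:

#[forge::daemon]
pub async fn metrics_reporter(ctx: &DaemonContext) -> Result<()> {
    // Optional configuration via ctx.env; a required value would use ctx.env_require.
    let interval = ctx.env("REPORT_INTERVAL_SECS")
        .and_then(|v| v.parse().ok())
        .unwrap_or(60);
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(interval)) => {
                tracing::info!(daemon = %ctx.daemon_name, instance = %ctx.instance_id, "reporting");
                ctx.heartbeat().await?; // update the liveness timestamp
            }
            _ = ctx.shutdown_signal() => break,
        }
    }
    Ok(())
}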
Under the Hood
Advisory Lock-Based Leadership
Leader election uses pg_try_advisory_lock():
SELECT pg_try_advisory_lock(hash('daemon_name'))
The lock ID is a hash of the daemon name. One node acquires the lock; on the others the call returns false and they wait. No Raft, no Paxos, no distributed consensus protocol.
Session-Bound Locks
PostgreSQL advisory locks are tied to the database connection. The lock is held for the connection lifetime:
- Connection alive = lock held
- Connection drops = lock released automatically
- No explicit lease refresh needed
- No heartbeat-based timeout coordination
If a node crashes, PostgreSQL detects the dead connection and releases the lock. Another node acquires it within 5 seconds (the standby poll interval).
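Conceptually, each standby node runs an acquisition loop along these lines (the hashing scheme, poll interval, and sqlx-based connection handling are illustrative assumptions, not the framework's actual generated code):

use sqlx::PgConnection;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

// Derive a 64-bit advisory lock key from the daemon name (illustrative hash).
fn lock_key(daemon_name: &str) -> i64 {
    let mut hasher = DefaultHasher::new();
    daemon_name.hash(&mut hasher);
    hasher.finish() as i64
}

// Poll until this node wins the lock. The lock lives on `conn`: keeping that
// connection open is what keeps the leadership.
async fn acquire_leadership(conn: &mut PgConnection, daemon_name: &str) -> sqlx::Result<()> {
    let key = lock_key(daemon_name);
    loop {
        let (acquired,): (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
            .bind(key)
            .fetch_one(&mut *conn)
            .await?;
        if acquired {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_secs(5)).await; // standby poll interval
    }
}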
Automatic Failover
Node A: acquires lock → running → [crash]
↓
Node B: waiting → waiting → acquires lock → running
No manual intervention. No external service discovery. The database handles coordination.
Graceful Shutdown
On SIGTERM:
- ctx.shutdown_signal() resolves
- Daemon breaks out of its loop
- Function returns Ok(())
- Lock is explicitly released via pg_advisory_unlock()
- Node deregisters from cluster
No zombie locks. No 5-minute wait for dead connection detection.
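The explicit release is the unlock counterpart on the same connection that holds the lock; a minimal sketch, assuming sqlx:

use sqlx::PgConnection;

// Release the advisory lock held by this session. pg_advisory_unlock returns
// false if the session did not actually hold the lock.
async fn release_leadership(conn: &mut PgConnection, key: i64) -> sqlx::Result<bool> {
    let (released,): (bool,) = sqlx::query_as("SELECT pg_advisory_unlock($1)")
        .bind(key)
        .fetch_one(&mut *conn)
        .await?;
    Ok(released)
}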
Restart Behavior
When a daemon panics or returns an error:
- Error is logged and recorded in the forge_daemons table
- Restart counter increments
- If max_restarts is reached, the daemon stops with failed status
- Otherwise, wait restart_delay seconds
- Acquire leadership again (another node may have taken over)
- Execute the daemon again
The restart delay prevents tight crash loops from overwhelming logs and the database.
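Put together, the supervision wrapper behaves roughly like the sketch below (the function names, anyhow-style Result, and tracing calls are assumptions for illustration; the real generated code also catches panics and re-acquires leadership before each run):

use std::time::Duration;

// Illustrative supervision loop: restart_delay and max_restarts mirror the
// attributes of the same name; run_daemon stands in for the user's function.
async fn supervise<F, Fut>(mut run_daemon: F, restart_delay: Duration, max_restarts: Option<u32>)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut restarts = 0u32;
    loop {
        match run_daemon().await {
            // A clean return means graceful shutdown: do not restart.
            Ok(()) => break,
            Err(err) => {
                tracing::error!(%err, "daemon exited with error");
                restarts += 1;
                if matches!(max_restarts, Some(max) if restarts >= max) {
                    tracing::error!("max restarts reached, stopping with failed status");
                    break;
                }
                // Back off before restarting to avoid a tight crash loop.
                tokio::time::sleep(restart_delay).await;
            }
        }
    }
}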
Testing
#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;

    #[tokio::test]
    async fn test_daemon_shutdown() {
        let ctx = TestDaemonContext::builder("price_feed")
            .with_env("PRICE_FEED_URL", "wss://test.example.com")
            .build();

        // Spawn the daemon with its own handle to the context
        // (assumes TestDaemonContext is cheaply cloneable).
        let daemon_ctx = ctx.clone();
        let handle = tokio::spawn(async move { price_feed(&daemon_ctx).await });

        // Trigger shutdown
        ctx.request_shutdown();

        // Daemon should exit cleanly
        let result = handle.await.unwrap();
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_daemon_with_mocked_http() {
        let ctx = TestDaemonContext::builder("sync_inventory")
            .with_env("INVENTORY_API_KEY", "test_key")
            .mock_http_json("https://api.warehouse.com/*", json!([
                { "sku": "ABC", "quantity": 100 }
            ]))
            .build();

        // Run one iteration then shut down, requesting shutdown from a second
        // handle to the context (again assumes it is cloneable).
        let shutdown_ctx = ctx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            shutdown_ctx.request_shutdown();
        });

        sync_inventory(&ctx).await.unwrap();
        ctx.http().assert_called("https://api.warehouse.com/stock");
    }

    #[test]
    fn test_shutdown_signal_control() {
        let ctx = TestDaemonContext::builder("test").build();
        assert!(!ctx.is_shutdown_requested());
        ctx.request_shutdown();
        assert!(ctx.is_shutdown_requested());
    }
}
Test context builder methods:
- .with_pool(pool) - Use real database
- .with_instance_id(uuid) - Set specific instance ID
- .mock_http(pattern, handler) - Mock HTTP responses
- .mock_http_json(pattern, value) - Mock JSON response
- .with_env(key, value) - Mock environment variable