Skip to main content

Scheduled Tasks

Run functions on a cron schedule with timezone support, catch-up runs, and exactly-once execution across your cluster.

The Code

use forge::prelude::*;

#[forge::cron("0 9 * * *", timezone = "America/New_York")]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
ctx.log.info("Generating report", json!({"run_id": ctx.run_id.to_string()}));

let stats = sqlx::query_as::<_, DailyStats>(
"SELECT count(*) as orders, sum(total) as revenue FROM orders WHERE created_at::date = current_date"
)
.fetch_one(ctx.db())
.await?;

ctx.http()
.post("https://api.slack.com/chat.postMessage")
.bearer_auth(ctx.env_require("SLACK_TOKEN")?)
.json(&json!({
"channel": "#daily-metrics",
"text": format!("Orders: {}, Revenue: ${:.2}", stats.orders, stats.revenue)
}))
.send()
.await?;

Ok(())
}

What Happens

The scheduler runs every second, checking for due cron executions. When your cron's scheduled time arrives:

  1. One node claims the run via INSERT ... ON CONFLICT DO NOTHING
  2. The database's unique constraint ensures only one node wins
  3. The cron executes with full context: database, HTTP, logging
  4. Status updates to completed or failed

No distributed locks. No consensus protocols. PostgreSQL handles coordination.

Attributes

AttributeTypeDefaultDescription
Schedule"cron expression"requiredStandard cron (5-part: m h d M w) or extended (6-part with seconds)
timezone"tz""UTC"IANA timezone (e.g., "America/New_York", "Europe/London")
timeout"duration""1h"Maximum execution time ("30s", "5m", "1h")
catch_upflagfalseRun missed executions after downtime
catch_up_limitu3210Maximum catch-up runs per tick

Cron Expression Format

Standard 5-part expressions are normalized to 6-part by prepending 0 for seconds:

"0 9 * * *"       // Daily at 9:00 AM -> "0 0 9 * * *"
"*/5 * * * *" // Every 5 minutes -> "0 */5 * * * *"
"0 0 1 * *" // Monthly on the 1st -> "0 0 0 1 * *"
"0 12 * * 1-5" // Weekdays at noon -> "0 0 12 * * 1-5"

Extended 6-part expressions include seconds:

"30 * * * * *"    // Every minute at :30 seconds
"0 */5 * * * *" // Every 5 minutes at :00 seconds

Patterns

Every Minute

#[forge::cron("* * * * *")]
pub async fn heartbeat(ctx: &CronContext) -> Result<()> {
sqlx::query("UPDATE system_status SET last_heartbeat = NOW()")
.execute(ctx.db())
.await?;
Ok(())
}

Daily at Specific Time in Timezone

#[forge::cron("0 9 * * *", timezone = "Europe/London")]
pub async fn morning_summary(ctx: &CronContext) -> Result<()> {
// Runs at 9:00 AM London time, adjusting for DST
ctx.log.info("Morning summary", json!({"tz": ctx.timezone}));
Ok(())
}

Weekly with Catch-Up

Run missed executions after server downtime. If your server was down for 3 hours and missed 3 hourly runs, they execute on startup.

#[forge::cron("0 0 * * 0", catch_up, catch_up_limit = 4)]
pub async fn weekly_cleanup(ctx: &CronContext) -> Result<()> {
if ctx.is_catch_up {
ctx.log.info("Catch-up run", json!({"scheduled": ctx.scheduled_time.to_rfc3339()}));
}

sqlx::query("DELETE FROM sessions WHERE expires_at < NOW() - interval '7 days'")
.execute(ctx.db())
.await?;

Ok(())
}

Detect Late Execution

Check if execution started more than 1 minute after scheduled time:

#[forge::cron("*/15 * * * *", timeout = "5m")]
pub async fn sync_external_data(ctx: &CronContext) -> Result<()> {
if ctx.is_late() {
ctx.log.warn("Running late", json!({
"delay_seconds": ctx.delay().num_seconds(),
"scheduled": ctx.scheduled_time.to_rfc3339()
}));
}

// Continue with sync...
Ok(())
}

Long-Running with Extended Timeout

#[forge::cron("0 2 * * *", timeout = "2h")]
pub async fn nightly_data_export(ctx: &CronContext) -> Result<()> {
ctx.log.info("Starting export", json!({}));

let rows = sqlx::query_as::<_, ExportRow>(
"SELECT * FROM analytics WHERE exported = false LIMIT 100000"
)
.fetch_all(ctx.db())
.await?;

for chunk in rows.chunks(1000) {
upload_to_s3(ctx, chunk).await?;
}

ctx.log.info("Export complete", json!({"rows": rows.len()}));
Ok(())
}

Call External APIs

#[forge::cron("* * * * *", timezone = "UTC")]
pub async fn poll_iss_location(ctx: &CronContext) -> Result<()> {
let response = ctx.http()
.get("http://api.open-notify.org/iss-now.json")
.send()
.await
.map_err(|e| ForgeError::Internal(format!("HTTP failed: {}", e)))?;

if !response.status().is_success() {
ctx.log.error("API error", json!({"status": response.status().as_u16()}));
return Err(ForgeError::Internal("API request failed".into()));
}

let data: IssResponse = response.json().await?;

sqlx::query("INSERT INTO iss_locations (latitude, longitude) VALUES ($1, $2)")
.bind(data.position.latitude)
.bind(data.position.longitude)
.execute(ctx.db())
.await?;

Ok(())
}

Context Fields

FieldTypeDescription
run_idUuidUnique identifier for this cron run
cron_nameStringFunction name (e.g., "daily_report")
scheduled_timeDateTime<Utc>When the cron was supposed to run
execution_timeDateTime<Utc>When execution actually started
timezoneStringConfigured timezone
is_catch_upbooltrue if this is a catch-up run
authAuthContextAuthentication context (unauthenticated by default)
logCronLogStructured logger

Context Methods

MethodReturn TypeDescription
ctx.db()&PgPoolDatabase connection pool
ctx.http()&reqwest::ClientHTTP client
ctx.delay()DurationTime between scheduled and actual execution
ctx.is_late()booltrue if delay exceeds 1 minute
ctx.env(key)Option<String>Environment variable
ctx.env_require(key)Result<String>Required environment variable
ctx.with_auth(auth)SelfSet authentication context

Logging

The ctx.log field provides structured logging:

ctx.log.info("Message", json!({"key": "value"}));
ctx.log.warn("Warning", json!({"count": 42}));
ctx.log.error("Failed", json!({"error": e.to_string()}));
ctx.log.debug("Debug info", json!({}));

Each log entry includes the cron name for correlation.

Under the Hood

Exactly-Once via Unique Constraint

The forge_cron_runs table enforces one execution per scheduled time:

CREATE TABLE forge_cron_runs (
id UUID PRIMARY KEY,
cron_name TEXT NOT NULL,
scheduled_time TIMESTAMPTZ NOT NULL,
status TEXT NOT NULL,
node_id UUID,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error TEXT,
UNIQUE(cron_name, scheduled_time)
);

When the scheduler ticks:

INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
VALUES ($1, $2, $3, 'running', $4, NOW())
ON CONFLICT (cron_name, scheduled_time) DO NOTHING

Multiple nodes can attempt the same insert. PostgreSQL's unique constraint ensures exactly one succeeds. The node that wins (rows_affected > 0) executes the cron.

Leader-Only Scheduling

Only cluster leaders run the scheduler tick. Follower nodes skip cron processing entirely. Leadership is determined via PostgreSQL advisory locks (see Multiple Nodes).

This prevents duplicate scheduling attempts across the cluster.

Catch-Up with Limits

When catch_up is enabled, the scheduler checks for missed runs:

  1. Find the last completed run for this cron
  2. Calculate all scheduled times between then and now
  3. Limit to catch_up_limit (default: 10)
  4. Claim and execute each missed run
// After 3-hour downtime with hourly cron:
// - catch_up_limit = 10: runs all 3 missed executions
// - catch_up_limit = 1: runs only the most recent

This prevents thundering herd after extended outages. Large catch-up limits could overload your system if many crons missed many runs.

Timezone Handling

Schedules respect IANA timezones including DST transitions:

#[forge::cron("0 9 * * *", timezone = "America/New_York")]

At 9:00 AM on DST transition days:

  • Spring forward: 9:00 AM still triggers (2:00 AM skipped, not 9:00 AM)
  • Fall back: triggers once at the first 9:00 AM occurrence

The scheduler converts between UTC (storage) and local time (matching) for each run.

Testing

Use TestCronContext to test cron functions in isolation:

#[cfg(test)]
mod tests {
use super::*;
use forge::testing::*;
use chrono::Duration;

#[tokio::test]
async fn test_cron_executes() {
let ctx = TestCronContext::builder("daily_report").build();

daily_report(&ctx).await.unwrap();

assert!(!ctx.log.entries().is_empty());
}

#[tokio::test]
async fn test_catch_up_run() {
let ctx = TestCronContext::builder("weekly_cleanup")
.as_catch_up()
.build();

assert!(ctx.is_catch_up);
}

#[tokio::test]
async fn test_late_detection() {
let scheduled = Utc::now() - Duration::minutes(5);
let ctx = TestCronContext::builder("sync_task")
.scheduled_at(scheduled)
.build();

assert!(ctx.is_late());
assert!(ctx.delay() >= Duration::minutes(4));
}

#[tokio::test]
async fn test_with_timezone() {
let ctx = TestCronContext::builder("morning_summary")
.with_timezone("Europe/London")
.build();

assert_eq!(ctx.timezone, "Europe/London");
}

#[tokio::test]
async fn test_with_mocked_http() {
let ctx = TestCronContext::builder("poll_api")
.mock_http_json("http://api.example.com/*", json!({
"status": "ok",
"data": []
}))
.build();

poll_api(&ctx).await.unwrap();

ctx.http().assert_called("http://api.example.com/*");
}

#[tokio::test]
async fn test_logging() {
let ctx = TestCronContext::builder("my_cron").build();

ctx.log.info("Started");
ctx.log.warn("Warning");
ctx.log.error("Failed");

let entries = ctx.log.entries();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].level, "info");
assert_eq!(entries[1].level, "warn");
assert_eq!(entries[2].level, "error");
}
}

Test context builder methods:

  • .scheduled_at(datetime) - Set scheduled time
  • .executed_at(datetime) - Set execution time
  • .with_timezone("tz") - Set timezone
  • .as_catch_up() - Mark as catch-up run
  • .with_run_id(uuid) - Set specific run ID
  • .as_user(uuid) - Set authenticated user
  • .with_role("admin") - Add role
  • .with_pool(pool) - Use real database
  • .mock_http(pattern, handler) - Mock HTTP responses
  • .mock_http_json(pattern, value) - Mock JSON response
  • .with_env(key, value) - Mock environment variable