# Scheduled Tasks

Run functions on a cron schedule with timezone support, catch-up runs, and exactly-once execution across your cluster.

## The Code
```rust
use forge::prelude::*;

#[forge::cron("0 9 * * *", timezone = "America/New_York")]
pub async fn daily_report(ctx: &CronContext) -> Result<()> {
    ctx.log.info("Generating report", json!({"run_id": ctx.run_id.to_string()}));

    let stats = sqlx::query_as::<_, DailyStats>(
        "SELECT count(*) AS orders, sum(total) AS revenue FROM orders WHERE created_at::date = current_date"
    )
    .fetch_one(ctx.db())
    .await?;

    ctx.http()
        .post("https://slack.com/api/chat.postMessage")
        .bearer_auth(ctx.env_require("SLACK_TOKEN")?)
        .json(&json!({
            "channel": "#daily-metrics",
            "text": format!("Orders: {}, Revenue: ${:.2}", stats.orders, stats.revenue)
        }))
        .send()
        .await?;

    Ok(())
}
```
## What Happens

The scheduler runs every second, checking for due cron executions. When your cron's scheduled time arrives:

- One node claims the run via `INSERT ... ON CONFLICT DO NOTHING` - the database's unique constraint ensures only one node wins
- The cron executes with full context: database, HTTP, logging
- Status updates to `completed` or `failed`

No distributed locks. No consensus protocols. PostgreSQL handles coordination.
## Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| (schedule) | cron expression | required | Standard cron (5-part: m h d M w) or extended (6-part with seconds) |
| `timezone` | string | `"UTC"` | IANA timezone (e.g., `"America/New_York"`, `"Europe/London"`) |
| `timeout` | duration | `"1h"` | Maximum execution time (`"30s"`, `"5m"`, `"1h"`) |
| `catch_up` | flag | `false` | Run missed executions after downtime |
| `catch_up_limit` | `u32` | `10` | Maximum catch-up runs per tick |
## Cron Expression Format

Standard 5-part expressions are normalized to 6-part by prepending 0 for seconds:

```rust
"0 9 * * *"     // Daily at 9:00 AM    -> "0 0 9 * * *"
"*/5 * * * *"   // Every 5 minutes     -> "0 */5 * * * *"
"0 0 1 * *"     // Monthly on the 1st  -> "0 0 0 1 * *"
"0 12 * * 1-5"  // Weekdays at noon    -> "0 0 12 * * 1-5"
```

Extended 6-part expressions include seconds:

```rust
"30 * * * * *"  // Every minute at :30 seconds
"0 */5 * * * *" // Every 5 minutes at :00 seconds
```
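The normalization rule is simple enough to sketch. This standalone function is an illustration of the prepend-zero behavior described above, not Forge's actual implementation:

```rust
// Normalize a 5-field cron expression to the 6-field form by prepending
// "0" for the seconds field; 6-field expressions pass through unchanged.
fn normalize_cron(expr: &str) -> String {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    match fields.len() {
        5 => format!("0 {}", fields.join(" ")), // add seconds field
        _ => fields.join(" "),                  // assume already extended
    }
}

fn main() {
    assert_eq!(normalize_cron("0 9 * * *"), "0 0 9 * * *");
    assert_eq!(normalize_cron("*/5 * * * *"), "0 */5 * * * *");
    assert_eq!(normalize_cron("30 * * * * *"), "30 * * * * *");
    println!("ok");
}
```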
## Patterns

### Every Minute

```rust
#[forge::cron("* * * * *")]
pub async fn heartbeat(ctx: &CronContext) -> Result<()> {
    sqlx::query("UPDATE system_status SET last_heartbeat = NOW()")
        .execute(ctx.db())
        .await?;
    Ok(())
}
```
### Daily at a Specific Time in a Timezone

```rust
#[forge::cron("0 9 * * *", timezone = "Europe/London")]
pub async fn morning_summary(ctx: &CronContext) -> Result<()> {
    // Runs at 9:00 AM London time, adjusting for DST
    ctx.log.info("Morning summary", json!({"tz": ctx.timezone}));
    Ok(())
}
```
### Weekly with Catch-Up

Run missed executions after server downtime. If your server was down for 3 hours and missed 3 hourly runs, they execute on startup.

```rust
#[forge::cron("0 0 * * 0", catch_up, catch_up_limit = 4)]
pub async fn weekly_cleanup(ctx: &CronContext) -> Result<()> {
    if ctx.is_catch_up {
        ctx.log.info("Catch-up run", json!({"scheduled": ctx.scheduled_time.to_rfc3339()}));
    }
    sqlx::query("DELETE FROM sessions WHERE expires_at < NOW() - interval '7 days'")
        .execute(ctx.db())
        .await?;
    Ok(())
}
```
### Detect Late Execution

Check whether execution started more than 1 minute after the scheduled time:

```rust
#[forge::cron("*/15 * * * *", timeout = "5m")]
pub async fn sync_external_data(ctx: &CronContext) -> Result<()> {
    if ctx.is_late() {
        ctx.log.warn("Running late", json!({
            "delay_seconds": ctx.delay().num_seconds(),
            "scheduled": ctx.scheduled_time.to_rfc3339()
        }));
    }
    // Continue with sync...
    Ok(())
}
```
### Long-Running with Extended Timeout

```rust
#[forge::cron("0 2 * * *", timeout = "2h")]
pub async fn nightly_data_export(ctx: &CronContext) -> Result<()> {
    ctx.log.info("Starting export", json!({}));

    let rows = sqlx::query_as::<_, ExportRow>(
        "SELECT * FROM analytics WHERE exported = false LIMIT 100000"
    )
    .fetch_all(ctx.db())
    .await?;

    for chunk in rows.chunks(1000) {
        upload_to_s3(ctx, chunk).await?;
    }

    ctx.log.info("Export complete", json!({"rows": rows.len()}));
    Ok(())
}
```
### Call External APIs

```rust
#[forge::cron("* * * * *", timezone = "UTC")]
pub async fn poll_iss_location(ctx: &CronContext) -> Result<()> {
    let response = ctx.http()
        .get("http://api.open-notify.org/iss-now.json")
        .send()
        .await
        .map_err(|e| ForgeError::Internal(format!("HTTP failed: {}", e)))?;

    if !response.status().is_success() {
        ctx.log.error("API error", json!({"status": response.status().as_u16()}));
        return Err(ForgeError::Internal("API request failed".into()));
    }

    let data: IssResponse = response.json().await?;

    sqlx::query("INSERT INTO iss_locations (latitude, longitude) VALUES ($1, $2)")
        .bind(data.position.latitude)
        .bind(data.position.longitude)
        .execute(ctx.db())
        .await?;
    Ok(())
}
```
## Context Fields

| Field | Type | Description |
|---|---|---|
| `run_id` | `Uuid` | Unique identifier for this cron run |
| `cron_name` | `String` | Function name (e.g., `"daily_report"`) |
| `scheduled_time` | `DateTime<Utc>` | When the cron was supposed to run |
| `execution_time` | `DateTime<Utc>` | When execution actually started |
| `timezone` | `String` | Configured timezone |
| `is_catch_up` | `bool` | `true` if this is a catch-up run |
| `auth` | `AuthContext` | Authentication context (unauthenticated by default) |
| `log` | `CronLog` | Structured logger |
## Context Methods

| Method | Return Type | Description |
|---|---|---|
| `ctx.db()` | `&PgPool` | Database connection pool |
| `ctx.http()` | `&reqwest::Client` | HTTP client |
| `ctx.delay()` | `Duration` | Time between scheduled and actual execution |
| `ctx.is_late()` | `bool` | `true` if the delay exceeds 1 minute |
| `ctx.env(key)` | `Option<String>` | Environment variable |
| `ctx.env_require(key)` | `Result<String>` | Required environment variable |
| `ctx.with_auth(auth)` | `Self` | Set authentication context |
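The relationship between `scheduled_time`, `execution_time`, `delay()`, and `is_late()` can be shown with a dependency-free sketch. Unix seconds stand in for `DateTime<Utc>` here, and the `Run` struct and method names are illustrative, not Forge's types; only the 1-minute threshold comes from the table above:

```rust
// Illustrative stand-in for the cron context's timing fields.
struct Run {
    scheduled_time: i64, // when the run was supposed to start (Unix seconds)
    execution_time: i64, // when it actually started
}

impl Run {
    // delay() is simply actual start minus scheduled start.
    fn delay_seconds(&self) -> i64 {
        self.execution_time - self.scheduled_time
    }

    // is_late() flags a delay of more than 1 minute.
    fn is_late(&self) -> bool {
        self.delay_seconds() > 60
    }
}

fn main() {
    let on_time = Run { scheduled_time: 1_700_000_000, execution_time: 1_700_000_005 };
    let late = Run { scheduled_time: 1_700_000_000, execution_time: 1_700_000_300 };
    assert!(!on_time.is_late());
    assert!(late.is_late());
    assert_eq!(late.delay_seconds(), 300);
    println!("ok");
}
```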
## Logging

The `ctx.log` field provides structured logging:

```rust
ctx.log.info("Message", json!({"key": "value"}));
ctx.log.warn("Warning", json!({"count": 42}));
ctx.log.error("Failed", json!({"error": e.to_string()}));
ctx.log.debug("Debug info", json!({}));
```

Each log entry includes the cron name for correlation.
## Under the Hood

### Exactly-Once via Unique Constraint

The `forge_cron_runs` table enforces one execution per scheduled time:

```sql
CREATE TABLE forge_cron_runs (
    id UUID PRIMARY KEY,
    cron_name TEXT NOT NULL,
    scheduled_time TIMESTAMPTZ NOT NULL,
    status TEXT NOT NULL,
    node_id UUID,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    error TEXT,
    UNIQUE (cron_name, scheduled_time)
);
```
When the scheduler ticks:

```sql
INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
VALUES ($1, $2, $3, 'running', $4, NOW())
ON CONFLICT (cron_name, scheduled_time) DO NOTHING
```

Multiple nodes can attempt the same insert. PostgreSQL's unique constraint ensures exactly one succeeds. The node that wins (`rows_affected > 0`) executes the cron.
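The winner-takes-all behavior can be mimicked in-process. In this analogy (not the actual sqlx code), a `HashSet` guarded by a `Mutex` plays the role of the `UNIQUE (cron_name, scheduled_time)` constraint: insertion fails on a duplicate key just as the conflicting `INSERT` affects zero rows, so exactly one of eight concurrent "nodes" claims the run:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

// insert() returns false for a duplicate key, mirroring
// ON CONFLICT DO NOTHING reporting zero affected rows.
fn try_claim(runs: &Mutex<HashSet<(String, i64)>>, cron: &str, scheduled: i64) -> bool {
    runs.lock().unwrap().insert((cron.to_string(), scheduled))
}

fn main() {
    let runs = Arc::new(Mutex::new(HashSet::new()));

    // Eight "nodes" race to claim the same (cron_name, scheduled_time).
    let nodes: Vec<_> = (0..8)
        .map(|_| {
            let runs = Arc::clone(&runs);
            thread::spawn(move || try_claim(&runs, "daily_report", 1_700_000_000))
        })
        .collect();

    let winners = nodes
        .into_iter()
        .map(|n| n.join().unwrap())
        .filter(|&won| won)
        .count();
    assert_eq!(winners, 1); // exactly one node executes the run
    println!("winners: {}", winners);
}
```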
### Leader-Only Scheduling

Only cluster leaders run the scheduler tick. Follower nodes skip cron processing entirely. Leadership is determined via PostgreSQL advisory locks (see Multiple Nodes).

This prevents duplicate scheduling attempts across the cluster.
### Catch-Up with Limits

When `catch_up` is enabled, the scheduler checks for missed runs:

- Find the last completed run for this cron
- Calculate all scheduled times between then and now
- Limit to `catch_up_limit` (default: 10)
- Claim and execute each missed run

```rust
// After a 3-hour downtime with an hourly cron:
// - catch_up_limit = 10: runs all 3 missed executions
// - catch_up_limit = 1:  runs only the most recent
```

This prevents a thundering herd after extended outages. Large catch-up limits could overload your system if many crons missed many runs.
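For a fixed-interval schedule the missed-run calculation reduces to a few lines. This sketch of the steps above uses Unix-second timestamps and a constant interval for simplicity; the real scheduler walks the cron expression instead, and keeping the most recent runs under the limit is an assumption based on the `catch_up_limit = 1` note:

```rust
// Enumerate scheduled times after the last completed run, up to now,
// then keep at most `limit` of them (the most recent ones).
fn missed_runs(last_completed: i64, now: i64, interval: i64, limit: usize) -> Vec<i64> {
    let all: Vec<i64> = (1i64..)
        .map(|i| last_completed + i * interval)
        .take_while(|&t| t <= now)
        .collect();
    let skip = all.len().saturating_sub(limit);
    all[skip..].to_vec()
}

fn main() {
    // 3-hour outage, hourly cron, default limit of 10: all 3 missed runs execute.
    assert_eq!(missed_runs(0, 3 * 3600, 3600, 10), vec![3600, 7200, 10800]);
    // catch_up_limit = 1: only the most recent missed run executes.
    assert_eq!(missed_runs(0, 3 * 3600, 3600, 1), vec![10800]);
    println!("ok");
}
```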
### Timezone Handling

Schedules respect IANA timezones, including DST transitions:

```rust
#[forge::cron("0 9 * * *", timezone = "America/New_York")]
```

At 9:00 AM on DST transition days:

- Spring forward: 9:00 AM still triggers (2:00 AM is the skipped hour, not 9:00 AM)
- Fall back: triggers once, at the first 9:00 AM occurrence

The scheduler converts between UTC (storage) and local time (matching) for each run.
## Testing

Use `TestCronContext` to test cron functions in isolation:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use forge::testing::*;
    use chrono::{Duration, Utc};

    #[tokio::test]
    async fn test_cron_executes() {
        let ctx = TestCronContext::builder("daily_report").build();
        daily_report(&ctx).await.unwrap();
        assert!(!ctx.log.entries().is_empty());
    }

    #[tokio::test]
    async fn test_catch_up_run() {
        let ctx = TestCronContext::builder("weekly_cleanup")
            .as_catch_up()
            .build();
        assert!(ctx.is_catch_up);
    }

    #[tokio::test]
    async fn test_late_detection() {
        let scheduled = Utc::now() - Duration::minutes(5);
        let ctx = TestCronContext::builder("sync_task")
            .scheduled_at(scheduled)
            .build();
        assert!(ctx.is_late());
        assert!(ctx.delay() >= Duration::minutes(4));
    }

    #[tokio::test]
    async fn test_with_timezone() {
        let ctx = TestCronContext::builder("morning_summary")
            .with_timezone("Europe/London")
            .build();
        assert_eq!(ctx.timezone, "Europe/London");
    }

    #[tokio::test]
    async fn test_with_mocked_http() {
        let ctx = TestCronContext::builder("poll_api")
            .mock_http_json("http://api.example.com/*", json!({
                "status": "ok",
                "data": []
            }))
            .build();
        poll_api(&ctx).await.unwrap();
        ctx.http().assert_called("http://api.example.com/*");
    }

    #[tokio::test]
    async fn test_logging() {
        let ctx = TestCronContext::builder("my_cron").build();
        ctx.log.info("Started");
        ctx.log.warn("Warning");
        ctx.log.error("Failed");

        let entries = ctx.log.entries();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].level, "info");
        assert_eq!(entries[1].level, "warn");
        assert_eq!(entries[2].level, "error");
    }
}
```
Test context builder methods:

- `.scheduled_at(datetime)` - Set scheduled time
- `.executed_at(datetime)` - Set execution time
- `.with_timezone("tz")` - Set timezone
- `.as_catch_up()` - Mark as catch-up run
- `.with_run_id(uuid)` - Set specific run ID
- `.as_user(uuid)` - Set authenticated user
- `.with_role("admin")` - Add role
- `.with_pool(pool)` - Use real database
- `.mock_http(pattern, handler)` - Mock HTTP responses
- `.mock_http_json(pattern, value)` - Mock JSON response
- `.with_env(key, value)` - Mock environment variable