Subscribe to Changes
Turn any query into a real-time subscription. No WebSocket configuration.
Prerequisites
Enable reactivity on tables you want to subscribe to:
-- In your migration
SELECT forge_enable_reactivity('todos');
This creates PostgreSQL triggers that emit NOTIFY on INSERT/UPDATE/DELETE.
The Code
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
That's it. Forge parses the SQL at compile time and extracts table dependencies automatically. JOINs, subqueries, and CTEs are all tracked.
Frontend:
// Fetch once
const todos = await listTodos();
// Subscribe to changes (add Store$ suffix)
const todos = listTodosStore$();
What Happens
- Database triggers emit NOTIFY on INSERT/UPDATE/DELETE
- A dedicated listener connection receives changes via PostgreSQL LISTEN
- Forge re-executes affected queries and compares result hashes
- Only when data differs does it push updates over Server-Sent Events
Debouncing prevents notification storms: 50ms after the last change, or 200ms max. Rapid mutations coalesce into a single update.
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| tables | ["t1", "t2"] | auto | Override table dependencies (for dynamic SQL) |
| public | flag | false | Bypass authentication |
| cache | "duration" | none | Cache TTL (e.g., "30s") |
| timeout | u64 | none | Statement timeout in seconds |
When to use tables
For static SQL strings, Forge extracts dependencies automatically:
// Tables auto-extracted: ["todos"]
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
sqlx::query_as("SELECT * FROM todos")
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
// Tables auto-extracted: ["orders", "users"]
#[forge::query]
pub async fn get_order_with_user(ctx: &QueryContext, id: Uuid) -> Result<OrderWithUser> {
sqlx::query_as("SELECT o.*, u.name FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = $1")
.bind(id)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Use tables only when SQL is constructed dynamically:
// Dynamic SQL - must specify tables manually
#[forge::query(tables = ["users", "orders"])]
pub async fn search(ctx: &QueryContext, table: String, term: String) -> Result<Vec<Row>> {
let sql = format!("SELECT * FROM {} WHERE name ILIKE $1", table);
sqlx::query_as(&sql)
.bind(format!("%{}%", term))
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
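Because the table name above is interpolated into the SQL string rather than bound as a parameter, it is worth validating it before calling format!. The following is a minimal sketch of an allowlist check, not part of Forge itself; the names SEARCHABLE_TABLES and checked_table are hypothetical, and the list is assumed to mirror the tables attribute on the query.

```rust
/// Tables the dynamic `search` query may read.
/// Assumption: kept in sync by hand with `tables = ["users", "orders"]`.
const SEARCHABLE_TABLES: [&str; 2] = ["users", "orders"];

/// Returns the allowlisted table name, or `None` if the request names
/// anything else (including attempts to smuggle in extra SQL).
pub fn checked_table(requested: &str) -> Option<&'static str> {
    SEARCHABLE_TABLES
        .iter()
        .copied()
        .find(|allowed| *allowed == requested)
}
```

Inside search, the caller would resolve the table with checked_table(&table) and return an error on None, so only the static names from the allowlist ever reach the SQL string.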
Patterns
Single Table Subscription
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
const todos = listTodosStore$();
{#if $todos.loading}
<p>Loading...</p>
{:else if $todos.error}
<p>Error: {$todos.error.message}</p>
{:else}
{#each $todos.data ?? [] as todo}
<TodoItem {todo} />
{/each}
{/if}
Multi-Table Subscription
JOINs and subqueries are tracked automatically.
#[forge::query]
pub async fn get_order_details(ctx: &QueryContext, order_id: Uuid) -> Result<OrderDetails> {
sqlx::query_as(
"SELECT o.*,
json_agg(json_build_object('name', p.name, 'qty', oi.quantity)) as items
FROM orders o
JOIN order_items oi ON oi.order_id = o.id
JOIN products p ON p.id = oi.product_id
WHERE o.id = $1
GROUP BY o.id"
)
.bind(order_id)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Changes to orders, order_items, or products trigger re-execution.
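One way to picture the lookup from a changed table to the queries that depend on it is a reverse index. This is an illustrative sketch only; DependencyIndex, register, and affected_by are hypothetical names, and Forge's internal data structure may differ.

```rust
use std::collections::{BTreeSet, HashMap};

/// Reverse index: table name -> names of queries that read from it.
#[derive(Debug, Default)]
pub struct DependencyIndex {
    by_table: HashMap<String, BTreeSet<String>>,
}

impl DependencyIndex {
    /// Records that `query` depends on every table in `tables`.
    pub fn register(&mut self, query: &str, tables: &[&str]) {
        for table in tables {
            self.by_table
                .entry((*table).to_string())
                .or_default()
                .insert(query.to_string());
        }
    }

    /// Queries to re-execute when `table` changes, in sorted order.
    pub fn affected_by(&self, table: &str) -> Vec<&str> {
        self.by_table
            .get(table)
            .map(|queries| queries.iter().map(String::as_str).collect())
            .unwrap_or_default()
    }
}
```

With get_order_details registered against orders, order_items, and products, a change to any of the three tables resolves to the same query.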
User-Scoped Subscription
Subscriptions respect authentication. Each user receives their own data.
#[forge::query]
pub async fn get_my_notifications(ctx: &QueryContext) -> Result<Vec<Notification>> {
let user_id = ctx.require_user_id()?;
sqlx::query_as("SELECT * FROM notifications WHERE user_id = $1 AND read = false")
.bind(user_id)
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
When user A creates a notification for user B, only user B's subscription re-executes and receives the update.
Public Real-Time Feed
Public subscriptions work without authentication.
#[forge::query(public)]
pub async fn get_live_events(ctx: &QueryContext) -> Result<Vec<Event>> {
sqlx::query_as(
"SELECT * FROM events
WHERE starts_at > now()
ORDER BY starts_at ASC
LIMIT 10"
)
.fetch_all(ctx.db())
.await
.map_err(Into::into)
}
Cached + Reactive
Cache the initial load. Push updates when data changes.
#[forge::query(cache = "30s")]
pub async fn get_dashboard_stats(ctx: &QueryContext) -> Result<DashboardStats> {
sqlx::query_as(
"SELECT count(*) as total, sum(amount) as revenue
FROM orders WHERE created_at > now() - interval '24 hours'"
)
.fetch_one(ctx.db())
.await
.map_err(Into::into)
}
Repeat requests within 30s of the initial load are served from cache. Mutations to orders still push updates to active subscriptions.
Frontend Store
The generated store follows Svelte's store contract with additional state fields.
interface SubscriptionResult<T> {
loading: boolean; // true until first data arrives
data: T | null; // the result
error: Error | null;
stale: boolean; // true during reconnection
}
Store Methods
const store = listTodosStore$();
// Access current value
$store.loading
$store.data
$store.error
$store.stale
// Manual actions
store.refetch(); // Force re-fetch
store.unsubscribe(); // Stop receiving updates
store.reset(); // Clear data and set loading
Auto-Cleanup
Subscriptions automatically unsubscribe when all Svelte subscribers detach. No manual cleanup required in most cases.
// Subscription starts when component mounts
const todos = listTodosStore$();
// Subscription ends when component unmounts
// (when last subscriber unsubscribes)
Under the Hood
Migration Setup
Enable reactivity per table in your migrations:
-- @up
CREATE TABLE todos (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title TEXT NOT NULL,
completed BOOLEAN NOT NULL DEFAULT false
);
SELECT forge_enable_reactivity('todos');
-- @down
SELECT forge_disable_reactivity('todos');
DROP TABLE IF EXISTS todos;
This creates triggers that emit changes to the forge_changes channel:
-- Payload format: table:operation:row_id
-- Example: todos:INSERT:550e8400-e29b-41d4-a716-446655440000
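To make the payload format concrete, here is a small parsing sketch for the table:operation:row_id string. The Change and Operation types and the parse_change function are hypothetical names chosen for illustration; they are not Forge's API.

```rust
/// The three operations the triggers report.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
}

/// One change notification, parsed from `table:operation:row_id`.
#[derive(Debug, PartialEq, Eq)]
pub struct Change {
    pub table: String,
    pub operation: Operation,
    pub row_id: String,
}

/// Splits a payload into its three fields; returns `None` if it is malformed.
pub fn parse_change(payload: &str) -> Option<Change> {
    // splitn(3, ..) leaves any further ':' characters inside the row ID.
    let mut parts = payload.splitn(3, ':');
    let table = parts.next()?;
    let operation = match parts.next()? {
        "INSERT" => Operation::Insert,
        "UPDATE" => Operation::Update,
        "DELETE" => Operation::Delete,
        _ => return None,
    };
    let row_id = parts.next()?;
    if table.is_empty() || row_id.is_empty() {
        return None;
    }
    Some(Change {
        table: table.to_string(),
        operation,
        row_id: row_id.to_string(),
    })
}
```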
LISTEN/NOTIFY
A dedicated PostgreSQL connection stays open with LISTEN forge_changes. No polling. Sub-second latency.
Debouncing
High-frequency writes could spam connected clients. Forge debounces:
- 50ms quiet period: Wait 50ms after the last change before re-executing
- 200ms max window: Never wait longer than 200ms from the first change
Rapid mutations (bulk inserts, batch updates) coalesce into a single re-execution and push.
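The two debounce rules can be sketched as a single deadline: whichever comes first of "50ms after the last change" and "200ms after the first change". The Debouncer type below is a hypothetical illustration using only the standard library, with timestamps passed in by the caller; it is not Forge's implementation.

```rust
use std::time::{Duration, Instant};

const QUIET: Duration = Duration::from_millis(50); // quiet period after the last change
const MAX_WAIT: Duration = Duration::from_millis(200); // cap measured from the first change

/// Tracks the first and last change of the current batch.
#[derive(Debug, Default)]
pub struct Debouncer {
    first: Option<Instant>,
    last: Option<Instant>,
}

impl Debouncer {
    /// Records a change notification observed at `now`.
    pub fn record(&mut self, now: Instant) {
        if self.first.is_none() {
            self.first = Some(now);
        }
        self.last = Some(now);
    }

    /// When the pending batch should be flushed, if there is one.
    pub fn deadline(&self) -> Option<Instant> {
        match (self.first, self.last) {
            (Some(first), Some(last)) => Some((last + QUIET).min(first + MAX_WAIT)),
            _ => None,
        }
    }

    /// Returns true and starts a new batch if the deadline has passed.
    pub fn flush_if_due(&mut self, now: Instant) -> bool {
        match self.deadline() {
            Some(deadline) if now >= deadline => {
                *self = Debouncer::default();
                true
            }
            _ => false,
        }
    }
}
```

A single change flushes 50ms later. A steady stream of changes arriving every 40ms never reaches the quiet period, so the 200ms cap is what triggers the flush.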
Hash Comparison
Re-executing a query does not guarantee data changed. A user might INSERT then DELETE the same row within the debounce window.
Forge hashes query results before pushing:
hash = hash(function_name + serialize(result))
Updates only reach the client when the hash differs from the last push. No unnecessary re-renders.
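The hash check described above can be sketched as a small gate that remembers the last pushed hash. This is an illustration under assumptions: result_hash and PushGate are hypothetical names, the result is assumed to be already serialized to a string, and the standard library's DefaultHasher stands in for whatever hash function Forge actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes the function name together with the serialized result.
/// DefaultHasher is a stand-in; its output is not stable across Rust versions.
pub fn result_hash(function_name: &str, serialized_result: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    function_name.hash(&mut hasher);
    serialized_result.hash(&mut hasher);
    hasher.finish()
}

/// Remembers the hash of the last result pushed to one subscription.
#[derive(Debug, Default)]
pub struct PushGate {
    last_pushed: Option<u64>,
}

impl PushGate {
    /// Returns true only when the result differs from the last pushed one.
    pub fn should_push(&mut self, function_name: &str, serialized_result: &str) -> bool {
        let hash = result_hash(function_name, serialized_result);
        if self.last_pushed == Some(hash) {
            return false;
        }
        self.last_pushed = Some(hash);
        true
    }
}
```

In the INSERT-then-DELETE case, the re-executed query serializes to the same string as before, so should_push returns false and nothing is sent.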
Adaptive Tracking
Forge tracks changes at different granularities based on subscription count:
| Subscriptions | Tracking Mode | Behavior |
|---|---|---|
| 0 | None | No tracking |
| 1-50 | Row-level | Only tracked row IDs trigger re-execution |
| 50+ | Table-level | Any table change triggers re-execution |
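Hysteresis prevents oscillation: the switch-up threshold (100 rows) is higher than the switch-down threshold (50 rows), so the tracking mode does not flip back and forth around a single boundary.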
Row-level tracking is more efficient for few subscriptions. Table-level avoids memory overhead when many clients subscribe. Forge re-evaluates every 60 seconds with a maximum of 10,000 tracked rows per table.
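The hysteresis rule can be illustrated with a small state function. This sketch assumes both thresholds apply to one counter (called count here) and that crossing 100 moves to table-level while dropping to 50 or below moves back to row-level; TrackingMode and next_mode are hypothetical names, and the exact quantity Forge measures may differ.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrackingMode {
    Off,
    RowLevel,
    TableLevel,
}

const THRESHOLD_UP: usize = 100; // switch to table-level at or above this
const THRESHOLD_DOWN: usize = 50; // switch back to row-level at or below this

/// Picks the next mode from the current mode and the current count.
/// Between the two thresholds the current mode is kept, which is the hysteresis.
pub fn next_mode(current: TrackingMode, count: usize) -> TrackingMode {
    if count == 0 {
        return TrackingMode::Off;
    }
    match current {
        TrackingMode::TableLevel if count > THRESHOLD_DOWN => TrackingMode::TableLevel,
        TrackingMode::TableLevel => TrackingMode::RowLevel,
        _ if count >= THRESHOLD_UP => TrackingMode::TableLevel,
        _ => TrackingMode::RowLevel,
    }
}
```

A count of 75 therefore yields row-level when coming from row-level and table-level when coming from table-level, so small fluctuations around either threshold do not cause repeated switching.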
Auto-Reconnect
SSE connections may drop due to network issues or deploys. The client handles reconnection:
- Connection drops
- Store sets stale: true (data may be outdated)
- Exponential backoff: 1s, 2s, 4s... up to 30s
- Reconnect with jitter to prevent thundering herd
- Re-register all active subscriptions
- Receive fresh data, clear stale flag
Maximum 10 reconnection attempts before giving up.
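The backoff schedule above can be sketched as a pure function of the attempt number. The actual client is frontend code; this Rust version is only meant to show the arithmetic. The function name reconnect_delay is hypothetical, and the jitter shape (shortening the delay by up to 25%) is an assumption, since the text does not specify how jitter is applied. The caller supplies the random value so the function stays deterministic.

```rust
use std::time::Duration;

const BASE: Duration = Duration::from_secs(1);
const CAP: Duration = Duration::from_secs(30);
const MAX_ATTEMPTS: u32 = 10;

/// Delay before reconnection attempt `attempt` (0-based), or `None` once
/// all attempts are used up. `jitter` is a caller-supplied random value
/// in [0.0, 1.0].
pub fn reconnect_delay(attempt: u32, jitter: f64) -> Option<Duration> {
    if attempt >= MAX_ATTEMPTS {
        return None;
    }
    // 1s, 2s, 4s, 8s, 16s, then capped at 30s.
    let exponential = BASE.saturating_mul(1u32 << attempt);
    let capped = exponential.min(CAP);
    // Assumed jitter: scale by a factor in [0.75, 1.0] so the cap is never exceeded.
    let factor = 1.0 - 0.25 * jitter.clamp(0.0, 1.0);
    Some(capped.mul_f64(factor))
}
```

Spreading the delays this way means clients that dropped at the same moment (for example during a deploy) do not all reconnect in the same instant.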
Token Change Detection
When the auth token changes (login, logout, refresh), the client detects the change via hash comparison and reconnects to ensure subscriptions use the new identity.
Testing
Test subscriptions using TestQueryContext with a real database.
#[tokio::test]
async fn test_subscription_updates() {
let pool = setup_test_database().await;
let user_id = Uuid::new_v4(); // arbitrary test user (assumes user IDs are Uuid)
let ctx = TestQueryContext::with_pool(pool.clone(), Some(user_id));
// Initial query
let todos = list_todos(&ctx).await.unwrap();
assert_eq!(todos.len(), 0);
// Insert data
sqlx::query("INSERT INTO todos (title) VALUES ('Test')")
.execute(&pool)
.await
.unwrap();
// Query again (simulating re-execution on change)
let todos = list_todos(&ctx).await.unwrap();
assert_eq!(todos.len(), 1);
}
For end-to-end subscription testing, use the full Forge runtime in your test suite. The client stores and SSE connection can be tested via browser automation or direct HTTP/SSE calls.