# Subscribe to Changes
Create real-time subscriptions for queries; the generated client manages the transport.
## Prerequisites
Enable reactivity on tables you want to subscribe to:
```sql
-- In your migration
SELECT forge_enable_reactivity('todos');
```
This creates PostgreSQL triggers that emit `NOTIFY` on INSERT/UPDATE/DELETE.
## The Code
```rust
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```
Forge parses the SQL at compile time and extracts table dependencies automatically. JOINs, subqueries, and CTEs are all tracked.
Frontend:
```typescript
// Fetch once
const todos = await listTodos();

// Subscribe to changes (add Store$ suffix)
const todos = listTodosStore$();
```
## What Happens
- Database triggers emit `NOTIFY` on INSERT/UPDATE/DELETE
- A dedicated listener connection receives changes via PostgreSQL `LISTEN`
- Forge re-executes affected queries and compares result hashes
- Only when data differs does it push updates over Server-Sent Events
Debouncing prevents notification storms: 50ms after the last change, or 200ms max. Rapid mutations coalesce into a single update.
## Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| `tables` | `["t1", "t2"]` | auto | Override table dependencies (for dynamic SQL) |
| `public` | flag | `false` | Bypass authentication |
| `cache` | `"duration"` | none | Cache TTL (e.g., `"30s"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
### When to use `tables`
For static SQL strings, Forge extracts dependencies automatically:
```rust
// Tables auto-extracted: ["todos"]
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

// Tables auto-extracted: ["orders", "users"]
#[forge::query]
pub async fn get_order_with_user(ctx: &QueryContext, id: Uuid) -> Result<OrderWithUser> {
    sqlx::query_as("SELECT o.*, u.name FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = $1")
        .bind(id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
```
Use `tables` only when SQL is constructed dynamically:
```rust
// Dynamic SQL - must specify tables manually
#[forge::query(tables = ["users", "orders"])]
pub async fn search(ctx: &QueryContext, table: String, term: String) -> Result<Vec<Row>> {
    let sql = format!("SELECT * FROM {} WHERE name ILIKE $1", table);
    sqlx::query_as(&sql)
        .bind(format!("%{}%", term))
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```
## Patterns
### Single Table Subscription
```rust
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```
```svelte
<script>
  const todos = listTodosStore$();
</script>

{#if $todos.loading}
  <p>Loading...</p>
{:else if $todos.error}
  <p>Error: {$todos.error.message}</p>
{:else}
  {#each $todos.data ?? [] as todo}
    <TodoItem {todo} />
  {/each}
{/if}
```
### Multi-Table Subscription
JOINs and subqueries are tracked automatically.
```rust
#[forge::query]
pub async fn get_order_details(ctx: &QueryContext, order_id: Uuid) -> Result<OrderDetails> {
    sqlx::query_as(
        "SELECT o.*,
                json_agg(json_build_object('name', p.name, 'qty', oi.quantity)) as items
         FROM orders o
         JOIN order_items oi ON oi.order_id = o.id
         JOIN products p ON p.id = oi.product_id
         WHERE o.id = $1
         GROUP BY o.id",
    )
    .bind(order_id)
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}
```
Changes to `orders`, `order_items`, or `products` trigger re-execution.
### User-Scoped Subscription
Subscriptions respect authentication. Each user receives their own data.
```rust
#[forge::query]
pub async fn get_my_notifications(ctx: &QueryContext) -> Result<Vec<Notification>> {
    let user_id = ctx.require_user_id()?;
    sqlx::query_as("SELECT * FROM notifications WHERE user_id = $1 AND read = false")
        .bind(user_id)
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```
When user A creates a notification for user B, only user B's subscription re-executes and receives the update.
### Public Real-Time Feed
Public subscriptions bypass authentication checks.
```rust
#[forge::query(public)]
pub async fn get_live_events(ctx: &QueryContext) -> Result<Vec<Event>> {
    sqlx::query_as(
        "SELECT * FROM events
         WHERE starts_at > now()
         ORDER BY starts_at ASC
         LIMIT 10",
    )
    .fetch_all(ctx.db())
    .await
    .map_err(Into::into)
}
```
### Cached + Reactive
Cache the initial load. Push updates when data changes.
```rust
#[forge::query(cache = "30s")]
pub async fn get_order_stats(ctx: &QueryContext) -> Result<OrderStats> {
    sqlx::query_as(
        "SELECT count(*) as total, sum(amount) as revenue
         FROM orders WHERE created_at > now() - interval '24 hours'",
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}
```
Repeat requests within the 30s TTL hit the cache. Mutations to `orders` still push updates to active subscriptions.
## Frontend Store
The generated store follows Svelte's store contract with additional state fields.
```typescript
interface SubscriptionResult<T> {
  loading: boolean;         // true until first data arrives
  data: T | null;           // the result
  error: ForgeError | null;
  stale: boolean;           // reserved for future use
}
```
### Store Methods
```typescript
const store = listTodosStore$();

// Access current value
$store.loading
$store.data
$store.error
$store.stale

// Manual actions
store.refetch();     // Force re-fetch
store.unsubscribe(); // Stop receiving updates
store.reset();       // Clear data and set loading
```
### Auto-Cleanup
Subscriptions automatically unsubscribe when all Svelte subscribers detach, so manual cleanup is typically unnecessary.
```typescript
// Subscription starts when component mounts
const todos = listTodosStore$();

// Subscription ends when component unmounts
// (when last subscriber unsubscribes)
```
## Under the Hood
### Migration Setup
Enable reactivity per table in your migrations:
```sql
-- @up
CREATE TABLE todos (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    completed BOOLEAN NOT NULL DEFAULT false
);

SELECT forge_enable_reactivity('todos');

-- @down
SELECT forge_disable_reactivity('todos');
DROP TABLE IF EXISTS todos;
```
This creates triggers that emit changes to the `forge_changes` channel:
```text
-- Payload format: table:operation:row_id[:changed_columns]
-- INSERT: todos:INSERT:550e8400-e29b-41d4-a716-446655440000
-- UPDATE: todos:UPDATE:550e8400-e29b-41d4-a716-446655440000:title,completed
-- DELETE: todos:DELETE:550e8400-e29b-41d4-a716-446655440000
```
On UPDATE, the trigger includes which columns actually changed. Forge uses this for fine-grained invalidation (see below).
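The payload format above takes only a few lines of string handling to parse. This is a hypothetical sketch for illustration; `ChangeEvent` and `parseChangePayload` are not part of the generated client.

```typescript
// Sketch: parse "table:operation:row_id[:changed_columns]" payloads.
interface ChangeEvent {
  table: string;
  op: "INSERT" | "UPDATE" | "DELETE";
  rowId: string;
  changedColumns: string[] | null; // only present on UPDATE
}

function parseChangePayload(payload: string): ChangeEvent {
  const [table, op, rowId, cols] = payload.split(":");
  if (!table || !op || !rowId) {
    throw new Error(`malformed payload: ${payload}`);
  }
  return {
    table,
    op: op as ChangeEvent["op"],
    rowId,
    changedColumns: cols ? cols.split(",") : null,
  };
}
```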
### LISTEN/NOTIFY
A dedicated PostgreSQL connection stays open with `LISTEN forge_changes`, avoiding polling and typically delivering sub-second latency.
### Debouncing
High-frequency writes could spam connected clients. Forge debounces:
- **50ms quiet period**: wait 50ms after the last change before re-executing
- **200ms max window**: never wait longer than 200ms from the first change
Rapid mutations (bulk inserts, batch updates) coalesce into a single re-execution and push.
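The two-timer policy above can be sketched as follows. This is an illustrative model of the described behavior, not Forge's implementation; `ChangeDebouncer` is a hypothetical name.

```typescript
// Sketch: flush pending table changes 50ms after the last notification,
// or 200ms after the first one in a burst, whichever fires first.
class ChangeDebouncer {
  private quietTimer: ReturnType<typeof setTimeout> | null = null;
  private maxTimer: ReturnType<typeof setTimeout> | null = null;
  private pending = new Set<string>();

  constructor(
    private flush: (tables: Set<string>) => void,
    private quietMs = 50,
    private maxMs = 200,
  ) {}

  notify(table: string): void {
    this.pending.add(table);
    // Restart the quiet-period timer on every change.
    if (this.quietTimer) clearTimeout(this.quietTimer);
    this.quietTimer = setTimeout(() => this.fire(), this.quietMs);
    // Start the max-window timer only once per burst.
    if (!this.maxTimer) {
      this.maxTimer = setTimeout(() => this.fire(), this.maxMs);
    }
  }

  private fire(): void {
    if (this.quietTimer) clearTimeout(this.quietTimer);
    if (this.maxTimer) clearTimeout(this.maxTimer);
    this.quietTimer = null;
    this.maxTimer = null;
    const tables = this.pending;
    this.pending = new Set();
    this.flush(tables); // one coalesced re-execution per burst
  }
}
```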
### Hash Comparison
Re-executing a query does not guarantee data changed. A user might INSERT then DELETE the same row within the debounce window.
Forge hashes query results before pushing:
```text
hash = hash(serialize(result))
```
Updates only reach the client when the hash differs from the last push, which prevents unnecessary re-renders.
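The push-if-changed check can be sketched like this, assuming JSON serialization and SHA-256; Forge's actual serialization and hash function are internal details.

```typescript
import { createHash } from "node:crypto";

// Sketch: remember the hash of the last pushed result per query and
// push only when the new result hashes differently.
const lastHashes = new Map<string, string>(); // queryKey -> last pushed hash

function shouldPush(queryKey: string, result: unknown): boolean {
  const hash = createHash("sha256")
    .update(JSON.stringify(result))
    .digest("hex");
  if (lastHashes.get(queryKey) === hash) return false; // data unchanged
  lastHashes.set(queryKey, hash);
  return true;
}
```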
### Adaptive Tracking
Forge tracks changes at different granularities based on tracked row count per table:
| Tracked Rows | Tracking Mode | Behavior |
|---|---|---|
| 0 | None | No tracking (no subscriptions) |
| 1-100 | Row-level | Only tracked row IDs trigger re-execution |
| 101+ | Table-level | Any table change triggers re-execution |
Hysteresis prevents oscillation: Forge switches to table-level tracking above 100 tracked rows and back to row-level below 50. Tables with 50-100 tracked rows keep their current mode.
Row-level tracking is more efficient when few clients subscribe; table-level tracking avoids per-row memory overhead when many do. Forge tracks at most 10,000 rows per table.
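The hysteresis rules can be expressed as a small state function. This is a sketch of the thresholds described above; `TrackingMode` and `nextMode` are illustrative names, not Forge's API.

```typescript
// Sketch: switch up at > 100 tracked rows, down at < 50; in between,
// keep the current mode to avoid oscillating around one boundary.
type TrackingMode = "none" | "row-level" | "table-level";

function nextMode(current: TrackingMode, trackedRows: number): TrackingMode {
  if (trackedRows === 0) return "none";          // no active subscriptions
  if (trackedRows > 100) return "table-level";   // too many rows to track
  if (trackedRows < 50) return "row-level";      // cheap to track individually
  // 50..100: stay in the current mode (hysteresis band)
  return current === "none" ? "row-level" : current;
}
```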
### Column-Aware Invalidation
Forge extracts which columns a query SELECTs at compile time. When an UPDATE arrives with a list of changed columns, Forge checks whether any changed column overlaps with the query's selected columns. If there's no overlap, the query is not re-executed.
```rust
// Extracted columns: ["title", "completed"]
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT title, completed FROM todos")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
```
An `UPDATE todos SET updated_at = now()` does not trigger re-execution for this query because `updated_at` is not among the selected columns; an `UPDATE todos SET title = 'new'` does.
For `SELECT *` queries, and for dynamic SQL where the columns can't be determined, every UPDATE triggers re-execution.
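The overlap check reduces to a set intersection. A minimal sketch, assuming unknown column sets (for `SELECT *` or dynamic SQL) are represented as `null`; `needsReExecution` is an illustrative name.

```typescript
// Sketch: re-execute only when a changed column intersects the query's
// selected columns; if the selected columns are unknown, be conservative.
function needsReExecution(
  selectedColumns: string[] | null, // null = SELECT * / dynamic SQL
  changedColumns: string[],
): boolean {
  if (selectedColumns === null) return true; // columns unknown: always re-run
  return changedColumns.some((c) => selectedColumns.includes(c));
}
```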
### Auto-Reconnect
SSE connections may drop due to network issues or deploys. The client handles reconnection:
1. The connection drops.
2. The client retries with exponential backoff and jitter: base 1s, doubling each attempt, capped at 30s.
3. On reconnect, it re-registers all active subscriptions.
4. Fresh data arrives for each subscription.
Maximum 10 reconnection attempts before giving up.
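The delay schedule can be sketched as follows. The doubling and 30s cap come from the description above; the full-jitter strategy is an assumption, and `reconnectDelayMs` is an illustrative name.

```typescript
// Sketch: exponential backoff with full jitter.
// Attempt 0 draws from [0, 1s), attempt 1 from [0, 2s), ... capped at [0, 30s).
function reconnectDelayMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.random() * ceiling; // full jitter spreads out reconnect storms
}
```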
### Token Change Detection
When the auth token changes (login, logout, refresh), the client detects the change via hash comparison and reconnects to ensure subscriptions use the new identity.
## Testing
Test subscriptions using `TestQueryContext` with a real database.
```rust
#[tokio::test]
async fn test_subscription_updates() {
    let pool = setup_test_database().await;
    let user_id = Uuid::new_v4();
    let ctx = TestQueryContext::with_pool(pool.clone(), Some(user_id));

    // Initial query
    let todos = list_todos(&ctx).await.unwrap();
    assert_eq!(todos.len(), 0);

    // Insert data
    sqlx::query("INSERT INTO todos (title) VALUES ('Test')")
        .execute(&pool)
        .await
        .unwrap();

    // Query again (simulating re-execution on change)
    let todos = list_todos(&ctx).await.unwrap();
    assert_eq!(todos.len(), 1);
}
```
For end-to-end subscription testing, use the full Forge runtime in your test suite. The client stores and SSE connection can be tested via browser automation or direct HTTP/SSE calls.