
Subscribe to Changes

Turn any query into a real-time subscription. No WebSocket configuration.

Prerequisites

Enable reactivity on tables you want to subscribe to:

-- In your migration
SELECT forge_enable_reactivity('todos');

This creates PostgreSQL triggers that emit NOTIFY on INSERT/UPDATE/DELETE.

The Code

#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

That's it. Forge parses the SQL at compile time and extracts table dependencies automatically. JOINs, subqueries, and CTEs are all tracked.
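To get a feel for what dependency extraction does, here is a deliberately naive sketch that collects the identifier following each FROM or JOIN keyword. Forge's real extraction uses a proper SQL parser (which is how aliases, subqueries, and CTEs are handled correctly); `extract_tables` below is purely illustrative and not Forge's implementation:

```rust
/// Naive illustration of table-dependency extraction: collect the
/// identifier after each FROM or JOIN keyword. Ignores quoting,
/// comma-separated FROM lists, and CTE names; a real parser is needed
/// for those cases.
fn extract_tables(sql: &str) -> Vec<String> {
    let tokens: Vec<&str> = sql
        .split(|c: char| c.is_whitespace() || c == '(' || c == ',')
        .filter(|t| !t.is_empty())
        .collect();
    let mut tables = Vec::new();
    for pair in tokens.windows(2) {
        let kw = pair[0].to_ascii_uppercase();
        if kw == "FROM" || kw == "JOIN" {
            let name = pair[1].trim_end_matches(')').to_ascii_lowercase();
            if !tables.contains(&name) {
                tables.push(name);
            }
        }
    }
    tables
}

fn main() {
    let sql = "SELECT o.*, u.name FROM orders o JOIN users u ON o.user_id = u.id";
    println!("{:?}", extract_tables(sql)); // ["orders", "users"]
}
```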

Frontend:

// Fetch once
const todos = await listTodos();

// Or subscribe to changes (add the Store$ suffix)
const todos = listTodosStore$();

What Happens

  1. Database triggers emit NOTIFY on INSERT/UPDATE/DELETE
  2. A dedicated listener connection receives changes via PostgreSQL LISTEN
  3. Forge re-executes affected queries and compares result hashes
  4. Only when data differs does it push updates over Server-Sent Events

Debouncing prevents notification storms: 50ms after the last change, or 200ms max. Rapid mutations coalesce into a single update.

Attributes

| Attribute | Type           | Default | Description                                    |
|-----------|----------------|---------|------------------------------------------------|
| tables    | ["t1", "t2"]   | auto    | Override table dependencies (for dynamic SQL)  |
| public    | flag           | false   | Bypass authentication                          |
| cache     | "duration"     | none    | Cache TTL (e.g., "30s")                        |
| timeout   | u64            | none    | Statement timeout in seconds                   |

When to use tables

For static SQL strings, Forge extracts dependencies automatically:

// Tables auto-extracted: ["todos"]
#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

// Tables auto-extracted: ["orders", "users"]
#[forge::query]
pub async fn get_order_with_user(ctx: &QueryContext, id: Uuid) -> Result<OrderWithUser> {
    sqlx::query_as("SELECT o.*, u.name FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = $1")
        .bind(id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}

Use tables only when SQL is constructed dynamically:

// Dynamic SQL - must specify tables manually
#[forge::query(tables = ["users", "orders"])]
pub async fn search(ctx: &QueryContext, table: String, term: String) -> Result<Vec<Row>> {
    let sql = format!("SELECT * FROM {} WHERE name ILIKE $1", table);
    sqlx::query_as(&sql)
        .bind(format!("%{}%", term))
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

Patterns

Single Table Subscription

#[forge::query]
pub async fn list_todos(ctx: &QueryContext) -> Result<Vec<Todo>> {
    sqlx::query_as("SELECT * FROM todos ORDER BY created_at DESC")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

const todos = listTodosStore$();

{#if $todos.loading}
  <p>Loading...</p>
{:else if $todos.error}
  <p>Error: {$todos.error.message}</p>
{:else}
  {#each $todos.data ?? [] as todo}
    <TodoItem {todo} />
  {/each}
{/if}

Multi-Table Subscription

JOINs and subqueries are tracked automatically.

#[forge::query]
pub async fn get_order_details(ctx: &QueryContext, order_id: Uuid) -> Result<OrderDetails> {
    sqlx::query_as(
        "SELECT o.*,
                json_agg(json_build_object('name', p.name, 'qty', oi.quantity)) as items
         FROM orders o
         JOIN order_items oi ON oi.order_id = o.id
         JOIN products p ON p.id = oi.product_id
         WHERE o.id = $1
         GROUP BY o.id",
    )
    .bind(order_id)
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}

Changes to orders, order_items, or products trigger re-execution.

User-Scoped Subscription

Subscriptions respect authentication. Each user receives their own data.

#[forge::query]
pub async fn get_my_notifications(ctx: &QueryContext) -> Result<Vec<Notification>> {
    let user_id = ctx.require_user_id()?;

    sqlx::query_as("SELECT * FROM notifications WHERE user_id = $1 AND read = false")
        .bind(user_id)
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

When user A creates a notification for user B, only user B's subscription re-executes and receives the update.

Public Real-Time Feed

Public subscriptions work without authentication.

#[forge::query(public)]
pub async fn get_live_events(ctx: &QueryContext) -> Result<Vec<Event>> {
    sqlx::query_as(
        "SELECT * FROM events
         WHERE starts_at > now()
         ORDER BY starts_at ASC
         LIMIT 10",
    )
    .fetch_all(ctx.db())
    .await
    .map_err(Into::into)
}

Cached + Reactive

Cache the initial load. Push updates when data changes.

#[forge::query(cache = "30s")]
pub async fn get_dashboard_stats(ctx: &QueryContext) -> Result<DashboardStats> {
    sqlx::query_as(
        "SELECT count(*) as total, sum(amount) as revenue
         FROM orders WHERE created_at > now() - interval '24 hours'",
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}

Repeat requests within the 30s window are served from cache. Mutations to orders still push updates to active subscriptions.
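The cache layer can be pictured as a TTL check in front of the query. This is a minimal sketch with illustrative names (`Cached`, `get_or_compute` are not Forge internals): serve the stored value while it is younger than the TTL, recompute otherwise.

```rust
use std::time::{Duration, Instant};

/// A cached value plus the instant it was stored (sketch, not Forge's
/// actual cache implementation).
struct Cached<T> {
    value: T,
    stored_at: Instant,
}

/// Serve from cache while the entry is younger than `ttl`; otherwise
/// run `compute` (the query) and refresh the slot.
fn get_or_compute<T: Clone>(
    slot: &mut Option<Cached<T>>,
    ttl: Duration,
    compute: impl FnOnce() -> T,
) -> T {
    match slot {
        Some(c) if c.stored_at.elapsed() < ttl => c.value.clone(), // cache hit
        _ => {
            let value = compute();
            *slot = Some(Cached { value: value.clone(), stored_at: Instant::now() });
            value
        }
    }
}

fn main() {
    let mut slot: Option<Cached<u64>> = None;
    let ttl = Duration::from_secs(30);
    let mut executions = 0;
    let a = get_or_compute(&mut slot, ttl, || { executions += 1; 42 }); // miss: runs the query
    let b = get_or_compute(&mut slot, ttl, || { executions += 1; 42 }); // hit: served from cache
    assert_eq!((a, b, executions), (42, 42, 1));
    println!("query executions: {executions}");
}
```

Note that the reactive push path bypasses this check: a mutation invalidates and re-pushes regardless of the TTL.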

Frontend Store

The generated store follows Svelte's store contract with additional state fields.

interface SubscriptionResult<T> {
  loading: boolean;     // true until first data arrives
  data: T | null;       // the result
  error: Error | null;
  stale: boolean;       // true during reconnection
}

Store Methods

const store = listTodosStore$();

// Access current value
$store.loading
$store.data
$store.error
$store.stale

// Manual actions
store.refetch(); // Force re-fetch
store.unsubscribe(); // Stop receiving updates
store.reset(); // Clear data and set loading

Auto-Cleanup

Subscriptions automatically unsubscribe when all Svelte subscribers detach. No manual cleanup required in most cases.

// Subscription starts when component mounts
const todos = listTodosStore$();

// Subscription ends when component unmounts
// (when last subscriber unsubscribes)

Under the Hood

Migration Setup

Enable reactivity per table in your migrations:

-- @up
CREATE TABLE todos (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  title TEXT NOT NULL,
  completed BOOLEAN NOT NULL DEFAULT false
);

SELECT forge_enable_reactivity('todos');

-- @down
SELECT forge_disable_reactivity('todos');
DROP TABLE IF EXISTS todos;

This creates triggers that emit changes to the forge_changes channel:

-- Payload format: table:operation:row_id
-- Example: todos:INSERT:550e8400-e29b-41d4-a716-446655440000
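A consumer of this channel only needs to split the payload on `:`. A minimal sketch (Rust, using `splitn` so a row ID containing `:` stays intact):

```rust
/// Parse a forge_changes payload of the form "table:operation:row_id".
/// splitn(3, ':') keeps any ':' inside the row ID as part of the ID.
fn parse_change(payload: &str) -> Option<(&str, &str, &str)> {
    let mut parts = payload.splitn(3, ':');
    Some((parts.next()?, parts.next()?, parts.next()?))
}

fn main() {
    let p = parse_change("todos:INSERT:550e8400-e29b-41d4-a716-446655440000");
    println!("{:?}", p);
    // Some(("todos", "INSERT", "550e8400-e29b-41d4-a716-446655440000"))
}
```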

LISTEN/NOTIFY

A dedicated PostgreSQL connection stays open with LISTEN forge_changes. No polling. Sub-second latency.

Debouncing

High-frequency writes could spam connected clients. Forge debounces:

  • 50ms quiet period: Wait 50ms after the last change before re-executing
  • 200ms max window: Never wait longer than 200ms from the first change

Rapid mutations (bulk inserts, batch updates) coalesce into a single re-execution and push.
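The two-timer policy can be sketched with a blocking channel loop: flush when the quiet period elapses with no new event, or when the max window closes, whichever comes first. This is an illustrative stdlib sketch of the policy, not Forge's async implementation:

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

/// Coalesce a burst of change notifications: flush `quiet` after the
/// last event, or `max_window` after the first event, whichever is
/// sooner. Timings mirror the 50ms/200ms policy described above.
fn debounce(rx: &mpsc::Receiver<String>, quiet: Duration, max_window: Duration) -> Vec<String> {
    // Block until the first event opens the window.
    let mut batch = vec![rx.recv().expect("channel closed")];
    let window_start = Instant::now();
    loop {
        let elapsed = window_start.elapsed();
        if elapsed >= max_window {
            break; // never wait longer than max_window from the first change
        }
        let wait = quiet.min(max_window - elapsed);
        match rx.recv_timeout(wait) {
            Ok(event) => batch.push(event), // a new event resets the quiet timer
            Err(_) => break,                // quiet period elapsed (or sender gone)
        }
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..3 {
        tx.send(format!("todos:INSERT:{i}")).unwrap();
    }
    // All three arrive within the quiet period, so they coalesce.
    let batch = debounce(&rx, Duration::from_millis(50), Duration::from_millis(200));
    println!("coalesced {} events into one flush", batch.len()); // 3
}
```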

Hash Comparison

Re-executing a query does not guarantee data changed. A user might INSERT then DELETE the same row within the debounce window.

Forge hashes query results before pushing:

hash = hash(function_name + serialize(result))

Updates only reach the client when the hash differs from the last push. No unnecessary re-renders.
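The check amounts to hashing the function name together with the serialized result and comparing against the last pushed hash. A sketch using the stdlib's `DefaultHasher` (standing in for whatever hash Forge actually uses):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// hash(function_name + serialize(result)), per the formula above.
/// DefaultHasher is a stand-in for Forge's actual hash function.
fn result_hash(function_name: &str, serialized_result: &str) -> u64 {
    let mut h = DefaultHasher::new();
    function_name.hash(&mut h);
    serialized_result.hash(&mut h);
    h.finish()
}

fn main() {
    // INSERT followed by DELETE within the debounce window: the
    // re-executed query returns the same rows, so nothing is pushed.
    let before = result_hash("list_todos", r#"[{"id":1,"title":"Test"}]"#);
    let after = result_hash("list_todos", r#"[{"id":1,"title":"Test"}]"#);
    let changed = result_hash("list_todos", "[]");
    println!("push after no-op churn: {}", before != after);  // false
    println!("push after real change: {}", before != changed); // true
}
```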

Adaptive Tracking

Forge tracks changes at different granularities based on subscription count:

| Subscriptions | Tracking Mode | Behavior                                  |
|---------------|---------------|-------------------------------------------|
| 0             | None          | No tracking                               |
| 1-50          | Row-level     | Only tracked row IDs trigger re-execution |
| 50+           | Table-level   | Any table change triggers re-execution    |

Hysteresis prevents oscillation: the threshold for switching up (100 rows) is higher than the threshold for switching down (50 rows), so the mode doesn't flip back and forth near the boundary.

Row-level tracking is more efficient for few subscriptions. Table-level avoids memory overhead when many clients subscribe. Forge re-evaluates every 60 seconds with a maximum of 10,000 tracked rows per table.
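The mode transition with hysteresis can be sketched as a small state machine. This is an illustration of the policy only, using the two thresholds quoted above; names and structure are not Forge internals:

```rust
/// Tracking granularity, per the table above.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TrackingMode {
    None,
    RowLevel,
    TableLevel,
}

/// Hysteresis: switch up to table-level at the high threshold, but
/// only drop back to row-level below the low one, so counts hovering
/// near the boundary don't flip the mode on every re-evaluation.
fn next_mode(current: TrackingMode, tracked: usize) -> TrackingMode {
    const UP: usize = 100;
    const DOWN: usize = 50;
    match (current, tracked) {
        (_, 0) => TrackingMode::None,
        (TrackingMode::TableLevel, n) if n >= DOWN => TrackingMode::TableLevel, // stay up
        (_, n) if n >= UP => TrackingMode::TableLevel,
        _ => TrackingMode::RowLevel,
    }
}

fn main() {
    let mut mode = TrackingMode::None;
    for &n in &[10, 120, 80, 40] {
        mode = next_mode(mode, n);
        println!("{n:>4} tracked -> {mode:?}");
    }
    // Note: 80 keeps table-level (above DOWN), even though it is below UP.
}
```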

Auto-Reconnect

SSE connections may drop due to network issues or deploys. The client handles reconnection:

  1. Connection drops
  2. Store sets stale: true (data may be outdated)
  3. Exponential backoff: 1s, 2s, 4s... up to 30s
  4. Reconnect with jitter to prevent thundering herd
  5. Re-register all active subscriptions
  6. Receive fresh data, clear stale flag

Maximum 10 reconnection attempts before giving up.
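The backoff schedule (step 3) doubles from 1s up to the 30s cap. A sketch of the base schedule, shown in Rust for consistency with the other examples even though the real client is TypeScript; the per-attempt random jitter is omitted so the values are easy to verify:

```rust
use std::time::Duration;

/// Exponential backoff: 1s, 2s, 4s, ... capped at 30s.
/// The real client adds random jitter on top of each delay.
fn backoff_delay(attempt: u32) -> Duration {
    let base = Duration::from_secs(1);
    let cap = Duration::from_secs(30);
    base.checked_mul(1u32 << attempt.min(5))
        .unwrap_or(cap)
        .min(cap)
}

fn main() {
    for attempt in 0..7 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt));
    }
    // 1s, 2s, 4s, 8s, 16s, 30s, 30s
}
```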

Token Change Detection

When the auth token changes (login, logout, refresh), the client detects the change via hash comparison and reconnects to ensure subscriptions use the new identity.

Testing

Test subscriptions using TestQueryContext with a real database.

#[tokio::test]
async fn test_subscription_updates() {
    let pool = setup_test_database().await;
    let ctx = TestQueryContext::with_pool(pool.clone(), Some(user_id));

    // Initial query
    let todos = list_todos(&ctx).await.unwrap();
    assert_eq!(todos.len(), 0);

    // Insert data
    sqlx::query("INSERT INTO todos (title) VALUES ('Test')")
        .execute(&pool)
        .await
        .unwrap();

    // Query again (simulating re-execution on change)
    let todos = list_todos(&ctx).await.unwrap();
    assert_eq!(todos.len(), 1);
}

For end-to-end subscription testing, use the full Forge runtime in your test suite. The client stores and SSE connection can be tested via browser automation or direct HTTP/SSE calls.