
Reactivity Internals

How Forge keeps subscriptions up to date after mutations, and the operational knobs you can tune.

Mental Model

A reactive query is a regular query that re-executes automatically whenever its underlying data changes. From the application's perspective, there is no polling, no manual cache invalidation, and no event plumbing: you mark a query with #[forge::query], subscribe to it from the frontend, and the client store stays current.

Under the hood, the guarantee is built on a deterministic pipeline:

DB write
→ trigger fires → forge_change_log row inserted → NOTIFY forge_changes
→ ChangeListener receives NOTIFY on its dedicated PG connection
→ InvalidationEngine debounces (50ms quiet window, 200ms max)
→ coalesced batch delivered to Reactor
→ Reactor looks up subscriptions affected by changed tables
→ re-executes each affected query (bounded at 64 concurrent)
→ hashes result, compares to last-sent hash
→ pushes SSE update only when data actually changed
→ client store receives update, Svelte re-renders

The key properties that fall out of this design:

  • No false positives in re-renders. Hash comparison means clients only re-render when data genuinely changed, even if many mutations hit the same rows within a debounce window.
  • No polling. The LISTEN/NOTIFY connection delivers changes within milliseconds of commit; the 50ms debounce is a batching optimization, not a polling interval.
  • Shared re-execution. Fifty clients subscribed to the same query with the same args share one re-execution. The reactor fans the result out to all of them.
  • Automatic scope. Auth identity is baked into the subscription key. User A and user B subscribing to get_my_notifications are separate groups; a change that affects only user B's rows triggers only user B's re-execution.

Architecture

When a client subscribes to a query via SSE, the server remembers the query, its arguments, and which tables it touches. When any of those tables change, the server re-executes the query and pushes the new result if it differs from the last one sent.

The pipeline has four stages:

  1. Change detection (PostgreSQL NOTIFY)
  2. Debounce and batching (InvalidationEngine)
  3. Re-execution (Reactor)
  4. Fan-out (SSE push)

Each stage runs per-node. There is no global coordinator for reactivity.

Enabling Reactivity on Your Tables

Forge's system tables (forge_jobs, forge_workflow_runs, forge_workflow_steps) have reactivity enabled automatically. For your own application tables, call forge_enable_reactivity() in a migration:

SELECT forge_enable_reactivity('orders');
SELECT forge_enable_reactivity('messages');

This installs a trigger named forge_notify_orders (the pattern is forge_notify_{table_name}) that fires AFTER INSERT OR UPDATE OR DELETE on every row and notifies the runtime of the change. Once the trigger is in place, any #[forge::query] that reads from that table will re-execute automatically when the table changes — no other configuration needed.

To stop tracking changes on a table:

SELECT forge_disable_reactivity('orders');

This drops the trigger. Existing subscriptions that depend on the table stay open, but writes to it no longer trigger re-execution until reactivity is enabled on the table again.

Naming constraint. Table names must be 50 characters or fewer (the forge_notify_ prefix adds 13 characters; PostgreSQL caps identifiers at 63). The function validates this and raises an error at migration time if the name is too long.
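The arithmetic behind the limit can be sketched in a few lines of Rust. This is an illustration only: trigger_name_for is a hypothetical helper, not part of Forge's API, and the real check runs inside the SQL function. Note that PostgreSQL measures the 63-identifier limit in bytes, which is what the sketch counts.

```rust
/// Longest identifier PostgreSQL keeps without truncation (63 bytes).
const PG_MAX_IDENTIFIER_LEN: usize = 63;
/// Prefix used for reactivity triggers, as described above (13 bytes).
const TRIGGER_PREFIX: &str = "forge_notify_";

/// Hypothetical helper: derive the trigger name for a table, or report
/// why the table name is too long to fit under the identifier limit.
fn trigger_name_for(table: &str) -> Result<String, String> {
    let max_table_len = PG_MAX_IDENTIFIER_LEN - TRIGGER_PREFIX.len(); // 63 - 13 = 50
    if table.len() > max_table_len {
        return Err(format!(
            "table name '{table}' is {} bytes; the limit is {max_table_len}",
            table.len()
        ));
    }
    Ok(format!("{TRIGGER_PREFIX}{table}"))
}
```

A 50-byte table name is accepted and a 51-byte name is rejected, matching the constraint stated above.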

When to call it. Add SELECT forge_enable_reactivity('your_table') at the bottom of the migration that creates the table. That way the trigger exists from the first insert.

Change Detection

Every table with reactivity enabled has a trigger (forge_notify_{table_name}) that calls forge_notify_change(). On INSERT, UPDATE, or DELETE, the trigger:

  1. Inserts a row into forge_change_log (a durable log with a BIGSERIAL sequence)
  2. Sends a NOTIFY forge_changes with payload v1:{table}:{OP}:{row_id}[:changed_cols]#{seq}
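To make the payload format concrete, here is a minimal parsing sketch in Rust. It is not Forge's actual parser. ChangeEvent and parse_payload are hypothetical names, and the sketch assumes that changed_cols is a comma-separated list and that row IDs contain neither ':' nor '#'.

```rust
/// Hypothetical in-memory form of one NOTIFY payload.
#[derive(Debug, PartialEq)]
struct ChangeEvent {
    table: String,
    op: String,
    row_id: String,
    changed_cols: Vec<String>,
    seq: i64, // BIGSERIAL maps to a signed 64-bit integer
}

/// Parses `v1:{table}:{OP}:{row_id}[:changed_cols]#{seq}`.
/// Returns None for unknown versions or malformed payloads.
fn parse_payload(payload: &str) -> Option<ChangeEvent> {
    // Split off the trailing "#{seq}" first.
    let (body, seq) = payload.rsplit_once('#')?;
    let seq: i64 = seq.parse().ok()?;

    let mut parts = body.splitn(5, ':');
    if parts.next()? != "v1" {
        return None; // unknown payload version
    }
    let table = parts.next()?.to_string();
    let op = parts.next()?.to_string();
    let row_id = parts.next()?.to_string();
    // Optional fifth field; assumed here to be a comma-separated column list.
    let changed_cols = parts
        .next()
        .map(|cols| cols.split(',').map(str::to_string).collect())
        .unwrap_or_default();

    Some(ChangeEvent { table, op, row_id, changed_cols, seq })
}
```

Under these assumptions, "v1:orders:UPDATE:42:status,total#1007" parses to an UPDATE on orders row 42 with two changed columns and sequence 1007, while a payload without the optional field yields an empty column list.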

Each node runs a ChangeListener on a dedicated PostgreSQL connection. The listener tracks the last-seen sequence number from the change log. If the connection drops and reconnects, missed changes are replayed from forge_change_log using the stored sequence position.

If the change log has been trimmed past the listener's last-seen position (retention expired during downtime), the listener signals the reactor to perform a full resync of all active subscriptions.
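The reconnect decision described above can be sketched as a pure function. The names CatchUp and plan_catch_up are hypothetical, and the treatment of an empty log (falling back to a full resync because nothing proves that no change was missed) is an assumption of this sketch, not documented behavior.

```rust
/// What a listener should do after reconnecting.
#[derive(Debug, PartialEq)]
enum CatchUp {
    /// Nothing newer than the last-seen sequence exists.
    UpToDate,
    /// Replay change-log rows starting at this sequence number.
    Replay { from_seq: i64 },
    /// The log no longer covers the gap; resync every subscription.
    FullResync,
}

/// `last_seen` is the highest sequence the listener processed.
/// `retained` is the (oldest, newest) sequence still present in
/// forge_change_log, or None when the log is empty.
fn plan_catch_up(last_seen: i64, retained: Option<(i64, i64)>) -> CatchUp {
    match retained {
        // Assumption: an empty log cannot prove that nothing was missed.
        None => CatchUp::FullResync,
        Some((_, newest)) if newest <= last_seen => CatchUp::UpToDate,
        // The log still contains the row right after last_seen.
        Some((oldest, _)) if oldest <= last_seen + 1 => CatchUp::Replay {
            from_seq: last_seen + 1,
        },
        // Rows between last_seen and `oldest` may have been trimmed.
        // Sequences can also skip values after rollbacks, so this
        // check is conservative rather than exact.
        Some(_) => CatchUp::FullResync,
    }
}
```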

The change log is automatically trimmed every 60 seconds by forge_trim_change_log(), keeping entries for 1 hour by default.

Debounce and Batching

The InvalidationEngine collects incoming changes and batches them before passing to the reactor. This prevents a burst of row-level changes from triggering individual re-executions.

Parameter      Default   Description
Quiet window   50ms      Wait this long after the last change before flushing
Max window     200ms     Flush after at most this long, even if changes keep arriving

Changes are coalesced by table name. If 100 rows in the users table change within 50ms, the reactor sees one invalidation for users, not 100.
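The interaction of the two windows is easier to see in code. The following is a minimal sketch, not the InvalidationEngine itself: the Debouncer type is hypothetical, time is passed in explicitly so the logic stays testable, and it assumes the max window is measured from the first change in the current batch.

```rust
use std::collections::BTreeSet;
use std::time::{Duration, Instant};

/// Hypothetical debouncer that coalesces changes by table name.
struct Debouncer {
    quiet: Duration,
    max: Duration,
    pending: BTreeSet<String>,     // distinct tables in the current batch
    first_change: Option<Instant>, // when the current batch started
    last_change: Option<Instant>,  // most recent change in the batch
}

impl Debouncer {
    fn new(quiet: Duration, max: Duration) -> Self {
        Debouncer { quiet, max, pending: BTreeSet::new(), first_change: None, last_change: None }
    }

    /// Records one row-level change; repeated tables collapse into one entry.
    fn record(&mut self, table: &str, now: Instant) {
        self.pending.insert(table.to_string());
        if self.first_change.is_none() {
            self.first_change = Some(now);
        }
        self.last_change = Some(now);
    }

    /// Returns the coalesced batch once either window has elapsed.
    fn poll(&mut self, now: Instant) -> Option<Vec<String>> {
        let (first, last) = (self.first_change?, self.last_change?);
        let quiet_elapsed = now.saturating_duration_since(last) >= self.quiet;
        let max_elapsed = now.saturating_duration_since(first) >= self.max;
        if !(quiet_elapsed || max_elapsed) {
            return None;
        }
        self.first_change = None;
        self.last_change = None;
        Some(std::mem::take(&mut self.pending).into_iter().collect())
    }
}
```

With the defaults (50ms quiet, 200ms max), a short burst flushes 50ms after its last change, while a continuous stream of changes is still flushed once the batch is 200ms old.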

Re-execution

The Reactor determines which subscriptions are affected by a batch of table changes using an inverted index: a map from table name to the set of query groups that depend on that table.

For each affected query group, the reactor:

  1. Re-executes the query against the read replica (falls back to the primary if no replicas are configured)
  2. Hashes the result and compares to the last-sent hash
  3. Pushes an update SSE event only if the result changed

Re-execution runs with bounded concurrency (default: 64 concurrent queries). This prevents a large invalidation from overwhelming the database.
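The lookup and hash-comparison steps can be sketched as follows. This is a simplified model under stated assumptions, not the Reactor's implementation: ReactorSketch, GroupId, and the method names are hypothetical, results are treated as already-serialized bytes, std's DefaultHasher stands in for whatever hash Forge uses, and the concurrency cap is left out.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

type GroupId = u64;

/// Hypothetical model of the reactor's bookkeeping.
#[derive(Default)]
struct ReactorSketch {
    /// Inverted index: table name -> query groups that read from it.
    by_table: HashMap<String, HashSet<GroupId>>,
    /// Hash of the result most recently pushed for each group.
    last_sent: HashMap<GroupId, u64>,
}

impl ReactorSketch {
    fn register(&mut self, group: GroupId, tables: &[&str]) {
        for t in tables {
            self.by_table.entry((*t).to_string()).or_default().insert(group);
        }
    }

    /// Groups that must be re-executed for a batch of changed tables.
    fn affected(&self, changed: &[&str]) -> HashSet<GroupId> {
        changed
            .iter()
            .filter_map(|t| self.by_table.get(*t))
            .flatten()
            .copied()
            .collect()
    }

    /// Records the new result hash; returns true only if it differs
    /// from the last one sent, meaning an update should be pushed.
    fn should_push(&mut self, group: GroupId, result_bytes: &[u8]) -> bool {
        let mut hasher = DefaultHasher::new();
        result_bytes.hash(&mut hasher);
        let hash = hasher.finish();
        self.last_sent.insert(group, hash) != Some(hash)
    }
}
```

A group registered on both users and orders appears once in the affected set even when both tables change in the same batch, and re-executing a query that returns identical bytes produces no push.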

Read Replica Routing

Re-execution queries and data fetches run against the read pool by default. This keeps subscription load off the primary. The tradeoff is that recently-committed writes may not be visible for a short replication lag window (typically under 10ms for synchronous replicas, up to a few seconds for async replicas).

For queries marked #[query(consistent)], reads always go to the primary.

Subscription Deduplication

Multiple clients subscribing to the same query with the same arguments and auth scope share a single query group. The reactor re-executes once and fans out the result to all subscribers. Query groups are keyed by a hash of (function name, serialized args, auth scope).
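A group key of this shape could be derived as in the sketch below. The group_key function is hypothetical and uses std's DefaultHasher for illustration; the hash function and serialization that Forge actually uses are not specified here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical key derivation for a query group.
fn group_key(function: &str, serialized_args: &str, auth_scope: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Hashing the tuple hashes each string with its own terminator,
    // so ("ab", "c", ..) and ("a", "bc", ..) do not collide by concatenation.
    (function, serialized_args, auth_scope).hash(&mut hasher);
    hasher.finish()
}
```

Two clients calling get_my_notifications with the same arguments and the same auth scope map to the same key and share a group; changing only the auth scope yields a different key.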

The SubscriptionManager uses a DashMap with 64 shards for concurrent access.

Gap Detection

If the SSE send buffer fills up (slow client), the server drops the update and sends a gap event instead. The client should re-subscribe or re-fetch to recover. The generated client libraries handle this automatically with a re-subscribe.
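One way to model this behavior is a bounded channel with a non-blocking send. The sketch below uses std's sync_channel as a stand-in for the SSE send buffer. SseEvent and ClientStream are hypothetical names, and delivering the gap marker on the next push once the buffer has room is an assumption of this sketch, not documented Forge behavior.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum SseEvent {
    Update(String),
    Gap,
}

/// Hypothetical per-client sender backed by a bounded buffer.
struct ClientStream {
    tx: SyncSender<SseEvent>,
    gap_pending: bool, // an update was dropped and the client has not been told yet
}

impl ClientStream {
    /// Tries to queue an update without blocking.
    /// Returns false once the client has disconnected.
    fn push_update(&mut self, payload: String) -> bool {
        // A dropped update must be announced before any newer update.
        if self.gap_pending {
            match self.tx.try_send(SseEvent::Gap) {
                Ok(()) => self.gap_pending = false,
                // Still backed up: this update is dropped as well.
                Err(TrySendError::Full(_)) => return true,
                Err(TrySendError::Disconnected(_)) => return false,
            }
        }
        match self.tx.try_send(SseEvent::Update(payload)) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => {
                self.gap_pending = true; // drop the update, remember the gap
                true
            }
            Err(TrySendError::Disconnected(_)) => false,
        }
    }
}
```

With a buffer of two, a third update sent to a stalled client is dropped; after the client drains the buffer, the next push delivers the gap marker ahead of the new update.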

Periodic Resync

Every 600 seconds (10 minutes), the reactor performs a sweep of all active subscriptions, re-executing each one to catch any changes that NOTIFY may have missed. This is a safety net, not the primary update path.

Operational Limits

Configure these in forge.toml under [realtime]:

Setting                      Default    Description
max_sessions_per_user        8          Max concurrent SSE connections per authenticated user
max_sessions_per_ip          32         Max concurrent SSE connections per source IP
max_subscriptions_per_user   500        Max total subscriptions across all of a user's sessions
max_cached_result_bytes      10485760   Max size of a cached query result (10 MiB)

Exceeding any limit returns HTTP 429 with a retry_after_secs hint.

What Reactivity Does Not Do

  • No delta/patch delivery. The server sends the full query result on each update, not a diff. Clients always get the complete current state.
  • No cross-node coordination. Each node runs its own invalidation pipeline. Nodes share the same PostgreSQL NOTIFY channel but maintain independent subscription state.
  • No logical replication. Change detection uses triggers and NOTIFY, not PostgreSQL logical replication slots. This keeps the setup simple and avoids replication slot management overhead.
  • No ephemeral pub-sub (yet). The channel SSE event type and forge_channels NOTIFY channel are reserved but not yet active.