This document outlines the architecture for a SQL-first smart realtime system that automatically generates triggers, types, migrations, and named query/mutation metadata from annotated PostgreSQL SQL files using a PPX deriver.
- SQL-First DX: Use standard PostgreSQL syntax with embedded annotations
- Automatic Trigger Generation: Smart triggers that broadcast complete parent items
- Type Safety: Auto-generated OCaml types from SQL schema
- Migration Management: Combined DDL and trigger updates
- Compile-Time Safety: All code generated via PPX at build time
- Composable Writes: Named mutations become callable OCaml functions that can be composed by server-side handlers
-- @table <table_name>
-- @id_column <column_name>
-- @broadcast_channel column=<column_name>
CREATE TABLE inventory (
id uuid DEFAULT uuidv7() PRIMARY KEY,
premise_id uuid NOT NULL REFERENCES premise(id),
name varchar NOT NULL,
description varchar NOT NULL,
quantity int NOT NULL DEFAULT 0
);

-- @table inventory_period_map
-- @composite_key inventory_id, period_id
-- @broadcast_parent table=inventory query=get_complete_inventory
-- When this table changes, fetch the complete parent inventory
-- using the named query and broadcast it
CREATE TABLE inventory_period_map (
inventory_id uuid NOT NULL REFERENCES inventory(id),
period_id uuid NOT NULL REFERENCES period(id),
PRIMARY KEY (inventory_id, period_id)
);

-- Simple: Single column channel
-- @broadcast_channel column=premise_id
-- Advanced: Computed channel (concatenate multiple columns)
-- @broadcast_channel computed="CONCAT(organization_id, ':', premise_id)"
-- Advanced: Conditional channel
-- @broadcast_channel conditional="CASE WHEN status = 'active' THEN premise_id ELSE NULL END"
-- Advanced: Subquery channel
-- @broadcast_channel subquery="SELECT premise_id FROM inventory WHERE id = NEW.inventory_id"

Queries are read-only (SELECT) and produce named metadata used by both trigger generation and server-side query execution.
-- @query get_complete_inventory
-- Fetches complete inventory with all pricing periods
-- @cache_key inventory_id
-- Param $1: inventory id (uuid). Returns at most one row; period_list is a
-- JSON array serialized as text, '[]' when the item has no periods.
SELECT
i.id,
i.premise_id,
i.name,
i.description,
i.quantity,
COALESCE(
-- FILTER drops the all-NULL period produced by the LEFT JOIN when an item
-- has no mappings; COALESCE then substitutes an empty JSON array for the
-- NULL that JSONB_AGG yields over zero rows.
JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
'[]'::jsonb
)::text as period_list
FROM inventory i
LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
LEFT JOIN period p ON p.id = pm.period_id
WHERE i.id = $1
GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;
-- @query get_inventory_list
-- Lists all inventory for a premise with pricing
SELECT
i.id,
i.premise_id,
i.name,
i.description,
i.quantity,
COALESCE(
JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
'[]'::jsonb
)::text as period_list
FROM inventory i
LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
LEFT JOIN period p ON p.id = pm.period_id
WHERE i.premise_id = $1
GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;

Mutations are write operations and are also named. `@handler sql` is the default for single-statement writes. `@handler ocaml` is reserved for composed server-side logic that calls other named queries/mutations.
/*
@mutation add_todo
INSERT INTO todos (list_id, text) VALUES ($1, $2);
*/
/*
@mutation create_list
@handler ocaml
-- implemented in OCaml and free to compose other named mutations
*/

The PPX generates an OCaml module from annotated SQL files:
/* Auto-generated by [%realtime_schema "..."] */
type sql_type = Uuid | Varchar | Text | Int | Bigint | Boolean | ... ;
type broadcast_channel = Column of string | Computed of string | ... ;
type table_metadata = { name; columns; id_column; broadcast_channel; ... } ;
type handler = Sql | Ocaml ;
type query_metadata = { name; sql; json_columns; params; handler; ... } ;
type mutation_metadata = { name; sql; params; handler; ... } ;
let schema_hash = "..."
let tables : table_metadata list = [ ... ]
let queries : query_metadata list = [ ... ]
let mutations : mutation_metadata list = [ ... ]
let generated_triggers_sql = "..."
let latest_migration_sql = "..."
let find_query name = ...
let find_mutation name = ...
let find_table name = ...
let find_column table_name column_name = ...
let table_name logical_name = ...
module Tables = struct
module Inventory = struct
let name = "inventory"
let id_column = Some "id"
let composite_key = []
end
end
module Queries = struct
module GetInventoryList = struct
let name = "get_inventory_list"
let sql = "SELECT ..."
let json_columns = ["period_list"]
let handler = Sql
type row = {
id : string;
premise_id : string;
name : string;
description : string;
quantity : int;
period_list : string;
} [@@platform native]
let caqti_type = Caqti_type.product(...) [@@platform native]
let param_type = Caqti_type.string [@@platform native]
let request row_type = Caqti_request.Infix.(param_type ->* row_type)(sql) [@@platform native]
let find_request row_type = Caqti_request.Infix.(param_type ->? row_type)(sql) [@@platform native]
let collect (module Db : Caqti_lwt.CONNECTION) row_type params = ... [@@platform native]
let find_opt (module Db : Caqti_lwt.CONNECTION) row_type params = ... [@@platform native]
end
end
module Mutations = struct
module AddTodo = struct
let name = "add_todo"
let sql = "INSERT INTO todos ..."
let handler = Sql
let param_type = Caqti_type.t2(Caqti_type.string, Caqti_type.string) [@@platform native]
let request = Caqti_request.Infix.(param_type ->. Caqti_type.unit)(sql) [@@platform native]
let exec (module Db : Caqti_lwt.CONNECTION) params = ... [@@platform native]
end
end

Every named query and mutation is emitted as reusable metadata. Server code can call generated Caqti functions directly from RealtimeSchema.Queries.* and RealtimeSchema.Mutations.*, and OCaml-backed mutation handlers can compose those generated helpers instead of hard-coding SQL strings in multiple places.
-- ============================================================================
-- AUTO-GENERATED TRIGGERS
-- Generated: 2026-04-02T10:30:00Z
-- Source: demos/ecommerce/server/sql/inventory.sql
-- DO NOT EDIT - Regenerate with `dune build`
-- ============================================================================
-- Drop existing triggers (for clean regeneration)
DROP TRIGGER IF EXISTS realtime_notify_inventory ON inventory;
DROP TRIGGER IF EXISTS realtime_notify_inventory_period_map ON inventory_period_map;
DROP FUNCTION IF EXISTS realtime_notify_inventory();
DROP FUNCTION IF EXISTS realtime_notify_inventory_period_map();
-- ============================================================================
-- TRIGGER: inventory (Direct Broadcast)
-- ============================================================================
-- Direct Broadcast trigger for `inventory`: builds a JSON patch for the
-- changed row and notifies the channel named by premise_id
-- (per `@broadcast_channel column=premise_id`).
CREATE OR REPLACE FUNCTION realtime_notify_inventory()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
channel_name TEXT;
record_id JSONB;
row_data JSONB;
BEGIN
-- Build ID: DELETE rows only exist in OLD; INSERT/UPDATE use NEW
record_id := to_jsonb(CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END);
-- Build row data: NULL on DELETE so subscribers remove the item instead of patching it
row_data := CASE
WHEN TG_OP = 'DELETE' THEN NULL
ELSE to_jsonb(NEW)
END;
-- Build the patch payload consumed by the client store
-- NOTE(review): pg_notify payloads are size-limited (~8000 bytes); confirm
-- serialized rows stay under that limit or fall back to id-only payloads.
payload := json_build_object(
'type', 'patch',
'table', 'inventory',
'id', record_id,
'action', TG_OP,
'data', row_data
);
-- Resolve channel (column=premise_id); premise_id is NOT NULL per the DDL,
-- so this is only NULL-guarded defensively below
channel_name := CASE
WHEN TG_OP = 'DELETE' THEN OLD.premise_id::text
ELSE NEW.premise_id::text
END;
-- Broadcast and bump the owning premise's updated_at timestamp
IF channel_name IS NOT NULL THEN
PERFORM pg_notify(channel_name, payload::text);
UPDATE premise SET updated_at = NOW() WHERE id = channel_name::UUID;
END IF;
-- If an UPDATE moved the row to a different premise, also notify the old
-- channel so its subscribers can drop the row.
-- NOTE(review): the payload sent to the old channel still carries action
-- 'UPDATE' with the NEW row (new premise_id); verify clients treat a patch
-- whose premise no longer matches as a removal.
IF TG_OP = 'UPDATE' AND OLD.premise_id::text != NEW.premise_id::text THEN
PERFORM pg_notify(OLD.premise_id::text, payload::text);
UPDATE premise SET updated_at = NOW() WHERE id = OLD.premise_id;
END IF;
-- Return value is ignored for AFTER ROW triggers, but returning the row is conventional
RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER realtime_notify_inventory
AFTER INSERT OR UPDATE OR DELETE ON inventory
FOR EACH ROW EXECUTE FUNCTION realtime_notify_inventory();
-- ============================================================================
-- TRIGGER: inventory_period_map (Parent Broadcast)
-- Broadcasts complete parent inventory using @query get_complete_inventory
-- ============================================================================
-- Parent Broadcast trigger for `inventory_period_map`: when a mapping row
-- changes, re-run the named query @query get_complete_inventory for the
-- parent inventory item and broadcast the complete item (with aggregated
-- periods) on the parent's premise channel.
CREATE OR REPLACE FUNCTION realtime_notify_inventory_period_map()
RETURNS TRIGGER AS $$
DECLARE
parent_record RECORD;
complete_payload JSON;
channel_name TEXT;
parent_id UUID;
BEGIN
-- The changed mapping row identifies the parent inventory item;
-- DELETE rows only exist in OLD
parent_id := CASE
WHEN TG_OP = 'DELETE' THEN OLD.inventory_id
ELSE NEW.inventory_id
END;
-- Fetch complete parent using @query get_complete_inventory
SELECT
i.id,
i.premise_id,
i.name,
i.description,
i.quantity,
COALESCE(
JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
'[]'::jsonb
) as period_list
INTO parent_record
FROM inventory i
LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
LEFT JOIN period p ON p.id = pm.period_id
WHERE i.id = parent_id
GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;
-- FOUND is set directly by SELECT ... INTO; it is more robust than testing
-- `parent_record IS NULL`, which only holds when every column of the record
-- is NULL.
IF NOT FOUND THEN
-- Parent row no longer exists (e.g. deleted in the same transaction);
-- nothing to broadcast
RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END IF;
-- Build complete patch payload; a mapping DELETE is reported as a parent
-- 'DELETE', any other change as a parent 'UPDATE'
complete_payload := json_build_object(
'type', 'patch',
'table', 'inventory',
'id', to_jsonb(parent_record.id),
'action', CASE
WHEN TG_OP = 'DELETE' THEN 'DELETE'
ELSE 'UPDATE'
END,
'data', to_jsonb(parent_record)
);
-- Resolve channel from the parent record (premise_id column)
channel_name := parent_record.premise_id::text;
-- Bump the premise timestamp and broadcast the complete parent item
IF channel_name IS NOT NULL THEN
UPDATE premise SET updated_at = NOW() WHERE id = channel_name::UUID;
PERFORM pg_notify(channel_name, complete_payload::text);
END IF;
-- Return value is ignored for AFTER ROW triggers, but returning the row is conventional
RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER realtime_notify_inventory_period_map
AFTER INSERT OR UPDATE OR DELETE ON inventory_period_map
FOR EACH ROW EXECUTE FUNCTION realtime_notify_inventory_period_map();

/* Auto-generated migration tracking */
/* Schema hash: a1b2c3d4e5f6... */
let schema_version = "20260402103000";
let migrations = [
/* Combined DDL + Trigger migration */
"migration_20260402103000_initial.sql",
];
/* Migration content (generated as separate .sql files) */
module Migration_20260402103000 = {
let up = {|
-- ============================================================================
-- MIGRATION: 20260402103000_initial
-- ============================================================================
-- Create tables
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS inventory (
id uuid DEFAULT uuidv7() PRIMARY KEY,
premise_id uuid NOT NULL REFERENCES premise(id),
name varchar NOT NULL,
description varchar NOT NULL,
quantity int NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS inventory_period_map (
inventory_id uuid NOT NULL REFERENCES inventory(id),
period_id uuid NOT NULL REFERENCES period(id),
PRIMARY KEY (inventory_id, period_id)
);
-- Create triggers (auto-generated from annotations)
[SchemaTriggers.sql content here]
-- Track schema version
CREATE TABLE IF NOT EXISTS schema_migrations (
version varchar PRIMARY KEY,
applied_at timestamp DEFAULT NOW()
);
INSERT INTO schema_migrations (version) VALUES ('20260402103000')
ON CONFLICT (version) DO NOTHING;
|};
let down = {|
-- Rollback: 20260402103000_initial
DROP TRIGGER IF EXISTS realtime_notify_inventory ON inventory;
DROP TRIGGER IF EXISTS realtime_notify_inventory_period_map ON inventory_period_map;
DROP TABLE IF EXISTS inventory_period_map;
DROP TABLE IF EXISTS inventory;
DELETE FROM schema_migrations WHERE version = '20260402103000';
|};
};

Patch decoding and CRUD logic use the generic StoreCrud module instead of per-table boilerplate:
/* In Store.re */
type patch = StoreCrud.patch(Model.InventoryItem.t);
/* Inside Runtime.Make functor body: */
let decodePatch =
StorePatch.compose([
StoreCrud.decodePatch(
~table=RealtimeSchema.table_name("inventory"),
~decodeRow=Model.InventoryItem.of_json,
(),
),
]);
let updateOfPatch = StoreCrud.updateOfPatch(
~getId=(item: Model.InventoryItem.t) => item.id,
~getItems=(config: config) => config.inventory,
~setItems=(config: config, items) => {...config, inventory: items},
);

For multi-table stores, compose multiple StoreCrud.decodePatch calls and use a wrapped variant type:
type patch =
| InventoryPatch(StoreCrud.patch(Model.InventoryItem.t))
| UserPatch(StoreCrud.patch(User.t));

The realtime schema system integrates via a PPX that reads SQL files at compile time and a CLI for trigger/migration generation.
The PPX reads annotated .sql files and generates an OCaml module with table/query metadata:
; demos/ecommerce/shared/js/dune
(library
(name common_js)
; ...
(preprocess
(pps realtime_schema_ppx)))

/* demos/ecommerce/shared/js/RealtimeSchema.ml */
[%%realtime_schema "demos/ecommerce/server/sql"]

This generates RealtimeSchema with:

- `find_table`, `find_column`, `table_name` lookup helpers
- `module Tables` with per-table metadata (name, id_column, composite_key)
- `module Queries` with per-query metadata (sql, json_columns)
- `generated_triggers_sql` for auto-generated PL/pgSQL triggers
- `latest_migration_sql` for schema migrations
- `schema_hash` for change detection
# Generate triggers, snapshot, and migrations
dune exec realtime-schema-codegen -- \
--sql-dir demos/ecommerce/server/sql \
--triggers-path demos/ecommerce/server/sql/generated/realtime.sql \
--snapshot-path demos/ecommerce/server/sql/generated/schema_snapshot.json \
--migrations-dir demos/ecommerce/server/sql/generated/migrations

Server code calls the generated Caqti helpers directly. No hand-written Caqti_request boilerplate is required:
(* List-collect query *)
let* item_rows =
Dream.sql request (fun db ->
RealtimeSchema.Queries.GetInventoryList.collect
db
RealtimeSchema.Queries.GetInventoryList.caqti_type
premise_id)
in
let items = Array.of_list item_rows

(* Single-row lookup *)
let* item_row =
Dream.sql request (fun db ->
RealtimeSchema.Queries.GetCompleteInventory.find_opt
db
RealtimeSchema.Queries.GetCompleteInventory.caqti_type
item_id)

(* Mutation *)
let* () =
RealtimeSchema.Mutations.AddTodo.exec db (list_id, text)If you need to build a custom request, you can still use RealtimeSchema.Queries.*.sql, RealtimeSchema.Mutations.*.sql, and RealtimeSchema.Queries.*.param_type:
let custom_request =
Caqti_request.Infix.(
RealtimeSchema.Queries.GetInventoryList.param_type ->*
RealtimeSchema.Queries.GetInventoryList.caqti_type
)(RealtimeSchema.Queries.GetInventoryList.sql)

Client code sends named mutation commands over the active websocket connection, for example `mutation add_todo {"list_id":"...","text":"..."}`. The server executes the mutation, Postgres triggers broadcast a patch, and the client store applies that patch through StoreSync.
| SQL Type | OCaml Type | Caqti Type |
|---|---|---|
| `uuid` | `string` | `Caqti_type.string` |
| `varchar`, `text` | `string` | `Caqti_type.string` |
| `int`, `integer` | `int` | `Caqti_type.int` |
| `bigint` | `int64` | `Caqti_type.int64` |
| `boolean` | `bool` | `Caqti_type.bool` |
| `timestamp`, `timestamptz` | `Js.Date.t` | Custom float encoder |
| `json`, `jsonb` | `'a` (generic) | Custom JSON encoder |
-- @query get_by_id
WHERE i.id = $1 -- Inferred: param $1 = string (from column type)

- Define Schema: Create/modify `.sql` files with annotations
- Build: Run `dune build` to generate all artifacts
- Migrate: Apply generated migrations to database
- Deploy: Generated code is ready for server and client
- SQL-First: Standard PostgreSQL syntax, no new DSL to learn
- Explicit Control: Named queries referenced explicitly
- Type Safety: Compile-time guarantees for database operations
- Automatic Sync: Triggers keep clients updated automatically
- Complete Items: Parent items broadcast with all related data
- Migration Management: Version-controlled, reproducible schema changes
- Conditional broadcasting based on row values
- Multi-table broadcast aggregations
- Custom serialization formats
- Validation rules in annotations