
Realtime Query Refactor: SQL-First Smart Realtime System

Overview

This document outlines the architecture for a SQL-first smart realtime system that automatically generates triggers, types, migrations, and named query/mutation metadata from annotated PostgreSQL SQL files using a PPX deriver.

Goals

  1. SQL-First DX: Use standard PostgreSQL syntax with embedded annotations
  2. Automatic Trigger Generation: Smart triggers that broadcast complete parent items
  3. Type Safety: Auto-generated OCaml types from SQL schema
  4. Migration Management: Combined DDL and trigger updates
  5. Compile-Time Safety: All code generated via PPX at build time
  6. Composable Writes: Named mutations become callable OCaml functions that can be composed by server-side handlers

Architecture

1. SQL Comment Annotation Syntax

Basic Table Annotations

-- @table <table_name>
-- @id_column <column_name>
-- @broadcast_channel column=<column_name>
CREATE TABLE inventory (
  id uuid DEFAULT uuidv7() PRIMARY KEY,
  premise_id uuid NOT NULL REFERENCES premise(id),
  name varchar NOT NULL,
  description varchar NOT NULL,
  quantity int NOT NULL DEFAULT 0
);

Junction Table with Parent Broadcast

-- @table inventory_period_map
-- @composite_key inventory_id, period_id
-- @broadcast_parent table=inventory query=get_complete_inventory
--   When this table changes, fetch the complete parent inventory
--   using the named query and broadcast it
CREATE TABLE inventory_period_map (
  inventory_id uuid NOT NULL REFERENCES inventory(id),
  period_id uuid NOT NULL REFERENCES period(id),
  PRIMARY KEY (inventory_id, period_id)
);

Advanced Broadcast Channel Examples

-- Simple: Single column channel
-- @broadcast_channel column=premise_id

-- Advanced: Computed channel (concatenate multiple columns)
-- @broadcast_channel computed="CONCAT(organization_id, ':', premise_id)"

-- Advanced: Conditional channel
-- @broadcast_channel conditional="CASE WHEN status = 'active' THEN premise_id ELSE NULL END"

-- Advanced: Subquery channel
-- @broadcast_channel subquery="SELECT premise_id FROM inventory WHERE id = NEW.inventory_id"
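
As an illustration, here is a hedged sketch of the channel-resolution statement the computed variant could produce inside a generated trigger body (the exact rewriting of column references against NEW/OLD is an assumption of this sketch; the column variant is shown in full in SchemaTriggers.sql):

```sql
-- Sketch only: plausible lowering of
--   @broadcast_channel computed="CONCAT(organization_id, ':', premise_id)"
-- inside the generated trigger function. Column references are assumed
-- to be rewritten against NEW (or OLD for deletes) by the generator.
channel_name := CASE
    WHEN TG_OP = 'DELETE' THEN CONCAT(OLD.organization_id, ':', OLD.premise_id)
    ELSE CONCAT(NEW.organization_id, ':', NEW.premise_id)
END;
```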

2. Named Query Definitions

Queries are read-only (SELECT) and produce named metadata used by both trigger generation and server-side query execution.

-- @query get_complete_inventory
--   Fetches complete inventory with all pricing periods
-- @cache_key inventory_id
SELECT 
  i.id,
  i.premise_id,
  i.name,
  i.description,
  i.quantity,
  COALESCE(
    JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
    '[]'::jsonb
  )::text as period_list
FROM inventory i
LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
LEFT JOIN period p ON p.id = pm.period_id
WHERE i.id = $1
GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;

-- @query get_inventory_list
--   Lists all inventory for a premise with pricing
SELECT 
  i.id,
  i.premise_id,
  i.name,
  i.description,
  i.quantity,
  COALESCE(
    JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
    '[]'::jsonb
  )::text as period_list
FROM inventory i
LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
LEFT JOIN period p ON p.id = pm.period_id
WHERE i.premise_id = $1
GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;

3. Named Mutation Definitions

Mutations are write operations and are also named. @handler sql is the default for single-statement writes. @handler ocaml is reserved for composed server-side logic that calls other named queries/mutations.

/*
@mutation add_todo
INSERT INTO todos (list_id, text) VALUES ($1, $2);
*/

/*
@mutation create_list
@handler ocaml
-- implemented in OCaml and free to compose other named mutations
*/

PPX Output

1. RealtimeSchema (Auto-Generated)

The PPX generates an OCaml module from annotated SQL files:

(* Auto-generated by [%%realtime_schema "..."] *)

type sql_type = Uuid | Varchar | Text | Int | Bigint | Boolean | ...
type broadcast_channel = Column of string | Computed of string | ...
type table_metadata = { name; columns; id_column; broadcast_channel; ... }
type handler = Sql | Ocaml
type query_metadata = { name; sql; json_columns; params; handler; ... }
type mutation_metadata = { name; sql; params; handler; ... }

let schema_hash = "..."
let tables : table_metadata list = [ ... ]
let queries : query_metadata list = [ ... ]
let mutations : mutation_metadata list = [ ... ]
let generated_triggers_sql = "..."
let latest_migration_sql = "..."

let find_query name = ...
let find_mutation name = ...
let find_table name = ...
let find_column table_name column_name = ...
let table_name logical_name = ...

module Tables = struct
  module Inventory = struct
    let name = "inventory"
    let id_column = Some "id"
    let composite_key = []
  end
end

module Queries = struct
  module GetInventoryList = struct
    let name = "get_inventory_list"
    let sql = "SELECT ..."
    let json_columns = ["period_list"]
    let handler = Sql

    type row = {
      id : string;
      premise_id : string;
      name : string;
      description : string;
      quantity : int;
      period_list : string;
    } [@@platform native]

    let caqti_type = Caqti_type.product ... [@@platform native]
    let param_type = Caqti_type.string [@@platform native]
    let request row_type = Caqti_request.Infix.(param_type ->* row_type) sql [@@platform native]
    let find_request row_type = Caqti_request.Infix.(param_type ->? row_type) sql [@@platform native]
    let collect (module Db : Caqti_lwt.CONNECTION) row_type params = ... [@@platform native]
    let find_opt (module Db : Caqti_lwt.CONNECTION) row_type params = ... [@@platform native]
  end
end

module Mutations = struct
  module AddTodo = struct
    let name = "add_todo"
    let sql = "INSERT INTO todos ..."
    let handler = Sql

    let param_type = Caqti_type.(t2 string string) [@@platform native]
    let request = Caqti_request.Infix.(param_type ->. Caqti_type.unit) sql [@@platform native]
    let exec (module Db : Caqti_lwt.CONNECTION) params = ... [@@platform native]
  end
end

Every named query and mutation is emitted as reusable metadata. Server code can call generated Caqti functions directly from RealtimeSchema.Queries.* and RealtimeSchema.Mutations.*, and OCaml-backed mutation handlers can compose those generated helpers instead of hard-coding SQL strings in multiple places.
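
For example, an @handler ocaml mutation like create_list might be implemented along these lines (a sketch: the InsertList mutation and the parameter tuples are hypothetical; the point is that the handler reuses generated exec helpers instead of inlining SQL):

```ocaml
(* Sketch: server-side handler for a composed @handler ocaml mutation.
   InsertList and its parameters are hypothetical names following the
   generated-module convention above. *)
let create_list (module Db : Caqti_lwt.CONNECTION) ~list_id ~title =
  let open Lwt_result.Syntax in
  (* Insert the list row via one generated mutation ... *)
  let* () =
    RealtimeSchema.Mutations.InsertList.exec (module Db) (list_id, title)
  in
  (* ... then compose another named mutation to seed the new list. *)
  RealtimeSchema.Mutations.AddTodo.exec (module Db) (list_id, "First item")
```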

2. SchemaTriggers.sql (Auto-Generated)

-- ============================================================================
-- AUTO-GENERATED TRIGGERS
-- Generated: 2026-04-02T10:30:00Z
-- Source: demos/ecommerce/server/sql/inventory.sql
-- DO NOT EDIT - Regenerate with `dune build`
-- ============================================================================

-- Drop existing triggers (for clean regeneration)
DROP TRIGGER IF EXISTS realtime_notify_inventory ON inventory;
DROP TRIGGER IF EXISTS realtime_notify_inventory_period_map ON inventory_period_map;

DROP FUNCTION IF EXISTS realtime_notify_inventory();
DROP FUNCTION IF EXISTS realtime_notify_inventory_period_map();

-- ============================================================================
-- TRIGGER: inventory (Direct Broadcast)
-- ============================================================================
CREATE OR REPLACE FUNCTION realtime_notify_inventory()
RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
    channel_name TEXT;
    record_id JSONB;
    row_data JSONB;
BEGIN
    -- Build ID
    record_id := to_jsonb(CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END);
    
    -- Build row data
    row_data := CASE 
        WHEN TG_OP = 'DELETE' THEN NULL 
        ELSE to_jsonb(NEW) 
    END;
    
    -- Build payload
    payload := json_build_object(
        'type', 'patch',
        'table', 'inventory',
        'id', record_id,
        'action', TG_OP,
        'data', row_data
    );
    
    -- Resolve channel (column=premise_id)
    channel_name := CASE 
        WHEN TG_OP = 'DELETE' THEN OLD.premise_id::text 
        ELSE NEW.premise_id::text 
    END;
    
    -- Broadcast and update timestamp
    IF channel_name IS NOT NULL THEN
        PERFORM pg_notify(channel_name, payload::text);
        UPDATE premise SET updated_at = NOW() WHERE id = channel_name::UUID;
    END IF;
    
    -- If an UPDATE moved the row to a different premise, also notify the old channel
    IF TG_OP = 'UPDATE' AND OLD.premise_id IS DISTINCT FROM NEW.premise_id THEN
        PERFORM pg_notify(OLD.premise_id::text, payload::text);
        UPDATE premise SET updated_at = NOW() WHERE id = OLD.premise_id;
    END IF;
    
    RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER realtime_notify_inventory
AFTER INSERT OR UPDATE OR DELETE ON inventory
FOR EACH ROW EXECUTE FUNCTION realtime_notify_inventory();

-- ============================================================================
-- TRIGGER: inventory_period_map (Parent Broadcast)
-- Broadcasts complete parent inventory using @query get_complete_inventory
-- ============================================================================
CREATE OR REPLACE FUNCTION realtime_notify_inventory_period_map()
RETURNS TRIGGER AS $$
DECLARE
    parent_record RECORD;
    complete_payload JSON;
    channel_name TEXT;
    parent_id UUID;
BEGIN
    -- Get parent ID from changed row
    parent_id := CASE 
        WHEN TG_OP = 'DELETE' THEN OLD.inventory_id 
        ELSE NEW.inventory_id 
    END;
    
    -- Fetch complete parent using @query get_complete_inventory
    SELECT 
        i.id,
        i.premise_id,
        i.name,
        i.description,
        i.quantity,
        COALESCE(
            JSONB_AGG(TO_JSONB(p.*)) FILTER (WHERE p.id IS NOT NULL),
            '[]'::jsonb
        ) as period_list
    INTO parent_record
    FROM inventory i
    LEFT JOIN inventory_period_map pm ON pm.inventory_id = i.id
    LEFT JOIN period p ON p.id = pm.period_id
    WHERE i.id = parent_id
    GROUP BY i.id, i.premise_id, i.name, i.description, i.quantity;
    
    IF NOT FOUND THEN
        -- Parent might have been deleted
        RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
    END IF;
    
    -- Build complete patch payload
    complete_payload := json_build_object(
        'type', 'patch',
        'table', 'inventory',
        'id', to_jsonb(parent_record.id),
        'action', CASE 
            WHEN TG_OP = 'DELETE' THEN 'DELETE' 
            ELSE 'UPDATE' 
        END,
        'data', to_jsonb(parent_record)
    );
    
    -- Resolve channel from parent record
    channel_name := parent_record.premise_id::text;
    
    -- Broadcast and update timestamp
    IF channel_name IS NOT NULL THEN
        UPDATE premise SET updated_at = NOW() WHERE id = channel_name::UUID;
        PERFORM pg_notify(channel_name, complete_payload::text);
    END IF;
    
    RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER realtime_notify_inventory_period_map
AFTER INSERT OR UPDATE OR DELETE ON inventory_period_map
FOR EACH ROW EXECUTE FUNCTION realtime_notify_inventory_period_map();

3. Migration Generation (Auto-Generated)

/* Auto-generated migration tracking */
/* Schema hash: a1b2c3d4e5f6... */

let schema_version = "20260402103000";

let migrations = [
  /* Combined DDL + Trigger migration */
  "migration_20260402103000_initial.sql",
];

/* Migration content (generated as separate .sql files) */
module Migration_20260402103000 = {
  let up = {|
-- ============================================================================
-- MIGRATION: 20260402103000_initial
-- ============================================================================

-- Create tables
-- NOTE: uuidv7() is built in from PostgreSQL 18; earlier versions need an
-- extension that provides it (uuid-ossp alone does not supply uuidv7)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS inventory (
  id uuid DEFAULT uuidv7() PRIMARY KEY,
  premise_id uuid NOT NULL REFERENCES premise(id),
  name varchar NOT NULL,
  description varchar NOT NULL,
  quantity int NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS inventory_period_map (
  inventory_id uuid NOT NULL REFERENCES inventory(id),
  period_id uuid NOT NULL REFERENCES period(id),
  PRIMARY KEY (inventory_id, period_id)
);

-- Create triggers (auto-generated from annotations)
[SchemaTriggers.sql content here]

-- Track schema version
CREATE TABLE IF NOT EXISTS schema_migrations (
  version varchar PRIMARY KEY,
  applied_at timestamp DEFAULT NOW()
);

INSERT INTO schema_migrations (version) VALUES ('20260402103000')
ON CONFLICT (version) DO NOTHING;
|};

  let down = {|
-- Rollback: 20260402103000_initial
DROP TRIGGER IF EXISTS realtime_notify_inventory ON inventory;
DROP TRIGGER IF EXISTS realtime_notify_inventory_period_map ON inventory_period_map;
DROP TABLE IF EXISTS inventory_period_map;
DROP TABLE IF EXISTS inventory;
DELETE FROM schema_migrations WHERE version = '20260402103000';
|};
};

4. Client-Side Patch Handling (StoreCrud)

Patch decoding and CRUD logic use the generic StoreCrud module instead of per-table boilerplate:

/* In Store.re */
type patch = StoreCrud.patch(Model.InventoryItem.t);

/* Inside Runtime.Make functor body: */
let decodePatch =
  StorePatch.compose([
    StoreCrud.decodePatch(
      ~table=RealtimeSchema.table_name("inventory"),
      ~decodeRow=Model.InventoryItem.of_json,
      (),
    ),
  ]);

let updateOfPatch = StoreCrud.updateOfPatch(
  ~getId=(item: Model.InventoryItem.t) => item.id,
  ~getItems=(config: config) => config.inventory,
  ~setItems=(config: config, items) => {...config, inventory: items},
);

For multi-table stores, compose multiple StoreCrud.decodePatch calls and use a wrapped variant type:

type patch =
  | InventoryPatch(StoreCrud.patch(Model.InventoryItem.t))
  | UserPatch(StoreCrud.patch(User.t));
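
Composing the decoders for that variant might look like the following sketch (the mapDecoder helper, which wraps each decoded patch in its constructor, is hypothetical):

```reason
/* Sketch only: mapDecoder is a hypothetical helper that post-processes a
   decoder's result with the given constructor. */
let decodePatch =
  StorePatch.compose([
    mapDecoder(
      StoreCrud.decodePatch(
        ~table=RealtimeSchema.table_name("inventory"),
        ~decodeRow=Model.InventoryItem.of_json,
        (),
      ),
      p => InventoryPatch(p),
    ),
    mapDecoder(
      StoreCrud.decodePatch(
        ~table=RealtimeSchema.table_name("user"),
        ~decodeRow=User.of_json,
        (),
      ),
      p => UserPatch(p),
    ),
  ]);
```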

Build Integration (Dune)

The realtime schema system integrates via a PPX that reads SQL files at compile time and a CLI for trigger/migration generation.

PPX: RealtimeSchema module

The PPX reads annotated .sql files and generates an OCaml module with table/query metadata:

; demos/ecommerce/shared/js/dune
(library
 (name common_js)
 ; ...
 (preprocess
  (pps realtime_schema_ppx)))
(* demos/ecommerce/shared/js/RealtimeSchema.ml *)
[%%realtime_schema "demos/ecommerce/server/sql"]

This generates RealtimeSchema with:

  • find_table, find_column, table_name lookup helpers
  • module Tables with per-table metadata (name, id_column, composite_key)
  • module Queries with per-query metadata (sql, json_columns)
  • generated_triggers_sql for auto-generated PL/pgSQL triggers
  • latest_migration_sql for schema migrations
  • schema_hash for change detection
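
A quick usage sketch of the lookup helpers (the return shapes are assumed from the metadata records shown earlier):

```ocaml
(* Sketch: resolving generated metadata at runtime. Field access assumes
   the query_metadata record shape shown above. *)
let () =
  match RealtimeSchema.find_query "get_complete_inventory" with
  | Some q -> print_endline q.sql  (* the raw SELECT behind this named query *)
  | None -> prerr_endline "unknown query"

(* table_name maps a logical name to the physical table name: *)
let inventory_table = RealtimeSchema.table_name "inventory"
```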

CLI: Trigger and Migration Generation

# Generate triggers, snapshot, and migrations
dune exec realtime-schema-codegen -- \
  --sql-dir demos/ecommerce/server/sql \
  --triggers-path demos/ecommerce/server/sql/generated/realtime.sql \
  --snapshot-path demos/ecommerce/server/sql/generated/schema_snapshot.json \
  --migrations-dir demos/ecommerce/server/sql/generated/migrations

Server-Side: Caqti Queries and Mutations

Server code calls the generated Caqti helpers directly. No hand-written Caqti_request boilerplate is required:

(* List-collect query *)
let* item_rows =
  Dream.sql request (fun db ->
    RealtimeSchema.Queries.GetInventoryList.collect
      db
      RealtimeSchema.Queries.GetInventoryList.caqti_type
      premise_id)
in
let items = Array.of_list item_rows in
(* Single-row lookup *)
let* item_row =
  Dream.sql request (fun db ->
    RealtimeSchema.Queries.GetCompleteInventory.find_opt
      db
      RealtimeSchema.Queries.GetCompleteInventory.caqti_type
      item_id)
(* Mutation *)
let* () =
  RealtimeSchema.Mutations.AddTodo.exec db (list_id, text)

If you need to build a custom request, you can still use RealtimeSchema.Queries.*.sql, RealtimeSchema.Mutations.*.sql, and RealtimeSchema.Queries.*.param_type:

let custom_request =
  Caqti_request.Infix.(
    RealtimeSchema.Queries.GetInventoryList.param_type ->*
    RealtimeSchema.Queries.GetInventoryList.caqti_type)
    RealtimeSchema.Queries.GetInventoryList.sql

Client code sends named mutation commands over the active websocket connection, for example mutation add_todo {"list_id":"...","text":"..."}. The server executes the mutation, Postgres triggers broadcast a patch, and the client store applies that patch through StoreSync.
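
Server-side dispatch for those commands might be sketched as follows (parse_mutation_command and run_sql_mutation are hypothetical helpers; only find_mutation comes from the generated module):

```ocaml
(* Sketch: route an incoming websocket message to a named mutation.
   parse_mutation_command and run_sql_mutation are hypothetical. *)
let handle_message db message =
  match parse_mutation_command message with
  | Some (name, params) ->
    (match RealtimeSchema.find_mutation name with
     | Some m -> run_sql_mutation db m params
     | None -> Lwt.return_error (`Unknown_mutation name))
  | None -> Lwt.return_error `Not_a_mutation
```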

Type Inference

SQL to OCaml Type Mapping

SQL Type                  OCaml Type      Caqti Type
uuid                      string          Caqti_type.string
varchar, text             string          Caqti_type.string
int, integer              int             Caqti_type.int
bigint                    int64           Caqti_type.int64
boolean                   bool            Caqti_type.bool
timestamp, timestamptz    Js.Date.t       Custom float encoder
json, jsonb               'a (generic)    Custom JSON encoder

Query Parameter Inference

-- @query get_by_id
WHERE i.id = $1  -- Inferred: param $1 = string (from column type)

Workflow

  1. Define Schema: Create/modify .sql files with annotations
  2. Build: Run dune build to generate all artifacts
  3. Migrate: Apply generated migrations to database
  4. Deploy: Generated code is ready for server and client

Benefits

  • SQL-First: Standard PostgreSQL syntax, no new DSL to learn
  • Explicit Control: Named queries referenced explicitly
  • Type Safety: Compile-time guarantees for database operations
  • Automatic Sync: Triggers keep clients updated automatically
  • Complete Items: Parent items broadcast with all related data
  • Migration Management: Version-controlled, reproducible schema changes

Future Enhancements

  • Conditional broadcasting based on row values
  • Multi-table broadcast aggregations
  • Custom serialization formats
  • Validation rules in annotations