Skip to content

Commit 7bba7fc

Browse files
committed
feat(new-deepnotes): Postgres ACL for realtime page and group Redis hashes
Wire Hyperdrive in UserRealtimeRoom so executeRealtimeWsBatch can authorize page: and group: HGET/HSET/SUBSCRIBE via resolveRealtimeHashFieldAccess (viewGroupPages / editGroupPages). Keep user-only sync gate when Hyperdrive is absent. Add Vitest for ACL-injected page HGET; update TRPC_REST_MAP and PLAN_PROGRESS.
1 parent 81b7d42 commit 7bba7fc

7 files changed

Lines changed: 274 additions & 11 deletions

File tree

new-deepnotes/PLAN_PROGRESS.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313
| Area | State |
1414
|------|--------|
15-
| **API / REST** | TRPC_REST_MAP HTTP rows largely done; **collab WS** MVP (**Durable Object** relay + Postgres append, legacy lib0 framing via `@deepnotes/collab-wire`); **realtime** **`GET /api/realtime-ws`** + **`UserRealtimeRoom` DO** + `@deepnotes/realtime-wire`. Framing + Vitest round-trips; **`USER_NOTIFICATION`** push after `performNotifyUsers`. **Hash slice:** when **Upstash** is configured (`UPSTASH_REDIS_*`), DO handles legacy **REQUEST** batches for **`user:{userId}`** Redis (**HGET**/**HSET**/**SUBSCRIBE**/**UNSUBSCRIBE****RESPONSE** + **DATA_NOTIFICATION** intra-DO fan-out). **Still open:** **`page`** / **`group`** hash ACL vs Postgres (legacy DataAbstraction); cross-process Redis pub/sub (legacy `expiremember`/KeyDB) not replicated. SPA remains REST-heavy for titles until client opts into hash WS. |
16-
| **SPA** | Auth, lists, **`PageEditorView`** (Tiptap+Y+collab **WebSocket when configured** else REST, **encrypted awareness + collaboration carets** on WS, link/underline/placeholder, **tables, images, tasks, code (lowlight), KaTeX math (inline + block), YouTube embeds (Vue node view + resize handle like legacy)**, highlight, align, sub/sup, HR, path breadcrumb, bump/favorite/recent, **snapshots list/save/restore/delete**, **set-as-main + soft-delete + purge**, **cross-group move/reencrypt**), groups/**members**/invite/join + **group settings** (join policy, soft-delete, **make public / make private**, **group purge**), notifications list **with decrypt**, **`/account`**, home **recents/favorites/starting/defaults** UIs, theme, **live notification toast** when **`/api/realtime-ws`** connected. Missing: **`page`/`group` realtime hashes + SPA wired** (full legacy parity; worker has **`user:{id}`** hash slice when Upstash is set); **spatial/world** canvas, native shells. |
15+
| **API / REST** | TRPC_REST_MAP HTTP rows largely done; **collab WS** MVP (**Durable Object** relay + Postgres append, legacy lib0 framing via `@deepnotes/collab-wire`); **realtime** **`GET /api/realtime-ws`** + **`UserRealtimeRoom` DO** + `@deepnotes/realtime-wire`. Framing + Vitest round-trips; **`USER_NOTIFICATION`** push after `performNotifyUsers`. **Hash slice:** when **Upstash** (`UPSTASH_REDIS_*`) **and** **`HYPERDRIVE`** are set, DO serves **`user:`** / **`page:`** / **`group:`** Redis hashes with Postgres ACL (`resolveRealtimeHashFieldAccess`: read=`viewGroupPages`, write=`editGroupPages`). With Upstash only, **`user:{id}`** still works (sync user-only gate). **Still open:** cross-process Redis pub/sub (legacy `expiremember`/KeyDB); SPA still REST-first for titles until it sends legacy **REQUEST** batches. |
16+
| **SPA** | Auth, lists, **`PageEditorView`** (Tiptap+Y+collab **WebSocket when configured** else REST, **encrypted awareness + collaboration carets** on WS, link/underline/placeholder, **tables, images, tasks, code (lowlight), KaTeX math (inline + block), YouTube embeds (Vue node view + resize handle like legacy)**, highlight, align, sub/sup, HR, path breadcrumb, bump/favorite/recent, **snapshots list/save/restore/delete**, **set-as-main + soft-delete + purge**, **cross-group move/reencrypt**), groups/**members**/invite/join + **group settings** (join policy, soft-delete, **make public / make private**, **group purge**), notifications list **with decrypt**, **`/account`**, home **recents/favorites/starting/defaults** UIs, theme, **live notification toast** when **`/api/realtime-ws`** connected. Missing: **legacy RealtimeClient-style REQUEST batches** from SPA for **`page`/`group`** hash titles (API path ready when Upstash+HYPERDRIVE); **spatial/world** canvas, native shells. |
1717

1818
**Deferred (confirm vs parity):** optional anon `GET …/groups/:id/pages`; richer Stripe webhook tests. **Infra naming:** Workers/DO replaces standalone collab/realtime/scheduler processes.
1919

@@ -26,7 +26,7 @@
2626
| **0** | Done | Map, OpenAPI, Drizzle baseline, CLIENT_FORKS |
2727
| **1** | Skip? | Legacy monorepo only |
2828
| **2** | Done | Turbo/CI/template DB/deploy docs |
29-
| **3** | WIP | **realtime:** `USER_NOTIFICATION` WS + DO + invite **`notifyUsers`**; **`user`-hash** REQUEST (**Upstash**) + RESP/DATA_NOTIFICATION in-DO (**`page`/`group` ACL + cross-node pub/sub** still open); collab WS MVP **done** |
29+
| **3** | WIP | **realtime:** `USER_NOTIFICATION` WS + DO + invite **`notifyUsers`**; hash **REQUEST** + **`page`/`group` Postgres ACL** when Hyperdrive + Upstash; cross-node Redis pub/sub **still open**; collab WS MVP **done** |
3030
| **4** | WIP | See checklist below |
3131
| **5** | Todo | Cutover after **parity gate** + metrics + decrypt spot-checks |
3232

@@ -46,7 +46,7 @@
4646
- [x] `[parity]` Page ops + group settings — **make private** + cross-group **move/reencrypt** + **purge** UI (page + group); **make public**, snapshots + main + soft-delete **done** in SPA
4747
- [ ] `[parity]` Editor UX vs legacy (rich + spatial/world if in scope) — **partial:** WS awareness+carets; TipTap **tables, images, tasks**, **code (lowlight), math (inline + block), YouTube** (Vue node view + resize handle), typography (highlight, align, sub/sup, HR), link/underline/placeholder (**no** spatial/world canvas yet)
4848
- [x] `[parity]` Notifications: decrypt/display as legacy
49-
- [x] Legacy **realtime****partial:** `USER_NOTIFICATION` over **`/api/realtime-ws`** + per-user DO (`@deepnotes/realtime-wire`); join-invite **DB notifications** + E2EE payloads; **`user:{id}`** hash **REQUEST**/RESP (**Upstash** optional); **`page`/`group`** caches + Redis pub/sub parity **not** done
49+
- [x] Legacy **realtime****partial:** `USER_NOTIFICATION` + hash **REQUEST** (`user` / `page` / `group` when **Upstash** + **Hyperdrive**); join-invite **DB notifications** + E2EE payloads; cross-process Redis pub/sub parity **not** done
5050
- [ ] Capacitor/Tauri **after web parity**
5151

5252
---
@@ -80,6 +80,7 @@
8080

8181
| Date | Note |
8282
|------|------|
83+
| 2026-04-29 | **Realtime hash ACL (Postgres):** `resolveRealtimeHashFieldAccess` in `@deepnotes/session` for **`page:`** / **`group:`** (`viewGroupPages` / `editGroupPages`); `UserRealtimeRoom` wires **Hyperdrive** + Vitest mock-ACL **page** HGET. |
8384
| 2026-04-29 | **Realtime Redis hash slice (Upstash):** `UserRealtimeRoom` handles binary **REQUEST** (`executeRealtimeWsBatch`); **`user:{userId}`** HGET/HSET/SUBSCRIBE/UNSUBSCRIBE + subscriber fan-out; **`realtime-ws-batch.test.ts`**. |
8485
| 2026-04-29 | **Realtime wire framing parity:** `@deepnotes/realtime-wire` decodes/encodes legacy **RESPONSE**, **DATA_NOTIFICATION**, and client **REQUEST** batches (msgpackr + lib0) with Vitest; Worker DO unchanged (`USER_NOTIFICATION` only). Moves toward Redis hash cache parity without E2E. |
8586
| 2026-04-29 | **Realtime `USER_NOTIFICATION` parity slice:** `@deepnotes/realtime-wire`, `UserRealtimeRoom` DO, `GET /api/realtime-ws`, `performNotifyUsers` + join-invite optional `notifications` + bootstrap `?inviteeUserId=` keyrings; SPA toast + `buildGroupInviteSentNotifications`. |

new-deepnotes/apps/api-worker/src/realtime-ws-batch.test.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import {
1010
canRealtimeHashAccess,
1111
executeRealtimeWsBatch,
1212
realtimeFullKey,
13+
redisHashKey,
14+
type RealtimeHashAclPort,
1315
type RealtimeHashPort,
1416
} from "./realtime-ws-batch.js";
1517

@@ -52,7 +54,7 @@ function createMemoryHashPort(
5254
}
5355

5456
describe("executeRealtimeWsBatch", () => {
55-
it("denies group/page hash access (user-only until Postgres ACL)", () => {
57+
it("denies page/group hash without Postgres ACL port (user-only sync)", () => {
5658
expect(canRealtimeHashAccess("u1", "group", "g1")).toBe(false);
5759
expect(canRealtimeHashAccess("u1", "page", "p1")).toBe(false);
5860
expect(canRealtimeHashAccess("u1", "user", "u1")).toBe(true);
@@ -86,6 +88,7 @@ describe("executeRealtimeWsBatch", () => {
8688
decoded,
8789
redis: port,
8890
hooks,
91+
acl: null,
8992
});
9093
expect(out.responseBytes).not.toBeNull();
9194
expect(out.subscribeNotifyBytes).toBeNull();
@@ -120,6 +123,7 @@ describe("executeRealtimeWsBatch", () => {
120123
},
121124
unsubscribeField: () => {},
122125
},
126+
acl: null,
123127
});
124128
expect(subs).toEqual([realtimeFullKey("user", "u1", "email")]);
125129
expect(out.subscribeNotifyBytes).not.toBeNull();
@@ -145,10 +149,46 @@ describe("executeRealtimeWsBatch", () => {
145149
decoded,
146150
redis: port,
147151
hooks: { subscribeField: () => {}, unsubscribeField: () => {} },
152+
acl: null,
148153
});
149154
expect(out.hsetBroadcastItems).toHaveLength(1);
150155
expect(out.hsetBroadcastItems[0]?.fullKey).toBe(
151156
realtimeFullKey("user", "u1", "email"),
152157
);
153158
});
159+
160+
it("HGET on page hash succeeds when ACL grants read", async () => {
161+
const { port } = createMemoryHashPort({
162+
"page:p9": { "encrypted-relative-title": "enc" },
163+
});
164+
const req = encodeRealtimeClientRequest({
165+
firstCommandId: 5,
166+
commands: [
167+
{
168+
type: RealtimeCommandType.HGET,
169+
args: ["page", "p9", "encrypted-relative-title"],
170+
},
171+
],
172+
});
173+
const decoded = decodeRealtimeClientBinaryMessage(req);
174+
if (decoded == null) {
175+
throw new Error("decode");
176+
}
177+
const acl: RealtimeHashAclPort = {
178+
async resolveBatch(needs) {
179+
expect(needs.get(redisHashKey("page", "p9"))?.read).toBe(true);
180+
const m = new Map<string, { readOk: boolean; writeOk: boolean }>();
181+
m.set(redisHashKey("page", "p9"), { readOk: true, writeOk: true });
182+
return m;
183+
},
184+
};
185+
const out = await executeRealtimeWsBatch({
186+
userId: "u1",
187+
decoded,
188+
redis: port,
189+
hooks: { subscribeField: () => {}, unsubscribeField: () => {} },
190+
acl,
191+
});
192+
expect(out.responseBytes).not.toBeNull();
193+
});
154194
});

new-deepnotes/apps/api-worker/src/realtime-ws-batch.ts

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* LEGACY-compat realtime hash commands over WS (RESPONSE / DATA_NOTIFICATION).
3-
* Only `user:{userId}` Redis hashes are allowed for parity until group/page ACL is wired via Postgres.
3+
* `user:{id}` allowed on the isolate; `page:` / `group:` when `acl` resolves Postgres membership.
44
*/
55
import {
66
RealtimeCommandType,
@@ -32,6 +32,75 @@ export function canRealtimeHashAccess(
3232
return false;
3333
}
3434

35+
export type RealtimeHashAclPort = {
36+
resolveBatch(
37+
needs: Map<string, { read: boolean; write: boolean }>,
38+
): Promise<Map<string, { readOk: boolean; writeOk: boolean }>>;
39+
};
40+
41+
function accumulateRealtimeHashNeeds(
42+
meta: { cmd: RealtimeClientCommand; commandId: number }[],
43+
): Map<string, { read: boolean; write: boolean }> {
44+
const needs = new Map<string, { read: boolean; write: boolean }>();
45+
function add(key: string, read: boolean, write: boolean): void {
46+
const cur = needs.get(key) ?? { read: false, write: false };
47+
if (read) {
48+
cur.read = true;
49+
}
50+
if (write) {
51+
cur.write = true;
52+
}
53+
needs.set(key, cur);
54+
}
55+
for (const { cmd } of meta) {
56+
switch (cmd.type) {
57+
case RealtimeCommandType.HGET: {
58+
const t = parseTripleArgs(cmd.args);
59+
if (t != null) {
60+
add(redisHashKey(t[0], t[1]), true, false);
61+
}
62+
break;
63+
}
64+
case RealtimeCommandType.SUBSCRIBE: {
65+
const t = parseTripleArgs(cmd.args);
66+
if (t != null) {
67+
add(redisHashKey(t[0], t[1]), true, false);
68+
}
69+
break;
70+
}
71+
case RealtimeCommandType.HSET: {
72+
const t = parseHSetArgs(cmd.args);
73+
if (t != null) {
74+
add(redisHashKey(t.prefix, t.suffix), false, true);
75+
}
76+
break;
77+
}
78+
default:
79+
break;
80+
}
81+
}
82+
return needs;
83+
}
84+
85+
function syncResolveRealtimeHashAccess(
86+
userId: string,
87+
needs: Map<string, { read: boolean; write: boolean }>,
88+
): Map<string, { readOk: boolean; writeOk: boolean }> {
89+
const out = new Map<string, { readOk: boolean; writeOk: boolean }>();
90+
for (const [key, need] of needs) {
91+
const i = key.indexOf(":");
92+
const prefix = i > 0 ? key.slice(0, i) : "";
93+
const suffix = i > 0 ? key.slice(i + 1) : "";
94+
const allowed =
95+
prefix !== "" && canRealtimeHashAccess(userId, prefix, suffix);
96+
out.set(key, {
97+
readOk: !need.read || allowed,
98+
writeOk: !need.write || allowed,
99+
});
100+
}
101+
return out;
102+
}
103+
35104
export type RealtimeHashPort = {
36105
hmget(key: string, fields: readonly string[]): Promise<(unknown | null)[]>;
37106
hset(key: string, entries: Record<string, unknown>): Promise<void>;
@@ -85,13 +154,22 @@ export async function executeRealtimeWsBatch(input: {
85154
decoded: DecodedRealtimeClientRequest;
86155
redis: RealtimeHashPort | null;
87156
hooks: RealtimeBatchHooks;
157+
acl: RealtimeHashAclPort | null;
88158
}): Promise<ExecuteRealtimeWsBatchResult> {
89-
const { userId, decoded, redis, hooks } = input;
159+
const { userId, decoded, redis, hooks, acl } = input;
90160
const meta = decoded.commands.map((cmd, i) => ({
91161
cmd,
92162
commandId: decoded.firstCommandId + i,
93163
}));
94164

165+
const needs = accumulateRealtimeHashNeeds(meta);
166+
const resolved =
167+
needs.size === 0
168+
? new Map<string, { readOk: boolean; writeOk: boolean }>()
169+
: acl != null
170+
? await acl.resolveBatch(needs)
171+
: syncResolveRealtimeHashAccess(userId, needs);
172+
95173
const hgetResponses = new Map<number, unknown>();
96174

97175
type AllowedHGet = {
@@ -112,7 +190,7 @@ export async function executeRealtimeWsBatch(input: {
112190
continue;
113191
}
114192
const [prefix, suffix, field] = t;
115-
if (!canRealtimeHashAccess(userId, prefix, suffix)) {
193+
if (!resolved.get(redisHashKey(prefix, suffix))?.readOk) {
116194
hgetResponses.set(row.commandId, undefined);
117195
continue;
118196
}
@@ -168,7 +246,7 @@ export async function executeRealtimeWsBatch(input: {
168246
if (t == null) {
169247
return;
170248
}
171-
if (!canRealtimeHashAccess(userId, t.prefix, t.suffix)) {
249+
if (!resolved.get(redisHashKey(t.prefix, t.suffix))?.writeOk) {
172250
return;
173251
}
174252
const fk = realtimeFullKey(t.prefix, t.suffix, t.field);
@@ -193,7 +271,7 @@ export async function executeRealtimeWsBatch(input: {
193271
}
194272
const [prefix, suffix, field] = t;
195273
const fk = realtimeFullKey(prefix, suffix, field);
196-
if (!canRealtimeHashAccess(userId, prefix, suffix)) {
274+
if (!resolved.get(redisHashKey(prefix, suffix))?.readOk) {
197275
subscribeItems.push({ prefix, suffix, field, value: undefined });
198276
return;
199277
}

new-deepnotes/apps/api-worker/src/user-realtime-room.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@ import {
44
encodeRealtimeServerDataNotification,
55
encodeUserNotificationServerMessage,
66
} from "@deepnotes/realtime-wire";
7+
import { resolveRealtimeHashFieldAccess } from "@deepnotes/session";
78

89
import {
910
executeRealtimeWsBatch,
11+
type RealtimeHashAclPort,
1012
type RealtimeHashPort,
1113
} from "./realtime-ws-batch.js";
14+
import { getDbForConnectionString } from "./db-pool.js";
1215

1316
export type UserRealtimeRoomEnv = {
1417
REALTIME_INTERNAL_SECRET?: string;
1518
UPSTASH_REDIS_REST_URL?: string;
1619
UPSTASH_REDIS_REST_TOKEN?: string;
20+
/** When set, `page:` / `group:` hash REQUEST batches use Postgres-backed ACL. */
21+
HYPERDRIVE?: Hyperdrive;
1722
};
1823

1924
/**
@@ -182,10 +187,24 @@ export class UserRealtimeRoom {
182187
}
183188

184189
try {
190+
const hyper = this.env.HYPERDRIVE;
191+
const acl: RealtimeHashAclPort | null =
192+
hyper != null
193+
? {
194+
resolveBatch: (needs) =>
195+
resolveRealtimeHashFieldAccess({
196+
db: getDbForConnectionString(hyper.connectionString),
197+
userId,
198+
needs,
199+
}),
200+
}
201+
: null;
202+
185203
const out = await executeRealtimeWsBatch({
186204
userId,
187205
decoded,
188206
redis: this.hashPort(),
207+
acl,
189208
hooks: {
190209
subscribeField: (fk) => {
191210
this.addSubscription(fk, ws);

new-deepnotes/docs/TRPC_REST_MAP.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Working checklist for Phase 0 of [docs/RESTART_PLAN.md](../../docs/RESTART_PLAN.
9595
| Load encrypted Yjs update chain from DB | `GET /api/pages/:pageId/collab-updates` (**implemented**`performGetPageCollabUpdates`; `viewGroupPages`; Postgres `page_updates`; response includes **encrypted page titles** (`encrypted_relative_title` / `encrypted_absolute_title` b64), `groupId`, `pageEncryptedSymmetricKeyring`, `groupEncryptedContentKeyring`, `groupAccessKeyring`, `memberEncryptedAccessKeyring` for client-side unwrap + cross-group move) | Replaces initial `ALL_UPDATES_UNMERGED`-style payload for SPA bootstrap; binary collab WebSocket is still [Phase 3 — realtime/collab](../PLAN_PROGRESS.md#not-started-phase-3--realtime--collab-only). |
9696
| Append updates (optimistic concurrency) | `POST /api/pages/:pageId/collab-updates` (**implemented**`performAppendPageCollabUpdates`; `editGroupPages`; body `expectedLastIndex` + `updates[]`) | **409** when `expectedLastIndex` is stale. |
9797
| Live duplex collab (Yjs + awareness relay, Postgres persist) | `GET /api/pages/:pageId/collab-ws` (**WebSocket Upgrade**`PageCollabRoom` Durable Object; cookie auth; internal `POST /api/internal/pages/:pageId/collab-ws-append` with `COLLAB_INTERNAL_SECRET`) | Greenfield path: legacy **lib0** `CollabMessageType` framing via `@deepnotes/collab-wire`; no Redis merge / **no** key rotation (RESTART_PLAN). SPA falls back to REST when WS unavailable. |
98-
| Live `USER_NOTIFICATION` (legacy Redis pub/sub) | `GET /api/realtime-ws` (**WebSocket**`UserRealtimeRoom` Durable Object per `userId`; worker pushes framed payloads after `performNotifyUsers` with `REALTIME_INTERNAL_SECRET`) | `@deepnotes/realtime-wire` framing. Join-invite path persists rows + optional SPA E2EE `notifications` on `POST …/join-invitations`. **Hash cache (partial):** when `UPSTASH_REDIS_*` is set, accepts legacy **REQUEST** batches for **HGET / HSET / SUBSCRIBE / UNSUBSCRIBE** on Redis key `user:{userId}` (Upstash); **RESPONSE** + **DATA_NOTIFICATION** (in-DO subscriber fan-out on HSET); **no** full legacy `DataAbstraction` ACL for `page` / `group` yet (Postgres-backed checks TBD). |
98+
| Live `USER_NOTIFICATION` (legacy Redis pub/sub) | `GET /api/realtime-ws` (**WebSocket**`UserRealtimeRoom` Durable Object per `userId`; worker pushes framed payloads after `performNotifyUsers` with `REALTIME_INTERNAL_SECRET`) | `@deepnotes/realtime-wire` framing. Join-invite path persists rows + optional SPA E2EE `notifications` on `POST …/join-invitations`. **Hash cache:** when `UPSTASH_REDIS_*` is set, accepts legacy **REQUEST** batches (**HGET / HSET / SUBSCRIBE / UNSUBSCRIBE**) on Redis keys `user:{userId}` plus `page:{pageId}` / `group:{groupId}` when **`HYPERDRIVE`** is configured — ACL via `@deepnotes/session` `resolveRealtimeHashFieldAccess` (read=`viewGroupPages`, write=`editGroupPages`). **RESPONSE** + **DATA_NOTIFICATION** (in-DO subscriber fan-out on HSET). **Not** replicated: cross-process Redis pub/sub, KeyDB `expiremember`. |
9999

100100
## Legacy app-server WebSocket → target
101101

new-deepnotes/packages/session/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ export type {
127127
NotifyUsersItem,
128128
RealtimeNotificationDelivery,
129129
} from "./notify-users.js";
130+
export { resolveRealtimeHashFieldAccess } from "./realtime-hash-acl.js";
131+
export type { RealtimeHashAccessNeeds } from "./realtime-hash-acl.js";
130132
export type { StripeBillingEnv } from "./stripe-billing.js";
131133
export {
132134
findUserIdByStripeCustomerId,

0 commit comments

Comments
 (0)