diff --git a/.changeset/petite-rocks-lead.md b/.changeset/petite-rocks-lead.md new file mode 100644 index 000000000..b09047e02 --- /dev/null +++ b/.changeset/petite-rocks-lead.md @@ -0,0 +1,5 @@ +--- +"@browserbasehq/stagehand": patch +--- + +clean up cdp session event handlers on target detach diff --git a/packages/core/lib/v3/understudy/cdp.ts b/packages/core/lib/v3/understudy/cdp.ts index ffda2f267..a268e5662 100644 --- a/packages/core/lib/v3/understudy/cdp.ts +++ b/packages/core/lib/v3/understudy/cdp.ts @@ -208,6 +208,40 @@ export class CdpConnection implements CDPSessionLike { } } + private clearSessionEventHandlers(sessionId: string): void { + const prefix = `${sessionId}:`; + for (const key of Array.from(this.eventHandlers.keys())) { + if (key.startsWith(prefix)) { + this.eventHandlers.delete(key); + } + } + } + + private rejectSessionPendingWork( + sessionId: string, + targetId: string | null, + ): void { + for (const [id, entry] of this.inflight.entries()) { + if (entry.sessionId === sessionId) { + entry.reject( + new PageNotFoundError( + `target closed before CDP response (sessionId=${sessionId}, targetId=${targetId})`, + ), + ); + this.inflight.delete(id); + } + } + for (const waiter of Array.from(this.sessionDispatchWaiters)) { + if (waiter.sessionId === sessionId) { + waiter.reject( + new PageNotFoundError( + `target closed before CDP send (sessionId=${sessionId}, targetId=${targetId})`, + ), + ); + } + } + } + getSession(sessionId: string): CdpSession | undefined { return this.sessions.get(sessionId); } @@ -334,25 +368,8 @@ export class CdpConnection implements CDPSessionLike { } else if (msg.method === "Target.detachedFromTarget") { const p = (msg as { params: Protocol.Target.DetachedFromTargetEvent }) .params; - for (const [id, entry] of this.inflight.entries()) { - if (entry.sessionId === p.sessionId) { - entry.reject( - new PageNotFoundError( - `target closed before CDP response (sessionId=${p.sessionId}, targetId=${p.targetId})`, - ), - ); - this.inflight.delete(id); - } - } - for (const waiter of Array.from(this.sessionDispatchWaiters)) { - if (waiter.sessionId === p.sessionId) { - waiter.reject( - new PageNotFoundError( - `target closed before CDP send (sessionId=${p.sessionId}, targetId=${p.targetId})`, - ), - ); - } - } + this.rejectSessionPendingWork(p.sessionId, p.targetId ?? null); + this.clearSessionEventHandlers(p.sessionId); this.sessions.delete(p.sessionId); this.sessionToTarget.delete(p.sessionId); this.latestCdpCallEvent.delete(p.sessionId); @@ -361,9 +378,11 @@ export class CdpConnection implements CDPSessionLike { // Remove any session mapping for this target for (const [sessionId, targetId] of this.sessionToTarget.entries()) { if (targetId === p.targetId) { + this.rejectSessionPendingWork(sessionId, p.targetId); + this.clearSessionEventHandlers(sessionId); + this.sessions.delete(sessionId); this.sessionToTarget.delete(sessionId); this.latestCdpCallEvent.delete(sessionId); - break; } } } diff --git a/packages/core/tests/unit/cdp-connection-close.test.ts b/packages/core/tests/unit/cdp-connection-close.test.ts index 6418d5b43..fc60d670f 100644 --- a/packages/core/tests/unit/cdp-connection-close.test.ts +++ b/packages/core/tests/unit/cdp-connection-close.test.ts @@ -1,7 +1,11 @@ -import { describe, it, expect, afterEach } from "vitest"; +import { describe, it, expect, afterEach, vi } from "vitest"; import { WebSocketServer, type WebSocket as ServerWebSocket } from "ws"; import { CdpConnection } from "../../lib/v3/understudy/cdp.js"; +type ConnectionInternals = { + eventHandlers: Map>; +}; + /** * Races a promise against a timeout. Returns "resolved" if the promise * settles before the deadline, or "timeout" if it doesn't. @@ -40,11 +44,50 @@ async function createPair(): Promise<{ return { conn, serverSocket, wss }; } +async function sendCdpEvent( + serverSocket: ServerWebSocket, + message: Record, +): Promise { + await new Promise((resolve, reject) => { + serverSocket.send(JSON.stringify(message), (error) => { + if (error) reject(error); + else resolve(); + }); + }); + await new Promise((resolve) => setTimeout(resolve, 0)); +} + +async function waitForSession( + conn: CdpConnection, + sessionId: string, +): Promise> { + const session = await raceTimeout( + new Promise>((resolve) => { + const check = () => { + const found = conn.getSession(sessionId); + if (found) { + resolve(found); + return; + } + setTimeout(check, 0); + }; + check(); + }), + 3_000, + ); + + expect(session).not.toBe("timeout"); + return session as ReturnType; +} + describe("CdpConnection", () => { let wss: WebSocketServer | null = null; afterEach(async () => { if (wss) { + for (const client of wss.clients) { + client.terminate(); + } await new Promise((resolve) => wss!.close(() => resolve())); wss = null; } @@ -98,4 +141,252 @@ describe("CdpConnection", () => { expect(result).toBe("rejected"); }); }); + + describe("session event listener cleanup", () => { + it("removes session-scoped event handlers when a target detaches", async () => { + const pair = await createPair(); + wss = pair.wss; + + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId: "session-a", + targetInfo: { + targetId: "target-a", + type: "page", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + + const session = await waitForSession(pair.conn, "session-a"); + + const fetchHandlerA = vi.fn(); + const fetchHandlerB = vi.fn(); + session!.on("Fetch.requestPaused", fetchHandlerA); + session!.on("Fetch.requestPaused", fetchHandlerB); + session!.on("Network.requestWillBeSent", () => {}); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId: "session-b", + targetInfo: { + targetId: "target-b", + type: "iframe", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + + const otherSession = await waitForSession(pair.conn, "session-b"); + otherSession!.on("Fetch.requestPaused", () => {}); + + const rootHandler = vi.fn(); + pair.conn.on("Target.targetCreated", rootHandler); + + const eventHandlers = (pair.conn as unknown as ConnectionInternals) + .eventHandlers; + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(true); + expect(eventHandlers.get("session-a:Fetch.requestPaused")?.size).toBe(2); + expect(eventHandlers.has("session-a:Network.requestWillBeSent")).toBe( + true, + ); + expect(eventHandlers.has("session-b:Fetch.requestPaused")).toBe(true); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.detachedFromTarget", + params: { + sessionId: "session-a", + targetId: "target-a", + }, + }); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.targetCreated", + params: { + targetInfo: { + targetId: "target-b", + type: "page", + title: "", + url: "about:blank", + attached: false, + canAccessOpener: false, + }, + }, + }); + + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(false); + expect(eventHandlers.has("session-a:Network.requestWillBeSent")).toBe( + false, + ); + expect(eventHandlers.has("session-b:Fetch.requestPaused")).toBe(true); + expect(eventHandlers.has("Target.targetCreated")).toBe(true); + expect(rootHandler).toHaveBeenCalledOnce(); + }); + + it("removes session-scoped event handlers when a target is destroyed", async () => { + const pair = await createPair(); + wss = pair.wss; + + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId: "session-a", + targetInfo: { + targetId: "target-a", + type: "page", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + + const session = await waitForSession(pair.conn, "session-a"); + session!.on("Fetch.requestPaused", () => {}); + + const eventHandlers = (pair.conn as unknown as ConnectionInternals) + .eventHandlers; + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(true); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.targetDestroyed", + params: { + targetId: "target-a", + }, + }); + + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(false); + }); + + it("removes all session-scoped event handlers for a destroyed target", async () => { + const pair = await createPair(); + wss = pair.wss; + + for (const sessionId of ["session-a", "session-b"]) { + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId, + targetInfo: { + targetId: "target-a", + type: "page", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + } + + const sessionA = await waitForSession(pair.conn, "session-a"); + const sessionB = await waitForSession(pair.conn, "session-b"); + + sessionA!.on("Fetch.requestPaused", () => {}); + sessionB!.on("Fetch.requestPaused", () => {}); + + const eventHandlers = (pair.conn as unknown as ConnectionInternals) + .eventHandlers; + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(true); + expect(eventHandlers.has("session-b:Fetch.requestPaused")).toBe(true); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.targetDestroyed", + params: { + targetId: "target-a", + }, + }); + + expect(eventHandlers.has("session-a:Fetch.requestPaused")).toBe(false); + expect(eventHandlers.has("session-b:Fetch.requestPaused")).toBe(false); + }); + + it("rejects in-flight session sends when a target is destroyed", async () => { + const pair = await createPair(); + wss = pair.wss; + + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId: "session-a", + targetInfo: { + targetId: "target-a", + type: "page", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + + const session = await waitForSession(pair.conn, "session-a"); + + const pending = session!.send("Runtime.evaluate", { + expression: "1+1", + }); + const resultPromise = pending + .then(() => "resolved") + .catch(() => "rejected"); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.targetDestroyed", + params: { + targetId: "target-a", + }, + }); + + const result = await raceTimeout(resultPromise, 3_000); + + expect(result).toBe("rejected"); + }); + + it("rejects session dispatch waiters when a target is destroyed", async () => { + const pair = await createPair(); + wss = pair.wss; + + await sendCdpEvent(pair.serverSocket, { + method: "Target.attachedToTarget", + params: { + sessionId: "session-a", + targetInfo: { + targetId: "target-a", + type: "page", + title: "", + url: "about:blank", + attached: true, + canAccessOpener: false, + }, + }, + }); + + const pending = pair.conn.waitForSessionDispatch( + "session-a", + "Fetch.enable", + ); + const resultPromise = pending + .then(() => "resolved") + .catch(() => "rejected"); + + await sendCdpEvent(pair.serverSocket, { + method: "Target.targetDestroyed", + params: { + targetId: "target-a", + }, + }); + + const result = await raceTimeout(resultPromise, 3_000); + + expect(result).toBe("rejected"); + }); + }); });