diff --git a/.changeset/spicy-sessions-pin.md b/.changeset/spicy-sessions-pin.md new file mode 100644 index 000000000..e19705b3b --- /dev/null +++ b/.changeset/spicy-sessions-pin.md @@ -0,0 +1,5 @@ +--- +"@browserbasehq/stagehand-server-v3": patch +--- + +Fix long-running requests (e.g. `agentExecute`) sometimes failing with "Stagehand session was closed" even though the action had already completed. diff --git a/packages/server-v3/src/lib/InMemorySessionStore.ts b/packages/server-v3/src/lib/InMemorySessionStore.ts index 67fa0d5e0..301114ffa 100644 --- a/packages/server-v3/src/lib/InMemorySessionStore.ts +++ b/packages/server-v3/src/lib/InMemorySessionStore.ts @@ -21,6 +21,8 @@ interface LruNode { stagehand: V3 | null; loggerRef: { current?: (message: LogLine) => void }; expiry: number; + /** Number of in-flight requests using this session's V3 instance. */ + inUse: number; prev: LruNode | null; next: LruNode | null; } @@ -91,7 +93,8 @@ export class InMemorySessionStore implements SessionStore { const expiredIds: string[] = []; for (const [sessionId, node] of this.items.entries()) { - if (this.ttlMs > 0 && node.expiry <= now) { + // Never expire a session that is actively serving a request. + if (this.ttlMs > 0 && node.expiry <= now && node.inUse === 0) { expiredIds.push(sessionId); } } @@ -129,13 +132,18 @@ export class InMemorySessionStore implements SessionStore { } /** - * Evict the least recently used session + * Evict the least recently used session that is not actively serving a + * request. If every cached session is in use, skip eviction rather than + * tear down a live request's browser context. 
*/ private async evictLru(): Promise { - const lruNode = this.first; - if (!lruNode) return; + let node = this.first; + while (node && node.inUse > 0) { + node = node.next; + } + if (!node) return; - await this.deleteSession(lruNode.sessionId); + await this.deleteSession(node.sessionId); } async startSession(params: CreateSessionParams): Promise { @@ -160,8 +168,8 @@ export class InMemorySessionStore implements SessionStore { const node = this.items.get(sessionId); if (!node) return false; - // Check if expired - if (this.ttlMs > 0 && node.expiry <= Date.now()) { + // Check if expired, but don't expire a session that is actively serving a request + if (this.ttlMs > 0 && node.expiry <= Date.now() && node.inUse === 0) { await this.deleteSession(sessionId); return false; } @@ -180,7 +188,7 @@ export class InMemorySessionStore implements SessionStore { } // Check if expired - if (this.ttlMs > 0 && node.expiry <= Date.now()) { + if (this.ttlMs > 0 && node.expiry <= Date.now() && node.inUse === 0) { await this.deleteSession(sessionId); throw new Error(`Session expired: ${sessionId}`); } @@ -193,6 +201,9 @@ export class InMemorySessionStore implements SessionStore { node.loggerRef.current = ctx.logger; } + // Pin before any await so the node can't be evicted or TTL-expired during a lazy init() + node.inUse += 1; + // If V3 instance exists, return it if (node.stagehand) { return node.stagehand; @@ -204,6 +215,9 @@ export class InMemorySessionStore implements SessionStore { try { await stagehand.init(); } catch (error) { + // Undo the pin taken above; the node stays (stagehand still null) so a + // later request can retry init. 
+ node.inUse -= 1; try { await stagehand.close(); } catch { @@ -215,6 +229,21 @@ export class InMemorySessionStore implements SessionStore { return stagehand; } + async releaseSession(sessionId: string): Promise { + const node = this.items.get(sessionId); + if (!node) return; + + // Ignore unmatched/double releases: never go negative, and don't refresh + // the TTL of an already-idle session + if (node.inUse === 0) return; + + node.inUse -= 1; + + if (this.ttlMs > 0) { + node.expiry = Date.now() + this.ttlMs; + } + } + /** * Build V3Options from stored params and request context */ @@ -286,6 +315,7 @@ export class InMemorySessionStore implements SessionStore { stagehand: null, // Lazy initialization loggerRef: {}, expiry: this.ttlMs > 0 ? Date.now() + this.ttlMs : Infinity, + inUse: 0, prev: this.last, next: null, }; @@ -342,16 +372,26 @@ export class InMemorySessionStore implements SessionStore { throw new Error("Max capacity must be greater than 0"); } const previousCapacity = this.maxCapacity; - this.maxCapacity = config.maxCapacity; - // Evict excess if new capacity is smaller - if (this.maxCapacity < previousCapacity) { - const excess = this.items.size - this.maxCapacity; - for (let i = 0; i < excess; i++) { - // Fire and forget - don't await to match cloud behavior - this.evictLru().catch(console.error); - } + // Evict before lowering capacity. Pinned (in-use) sessions can't be + // evicted, so the cache may exceed the new capacity until they are released + // and a later eviction or TTL sweep removes them — expected, not an error. + if (config.maxCapacity < previousCapacity) { + const excess = this.items.size - config.maxCapacity; + // Evict sequentially: deleteSession removes the node only after awaiting + // close, so firing these concurrently would make every call target the + // same LRU node. The batch stays fire-and-forget to match cloud behavior. 
+ void (async () => { + for (let i = 0; i < excess; i++) { + try { + await this.evictLru(); + } catch (err) { + console.error(err); + } + } + })(); } + this.maxCapacity = config.maxCapacity; } if (config.ttlMs !== undefined) { diff --git a/packages/server-v3/src/lib/SessionStore.ts b/packages/server-v3/src/lib/SessionStore.ts index 24aa8f2a4..520db5468 100644 --- a/packages/server-v3/src/lib/SessionStore.ts +++ b/packages/server-v3/src/lib/SessionStore.ts @@ -138,6 +138,13 @@ export interface SessionStore { * - On cache miss: loading config, creating V3, caching it * - Updating the logger reference for streaming * + * Acquire/release contract: this call PINS the session. Implementations MUST + * NOT evict or TTL-expire a session between a getOrCreateStagehand and its + * matching releaseSession — otherwise a long-running request (e.g. an + * agentExecute that spends tens of seconds in agent "think time" with no CDP + * traffic) can have its browser context torn out mid-flight. Always use this + * via the withSession() wrapper, which guarantees the matching release. + * * @param sessionId - The session identifier * @param ctx - Request-time context containing values from headers * @returns The V3 instance ready for use @@ -145,6 +152,18 @@ export interface SessionStore { */ getOrCreateStagehand(sessionId: string, ctx: RequestContext): Promise; + /** + * Release a session previously pinned by getOrCreateStagehand. + * + * Decrements the in-use count so the session becomes eligible for eviction/ + * TTL again once no requests hold it. MUST clamp at zero (a release without a + * matching acquire is a no-op, never a negative count). Prefer calling this + * via withSession() rather than directly. + * + * @param sessionId - The session identifier + */ + releaseSession(sessionId: string): void | Promise; + /** * Create a new session with the given parameters. * Lower-level than startSession - just stores the config. 
diff --git a/packages/server-v3/src/lib/stream.ts b/packages/server-v3/src/lib/stream.ts index 46e7df7ed..a88561319 100644 --- a/packages/server-v3/src/lib/stream.ts +++ b/packages/server-v3/src/lib/stream.ts @@ -15,6 +15,7 @@ import { import { error, success } from "./response.js"; import { getSessionStore } from "./sessionStoreManager.js"; import type { RequestContext } from "./SessionStore.js"; +import { withSession } from "./withSession.js"; interface StreamingResponseOptions { sessionId: string; @@ -180,38 +181,18 @@ export async function createStreamingResponse({ : undefined, }; - let stagehand: V3Stagehand; - try { - stagehand = (await sessionStore.getOrCreateStagehand( - sessionId, - requestContext, - )) as V3Stagehand; - } catch (err) { - const loadError = err instanceof Error ? err : new Error(String(err)); - - sendData("error", "system", { status: "error", error: loadError.message }); - - if (shouldStreamResponse) { - reply.raw.end(); - return reply; - } - - return error( - reply, - loadError.message, - loadError instanceof AppError - ? loadError.statusCode - : StatusCodes.INTERNAL_SERVER_ERROR, - ); - } - - sendData("connected", "system", { status: "connected" }); - let result: Awaited> | null = null; let handlerError: Error | null = null; + // withSession pins the session for the whole handler so it can't be evicted + // or TTL-expired mid-request, and releases once the handler settles. Acquire + // failures (session not found/expired) surface here too and are rendered as + // an error response below, same as handler failures. try { - result = await handler({ stagehand, data: parsedData }); + result = await withSession(sessionId, requestContext, (stagehand) => { + sendData("connected", "system", { status: "connected" }); + return handler({ stagehand, data: parsedData }); + }); } catch (err) { handlerError = err instanceof Error ? 
err : new Error("Unknown error"); request.log.error( diff --git a/packages/server-v3/src/lib/withSession.ts b/packages/server-v3/src/lib/withSession.ts new file mode 100644 index 000000000..bc910cc35 --- /dev/null +++ b/packages/server-v3/src/lib/withSession.ts @@ -0,0 +1,54 @@ +import type { Stagehand as V3Stagehand } from "@browserbasehq/stagehand"; + +import { getSessionStore } from "./sessionStoreManager.js"; +import type { RequestContext } from "./SessionStore.js"; + +/** + * Acquire a session's V3 instance, run `fn`, and release exactly once when + * `fn` settles. + * + * The session is pinned — excluded from LRU eviction and TTL expiry — for the + * full duration of `fn`, including agent "think time" when no CDP traffic + * flows. Release happens only in the `finally`, i.e. strictly AFTER `fn` + * settles, so the session can never be evicted while the handler is still + * using its Stagehand instance. + * + * Note: we intentionally do NOT release on client disconnect. If the client + * goes away, the handler keeps running server-side (and may still be driving + * the browser — e.g. completing a payment); releasing then would let the + * session be evicted mid-operation, the exact bug this pinning prevents. The + * handler is bounded by its own step/timeout limits, so the `finally` always + * runs and the pin is released when the work actually finishes. + * + * This is the only supported way to use a session's V3 instance for a request: + * callers must never hold a stagehand reference past the end of `fn`. + * + * Acquire failures propagate to the caller with no pin held and `fn` not run. 
+ */ +export async function withSession( + sessionId: string, + ctx: RequestContext, + fn: (stagehand: V3Stagehand) => Promise, +): Promise { + const sessionStore = getSessionStore(); + const stagehand = (await sessionStore.getOrCreateStagehand( + sessionId, + ctx, + )) as V3Stagehand; + + try { + return await fn(stagehand); + } finally { + try { + await sessionStore.releaseSession(sessionId); + } catch (err) { + // A failed release leaves the session pinned (inUse not decremented), + // which leaks capacity. Don't rethrow (that would clobber the handler's + // result/error in a finally) — record it so the leak is detectable. + console.error( + `Failed to release session ${sessionId}; it may remain pinned:`, + err, + ); + } + } +} diff --git a/packages/server-v3/src/routes/v1/sessions/start.ts b/packages/server-v3/src/routes/v1/sessions/start.ts index b36d9f83f..d2614ea6f 100644 --- a/packages/server-v3/src/routes/v1/sessions/start.ts +++ b/packages/server-v3/src/routes/v1/sessions/start.ts @@ -14,6 +14,7 @@ import { } from "../../../lib/header.js"; import { error, success } from "../../../lib/response.js"; import { getSessionStore } from "../../../lib/sessionStoreManager.js"; +import { withSession } from "../../../lib/withSession.js"; import { AISDK_PROVIDERS } from "../../../types/model.js"; // Extended schema with custom refinement for local browser validation @@ -234,11 +235,11 @@ const startRouteHandler: RouteHandler = withErrorHandling( let finalCdpUrl = connectUrl ?? session.cdpUrl ?? 
""; if (localBrowserLaunchOptions) { try { - const stagehand = await sessionStore.getOrCreateStagehand( + finalCdpUrl = await withSession( session.sessionId, { modelApiKey: localBrowserModelApiKey }, + (stagehand) => Promise.resolve(stagehand.connectURL()), ); - finalCdpUrl = stagehand.connectURL(); } catch (err) { request.log.error( { diff --git a/packages/server-v3/tests/unit/sessionPinning.test.ts b/packages/server-v3/tests/unit/sessionPinning.test.ts new file mode 100644 index 000000000..9de3a60ce --- /dev/null +++ b/packages/server-v3/tests/unit/sessionPinning.test.ts @@ -0,0 +1,227 @@ +import assert from "node:assert/strict"; +import { describe, it } from "node:test"; + +import type { V3 } from "@browserbasehq/stagehand"; + +import { InMemorySessionStore } from "../../src/lib/InMemorySessionStore.js"; +import type { CreateSessionParams } from "../../src/lib/SessionStore.js"; +import { + destroySessionStore, + initializeSessionStore, +} from "../../src/lib/sessionStoreManager.js"; +import { withSession } from "../../src/lib/withSession.js"; + +const PARAMS: CreateSessionParams = { + browserType: "local", + modelName: "openai/gpt-4o", +}; + +/** + * Inject a fake V3 instance onto a session node so getOrCreateStagehand returns + * it without launching a real browser. Returns a `closed` probe. 
+ */ +function injectFakeStagehand( + store: InMemorySessionStore, + sessionId: string, +): { wasClosed: () => boolean } { + let closed = false; + const fake = { + close: async () => { + closed = true; + }, + connectURL: () => "ws://fake", + } as unknown as V3; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (store as any).items.get(sessionId).stagehand = fake; + return { wasClosed: () => closed }; +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const node = (store: InMemorySessionStore, id: string): any => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (store as any).items.get(id); + +describe("session pinning", () => { + it("does not evict an in-use session under capacity pressure", async () => { + const store = new InMemorySessionStore({ maxCapacity: 1 }); + await store.createSession("A", PARAMS); + injectFakeStagehand(store, "A"); + await store.getOrCreateStagehand("A", {}); // pin A (inUse = 1) + + // Capacity is 1, but A is pinned: creating B must NOT tear down A. + await store.createSession("B", PARAMS); + assert.equal(await store.hasSession("A"), true); + assert.equal(await store.hasSession("B"), true); + + // Once released, A becomes evictable again. 
+ await store.releaseSession("A"); + await store.createSession("C", PARAMS); + assert.equal(await store.hasSession("A"), false); + }); + + it("does not TTL-expire an in-use session", async () => { + const store = new InMemorySessionStore({ maxCapacity: 100, ttlMs: 1000 }); + await store.createSession("A", PARAMS); + injectFakeStagehand(store, "A"); + await store.getOrCreateStagehand("A", {}); // pin + + node(store, "A").expiry = Date.now() - 1; // force expired + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (store as any).cleanupExpired(); + assert.equal(await store.hasSession("A"), true); // survives: in use + + await store.releaseSession("A"); // unpin; expiry refreshed + node(store, "A").expiry = Date.now() - 1; // force expired again + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (store as any).cleanupExpired(); + assert.equal(node(store, "A"), undefined); // now reaped + }); + + it("requires all concurrent holders to release before eviction, never going negative", async () => { + const store = new InMemorySessionStore({ maxCapacity: 1 }); + await store.createSession("A", PARAMS); + injectFakeStagehand(store, "A"); + await store.getOrCreateStagehand("A", {}); + await store.getOrCreateStagehand("A", {}); // inUse = 2 + assert.equal(node(store, "A").inUse, 2); + + await store.createSession("B", PARAMS); + await store.releaseSession("A"); // inUse = 1, still pinned + await store.createSession("C", PARAMS); + assert.equal(await store.hasSession("A"), true); + + // Extra releases must clamp at 0, not go negative. 
+ await store.releaseSession("A"); + await store.releaseSession("A"); + assert.equal(node(store, "A").inUse, 0); + + await store.createSession("D", PARAMS); + assert.equal(await store.hasSession("A"), false); // now evicted + }); + + it("ignores an unmatched release without refreshing TTL", async () => { + const store = new InMemorySessionStore({ maxCapacity: 100, ttlMs: 1000 }); + await store.createSession("A", PARAMS); + node(store, "A").expiry = 12345; // sentinel + + // No matching acquire: a stray release must be a complete no-op. + await store.releaseSession("A"); + + assert.equal(node(store, "A").inUse, 0); + assert.equal(node(store, "A").expiry, 12345, "TTL must not be refreshed"); + }); + + it("downsizing capacity evicts all excess entries, not just one", async () => { + const store = new InMemorySessionStore({ maxCapacity: 3 }); + await store.createSession("A", PARAMS); + await store.createSession("B", PARAMS); + await store.createSession("C", PARAMS); + const a = injectFakeStagehand(store, "A"); + const b = injectFakeStagehand(store, "B"); + const c = injectFakeStagehand(store, "C"); + + store.updateCacheConfig({ maxCapacity: 1 }); + await new Promise((resolve) => setTimeout(resolve, 20)); // let evictions run + + // A and B (the two LRU) must both be evicted and closed — not the same + // node twice — while the most-recent C survives. 
+ assert.equal(store.size, 1); + assert.equal(await store.hasSession("C"), true); + assert.equal(a.wasClosed(), true); + assert.equal(b.wasClosed(), true); + assert.equal(c.wasClosed(), false); + }); + + it("explicit endSession closes a session even while in use", async () => { + const store = new InMemorySessionStore(); + await store.createSession("A", PARAMS); + const probe = injectFakeStagehand(store, "A"); + await store.getOrCreateStagehand("A", {}); // pin + + await store.endSession("A"); + assert.equal(probe.wasClosed(), true); + assert.equal(node(store, "A"), undefined); + }); +}); + +describe("withSession", () => { + it("keeps the session pinned until fn settles, then releases exactly once", async () => { + const store = initializeSessionStore(); + try { + await store.createSession("A", PARAMS); + injectFakeStagehand(store as InMemorySessionStore, "A"); + + let releaseCount = 0; + const origRelease = store.releaseSession.bind(store); + store.releaseSession = (id: string) => { + releaseCount += 1; + return origRelease(id); + }; + + let finishHandler: () => void = () => {}; + const handlerDone = new Promise((resolve) => { + finishHandler = resolve; + }); + + const p = withSession("A", {}, async () => { + await handlerDone; // still running... + return "ok"; + }); + + // While fn is in flight the session must stay pinned and unreleased. 
+ await new Promise((resolve) => setTimeout(resolve, 0)); + assert.equal(releaseCount, 0, "must not release while fn is running"); + assert.equal(node(store as InMemorySessionStore, "A").inUse, 1); + + finishHandler(); + assert.equal(await p, "ok"); + assert.equal(releaseCount, 1, "releases exactly once after fn settles"); + assert.equal(node(store as InMemorySessionStore, "A").inUse, 0); + } finally { + await destroySessionStore(); + } + }); + + it("surfaces a release failure instead of swallowing it", async () => { + const store = initializeSessionStore(); + const originalConsoleError = console.error; + let errorLogs = 0; + console.error = () => { + errorLogs += 1; + }; + try { + await store.createSession("A", PARAMS); + injectFakeStagehand(store as InMemorySessionStore, "A"); + store.releaseSession = () => { + throw new Error("release boom"); + }; + + // The handler result is still returned; the release failure is recorded, + // not thrown (a throw from finally would clobber the result). + const result = await withSession("A", {}, async () => "ok"); + assert.equal(result, "ok"); + assert.equal(errorLogs, 1, "release failure should be recorded"); + } finally { + console.error = originalConsoleError; + await destroySessionStore(); + } + }); + + it("releases the pin when fn throws", async () => { + const store = initializeSessionStore(); + try { + await store.createSession("A", PARAMS); + injectFakeStagehand(store as InMemorySessionStore, "A"); + + await assert.rejects( + withSession("A", {}, async () => { + throw new Error("boom"); + }), + /boom/, + ); + assert.equal(node(store as InMemorySessionStore, "A").inUse, 0); + } finally { + await destroySessionStore(); + } + }); +});