Skip to content

Commit 943d2d7

Browse files
tkattkatpirate
andauthored
Messages and abort (#1345)
# Agent Abort Signal and Message Continuation ## Why Enable users to cancel long-running agent tasks and continue conversations across multiple `execute()` calls. Also ensures graceful shutdown when `stagehand.close()` is called by automatically aborting any running agent tasks. ## What Changed ### New Features (behind `experimental: true`) #### Abort Signal Support - Pass `signal` to `agent.execute()` to cancel execution mid-run - Works with `AbortController` and `AbortSignal.timeout()` - Throws `AgentAbortError` when aborted #### Message Continuation - `execute()` now returns `messages` in the result - Pass previous messages to continue a conversation across calls ### New Utilities | File | Purpose | |---------------------------------|-------------------------------------------------------------------------------------------| | `combineAbortSignals.ts` | Merges multiple signals (uses native `AbortSignal.any()` on Node 20+, fallback for older) | | `errorHandling.ts` | Consolidates abort detection logic—needed because `close()` may cause indirect errors (e.g., null context) that should still be treated as abort | | `validateExperimentalFeatures.ts` | Single place for all experimental/CUA feature validation | ### CUA Limitations Abort signal and message continuation are not supported with CUA mode (throws `StagehandInvalidArgumentError`). This matches existing streaming limitation. ### Tests Added - `agent-abort-signal.spec.ts` (7 tests) - `agent-message-continuation.spec.ts` (4 tests) - `agent-experimental-validation.spec.ts` (17 tests) <!-- This is an auto-generated description by cubic. --> --- ## Summary by cubic Adds agent abort support and conversation continuation. You can cancel long runs, auto-abort on close, and carry messages across execute() calls. Feature is gated behind experimental: true and has clear CUA limitations. - **New Features** - Abort signal for execute() and stream() with AbortController and AbortSignal.timeout; throws AgentAbortError; stagehand.close() auto-aborts via an internal controller combined with any user signal. - Message continuation: execute() returns messages and accepts previous messages on the next call; tool calls and results are included. - **Refactors** - Centralized experimental/CUA validation via validateExperimentalFeatures: CUA disallows streaming, abort signal, and message continuation; experimental required for integrations, tools, streaming, callbacks, signal, and messages. - Public API updates: re-export ModelMessage; Agent types include messages and signal; AgentAbortError exported for consistent abort typing. <sup>Written for commit 5276e41. Summary will update automatically on new commits.</sup> <!-- End of auto-generated description by cubic. --> --------- Co-authored-by: Nick Sweeting <github@sweeting.me>
1 parent fdbb58c commit 943d2d7

File tree

12 files changed

+1000
-82
lines changed

12 files changed

+1000
-82
lines changed

.changeset/eleven-apples-yell.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@browserbasehq/stagehand": patch
3+
---
4+
5+
Add support for aborting / stopping an agent run & continuing an agent run using messages from prior runs
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import {
2+
ExperimentalNotConfiguredError,
3+
StagehandInvalidArgumentError,
4+
} from "../../types/public/sdkErrors";
5+
import type { AgentConfig, AgentExecuteOptionsBase } from "../../types/public";
6+
7+
export interface AgentValidationOptions {
8+
/** Whether experimental mode is enabled */
9+
isExperimental: boolean;
10+
/** Agent config options (integrations, tools, stream, cua, etc.) */
11+
agentConfig?: Partial<AgentConfig>;
12+
/** Execute options (callbacks, signal, messages, etc.) */
13+
executeOptions?:
14+
| (Partial<AgentExecuteOptionsBase> & { callbacks?: unknown })
15+
| null;
16+
/** Whether this is streaming mode (can be derived from agentConfig.stream) */
17+
isStreaming?: boolean;
18+
}
19+
20+
/**
21+
* Validates agent configuration and experimental feature usage.
22+
*
23+
* This utility consolidates all validation checks for both CUA and non-CUA agent paths:
24+
* - Invalid argument errors for CUA (streaming, abort signal, message continuation are not supported)
25+
* - Experimental feature checks for integrations and tools (both CUA and non-CUA)
26+
* - Experimental feature checks for non-CUA only (callbacks, signal, messages, streaming)
27+
*
28+
* Throws StagehandInvalidArgumentError for invalid/unsupported configurations.
29+
* Throws ExperimentalNotConfiguredError if experimental features are used without experimental mode.
30+
*/
31+
export function validateExperimentalFeatures(
32+
options: AgentValidationOptions,
33+
): void {
34+
const { isExperimental, agentConfig, executeOptions, isStreaming } = options;
35+
36+
// CUA-specific validation: certain features are not available at all
37+
if (agentConfig?.cua) {
38+
const unsupportedFeatures: string[] = [];
39+
40+
if (agentConfig?.stream) {
41+
unsupportedFeatures.push("streaming");
42+
}
43+
if (executeOptions?.signal) {
44+
unsupportedFeatures.push("abort signal");
45+
}
46+
if (executeOptions?.messages) {
47+
unsupportedFeatures.push("message continuation");
48+
}
49+
50+
if (unsupportedFeatures.length > 0) {
51+
throw new StagehandInvalidArgumentError(
52+
`${unsupportedFeatures.join(", ")} ${unsupportedFeatures.length === 1 ? "is" : "are"} not supported with CUA (Computer Use Agent) mode.`,
53+
);
54+
}
55+
}
56+
57+
// Skip experimental checks if already in experimental mode
58+
if (isExperimental) return;
59+
60+
const features: string[] = [];
61+
62+
// Check agent config features (check array length to avoid false positives for empty arrays)
63+
const hasIntegrations =
64+
agentConfig?.integrations && agentConfig.integrations.length > 0;
65+
const hasTools =
66+
agentConfig?.tools && Object.keys(agentConfig.tools).length > 0;
67+
if (hasIntegrations || hasTools) {
68+
features.push("MCP integrations and custom tools");
69+
}
70+
71+
// Check streaming mode (either explicit or derived from config) - only for non-CUA
72+
if (!agentConfig?.cua && (isStreaming || agentConfig?.stream)) {
73+
features.push("streaming");
74+
}
75+
76+
// Check execute options features - only for non-CUA
77+
if (executeOptions && !agentConfig?.cua) {
78+
if (executeOptions.callbacks) {
79+
features.push("callbacks");
80+
}
81+
if (executeOptions.signal) {
82+
features.push("abort signal");
83+
}
84+
if (executeOptions.messages) {
85+
features.push("message continuation");
86+
}
87+
}
88+
89+
if (features.length > 0) {
90+
throw new ExperimentalNotConfiguredError(`Agent ${features.join(", ")}`);
91+
}
92+
}

packages/core/lib/v3/handlers/v3AgentHandler.ts

Lines changed: 101 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@ import { mapToolResultToActions } from "../agent/utils/actionMapping";
2929
import {
3030
MissingLLMConfigurationError,
3131
StreamingCallbacksInNonStreamingModeError,
32+
AgentAbortError,
3233
} from "../types/public/sdkErrors";
3334

35+
function getErrorMessage(error: unknown): string {
36+
return error instanceof Error ? error.message : String(error);
37+
}
38+
3439
export class V3AgentHandler {
3540
private v3: V3;
3641
private logger: (message: LogLine) => void;
@@ -72,9 +77,11 @@ export class V3AgentHandler {
7277
);
7378
const tools = this.createTools();
7479
const allTools: ToolSet = { ...tools, ...this.mcpTools };
75-
const messages: ModelMessage[] = [
76-
{ role: "user", content: options.instruction },
77-
];
80+
81+
// Use provided messages for continuation, or start fresh with the instruction
82+
const messages: ModelMessage[] = options.messages?.length
83+
? [...options.messages, { role: "user", content: options.instruction }]
84+
: [{ role: "user", content: options.instruction }];
7885

7986
if (!this.llmClient?.getLanguageModel) {
8087
throw new MissingLLMConfigurationError();
@@ -176,41 +183,52 @@ export class V3AgentHandler {
176183
instructionOrOptions: string | AgentExecuteOptions,
177184
): Promise<AgentResult> {
178185
const startTime = Date.now();
179-
const {
180-
maxSteps,
181-
systemPrompt,
182-
allTools,
183-
messages,
184-
wrappedModel,
185-
initialPageUrl,
186-
} = await this.prepareAgent(instructionOrOptions);
187-
188-
const callbacks = (instructionOrOptions as AgentExecuteOptions).callbacks;
189-
190-
if (callbacks) {
191-
const streamingOnlyCallbacks = [
192-
"onChunk",
193-
"onFinish",
194-
"onError",
195-
"onAbort",
196-
];
197-
const invalidCallbacks = streamingOnlyCallbacks.filter(
198-
(name) => callbacks[name as keyof typeof callbacks] != null,
199-
);
200-
if (invalidCallbacks.length > 0) {
201-
throw new StreamingCallbacksInNonStreamingModeError(invalidCallbacks);
202-
}
203-
}
186+
const signal =
187+
typeof instructionOrOptions === "object"
188+
? instructionOrOptions.signal
189+
: undefined;
204190

205191
const state: AgentState = {
206192
collectedReasoning: [],
207193
actions: [],
208194
finalMessage: "",
209195
completed: false,
210-
currentPageUrl: initialPageUrl,
196+
currentPageUrl: "",
211197
};
212198

199+
let messages: ModelMessage[] = [];
200+
213201
try {
202+
const {
203+
options,
204+
maxSteps,
205+
systemPrompt,
206+
allTools,
207+
messages: preparedMessages,
208+
wrappedModel,
209+
initialPageUrl,
210+
} = await this.prepareAgent(instructionOrOptions);
211+
212+
messages = preparedMessages;
213+
state.currentPageUrl = initialPageUrl;
214+
215+
const callbacks = (instructionOrOptions as AgentExecuteOptions).callbacks;
216+
217+
if (callbacks) {
218+
const streamingOnlyCallbacks = [
219+
"onChunk",
220+
"onFinish",
221+
"onError",
222+
"onAbort",
223+
];
224+
const invalidCallbacks = streamingOnlyCallbacks.filter(
225+
(name) => callbacks[name as keyof typeof callbacks] != null,
226+
);
227+
if (invalidCallbacks.length > 0) {
228+
throw new StreamingCallbacksInNonStreamingModeError(invalidCallbacks);
229+
}
230+
}
231+
214232
const result = await this.llmClient.generateText({
215233
model: wrappedModel,
216234
system: systemPrompt,
@@ -221,21 +239,41 @@ export class V3AgentHandler {
221239
toolChoice: "auto",
222240
prepareStep: callbacks?.prepareStep,
223241
onStepFinish: this.createStepHandler(state, callbacks?.onStepFinish),
242+
abortSignal: options.signal,
224243
});
225244

226-
return this.consolidateMetricsAndResult(startTime, state, result);
245+
return this.consolidateMetricsAndResult(
246+
startTime,
247+
state,
248+
messages,
249+
result,
250+
);
227251
} catch (error) {
228-
const errorMessage = error?.message ?? String(error);
252+
// Re-throw validation errors that should propagate to the caller
253+
if (error instanceof StreamingCallbacksInNonStreamingModeError) {
254+
throw error;
255+
}
256+
257+
// Re-throw abort errors wrapped in AgentAbortError for consistent error typing
258+
if (signal?.aborted) {
259+
const reason = signal.reason ? String(signal.reason) : "aborted";
260+
throw new AgentAbortError(reason);
261+
}
262+
263+
const errorMessage = getErrorMessage(error);
229264
this.logger({
230265
category: "agent",
231266
message: `Error executing agent task: ${errorMessage}`,
232267
level: 0,
233268
});
269+
270+
// For non-abort errors, return a failure result instead of throwing
234271
return {
235272
success: false,
236273
actions: state.actions,
237274
message: `Failed to execute task: ${errorMessage}`,
238275
completed: false,
276+
messages,
239277
};
240278
}
241279
}
@@ -244,6 +282,7 @@ export class V3AgentHandler {
244282
instructionOrOptions: string | AgentStreamExecuteOptions,
245283
): Promise<AgentStreamResult> {
246284
const {
285+
options,
247286
maxSteps,
248287
systemPrompt,
249288
allTools,
@@ -303,17 +342,25 @@ export class V3AgentHandler {
303342
if (callbacks?.onFinish) {
304343
callbacks.onFinish(event);
305344
}
306-
try {
307-
const result = this.consolidateMetricsAndResult(
308-
startTime,
309-
state,
310-
event,
311-
);
312-
resolveResult(result);
313-
} catch (error) {
314-
handleError(error);
345+
const result = this.consolidateMetricsAndResult(
346+
startTime,
347+
state,
348+
messages,
349+
event,
350+
);
351+
resolveResult(result);
352+
},
353+
onAbort: (event) => {
354+
if (callbacks?.onAbort) {
355+
callbacks.onAbort(event);
315356
}
357+
// Reject the result promise with AgentAbortError when stream is aborted
358+
const reason = options.signal?.reason
359+
? String(options.signal.reason)
360+
: "Stream was aborted";
361+
rejectResult(new AgentAbortError(reason));
316362
},
363+
abortSignal: options.signal,
317364
});
318365

319366
const agentStreamResult = streamResult as AgentStreamResult;
@@ -324,7 +371,12 @@ export class V3AgentHandler {
324371
private consolidateMetricsAndResult(
325372
startTime: number,
326373
state: AgentState,
327-
result: { text?: string; usage?: LanguageModelUsage },
374+
inputMessages: ModelMessage[],
375+
result: {
376+
text?: string;
377+
usage?: LanguageModelUsage;
378+
response?: { messages?: ModelMessage[] };
379+
},
328380
): AgentResult {
329381
if (!state.finalMessage) {
330382
const allReasoning = state.collectedReasoning.join(" ").trim();
@@ -344,6 +396,13 @@ export class V3AgentHandler {
344396
);
345397
}
346398

399+
// Combine input messages with response messages for full conversation history
400+
const responseMessages = result.response?.messages || [];
401+
const fullMessages: ModelMessage[] = [
402+
...inputMessages,
403+
...responseMessages,
404+
];
405+
347406
return {
348407
success: state.completed,
349408
message: state.finalMessage || "Task execution completed",
@@ -358,6 +417,7 @@ export class V3AgentHandler {
358417
inference_time_ms: inferenceTimeMs,
359418
}
360419
: undefined,
420+
messages: fullMessages,
361421
};
362422
}
363423

0 commit comments

Comments
 (0)