Skip to content

Commit 86975e7

Browse files
tkattkatgreptile-apps[bot]seanmcguire12
authored
Agents streaming (#1315)
# why Simplify the agent streaming API by moving the `stream` flag into agent configuration. This allows TypeScript to properly infer return types at compile time and provides a cleaner API with a single `execute()` method. # what changed - Added `stream?: boolean` to `AgentConfig` - `agent({ stream: true }).execute()` → returns `AgentStreamResult` - `agent().execute()` → returns `AgentResult` (unchanged) - Added TypeScript overloads for proper type inference - Added streaming support to `V3AgentHandler` and caching layer - Extracted shared setup logic into `prepareAgentExecution()` helper # test plan - tested locally --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Sean McGuire <seanmcguire1@outlook.com>
1 parent c6f6ac2 commit 86975e7

File tree

8 files changed

+781
-153
lines changed

8 files changed

+781
-153
lines changed

.changeset/four-knives-ask.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@browserbasehq/stagehand": patch
3+
---
4+
5+
Add streaming support to agent through stream:true in the agent config

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ jobs:
188188
runs-on: ubuntu-latest
189189
timeout-minutes: 50
190190
env:
191+
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
191192
HEADLESS: true
192193
steps:
193194
- name: Check out repository code
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Stagehand } from "../lib/v3";
2+
import dotenv from "dotenv";
3+
import chalk from "chalk";
4+
5+
// Load environment variables
6+
dotenv.config();
7+
async function main() {
8+
console.log(`\n${chalk.bold("Stagehand 🤘 Agent Streaming Example")}\n`);
9+
// Initialize Stagehand
10+
const stagehand = new Stagehand({
11+
env: "LOCAL",
12+
verbose: 0,
13+
cacheDir: "stagehand-agent-cache",
14+
logInferenceToFile: false,
15+
experimental: true,
16+
});
17+
18+
await stagehand.init();
19+
20+
try {
21+
const page = stagehand.context.pages()[0];
22+
await page.goto("https://amazon.com");
23+
24+
// Create a streaming agent with stream: true in the config
25+
const agent = stagehand.agent({
26+
model: "anthropic/claude-sonnet-4-5-20250929",
27+
stream: true, // This makes execute() return AgentStreamResult
28+
});
29+
30+
const agentRun = await agent.execute({
31+
instruction: "go to amazon, and search for shampoo, stop after searching",
32+
maxSteps: 20,
33+
});
34+
// stream the text
35+
for await (const delta of agentRun.textStream) {
36+
process.stdout.write(delta);
37+
}
38+
// stream everything ( toolcalls, messages, etc.)
39+
// for await (const delta of result.fullStream) {
40+
// console.log(delta);
41+
// }
42+
43+
const finalResult = await agentRun.result;
44+
console.log("Final Result:", finalResult);
45+
} catch (error) {
46+
console.log(`${chalk.red("✗")} Error: ${error}`);
47+
}
48+
}
49+
main();

packages/core/lib/v3/cache/AgentCache.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type {
1818
import type {
1919
AvailableModel,
2020
AgentResult,
21+
AgentStreamResult,
2122
AgentConfig,
2223
AgentExecuteOptions,
2324
Logger,
@@ -185,6 +186,135 @@ export class AgentCache {
185186
return await this.replayAgentCacheEntry(entry);
186187
}
187188

189+
/**
190+
* Attempts to replay a cached agent execution and returns it as a stream result.
191+
*
192+
* This method exists because the agent API exposes two execution modes:
193+
* - `execute()` - Returns a Promise<AgentResult> directly
194+
* - `stream()` - Returns an AgentStreamResult with async iterables for real-time output
195+
*
196+
* When a cache hit occurs, we need to return the appropriate type for each mode:
197+
* - For `execute()`, we use `tryReplay()` which returns AgentResult
198+
* - For `stream()`, we use `tryReplayAsStream()` which wraps the result in a
199+
* stream-compatible interface
200+
*
201+
* This ensures consumers using `stream()` can still iterate over `textStream`
202+
* and await `result` even when the response comes from cache, maintaining
203+
* API consistency regardless of whether the result was cached or live.
204+
*/
205+
async tryReplayAsStream(
206+
context: AgentCacheContext,
207+
): Promise<AgentStreamResult | null> {
208+
const result = await this.tryReplay(context);
209+
if (!result) return null;
210+
return this.createCachedStreamResult(result);
211+
}
212+
213+
/**
214+
* Creates a mock AgentStreamResult that wraps a cached AgentResult.
215+
*
216+
* AgentStreamResult (from the AI SDK) is a complex type with multiple async
217+
* iterables and promises. When serving from cache, we don't have an actual
218+
* LLM stream to consume - we just have the final result. This method creates
219+
* a "fake" stream
220+
221+
* This approach lets cached responses be transparent to the consumer -
222+
* they can use the same iteration patterns whether the result is live or cached.
223+
*/
224+
private createCachedStreamResult(
225+
cachedResult: AgentResult,
226+
): AgentStreamResult {
227+
const message = cachedResult.message ?? "";
228+
229+
async function* textStreamGenerator(): AsyncGenerator<string> {
230+
yield message;
231+
}
232+
233+
async function* fullStreamGenerator(): AsyncGenerator<{
234+
type: string;
235+
textDelta?: string;
236+
}> {
237+
yield { type: "text-delta", textDelta: message };
238+
yield { type: "finish" };
239+
}
240+
241+
const mockStreamResult = {
242+
textStream: textStreamGenerator(),
243+
fullStream: fullStreamGenerator(),
244+
result: Promise.resolve(cachedResult),
245+
text: Promise.resolve(message),
246+
usage: Promise.resolve({
247+
promptTokens: 0,
248+
completionTokens: 0,
249+
totalTokens: 0,
250+
}),
251+
finishReason: Promise.resolve("stop" as const),
252+
experimental_providerMetadata: Promise.resolve(undefined),
253+
response: Promise.resolve({
254+
id: "cached",
255+
timestamp: new Date(),
256+
modelId: "cached",
257+
}),
258+
rawResponse: Promise.resolve({ headers: {} }),
259+
warnings: Promise.resolve([]),
260+
steps: Promise.resolve([]),
261+
toolCalls: Promise.resolve([]),
262+
toolResults: Promise.resolve([]),
263+
[Symbol.asyncIterator]: () => textStreamGenerator(),
264+
} as unknown as AgentStreamResult;
265+
266+
return mockStreamResult;
267+
}
268+
269+
/**
270+
* Wraps an AgentStreamResult with caching logic.
271+
*
272+
* This method handles the complexity of caching for streaming responses:
273+
* 1. Begins recording agent replay steps
274+
* 2. Wraps the stream's result promise to capture completion
275+
* 3. On success: ends recording and stores the cache entry
276+
* 4. On error: discards the recording
277+
*
278+
* This keeps the caching orchestration in AgentCache rather than
279+
* spreading it across the V3 class.
280+
*
281+
* @param context - The cache context for this execution
282+
* @param streamResult - The stream result from the agent handler
283+
* @param beginRecording - Callback to start recording (from V3)
284+
* @param endRecording - Callback to end recording and get steps (from V3)
285+
* @param discardRecording - Callback to discard recording on error (from V3)
286+
* @returns The wrapped stream result with caching enabled
287+
*/
288+
wrapStreamForCaching(
289+
context: AgentCacheContext,
290+
streamResult: AgentStreamResult,
291+
beginRecording: () => void,
292+
endRecording: () => AgentReplayStep[],
293+
discardRecording: () => void,
294+
): AgentStreamResult {
295+
beginRecording();
296+
297+
const originalResultPromise = streamResult.result;
298+
const wrappedResultPromise = originalResultPromise.then(
299+
async (result) => {
300+
const agentSteps = endRecording();
301+
302+
if (result.success && agentSteps.length > 0) {
303+
await this.store(context, agentSteps, result);
304+
}
305+
306+
return result;
307+
},
308+
(error) => {
309+
discardRecording();
310+
throw error;
311+
},
312+
);
313+
314+
streamResult.result = wrappedResultPromise;
315+
return streamResult;
316+
}
317+
188318
async store(
189319
context: AgentCacheContext,
190320
steps: AgentReplayStep[],

0 commit comments

Comments
 (0)