Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/core-concepts/streaming-output.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ function ChatComponent() {
setIsComplete(false);
},

'.step_start': (data) => {
console.log('Step started:', data);
// A new generation cycle is beginning
},

'.text_start': (data) => {
console.log('Text start event received:', data);
setCurrentMessage('');
Expand All @@ -202,6 +207,11 @@ function ChatComponent() {
console.log('Tool result:', data.result);
},

'.step_finish': (data) => {
console.log('Step finished:', data);
// Generation cycle complete, may be followed by another step
},

'.stream_end': (data) => {
console.log('Stream ended:', data.finish_reason);
setIsComplete(true);
Expand Down Expand Up @@ -246,6 +256,7 @@ All streaming approaches emit the same core events with consistent data structur
### Available Events

- **`stream_start`** - Stream initialization with model and provider info
- **`step_start`** - Beginning of a generation step (emitted before each AI response cycle)
- **`text_start`** - Beginning of a text message
- **`text_delta`** - Incremental text chunks as they're generated
- **`text_complete`** - End of a complete text message
Expand All @@ -257,9 +268,13 @@ All streaming approaches emit the same core events with consistent data structur
- **`tool_call_delta`** - Incremental tool call params chunks as they're generated
- **`artifact`** - Binary artifacts produced by tools (images, audio, files)
- **`provider_tool_event`** - Provider-specific tool events (e.g., image generation, web search)
- **`step_finish`** - End of a generation step (emitted after tool calls or before stream end)
- **`error`** - Error handling with recovery information
- **`stream_end`** - Stream completion with usage statistics

> [!TIP]
> **Understanding Steps**: A "step" represents one cycle of AI generation. In a simple request without tools, there's typically one step. When using tools, each cycle of "AI generates → tools execute → AI continues" creates a new step. Use `step_start` and `step_finish` events to track these cycles in multi-turn tool interactions.

### Event Data Examples

Based on actual streaming output:
Expand All @@ -277,6 +292,12 @@ Based on actual streaming output:
}
}

// step_start event
{
"id": "anthropic_evt_abc123step",
"timestamp": 1756412888
}

// text_start event
{
"id": "anthropic_evt_8YI9ULcftpFtHzh3",
Expand Down Expand Up @@ -338,6 +359,12 @@ Based on actual streaming output:
}
}

// step_finish event
{
"id": "anthropic_evt_def456step",
"timestamp": 1756412895
}

// stream_end event
{
"id": "anthropic_evt_BZ3rqDYyprnywNyL",
Expand Down Expand Up @@ -673,12 +700,16 @@ The Vercel AI SDK format provides structured streaming data:
```
data: {"type":"start","messageId":"anthropic_evt_NPbGJs7D0oQhvz2K"}

data: {"type":"start-step"}

data: {"type":"text-start","id":"msg_013P3F8KkVG3Qasjeay3NUmY"}

data: {"type":"text-delta","id":"msg_013P3F8KkVG3Qasjeay3NUmY","delta":"Hello"}

data: {"type":"text-end","id":"msg_013P3F8KkVG3Qasjeay3NUmY"}

data: {"type":"finish-step"}

data: {"type":"finish","messageMetadata":{"finishReason":"stop","usage":{"promptTokens":1998,"completionTokens":288}}}

data: [DONE]
Expand Down
2 changes: 2 additions & 0 deletions src/Enums/StreamEventType.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ enum StreamEventType: string
case Artifact = 'artifact';
case Error = 'error';
case StreamEnd = 'stream_end';
case StepStart = 'step_start';
case StepFinish = 'step_finish';
}
7 changes: 7 additions & 0 deletions src/Events/Broadcasting/StepFinishBroadcast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Prism\Prism\Events\Broadcasting;

class StepFinishBroadcast extends StreamEventBroadcast {}
7 changes: 7 additions & 0 deletions src/Events/Broadcasting/StepStartBroadcast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Prism\Prism\Events\Broadcasting;

class StepStartBroadcast extends StreamEventBroadcast {}
51 changes: 39 additions & 12 deletions src/Providers/Anthropic/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use Prism\Prism\Streaming\Events\CitationEvent;
use Prism\Prism\Streaming\Events\ErrorEvent;
use Prism\Prism\Streaming\Events\ProviderToolEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -79,18 +81,28 @@ protected function processStream(Response $response, Request $request, int $dept
$streamEvent = $this->processEvent($event);

if ($streamEvent instanceof Generator) {
yield from $streamEvent;
foreach ($streamEvent as $event) {
yield $event;
}
} elseif ($streamEvent instanceof StreamEvent) {
yield $streamEvent;
}
}

if ($this->state->hasToolCalls()) {
yield from $this->handleToolCalls($request, $depth);
foreach ($this->handleToolCalls($request, $depth) as $item) {
yield $item;
}

return;
}

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield $this->emitStreamEndEvent();
}

Expand All @@ -115,8 +127,9 @@ protected function processEvent(array $event): StreamEvent|Generator|null

/**
* @param array<string, mixed> $event
* @return Generator<StreamEvent>
*/
protected function handleMessageStart(array $event): ?StreamStartEvent
protected function handleMessageStart(array $event): Generator
{
$message = $event['message'] ?? [];
$this->state->withMessageId($message['id'] ?? EventID::generate());
Expand All @@ -132,18 +145,25 @@ protected function handleMessageStart(array $event): ?StreamStartEvent
}

// Only emit StreamStartEvent once per streaming session
if (! $this->state->shouldEmitStreamStart()) {
return null;
if ($this->state->shouldEmitStreamStart()) {
$this->state->markStreamStarted();

yield new StreamStartEvent(
id: EventID::generate(),
timestamp: time(),
model: $message['model'] ?? 'unknown',
provider: 'anthropic'
);
}

$this->state->markStreamStarted();
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

return new StreamStartEvent(
id: EventID::generate(),
timestamp: time(),
model: $message['model'] ?? 'unknown',
provider: 'anthropic'
);
yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}
}

/**
Expand Down Expand Up @@ -544,6 +564,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator

$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Continue streaming if within step limit
$depth++;
if ($depth < $request->maxSteps()) {
Expand Down
23 changes: 23 additions & 0 deletions src/Providers/DeepSeek/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Prism\Prism\Providers\DeepSeek\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\ArtifactEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -96,6 +98,15 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

if ($this->hasToolCalls($data)) {
$toolCalls = $this->extractToolCalls($data, $toolCalls);

Expand Down Expand Up @@ -214,6 +225,12 @@ protected function processStream(Response $response, Request $request, int $dept
return;
}

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -381,6 +398,12 @@ protected function handleToolCalls(Request $request, string $text, array $toolCa
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

$this->state->resetTextState();
$this->state->withMessageId(EventID::generate());

Expand Down
26 changes: 26 additions & 0 deletions src/Providers/Gemini/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use Prism\Prism\Providers\Gemini\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\ArtifactEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -100,6 +102,16 @@ protected function processStream(Response $response, Request $request, int $dept
$this->state->markStreamStarted();
}

// Emit step start event once per step
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

// Update usage data from each chunk
$this->state->withUsage($this->extractUsage($data, $request));

Expand Down Expand Up @@ -219,6 +231,13 @@ protected function processStream(Response $response, Request $request, int $dept
return;
}

// Emit step finish before stream end
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -356,6 +375,13 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

$depth++;
if ($depth < $request->maxSteps()) {
$previousUsage = $this->state->usage();
Expand Down
26 changes: 26 additions & 0 deletions src/Providers/Groq/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\ArtifactEvent;
use Prism\Prism\Streaming\Events\ErrorEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -96,6 +98,16 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Emit step start event once per step
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

if ($this->hasError($data)) {
yield from $this->handleErrors($data, $request);

Expand Down Expand Up @@ -168,6 +180,13 @@ protected function processStream(Response $response, Request $request, int $dept
}
}

// Emit step finish before stream end
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -275,6 +294,13 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Reset text state for next response
$this->state->resetTextState();
$this->state->withMessageId(EventID::generate());
Expand Down
Loading