feat(executor): async/streaming execution-path parity + decision-callback controls (#388, #389, #320, #369, #370) #471
dgenio wants to merge 5 commits into
…#370)

Record guided decision points on StepRecord.decision (DecisionRecord: candidates, chosen, default_tool_name, duration_ms, timed_out) and downgrade flows with decision_candidates steps to DeterminismLevel.PARTIAL (#369). Add opt-in DecisionPolicy (timeout_s, max_decisions_per_flow, on_timeout) enforced via a bounded-join worker thread, with new DecisionTimeoutError (CW-E049) and DecisionBudgetExceededError (CW-E050) (#370). decision_policy=None keeps behavior unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
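The "bounded-join worker thread" idea in this commit can be sketched roughly as follows. This is an illustrative stand-in, not ChainWeaver's code: `decide_with_timeout`, `DecisionOutcome`, and `DecisionTimedOut` are hypothetical names, and the choice between "fall back to the default" and "raise" is an assumption about what an `on_timeout` setting might select.

```python
import threading
import time
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


class DecisionTimedOut(Exception):
    """Hypothetical stand-in for a decision-timeout error."""


@dataclass
class DecisionOutcome:
    chosen: str
    duration_ms: float
    timed_out: bool


def decide_with_timeout(
    callback: Callable[[Sequence[str]], str],
    candidates: Sequence[str],
    default: Optional[str],
    timeout_s: float,
) -> DecisionOutcome:
    """Run `callback` in a worker thread and wait at most `timeout_s` for it."""
    box: dict = {}

    def worker() -> None:
        try:
            box["chosen"] = callback(candidates)
        except Exception as exc:  # hand callback failures back to the caller
            box["error"] = exc

    started = time.monotonic()
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout_s)  # the bounded join: returns after timeout_s at the latest
    duration_ms = (time.monotonic() - started) * 1000.0

    if thread.is_alive():
        # Assumed policy: use the default when one exists, otherwise raise.
        if default is None:
            raise DecisionTimedOut(f"no decision within {timeout_s}s")
        return DecisionOutcome(default, duration_ms, timed_out=True)
    if "error" in box:
        raise box["error"]
    return DecisionOutcome(box["chosen"], duration_ms, timed_out=False)
```

A thread cannot be force-stopped in Python, so a timed-out callback keeps running in the background; marking the worker as a daemon thread only ensures it does not block interpreter exit.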
…ws (#388)

execute_flow_async now consults the step cache (cached=True on hits, skips Tool.run_async), writes crash-resume checkpoints after each successful step / DAG level, and executes composed flow_name sub-flow steps with sub_result and deadline/cancel_token forwarding. Add resume_flow_async(trace_id) mirroring resume_flow (incl. CheckpointDriftError). Narrow the async-lane guard to keep rejecting only branching (#9) and decision callbacks (#102). Sync lane unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
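As a rough illustration of the cache-then-checkpoint pattern described in this commit, the sketch below runs a linear list of async steps. Everything here is hypothetical (`run_steps`, the cache key scheme, the checkpoint shape); the real executor's cache, checkpoint store, and DAG handling are not shown in this PR page.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

# Hypothetical step shape: (name, async function from context to output dict).
Step = tuple[str, Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]]


async def run_steps(
    steps: list[Step],
    inputs: dict[str, Any],
    cache: dict[str, dict[str, Any]],
    checkpoints: list[dict[str, Any]],
    start_index: int = 0,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Run steps in order, serving cache hits and checkpointing each success."""
    context = dict(inputs)
    records: list[dict[str, Any]] = []
    for index in range(start_index, len(steps)):
        name, fn = steps[index]
        # Assumed key scheme: step name plus the JSON form of its inputs.
        key = name + ":" + json.dumps(context, sort_keys=True)
        if key in cache:
            output, cached = cache[key], True  # hit: the step function is skipped
        else:
            output, cached = await fn(context), False
            cache[key] = output
        context.update(output)
        records.append({"step": name, "cached": cached})
        # Checkpoint after each successful step so a crashed run can resume here.
        checkpoints.append({"next_index": index + 1, "context": dict(context)})
    return context, records
```

Resuming then amounts to calling `run_steps` again with a saved checkpoint's `context` as the inputs and its `next_index` as `start_index`.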
Add FlowExecutor.stream_flow_async, an async generator yielding the same FlowEvent lifecycle sequence as stream_flow by driving execute_flow_async on the calling loop (no worker thread). cancel_token/deadline end the stream at the next step boundary (FlowCancelledError carries the partial); abandoning the iterator cancels the backing task. Sync stream_flow gains optional cancel_token/deadline forwarded to the worker. Refactor the stream collector to emit via a callable so both lanes share it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
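One way to picture "an async generator driving a task on the calling loop" is the sketch below: the run receives an `emit` callable, events pass through an `asyncio.Queue`, and closing the generator cancels the backing task. The names `stream_events` and `_DONE` are hypothetical, and this is a simplified model rather than the executor's actual collector.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

_DONE = object()  # sentinel marking the end of the backing task


async def stream_events(
    run: Callable[[Callable[[dict], None]], Awaitable[Any]],
) -> AsyncIterator[dict]:
    """Yield events emitted by `run`, which executes as a task on this loop."""
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(run(queue.put_nowait))
    # When the task finishes (normally, with an error, or cancelled), wake the reader.
    task.add_done_callback(lambda _t: queue.put_nowait(_DONE))
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        await task  # re-raise any failure from the run after the last event
    finally:
        # Reached on normal exit and when the consumer closes the generator early.
        if not task.done():
            task.cancel()
```

A consumer that stops early should close the generator (for example with `await gen.aclose()`) so the `finally` block runs promptly instead of waiting for garbage collection.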
Add StreamingTool (a Tool subclass) and ToolChunk: a streaming tool yields intermediate chunks plus a terminal is_final chunk carrying the schema-validated assembled output. stream_flow_async surfaces each chunk as a new FlowEvent(kind='step_chunk', chunk=...) between step_start and step_end, via a new additive on_step_chunk middleware hook + StepChunkContext. Streaming tools stay fully backward compatible on non-streaming paths (run/run_async/sync executor drain to the assembled output).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
Pull request overview
This PR closes a cohesive set of gaps in ChainWeaver’s async/streaming/interactive execution paths by adding streaming tool chunk propagation, improving async-lane parity (cache/checkpoints/composition), and introducing decision-callback audit + optional guardrails, with accompanying public API/docs/test updates.
Changes:
- Add streaming tool support (StreamingTool/ToolChunk) and propagate streamed chunks through a new FlowEvent(kind="step_chunk") plus an additive on_step_chunk middleware hook.
- Bring async execution closer to sync parity: step cache + checkpoint resume + composed sub-flow execution (plus resume_flow_async).
- Add decision-callback audit records (StepRecord.decision) and optional policy controls (timeout + per-flow budget), and downgrade determinism level to PARTIAL when decision_candidates are present.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| chainweaver/executor.py | Implements async parity (cache/checkpoints/composition), stream_flow_async, streaming chunk event emission, and decision policy enforcement. |
| chainweaver/tools.py | Adds ToolChunk + StreamingTool and the streaming execution API (run_streaming). |
| chainweaver/events.py | Extends FlowEvent schema to include step_chunk + chunk. |
| chainweaver/middleware.py | Adds StepChunkContext and an additive on_step_chunk hook. |
| chainweaver/decisions.py | Adds DecisionRecord + DecisionPolicy models and exports them. |
| chainweaver/exceptions.py | Adds DecisionTimeoutError / DecisionBudgetExceededError and assigns error codes. |
| chainweaver/flow.py | Downgrades determinism_level to PARTIAL when decision_candidates exist. |
| chainweaver/__init__.py | Exposes new public symbols (streaming + decision policy/errors + chunk context). |
| AGENTS.md | Updates entry points and async support matrix to reflect new capabilities. |
| CHANGELOG.md | Documents new streaming/async parity/decision-policy behavior and determinism reclassification. |
| docs/reference/error-table.md | Adds CW-E049/CW-E050 to the error table. |
| tests/test_streaming_tools.py | Adds coverage for streaming tool draining + step_chunk emission + JSON round-trip. |
| tests/test_streaming_async.py | Adds async streaming event-order and cancellation tests; sync stream cancellation coverage. |
| tests/test_executor_async_parity.py | Adds async parity tests for cache/checkpoint resume/sub-flow composition. |
| tests/test_decision_audit_policy.py | Adds decision audit record tests, determinism matrix tests, and policy timeout/budget tests. |
| tests/test_executor_async.py | Updates unsupported-construct assertions now that sub-flows are supported async. |
| tests/test_composition.py | Updates composition tests to assert async now executes sub-flows and records sub_result. |
| tests/fixtures/public_api.json | Regenerates the public API snapshot for new symbols/fields. |
    Unlike :meth:`stream_flow`, cancellation is prompt and cooperative:
    passing a ``cancel_token`` (or a ``deadline``) ends the stream at the
    next step boundary by raising
    :class:`~chainweaver.exceptions.FlowCancelledError` from the iterator --
    its :attr:`~chainweaver.exceptions.FlowCancelledError.result` carries
    the partial run. No ``flow_end`` is emitted on cancellation (execution
    raised before the flow-end hook). If the consumer stops iterating
    early, the backing :meth:`execute_flow_async` task is cancelled.

    # Streaming tool (issue #320): consume the chunk stream, emitting a
    # ``step_chunk`` event per chunk (surfaced by ``stream_flow_async``),
    # and use the terminal chunk's assembled -- already output-schema
    # validated -- data as the step output. The step cache is bypassed
    # (streaming tools are I/O-bound and typically non-deterministic).
    if isinstance(tool, StreamingTool):
        return _finish(
            await self._run_streaming_step_async(
Good catch on the timeout — fixed in 5af0158: the streaming consumption is now bounded by the tool's timeout_seconds via asyncio.wait_for, surfacing ToolTimeoutError just like the non-streaming path (added a regression test).
On cache + step.retry: these are bypassed for streaming steps by design, not as an oversight. A streaming step has already emitted its intermediate step_chunk events to downstream consumers by the time it could fail or be cache-served, so a partial stream can't be safely replayed (retry) or reconstructed from a memoised final dict (cache hit would also skip all chunk events). I've updated the StreamingTool and _run_streaming_step_async docstrings to state this explicitly rather than implying full parity. (The non-streaming paths — run/run_async/sync execute_flow/non-streamed execute_flow_async — still drain to the same final output, so an ordinary consumer is unaffected.)
Generated by Claude Code
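For readers unfamiliar with the pattern named in this reply, bounding a whole stream with asyncio.wait_for looks roughly like the sketch below. `consume_bounded` and `StreamTimedOut` are hypothetical names used only for illustration; the PR's own helper and ToolTimeoutError are not reproduced here.

```python
import asyncio
from typing import Any, AsyncIterator


class StreamTimedOut(Exception):
    """Hypothetical stand-in for a tool-timeout error."""


async def consume_bounded(stream: AsyncIterator[Any], timeout_s: float) -> list[Any]:
    """Consume the entire stream, failing if it takes longer than `timeout_s`."""

    async def consume() -> list[Any]:
        return [item async for item in stream]

    try:
        # wait_for cancels the inner coroutine once the timeout elapses.
        return await asyncio.wait_for(consume(), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        raise StreamTimedOut(f"stream exceeded {timeout_s}s") from exc
```

The timeout covers the stream as a whole rather than each chunk, so a tool that keeps yielding slowly still fails once the total budget is spent.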
    final: dict[str, Any] | None = None
    async for chunk in stream_fn(validated_input):
        if chunk.is_final:
            final = chunk.data
    if final is None:
        raise ToolDefinitionError(
            name, "streaming tool produced no terminal (is_final=True) chunk."
        )
    return final

    validated_input = self.input_schema.model_validate(raw_inputs)
    async for chunk in self.stream_fn(validated_input):
        if chunk.is_final:
            validated = self._validate_output(chunk.data)
            yield ToolChunk(data=validated, is_final=True)
        else:
            yield chunk
…docstring

Address Copilot review on PR #471:
- StreamingTool.run_streaming / _drain now enforce 'exactly one terminal is_final chunk, and it must be last': a chunk after the terminal chunk or a missing terminal raises ToolDefinitionError (was: silently kept the last).
- _run_streaming_step_async bounds the whole stream with the tool's timeout_seconds via asyncio.wait_for, surfacing ToolTimeoutError (was: an async streaming step could hang indefinitely). Document that cache bypass and retry-skip for streaming steps are intentional (a partial stream cannot be replayed/memoised).
- Correct the stream_flow_async docstring: a terminal flow_end carrying the partial IS emitted before FlowCancelledError is raised.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
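The "exactly one terminal chunk, and it must be last" rule from this commit can be checked with a small drain loop. The sketch below is an assumption-laden illustration: `Chunk`, `drain`, and `ChunkProtocolError` are hypothetical names, and the real code raises ToolDefinitionError with its own message format.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


class ChunkProtocolError(Exception):
    """Hypothetical stand-in for the error raised on a malformed stream."""


@dataclass
class Chunk:
    """Hypothetical stand-in for a streamed chunk."""
    data: dict[str, Any]
    is_final: bool = False


async def drain(stream: AsyncIterator[Chunk]) -> dict[str, Any]:
    """Return the terminal chunk's data, enforcing that it exists and is last."""
    final: dict[str, Any] | None = None
    seen_final = False
    async for chunk in stream:
        if seen_final:
            # Covers both a second terminal chunk and a stray intermediate chunk.
            raise ChunkProtocolError("chunk received after the terminal chunk")
        if chunk.is_final:
            final, seen_final = chunk.data, True
    if not seen_final or final is None:
        raise ChunkProtocolError("stream ended without a terminal chunk")
    return final
```

Tracking `seen_final` separately from the payload keeps the check correct even when the terminal chunk's data is an empty dict.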
Summary
Completes the recommended coherent group from the issue-triage report: a focused set of changes to ChainWeaver's asynchronous / streaming / interactive execution path in executor.py (plus the tools.py / events.py / middleware.py surfaces they touch). The five issues are mutually reinforcing (they all live on the same execution loop) and were implemented as five reviewable commits.
Changes
- #369, #370: decision audit and policy (decisions.py, executor.py, flow.py, exceptions.py). StepRecord.decision now carries a DecisionRecord (candidates, chosen, default, latency, timed_out) populated exactly when a callback resolves a step. Flows with a decision_candidates step report DeterminismLevel.PARTIAL (was FULL), matching the branches precedent (#369). Opt-in DecisionPolicy(timeout_s, max_decisions_per_flow, on_timeout) is enforced via a bounded-join worker thread, with new DecisionTimeoutError (CW-E049) and DecisionBudgetExceededError (CW-E050) (#370). decision_policy=None keeps behaviour unchanged.
- #388: async-lane parity (executor.py). execute_flow_async now consults the step cache (cached=True, skips run_async), writes crash-resume checkpoints per step / DAG level, and runs composed flow_name sub-flows (sub_result + deadline/cancel forwarding). New resume_flow_async(trace_id) mirrors resume_flow incl. CheckpointDriftError. The async lane still rejects branching (Add conditional branching support with safe predicate evaluation #9) and decision callbacks (Add guided decision points — hybrid execution with contextual tool narrowing #102).
- #389: stream_flow_async + cancellation (executor.py, events.py). A new async generator yields the same lifecycle event sequence on the calling loop; cancel_token/deadline end the stream at the next boundary with the partial on .result. Sync stream_flow gains optional cancel_token/deadline.
- #320: streaming tools (tools.py, events.py, middleware.py, executor.py). New StreamingTool + ToolChunk; stream_flow_async emits FlowEvent(kind="step_chunk", chunk=...) between step_start/step_end via an additive on_step_chunk middleware hook. Streaming tools are fully backward compatible on non-streaming paths.
Docs updated: AGENTS.md (entry points, async support matrix, determinism), CHANGELOG.md, docs/reference/error-table.md. Public-API snapshot fixture regenerated.
Why
This is the highest-value, most coherent slice of the open backlog: a single subsystem, several priority: high issues, and a real documented gap (the async lane, the home of MCP-adapted tools, previously lacked caching, resume, and composition).
Testing
- … (ruff check chainweaver/ tests/ examples/)
- … (ruff format --check chainweaver/ tests/ examples/)
- … (python -m mypy chainweaver/ tests/)
- … (python -m pytest tests/: 1954 passed, 1 skipped, coverage 92.79% ≥ 80% gate)
- … test_decision_audit_policy.py (20), test_executor_async_parity.py (6), test_streaming_async.py (10), test_streaming_tools.py (8), plus updated test_executor_async.py / test_composition.py.
Also ran the banned-vocabulary check (scripts/check_vocabulary.py): clean.
Issues closed by this PR
Closes #388
Closes #389
Closes #320
Closes #369
Closes #370
Related Issues
Checklist
- … AGENTS.md and docs/agent-context/)
- … __all__ + snapshot updated)
Risks / caveats
- … FULL → PARTIAL; consumers gating on FULL (catalog exporters, governance, attestation) will see them reclassified: the corrected signal, noted in the changelog. Per the maintainer's direction on this task, backward compatibility was explicitly not a constraint.
- … retry / on_error / contracts (previously dropped): more correct, matches the sync lane.

🤖 Generated with Claude Code