
feat(executor): async/streaming execution-path parity + decision-callback controls (#388, #389, #320, #369, #370) #471

Open
dgenio wants to merge 5 commits into main from claude/github-issue-triage-rle6rj

dgenio (Owner) commented Jun 22, 2026
Summary

Completes the recommended coherent group from the issue-triage report: a focused set of changes to ChainWeaver's asynchronous / streaming / interactive execution path in executor.py (plus the tools.py / events.py / middleware.py surfaces they touch). The five issues are mutually reinforcing — they all live on the same execution loop — and were implemented as five reviewable commits.

Changes

Docs updated: AGENTS.md (entry points, async support matrix, determinism), CHANGELOG.md, docs/reference/error-table.md. Public-API snapshot fixture regenerated.

Why

This is the highest-value, most coherent slice of the open backlog: a single subsystem, several issues labelled priority: high, and a real documented gap (the async lane, the home of MCP-adapted tools, previously lacked caching, resume, and composition).

Testing

  • Linting passes (ruff check chainweaver/ tests/ examples/)
  • Formatting check passes (ruff format --check chainweaver/ tests/ examples/)
  • Type checking passes (python -m mypy chainweaver/ tests/)
  • All existing tests pass (python -m pytest tests/: 1954 passed, 1 skipped, coverage 92.79% ≥ 80% gate)
  • New tests added: test_decision_audit_policy.py (20), test_executor_async_parity.py (6), test_streaming_async.py (10), test_streaming_tools.py (8), plus updated test_executor_async.py / test_composition.py.

Also ran the banned-vocabulary check (scripts/check_vocabulary.py) — clean.

Issues closed by this PR

Closes #388
Closes #389
Closes #320
Closes #369
Closes #370

Related Issues

Checklist

  • Code follows project conventions (see AGENTS.md and docs/agent-context/)
  • Public API changes are documented (CHANGELOG, AGENTS.md, error table; __all__ + snapshot updated)
  • No secrets or credentials included

Risks / caveats

  • Behaviour change (Reflect decision callbacks in determinism levels and the execution audit trail #369): decision-bearing flows reclassify FULL → PARTIAL; consumers gating on FULL (catalog exporters, governance, attestation) will see them reclassified — the corrected signal, noted in the changelog. Per the maintainer's direction on this task, backward compatibility was explicitly not a constraint.
  • The async DAG proxy now also forwards per-step retry / on_error / contracts (previously dropped) — more correct, matches the sync lane.

🤖 Generated with Claude Code



claude added 4 commits June 22, 2026 20:10
…#370)

Record guided decision points on StepRecord.decision (DecisionRecord:
candidates, chosen, default_tool_name, duration_ms, timed_out) and downgrade
flows with decision_candidates steps to DeterminismLevel.PARTIAL (#369).

Add opt-in DecisionPolicy (timeout_s, max_decisions_per_flow, on_timeout)
enforced via a bounded-join worker thread, with new DecisionTimeoutError
(CW-E049) and DecisionBudgetExceededError (CW-E050) (#370). decision_policy=None
keeps behavior unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
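
For readers unfamiliar with the "bounded-join worker thread" pattern named in the commit above, here is a minimal sketch, not the PR's implementation. The names `decide_with_timeout` and `DecisionTimeout`, and the `default` fallback argument, are hypothetical stand-ins chosen for illustration; the real `DecisionPolicy` fields may behave differently.

```python
import threading
from typing import Any, Callable, Optional, Sequence


class DecisionTimeout(Exception):
    """Hypothetical stand-in for a decision-timeout error."""


def decide_with_timeout(
    callback: Callable[[Sequence[str]], str],
    candidates: Sequence[str],
    timeout_s: float,
    default: Optional[str] = None,
) -> tuple[str, bool]:
    """Run ``callback`` in a daemon thread and wait at most ``timeout_s``.

    Returns ``(chosen, timed_out)``. On timeout, falls back to ``default``
    if one is given (an assumption of this sketch), otherwise raises.
    """
    box: dict[str, Any] = {}

    def worker() -> None:
        try:
            box["chosen"] = callback(candidates)
        except BaseException as exc:  # hand the failure back to the caller
            box["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout_s)  # bounded join: never blocks past timeout_s
    if thread.is_alive():
        if default is None:
            raise DecisionTimeout(f"no decision within {timeout_s}s")
        return default, True
    if "error" in box:
        raise box["error"]
    return box["chosen"], False
```

A Python thread cannot be killed from outside, so in this sketch a timed-out callback keeps running in the background; the caller simply stops waiting for it.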
…ws (#388)

execute_flow_async now consults the step cache (cached=True on hits, skips
Tool.run_async), writes crash-resume checkpoints after each successful step /
DAG level, and executes composed flow_name sub-flow steps with sub_result and
deadline/cancel_token forwarding. Add resume_flow_async(trace_id) mirroring
resume_flow (incl. CheckpointDriftError). Narrow the async-lane guard to keep
rejecting only branching (#9) and decision callbacks (#102). Sync lane
unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
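
The cache behaviour described in the commit above (a hit is flagged as cached and the tool is never awaited) can be illustrated with a small sketch. Everything here is an assumption for illustration: `run_step_cached`, `cache_key`, and the dict-backed cache are hypothetical and are not ChainWeaver's API.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

ToolFn = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


def cache_key(tool_name: str, inputs: dict[str, Any]) -> str:
    """Deterministic key: tool name plus canonical JSON of the inputs."""
    return f"{tool_name}:{json.dumps(inputs, sort_keys=True)}"


async def run_step_cached(
    cache: dict[str, dict[str, Any]],
    tool_name: str,
    tool_fn: ToolFn,
    inputs: dict[str, Any],
) -> tuple[dict[str, Any], bool]:
    """Return ``(output, cached)``; the tool is awaited only on a miss."""
    key = cache_key(tool_name, inputs)
    if key in cache:
        return cache[key], True  # hit: skip the tool call entirely
    output = await tool_fn(inputs)
    cache[key] = output
    return output, False
```

Keying on canonical JSON keeps the lookup independent of dict ordering, which is one plausible way to make a hit reproducible across runs.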
Add FlowExecutor.stream_flow_async, an async generator yielding the same
FlowEvent lifecycle sequence as stream_flow by driving execute_flow_async on
the calling loop (no worker thread). cancel_token/deadline end the stream at
the next step boundary (FlowCancelledError carries the partial); abandoning the
iterator cancels the backing task. Sync stream_flow gains optional
cancel_token/deadline forwarded to the worker. Refactor the stream collector to
emit via a callable so both lanes share it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
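
The shape described in the commit above (an async generator that drives the run on the calling loop, emits through a callable, and cancels the backing task when the consumer walks away) might look roughly like the following. This is a sketch under stated assumptions: `stream_events` and the plain-dict events are hypothetical, and cancel tokens and deadlines are not modelled.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

Emit = Callable[[dict[str, Any]], None]


async def stream_events(
    run: Callable[[Emit], Awaitable[Any]],
) -> AsyncIterator[dict[str, Any]]:
    """Yield events emitted by ``run`` on the calling event loop.

    ``run`` receives an ``emit`` callable. If the consumer stops iterating,
    the ``finally`` block cancels the backing task.
    """
    queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the run

    async def drive() -> None:
        try:
            await run(queue.put_nowait)
        finally:
            queue.put_nowait(done)

    task = asyncio.create_task(drive())
    try:
        while True:
            item = await queue.get()
            if item is done:
                break
            yield item
        await task  # re-raise any error from the run
    finally:
        if not task.done():
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)
```

Because the run is a task on the same loop rather than a worker thread, cancellation propagates as an ordinary `asyncio.CancelledError` at the run's next await point.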
Add StreamingTool (a Tool subclass) and ToolChunk: a streaming tool yields
intermediate chunks plus a terminal is_final chunk carrying the schema-validated
assembled output. stream_flow_async surfaces each chunk as a new
FlowEvent(kind='step_chunk', chunk=...) between step_start and step_end, via a
new additive on_step_chunk middleware hook + StepChunkContext. Streaming tools
stay fully backward compatible on non-streaming paths (run/run_async/sync
executor drain to the assembled output).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
Copilot AI review requested due to automatic review settings June 22, 2026 20:41

Copilot AI (Contributor) left a comment

Pull request overview

This PR closes a cohesive set of gaps in ChainWeaver’s async/streaming/interactive execution paths by adding streaming tool chunk propagation, improving async-lane parity (cache/checkpoints/composition), and introducing decision-callback audit + optional guardrails, with accompanying public API/docs/test updates.

Changes:

  • Add streaming tool support (StreamingTool/ToolChunk) and propagate streamed chunks through a new FlowEvent(kind="step_chunk") plus an additive on_step_chunk middleware hook.
  • Bring async execution closer to sync parity: step cache + checkpoint resume + composed sub-flow execution (plus resume_flow_async).
  • Add decision-callback audit records (StepRecord.decision) and optional policy controls (timeout + per-flow budget), and downgrade determinism level to PARTIAL when decision_candidates are present.

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| chainweaver/executor.py | Implements async parity (cache/checkpoints/composition), stream_flow_async, streaming chunk event emission, and decision policy enforcement. |
| chainweaver/tools.py | Adds ToolChunk + StreamingTool and the streaming execution API (run_streaming). |
| chainweaver/events.py | Extends FlowEvent schema to include step_chunk + chunk. |
| chainweaver/middleware.py | Adds StepChunkContext and an additive on_step_chunk hook. |
| chainweaver/decisions.py | Adds DecisionRecord + DecisionPolicy models and exports them. |
| chainweaver/exceptions.py | Adds DecisionTimeoutError / DecisionBudgetExceededError and assigns error codes. |
| chainweaver/flow.py | Downgrades determinism_level to PARTIAL when decision_candidates exist. |
| chainweaver/__init__.py | Exposes new public symbols (streaming + decision policy/errors + chunk context). |
| AGENTS.md | Updates entry points and async support matrix to reflect new capabilities. |
| CHANGELOG.md | Documents new streaming/async parity/decision-policy behavior and determinism reclassification. |
| docs/reference/error-table.md | Adds CW-E049/CW-E050 to the error table. |
| tests/test_streaming_tools.py | Adds coverage for streaming tool draining + step_chunk emission + JSON round-trip. |
| tests/test_streaming_async.py | Adds async streaming event-order and cancellation tests; sync stream cancellation coverage. |
| tests/test_executor_async_parity.py | Adds async parity tests for cache/checkpoint resume/sub-flow composition. |
| tests/test_decision_audit_policy.py | Adds decision audit record tests, determinism matrix tests, and policy timeout/budget tests. |
| tests/test_executor_async.py | Updates unsupported-construct assertions now that sub-flows are supported async. |
| tests/test_composition.py | Updates composition tests to assert async now executes sub-flows and records sub_result. |
| tests/fixtures/public_api.json | Regenerates the public API snapshot for new symbols/fields. |

Comment thread chainweaver/executor.py Outdated
Comment on lines +3252 to +3259
Unlike :meth:`stream_flow`, cancellation is prompt and cooperative:
passing a ``cancel_token`` (or a ``deadline``) ends the stream at the
next step boundary by raising
:class:`~chainweaver.exceptions.FlowCancelledError` from the iterator —
its :attr:`~chainweaver.exceptions.FlowCancelledError.result` carries
the partial run. No ``flow_end`` is emitted on cancellation (execution
raised before the flow-end hook). If the consumer stops iterating
early, the backing :meth:`execute_flow_async` task is cancelled.
Comment thread chainweaver/executor.py
Comment on lines +2634 to +2641
# Streaming tool (issue #320): consume the chunk stream, emitting a
# ``step_chunk`` event per chunk (surfaced by ``stream_flow_async``),
# and use the terminal chunk's assembled — already output-schema
# validated — data as the step output. The step cache is bypassed
# (streaming tools are I/O-bound and typically non-deterministic).
if isinstance(tool, StreamingTool):
    return _finish(
        await self._run_streaming_step_async(
dgenio (Owner, Author) commented

Good catch on the timeout — fixed in 5af0158: the streaming consumption is now bounded by the tool's timeout_seconds via asyncio.wait_for, surfacing ToolTimeoutError just like the non-streaming path (added a regression test).

On cache + step.retry: these are bypassed for streaming steps by design, not as an oversight. A streaming step has already emitted its intermediate step_chunk events to downstream consumers by the time it could fail or be cache-served, so a partial stream can't be safely replayed (retry) or reconstructed from a memoised final dict (cache hit would also skip all chunk events). I've updated the StreamingTool and _run_streaming_step_async docstrings to state this explicitly rather than implying full parity. (The non-streaming paths — run/run_async/sync execute_flow/non-streamed execute_flow_async — still drain to the same final output, so an ordinary consumer is unaffected.)


Generated by Claude Code
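
To make the timeout fix in the reply above concrete, here is a hedged sketch of bounding a whole chunk stream with a single `asyncio.wait_for`. `drain_with_timeout` and `StepTimeout` are hypothetical names for illustration; the PR's `_run_streaming_step_async` and `ToolTimeoutError` are not reproduced here.

```python
import asyncio
from typing import Any, AsyncIterator, Callable


class StepTimeout(Exception):
    """Hypothetical stand-in for a tool-timeout error."""


async def drain_with_timeout(
    stream: AsyncIterator[dict[str, Any]],
    on_chunk: Callable[[dict[str, Any]], None],
    timeout_seconds: float,
) -> int:
    """Consume ``stream`` fully, bounding the whole drain by one timeout."""

    async def consume() -> int:
        count = 0
        async for chunk in stream:
            on_chunk(chunk)  # chunks are surfaced as they arrive
            count += 1
        return count

    try:
        return await asyncio.wait_for(consume(), timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        raise StepTimeout(f"stream exceeded {timeout_seconds}s") from exc
```

The sketch also shows why replay is awkward: by the time the timeout fires, `on_chunk` has already delivered the earlier chunks to the consumer, so a retry would deliver them a second time.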

Comment thread chainweaver/tools.py
Comment on lines +720 to +728
final: dict[str, Any] | None = None
async for chunk in stream_fn(validated_input):
    if chunk.is_final:
        final = chunk.data
if final is None:
    raise ToolDefinitionError(
        name, "streaming tool produced no terminal (is_final=True) chunk."
    )
return final
Comment thread chainweaver/tools.py
Comment on lines +752 to +758
validated_input = self.input_schema.model_validate(raw_inputs)
async for chunk in self.stream_fn(validated_input):
    if chunk.is_final:
        validated = self._validate_output(chunk.data)
        yield ToolChunk(data=validated, is_final=True)
    else:
        yield chunk
…docstring

Address Copilot review on PR #471:
- StreamingTool.run_streaming / _drain now enforce 'exactly one terminal
  is_final chunk, and it must be last' — a chunk after the terminal chunk or a
  missing terminal raises ToolDefinitionError (was: silently kept the last).
- _run_streaming_step_async bounds the whole stream with the tool's
  timeout_seconds via asyncio.wait_for, surfacing ToolTimeoutError (was: an
  async streaming step could hang indefinitely). Document that cache bypass and
  retry-skip for streaming steps are intentional (a partial stream cannot be
  replayed/memoised).
- Correct the stream_flow_async docstring: a terminal flow_end carrying the
  partial IS emitted before FlowCancelledError is raised.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011U2ZKg2GMec7iAUDt2zezq
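
The "exactly one terminal is_final chunk, and it must be last" rule from the commit above can be sketched as a small drain loop. This is an illustration only: `Chunk`, `StreamProtocolError`, and `drain` are hypothetical stand-ins, not the PR's `ToolChunk`, `ToolDefinitionError`, or `_drain`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class Chunk:
    """Hypothetical stand-in for a streamed tool chunk."""

    data: dict[str, Any]
    is_final: bool = False


class StreamProtocolError(Exception):
    """Raised when the stream breaks the terminal-chunk rule."""


async def drain(stream: AsyncIterator[Chunk]) -> dict[str, Any]:
    """Return the terminal chunk's data, enforcing 'exactly one, and last'."""
    final: Optional[dict[str, Any]] = None
    async for chunk in stream:
        if final is not None:
            # Anything after the terminal chunk is a protocol violation,
            # including a second terminal chunk.
            raise StreamProtocolError("chunk received after the terminal chunk")
        if chunk.is_final:
            final = chunk.data
    if final is None:
        raise StreamProtocolError("stream produced no terminal chunk")
    return final
```

Checking `final is not None` before inspecting the next chunk covers both failure modes named in the commit with one branch, and it treats an empty terminal dict as a valid result.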