Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ chainweaver/
├── contracts.py ToolSafetyContract + SideEffectLevel/StabilityLevel/DeterminismLevel enums + merge_safety() + side_effect_exceeds() (#356) + evaluate_predicate() — determinism + operational safety vocabulary (#19, #125, #293, #9, #8)
├── approvals.py ApprovalCallback Protocol + ApprovalContext/ApprovalDecision/ApprovalRecord + coerce_approval_callback — execution-time ToolSafetyContract enforcement seam (#356); mirrors decisions.py
├── decorators.py @tool decorator for zero-boilerplate tool definition
├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19) + metadata provenance (#358/#359/#371) + dry_run_fn/run_dry (#357); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125)
├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19) + metadata provenance (#358/#359/#371) + dry_run_fn/run_dry (#357); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125); StreamingTool + ToolChunk for streamed output via run_streaming (#320)
├── flow.py FlowStep (+ output_mapping #386) + Flow + DAGFlow (+ dynamic_params #316) + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8) + ContextCollisionPolicy / on_context_collision (#337)
├── step_index.py Named sentinels for flow input/output validation records (#339)
├── _pointer.py Dependency-free RFC-6901 JSON pointer resolver shared by executor input_mapping (#387) and contrib json_pluck
Expand All @@ -58,8 +58,8 @@ chainweaver/
├── _execution/ Internal, no-I/O execution collaborators shared by both lanes (#330, #331); banned from importing LLM/network/random — see invariants
│ ├── __init__.py Re-exports merge_step_outputs + apply_output_mapping
│ └── context.py merge_step_outputs + apply_output_mapping: single context-merge honouring on_context_collision (#337) and output_mapping (#386)
├── middleware.py FlowExecutorMiddleware Protocol + lifecycle context models + BaseMiddleware (#131)
├── events.py FlowEvent streamable lifecycle payload yielded by FlowExecutor.stream_flow (#134)
├── middleware.py FlowExecutorMiddleware Protocol + lifecycle context models + BaseMiddleware (#131); optional on_step_chunk hook + StepChunkContext for streaming steps (#320)
├── events.py FlowEvent streamable lifecycle payload yielded by FlowExecutor.stream_flow / stream_flow_async (#134, #389) — incl. kind="step_chunk" carrying a ToolChunk for streaming tools (#320)
├── cache.py StepCache Protocol + InMemoryStepCache + FileStepCache + StepCacheKey (#127)
├── checkpoint.py Checkpointer Protocol + ExecutionSnapshot + InMemoryCheckpointer + FileCheckpointer (#128)
├── integrations/ Optional third-party adapters (each guards its extra import)
Expand Down Expand Up @@ -145,15 +145,16 @@ benchmarks/ Standalone benchmark scripts (not coverage-gated):

### Key entry points

- `FlowExecutor(..., decision_callback=...)` → wire a `DecisionCallback` for guided decision points (#102); steps with `decision_candidates` set call the callback to pick which tool to run. Either a class with `decide(ctx)` or a bare callable is accepted (coerced via `coerce_decision_callback`).
- `FlowExecutor(..., decision_callback=...)` → wire a `DecisionCallback` for guided decision points (#102); steps with `decision_candidates` set call the callback to pick which tool to run. Either a class with `decide(ctx)` or a bare callable is accepted (coerced via `coerce_decision_callback`). Each resolution is recorded on `StepRecord.decision` (`DecisionRecord`, #369). Pass `decision_policy=DecisionPolicy(timeout_s=..., max_decisions_per_flow=..., on_timeout=...)` to bound callback latency and per-flow decision count (#370).
- `KernelBackedExecutor(..., kernel=...)` from `chainweaver.integrations.agent_kernel` (#89) → optional `FlowExecutor` subclass that delegates `DAGFlowStep` instances with `step_type="capability"` through a `KernelProtocol`. The base `FlowExecutor` rejects capability steps; only this subclass dispatches them.
- `flow_to_selectable_item(flow, *, capability_id=None, tags=())` from `chainweaver.integrations.weaver_spec` (#107) → project a `Flow` or `DAGFlow` to a weaver-spec `SelectableItem` for contextweaver catalog ingestion.
- `RoutingDecisionAdapter(client=...)` from `chainweaver.integrations.contextweaver` (#106) → `DecisionCallback` impl that asks a `ContextweaverClient` for a `RoutingDecision` and returns the selected capability id.
- `FlowExecutor.execute_flow(flow_name, initial_input, *, version=None, force=False, deadline=None, cancel_token=None)` → `ExecutionResult`. `version` (#201) targets an exact registered flow version (default: latest); the version that ran is recorded on `ExecutionResult.flow_version`. `deadline` (wall-clock `time.time()` seconds) and `cancel_token` (`CancellationToken`, #142) cooperatively cancel **between** steps / DAG levels — never inside a tool — raising `FlowCancelledError` with the partial result.
- `FlowExecutor.execute_flow_async(flow_name, initial_input, *, version=None, force=False, deadline=None, cancel_token=None)` → `Awaitable[ExecutionResult]` (#80); async-native counterpart of `execute_flow`. Dispatches each step through `Tool.run_async` so async-fn tools (e.g. those produced by `chainweaver.mcp.MCPToolAdapter`) execute on the calling loop and sync-fn tools are offloaded to `asyncio.to_thread`. Supports linear and DAG flows with retries, middleware, and on_error policies; honours `version` / `deadline` / `cancel_token`; rejects composed `flow_name` sub-flow steps (#75, sync-only); defers step cache + checkpoint resume to a follow-up.
- `FlowExecutor.stream_flow(flow_name, initial_input, *, force=False)` → `Iterator[FlowEvent]` (#134); yields `kind="flow_start"` → (`step_start` → `step_end`)* → `flow_end` events as the flow runs on a worker thread. Cancellation is not supported for the sync variant; the background thread runs to completion.
- `FlowExecutor.execute_flow_async(flow_name, initial_input, *, version=None, force=False, deadline=None, cancel_token=None)` → `Awaitable[ExecutionResult]` (#80); async-native counterpart of `execute_flow`. Dispatches each step through `Tool.run_async` so async-fn tools (e.g. those produced by `chainweaver.mcp.MCPToolAdapter`) execute on the calling loop and sync-fn tools are offloaded to `asyncio.to_thread`. Supports linear and DAG flows with retries, middleware, and on_error policies; honours `version` / `deadline` / `cancel_token`; executes composed `flow_name` sub-flow steps, consults the step cache, and writes checkpoints — resume via `resume_flow_async(trace_id)` (#388). Still rejects conditional branching (#9) and `decision_candidates` (#102).
- `FlowExecutor.stream_flow(flow_name, initial_input, *, force=False, deadline=None, cancel_token=None)` → `Iterator[FlowEvent]` (#134); yields `kind="flow_start"` → (`step_start` → `step_end`)* → `flow_end` events as the flow runs on a worker thread. A `deadline` / `cancel_token` is checked at step boundaries on the worker (#389); abandoning the iterator still lets the in-flight step run to completion.
- `FlowExecutor.stream_flow_async(flow_name, initial_input, *, force=False, deadline=None, cancel_token=None)` → `AsyncIterator[FlowEvent]` (#389); async-native counterpart driving `execute_flow_async` on the calling loop (no worker thread). Same event order; `cancel_token` / `deadline` end the stream promptly at the next step boundary by raising `FlowCancelledError` (partial on `.result`), and abandoning the iterator cancels the backing task. Async-lane feature support applies (#388).
- `FlowExecutor(..., step_cache=...)` → memoize step outputs across runs (#127); keyed by `(tool_name, schema_hash, input_value_hash)`. Cache hits skip `Tool.fn` entirely (including retries and timeout) and surface as `StepRecord.cached=True`. Tools mark themselves `cacheable=False` to always run (side-effects, external state). `replay_flow` always bypasses the cache.
- `FlowExecutor(..., checkpointer=..., delete_on_success=True)` → crash-resume (#128); writes an `ExecutionSnapshot` after every successful linear step or DAG level. `FlowExecutor.resume_flow(trace_id)` validates the snapshot's flow version and tool `schema_hash` values against the current registry — drift raises `CheckpointDriftError` — then continues execution with the original `trace_id`. Snapshots are deleted on terminal success when `delete_on_success=True` (the default); preserved on failure for operator-driven retry.
- `FlowExecutor(..., checkpointer=..., delete_on_success=True)` → crash-resume (#128); writes an `ExecutionSnapshot` after every successful linear step or DAG level. `FlowExecutor.resume_flow(trace_id)` (or `resume_flow_async(trace_id)` for runs started on the async lane, #388) validates the snapshot's flow version and tool `schema_hash` values against the current registry — drift raises `CheckpointDriftError` — then continues execution with the original `trace_id`. Snapshots are deleted on terminal success when `delete_on_success=True` (the default); preserved on failure for operator-driven retry.
- `OTelTraceExporter(tracer=...)` from `chainweaver.integrations.opentelemetry` (#126) → emits OpenTelemetry spans as a `FlowExecutorMiddleware`: one parent `chainweaver.flow.{name}` span + one child `chainweaver.tool.{name}` span per `StepRecord`. After-the-fact export of a completed `ExecutionResult` via `export_result_to_otel(result, tracer=...)`. Optional extra: `pip install 'chainweaver[otel]'`.
- `MCPToolAdapter(session)` from `chainweaver.mcp` (#70, #150) → wraps each MCP tool advertised by an open `mcp.ClientSession` as a ChainWeaver `Tool`. `await adapter.discover_tools(server_prefix="…")` returns the wrapped tools; pass `include=[…]` to filter. The resulting tools are async-fn and must be run through `execute_flow_async`. Optional extra: `pip install 'chainweaver[mcp]'`.
- `FlowServer(executor, *, name="chainweaver", flow_names=None, server_prefix="")` from `chainweaver.mcp` (#72) → mounts registered flows as MCP tools on a FastMCP server. `server.serve(transport="stdio")` blocks; `await server.serve_async(transport=...)` returns to the loop. Synthesises the dispatcher signature from the flow's input schema so MCP clients call `tool(n=5)` directly. Optional extra: `pip install 'chainweaver[mcp]'`.
Expand Down Expand Up @@ -261,8 +262,8 @@ for features it does not yet honour, rather than diverging silently:
| Opt-in DAG-level concurrency (#344) | sequential | ✅ (`max_step_concurrency`) |
| Conditional branches / `default_next` (#9) | ✅ | ❌ rejected |
| `decision_candidates` (#102) | ✅ | ❌ rejected |
| Composed sub-flow (`flow_name`, #75) | ✅ | ❌ rejected |
| Step cache / checkpoint resume | ✅ | bypassed |
| Composed sub-flow (`flow_name`, #75) | ✅ | ✅ (#388) |
| Step cache / checkpoint resume | ✅ | ✅ (#388; resume via `resume_flow_async`) |

### State transitions (#335)

Expand All @@ -276,6 +277,9 @@ needing the new state must re-fetch via `get_flow`.
`type[BaseModel] | None`. `determinism_level` is a computed
`DeterminismLevel` (#8): linear `Flow` → `FULL` (or `NONE` if
`deterministic=False`); `DAGFlow` with any conditional `branches` → `PARTIAL`.
Any step (linear or DAG) with non-empty `decision_candidates` (#102) also
downgrades the flow to `PARTIAL` (#369), since a registered callback can pick
a different tool per run.

### `DAGFlowStep` conditional branching (#9)

Expand Down
69 changes: 69 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,75 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Streaming tool output propagation** (#320): a new
`chainweaver.StreamingTool` (a `Tool` subclass) produces its output as a
stream of `chainweaver.ToolChunk` objects via an async `run_streaming`
generator — zero or more intermediate chunks followed by one terminal
`is_final=True` chunk whose data is the schema-validated assembled output.
`stream_flow_async` surfaces each chunk as a new `FlowEvent(kind="step_chunk",
chunk=...)`, interleaved between `step_start` and `step_end`, so real-time
pipelines (voice, A2A, SSE) can consume partial output as it is produced.
Streaming tools are fully backward compatible: on the non-streaming paths
(`run` / `run_async` / sync `execute_flow` / non-streamed
`execute_flow_async`) they transparently drain to the assembled output and
behave like any other tool. A new optional `on_step_chunk` middleware hook
(with a `StepChunkContext`) receives chunks; it is additive and dispatched
only to middleware that define it, so existing middleware are unaffected.

- **`stream_flow_async` and streamed-run cancellation** (#389): a new
`FlowExecutor.stream_flow_async(...)` async generator yields the same
`flow_start → (step_start → step_end)* → flow_end` `FlowEvent` sequence as
`stream_flow`, driving `execute_flow_async` directly on the calling loop
(no worker thread). A `cancel_token` / `deadline` ends the stream promptly
at the next step boundary, raising `FlowCancelledError` with the partial run
on `.result`; abandoning the iterator cancels the backing task. The sync
`stream_flow` also gains optional `cancel_token` / `deadline` parameters,
checked at step boundaries on the worker thread (the in-flight step still
completes). Event ordering and the "observability never aborts a flow"
contract are unchanged.

- **Async-lane parity: cache, checkpoint resume, sub-flow composition**
(#388): `execute_flow_async` now consults the step cache (records
`StepRecord.cached=True` on hits and skips `Tool.run_async`), writes
crash-resume checkpoints after each successful step / DAG level, and
executes composed `flow_name` sub-flow steps (with `sub_result` populated
and `deadline` / `cancel_token` forwarded into the sub-flow). A new
`FlowExecutor.resume_flow_async(trace_id)` mirrors `resume_flow` —
including `CheckpointDriftError` on schema drift — on the async lane. The
async lane still rejects conditional branching (#9) and decision callbacks
(#102) up front via `AsyncLaneUnsupportedError`. Sync-lane behavior is
unchanged.

- **Decision-callback audit trail and policy controls** (#369, #370):
guided decision points (#102) are now visible in traces and bounded by
opt-in guardrails.
- *Audit record* (#369): `StepRecord.decision` carries a new
`chainweaver.decisions.DecisionRecord` (`candidates`, `chosen`,
`default_tool_name`, `duration_ms`, `timed_out`), populated **exactly
when** a registered `DecisionCallback` resolves a step. It round-trips
through JSON and is `None` for ordinary steps and for static fallbacks
where no callback ran.
- *Policy controls* (#370): `FlowExecutor(..., decision_policy=DecisionPolicy(...))`
adds a per-decision `timeout_s` (the callback runs on a bounded-join
worker thread; `on_timeout="error"` fails the step with the new
`DecisionTimeoutError` / `CW-E049`, `on_timeout="default"` falls back to
the step's static `tool_name` and records `timed_out=True`) and a
per-flow `max_decisions_per_flow` budget (exceeding it aborts the run
with `DecisionBudgetExceededError` / `CW-E050`). Sub-flows carry their
own independent budget. With `decision_policy=None` (the default),
behavior is unchanged.

### Changed

- **Determinism reclassification for decision-bearing flows** (#369,
breaking): a linear `Flow` or `DAGFlow` containing any step with non-empty
`decision_candidates` now reports `DeterminismLevel.PARTIAL` instead of
`FULL` (or `NONE` when `deterministic=False`), matching the existing
`branches` precedent — a registered callback can select different tools on
different runs, so the executed path is data-dependent. Consumers gating on
`FULL` (catalog exporters, governance policies, attestation) will see these
flows reclassified; this is the corrected signal.

- **OpenCode integration** (#276, #277, #278, #279, #280, #282): observe →
suggest → compile → expose for OpenCode, end to end and reversible.
- *Trace adapter* (#278/#276): `chainweaver.opencode.normalize_opencode_event`
Expand Down
14 changes: 13 additions & 1 deletion chainweaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
DecisionCallable,
DecisionCallback,
DecisionContext,
DecisionPolicy,
DecisionRecord,
coerce_decision_callback,
)
from chainweaver.decorators import tool
Expand All @@ -113,7 +115,9 @@
ContribError,
CostProfileError,
DAGDefinitionError,
DecisionBudgetExceededError,
DecisionCallbackError,
DecisionTimeoutError,
FlowAlreadyExistsError,
FlowAuthenticationError,
FlowAuthorizationError,
Expand Down Expand Up @@ -192,6 +196,7 @@
FlowEndContext,
FlowExecutorMiddleware,
FlowStartContext,
StepChunkContext,
StepEndContext,
StepStartContext,
)
Expand Down Expand Up @@ -255,7 +260,7 @@
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX, flow_output_step_index
from chainweaver.storage import FileStore, InMemoryStore, RegistryStore
from chainweaver.testing.replay import FixtureStaleError
from chainweaver.tools import Tool
from chainweaver.tools import StreamingTool, Tool, ToolChunk
from chainweaver.traces import (
AgentTraceEvent,
BacktestMismatch,
Expand Down Expand Up @@ -341,10 +346,14 @@
"DAGDefinitionError",
"DAGFlow",
"DAGFlowStep",
"DecisionBudgetExceededError",
"DecisionCallable",
"DecisionCallback",
"DecisionCallbackError",
"DecisionContext",
"DecisionPolicy",
"DecisionRecord",
"DecisionTimeoutError",
"DeterminismLevel",
"DraftFlow",
"DriftInfo",
Expand Down Expand Up @@ -436,15 +445,18 @@
"StabilityLevel",
"StepCache",
"StepCacheKey",
"StepChunkContext",
"StepDiff",
"StepEndContext",
"StepPlan",
"StepRecord",
"StepStartContext",
"StreamingTool",
"StructuredLLMFn",
"Suggestion",
"Tool",
"ToolChain",
"ToolChunk",
"ToolDefinitionError",
"ToolDescriptionProposal",
"ToolNotFoundError",
Expand Down
Loading
Loading