From a92e7232140339a99565d97f3810b86ede26c748 Mon Sep 17 00:00:00 2001 From: CocoRoF Date: Sun, 17 May 2026 22:23:29 +0900 Subject: [PATCH] refactor(s12_agent): SubagentTypeDescriptor + SubAgentBuildContext + parameterized PipelineFactory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase D1 of the LLM backend upgrade. Unblocks multi-provider sub-agents by handing a build context to every factory — the descriptor's provider choice + the parent's CredentialBundle flow down so a sub-pipeline's Stage 6 can authenticate against any of the 6 registered providers. SubagentTypeDescriptor (s12_agent/subagent_type.py) - New fields: provider (sub-pipeline Stage 6), provider_credentials_extras (merged into ProviderCredentials.extras), parallel (orchestrator fan-out hint, fully landing in D2), max_concurrent (group-cap for parallel siblings). - ``model_override`` stays a string for back-compat; the plan's Optional[ModelConfig] upgrade is deferred to a follow-up. - ``extras`` now typed Mapping[str, Any]. SubAgentBuildContext (NEW, frozen) - parent_session_id / sub_session_id / credentials / descriptor / workspace_snapshot / parent_state_shared. Single channel the factory receives from the orchestrator. PipelineFactory (signature change) - Was: ``Callable[[], Any]`` (zero-arg). - Now: ``Callable[[SubAgentBuildContext], Pipeline | Awaitable[Pipeline]]``. - _resolve_pipeline(factory, ctx) calls factory(ctx); on a TypeError whose message mentions "argument" or "positional" it falls back to factory() so pre-D1 zero-arg fixtures keep working; any other TypeError is re-raised. SubagentTypeOrchestrator._dispatch_one - Builds the ctx and passes it to the factory. - Surfaces descriptor.provider / parallel / max_concurrent on the sub_result.subagent_metadata so UI/audit layers can render the new fields without re-walking the registry. s12_agent/__init__.py exports PipelineFactory + SubAgentBuildContext. 
Tests - tests/unit/test_subagent_descriptor.py (NEW) — 9 cases: new fields, frozen, ctx population (provider + parent ids + credentials + workspace), legacy zero-arg factory fallback, sub_result metadata. - Existing tests/unit/test_subagent_type_orchestrator.py — 19 pass (legacy zero-arg fixtures keep working). Full suite: 3214 passed, 8 skipped, 0 failed. Plan reference: docs/llm-backend-upgrade-plan/07_rollout_phases.md (Phase D1). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../stages/s12_agent/__init__.py | 4 + .../stages/s12_agent/subagent_type.py | 156 ++++++++----- tests/unit/test_subagent_descriptor.py | 205 ++++++++++++++++++ 3 files changed, 310 insertions(+), 55 deletions(-) create mode 100644 tests/unit/test_subagent_descriptor.py diff --git a/src/geny_executor/stages/s12_agent/__init__.py b/src/geny_executor/stages/s12_agent/__init__.py index 78d7ddc..ea92217 100644 --- a/src/geny_executor/stages/s12_agent/__init__.py +++ b/src/geny_executor/stages/s12_agent/__init__.py @@ -2,6 +2,8 @@ from geny_executor.stages.s12_agent.interface import AgentOrchestrator, SubPipelineFactory from geny_executor.stages.s12_agent.subagent_type import ( + PipelineFactory, + SubAgentBuildContext, SubagentTypeDescriptor, SubagentTypeOrchestrator, SubagentTypeRegistry, @@ -23,6 +25,8 @@ "EvaluatorOrchestrator", "SubPipelineFactory", "DefaultSubPipelineFactory", + "PipelineFactory", + "SubAgentBuildContext", "SubagentTypeDescriptor", "SubagentTypeOrchestrator", "SubagentTypeRegistry", diff --git a/src/geny_executor/stages/s12_agent/subagent_type.py b/src/geny_executor/stages/s12_agent/subagent_type.py index 9c794df..813a951 100644 --- a/src/geny_executor/stages/s12_agent/subagent_type.py +++ b/src/geny_executor/stages/s12_agent/subagent_type.py @@ -1,38 +1,26 @@ """Subagent-type registry + orchestrator. -Cycle 20260424 executor uplift — Phase 7 Sprint S7.5. 
- -The pre-S7.5 :class:`DelegateOrchestrator` walks -``state.delegate_requests`` and looks up sub-pipelines by -``agent_type`` name in a :class:`SubPipelineFactory`. That works fine -for one-off delegations but skips two pieces of context the LLM and -the host both need: - -1. **Per-type metadata** — what does ``code-reviewer`` actually do? - What tools does it own? What model does it run on? Without this, - neither the LLM (deciding when to delegate) nor admin UIs - (showing sub-agent activity) have anything to work with. -2. **A shared dispatcher for skill ``fork`` mode.** Phase 4 Skills - ship inline-only; the fork branch is stubbed pending an - orchestrator. S7.5 provides the orchestrator so a Phase 7+ - ``SkillTool.fork`` lands without re-implementing the dispatch - path. +After Phase D1 of the LLM backend upgrade, a sub-agent factory is no +longer zero-arg — it receives a :class:`SubAgentBuildContext` carrying +the parent's :class:`CredentialBundle`, descriptor, session ids, and +workspace snapshot. This is what makes **multi-provider sub-agents** +possible: a factory reads ``ctx.descriptor.provider`` and builds its +sub-pipeline manifest with the desired Stage 6 provider, then runs +``Pipeline.from_manifest`` with the shared bundle. This module ships: * :class:`SubagentTypeDescriptor` — frozen metadata + factory dataclass. - ``agent_type`` is the name the LLM sees and the registry key. -* :class:`SubagentTypeRegistry` — thin id→descriptor map mirroring - the shape of :class:`~geny_executor.tools.registry.ToolRegistry` - (register / unregister / get / list). + Carries ``provider`` / ``provider_credentials_extras`` / ``parallel`` + / ``max_concurrent`` on top of the legacy fields. +* :class:`SubAgentBuildContext` — frozen build-time context passed to + every factory. +* :class:`SubagentTypeRegistry` — id→descriptor map mirroring + :class:`~geny_executor.tools.registry.ToolRegistry` (register / + unregister / get / list). 
* :class:`SubagentTypeOrchestrator` — :class:`AgentOrchestrator` subclass that consumes ``state.delegate_requests`` against the - registry. Each request becomes a sub-pipeline run; results land - on ``state.agent_results`` via the existing Stage 11 wiring. - -Compatibility: the existing ``DelegateOrchestrator`` keeps working -unchanged. Hosts opt into the typed orchestrator by swapping it in -at the Stage 11 strategy slot. + registry. Serial dispatch in D1; parallel fan-out arrives in D2. """ from __future__ import annotations @@ -41,7 +29,7 @@ import logging import uuid from dataclasses import dataclass, field -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Tuple, Union from geny_executor.core.state import PipelineState from geny_executor.stages.s12_agent.interface import AgentOrchestrator @@ -50,11 +38,30 @@ logger = logging.getLogger(__name__) -# A factory may be sync (returns a Pipeline) or async (returns -# Awaitable[Pipeline]). The orchestrator handles both — hosts that -# need to do async setup (e.g. attach an MCP manager) write an -# async factory. -PipelineFactory = Callable[[], Any] +@dataclass(frozen=True) +class SubAgentBuildContext: + """Build-time context handed to every :data:`PipelineFactory`. + + The orchestrator builds one of these per dispatch and forwards it + to the factory. The factory uses ``descriptor.provider`` (etc.) to + shape its sub-pipeline manifest, and ``credentials`` to pass the + parent's :class:`CredentialBundle` straight to + ``Pipeline.from_manifest`` so authentication is single-channel + end-to-end. 
+ """ + + parent_session_id: str + sub_session_id: str + credentials: Any # CredentialBundle | None — typed loosely to avoid import cycles + descriptor: "SubagentTypeDescriptor" + workspace_snapshot: Optional[Mapping[str, Any]] = None + parent_state_shared: Mapping[str, Any] = field(default_factory=dict) + + +# A factory takes a build context and returns a Pipeline (sync) or an +# Awaitable[Pipeline] (async). Hosts that do async setup (MCP, storage) +# write an async factory. +PipelineFactory = Callable[[SubAgentBuildContext], Union[Any, Awaitable[Any]]] @dataclass(frozen=True) @@ -65,18 +72,30 @@ class SubagentTypeDescriptor: agent_type: Stable identifier — registry key + the value the LLM sees in ``[DELEGATE: ]`` markers + the field used in ``state.delegate_requests`` entries. - factory: Zero-arg callable returning a ready-to-run - :class:`Pipeline`. May be sync or async. Hosts that wire - session-scoped resources (storage path, MCP manager, - credentials) close over them in the factory. + factory: Callable receiving a :class:`SubAgentBuildContext` and + returning a ready-to-run :class:`Pipeline`. May be sync or + async. description: One-line summary the LLM uses when choosing whether to delegate. Mirrors ``Tool.description``. allowed_tools: Tuple of tool names the sub-agent's pipeline should expose. Empty tuple means "inherit parent" — the - host is responsible for actually applying this in the - factory; the registry just records intent. + host is responsible for applying this in the factory; the + registry just records intent. + provider: Override the sub-pipeline's Stage 6 provider + (e.g. ``"openai"``, ``"claude_code_cli"``). ``None`` means + "inherit parent" (factory may copy parent provider). + provider_credentials_extras: Free-form bag merged into the + parent's :class:`ProviderCredentials.extras` for *this* + sub-agent when the factory chooses to. Common use: bumping + ``max_budget_usd`` for a critic sub-agent. 
model_override: Canonical model id (``"claude-opus-4-7"``, etc.) the sub-agent should run on. ``None`` inherits. + parallel: When ``True``, the orchestrator may dispatch this + sub-agent concurrently with its parallel-marked peers. + max_concurrent: Cap on simultaneous parallel sub-agents in a + group; the orchestrator uses ``min(max_concurrent)`` of + the group to size its semaphore. Ignored when + ``parallel=False``. extras: Free-form bag for host-specific descriptor data (cost budget, persona ids, …). """ @@ -85,8 +104,12 @@ class SubagentTypeDescriptor: factory: PipelineFactory description: str = "" allowed_tools: Tuple[str, ...] = () + provider: Optional[str] = None + provider_credentials_extras: Mapping[str, Any] = field(default_factory=dict) model_override: Optional[str] = None - extras: Dict[str, Any] = field(default_factory=dict) + parallel: bool = False + max_concurrent: int = 1 + extras: Mapping[str, Any] = field(default_factory=dict) class SubagentTypeRegistry: @@ -123,9 +146,22 @@ def __contains__(self, agent_type: str) -> bool: return agent_type in self._descriptors -async def _resolve_pipeline(factory: PipelineFactory) -> Any: - """Call a factory and unwrap an awaitable when present.""" - result = factory() +async def _resolve_pipeline( + factory: PipelineFactory, ctx: SubAgentBuildContext +) -> Any: + """Call a factory with the build context and unwrap an awaitable. + + For backward compatibility with zero-arg factories (the pre-D1 + shape), we try ``factory(ctx)`` first; if it raises ``TypeError`` + for an unexpected argument we fall back to ``factory()``. + """ + try: + result = factory(ctx) + except TypeError as e: + if "argument" not in str(e) and "positional" not in str(e): + raise + # Legacy zero-arg factory shape. 
+ result = factory() # type: ignore[call-arg] if inspect.isawaitable(result): return await result return result @@ -210,12 +246,31 @@ async def _dispatch_one( base_record["subagent_metadata"] = { "description": descriptor.description, "allowed_tools": list(descriptor.allowed_tools), + "provider": descriptor.provider, "model_override": descriptor.model_override, + "parallel": descriptor.parallel, + "max_concurrent": descriptor.max_concurrent, "extras": dict(descriptor.extras), } + # Build the context handed to the factory. The parent's + # CredentialBundle (populated by Pipeline._init_state from + # the bundle passed to from_manifest_async) flows down so the + # sub-pipeline's Stage 6 can authenticate with the right + # provider without re-asking the host. + ws_snapshot = state.shared.get("workspace_snapshot") + sub_session_id = f"{state.session_id}-{agent_type}-{uuid.uuid4().hex[:8]}" + ctx = SubAgentBuildContext( + parent_session_id=state.session_id, + sub_session_id=sub_session_id, + credentials=state.credentials, + descriptor=descriptor, + workspace_snapshot=ws_snapshot, + parent_state_shared=dict(state.shared), + ) + try: - sub_pipeline = await _resolve_pipeline(descriptor.factory) + sub_pipeline = await _resolve_pipeline(descriptor.factory, ctx) except Exception as exc: logger.warning( "SubagentTypeOrchestrator: factory for %r raised: %s", @@ -230,18 +285,9 @@ async def _dispatch_one( "error": f"factory_error: {exc}", } - sub_state = PipelineState( - session_id=f"{state.session_id}-{agent_type}-{uuid.uuid4().hex[:8]}", - ) + sub_state = PipelineState(session_id=sub_session_id) - # PR-D.4.3 — thread workspace context to the sub-pipeline. - # The parent's host stashes the active WorkspaceStack as a - # snapshot under state.shared["workspace_snapshot"]; we copy - # it down so sub-tools see the same cwd/branch the parent - # was using when AgentTool fired. 
This keeps "subagent picks - # up where parent left off" semantics without requiring the - # sub to re-enter every worktree. - ws_snapshot = state.shared.get("workspace_snapshot") + # Thread workspace context to the sub-pipeline. if ws_snapshot is not None: sub_state.shared["workspace_snapshot"] = ws_snapshot diff --git a/tests/unit/test_subagent_descriptor.py b/tests/unit/test_subagent_descriptor.py new file mode 100644 index 0000000..bb20668 --- /dev/null +++ b/tests/unit/test_subagent_descriptor.py @@ -0,0 +1,205 @@ +"""Tests for SubagentTypeDescriptor + SubAgentBuildContext (Phase D1).""" + +from __future__ import annotations + +import sys +import os +from dataclasses import FrozenInstanceError + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +import pytest + +from geny_executor.llm_client.credentials import CredentialBundle, ProviderCredentials +from geny_executor.stages.s12_agent.subagent_type import ( + SubAgentBuildContext, + SubagentTypeDescriptor, + SubagentTypeOrchestrator, + SubagentTypeRegistry, +) + + +# --------------------------------------------------------------------------- +# Descriptor shape — D1 new fields +# --------------------------------------------------------------------------- + + +def test_descriptor_has_new_fields() -> None: + d = SubagentTypeDescriptor(agent_type="x", factory=lambda ctx: None) + assert d.provider is None + assert d.provider_credentials_extras == {} + assert d.parallel is False + assert d.max_concurrent == 1 + + +def test_descriptor_provider_field() -> None: + d = SubagentTypeDescriptor( + agent_type="researcher", + factory=lambda ctx: None, + provider="openai", + provider_credentials_extras={"max_budget_usd": 2.0}, + model_override="gpt-4o-mini", + parallel=True, + max_concurrent=4, + ) + assert d.provider == "openai" + assert d.provider_credentials_extras == {"max_budget_usd": 2.0} + assert d.model_override == "gpt-4o-mini" + assert d.parallel is True + assert d.max_concurrent == 4 
+ + +def test_descriptor_is_frozen() -> None: + d = SubagentTypeDescriptor(agent_type="x", factory=lambda ctx: None) + with pytest.raises(FrozenInstanceError): + d.provider = "openai" # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# SubAgentBuildContext +# --------------------------------------------------------------------------- + + +def test_context_carries_credentials_and_descriptor() -> None: + bundle = CredentialBundle(by_provider={ + "anthropic": ProviderCredentials(api_key="sk-x"), + }) + desc = SubagentTypeDescriptor(agent_type="t", factory=lambda ctx: None, provider="anthropic") + ctx = SubAgentBuildContext( + parent_session_id="parent-1", + sub_session_id="parent-1-sub-1", + credentials=bundle, + descriptor=desc, + ) + assert ctx.parent_session_id == "parent-1" + assert ctx.sub_session_id == "parent-1-sub-1" + assert ctx.credentials is bundle + assert ctx.descriptor is desc + assert ctx.workspace_snapshot is None + assert ctx.parent_state_shared == {} + + +def test_context_with_workspace_snapshot() -> None: + desc = SubagentTypeDescriptor(agent_type="t", factory=lambda ctx: None) + ctx = SubAgentBuildContext( + parent_session_id="p", + sub_session_id="s", + credentials=None, + descriptor=desc, + workspace_snapshot={"cwd": "/tmp/wd", "branch": "main"}, + parent_state_shared={"key": "value"}, + ) + assert ctx.workspace_snapshot == {"cwd": "/tmp/wd", "branch": "main"} + assert ctx.parent_state_shared == {"key": "value"} + + +def test_context_is_frozen() -> None: + desc = SubagentTypeDescriptor(agent_type="t", factory=lambda ctx: None) + ctx = SubAgentBuildContext( + parent_session_id="p", + sub_session_id="s", + credentials=None, + descriptor=desc, + ) + with pytest.raises(FrozenInstanceError): + ctx.parent_session_id = "other" # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# Factory receives the context +# 
--------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_factory_receives_build_context() -> None: + """A factory accepting ``ctx`` actually sees the descriptor + parent + session details.""" + received: dict = {} + + class _FakePipeline: + async def run(self, task, sub_state): + return type("R", (), {"success": True, "text": f"ok:{task}", "error": None})() + + def factory(ctx: SubAgentBuildContext): + received["ctx"] = ctx + return _FakePipeline() + + bundle = CredentialBundle(by_provider={"openai": ProviderCredentials(api_key="o")}) + desc = SubagentTypeDescriptor( + agent_type="reviewer", factory=factory, provider="openai", + ) + reg = SubagentTypeRegistry().register(desc) + orch = SubagentTypeOrchestrator(reg) + + from geny_executor.core.state import PipelineState + + state = PipelineState(session_id="sess-7") + state.credentials = bundle + state.shared["workspace_snapshot"] = {"cwd": "/tmp/x"} + state.delegate_requests = [{"agent_type": "reviewer", "task": "review"}] + + result = await orch.orchestrate(state) + assert result.delegated is True + + ctx: SubAgentBuildContext = received["ctx"] + assert ctx.parent_session_id == "sess-7" + assert ctx.sub_session_id.startswith("sess-7-reviewer-") + assert ctx.descriptor.provider == "openai" + assert ctx.credentials is bundle + assert ctx.workspace_snapshot == {"cwd": "/tmp/x"} + + +@pytest.mark.asyncio +async def test_legacy_zero_arg_factory_still_works() -> None: + """Pre-D1 factories (no ctx parameter) keep working via the + TypeError fallback in _resolve_pipeline.""" + + class _FakePipeline: + async def run(self, task, sub_state): + return type("R", (), {"success": True, "text": "legacy", "error": None})() + + def factory(): + return _FakePipeline() + + desc = SubagentTypeDescriptor(agent_type="legacy", factory=factory) + reg = SubagentTypeRegistry().register(desc) + orch = SubagentTypeOrchestrator(reg) + + from geny_executor.core.state import 
PipelineState + + state = PipelineState(session_id="sess-l") + state.delegate_requests = [{"agent_type": "legacy", "task": "noop"}] + result = await orch.orchestrate(state) + assert result.delegated is True + assert result.sub_results[0]["success"] is True + assert result.sub_results[0]["text"] == "legacy" + + +# --------------------------------------------------------------------------- +# Metadata exposes new fields +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_result_metadata_carries_provider_and_parallel_flags() -> None: + class _FakePipeline: + async def run(self, task, sub_state): + return type("R", (), {"success": True, "text": "x", "error": None})() + + desc = SubagentTypeDescriptor( + agent_type="r", factory=lambda ctx: _FakePipeline(), + provider="claude_code_cli", parallel=True, max_concurrent=3, + model_override="opus", + ) + reg = SubagentTypeRegistry().register(desc) + orch = SubagentTypeOrchestrator(reg) + + from geny_executor.core.state import PipelineState + state = PipelineState(session_id="s") + state.delegate_requests = [{"agent_type": "r", "task": "go"}] + result = await orch.orchestrate(state) + meta = result.sub_results[0]["subagent_metadata"] + assert meta["provider"] == "claude_code_cli" + assert meta["model_override"] == "opus" + assert meta["parallel"] is True + assert meta["max_concurrent"] == 3