Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/geny_executor/stages/s12_agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from geny_executor.stages.s12_agent.interface import AgentOrchestrator, SubPipelineFactory
from geny_executor.stages.s12_agent.subagent_type import (
PipelineFactory,
SubAgentBuildContext,
SubagentTypeDescriptor,
SubagentTypeOrchestrator,
SubagentTypeRegistry,
Expand All @@ -23,6 +25,8 @@
"EvaluatorOrchestrator",
"SubPipelineFactory",
"DefaultSubPipelineFactory",
"PipelineFactory",
"SubAgentBuildContext",
"SubagentTypeDescriptor",
"SubagentTypeOrchestrator",
"SubagentTypeRegistry",
Expand Down
156 changes: 101 additions & 55 deletions src/geny_executor/stages/s12_agent/subagent_type.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,26 @@
"""Subagent-type registry + orchestrator.

Cycle 20260424 executor uplift — Phase 7 Sprint S7.5.

The pre-S7.5 :class:`DelegateOrchestrator` walks
``state.delegate_requests`` and looks up sub-pipelines by
``agent_type`` name in a :class:`SubPipelineFactory`. That works fine
for one-off delegations but skips two pieces of context the LLM and
the host both need:

1. **Per-type metadata** — what does ``code-reviewer`` actually do?
What tools does it own? What model does it run on? Without this,
neither the LLM (deciding when to delegate) nor admin UIs
(showing sub-agent activity) have anything to work with.
2. **A shared dispatcher for skill ``fork`` mode.** Phase 4 Skills
ship inline-only; the fork branch is stubbed pending an
orchestrator. S7.5 provides the orchestrator so a Phase 7+
``SkillTool.fork`` lands without re-implementing the dispatch
path.
After Phase D1 of the LLM backend upgrade, a sub-agent factory is no
longer zero-arg — it receives a :class:`SubAgentBuildContext` carrying
the parent's :class:`CredentialBundle`, descriptor, session ids, and
workspace snapshot. This is what makes **multi-provider sub-agents**
possible: a factory reads ``ctx.descriptor.provider`` and builds its
sub-pipeline manifest with the desired Stage 6 provider, then runs
``Pipeline.from_manifest`` with the shared bundle.

This module ships:

* :class:`SubagentTypeDescriptor` — frozen metadata + factory dataclass.
``agent_type`` is the name the LLM sees and the registry key.
* :class:`SubagentTypeRegistry` — thin id→descriptor map mirroring
the shape of :class:`~geny_executor.tools.registry.ToolRegistry`
(register / unregister / get / list).
Carries ``provider`` / ``provider_credentials_extras`` / ``parallel``
/ ``max_concurrent`` on top of the legacy fields.
* :class:`SubAgentBuildContext` — frozen build-time context passed to
every factory.
* :class:`SubagentTypeRegistry` — id→descriptor map mirroring
:class:`~geny_executor.tools.registry.ToolRegistry` (register /
unregister / get / list).
* :class:`SubagentTypeOrchestrator` — :class:`AgentOrchestrator`
subclass that consumes ``state.delegate_requests`` against the
registry. Each request becomes a sub-pipeline run; results land
on ``state.agent_results`` via the existing Stage 11 wiring.

Compatibility: the existing ``DelegateOrchestrator`` keeps working
unchanged. Hosts opt into the typed orchestrator by swapping it in
at the Stage 11 strategy slot.
registry. Serial dispatch in D1; parallel fan-out arrives in D2.
"""

from __future__ import annotations
Expand All @@ -41,7 +29,7 @@
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Tuple, Union

from geny_executor.core.state import PipelineState
from geny_executor.stages.s12_agent.interface import AgentOrchestrator
Expand All @@ -50,11 +38,30 @@
logger = logging.getLogger(__name__)


# A factory may be sync (returns a Pipeline) or async (returns
# Awaitable[Pipeline]). The orchestrator handles both — hosts that
# need to do async setup (e.g. attach an MCP manager) write an
# async factory.
PipelineFactory = Callable[[], Any]
@dataclass(frozen=True)
class SubAgentBuildContext:
"""Build-time context handed to every :data:`PipelineFactory`.

The orchestrator builds one of these per dispatch and forwards it
to the factory. The factory uses ``descriptor.provider`` (etc.) to
shape its sub-pipeline manifest, and ``credentials`` to pass the
parent's :class:`CredentialBundle` straight to
``Pipeline.from_manifest`` so authentication is single-channel
end-to-end.
"""

parent_session_id: str
sub_session_id: str
credentials: Any # CredentialBundle | None — typed loosely to avoid import cycles
descriptor: "SubagentTypeDescriptor"
workspace_snapshot: Optional[Mapping[str, Any]] = None
parent_state_shared: Mapping[str, Any] = field(default_factory=dict)


# A factory takes a build context and returns a Pipeline (sync) or an
# Awaitable[Pipeline] (async). Hosts that do async setup (MCP, storage)
# write an async factory.
PipelineFactory = Callable[[SubAgentBuildContext], Union[Any, Awaitable[Any]]]


@dataclass(frozen=True)
Expand All @@ -65,18 +72,30 @@ class SubagentTypeDescriptor:
agent_type: Stable identifier — registry key + the value the
LLM sees in ``[DELEGATE: <agent_type>]`` markers + the
field used in ``state.delegate_requests`` entries.
factory: Zero-arg callable returning a ready-to-run
:class:`Pipeline`. May be sync or async. Hosts that wire
session-scoped resources (storage path, MCP manager,
credentials) close over them in the factory.
factory: Callable receiving a :class:`SubAgentBuildContext` and
returning a ready-to-run :class:`Pipeline`. May be sync or
async.
description: One-line summary the LLM uses when choosing
whether to delegate. Mirrors ``Tool.description``.
allowed_tools: Tuple of tool names the sub-agent's pipeline
should expose. Empty tuple means "inherit parent" — the
host is responsible for actually applying this in the
factory; the registry just records intent.
host is responsible for applying this in the factory; the
registry just records intent.
provider: Override the sub-pipeline's Stage 6 provider
(e.g. ``"openai"``, ``"claude_code_cli"``). ``None`` means
"inherit parent" (factory may copy parent provider).
provider_credentials_extras: Free-form bag merged into the
parent's :class:`ProviderCredentials.extras` for *this*
sub-agent when the factory chooses to. Common use: bumping
``max_budget_usd`` for a critic sub-agent.
model_override: Canonical model id (``"claude-opus-4-7"``,
etc.) the sub-agent should run on. ``None`` inherits.
parallel: When ``True``, the orchestrator may dispatch this
sub-agent concurrently with its parallel-marked peers.
max_concurrent: Cap on simultaneous parallel sub-agents in a
group; the orchestrator uses ``min(max_concurrent)`` of
the group to size its semaphore. Ignored when
``parallel=False``.
extras: Free-form bag for host-specific descriptor data
(cost budget, persona ids, …).
"""
Expand All @@ -85,8 +104,12 @@ class SubagentTypeDescriptor:
factory: PipelineFactory
description: str = ""
allowed_tools: Tuple[str, ...] = ()
provider: Optional[str] = None
provider_credentials_extras: Mapping[str, Any] = field(default_factory=dict)
model_override: Optional[str] = None
extras: Dict[str, Any] = field(default_factory=dict)
parallel: bool = False
max_concurrent: int = 1
extras: Mapping[str, Any] = field(default_factory=dict)


class SubagentTypeRegistry:
Expand Down Expand Up @@ -123,9 +146,22 @@ def __contains__(self, agent_type: str) -> bool:
return agent_type in self._descriptors


async def _resolve_pipeline(factory: PipelineFactory) -> Any:
"""Call a factory and unwrap an awaitable when present."""
result = factory()
async def _resolve_pipeline(
factory: PipelineFactory, ctx: SubAgentBuildContext
) -> Any:
"""Call a factory with the build context and unwrap an awaitable.

For backward compatibility with zero-arg factories (the pre-D1
shape), we try ``factory(ctx)`` first; if it raises ``TypeError``
for an unexpected argument we fall back to ``factory()``.
"""
try:
result = factory(ctx)
except TypeError as e:
if "argument" not in str(e) and "positional" not in str(e):
raise
# Legacy zero-arg factory shape.
result = factory() # type: ignore[call-arg]
if inspect.isawaitable(result):
return await result
return result
Expand Down Expand Up @@ -210,12 +246,31 @@ async def _dispatch_one(
base_record["subagent_metadata"] = {
"description": descriptor.description,
"allowed_tools": list(descriptor.allowed_tools),
"provider": descriptor.provider,
"model_override": descriptor.model_override,
"parallel": descriptor.parallel,
"max_concurrent": descriptor.max_concurrent,
"extras": dict(descriptor.extras),
}

# Build the context handed to the factory. The parent's
# CredentialBundle (populated by Pipeline._init_state from
# the bundle passed to from_manifest_async) flows down so the
# sub-pipeline's Stage 6 can authenticate with the right
# provider without re-asking the host.
ws_snapshot = state.shared.get("workspace_snapshot")
sub_session_id = f"{state.session_id}-{agent_type}-{uuid.uuid4().hex[:8]}"
ctx = SubAgentBuildContext(
parent_session_id=state.session_id,
sub_session_id=sub_session_id,
credentials=state.credentials,
descriptor=descriptor,
workspace_snapshot=ws_snapshot,
parent_state_shared=dict(state.shared),
)

try:
sub_pipeline = await _resolve_pipeline(descriptor.factory)
sub_pipeline = await _resolve_pipeline(descriptor.factory, ctx)
except Exception as exc:
logger.warning(
"SubagentTypeOrchestrator: factory for %r raised: %s",
Expand All @@ -230,18 +285,9 @@ async def _dispatch_one(
"error": f"factory_error: {exc}",
}

sub_state = PipelineState(
session_id=f"{state.session_id}-{agent_type}-{uuid.uuid4().hex[:8]}",
)
sub_state = PipelineState(session_id=sub_session_id)

# PR-D.4.3 — thread workspace context to the sub-pipeline.
# The parent's host stashes the active WorkspaceStack as a
# snapshot under state.shared["workspace_snapshot"]; we copy
# it down so sub-tools see the same cwd/branch the parent
# was using when AgentTool fired. This keeps "subagent picks
# up where parent left off" semantics without requiring the
# sub to re-enter every worktree.
ws_snapshot = state.shared.get("workspace_snapshot")
# Thread workspace context to the sub-pipeline.
if ws_snapshot is not None:
sub_state.shared["workspace_snapshot"] = ws_snapshot

Expand Down
Loading
Loading