Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@ unified behind a single `CredentialBundle` channel.

### Added

- **Multi-provider sub-agent system** (`stages.s12_agent.subagent_type`).
`SubagentTypeDescriptor` gains `provider`, `provider_credentials_extras`,
`parallel`, and `max_concurrent` fields. `SubAgentBuildContext` (frozen
dataclass) is now handed to every factory carrying the parent's
`CredentialBundle` + descriptor + session ids + workspace snapshot.
`PipelineFactory` signature changes from `Callable[[], Any]` to
`Callable[[SubAgentBuildContext], Pipeline | Awaitable[Pipeline]]`
(zero-arg legacy factories still work via TypeError fallback).
`SubagentTypeOrchestrator` now performs mixed serial + parallel
dispatch; each parallel group is bounded by
`asyncio.Semaphore(min(max_concurrent))`, i.e. the smallest
`max_concurrent` declared within that group.
- **`Pipeline.attach_runtime(subagent_registry=...)`** + matching
kwarg on `from_manifest{,_async}`. Pipeline stores the registry; on
call it rebuilds the agent stage's orchestrator as
`SubagentTypeOrchestrator(registry)`. `PipelineState.subagent_registry`
mirrors the slot so sub-agent factories can reach it.
- **`PipelineState.credentials`** + **`PipelineState.subagent_registry`**
— populated by `_init_state`. Sub-pipelines see the same bundle the
parent received.
- **`SkillMetadata.provider`** — fork-mode skills can declare their
preferred provider so the new fork runner picks the right client.
- **`make_credential_bundle_fork_runner(credentials, ...)`** in
`skills.fork` — multi-provider fork-mode runner. Routes via
`skill.metadata.provider` (falls back to `fallback_provider`),
builds the client via `ClientRegistry.get(...)` with credentials
from the bundle. Missing credentials surface as a structured
`ForkResult(is_error=True)` rather than crashing.
- **`ClaudeCodeCLIClient`** (`llm_client.claude_code`) — subprocess-backed
client driving Anthropic's `claude` CLI. Streams via stream-json, drops
the fields the CLI doesn't accept, and propagates token usage / cost.
Expand Down
121 changes: 121 additions & 0 deletions src/geny_executor/skills/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,127 @@ async def _runner(
return _runner


def make_credential_bundle_fork_runner(
    credentials: Any,
    *,
    fallback_provider: str = "anthropic",
    fallback_model: str = _DEFAULT_FORK_MODEL,
    max_tokens: int = _DEFAULT_FORK_MAX_TOKENS,
) -> SkillForkRunner:
    """Multi-provider fork runner backed by a :class:`CredentialBundle`.

    The runner chooses its client based on ``skill.metadata.provider``
    (falling back to ``fallback_provider``). Authentication flows through
    the host-supplied :class:`CredentialBundle`, so a fork-mode skill
    can run on any of the 6 registered providers — including the CLI
    backends — without separately wiring API keys.

    Returns a runner unconditionally; problems surface at invocation
    time as a :class:`ForkResult` with ``is_error=True`` rather than
    crashing at construction. This covers missing credentials, an
    unknown provider, a client that cannot be constructed, and a failed
    API call.

    Args:
        credentials: The session's :class:`CredentialBundle`. Any other
            value is replaced by an empty bundle (with a warning), so
            every invocation then reports missing credentials.
        fallback_provider: Provider used when the skill names none.
        fallback_model: Model used when the skill has no
            ``model_override``.
        max_tokens: ``max_tokens`` placed in the request's
            ``ModelConfig``.

    Returns:
        An async runner taking keyword-only ``skill``, ``rendered_body``,
        ``invoke_args`` and ``parent_context`` and returning a
        :class:`ForkResult`. ``parent_context`` is accepted for interface
        compatibility; this runner does not read it.
    """
    from geny_executor.core.config import ModelConfig
    from geny_executor.core.pipeline import _creds_to_client_kwargs
    from geny_executor.llm_client.credentials import ConfigError, CredentialBundle
    from geny_executor.llm_client.registry import ClientRegistry

    if not isinstance(credentials, CredentialBundle):
        # Defensive: hosts wiring this runner pass a CredentialBundle.
        # Degrade to an empty bundle instead of raising, but leave a trace
        # so the misconfiguration is diagnosable.
        logger.warning(
            "credential-bundle fork runner: expected CredentialBundle, got %s; "
            "using an empty bundle",
            type(credentials).__name__,
        )
        credentials = CredentialBundle()

    async def _runner(
        *,
        skill: "Skill",
        rendered_body: str,
        invoke_args: Dict[str, Any],
        parent_context: ToolContext,
    ) -> ForkResult:
        provider = getattr(skill.metadata, "provider", None) or fallback_provider

        try:
            creds = credentials.require(provider)
        except ConfigError as e:
            return ForkResult(
                content=(
                    f"fork runner: provider {provider!r} has no configured "
                    f"credentials in this session ({e})"
                ),
                metadata={"skill_id": skill.id, "provider": provider},
                is_error=True,
            )

        try:
            client_cls = ClientRegistry.get(provider)
        except (KeyError, ValueError) as e:
            return ForkResult(
                content=f"fork runner: unknown provider {provider!r} ({e})",
                metadata={"skill_id": skill.id, "provider": provider},
                is_error=True,
            )

        # Client constructors are provider-specific and may reject the
        # kwargs or fail on their own setup; report that as a structured
        # error like every other failure mode of this runner.
        try:
            client = client_cls(**_creds_to_client_kwargs(provider, creds))
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "credential-bundle fork runner: skill=%s provider=%s "
                "client construction failed: %s",
                skill.id,
                provider,
                exc,
            )
            return ForkResult(
                content=(
                    f"fork runner: could not build client for provider "
                    f"{provider!r} ({exc})"
                ),
                metadata={"skill_id": skill.id, "provider": provider},
                is_error=True,
            )

        model = skill.metadata.model_override or fallback_model
        model_config = ModelConfig(model=model, max_tokens=max_tokens)

        user_content = "Execute the skill following the system prompt."
        if invoke_args:
            import json as _json

            # The arguments are rendered as prompt text, not parsed back,
            # so values JSON cannot encode natively are shown via str()
            # instead of aborting the fork with a TypeError.
            user_content = (
                user_content
                + "\n\nArguments:\n"
                + _json.dumps(invoke_args, indent=2, default=str)
            )

        try:
            response = await client.create_message(
                model_config=model_config,
                messages=[{"role": "user", "content": user_content}],
                system=rendered_body,
                purpose=f"skill_fork:{skill.id}",
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "credential-bundle fork runner: skill=%s provider=%s API call failed: %s",
                skill.id,
                provider,
                exc,
            )
            return ForkResult(
                content=f"fork API call failed: {exc}",
                metadata={"model": model, "provider": provider, "skill_id": skill.id},
                is_error=True,
            )

        # Content blocks may be plain dicts or attribute-style objects;
        # keep only the non-empty text blocks.
        text_parts = []
        for block in getattr(response, "content", None) or []:
            if isinstance(block, dict):
                block_type = block.get("type")
                block_text = block.get("text", "")
            else:
                block_type = getattr(block, "type", None)
                block_text = getattr(block, "text", "")
            if block_type == "text" and block_text:
                text_parts.append(block_text)

        text = "\n".join(text_parts) or "(no text in fork response)"

        usage = getattr(response, "usage", None)
        meta: Dict[str, Any] = {
            "model": model,
            "provider": provider,
            "skill_id": skill.id,
        }
        if usage is not None:
            meta["input_tokens"] = getattr(usage, "input_tokens", 0)
            meta["output_tokens"] = getattr(usage, "output_tokens", 0)
            cost = getattr(usage, "cost_usd", None)
            if cost is not None:
                meta["cost_usd"] = cost

        return ForkResult(content=text, metadata=meta)

    return _runner


__all__ = [
"ForkResult",
"SkillForkRunner",
Expand Down
4 changes: 4 additions & 0 deletions src/geny_executor/skills/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class SkillMetadata:
version: Optional[str] = None
allowed_tools: Tuple[str, ...] = ()
model_override: Optional[str] = None
# Phase D4 — fork-mode skills can name their preferred provider
# so the runner picks the right client class via ClientRegistry.
# ``None`` leaves the choice to the runner (the credential-bundle
# fork runner then uses its ``fallback_provider``).
provider: Optional[str] = None
execution_mode: str = "inline"
# PR-B.4.1 — richer schema. All optional with defaults so existing
# SKILL.md files load unchanged. ``category`` slots the skill into
Expand Down
212 changes: 212 additions & 0 deletions tests/unit/test_fork_multi_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Tests for the credential-bundle fork runner (Phase D4).

Validates that a fork-mode skill can route through any of the 6
providers via a single :class:`CredentialBundle`, replacing the
Anthropic-only default runner.
"""

from __future__ import annotations

import asyncio
import sys
import os
from pathlib import Path

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))

import pytest

from geny_executor.llm_client.base import BaseClient, ClientCapabilities
from geny_executor.llm_client.credentials import (
CredentialBundle,
ProviderCredentials,
)
from geny_executor.llm_client.types import APIResponse, ContentBlock
from geny_executor.skills.fork import (
ForkResult,
make_credential_bundle_fork_runner,
)
from geny_executor.skills.types import Skill, SkillMetadata


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


class _RecordingClient(BaseClient):
    """Test double for ``BaseClient``.

    Remembers the constructor arguments and the last request on the
    class attribute ``last_call`` and always answers with one canned
    text block.
    """

    provider = "fake"
    capabilities = ClientCapabilities()

    # Shared on the class so tests can inspect it without holding the
    # instance the runner created; replaced on every construction.
    last_call: dict = {}

    def __init__(self, *, api_key: str = "", **kwargs):
        super().__init__(api_key=api_key)
        recorder_cls = type(self)
        recorder_cls.last_call = {"api_key": api_key, "kwargs": kwargs}

    async def _send(self, request, *, purpose: str = ""):
        record = type(self).last_call
        record["request"] = request
        record["purpose"] = purpose
        canned_block = ContentBlock(type="text", text="forked-text")
        return APIResponse(content=[canned_block], stop_reason="end_turn")


def _skill(
    name: str = "test",
    *,
    provider: str | None = None,
    model_override: str | None = None,
) -> Skill:
    """Build a fork-mode ``Skill`` whose id is ``skill::<name>``."""
    return Skill(
        id=f"skill::{name}",
        metadata=SkillMetadata(
            name=name,
            description="test skill",
            provider=provider,
            model_override=model_override,
            execution_mode="fork",
        ),
        body="be terse and helpful",
    )


def _ctx_stub():
"""A minimal stand-in for ToolContext (only attributes the runner reads)."""

class _C:
pass

return _C()


# ---------------------------------------------------------------------------
# Provider routing
# ---------------------------------------------------------------------------


def test_runner_returned_unconditionally() -> None:
    """The bundle-backed factory yields a runner even for an empty bundle.

    This differs from make_default_fork_runner, which returns None when
    the key is missing; credential problems are reported when the runner
    is invoked instead.
    """
    empty_bundle = CredentialBundle()
    assert make_credential_bundle_fork_runner(empty_bundle) is not None


@pytest.mark.asyncio
async def test_runner_uses_skill_provider(monkeypatch: pytest.MonkeyPatch) -> None:
    """A skill declaring ``provider="openai"`` is served with the openai credentials."""
    from geny_executor.llm_client.registry import ClientRegistry

    openai_only = CredentialBundle(
        by_provider={"openai": ProviderCredentials(api_key="sk-oai")},
    )
    run_fork = make_credential_bundle_fork_runner(openai_only)

    # Every registry lookup hands back the recording fake.
    def _always_recorder(cls, p):
        return _RecordingClient

    monkeypatch.setattr(ClientRegistry, "get", classmethod(_always_recorder))

    outcome = await run_fork(
        skill=_skill(provider="openai"),
        rendered_body="be helpful",
        invoke_args={},
        parent_context=_ctx_stub(),
    )

    assert isinstance(outcome, ForkResult)
    assert outcome.is_error is False
    assert outcome.content == "forked-text"
    # The fake was constructed with the openai key taken from the bundle.
    assert _RecordingClient.last_call["api_key"] == "sk-oai"
    assert outcome.metadata["provider"] == "openai"


@pytest.mark.asyncio
async def test_runner_falls_back_to_fallback_provider(monkeypatch: pytest.MonkeyPatch) -> None:
    """A skill without a provider is routed to ``fallback_provider``."""
    from geny_executor.llm_client.registry import ClientRegistry

    anthropic_only = CredentialBundle(
        by_provider={"anthropic": ProviderCredentials(api_key="sk-anth")},
    )
    run_fork = make_credential_bundle_fork_runner(
        anthropic_only,
        fallback_provider="anthropic",
    )

    def _always_recorder(cls, p):
        return _RecordingClient

    monkeypatch.setattr(ClientRegistry, "get", classmethod(_always_recorder))

    outcome = await run_fork(
        skill=_skill(provider=None),
        rendered_body="rules",
        invoke_args={},
        parent_context=_ctx_stub(),
    )

    assert outcome.is_error is False
    assert outcome.metadata["provider"] == "anthropic"
    assert _RecordingClient.last_call["api_key"] == "sk-anth"


# ---------------------------------------------------------------------------
# Missing credentials → structured error
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_runner_returns_error_when_credentials_missing() -> None:
    """With an empty bundle the runner reports an error result naming the
    provider instead of raising."""
    run_fork = make_credential_bundle_fork_runner(CredentialBundle())

    outcome = await run_fork(
        skill=_skill(provider="openai"),
        rendered_body="x",
        invoke_args={},
        parent_context=_ctx_stub(),
    )

    assert outcome.is_error is True
    message = outcome.content
    assert "openai" in message
    assert "credentials" in message.lower()


@pytest.mark.asyncio
async def test_runner_returns_error_when_provider_unknown() -> None:
    """Credentials exist for the provider, but the registry does not know
    it: the runner answers with an "unknown provider" error result."""
    imaginary_only = CredentialBundle(
        by_provider={"imaginary": ProviderCredentials(api_key="sk-x")},
    )
    run_fork = make_credential_bundle_fork_runner(imaginary_only)

    outcome = await run_fork(
        skill=_skill(provider="imaginary"),
        rendered_body="x",
        invoke_args={},
        parent_context=_ctx_stub(),
    )

    assert outcome.is_error is True
    message = outcome.content
    assert "imaginary" in message
    assert "unknown" in message.lower()


# ---------------------------------------------------------------------------
# Model override + arguments
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_runner_uses_skill_model_override(monkeypatch: pytest.MonkeyPatch) -> None:
    """The skill's ``model_override`` is reported in the result metadata and
    the invoke arguments are included in the user message."""
    from geny_executor.llm_client.registry import ClientRegistry

    anthropic_only = CredentialBundle(
        by_provider={"anthropic": ProviderCredentials(api_key="sk-a")},
    )
    run_fork = make_credential_bundle_fork_runner(anthropic_only)

    def _always_recorder(cls, p):
        return _RecordingClient

    monkeypatch.setattr(ClientRegistry, "get", classmethod(_always_recorder))

    outcome = await run_fork(
        skill=_skill(provider="anthropic", model_override="claude-opus-4-7"),
        rendered_body="x",
        invoke_args={"q": "test"},
        parent_context=_ctx_stub(),
    )

    assert outcome.metadata["model"] == "claude-opus-4-7"
    # The serialized arguments must appear in the first (user) message.
    sent_request = _RecordingClient.last_call["request"]
    first_message_text = sent_request.messages[0]["content"]
    assert "test" in first_message_text
Loading