From da566d046d95fa4c27a52639c5574e59eb0407c8 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 1 Jun 2026 04:25:57 +0800 Subject: [PATCH] Add ACP server package --- README.md | 1 + packages/bub-acp-server/README.md | 106 +++ packages/bub-acp-server/pyproject.toml | 29 + .../src/bub_acp_server/__init__.py | 7 + .../bub-acp-server/src/bub_acp_server/cli.py | 13 + .../src/bub_acp_server/config.py | 12 + .../src/bub_acp_server/plugin.py | 715 ++++++++++++++++++ .../src/bub_acp_server/py.typed | 1 + packages/bub-acp-server/tests/test_plugin.py | 263 +++++++ pyproject.toml | 2 + uv.lock | 42 + 11 files changed, 1191 insertions(+) create mode 100644 packages/bub-acp-server/README.md create mode 100644 packages/bub-acp-server/pyproject.toml create mode 100644 packages/bub-acp-server/src/bub_acp_server/__init__.py create mode 100644 packages/bub-acp-server/src/bub_acp_server/cli.py create mode 100644 packages/bub-acp-server/src/bub_acp_server/config.py create mode 100644 packages/bub-acp-server/src/bub_acp_server/plugin.py create mode 100644 packages/bub-acp-server/src/bub_acp_server/py.typed create mode 100644 packages/bub-acp-server/tests/test_plugin.py diff --git a/README.md b/README.md index 3fe3532..73d644b 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Below is the list of packages currently included in this repository. | ------------------------------------------------------------------------------------ | ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | | [`packages/bub-codex`](./packages/bub-codex/README.md) | `codex` | Provides a `run_model` hook that delegates model execution to the Codex CLI. | | [`packages/bub-cursor`](./packages/bub-cursor/README.md) | `cursor` | Provides a `run_model` hook that delegates model execution to the Cursor CLI, plus `bub login cursor`. | +| [`packages/bub-acp-server`](./packages/bub-acp-server/README.md) | `acp-server` | Exposes Bub as an Agent Client Protocol agent with `bub acp serve` for ACP-compatible editors. | | [`packages/bub-tg-feed`](./packages/bub-tg-feed/README.md) | `tg-feed` | Provides an AMQP-based channel adapter for Telegram feed messages. | | [`packages/bub-schedule`](./packages/bub-schedule/README.md) | `schedule` | Provides scheduling channel/tools backed by APScheduler with a JSON job store. | | [`packages/bub-tapestore-sqlalchemy`](./packages/bub-tapestore-sqlalchemy/README.md) | `tapestore-sqlalchemy` | Provides a SQLAlchemy-backed tape store for Bub conversation history. | diff --git a/packages/bub-acp-server/README.md b/packages/bub-acp-server/README.md new file mode 100644 index 0000000..4376c4c --- /dev/null +++ b/packages/bub-acp-server/README.md @@ -0,0 +1,106 @@ +# bub-acp-server + +Expose Bub as an Agent Client Protocol agent. + +## What It Provides + +- Bub plugin entry point: `acp-server` +- CLI command registered on Bub: `bub acp serve` +- Standalone console script: `bub-acp-server` +- ACP agent methods for `initialize`, `session/new`, `session/load`, `session/resume`, `session/list`, `session/close`, and `session/prompt` +- Streaming ACP `session/update` events from Bub stream events + +## Installation + +```bash +uv pip install "git+https://github.com/bubbuild/bub-contrib.git#subdirectory=packages/bub-acp-server" +``` + +Or from a Bub project: + +```bash +bub install bub-acp-server@main +``` + +## Usage + +Configure an ACP-compatible client to launch one of: + +```bash +bub acp serve +``` + +or: + +```bash +bub-acp-server +``` + +The process speaks ACP over stdio. Prompts are sent through Bub's hook pipeline with stream output enabled, so model chunks and tool events can be displayed by the ACP client as they arrive. + +Bub keeps using its own configuration, tools, skills, and tapes. The ACP client starts the process and displays the session; it does not replace Bub's model setup. + +ACP session metadata is stored under Bub home as `acp-sessions.json` so compatible clients can list sessions again after restarting. Keep `BUB_HOME` stable if you want the same ACP thread list across editor launches. + +`bub-acp-server` supports both ACP session load and resume. `session/load` restores the matching Bub history through the same ACP streaming path used by live turns. `session/resume` attaches the editor back to the Bub session without replaying history, so later turns keep streaming through Bub's normal hook pipeline. + +## Use In Zed + +Zed supports external terminal agents through ACP. Custom agents are configured in Zed's `settings.json` under `agent_servers`. + +Prerequisites: + +- `bub` is installed and available to Zed. +- `bub-acp-server` is installed in the Bub environment: + +```bash +bub install bub-acp-server@main +``` + +Open Zed's settings with the `zed: open settings` command and add a custom agent server: + +```json +{ + "agent_servers": { + "Bub": { + "type": "custom", + "command": "bub", + "args": ["acp", "serve"], + "env": {} + } + } +} +``` + +If Zed cannot find `bub`, use the absolute path printed by `command -v bub`: + +```json +{ + "agent_servers": { + "Bub": { + "type": "custom", + "command": "/absolute/path/to/bub", + "args": ["acp", "serve"], + "env": {} + } + } +} +``` + +After saving the settings, open Zed's agent panel with `cmd-?` on macOS or `ctrl-?` on Linux/Windows, then start a new thread and select `Bub`. + +Useful Zed commands while testing: + +- `dev: open acp logs` shows the JSON-RPC traffic between Zed and Bub. +- `zed: open settings` opens `settings.json`. + +Notes: + +- Zed launches Bub as a separate ACP process. Bub reads its own local configuration and credentials directly. +- Use `env` only for settings your Bub installation actually needs. +- If your Bub configuration is loaded from a project `.env`, use a wrapper command that loads that file before running `bub acp serve`. + +References: + +- Zed external agents documentation: https://zed.dev/docs/ai/external-agents +- Zed ACP client page: https://zed.dev/acp/editor/zed diff --git a/packages/bub-acp-server/pyproject.toml b/packages/bub-acp-server/pyproject.toml new file mode 100644 index 0000000..675c645 --- /dev/null +++ b/packages/bub-acp-server/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "bub-acp-server" +version = "0.1.0" +description = "Expose Bub as an ACP agent" +readme = "README.md" +authors = [ + { name = "Frost Ming", email = "me@frostming.com" } +] +requires-python = ">=3.12" +dependencies = [ + "agent-client-protocol==0.10.1", + "bub", +] + +[project.entry-points.bub] +acp-server = "bub_acp_server.plugin:ACPServerPlugin" + +[project.scripts] +bub-acp-server = "bub_acp_server.cli:main" + +[build-system] +requires = ["uv_build>=0.10.4,<0.11.0"] +build-backend = "uv_build" + +[dependency-groups] +dev = [ + "pytest>=9.0.3", + "pytest-asyncio>=0.21.0", +] diff --git a/packages/bub-acp-server/src/bub_acp_server/__init__.py b/packages/bub-acp-server/src/bub_acp_server/__init__.py new file mode 100644 index 0000000..ea1bf92 --- /dev/null +++ b/packages/bub-acp-server/src/bub_acp_server/__init__.py @@ -0,0 +1,7 @@ +"""Expose Bub as an ACP agent.""" + +from __future__ import annotations + +__all__ = ["ACPServerPlugin", "BubACPAgent", "run_acp_agent"] + +from bub_acp_server.plugin import ACPServerPlugin, BubACPAgent, run_acp_agent diff --git a/packages/bub-acp-server/src/bub_acp_server/cli.py b/packages/bub-acp-server/src/bub_acp_server/cli.py new file mode 100644 index 0000000..6100653 --- /dev/null +++ b/packages/bub-acp-server/src/bub_acp_server/cli.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +import asyncio + +from bub.framework import BubFramework + +from bub_acp_server.plugin import run_acp_agent + + +def main() -> None: + framework = BubFramework() + framework.load_hooks() + asyncio.run(run_acp_agent(framework)) diff --git a/packages/bub-acp-server/src/bub_acp_server/config.py b/packages/bub-acp-server/src/bub_acp_server/config.py new file mode 100644 index 0000000..42e76ef --- /dev/null +++ b/packages/bub-acp-server/src/bub_acp_server/config.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +import bub +from pydantic_settings import SettingsConfigDict + + +@bub.config(name="acp-server") +class ACPServerSettings(bub.Settings): + model_config = SettingsConfigDict(env_prefix="BUB_ACP_SERVER_", extra="ignore") + + channel_name: str = "acp-server" + send_user_message_updates: bool = False diff --git a/packages/bub-acp-server/src/bub_acp_server/plugin.py b/packages/bub-acp-server/src/bub_acp_server/plugin.py new file mode 100644 index 0000000..b714e8d --- /dev/null +++ b/packages/bub-acp-server/src/bub_acp_server/plugin.py @@ -0,0 +1,715 @@ +from __future__ import annotations + +import asyncio +import base64 +import contextlib +import hashlib +import inspect +import json +import re +from collections.abc import AsyncIterable, AsyncIterator, Iterable, Mapping +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import TYPE_CHECKING, Any, cast +from uuid import uuid4 + +import bub +import typer +from acp import ( + run_agent, + text_block, + update_agent_message_text, + update_user_message, + update_user_message_text, +) +from acp.interfaces import Client +from acp.schema import ( + AgentCapabilities, + AudioContentBlock, + ClientCapabilities, + CloseSessionResponse, + EmbeddedResourceContentBlock, + HttpMcpServer, + ImageContentBlock, + Implementation, + InitializeResponse, + ListSessionsResponse, + LoadSessionResponse, + McpServerStdio, + NewSessionResponse, + PromptResponse, + ResourceContentBlock, + SessionCapabilities, + SessionCloseCapabilities, + SessionInfo, + SessionListCapabilities, + SessionResumeCapabilities, + SseMcpServer, + TextContentBlock, + ToolKind, + ResumeSessionResponse, +) +from acp.helpers import start_tool_call, tool_content, update_tool_call +from bub import hookimpl +from bub.channels.message import ChannelMessage, MediaItem, MediaType +from bub.envelope import content_of, field_of +from bub.types import Envelope, OutboundChannelRouter, TurnResult +from republic import StreamEvent, TapeEntry, TapeQuery + +from bub_acp_server.config import ACPServerSettings + +if TYPE_CHECKING: + from bub.framework import BubFramework + +type ACPPromptBlock = ( + TextContentBlock | ImageContentBlock | AudioContentBlock | ResourceContentBlock | EmbeddedResourceContentBlock +) +type ACPMcpServer = HttpMcpServer | SseMcpServer | McpServerStdio +type StreamPayload = Mapping[str, object] + +_BUB_PROMPT_CONTEXT = re.compile(r"^acp_session_id=[^\n]+\n---Date: [^\n]+---\n", re.MULTILINE) +_CONTINUATION_PROMPT_PREFIX = "Continue the task until all targets are completed." + + +@dataclass(slots=True) +class ACPSession: + session_id: str + cwd: Path + additional_directories: list[str] = field(default_factory=list) + title: str | None = None + updated_at: str | None = None + + def touch(self) -> None: + self.updated_at = datetime.now(UTC).isoformat() + + def info(self) -> SessionInfo: + return SessionInfo( + session_id=self.session_id, + cwd=str(self.cwd), + additional_directories=self.additional_directories or None, + title=self.title, + updated_at=self.updated_at, + ) + + def to_json(self) -> dict[str, object]: + return { + "session_id": self.session_id, + "cwd": str(self.cwd), + "additional_directories": list(self.additional_directories), + "title": self.title, + "updated_at": self.updated_at, + } + + @classmethod + def from_json(cls, data: Mapping[str, object]) -> ACPSession | None: + session_id = data.get("session_id") + cwd = data.get("cwd") + if not isinstance(session_id, str) or not session_id: + return None + if not isinstance(cwd, str) or not cwd: + return None + + additional_directories = data.get("additional_directories") + if not isinstance(additional_directories, list): + additional_directories = [] + + title = data.get("title") + updated_at = data.get("updated_at") + return cls( + session_id=session_id, + cwd=Path(cwd).expanduser().resolve(), + additional_directories=[str(item) for item in additional_directories if isinstance(item, str)], + title=title if isinstance(title, str) else None, + updated_at=updated_at if isinstance(updated_at, str) else None, + ) + + +class ACPStreamRouter: + def __init__(self, client: Client, session_id: str) -> None: + self._client = client + self._session_id = session_id + self._tool_ids: dict[int, str] = {} + self._sent_text = False + + @property + def sent_text(self) -> bool: + return self._sent_text + + def wrap_stream(self, message: Envelope, stream: AsyncIterable[StreamEvent]) -> AsyncIterable[StreamEvent]: + del message + + async def iterator() -> AsyncIterator[StreamEvent]: + async for event in stream: + await self._publish_stream_event(event) + yield event + + return iterator() + + async def dispatch_output(self, message: Envelope) -> bool: + if field_of(message, "kind") == "error": + await self._send_agent_text(content_of(message)) + return True + + async def quit(self, session_id: str) -> None: + del session_id + + async def _publish_stream_event(self, event: StreamEvent) -> None: + if event.kind == "text": + delta = str(event.data.get("delta", "")) + if delta: + self._sent_text = True + await self._send_agent_text(delta) + elif event.kind == "user_text": + delta = str(event.data.get("delta", "")) + if delta: + await self._send_user_text(delta) + elif event.kind == "tool_call": + await self._send_tool_call(event.data) + elif event.kind == "tool_result": + await self._send_tool_result(event.data) + elif event.kind == "error": + message = event.data.get("message") or event.data.get("error") or "unknown error" + await self._send_agent_text(f"\nError: {message}") + + async def _send_agent_text(self, text: str) -> None: + if not text: + return + await self._client.session_update(self._session_id, update_agent_message_text(text)) + + async def _send_user_text(self, text: str) -> None: + if not text: + return + await self._client.session_update(self._session_id, update_user_message_text(text)) + + async def _send_tool_call(self, data: StreamPayload) -> None: + index = _int_value(data.get("index"), default=len(self._tool_ids)) + call = data.get("call") + tool_id = _tool_call_id(index, call) + self._tool_ids[index] = tool_id + title = _tool_title(call) + await self._client.session_update( + self._session_id, + start_tool_call( + tool_id, + title, + kind=_tool_kind(title), + status="in_progress", + raw_input=call, + ), + ) + + async def _send_tool_result(self, data: StreamPayload) -> None: + index = _int_value(data.get("index"), default=0) + tool_id = self._tool_ids.get(index, f"tool-{index}") + result = data.get("result") + await self._client.session_update( + self._session_id, + update_tool_call( + tool_id, + status="completed", + raw_output=result, + content=[tool_content(text_block(_stringify(result)))], + ), + ) + + +class BubACPAgent: + def __init__(self, framework: BubFramework) -> None: + self.framework = framework + self.settings = bub.ensure_config(ACPServerSettings) + self._client: Client | None = None + self._session_store_path = bub.home.expanduser() / "acp-sessions.json" + self._sessions: dict[str, ACPSession] = self._load_sessions() + self._prompt_lock = asyncio.Lock() + + def on_connect(self, conn: Client) -> None: + self._client = conn + + async def initialize( + self, + protocol_version: int, + client_capabilities: ClientCapabilities | None = None, + client_info: Implementation | None = None, + **kwargs: Any, + ) -> InitializeResponse: + del client_capabilities, client_info, kwargs + return InitializeResponse( + protocol_version=protocol_version, + agent_info=Implementation(name="bub", title="Bub", version="0.1.0"), + agent_capabilities=AgentCapabilities( + load_session=True, + session_capabilities=SessionCapabilities( + close=SessionCloseCapabilities(), + list=SessionListCapabilities(), + resume=SessionResumeCapabilities(), + ) + ), + ) + + async def new_session( + self, + cwd: str, + additional_directories: list[str] | None = None, + mcp_servers: list[ACPMcpServer] | None = None, + **kwargs: Any, + ) -> NewSessionResponse: + del mcp_servers, kwargs + session_id = uuid4().hex + session = ACPSession( + session_id=session_id, + cwd=Path(cwd).expanduser().resolve(), + additional_directories=list(additional_directories or []), + ) + session.touch() + self._sessions[session_id] = session + self._save_sessions() + return NewSessionResponse(session_id=session_id) + + async def load_session( + self, + cwd: str, + session_id: str, + additional_directories: list[str] | None = None, + mcp_servers: list[ACPMcpServer] | None = None, + **kwargs: Any, + ) -> LoadSessionResponse: + del mcp_servers, kwargs + session = self._load_or_adopt_session( + session_id=session_id, + cwd=cwd, + additional_directories=additional_directories, + ) + await self._attach_session_history(session) + return LoadSessionResponse() + + async def resume_session( + self, + cwd: str, + session_id: str, + additional_directories: list[str] | None = None, + mcp_servers: list[ACPMcpServer] | None = None, + **kwargs: Any, + ) -> ResumeSessionResponse: + del mcp_servers, kwargs + self._load_or_adopt_session( + session_id=session_id, + cwd=cwd, + additional_directories=additional_directories, + ) + return ResumeSessionResponse() + + async def list_sessions( + self, + additional_directories: list[str] | None = None, + cursor: str | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ListSessionsResponse: + del additional_directories, cursor, cwd, kwargs + self._sessions = self._load_sessions() + sessions = sorted(self._sessions.values(), key=lambda item: item.updated_at or "", reverse=True) + return ListSessionsResponse(sessions=[session.info() for session in sessions]) + + async def close_session(self, session_id: str, **kwargs: Any) -> CloseSessionResponse | None: + del kwargs + self._sessions.pop(session_id, None) + self._save_sessions() + return CloseSessionResponse() + + async def cancel(self, session_id: str, **kwargs: Any) -> None: + del kwargs + await self.framework.quit_via_router(session_id) + + async def prompt( + self, + prompt: list[ACPPromptBlock], + session_id: str, + message_id: str | None = None, + **kwargs: Any, + ) -> PromptResponse: + del kwargs + client = self._require_client() + session = self._sessions.get(session_id) or self._adopt_session(session_id) + session.touch() + self._save_sessions() + + content, media = _prompt_to_bub_content(prompt) + inbound = ChannelMessage( + session_id=session_id, + channel=self.settings.channel_name, + chat_id=session_id, + content=content, + is_active=True, + kind="normal", + media=media, + context={"acp_session_id": session_id}, + ) + if self.settings.send_user_message_updates: + await self._send_user_message_updates(prompt, session_id) + + result = await self._process_inbound_with_streaming(inbound, session, client) + if not result.model_output: + return PromptResponse(stop_reason="end_turn", user_message_id=message_id) + return PromptResponse(stop_reason="end_turn", user_message_id=message_id) + + def _require_client(self) -> Client: + if self._client is None: + raise RuntimeError("ACP client is not connected") + return self._client + + def _adopt_session(self, session_id: str) -> ACPSession: + session = ACPSession(session_id=session_id, cwd=self.framework.workspace) + session.touch() + self._sessions[session_id] = session + self._save_sessions() + return session + + def _load_or_adopt_session( + self, + *, + session_id: str, + cwd: str, + additional_directories: list[str] | None, + ) -> ACPSession: + session = self._sessions.get(session_id) + if session is None: + session = ACPSession( + session_id=session_id, + cwd=Path(cwd).expanduser().resolve(), + additional_directories=list(additional_directories or []), + ) + self._sessions[session_id] = session + else: + session.cwd = Path(cwd).expanduser().resolve() + session.additional_directories = list(additional_directories or []) + session.touch() + self._save_sessions() + return session + + def _load_sessions(self) -> dict[str, ACPSession]: + try: + raw = json.loads(self._session_store_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + if not isinstance(raw, list): + return {} + + sessions: dict[str, ACPSession] = {} + for item in raw: + if not isinstance(item, dict): + continue + session = ACPSession.from_json(item) + if session is not None: + sessions[session.session_id] = session + return sessions + + def _save_sessions(self) -> None: + self._session_store_path.parent.mkdir(parents=True, exist_ok=True) + payload = [session.to_json() for session in self._sessions.values()] + temp_path = self._session_store_path.with_suffix(".json.tmp") + temp_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") + temp_path.replace(self._session_store_path) + + async def _attach_session_history(self, session: ACPSession) -> None: + client = self._require_client() + router = ACPStreamRouter(client, session.session_id) + inbound = ChannelMessage( + session_id=session.session_id, + channel=self.settings.channel_name, + chat_id=session.session_id, + content="", + is_active=False, + kind="normal", + context={"acp_session_id": session.session_id}, + ) + async for _ in router.wrap_stream(inbound, self._session_history_stream(session)): + pass + + async def _session_history_stream(self, session: ACPSession) -> AsyncIterator[StreamEvent]: + entries = await self._load_tape_entries(session) + pending_tool_indices: list[int] = [] + next_tool_index = 0 + + for entry in entries: + if entry.kind == "message": + event = _message_entry_stream_event(entry) + if event is not None: + yield event + elif entry.kind == "tool_call": + calls = _list_payload(entry.payload.get("calls")) + pending_tool_indices = [] + for call in calls: + tool_index = next_tool_index + next_tool_index += 1 + pending_tool_indices.append(tool_index) + yield StreamEvent("tool_call", {"index": tool_index, "call": call}) + elif entry.kind == "tool_result": + results = _list_payload(entry.payload.get("results")) + for index, result in enumerate(results): + tool_index = pending_tool_indices[index] if index < len(pending_tool_indices) else index + yield StreamEvent("tool_result", {"index": tool_index, "result": result}) + pending_tool_indices = [] + elif entry.kind == "error": + yield StreamEvent("error", {"message": _stringify(entry.payload.get("message") or entry.payload)}) + + async def _load_tape_entries(self, session: ACPSession) -> list[TapeEntry]: + tape_name = _session_tape_name(session.session_id, session.cwd) + store = _framework_tape_store(self.framework) + if store is not None: + query = TapeQuery(tape_name, store) + with contextlib.suppress(Exception): + result = store.fetch_all(query) + if inspect.isawaitable(result): + result = await result + return list(cast(Iterable[TapeEntry], result)) + return _load_tape_entries_from_file(bub.home.expanduser() / "tapes" / f"{tape_name}.jsonl") + + async def _send_user_message_updates(self, prompt: list[ACPPromptBlock], session_id: str) -> None: + client = self._require_client() + for block in prompt: + if _block_type(block) == "text": + await client.session_update(session_id, update_user_message(block)) + + async def _process_inbound_with_streaming( + self, + inbound: ChannelMessage, + session: ACPSession, + client: Client, + ) -> TurnResult: + async with self._prompt_lock: + router = ACPStreamRouter(client, session.session_id) + previous_router = cast( + OutboundChannelRouter | None, + getattr(self.framework, "_outbound_router", None), + ) + previous_workspace = self.framework.workspace + self.framework.workspace = session.cwd + self.framework.bind_outbound_router(router) + try: + result = await self.framework.process_inbound(inbound, stream_output=True) + finally: + self.framework.bind_outbound_router(previous_router) + self.framework.workspace = previous_workspace + if result.model_output and not router.sent_text: + await client.session_update(session.session_id, update_agent_message_text(result.model_output)) + return result + + +async def run_acp_agent(framework: BubFramework, *, use_unstable_protocol: bool = True) -> None: + async with framework.running(): + await run_agent(BubACPAgent(framework), use_unstable_protocol=use_unstable_protocol) + + +class ACPServerPlugin: + def __init__(self, framework: BubFramework) -> None: + self.framework = framework + + @hookimpl + def register_cli_commands(self, app: typer.Typer) -> None: + acp_app = typer.Typer(name="acp", help="Run Bub as an ACP agent.", add_completion=False) + + @acp_app.command("serve") + def serve() -> None: + asyncio.run(run_acp_agent(self.framework)) + + app.add_typer(acp_app, name="acp") + + +def _prompt_to_bub_content(prompt: list[ACPPromptBlock]) -> tuple[str, list[MediaItem]]: + parts: list[str] = [] + media: list[MediaItem] = [] + for block in prompt: + block_type = _block_type(block) + if block_type == "text": + parts.append(str(_block_value(block, "text", ""))) + elif block_type == "image": + media.append(_media_item(block, media_type="image")) + parts.append(_attachment_label(block, "image")) + elif block_type == "audio": + media.append(_media_item(block, media_type="audio")) + parts.append(_attachment_label(block, "audio")) + elif block_type == "resource_link": + name = _block_value(block, "name", "resource") + uri = _block_value(block, "uri", "") + parts.append(f"[resource: {name}] {uri}".strip()) + elif block_type == "resource": + parts.append(_embedded_resource_text(block)) + else: + parts.append(f"[unsupported ACP content: {block_type}]") + content = "\n".join(part for part in parts if part).strip() + return content or "[ACP prompt attachment]", media + + +def _media_item(block: ACPPromptBlock, *, media_type: MediaType) -> MediaItem: + data = str(_block_value(block, "data", "")) + mime_type = str(_block_value(block, "mime_type", "application/octet-stream")) + + async def fetch_data() -> bytes: + return base64.b64decode(data) + + return MediaItem(type=media_type, mime_type=mime_type, data_fetcher=fetch_data) + + +def _embedded_resource_text(block: ACPPromptBlock) -> str: + resource = _block_value(block, "resource", None) + if resource is None: + return "[resource]" + text = _block_value(resource, "text", None) + if text is not None: + return str(text) + uri = _block_value(resource, "uri", "") + return f"[resource: {uri}]".strip() + + +def _attachment_label(block: ACPPromptBlock, kind: str) -> str: + uri = _block_value(block, "uri", None) + return f"[{kind}: {uri}]" if uri else f"[{kind}]" + + +def _block_type(block: object) -> str: + return str(_block_value(block, "type", "")) + + +def _block_value(block: object, name: str, default: object = None) -> object: + if isinstance(block, Mapping): + return block.get(name, default) + return getattr(block, name, default) + + +def _tool_call_id(index: int, call: object) -> str: + candidate = _block_value(call, "id", None) or _block_value(call, "tool_call_id", None) + return str(candidate or f"tool-{index}") + + +def _tool_title(call: object) -> str: + name = _block_value(call, "name", None) + if name is None: + function = _block_value(call, "function", None) + name = _block_value(function, "name", None) + return str(name or "tool") + + +def _tool_kind(title: str) -> ToolKind: + lower_title = title.lower() + if any(token in lower_title for token in ("read", "cat", "view")): + return "read" + if any(token in lower_title for token in ("write", "edit", "patch")): + return "edit" + if any(token in lower_title for token in ("delete", "remove", "rm")): + return "delete" + if any(token in lower_title for token in ("search", "grep", "rg")): + return "search" + if any(token in lower_title for token in ("bash", "shell", "exec", "run")): + return "execute" + return "other" + + +def _int_value(value: object, *, default: int) -> int: + with contextlib.suppress(TypeError, ValueError): + return int(value) + return default + + +def _framework_tape_store(framework: BubFramework) -> object | None: + get_tape_store = getattr(framework, "get_tape_store", None) + if get_tape_store is None: + return None + store = get_tape_store() + return store if hasattr(store, "fetch_all") else None + + +def _session_tape_name(session_id: str, workspace: Path) -> str: + workspace_hash = hashlib.md5(str(workspace.resolve()).encode("utf-8"), usedforsecurity=False).hexdigest()[:16] + session_hash = hashlib.md5(session_id.encode("utf-8"), usedforsecurity=False).hexdigest()[:16] + return f"{workspace_hash}__{session_hash}" + + +def _load_tape_entries_from_file(path: Path) -> list[TapeEntry]: + entries: list[TapeEntry] = [] + try: + with path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + entry = _tape_entry_from_json_line(raw_line) + if entry is not None: + entries.append(entry) + except OSError: + return [] + return entries + + +def _tape_entry_from_json_line(line: str) -> TapeEntry | None: + line = line.strip() + if not line: + return None + try: + payload = json.loads(line) + except json.JSONDecodeError: + return None + if not isinstance(payload, dict): + return None + + entry_id = payload.get("id") + kind = payload.get("kind") + entry_payload = payload.get("payload") + meta = payload.get("meta") + date = payload.get("date") + if not isinstance(entry_id, int) or not isinstance(kind, str) or not isinstance(entry_payload, dict): + return None + if not isinstance(meta, dict): + meta = {} + if not isinstance(date, str): + date = datetime.fromtimestamp(0.0, tz=UTC).isoformat() + return TapeEntry(entry_id, kind, dict(entry_payload), dict(meta), date) + + +def _message_entry_stream_event(entry: TapeEntry) -> StreamEvent | None: + role = entry.payload.get("role") + content = _message_content(entry.payload.get("content")) + if not content: + return None + + if role == "user": + user_content = _clean_user_tape_content(content) + if not user_content: + return None + return StreamEvent("user_text", {"delta": user_content}) + if role == "assistant": + return StreamEvent("text", {"delta": content}) + return None + + +def _message_content(value: object) -> str: + if isinstance(value, str): + return value + if not isinstance(value, list): + return "" + + parts: list[str] = [] + for item in value: + if not isinstance(item, Mapping): + continue + if item.get("type") == "text": + text = item.get("text") + if isinstance(text, str): + parts.append(text) + return "\n".join(parts).strip() + + +def _clean_user_tape_content(content: str) -> str: + cleaned = _BUB_PROMPT_CONTEXT.sub("", content, count=1).strip() + if cleaned.startswith(_CONTINUATION_PROMPT_PREFIX): + return "" + return cleaned + + +def _list_payload(value: object) -> list[object]: + return value if isinstance(value, list) else [] + + +def _stringify(value: object) -> str: + if value is None: + return "" + if isinstance(value, str): + return value + return repr(value) diff --git a/packages/bub-acp-server/src/bub_acp_server/py.typed b/packages/bub-acp-server/src/bub_acp_server/py.typed new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/packages/bub-acp-server/src/bub_acp_server/py.typed @@ -0,0 +1 @@ + diff --git a/packages/bub-acp-server/tests/test_plugin.py b/packages/bub-acp-server/tests/test_plugin.py new file mode 100644 index 0000000..addee84 --- /dev/null +++ b/packages/bub-acp-server/tests/test_plugin.py @@ -0,0 +1,263 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest +from acp.schema import TextContentBlock +from bub.types import TurnResult +from republic import StreamEvent, TapeEntry, TapeQuery + +from bub_acp_server import plugin +from bub_acp_server.plugin import BubACPAgent + + +@pytest.fixture(autouse=True) +def isolated_bub_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("BUB_HOME", str(tmp_path / ".bub")) + + +class FakeClient: + def __init__(self) -> None: + self.updates: list[tuple[str, object]] = [] + + async def session_update(self, session_id: str, update: object, **kwargs: Any) -> None: + self.updates.append((session_id, update)) + + +class FakeFramework: + def __init__(self) -> None: + self.workspace = Path.cwd() + self.router = None + self.previous_routers: list[object] = [] + self.messages: list[object] = [] + self.stream_output_values: list[bool] = [] + + def bind_outbound_router(self, router: object) -> None: + self.previous_routers.append(router) + self.router = router + + async def quit_via_router(self, session_id: str) -> None: + return None + + async def process_inbound(self, inbound: object, stream_output: bool = False) -> TurnResult: + self.messages.append(inbound) + self.stream_output_values.append(stream_output) + + async def stream(): + yield StreamEvent("text", {"delta": "hello"}) + yield StreamEvent("tool_call", {"index": 0, "call": {"id": "call-1", "name": "bash"}}) + yield StreamEvent("tool_result", {"index": 0, "result": "ok"}) + yield StreamEvent("text", {"delta": " world"}) + yield StreamEvent("final", {"text": "hello world", "ok": True}) + + async for _ in self.router.wrap_stream(inbound, stream()): + pass + return TurnResult( + session_id=inbound.session_id, + prompt=inbound.content, + model_output="hello world", + ) + + +class FakeTapeStore: + def __init__(self, entries: list[TapeEntry]) -> None: + self.entries = entries + self.queries: list[str] = [] + + def fetch_all(self, query: TapeQuery) -> list[TapeEntry]: + self.queries.append(query.tape) + return self.entries + + +class TapeFramework(FakeFramework): + def __init__(self, entries: list[TapeEntry]) -> None: + super().__init__() + self.tape_store = FakeTapeStore(entries) + + def get_tape_store(self) -> FakeTapeStore: + return self.tape_store + + +class NoTextFramework(FakeFramework): + async def process_inbound(self, inbound: object, stream_output: bool = False) -> TurnResult: + self.messages.append(inbound) + self.stream_output_values.append(stream_output) + + async def stream(): + yield StreamEvent("final", {"text": "late text", "ok": True}) + + async for _ in self.router.wrap_stream(inbound, stream()): + pass + return TurnResult(session_id=inbound.session_id, prompt=inbound.content, model_output="late text") + + +@pytest.mark.asyncio +async def test_initialize_advertises_session_capabilities() -> None: + agent = BubACPAgent(FakeFramework()) + response = await agent.initialize(protocol_version=1) + + assert response.protocol_version == 1 + assert response.agent_info is not None + assert response.agent_info.name == "bub" + assert response.agent_capabilities is not None + assert response.agent_capabilities.session_capabilities is not None + assert response.agent_capabilities.session_capabilities.list is not None + assert response.agent_capabilities.session_capabilities.close is not None + assert response.agent_capabilities.session_capabilities.resume is not None + assert response.agent_capabilities.load_session is True + + +@pytest.mark.asyncio +async def test_resume_adopts_existing_editor_session_ids(tmp_path: Path) -> None: + agent = BubACPAgent(FakeFramework()) + + resume_response = await agent.resume_session(cwd=str(tmp_path), session_id="zed-session") + sessions = await agent.list_sessions(cwd=str(tmp_path)) + + assert resume_response is not None + assert [session.session_id for session in sessions.sessions] == ["zed-session"] + assert sessions.sessions[0].cwd == str(tmp_path) + + +@pytest.mark.asyncio +async def test_load_session_attaches_tape_history_through_streaming_router(tmp_path: Path) -> None: + session_id = "zed-session" + entries = [ + TapeEntry( + 1, + "message", + { + "role": "user", + "content": ( + f"acp_session_id={session_id}|channel=$acp-server|chat_id={session_id}\n" + "---Date: 2026-06-01T03:42:01+08:00---\n" + "HELLO" + ), + }, + ), + TapeEntry(2, "message", {"role": "assistant", "content": "Hi"}), + TapeEntry(3, "tool_call", {"calls": [{"id": "call-1", "name": "bash"}]}), + TapeEntry(4, "tool_result", {"results": ["ok"]}), + TapeEntry( + 5, + "message", + { + "role": "user", + "content": "Continue the task until all targets are completed. [context: acp_session_id=x]", + }, + ), + ] + framework = TapeFramework(entries) + client = FakeClient() + agent = BubACPAgent(framework) + agent.on_connect(client) + + response = await agent.load_session(cwd=str(tmp_path), session_id=session_id) + + assert response is not None + assert framework.tape_store.queries == [plugin._session_tape_name(session_id, tmp_path)] + update_names = [update.session_update for _, update in client.updates] + assert update_names == [ + "user_message_chunk", + "agent_message_chunk", + "tool_call", + "tool_call_update", + ] + assert client.updates[0][1].content.text == "HELLO" + assert client.updates[1][1].content.text == "Hi" + + +@pytest.mark.asyncio +async def test_sessions_survive_agent_restart(tmp_path: Path) -> None: + first_agent = BubACPAgent(FakeFramework()) + created = await first_agent.new_session(cwd=str(tmp_path)) + + second_agent = BubACPAgent(FakeFramework()) + sessions = await second_agent.list_sessions(cwd=str(tmp_path)) + + assert [session.session_id for session in sessions.sessions] == [created.session_id] + assert sessions.sessions[0].cwd == str(tmp_path) + + +@pytest.mark.asyncio +async def test_session_store_expands_user_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("BUB_HOME", "~/.custom-bub") + + agent = BubACPAgent(FakeFramework()) + await agent.new_session(cwd=str(tmp_path)) + + assert (tmp_path / ".custom-bub" / "acp-sessions.json").exists() + + +@pytest.mark.asyncio +async def test_run_acp_agent_registers_resume_routes_by_default(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, object] = {} + + class RunningFramework(FakeFramework): + def running(self): + class Context: + async def __aenter__(self) -> None: + return None + + async def __aexit__(self, *args: object) -> None: + return None + + return Context() + + async def fake_run_agent(agent: object, *, use_unstable_protocol: bool = False) -> None: + captured["agent"] = agent + captured["use_unstable_protocol"] = use_unstable_protocol + + monkeypatch.setattr(plugin, "run_agent", fake_run_agent) + + await plugin.run_acp_agent(RunningFramework()) + + assert isinstance(captured["agent"], BubACPAgent) + assert captured["use_unstable_protocol"] is True + + +@pytest.mark.asyncio +async def test_prompt_streams_bub_events_to_acp_client() -> None: + framework = FakeFramework() + client = FakeClient() + agent = BubACPAgent(framework) + agent.on_connect(client) + session = await agent.new_session(cwd=str(Path.cwd())) + + response = await agent.prompt( + [TextContentBlock(type="text", text="say hello")], + session_id=session.session_id, + message_id="user-message-1", + ) + + assert response.stop_reason == "end_turn" + assert response.user_message_id == "user-message-1" + assert framework.stream_output_values == [True] + assert framework.messages[0].content == "say hello" + assert framework.messages[0].channel == "acp-server" + assert framework.previous_routers[-1] is None + + update_names = [update.session_update for _, update in client.updates] + assert update_names == [ + "agent_message_chunk", + "tool_call", + "tool_call_update", + "agent_message_chunk", + ] + assert client.updates[0][1].content.text == "hello" + assert client.updates[-1][1].content.text == " world" + + +@pytest.mark.asyncio +async def test_prompt_sends_complete_output_when_stream_has_no_text_chunks() -> None: + framework = NoTextFramework() + client = FakeClient() + agent = BubACPAgent(framework) + agent.on_connect(client) + session = await agent.new_session(cwd=str(Path.cwd())) + + await agent.prompt([TextContentBlock(type="text", text="hello")], session_id=session.session_id) + + assert [update.content.text for _, update in client.updates] == ["late text"] diff --git a/pyproject.toml b/pyproject.toml index ee756a2..950197f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "bub", + "bub-acp-server", "bub-codex", "bub-cursor", "bub-discord", @@ -35,6 +36,7 @@ members = ["packages/*"] [tool.uv.sources] bub = { git = "https://github.com/bubbuild/bub.git" } +bub-acp-server = { workspace = true } bub-tg-feed = { workspace = true } bub-codex = { workspace = true } bub-cursor = { workspace = true } diff --git a/uv.lock b/uv.lock index 0e7c2fe..93a30a8 100644 --- a/uv.lock +++ b/uv.lock @@ -9,6 +9,7 @@ resolution-markers = [ [manifest] members = [ + "bub-acp-server", "bub-codex", "bub-contrib", "bub-cursor", @@ -34,6 +35,18 @@ members = [ "tape-dataset-opendal", ] +[[package]] +name = "agent-client-protocol" +version = "0.10.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/88/a0/3b96cd8374725c69bc3dae9fcc2082f3f6cafec1be35d24d7af0f8c3265f/agent_client_protocol-0.10.1.tar.gz", hash = "sha256:355c65ca19f0568344aafc2c1552b7066a8fc491df23ab28e7e253c6c9a85a25", size = 81924, upload-time = "2026-05-24T18:46:44.444Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/18/d8c7ff337cf621ea79a84006a7252ff057bfb5767549bb102cc6649f4ec2/agent_client_protocol-0.10.1-py3-none-any.whl", hash = "sha256:a03d3198f4d772f2e0ec012c00ac1cce131b4710220a3dc9fae3c991d047c750", size = 65401, upload-time = "2026-05-24T18:46:43.202Z" }, +] + [[package]] name = "aio-pika" version = "9.6.2" @@ -377,6 +390,33 @@ dependencies = [ { name = "typer" }, ] +[[package]] +name = "bub-acp-server" +version = "0.1.0" +source = { editable = "packages/bub-acp-server" } +dependencies = [ + { name = "agent-client-protocol" }, + { name = "bub" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-client-protocol", specifier = "==0.10.1" }, + { name = "bub", git = "https://github.com/bubbuild/bub.git" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=9.0.3" }, + { name = "pytest-asyncio", specifier = ">=0.21.0" }, +] + [[package]] name = "bub-codex" version = "0.1.0" @@ -388,6 +428,7 @@ version = "0.1.0" source = { virtual = "." } dependencies = [ { name = "bub" }, + { name = "bub-acp-server" }, { name = "bub-codex" }, { name = "bub-cursor" }, { name = "bub-dingtalk" }, @@ -422,6 +463,7 @@ test = [ [package.metadata] requires-dist = [ { name = "bub", git = "https://github.com/bubbuild/bub.git" }, + { name = "bub-acp-server", editable = "packages/bub-acp-server" }, { name = "bub-codex", editable = "packages/bub-codex" }, { name = "bub-cursor", editable = "packages/bub-cursor" }, { name = "bub-dingtalk", editable = "packages/bub-dingtalk" },