diff --git a/code_puppy/agents/_runtime.py b/code_puppy/agents/_runtime.py index df8620b8b..a7e89074b 100644 --- a/code_puppy/agents/_runtime.py +++ b/code_puppy/agents/_runtime.py @@ -58,6 +58,7 @@ on_agent_run_end, on_agent_run_result, on_agent_run_start, + on_user_prompt_submit, ) from code_puppy.config import ( get_enable_streaming, @@ -270,6 +271,26 @@ async def run_with_mcp( prompt = _sanitize_prompt(prompt) group_id = str(uuid.uuid4()) + # Fire UserPromptSubmit hook — callbacks may rewrite or augment the prompt + # before it reaches the model. Hooks returning a plain string replace the + # prompt entirely; ``{"inject_context": "..."}`` prepends to it. + try: + hook_results = await on_user_prompt_submit( + prompt, agent_name=agent.name, session_id=group_id + ) + for hook_result in hook_results: + if hook_result is None: + continue + if isinstance(hook_result, str) and hook_result.strip(): + prompt = hook_result + elif isinstance(hook_result, dict): + injected = hook_result.get("inject_context") + if injected: + prompt = f"{injected}\n\n{prompt}" + except Exception: + # Hook failures never block user prompts. + pass + if agent._code_generation_agent is None: build_pydantic_agent(agent) pydantic_agent = agent._code_generation_agent diff --git a/code_puppy/callbacks.py b/code_puppy/callbacks.py index 93df3ff7d..cb356f753 100644 --- a/code_puppy/callbacks.py +++ b/code_puppy/callbacks.py @@ -39,6 +39,10 @@ "message_history_processor_start", "message_history_processor_end", "on_message", + "user_prompt_submit", + "pre_compact", + "session_end", + "notification", ] CallbackFunc = Callable[..., Any] @@ -78,6 +82,10 @@ "message_history_processor_start": [], "message_history_processor_end": [], "on_message": [], + "user_prompt_submit": [], + "pre_compact": [], + "session_end": [], + "notification": [], } logger = logging.getLogger(__name__) @@ -788,3 +796,72 @@ async def on_message(message_id: str, message: Any) -> List[Any]: List of results from registered callbacks. """ return await _trigger_callbacks("on_message", message_id, message) + + +async def on_user_prompt_submit(prompt: str, **kwargs: Any) -> List[Any]: + """Trigger callbacks when the user submits a prompt to an agent. + + Fires before the prompt is sent to the model. Callbacks may return: + - A string: replaces the prompt text entirely + - A dict with ``{"inject_context": "..."}``: the text is prepended to the prompt + - A dict with ``{"blocked": True, "reason": "..."}``: signals the caller to veto + - ``None``: no change + + Args: + prompt: The raw user prompt string. + **kwargs: Reserved for future context (session_id, agent_name, etc). + + Returns: + List of results from registered callbacks (in registration order). + """ + return await _trigger_callbacks("user_prompt_submit", prompt, **kwargs) + + +async def on_pre_compact( + agent_name: str, + session_id: Optional[str], + message_history: List[Any], + incoming_messages: List[Any], +) -> List[Any]: + """Trigger callbacks immediately before message-history compaction. + + This is a Claude-Code-compatible mirror of ``message_history_processor_start`` + specifically for the ``PreCompact`` hook event. Stays observation-only; + callbacks can log/annotate but cannot currently veto compaction. + + Args: + agent_name: Name of the agent whose history is being compacted. + session_id: Optional session identifier. + message_history: Current message history (pre-compaction). + incoming_messages: New messages being added in this pass. + + Returns: + List of results from registered callbacks. + """ + return await _trigger_callbacks( + "pre_compact", agent_name, session_id, message_history, incoming_messages + ) + + +async def on_session_end() -> List[Any]: + """Trigger callbacks when a session ends (distinct from app shutdown). + + ``shutdown`` fires once when the CLI process exits. ``session_end`` is the + Claude-Code-compatible twin and currently fires from the same site but is + intentionally a separate phase so plugins can opt into either semantic. + """ + return await _trigger_callbacks("session_end") + + +async def on_notification(notification_type: str, payload: Any = None) -> List[Any]: + """Trigger callbacks when a user-facing notification is emitted. + + Args: + notification_type: A short string tag (e.g. ``"warning"``, ``"info"``, + ``"error"``). Free-form; plugins choose their own taxonomy. + payload: The notification body (any JSON-ish object or string). + + Returns: + List of results from registered callbacks. + """ + return await _trigger_callbacks("notification", notification_type, payload) diff --git a/code_puppy/command_line/model_picker_completion.py b/code_puppy/command_line/model_picker_completion.py index c0c93afa2..0df5da6e8 100644 --- a/code_puppy/command_line/model_picker_completion.py +++ b/code_puppy/command_line/model_picker_completion.py @@ -16,6 +16,10 @@ get_total_pages, ) from code_puppy.config import get_global_model_name +from code_puppy.item_visibility import ( + load_hidden_models, + prune_stale_entries, +) from code_puppy.list_filtering import query_matches_text from code_puppy.model_switching import set_model_and_reload_agent @@ -211,34 +215,46 @@ def __init__(self, model_names: Optional[list[str]] = None): self.page_size = MODEL_PICKER_PAGE_SIZE self.result: Optional[str] = None - if self.current_model in self.visible_model_names: - self.selected_index = self.visible_model_names.index(self.current_model) + # Load visibility state and prune stale entries + self._hidden_models: set[str] = load_hidden_models() + prune_stale_entries(self.model_names) + self._hidden_models = load_hidden_models() # reload after pruning + self.show_all: bool = False # session-level; resets every picker open + + # Set initial selection to current model (in display list) + if self.current_model in self.display_model_names: + self.selected_index = self.display_model_names.index(self.current_model) self.page = get_page_for_index(self.selected_index, self.page_size) @property def total_pages(self) -> int: - return get_total_pages(len(self.visible_model_names), self.page_size) + return get_total_pages(len(self.display_model_names), self.page_size) @property def page_start(self) -> int: start, _ = get_page_bounds( - self.page, len(self.visible_model_names), self.page_size + self.page, len(self.display_model_names), self.page_size ) return start @property def page_end(self) -> int: _, end = get_page_bounds( - self.page, len(self.visible_model_names), self.page_size + self.page, len(self.display_model_names), self.page_size ) return end @property def models_on_page(self) -> list[str]: - return self.visible_model_names[self.page_start : self.page_end] + return self.display_model_names[self.page_start : self.page_end] @property def visible_model_names(self) -> list[str]: + """Models matching the current filter text (includes hidden models). + + This represents the raw filter-matched list — visibility is applied + on top by display_model_names. + """ if not self.filter_text: return self.model_names return [ @@ -247,23 +263,39 @@ def visible_model_names(self) -> list[str]: if query_matches_text(self.filter_text, model_name) ] + @property + def display_model_names(self) -> list[str]: + """Models shown in the list, respecting visibility settings. + + Decision matrix: + - show_all=True → All models (hidden ones dimmed in render) + - show_all=False, filter_text typed → Filter matches (hidden dimmed) + - show_all=False, filter_text empty → Non-hidden models only + """ + base = self.visible_model_names + if self.show_all: + return base + if self.filter_text: + return base + return [m for m in base if m not in self._hidden_models] + def _get_selected_model_name(self) -> Optional[str]: - if 0 <= self.selected_index < len(self.visible_model_names): - return self.visible_model_names[self.selected_index] + if 0 <= self.selected_index < len(self.display_model_names): + return self.display_model_names[self.selected_index] return None def _ensure_selection_visible(self) -> None: self.page = ensure_visible_page( self.selected_index, self.page, - len(self.visible_model_names), + len(self.display_model_names), self.page_size, ) def _set_filter_text(self, value: str) -> None: selected_model = self._get_selected_model_name() self.filter_text = value - visible_models = self.visible_model_names + visible_models = self.display_model_names if not visible_models: self.selected_index = 0 self.page = 0 @@ -298,7 +330,7 @@ def _move_up(self) -> None: self._ensure_selection_visible() def _move_down(self) -> None: - if self.selected_index < len(self.visible_model_names) - 1: + if self.selected_index < len(self.display_model_names) - 1: self.selected_index += 1 self._ensure_selection_visible() @@ -322,13 +354,26 @@ def _render(self): ) lines.append(("", "\n")) - if not self.visible_model_names: - empty_message = ( - "No models match the current filter." - if self.filter_text - else "No models available." - ) - lines.append(("fg:ansiyellow", f"\n {empty_message}\n")) + if not self.display_model_names: + visible_count = len(self.visible_model_names) + if visible_count > 0: + # Some models match the filter but all are hidden + lines.append(("fg:ansiyellow", "\n All filtered models are hidden.\n")) + lines.append(("fg:ansibrightblack", " Press ")) + lines.append(("", "Tab")) + lines.append(("fg:ansibrightblack", " to show all models.\n")) + elif self.filter_text: + # No models match the filter at all + lines.append( + ("fg:ansiyellow", "\n No models match the current filter.\n") + ) + elif self._hidden_models: + lines.append(("fg:ansiyellow", "\n All models are hidden.\n")) + lines.append(("fg:ansibrightblack", " Press ")) + lines.append(("", "Tab")) + lines.append(("fg:ansibrightblack", " to show all models.\n")) + else: + lines.append(("fg:ansiyellow", "\n No models available.\n")) lines.append(("fg:ansibrightblack", " Type ")) lines.append(("", "Adjust filter\n")) lines.append(("fg:ansibrightblack", " Backspace ")) @@ -337,21 +382,35 @@ def _render(self): lines.append(("fg:ansibrightblack", " Ctrl+U ")) lines.append(("", "Clear filter\n")) lines.append(("fg:ansiyellow", " Esc ")) - lines.append(("", "Exit\n")) + lines.append(("", "Cancel\n")) return lines lines.append(("fg:ansibrightblack", f"\n Current: {self.current_model}\n\n")) - for offset, model_name in enumerate(self.models_on_page): absolute_index = self.page_start + offset is_selected = absolute_index == self.selected_index is_current = model_name == self.current_model + is_hidden = model_name in self._hidden_models prefix = " › " if is_selected else " " - style = "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" - lines.append((style, f"{prefix}{model_name}")) + if is_hidden: + style = ( + "fg:ansibrightblack dim" + if is_selected + else "fg:ansibrightblack dim" + ) + arrow_style = ( + "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" + ) + else: + style = "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" + arrow_style = style + lines.append((arrow_style, prefix)) + lines.append((style, model_name)) if is_current: lines.append(("fg:ansigreen", " (active)")) + if is_hidden: + lines.append(("fg:ansibrightblack dim", " [hidden]")) lines.append(("", "\n")) lines.append(("", "\n")) @@ -366,6 +425,10 @@ def _render(self): lines.append(("", "Delete filter char\n")) lines.append(("fg:ansibrightblack", " Ctrl+U ")) lines.append(("", "Clear filter\n")) + lines.append(("fg:ansibrightblack", " Space ")) + lines.append(("", "Toggle visibility\n")) + lines.append(("fg:ansibrightblack", " Tab ")) + lines.append(("", "Show/hide all\n")) lines.append(("fg:ansigreen", " Enter ")) lines.append(("", "Select model\n")) lines.append(("fg:ansiyellow", " Esc ")) @@ -443,6 +506,34 @@ def _(event): self.result = None event.app.exit() + @kb.add("space") + def _(event): + """Toggle visibility of the currently highlighted model.""" + from code_puppy.item_visibility import ( + load_hidden_models, + toggle_model_hidden, + ) + + model = self._get_selected_model_name() + if model is None: + return + # Silently ignore if trying to hide the currently active model + if model == self.current_model: + return + toggle_model_hidden(model) + self._hidden_models = load_hidden_models() + self._set_filter_text(self.filter_text) + refresh() + event.app.invalidate() + + @kb.add("tab") + def _(event): + """Toggle show-all mode (session-level, not persisted).""" + self.show_all = not self.show_all + self._set_filter_text(self.filter_text) + refresh() + event.app.invalidate() + app = Application( layout=Layout(Window(content=control, wrap_lines=True)), key_bindings=kb, diff --git a/code_puppy/command_line/model_settings_menu.py b/code_puppy/command_line/model_settings_menu.py index 4c5b5c0fc..a5d077877 100644 --- a/code_puppy/command_line/model_settings_menu.py +++ b/code_puppy/command_line/model_settings_menu.py @@ -32,6 +32,10 @@ set_openai_reasoning_summary, set_openai_verbosity, ) +from code_puppy.item_visibility import ( + load_hidden_models, + prune_stale_entries, +) from code_puppy.messaging import emit_info from code_puppy.model_factory import ModelFactory from code_puppy.tools.command_runner import set_awaiting_user_input @@ -240,6 +244,12 @@ def __init__(self): self.all_models = _load_all_model_names() self.current_model_name = get_global_model_name() + # Load visibility state and prune stale entries + self._hidden_models: set = load_hidden_models() + prune_stale_entries(self.all_models) + self._hidden_models = load_hidden_models() # reload after pruning + self.show_all: bool = False # session-level; resets every menu open + # Navigation state self.view_mode = "models" # "models" or "settings" self.model_index = 0 # Index in model list (absolute) @@ -250,8 +260,9 @@ def __init__(self): self.page_size = MODELS_PER_PAGE # Try to pre-select the current model and set correct page - if self.current_model_name in self.all_models: - self.model_index = self.all_models.index(self.current_model_name) + # Use display_model_names to account for visibility + if self.current_model_name in self.display_model_names: + self.model_index = self.display_model_names.index(self.current_model_name) self.page = get_page_for_index(self.model_index, self.page_size) # Editing state @@ -267,31 +278,47 @@ def __init__(self): @property def total_pages(self) -> int: """Calculate total number of pages.""" - return get_total_pages(len(self.all_models), self.page_size) + return get_total_pages(len(self.display_model_names), self.page_size) + + @property + def display_model_names(self) -> List[str]: + """Models shown in the list, respecting visibility settings. + + Decision matrix: + - show_all=True → All models (hidden ones dimmed in render) + - show_all=False → Non-hidden models only + """ + if self.show_all: + return self.all_models + return [m for m in self.all_models if m not in self._hidden_models] @property def page_start(self) -> int: """Get the starting index for the current page.""" - start, _ = get_page_bounds(self.page, len(self.all_models), self.page_size) + start, _ = get_page_bounds( + self.page, len(self.display_model_names), self.page_size + ) return start @property def page_end(self) -> int: """Get the ending index (exclusive) for the current page.""" - _, end = get_page_bounds(self.page, len(self.all_models), self.page_size) + _, end = get_page_bounds( + self.page, len(self.display_model_names), self.page_size + ) return end @property def models_on_page(self) -> List[str]: """Get the models visible on the current page.""" - return self.all_models[self.page_start : self.page_end] + return self.display_model_names[self.page_start : self.page_end] def _ensure_selection_visible(self): """Ensure the current selection is on the visible page.""" self.page = ensure_visible_page( self.model_index, self.page, - len(self.all_models), + len(self.display_model_names), self.page_size, ) @@ -353,8 +380,14 @@ def _render_main_list(self) -> List: ) lines.append(("", "\n\n")) - if not self.all_models: - lines.append(("fg:ansiyellow", " No models available.")) + if not self.display_model_names: + if self._hidden_models: + lines.append(("fg:ansiyellow", " All models are hidden.")) + lines.append(("fg:ansibrightblack", " Press ")) + lines.append(("", "Tab")) + lines.append(("fg:ansibrightblack", " to show all models.\n")) + else: + lines.append(("fg:ansiyellow", " No models available.")) lines.append(("", "\n\n")) self._add_model_nav_hints(lines) return lines @@ -364,21 +397,36 @@ def _render_main_list(self) -> List: absolute_index = self.page_start + i is_selected = absolute_index == self.model_index is_current = model_name == self.current_model_name + is_hidden = model_name in self._hidden_models prefix = " › " if is_selected else " " - style = "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" + if is_hidden: + style = ( + "fg:ansibrightblack dim" + if is_selected + else "fg:ansibrightblack dim" + ) + arrow_style = ( + "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" + ) + else: + style = "fg:ansiwhite bold" if is_selected else "fg:ansibrightblack" + arrow_style = style # Check if model has any custom settings model_settings = get_all_model_settings(model_name) has_settings = len(model_settings) > 0 - lines.append((style, f"{prefix}{model_name}")) + lines.append((arrow_style, prefix)) + lines.append((style, model_name)) # Show indicators if is_current: lines.append(("fg:ansigreen", " (active)")) if has_settings: lines.append(("fg:ansicyan", " ⚙")) + if is_hidden: + lines.append(("fg:ansibrightblack dim", " [hidden]")) lines.append(("", "\n")) @@ -433,6 +481,10 @@ def _add_model_nav_hints(self, lines: List): if self.total_pages > 1: lines.append(("fg:ansibrightblack", " PgUp/PgDn ")) lines.append(("", "Change page\n")) + lines.append(("fg:ansibrightblack", " Space ")) + lines.append(("", "Toggle visibility\n")) + lines.append(("fg:ansibrightblack", " Tab ")) + lines.append(("", "Show/hide all\n")) lines.append(("fg:ansigreen", " Enter ")) lines.append(("", "Configure model\n")) lines.append(("fg:ansiyellow", " Esc ")) @@ -624,9 +676,9 @@ def _render_details_panel(self) -> List: def _enter_settings_view(self): """Enter settings view for the selected model.""" - if not self.all_models: + if not self.display_model_names: return - model_name = self.all_models[self.model_index] + model_name = self.display_model_names[self.model_index] self._load_model_settings(model_name) self.view_mode = "settings" @@ -825,7 +877,7 @@ def _(event): @kb.add("c-n") # Ctrl+N = next (Emacs-style) def _(event): if self.view_mode == "models": - if self.model_index < len(self.all_models) - 1: + if self.model_index < len(self.display_model_names) - 1: self.model_index += 1 self._ensure_selection_visible() self.update_display() @@ -906,6 +958,36 @@ def _(event): self._cancel_edit() event.app.exit() + @kb.add("space") + def _(event): + """Toggle visibility of the currently highlighted model.""" + if self.view_mode != "models": + return + from code_puppy.item_visibility import ( + load_hidden_models, + toggle_model_hidden, + ) + + if not self.display_model_names: + return + model = self.display_model_names[self.model_index] + # Silently ignore if trying to hide the currently active model + if model == self.current_model_name: + return + toggle_model_hidden(model) + self._hidden_models = load_hidden_models() + self._ensure_selection_visible() + self.update_display() + + @kb.add("tab") + def _(event): + """Toggle show-all mode (session-level, not persisted).""" + if self.view_mode != "models": + return + self.show_all = not self.show_all + self._ensure_selection_visible() + self.update_display() + layout = Layout(root_container) app = Application( layout=layout, diff --git a/code_puppy/hook_engine/README.md b/code_puppy/hook_engine/README.md index 7721ca2e4..0fbd6b7dc 100644 --- a/code_puppy/hook_engine/README.md +++ b/code_puppy/hook_engine/README.md @@ -71,10 +71,22 @@ Also available as environment variables: `CLAUDE_TOOL_INPUT`, `CLAUDE_TOOL_NAME` ## Exit Codes -- `0` - Allow (stdout shown in transcript) +- `0` - Allow (stdout injected into model context for ``SessionStart``, + ``UserPromptSubmit``, and ``PreToolUse`` events; observation-only for others) - `1` - Block (stderr shown as block reason) - `2` - Error feedback to Claude without blocking +### Where stdout goes + +| Event | Destination of ``stdout`` on exit 0 | +|------------------|----------------------------------------------------| +| ``SessionStart`` | Appended to the agent's system prompt (once) | +| ``UserPromptSubmit`` | Prepended to the user prompt before the model sees it | +| ``PreToolUse`` | Prepended to the tool's observation result | +| ``PostToolUse`` | Observation only — not injected | +| ``Stop`` / ``SubagentStop`` | Observation only | +| ``SessionEnd`` / ``PreCompact`` / ``Notification`` | Observation only | + See `docs/HOOKS.md` for the full user-facing guide. ## Tool Name Compatibility diff --git a/code_puppy/item_visibility.py b/code_puppy/item_visibility.py new file mode 100644 index 000000000..e982bf046 --- /dev/null +++ b/code_puppy/item_visibility.py @@ -0,0 +1,391 @@ +"""Reusable visibility store for picker menus. + +Provides atomic persistence, stale entry cleanup, and toggle operations +for hiding items from picker UIs. + +Usage: + # Model visibility (current scope) + from code_puppy.item_visibility import load_hidden_models, toggle_model_hidden + + hidden = load_hidden_models() + toggle_model_hidden("gpt-4") + + # Agent visibility (future work) + from code_puppy.item_visibility import VisibilityStore + agent_store = VisibilityStore("agent") + agent_store.toggle("typescript-reviewer") + + # MCP server visibility (future work) + mcp_store = VisibilityStore("mcp_servers") + mcp_store.toggle("my-custom-server") +""" + +from __future__ import annotations + +import json +import logging +import os +from typing import Iterable, Optional + +from code_puppy.config import DATA_DIR + +logger = logging.getLogger(__name__) + + +class VisibilityStore: + """Reusable visibility store for picker menus. + + Provides atomic persistence, stale entry cleanup, and toggle operations + for hiding items from picker UIs. + + Each store is independent — no shared mutable state between different + store instances. + + Usage: + store = VisibilityStore("model") # → DATA_DIR/model_visibility.json + store.toggle("gpt-4") + store.prune_stale(["gpt-4", "claude-3"]) # cleanup removed items + """ + + def __init__(self, name: str): + """Initialize a visibility store. + + Args: + name: Identifier for this visibility store. + Results in DATA_DIR/{name}_visibility.json + e.g., "model" → "model_visibility.json" + """ + self._name = name + self._file_path = os.path.join(DATA_DIR, f"{name}_visibility.json") + # JSON key: "hidden_models", "hidden_agents", etc. + self._hidden_key = f"hidden_{name}s" + + @property + def name(self) -> str: + """Return the store name.""" + return self._name + + @property + def file_path(self) -> str: + """Return the path to the visibility config file.""" + return self._file_path + + def load_hidden(self) -> set[str]: + """Load the set of hidden items from disk. + + Returns: + Set of hidden item names. Empty set if file missing, corrupt, + or permission denied. + """ + file_path = self._file_path + + # File doesn't exist — nothing hidden + if not os.path.exists(file_path): + return set() + + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + except (json.JSONDecodeError, OSError) as exc: + logger.warning( + f"Failed to read visibility file {file_path}: {exc}", + ) + return set() + + # Validate structure + if not isinstance(data, dict): + logger.warning( + f"Visibility file {file_path} has invalid structure (not a dict)", + ) + return set() + + hidden_list = data.get(self._hidden_key) + if not isinstance(hidden_list, list): + # Key missing or wrong type — nothing hidden + return set() + + return set(hidden_list) + + def save_hidden(self, hidden: set[str]) -> None: + """Save the set of hidden items to disk atomically. + + Creates DATA_DIR if it doesn't exist. Uses atomic write + (temp file + rename) to prevent corruption. + + Args: + hidden: Set of hidden item names to persist. + """ + file_path = self._file_path + + # Ensure directory exists + try: + os.makedirs(os.path.dirname(file_path), exist_ok=True) + except OSError as exc: + logger.error(f"Failed to create data directory: {exc}") + return + + # Build data structure + data = {self._hidden_key: sorted(hidden)} + + # Atomic write: write to temp file, then rename + tmp_path = file_path + ".tmp" + try: + content = json.dumps(data, indent=2) + "\n" + with open(tmp_path, "w", encoding="utf-8") as f: + f.write(content) + os.replace(tmp_path, file_path) + except OSError as exc: + logger.error(f"Failed to write visibility file {file_path}: {exc}") + # Clean up temp file if it exists + if os.path.exists(tmp_path): + try: + os.remove(tmp_path) + except OSError: + pass + + def is_hidden(self, item: str) -> bool: + """Check if an item is hidden. + + Args: + item: Item name to check. + + Returns: + True if item is hidden, False otherwise. + """ + return item in self.load_hidden() + + def toggle(self, item: str) -> bool: + """Toggle the visibility of an item. + + If the item is visible, it becomes hidden. + If the item is hidden, it becomes visible. + + Args: + item: Item name to toggle. + + Returns: + True if the item is now hidden, False if now visible. + """ + hidden = self.load_hidden() + + if item in hidden: + hidden.discard(item) + now_hidden = False + else: + hidden.add(item) + now_hidden = True + + self.save_hidden(hidden) + return now_hidden + + def add_hidden(self, item: str) -> None: + """Add an item to the hidden set. + + Args: + item: Item name to hide. + """ + hidden = self.load_hidden() + hidden.add(item) + self.save_hidden(hidden) + + def remove_hidden(self, item: str) -> None: + """Remove an item from the hidden set. + + Args: + item: Item name to unhide. + """ + hidden = self.load_hidden() + hidden.discard(item) + self.save_hidden(hidden) + + def prune_stale(self, valid_items: Iterable[str]) -> set[str]: + """Remove hidden items that are no longer valid. + + Compares the hidden set against provided valid items and removes + any hidden entries that don't exist in valid_items. + + Writes back to disk if changes were made. + + Args: + valid_items: Iterable of currently valid item names. + + Returns: + Set of removed (stale) item names. Empty if nothing changed. + """ + hidden = self.load_hidden() + valid_set = set(valid_items) + stale = hidden - valid_set + + if stale: + new_hidden = hidden - stale + self.save_hidden(new_hidden) + + return stale + + def clear(self) -> None: + """Remove the visibility config file. + + Idempotent — safe to call even if file doesn't exist. + """ + if os.path.exists(self._file_path): + try: + os.remove(self._file_path) + except OSError as exc: + logger.warning(f"Failed to remove visibility file: {exc}") + + +# ----------------------------------------------------------------------------- +# Module-level convenience functions for model visibility +# These provide a stable API for the current scope (models only) +# ----------------------------------------------------------------------------- + +# Lazy-initialized module-level store instance +_model_store: Optional[VisibilityStore] = None + + +def _get_model_store() -> VisibilityStore: + """Get the module-level model visibility store (lazy init).""" + global _model_store + if _model_store is None: + _model_store = VisibilityStore("model") + return _model_store + + +def load_hidden_models() -> set[str]: + """Load the set of hidden model names. + + Returns: + Set of hidden model names. Empty set if no visibility config exists. + """ + return _get_model_store().load_hidden() + + +def save_hidden_models(hidden: set[str]) -> None: + """Save the set of hidden model names. + + Args: + hidden: Set of hidden model names to persist. + """ + _get_model_store().save_hidden(hidden) + + +def toggle_model_hidden(model: str) -> bool: + """Toggle the visibility of a model. + + Args: + model: Model name to toggle. + + Returns: + True if the model is now hidden, False if now visible. + """ + return _get_model_store().toggle(model) + + +def is_model_hidden(model: str) -> bool: + """Check if a model is hidden. + + Args: + model: Model name to check. + + Returns: + True if model is hidden, False otherwise. + """ + return _get_model_store().is_hidden(model) + + +def prune_stale_entries(all_model_names: Iterable[str]) -> set[str]: + """Remove hidden model entries that are no longer valid. + + Args: + all_model_names: Iterable of currently valid model names. + + Returns: + Set of removed (stale) model names. Empty if nothing changed. + """ + return _get_model_store().prune_stale(all_model_names) + + +def clear_visibility_config() -> None: + """Remove the model visibility config file. + + Idempotent — safe to call even if file doesn't exist. + """ + _get_model_store().clear() + + +# ----------------------------------------------------------------------------- +# Module-level convenience functions for agent visibility +# ----------------------------------------------------------------------------- + +# Lazy-initialized module-level store instance for agents +_agent_store: Optional[VisibilityStore] = None + + +def _get_agent_store() -> VisibilityStore: + """Get the module-level agent visibility store (lazy init).""" + global _agent_store + if _agent_store is None: + _agent_store = VisibilityStore("agent") + return _agent_store + + +def load_hidden_agents() -> set[str]: + """Load the set of hidden agent names. + + Returns: + Set of hidden agent names. Empty set if no visibility config exists. + """ + return _get_agent_store().load_hidden() + + +def save_hidden_agents(hidden: set[str]) -> None: + """Save the set of hidden agent names. + + Args: + hidden: Set of hidden agent names to persist. + """ + _get_agent_store().save_hidden(hidden) + + +def toggle_agent_hidden(agent: str) -> bool: + """Toggle the visibility of an agent. + + Args: + agent: Agent name to toggle. + + Returns: + True if the agent is now hidden, False if now visible. + """ + return _get_agent_store().toggle(agent) + + +def is_agent_hidden(agent: str) -> bool: + """Check if an agent is hidden. + + Args: + agent: Agent name to check. + + Returns: + True if agent is hidden, False otherwise. + """ + return _get_agent_store().is_hidden(agent) + + +def prune_stale_agent_entries(all_agent_names: Iterable[str]) -> set[str]: + """Remove hidden agent entries that are no longer valid. + + Args: + all_agent_names: Iterable of currently valid agent names. + + Returns: + Set of removed (stale) agent names. Empty if nothing changed. + """ + return _get_agent_store().prune_stale(all_agent_names) + + +def clear_agent_visibility_config() -> None: + """Remove the agent visibility config file. + + Idempotent — safe to call even if file doesn't exist. + """ + _get_agent_store().clear() diff --git a/code_puppy/plugins/claude_code_hooks/register_callbacks.py b/code_puppy/plugins/claude_code_hooks/register_callbacks.py index 06044fc8f..82c72cf4f 100644 --- a/code_puppy/plugins/claude_code_hooks/register_callbacks.py +++ b/code_puppy/plugins/claude_code_hooks/register_callbacks.py @@ -2,10 +2,23 @@ Register callbacks for Claude Code hooks plugin. Integrates the hook engine with code_puppy's callback system. + +Responsibilities: + * Route code_puppy callback phases to the hook engine's event types. + * Propagate hook ``stdout`` (exit code 0) back into the agent context for + ``SessionStart``, ``UserPromptSubmit``, and ``PreToolUse`` events. + * Honour block signals (exit code 1) from PreToolUse hooks. + * Wire the Claude-Code-compatible event surface (``SessionEnd``, + ``PreCompact``, ``UserPromptSubmit``, ``Notification``) that previously + only existed in the schema. + +Hooks that are observation-only (``PostToolUse``, ``Stop``, ``SubagentStop``, +``SessionEnd``, ``PreCompact``, ``Notification``) intentionally don't inject +stdout anywhere — there's no model context to put it in at that point. """ import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from code_puppy.callbacks import register_callback from code_puppy.hook_engine import EventData, HookEngine @@ -31,6 +44,12 @@ _hook_engine: Optional[HookEngine] = None +# Cache of stdout captured from ``SessionStart`` hooks. Drained into the system +# prompt via ``load_prompt`` on the first agent run, then cleared so repeat +# reloads don't double-inject. Lives at module scope because SessionStart fires +# once per process but system-prompt assembly happens per agent build. +_session_start_context: List[str] = [] + def _initialize_engine() -> Optional[HookEngine]: config = load_hooks_config() @@ -55,12 +74,21 @@ def _initialize_engine() -> Optional[HookEngine]: _hook_engine = _initialize_engine() +def _collect_stdout(result) -> str: + """Join successful-hook stdout into a single string (empty if none).""" + parts: List[str] = [] + for execution in result.results: + if execution.success and execution.stdout and execution.stdout.strip(): + parts.append(execution.stdout.strip()) + return "\n\n".join(parts) + + async def on_pre_tool_call_hook( tool_name: str, tool_args: Dict[str, Any], context: Any = None, ) -> Optional[Dict[str, Any]]: - """Pre-tool callback — executes hooks before tool runs. Can block.""" + """Pre-tool callback — executes hooks before tool runs. Can block OR inject.""" if not _hook_engine: return None @@ -83,6 +111,13 @@ async def on_pre_tool_call_hook( "reason": result.blocking_reason, "error_message": result.blocking_reason, } + + stdout = _collect_stdout(result) + if stdout: + logger.debug( + f"PreToolUse hook produced {len(stdout)} chars of context for '{tool_name}'" + ) + return {"inject_context": stdout} return None except Exception as e: logger.error(f"Error in pre-tool hook: {e}", exc_info=True) @@ -113,12 +148,13 @@ async def on_post_tool_call_hook( logger.error(f"Error in post-tool hook: {e}", exc_info=True) -register_callback("pre_tool_call", on_pre_tool_call_hook) -register_callback("post_tool_call", on_post_tool_call_hook) - - async def on_startup_hook() -> None: - """Startup callback — fires SessionStart hooks when code_puppy boots.""" + """Startup callback — fires SessionStart hooks and caches any stdout. + + Captured stdout is drained into the system prompt via :func:`load_prompt` + on the next agent build, so a SessionStart hook can load a project + constitution or style guide by simply printing it. + """ if not _hook_engine: return @@ -129,11 +165,60 @@ async def on_startup_hook() -> None: ) try: - await _hook_engine.process_event("SessionStart", event_data) + result = await _hook_engine.process_event("SessionStart", event_data) + stdout = _collect_stdout(result) + if stdout: + _session_start_context.append(stdout) + logger.debug( + f"SessionStart hooks contributed {len(stdout)} chars to system prompt" + ) except Exception as e: logger.error(f"Error in SessionStart hook: {e}", exc_info=True) +def load_prompt_additions() -> Optional[str]: + """Return cached SessionStart stdout for injection into the system prompt.""" + if not _session_start_context: + return None + return "\n\n".join(_session_start_context) + + +async def on_user_prompt_submit_hook( + prompt: str, **kwargs: Any +) -> Optional[Dict[str, Any]]: + """UserPromptSubmit callback — may mutate the prompt via stdout or veto it.""" + if not _hook_engine: + return None + + event_data = EventData( + event_type="UserPromptSubmit", + tool_name="user_prompt", + tool_args={"prompt": prompt}, + context={k: v for k, v in kwargs.items() if v is not None}, + ) + + try: + result = await _hook_engine.process_event("UserPromptSubmit", event_data) + + if result.blocked: + logger.debug(f"UserPromptSubmit blocked: {result.blocking_reason}") + return { + "blocked": True, + "reason": result.blocking_reason, + } + + stdout = _collect_stdout(result) + if stdout: + logger.debug( + f"UserPromptSubmit hooks contributed {len(stdout)} chars of context" + ) + return {"inject_context": stdout} + return None + except Exception as e: + logger.error(f"Error in UserPromptSubmit hook: {e}", exc_info=True) + return None + + async def on_agent_run_end_hook( agent_name: str, model_name: str, @@ -170,7 +255,93 @@ async def on_agent_run_end_hook( logger.error(f"Error in {event_type} hook: {e}", exc_info=True) +async def on_session_end_hook() -> None: + """shutdown/session_end callback — fires SessionEnd hooks (observation).""" + if not _hook_engine: + return + + event_data = EventData( + event_type="SessionEnd", + tool_name="session", + tool_args={}, + ) + + try: + await _hook_engine.process_event("SessionEnd", event_data) + except Exception as e: + logger.error(f"Error in SessionEnd hook: {e}", exc_info=True) + + +def on_pre_compact_hook( + agent_name: str, + session_id: Optional[str], + message_history: List[Any], + incoming_messages: List[Any], +) -> None: + """message_history_processor_start callback — fires PreCompact hooks. + + This callback is synchronous to match the ``message_history_processor_start`` + contract, so we kick the async hook dispatch onto the running loop as a + fire-and-forget task. If there's no loop (shouldn't happen in practice), + we just log and skip — never blocking compaction. + """ + if not _hook_engine: + return + + event_data = EventData( + event_type="PreCompact", + tool_name="compaction", + tool_args={ + "agent_name": agent_name, + "session_id": session_id, + "history_len": len(message_history), + "incoming_len": len(incoming_messages), + }, + ) + + import asyncio + + async def _fire() -> None: + try: + await _hook_engine.process_event("PreCompact", event_data) + except Exception as e: + logger.error(f"Error in PreCompact hook: {e}", exc_info=True) + + try: + loop = asyncio.get_running_loop() + loop.create_task(_fire()) + except RuntimeError: + logger.debug("PreCompact fired outside an event loop; skipping") + + +async def on_notification_hook(notification_type: str, payload: Any = None) -> None: + """notification callback — fires Notification hooks (observation only).""" + if not _hook_engine: + return + + event_data = EventData( + event_type="Notification", + tool_name="notification", + tool_args={"type": notification_type, "payload": payload}, + ) + + try: + await _hook_engine.process_event("Notification", event_data) + except Exception as e: + logger.error(f"Error in Notification hook: {e}", exc_info=True) + + +# --- Registration ------------------------------------------------------------ + +register_callback("pre_tool_call", on_pre_tool_call_hook) +register_callback("post_tool_call", on_post_tool_call_hook) register_callback("startup", on_startup_hook) +register_callback("load_prompt", load_prompt_additions) +register_callback("user_prompt_submit", on_user_prompt_submit_hook) register_callback("agent_run_end", on_agent_run_end_hook) +register_callback("shutdown", on_session_end_hook) +register_callback("session_end", on_session_end_hook) +register_callback("message_history_processor_start", on_pre_compact_hook) +register_callback("notification", on_notification_hook) logger.info("Claude Code hooks plugin registered") diff --git a/code_puppy/pydantic_patches.py b/code_puppy/pydantic_patches.py index 84692d82b..d6b3424bc 100644 --- a/code_puppy/pydantic_patches.py +++ b/code_puppy/pydantic_patches.py @@ -269,10 +269,12 @@ async def _patched_call_tool( except Exception: tool_args = {"raw": call.args} - # --- pre_tool_call (with blocking support) --- - # Returns a string tool-result on block so pydantic-ai sees a clean - # "BLOCKED: ..." message and the agent can react gracefully, without - # triggering UnexpectedModelBehavior crashes. + # --- pre_tool_call (with blocking + context-injection support) --- + # On block: returns a string tool-result so pydantic-ai sees a clean + # "BLOCKED: ..." message and the agent can react gracefully. + # On ``{"inject_context": "..."}``: capture the text and prepend it + # to the real tool result so the model sees the hook's nudge. + injected_context_parts: list[str] = [] try: from code_puppy import callbacks from code_puppy.messaging import emit_warning @@ -282,11 +284,9 @@ async def _patched_call_tool( ) for callback_result in callback_results: - if ( - callback_result - and isinstance(callback_result, dict) - and callback_result.get("blocked") - ): + if not callback_result or not isinstance(callback_result, dict): + continue + if callback_result.get("blocked"): raw_reason = ( callback_result.get("error_message") or callback_result.get("reason") @@ -303,6 +303,9 @@ async def _patched_call_tool( block_msg = f"🚫 Hook blocked this tool call: {clean_reason}" emit_warning(block_msg) return f"ERROR: {block_msg}\n\nThe hook policy prevented this tool from running. Please inform the user and do not retry this specific command." + injected = callback_result.get("inject_context") + if injected: + injected_context_parts.append(str(injected)) except Exception: pass # other errors don't block tool execution @@ -318,6 +321,12 @@ async def _patched_call_tool( approved=approved, metadata=metadata, ) + # Prepend any pre-hook context so the model sees the nudge + # alongside the real tool output. Only applied to stringy + # results to avoid corrupting structured returns. + if injected_context_parts and isinstance(result, str): + header = "\n\n".join(injected_context_parts) + result = f"[hook-context]\n{header}\n\n{result}" return result except Exception as exc: error = exc diff --git a/tests/command_line/test_model_visibility_tui.py b/tests/command_line/test_model_visibility_tui.py new file mode 100644 index 000000000..18bdae64c --- /dev/null +++ b/tests/command_line/test_model_visibility_tui.py @@ -0,0 +1,182 @@ +"""Tests for visibility toggle in ModelSelectionMenu. + +These tests verify the TUI integration for hiding/showing models. +""" + +import pytest + +from code_puppy.command_line.model_picker_completion import ModelSelectionMenu +from code_puppy.item_visibility import ( + clear_visibility_config, + load_hidden_models, + save_hidden_models, + toggle_model_hidden, +) + + +class TestModelPickerVisibility: + """Tests for ModelSelectionMenu visibility integration.""" + + @pytest.fixture(autouse=True) + def preserve_user_config(self): + """Preserve user's visibility config before/after tests.""" + # Save user's config before tests + original_hidden = load_hidden_models() + + yield + + # Restore user's config after tests + clear_visibility_config() # Clear test artifacts + if original_hidden: + save_hidden_models(original_hidden) # Restore user's settings + + @pytest.fixture + def test_models(self): + """Standard test model list.""" + return ["gpt-4", "gpt-3.5-turbo", "claude-3", "claude-2", "gemini-pro"] + + def test_picker_starts_showing_all_when_no_hidden_config(self, test_models): + """Default: all models show.""" + menu = ModelSelectionMenu(model_names=test_models) + + assert menu.display_model_names == test_models + assert menu._hidden_models == set() + assert menu.show_all is False + + def test_picker_hides_configured_models(self, test_models): + """Hidden models excluded by default.""" + # Hide some models + toggle_model_hidden("gpt-3.5-turbo") + toggle_model_hidden("claude-2") + + # Create new menu (loads from config) + menu = ModelSelectionMenu(model_names=test_models) + + expected = ["gpt-4", "claude-3", "gemini-pro"] + assert menu.display_model_names == expected + assert menu._hidden_models == {"gpt-3.5-turbo", "claude-2"} + + def test_show_all_mode_reveals_hidden(self, test_models): + """A key reveals hidden with [hidden] label.""" + toggle_model_hidden("gpt-3.5-turbo") + + menu = ModelSelectionMenu(model_names=test_models) + menu.show_all = True + + # All models visible when show_all=True + assert menu.display_model_names == test_models + + def test_filter_shows_hidden_matches_dimmed(self, test_models): + """Filter text matches hidden model → appears in list.""" + toggle_model_hidden("gpt-3.5-turbo") + + menu = ModelSelectionMenu(model_names=test_models) + menu.filter_text = "gpt" # Type filter + + # With filter, hidden models still appear + assert menu.display_model_names == ["gpt-4", "gpt-3.5-turbo"] + + def test_all_hidden_shows_help_message(self, test_models): + """No visible + some hidden → helper text shown.""" + # Hide ALL models + for model in test_models: + toggle_model_hidden(model) + + menu = ModelSelectionMenu(model_names=test_models) + + # Display list should be empty + assert menu.display_model_names == [] + # But we have hidden models + assert len(menu._hidden_models) > 0 + + def test_selection_stays_valid_after_toggle(self, test_models): + """Index stays in bounds after toggling visibility.""" + toggle_model_hidden("claude-2") + + menu = ModelSelectionMenu(model_names=test_models) + + # Initial selection should be valid + assert 0 <= menu.selected_index < len(menu.display_model_names) + + # After unhiding, selection should still be valid + toggle_model_hidden("claude-2") + menu._hidden_models = load_hidden_models() + + assert 0 <= menu.selected_index < len(menu.display_model_names) + + def test_show_all_is_session_level_not_persisted(self, test_models): + """New picker → show_all=False even if previous session had it True.""" + menu1 = ModelSelectionMenu(model_names=test_models) + menu1.show_all = True + + # Create new picker + menu2 = ModelSelectionMenu(model_names=test_models) + + assert menu2.show_all is False + + def test_current_model_in_display_when_visible(self, test_models): + """If current model is not hidden, it appears in display.""" + menu = ModelSelectionMenu(model_names=test_models) + # Current model is from config (may not be in test_models) + # But if it's in the list, it should be visible + if menu.current_model in test_models: + assert menu.current_model in menu.display_model_names + + def test_current_model_selection_when_hidden(self, test_models): + """If current model is hidden, another model selected.""" + # Hide the current model (first one) + toggle_model_hidden("gpt-4") + + menu = ModelSelectionMenu(model_names=test_models) + + # Should still have valid selection + assert 0 <= menu.selected_index < len(menu.display_model_names) + # Selection should be visible model + assert menu.display_model_names[menu.selected_index] in menu.display_model_names + + +class TestPickerRendering: + """Tests for render output with visibility indicators.""" + + @pytest.fixture(autouse=True) + def preserve_user_config(self): + """Preserve user's visibility config before/after tests.""" + # Save user's config before tests + original_hidden = load_hidden_models() + + yield + + # Restore user's config after tests + clear_visibility_config() # Clear test artifacts + if original_hidden: + save_hidden_models(original_hidden) # Restore user's settings + + def test_hidden_model_gets_dim_style(self, test_models=None): + """Hidden models rendered with dim style.""" + if test_models is None: + test_models = ["gpt-4", "gpt-3.5-turbo", "claude-3"] + + toggle_model_hidden("gpt-3.5-turbo") + menu = ModelSelectionMenu(model_names=test_models) + menu.show_all = True # Show all to see hidden + + lines = menu._render() + + # Should have [hidden] tag somewhere in output + render_text = str(lines) + assert "[hidden]" in render_text.lower() or "hidden" in render_text.lower() + + def test_hidden_model_hidden_in_default_view(self): + """Hidden models not shown by default.""" + test_models = ["gpt-4", "gpt-3.5-turbo", "claude-3"] + toggle_model_hidden("gpt-3.5-turbo") + + menu = ModelSelectionMenu(model_names=test_models) + + # Should only show non-hidden + assert "gpt-3.5-turbo" not in str(menu.display_model_names) + assert "gpt-4" in str(menu.display_model_names) + + +# Make test_models fixture available +TestPickerRendering.test_hidden_model_gets_dim_style diff --git a/tests/plugins/test_claude_code_hooks_issue_298.py b/tests/plugins/test_claude_code_hooks_issue_298.py new file mode 100644 index 000000000..8f37e829e --- /dev/null +++ b/tests/plugins/test_claude_code_hooks_issue_298.py @@ -0,0 +1,292 @@ +""" +Tests for issue #298 fixes: + + 1. Hook stdout is captured and injected into the agent context + (SessionStart -> system prompt, PreToolUse -> tool result, + UserPromptSubmit -> user prompt). + 2. UserPromptSubmit, PreCompact, SessionEnd, and Notification events + are actually wired and fire when configured. + +We test the plugin's callback wiring in isolation (no real engine needed for +most cases; we mock ``_hook_engine`` and its ``process_event``). +""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from code_puppy import callbacks as cb_module + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_result(stdout: str = "", blocked: bool = False, reason: str = ""): + """Build a fake ProcessEventResult that mimics the real one closely enough.""" + execution = SimpleNamespace( + success=True, + stdout=stdout, + stderr="", + exit_code=0, + ) + return SimpleNamespace( + blocked=blocked, + blocking_reason=reason or None, + results=[execution] if stdout else [], + ) + + +# --------------------------------------------------------------------------- +# callbacks.py: new phases registered +# --------------------------------------------------------------------------- + + +class TestNewCallbackPhases: + @pytest.mark.parametrize( + "phase", + ["user_prompt_submit", "pre_compact", "session_end", "notification"], + ) + def test_phase_is_registered(self, phase): + # Should not raise; each phase must be a first-class entry in _callbacks + cb_module.register_callback(phase, lambda *a, **kw: None) + # Cleanup: remove the dummy we just registered + cb_module._callbacks[phase].pop() + + @pytest.mark.asyncio + async def test_on_user_prompt_submit_triggers(self): + seen = {} + + async def handler(prompt, **kwargs): + seen["prompt"] = prompt + seen["kwargs"] = kwargs + return "mutated" + + cb_module.register_callback("user_prompt_submit", handler) + try: + results = await cb_module.on_user_prompt_submit( + "hi", agent_name="a", session_id="s" + ) + assert results == ["mutated"] + assert seen["prompt"] == "hi" + assert seen["kwargs"]["agent_name"] == "a" + finally: + cb_module._callbacks["user_prompt_submit"].remove(handler) + + @pytest.mark.asyncio + async def test_on_session_end_triggers(self): + fired = [] + + async def handler(): + fired.append(True) + + cb_module.register_callback("session_end", handler) + try: + await cb_module.on_session_end() + assert fired == [True] + finally: + cb_module._callbacks["session_end"].remove(handler) + + @pytest.mark.asyncio + async def test_on_pre_compact_triggers(self): + received = {} + + async def handler(agent_name, session_id, history, incoming): + received.update( + agent=agent_name, sid=session_id, hl=len(history), il=len(incoming) + ) + + cb_module.register_callback("pre_compact", handler) + try: + await cb_module.on_pre_compact("agent", "sid", [1, 2], [3]) + assert received == {"agent": "agent", "sid": "sid", "hl": 2, "il": 1} + finally: + cb_module._callbacks["pre_compact"].remove(handler) + + @pytest.mark.asyncio + async def test_on_notification_triggers(self): + received = [] + + async def handler(ntype, payload): + received.append((ntype, payload)) + + cb_module.register_callback("notification", handler) + try: + await cb_module.on_notification("warning", "disk full") + assert received == [("warning", "disk full")] + finally: + cb_module._callbacks["notification"].remove(handler) + + +# --------------------------------------------------------------------------- +# claude_code_hooks plugin: stdout injection paths +# --------------------------------------------------------------------------- + + +class TestPreToolUseStdoutInjection: + @pytest.mark.asyncio + async def test_returns_inject_context_on_stdout(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock( + return_value=_make_result(stdout="remember to add a migration") + ) + + with patch.object(rc, "_hook_engine", mock_engine): + out = await rc.on_pre_tool_call_hook( + "replace_in_file", {"file_path": "x.py"} + ) + + assert out == {"inject_context": "remember to add a migration"} + + @pytest.mark.asyncio + async def test_block_still_returns_blocked_dict(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock( + return_value=_make_result(blocked=True, reason="nope") + ) + + with patch.object(rc, "_hook_engine", mock_engine): + out = await rc.on_pre_tool_call_hook("bash", {"command": "rm -rf /"}) + + assert out["blocked"] is True + assert "nope" in out["reason"] + + @pytest.mark.asyncio + async def test_no_stdout_returns_none(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock(return_value=_make_result()) + + with patch.object(rc, "_hook_engine", mock_engine): + out = await rc.on_pre_tool_call_hook("list_files", {"directory": "."}) + + assert out is None + + +class TestSessionStartStdoutInjection: + @pytest.mark.asyncio + async def test_startup_caches_stdout_and_load_prompt_returns_it(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock( + return_value=_make_result(stdout="Project constitution: be kind.") + ) + + # Reset cache so test is hermetic + rc._session_start_context.clear() + + with patch.object(rc, "_hook_engine", mock_engine): + await rc.on_startup_hook() + + try: + assert rc.load_prompt_additions() == "Project constitution: be kind." + finally: + rc._session_start_context.clear() + + def test_load_prompt_returns_none_when_cache_empty(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + rc._session_start_context.clear() + assert rc.load_prompt_additions() is None + + +class TestUserPromptSubmitHook: + @pytest.mark.asyncio + async def test_returns_inject_context_on_stdout(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock( + return_value=_make_result(stdout="Domain nudge!") + ) + + with patch.object(rc, "_hook_engine", mock_engine): + out = await rc.on_user_prompt_submit_hook("do stuff") + + assert out == {"inject_context": "Domain nudge!"} + + @pytest.mark.asyncio + async def test_blocked_prompt_returns_blocked(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock( + return_value=_make_result(blocked=True, reason="forbidden words") + ) + + with patch.object(rc, "_hook_engine", mock_engine): + out = await rc.on_user_prompt_submit_hook("leak secrets plz") + + assert out["blocked"] is True + + +# --------------------------------------------------------------------------- +# claude_code_hooks plugin: previously-missing event wiring +# --------------------------------------------------------------------------- + + +class TestPreviouslyMissingEvents: + @pytest.mark.asyncio + async def test_session_end_fires(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock(return_value=_make_result()) + + with patch.object(rc, "_hook_engine", mock_engine): + await rc.on_session_end_hook() + + mock_engine.process_event.assert_awaited_once() + event_type_arg = mock_engine.process_event.await_args.args[0] + assert event_type_arg == "SessionEnd" + + @pytest.mark.asyncio + async def test_pre_compact_fires_on_message_history_processor_start(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock(return_value=_make_result()) + + with patch.object(rc, "_hook_engine", mock_engine): + rc.on_pre_compact_hook("agent", "sid", [1], [2]) + # The hook fires an async task; yield once so it runs. + import asyncio + + await asyncio.sleep(0) + + mock_engine.process_event.assert_awaited_once() + assert mock_engine.process_event.await_args.args[0] == "PreCompact" + + @pytest.mark.asyncio + async def test_notification_fires(self): + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + mock_engine = MagicMock() + mock_engine.process_event = AsyncMock(return_value=_make_result()) + + with patch.object(rc, "_hook_engine", mock_engine): + await rc.on_notification_hook("warning", "uh oh") + + mock_engine.process_event.assert_awaited_once() + assert mock_engine.process_event.await_args.args[0] == "Notification" + + def test_all_callbacks_registered_at_import(self): + """Sanity check: the plugin wires every new event on import.""" + from code_puppy.plugins.claude_code_hooks import register_callbacks as rc + + # The functions exist (module-level) and are referenced by the right names + for fn in ( + "on_session_end_hook", + "on_pre_compact_hook", + "on_notification_hook", + "on_user_prompt_submit_hook", + "load_prompt_additions", + ): + assert callable(getattr(rc, fn)) diff --git a/tests/test_item_visibility.py b/tests/test_item_visibility.py new file mode 100644 index 000000000..b006e9262 --- /dev/null +++ b/tests/test_item_visibility.py @@ -0,0 +1,259 @@ +"""Tests for item_visibility module. + +Tests the generic VisibilityStore class and model-specific convenience functions. +""" + +import json +import os +from unittest.mock import patch + +import pytest + +from code_puppy.config import DATA_DIR +from code_puppy.item_visibility import ( + VisibilityStore, + clear_visibility_config, + is_model_hidden, + load_hidden_models, + prune_stale_entries, + save_hidden_models, + toggle_model_hidden, +) + +MODEL_VISIBILITY_CONFIG = os.path.join(DATA_DIR, "model_visibility.json") + + +class TestVisibilityStore: + """Tests for the generic VisibilityStore class.""" + + @pytest.fixture(autouse=True) + def cleanup(self, tmp_path): + """Clean up test files after each test.""" + yield + # Clean up any test visibility files + for f in tmp_path.glob("test_visibility*.json"): + f.unlink() + + @pytest.fixture + def store(self, tmp_path): + """Create a VisibilityStore with a temporary file path.""" + return VisibilityStore("test_visibility") + + def test_file_path_follows_naming_convention(self, store, tmp_path): + """Store name 'test_visibility' creates file 'test_visibility_visibility.json'.""" + assert store.name == "test_visibility" + # File path uses DATA_DIR, not tmp_path + assert "test_visibility_visibility.json" in store.file_path + + def test_load_hidden_returns_empty_when_missing(self, store): + """No file → empty set.""" + assert store.load_hidden() == set() + + def test_load_hidden_returns_empty_when_corrupt(self, store): + """Bad JSON → empty set + warning log.""" + with open(store.file_path, "w") as f: + f.write("not valid json {") + + with patch("code_puppy.item_visibility.logger") as mock_logger: + result = store.load_hidden() + assert result == set() + mock_logger.warning.assert_called() + + def test_load_hidden_returns_empty_when_key_missing(self, store): + """Valid JSON but wrong key → empty set.""" + with open(store.file_path, "w") as f: + json.dump({"wrong_key": ["item1"]}, f) + + result = store.load_hidden() + assert result == set() + + def test_load_hidden_returns_empty_when_wrong_type(self, store): + """Valid JSON but wrong value type → empty set.""" + with open(store.file_path, "w") as f: + json.dump({"hidden_test_visibilitys": "not a list"}, f) + + result = store.load_hidden() + assert result == set() + + def test_save_and_load_hidden_round_trip(self, store): + """Write set, read back, matches.""" + test_set = {"model-a", "model-b", "model-c"} + store.save_hidden(test_set) + + result = store.load_hidden() + assert result == test_set + + def test_save_creates_data_dir(self, tmp_path): + """DATA_DIR doesn't exist → creates it.""" + # Use a subdir that doesn't exist + test_store = VisibilityStore("test_visibility") + # Override file path to a non-existent directory + test_store._file_path = os.path.join( + tmp_path, "nonexistent", "deep", "test_visibility_visibility.json" + ) + + test_set = {"item1", "item2"} + test_store.save_hidden(test_set) + + assert os.path.exists(test_store.file_path) + assert test_store.load_hidden() == test_set + + def test_toggle_first_call_hides(self, store): + """Toggle visible → hidden, returns True.""" + result = store.toggle("model-x") + assert result is True + assert "model-x" in store.load_hidden() + + def test_toggle_second_call_restores(self, store): + """Toggle hidden → visible, returns False.""" + store.save_hidden({"model-y"}) + result = store.toggle("model-y") + assert result is False + assert "model-y" not in store.load_hidden() + + def test_is_hidden_true_and_false(self, store): + """Both hidden states verified.""" + store.save_hidden({"hidden-model"}) + assert store.is_hidden("hidden-model") is True + assert store.is_hidden("visible-model") is False + + def test_prune_stale_removes_gone_items(self, store): + """Item removed from config → pruned.""" + store.save_hidden({"stale-model", "valid-model"}) + + # Only 'valid-model' exists in current config + pruned = store.prune_stale(["valid-model", "another-valid"]) + + assert pruned == {"stale-model"} + assert store.load_hidden() == {"valid-model"} + + def test_prune_stale_noop_when_all_valid(self, store): + """No stale → returns empty set, no write.""" + store.save_hidden({"model-a", "model-b"}) + + # Track if save was called + with patch.object(store, "save_hidden") as mock_save: + pruned = store.prune_stale(["model-a", "model-b", "model-c"]) + assert pruned == set() + mock_save.assert_not_called() + + def test_clear_removes_file(self, store): + """File deleted, idempotent.""" + store.save_hidden({"model-x"}) + assert os.path.exists(store.file_path) + + store.clear() + + assert not os.path.exists(store.file_path) + # Calling again should not raise + store.clear() + + def test_save_empty_set_writes_valid_json(self, store): + """Saving empty set produces valid JSON.""" + store.save_hidden(set()) + + with open(store.file_path) as f: + data = json.load(f) + + assert data == {"hidden_test_visibilitys": []} + + +class TestModelVisibilityAliases: + """Tests for model-specific convenience functions.""" + + @pytest.fixture(autouse=True) + def clean_slate_restore(self): + """Clear before tests (clean slate), restore user's config after.""" + original_hidden = load_hidden_models() + clear_visibility_config() # Start with clean slate + + yield + + # Restore user's config after tests + if original_hidden: + save_hidden_models(original_hidden) + else: + clear_visibility_config() + + def test_load_hidden_models_returns_empty_when_missing(self): + """No file → empty set.""" + assert load_hidden_models() == set() + + def test_toggle_model_hidden_hides(self): + """toggle_model_hidden adds to hidden set.""" + result = toggle_model_hidden("test-model-xyz") + assert result is True + assert "test-model-xyz" in load_hidden_models() + + def test_toggle_model_hidden_restores(self): + """Second toggle removes from hidden set.""" + toggle_model_hidden("test-model-abc") + result = toggle_model_hidden("test-model-abc") + assert result is False + assert "test-model-abc" not in load_hidden_models() + + def test_is_model_hidden(self): + """is_model_hidden checks hidden state.""" + assert is_model_hidden("nonexistent") is False + toggle_model_hidden("hidden-test") + assert is_model_hidden("hidden-test") is True + + def test_prune_stale_entries(self): + """prune_stale_entries removes stale entries.""" + toggle_model_hidden("stale-entry") + toggle_model_hidden("keep-entry") + + pruned = prune_stale_entries(["keep-entry", "another-valid"]) + + assert pruned == {"stale-entry"} + assert load_hidden_models() == {"keep-entry"} + + def test_save_hidden_models(self): + """save_hidden_models persists a set.""" + test_set = {"model-a", "model-b"} + save_hidden_models(test_set) + + assert load_hidden_models() == test_set + + def test_clear_visibility_config(self): + """clear_visibility_config removes file.""" + toggle_model_hidden("test-model") + assert os.path.exists(os.path.join(DATA_DIR, "model_visibility.json")) + + clear_visibility_config() + + assert not os.path.exists(os.path.join(DATA_DIR, "model_visibility.json")) + + +class TestPersistence: + """Tests for cross-process persistence.""" + + @pytest.fixture(autouse=True) + def clean_slate_restore(self): + """Clear before tests (clean slate), restore user's config after.""" + original_hidden = load_hidden_models() + clear_visibility_config() # Start with clean slate + + yield + + # Restore user's config after tests + if original_hidden: + save_hidden_models(original_hidden) + else: + clear_visibility_config() + + def test_visibility_persists_across_process_restart(self): + """Write hidden set → new process reads same state.""" + # Simulate first "process" writing hidden models + hidden = {"persist-test-1", "persist-test-2"} + save_hidden_models(hidden) + + # Simulate second "process" reading + # (We can't actually spawn a new process, but we can verify the file) + config_path = os.path.join(DATA_DIR, "model_visibility.json") + assert os.path.exists(config_path) + + with open(config_path) as f: + data = json.load(f) + + assert set(data["hidden_models"]) == hidden