diff --git a/bin/ask b/bin/ask index f3167bff..1557ce58 100755 --- a/bin/ask +++ b/bin/ask @@ -96,6 +96,17 @@ CALLER_ENV_HINTS = { VALID_CALLERS = set(CALLER_SESSION_FILES.keys()) | {"email", "manual"} +def _caller_pane_info() -> tuple[str, str]: + """Return (pane_id, terminal_type) from the current environment.""" + wez = (os.environ.get("WEZTERM_PANE") or "").strip() + if wez: + return wez, "wezterm" + tmux = (os.environ.get("TMUX_PANE") or "").strip() + if tmux: + return tmux, "tmux" + return "", "" + + def _env_int(name: str, default: int) -> int: raw = (os.environ.get(name) or "").strip() if not raw: @@ -335,6 +346,8 @@ def _send_via_unified_daemon( print("[ERROR] Invalid askd state", file=sys.stderr) return EXIT_ERROR + caller_pane_id, caller_terminal = _caller_pane_info() + req = { "type": "ask.request", "v": 1, @@ -346,6 +359,8 @@ def _send_via_unified_daemon( "message": message, "no_wrap": no_wrap, "caller": caller, + "caller_pane_id": caller_pane_id, + "caller_terminal": caller_terminal, } # Pass email-related env vars for email caller @@ -670,6 +685,13 @@ def main(argv: list[str]) -> int: if val: email_env_lines += f'$env:{key} = "{val}"\n' + win_caller_pane_id, win_caller_terminal = _caller_pane_info() + win_pane_env_lines = "" + if win_caller_pane_id: + win_pane_env_lines += f'$env:CCB_CALLER_PANE_ID = "{win_caller_pane_id}"\n' + if win_caller_terminal: + win_pane_env_lines += f'$env:CCB_CALLER_TERMINAL = "{win_caller_terminal}"\n' + script_content = f'''$ErrorActionPreference = "SilentlyContinue" $OutputEncoding = [System.Text.Encoding]::UTF8 [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 @@ -678,7 +700,7 @@ $env:PYTHONIOENCODING = "utf-8" $env:CCB_REQ_ID = "{task_id}" $env:CCB_CALLER = "{caller}" $env:CCB_WORK_DIR = "{os.getcwd()}" -{run_dir_line}{email_env_lines}$statusFile = "{status_file_win}" +{run_dir_line}{email_env_lines}{win_pane_env_lines}$statusFile = "{status_file_win}" $logFile = "{log_file_win}" function Write-CcbStatus([string]$line) {{ Add-Content -Path $statusFile -Value ("{{0}} {{1}}" -f (Get-Date -Format "yyyy-MM-ddTHH:mm:sszzz"), $line) -Encoding UTF8 @@ -717,6 +739,13 @@ exit $rc ccb_run_dir = os.environ.get("CCB_RUN_DIR", "") run_dir_line = f'export CCB_RUN_DIR="{ccb_run_dir}"\n' if ccb_run_dir else "" + bg_pane_id, bg_terminal = _caller_pane_info() + pane_env_lines = "" + if bg_pane_id: + pane_env_lines += f'export CCB_CALLER_PANE_ID="{bg_pane_id}"\n' + if bg_terminal: + pane_env_lines += f'export CCB_CALLER_TERMINAL="{bg_terminal}"\n' + quoted_status = shlex.quote(str(status_file)) quoted_ask_cmd = shlex.quote(ask_cmd) quoted_provider = shlex.quote(provider) @@ -730,7 +759,7 @@ echo "[CCB_TASK_START] task={task_id} provider={provider} caller={caller} pid=$$ export CCB_REQ_ID="{task_id}" export CCB_CALLER="{caller}" export CCB_WORK_DIR="{os.getcwd()}" -{run_dir_line}{email_env_lines}python3 {quoted_ask_cmd} {quoted_provider} --foreground --timeout {timeout} <<'ASKEOF' +{run_dir_line}{email_env_lines}{pane_env_lines}python3 {quoted_ask_cmd} {quoted_provider} --foreground --timeout {timeout} <<'ASKEOF' {message} ASKEOF rc=$? diff --git a/bin/ccb-completion-hook b/bin/ccb-completion-hook index 95d1ca14..3d3bdeb0 100755 --- a/bin/ccb-completion-hook +++ b/bin/ccb-completion-hook @@ -25,6 +25,8 @@ import json import os import subprocess import sys +import tempfile +from datetime import datetime from pathlib import Path # Add lib directory to path for imports @@ -61,6 +63,32 @@ def env_float(name: str, default: float) -> float: return default +def _debug_enabled() -> bool: + return env_bool("CCB_COMPLETION_HOOK_DEBUG", True) + + +def _debug_log_path() -> Path: + raw = os.environ.get("CCB_COMPLETION_HOOK_DEBUG_LOG", "").strip() + if raw: + return Path(raw).expanduser() + return Path(tempfile.gettempdir()) / "ccb-completion-hook.debug.log" + + +def _debug_log(message: str) -> None: + if not _debug_enabled(): + return + try: + path = _debug_log_path() + path.parent.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z") + line = f"{ts} pid={os.getpid()} {message}\n" + with open(path, "a", encoding="utf-8") as fh: + fh.write(line) + print(f"[completion-hook-debug] {message}", file=sys.stderr) + except Exception: + pass + + def load_session_file(session_path: Path) -> dict: """Load session file with utf-8-sig encoding (handles BOM from PowerShell).""" try: @@ -139,31 +167,91 @@ def find_wezterm_cli() -> str | None: return None +def _env_true(name: str) -> bool: + return (os.environ.get(name, "").strip().lower() in {"1", "true", "yes", "on"}) + + +def _wezterm_cli_base_args(wezterm_bin: str) -> list[str]: + args = [wezterm_bin, "cli"] + wezterm_class = (os.environ.get("CODEX_WEZTERM_CLASS") or os.environ.get("WEZTERM_CLASS") or "").strip() + if wezterm_class: + args.extend(["--class", wezterm_class]) + if _env_true("CODEX_WEZTERM_PREFER_MUX"): + args.append("--prefer-mux") + if _env_true("CODEX_WEZTERM_NO_AUTO_START"): + args.append("--no-auto-start") + return args + + +def _send_wezterm_enter(base_args: list[str], pane_id: str) -> bool: + variants = ["Enter", "Return", "enter"] + for variant in variants: + result = subprocess.run( + [*base_args, "send-key", "--pane-id", pane_id, "--key", variant], + capture_output=True, + timeout=2, + ) + if result.returncode == 0: + return True + result = subprocess.run( + [*base_args, "send-key", "--pane-id", pane_id, variant], + capture_output=True, + timeout=2, + ) + if result.returncode == 0: + return True + return False + + def send_via_wezterm(pane_id: str, message: str, session_data: dict) -> bool: - """Send message to WezTerm pane using wezterm cli with stdin for large payloads.""" + """Send message to WezTerm pane and auto-submit reliably.""" try: wezterm = find_wezterm_cli() if not wezterm: return False - # Use stdin to avoid command line length limits on Windows - # wezterm cli send-text --pane-id --no-paste reads from stdin if no text arg + import time + + base_args = _wezterm_cli_base_args(wezterm) + payload = message.replace("\r", "").rstrip("\n") + if not payload: + return False + + # Preferred path: paste full content, then send an explicit Enter key event. + # Some TUIs don't treat pasted newlines as submit. result = subprocess.run( - [wezterm, "cli", "send-text", "--pane-id", pane_id, "--no-paste"], - input=message.encode("utf-8"), + [*base_args, "send-text", "--pane-id", pane_id], + input=payload.encode("utf-8"), capture_output=True, timeout=10 ) + _debug_log(f"wezterm send-text pane={pane_id!r} rc={result.returncode}") if result.returncode == 0: - # Send Enter after text - subprocess.run( - [wezterm, "cli", "send-text", "--pane-id", pane_id, "--no-paste"], + if _send_wezterm_enter(base_args, pane_id): + _debug_log(f"wezterm submit via send-key pane={pane_id!r} rc=0") + return True + # Older wezterm may lack send-key; fallback to sending CR byte. + submit = subprocess.run( + [*base_args, "send-text", "--pane-id", pane_id, "--no-paste"], input=b"\r", capture_output=True, - timeout=5 + timeout=5, ) - return True + _debug_log(f"wezterm submit via no-paste pane={pane_id!r} rc={submit.returncode}") + return submit.returncode == 0 + + # Fallback: no-paste with trailing CR for non-TUI panes. + time.sleep(0.1) + fallback = subprocess.run( + [*base_args, "send-text", "--pane-id", pane_id, "--no-paste"], + input=(payload + "\r").encode("utf-8"), + capture_output=True, + timeout=5 + ) + _debug_log(f"wezterm fallback no-paste pane={pane_id!r} rc={fallback.returncode}") + return fallback.returncode == 0 except Exception: + _debug_log(f"wezterm exception pane={pane_id!r}") pass return False @@ -173,6 +261,27 @@ def send_via_tmux(pane_id: str, message: str) -> bool: try: import time + def _send_enter() -> bool: + key_variants = ("Enter", "Return", "C-m") + max_retries = 3 + enter_delay = env_float("CCB_TMUX_ENTER_DELAY", 0.5) + retry_delay = env_float("CCB_TMUX_ENTER_RETRY_DELAY", 0.08) + for attempt in range(max_retries): + if enter_delay: + time.sleep(enter_delay) + for key in key_variants: + key_result = subprocess.run( + ["tmux", "send-keys", "-t", pane_id, key], + capture_output=True, + timeout=5, + ) + _debug_log(f"tmux send-keys pane={pane_id!r} key={key!r} rc={key_result.returncode}") + if key_result.returncode == 0: + return True + if retry_delay and attempt < max_retries - 1: + time.sleep(retry_delay) + return False + # Ensure pane is not in copy mode try: mode_result = subprocess.run( @@ -198,6 +307,7 @@ def send_via_tmux(pane_id: str, message: str) -> bool: capture_output=True, timeout=5 ) + _debug_log(f"tmux load-buffer pane={pane_id!r} rc={load_result.returncode}") if load_result.returncode != 0: return False try: @@ -207,15 +317,10 @@ def send_via_tmux(pane_id: str, message: str) -> bool: capture_output=True, timeout=5 ) + _debug_log(f"tmux paste-buffer pane={pane_id!r} rc={paste_result.returncode}") if paste_result.returncode == 0: - # Send Enter after paste (wait 1s for stability) - time.sleep(1.0) - subprocess.run( - ["tmux", "send-keys", "-t", pane_id, "Enter"], - capture_output=True, - timeout=5 - ) - return True + # Send Enter after paste to submit automatically. + return _send_enter() finally: # Clean up buffer subprocess.run( @@ -224,6 +329,7 @@ def send_via_tmux(pane_id: str, message: str) -> bool: timeout=2 ) except Exception: + _debug_log(f"tmux exception pane={pane_id!r}") pass return False @@ -398,6 +504,11 @@ def main() -> int: reply_content = args.reply or "" done_seen = env_bool("CCB_DONE_SEEN", True) status = normalize_completion_status(os.environ.get("CCB_COMPLETION_STATUS", ""), done_seen=done_seen) + _debug_log( + "start " + f"provider={provider} caller={caller} req_id={args.req_id or 'unknown'} " + f"status={status} done_seen={done_seen}" + ) if not reply_content: reply_content = default_reply_for_status(status, done_seen=done_seen) @@ -428,84 +539,98 @@ def main() -> int: status=status, ) - # Find caller's pane_id from session file - session_files = { - "claude": ".claude-session", - "codex": ".codex-session", - "gemini": ".gemini-session", - "opencode": ".opencode-session", - "droid": ".droid-session", - } - session_filename = session_files.get(caller, ".claude-session") - - # Search for session file in multiple locations (order matters - most specific first) - work_dir = os.environ.get("CCB_WORK_DIR", "") - search_paths: list[Path] = [] - seen_paths: set[str] = set() - config_dirnames = (".ccb", ".ccb_config") - - def add_search_path(path: Path | None) -> None: - if not path: - return - p = Path(path) - key = str(p) - if key in seen_paths: - return - seen_paths.add(key) - search_paths.append(p) - - # 1. Request's work_dir (most specific), with upward lookup for nested dirs. - if work_dir: - try: - add_search_path(find_project_session_file(Path(work_dir), session_filename)) - except Exception: - pass - - # 2. Current working directory (fallback), with upward lookup. - cwd = os.getcwd() - if cwd != work_dir: - try: - add_search_path(find_project_session_file(Path(cwd), session_filename)) - except Exception: - pass - - # 3. User's home-based locations - for dirname in config_dirnames: - add_search_path(Path.home() / ".local" / "share" / "codex-dual" / dirname / session_filename) - - # 4. On Windows, also check LOCALAPPDATA - if os.name == "nt": - localappdata = os.environ.get("LOCALAPPDATA", "") - if localappdata: - for dirname in reversed(config_dirnames): - p = Path(localappdata) / "codex-dual" / dirname / session_filename - key = str(p) - if key in seen_paths: - continue - seen_paths.add(key) - search_paths.insert(0, p) + # Direct pane routing: when the caller's pane ID was captured at ask-time and + # threaded through the chain, use it directly. This is necessary because multiple + # Claude Code instances in the same project share one session file, so the session + # file lookup would route to whichever instance wrote last — not the one that asked. + direct_pane_id = os.environ.get("CCB_CALLER_PANE_ID", "").strip() + direct_terminal = os.environ.get("CCB_CALLER_TERMINAL", "").strip() pane_id = None terminal = "tmux" # default session_data = {} - found_session_path = None - - for session_path in search_paths: - if session_path.exists(): - session_data = load_session_file(session_path) - pane_id = session_data.get("pane_id") - terminal = session_data.get("terminal", "tmux") - if pane_id: - found_session_path = session_path - # Validate: check if session's work_dir matches request's work_dir - session_work_dir = session_data.get("work_dir", "") - if work_dir and session_work_dir and not _work_dirs_compatible(str(session_work_dir), str(work_dir)): - # Session file is for a different project, skip it - pane_id = None - continue - break + + if direct_pane_id: + pane_id = direct_pane_id + terminal = direct_terminal or "tmux" + _debug_log(f"routing direct pane={pane_id!r} terminal={terminal}") + else: + # Fallback: find caller's pane_id from session file + session_files = { + "claude": ".claude-session", + "codex": ".codex-session", + "gemini": ".gemini-session", + "opencode": ".opencode-session", + "droid": ".droid-session", + } + session_filename = session_files.get(caller, ".claude-session") + + work_dir = os.environ.get("CCB_WORK_DIR", "") + search_paths: list[Path] = [] + seen_paths: set[str] = set() + config_dirnames = (".ccb", ".ccb_config") + + def add_search_path(path: Path | None) -> None: + if not path: + return + p = Path(path) + key = str(p) + if key in seen_paths: + return + seen_paths.add(key) + search_paths.append(p) + + # 1. Request's work_dir (most specific), with upward lookup for nested dirs. + if work_dir: + try: + add_search_path(find_project_session_file(Path(work_dir), session_filename)) + except Exception: + pass + + # 2. Current working directory (fallback), with upward lookup. + cwd = os.getcwd() + if cwd != work_dir: + try: + add_search_path(find_project_session_file(Path(cwd), session_filename)) + except Exception: + pass + + # 3. User's home-based locations + for dirname in config_dirnames: + add_search_path(Path.home() / ".local" / "share" / "codex-dual" / dirname / session_filename) + + # 4. On Windows, also check LOCALAPPDATA + if os.name == "nt": + localappdata = os.environ.get("LOCALAPPDATA", "") + if localappdata: + for dirname in reversed(config_dirnames): + p = Path(localappdata) / "codex-dual" / dirname / session_filename + key = str(p) + if key in seen_paths: + continue + seen_paths.add(key) + search_paths.insert(0, p) + + for session_path in search_paths: + if session_path.exists(): + session_data = load_session_file(session_path) + pane_id = session_data.get("pane_id") + terminal = session_data.get("terminal", "tmux") + if pane_id: + # Validate: check if session's work_dir matches request's work_dir + session_work_dir = session_data.get("work_dir", "") + if work_dir and session_work_dir and not _work_dirs_compatible(str(session_work_dir), str(work_dir)): + pane_id = None + _debug_log( + "session mismatch " + f"session={session_path} session_work_dir={session_work_dir!r} request_work_dir={work_dir!r}" + ) + continue + _debug_log(f"routing session pane={pane_id!r} terminal={terminal} session={session_path}") + break if not pane_id: + _debug_log("no pane found, falling back to ask --notify") # No pane_id found, try ask command timeout = env_float("CCB_COMPLETION_HOOK_TIMEOUT", 10.0) ask_cmd = find_ask_command() @@ -532,12 +657,16 @@ def main() -> int: timeout=timeout, env=env, ) + _debug_log(f"ask --notify fallback rc={result.returncode}") return result.returncode except Exception: + _debug_log("ask --notify fallback exception") return 0 # Send directly via terminal backend - if send_via_terminal(pane_id, message, terminal, session_data): + terminal_ok = send_via_terminal(pane_id, message, terminal, session_data) + _debug_log(f"direct terminal send pane={pane_id!r} terminal={terminal} ok={terminal_ok}") + if terminal_ok: return 0 # Fallback to ask command if terminal send failed @@ -558,14 +687,16 @@ def main() -> int: env=env, ) else: - subprocess.run( + fallback_result = subprocess.run( [ask_cmd, caller, "--notify", "--no-wrap", message], capture_output=True, text=True, timeout=timeout, env=env, ) + _debug_log(f"post-failure ask --notify rc={fallback_result.returncode if 'fallback_result' in locals() else 'n/a'}") except Exception: + _debug_log("post-failure ask --notify exception") pass return 0 diff --git a/lib/askd/adapters/base.py b/lib/askd/adapters/base.py index 99ac7c9f..f9bc6ba1 100644 --- a/lib/askd/adapters/base.py +++ b/lib/askd/adapters/base.py @@ -30,6 +30,9 @@ class ProviderRequest: email_from: str = "" # Multi-instance support: optional instance identifier (e.g., 'auth', 'payment') instance: Optional[str] = None + # Caller pane ID for direct routing back to the originating terminal pane + caller_pane_id: str = "" + caller_terminal: str = "" @dataclass diff --git a/lib/askd/adapters/claude.py b/lib/askd/adapters/claude.py index 54d00ec4..72aeb0c6 100644 --- a/lib/askd/adapters/claude.py +++ b/lib/askd/adapters/claude.py @@ -587,6 +587,8 @@ def _finalize_result(self, result: ProviderResult, req: ProviderRequest, task: Q email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) def _postprocess_reply(self, req: ProviderRequest, reply: str) -> str: diff --git a/lib/askd/adapters/codebuddy.py b/lib/askd/adapters/codebuddy.py index bb65eb85..d9fb9419 100644 --- a/lib/askd/adapters/codebuddy.py +++ b/lib/askd/adapters/codebuddy.py @@ -224,6 +224,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/adapters/codex.py b/lib/askd/adapters/codex.py index 44446911..18043436 100644 --- a/lib/askd/adapters/codex.py +++ b/lib/askd/adapters/codex.py @@ -329,6 +329,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) return result diff --git a/lib/askd/adapters/copilot.py b/lib/askd/adapters/copilot.py index 6c414ac1..be692972 100644 --- a/lib/askd/adapters/copilot.py +++ b/lib/askd/adapters/copilot.py @@ -224,6 +224,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/adapters/droid.py b/lib/askd/adapters/droid.py index 0c1e7958..25070233 100644 --- a/lib/askd/adapters/droid.py +++ b/lib/askd/adapters/droid.py @@ -224,6 +224,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/adapters/gemini.py b/lib/askd/adapters/gemini.py index dce6cb9e..58ba7bd6 100644 --- a/lib/askd/adapters/gemini.py +++ b/lib/askd/adapters/gemini.py @@ -284,6 +284,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/adapters/opencode.py b/lib/askd/adapters/opencode.py index 0f1586db..3f23476f 100644 --- a/lib/askd/adapters/opencode.py +++ b/lib/askd/adapters/opencode.py @@ -238,6 +238,8 @@ def _handle_task_locked(self, task: QueuedTask, session: Any, session_key: str, email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/adapters/qwen.py b/lib/askd/adapters/qwen.py index 4007136e..63f9383d 100644 --- a/lib/askd/adapters/qwen.py +++ b/lib/askd/adapters/qwen.py @@ -224,6 +224,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: email_msg_id=req.email_msg_id, email_from=req.email_from, work_dir=req.work_dir, + caller_pane_id=req.caller_pane_id, + caller_terminal=req.caller_terminal, ) result = ProviderResult( diff --git a/lib/askd/daemon.py b/lib/askd/daemon.py index 3c69d896..bcad84ae 100644 --- a/lib/askd/daemon.py +++ b/lib/askd/daemon.py @@ -169,6 +169,8 @@ def _handle_request(self, msg: dict) -> dict: email_req_id=str(msg.get("email_req_id") or ""), email_msg_id=str(msg.get("email_msg_id") or ""), email_from=str(msg.get("email_from") or ""), + caller_pane_id=str(msg.get("caller_pane_id") or ""), + caller_terminal=str(msg.get("caller_terminal") or ""), ) except Exception as exc: return { diff --git a/lib/completion_hook.py b/lib/completion_hook.py index d4fa9a4b..964693ae 100644 --- a/lib/completion_hook.py +++ b/lib/completion_hook.py @@ -89,6 +89,8 @@ def _run_hook_async( work_dir: str = "", done_seen: bool = True, status: str = COMPLETION_STATUS_COMPLETED, + caller_pane_id: str = "", + caller_terminal: str = "", ) -> None: """Run the completion hook in a background thread.""" if not env_bool("CCB_COMPLETION_HOOK_ENABLED", True): @@ -143,6 +145,10 @@ def _run(): # Pass work_dir for session file lookup if work_dir: env["CCB_WORK_DIR"] = work_dir + if caller_pane_id: + env["CCB_CALLER_PANE_ID"] = caller_pane_id + if caller_terminal: + env["CCB_CALLER_TERMINAL"] = caller_terminal # Pass reply via stdin to avoid command line length limits # Use longer timeout for SMTP retries (3 retries * 8s max backoff + send time) @@ -174,6 +180,8 @@ def notify_completion( email_from: str = "", work_dir: str = "", status: str | None = None, + caller_pane_id: str = "", + caller_terminal: str = "", ) -> None: """ Notify the caller that a CCB delegation task has completed. @@ -204,4 +212,6 @@ def notify_completion( work_dir, done_seen, normalized_status, + caller_pane_id, + caller_terminal, ) diff --git a/test/test_stability_regressions.py b/test/test_stability_regressions.py index 38f538d4..7c2bcfbf 100644 --- a/test/test_stability_regressions.py +++ b/test/test_stability_regressions.py @@ -3,6 +3,7 @@ import importlib.util import json import threading +from types import SimpleNamespace from importlib.machinery import SourceFileLoader from pathlib import Path @@ -53,6 +54,82 @@ def test_completion_hook_manual_caller_is_noop(monkeypatch) -> None: assert hook.main() == 0 +def test_completion_hook_wezterm_fallback_honors_send_failure(monkeypatch) -> None: + hook = _load_script_module("ccb_completion_hook_wezterm", REPO_ROOT / "bin" / "ccb-completion-hook") + + monkeypatch.setattr(hook, "find_wezterm_cli", lambda: "wezterm") + + calls: list[list[str]] = [] + + def _fake_run(cmd, **kwargs): + calls.append(cmd) + return SimpleNamespace(returncode=1, stdout="", stderr="err") + + monkeypatch.setattr(hook.subprocess, "run", _fake_run) + + ok = hook.send_via_wezterm("pane-1", "hello", {}) + + assert ok is False + assert len(calls) == 2 + assert "send-text" in calls[0] + assert "--no-paste" in calls[1] + + +def test_completion_hook_wezterm_send_key_fallbacks_to_cr(monkeypatch) -> None: + hook = _load_script_module("ccb_completion_hook_wezterm_submit", REPO_ROOT / "bin" / "ccb-completion-hook") + + monkeypatch.setattr(hook, "find_wezterm_cli", lambda: "wezterm") + + calls: list[list[str]] = [] + + def _fake_run(cmd, **kwargs): + calls.append(cmd) + if "send-key" in cmd: + return SimpleNamespace(returncode=1, stdout="", stderr="") + if "--no-paste" in cmd and kwargs.get("input") == b"\r": + return SimpleNamespace(returncode=0, stdout="", stderr="") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + monkeypatch.setattr(hook.subprocess, "run", _fake_run) + + ok = hook.send_via_wezterm("pane-1", "hello", {}) + + assert ok is True + assert any("send-key" in call for call in calls) + assert any("--no-paste" in call for call in calls) + + +def test_completion_hook_tmux_enter_retries_with_variants(monkeypatch) -> None: + hook = _load_script_module("ccb_completion_hook_tmux", REPO_ROOT / "bin" / "ccb-completion-hook") + + key_calls: list[str] = [] + + def _fake_run(cmd, **kwargs): + if cmd[:3] == ["tmux", "display-message", "-p"]: + return SimpleNamespace(returncode=0, stdout="0", stderr="") + if cmd[:3] == ["tmux", "load-buffer", "-b"]: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if cmd[:3] == ["tmux", "paste-buffer", "-p"]: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if cmd[:3] == ["tmux", "send-keys", "-t"]: + key = cmd[-1] + key_calls.append(key) + rc = 0 if key == "Return" else 1 + return SimpleNamespace(returncode=rc, stdout="", stderr="") + if cmd[:3] == ["tmux", "delete-buffer", "-b"]: + return SimpleNamespace(returncode=0, stdout="", stderr="") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + monkeypatch.setattr(hook.subprocess, "run", _fake_run) + monkeypatch.setenv("CCB_TMUX_ENTER_DELAY", "0") + monkeypatch.setenv("CCB_TMUX_ENTER_RETRY_DELAY", "0") + + ok = hook.send_via_tmux("%1", "hello") + + assert ok is True + assert key_calls[:2] == ["Enter", "Return"] + + def test_maybe_start_unified_daemon_honors_autostart_opt_out(monkeypatch, tmp_path: Path) -> None: ask = _load_script_module("ask_script_opt_out", REPO_ROOT / "bin" / "ask") popen_calls: list[dict] = []