diff --git a/cc_config.py b/cc_config.py index f377e0b..355d310 100644 --- a/cc_config.py +++ b/cc_config.py @@ -20,6 +20,7 @@ "verbose": False, "thinking": False, "thinking_budget": 10000, + "thinking_mode": "loud", # "loud" = think-out-loud with tags "custom_base_url": "", # for "custom" provider "max_tool_output": 32000, "max_agent_depth": 3, diff --git a/context.py b/context.py index 6f838a6..43228af 100644 --- a/context.py +++ b/context.py @@ -7,6 +7,18 @@ from memory import get_memory_context +_THINK_OUT_LOUD_PROMPT = """ +# Think-Out-Loud Mode + +Wrap your internal reasoning in `<""" + """thinking>...` XML tags. +These are displayed to the user in italic but stripped from your context in future turns. + +Use thinking for: planning, analyzing code, debugging, weighing options. +Keep it focused -- commit to an approach, avoid loops. +You can interleave thinking blocks with visible text freely. +Do NOT wrap simple responses in thinking. +""" + # ── Prompt injection detection ─────────────────────────────────────────── _THREAT_PATTERNS = [ re.compile(r'ignore\s+(previous|all|above|prior)(\s+\w+)*\s+(instructions?|prompts?|rules?)', re.I), diff --git a/tests/test_thinking_parser.py b/tests/test_thinking_parser.py new file mode 100644 index 0000000..9759441 --- /dev/null +++ b/tests/test_thinking_parser.py @@ -0,0 +1,148 @@ +"""Tests for thinking_parser module.""" +import sys, os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from thinking_parser import ThinkingStreamParser, LoopDetector, strip_thinking_tags, OPEN_TAG, CLOSE_TAG + +# Build test strings using the tag constants to avoid XML transport issues +def _wrap(inner): + return OPEN_TAG + inner + CLOSE_TAG + + +class TestThinkingStreamParser: + def test_no_thinking_tags(self): + p = ThinkingStreamParser() + events = p.feed("Hello world") + events += p.finalize() + texts = "".join(t for k, t in events if k == "text") + assert texts == "Hello world" + assert not any(k == "thinking" for k, _ in events) + + def test_basic_thinking_block(self): + p = ThinkingStreamParser() + events = p.feed(_wrap("reasoning") + "answer") + events += p.finalize() + thinking = "".join(t for k, t in events if k == "thinking") + text = "".join(t for k, t in events if k == "text") + assert thinking == "reasoning" + assert text == "answer" + + def test_multiple_blocks(self): + p = ThinkingStreamParser() + events = p.feed(_wrap("A") + "X" + _wrap("B") + "Y") + events += p.finalize() + thinking = "".join(t for k, t in events if k == "thinking") + text = "".join(t for k, t in events if k == "text") + assert thinking == "AB" + assert text == "XY" + + def test_partial_open_tag_across_chunks(self): + p = ThinkingStreamParser() + all_events = [] + # Split the open tag across two chunks + all_events += p.feed("before" + OPEN_TAG[:4]) + all_events += p.feed(OPEN_TAG[4:] + "inside" + CLOSE_TAG + "after") + all_events += p.finalize() + thinking = "".join(t for k, t in all_events if k == "thinking") + text = "".join(t for k, t in all_events if k == "text") + assert thinking == "inside" + assert text == "beforeafter" + + def test_partial_close_tag_across_chunks(self): + p = ThinkingStreamParser() + all_events = [] + all_events += p.feed(OPEN_TAG + "inside" + CLOSE_TAG[:6]) + all_events += p.feed(CLOSE_TAG[6:] + "outside") + all_events += p.finalize() + thinking = "".join(t for k, t in all_events if k == "thinking") + text = "".join(t for k, t in all_events if k == "text") + assert thinking == "inside" + assert text == "outside" + + def test_unclosed_tag_flushed_on_finalize(self): + p = ThinkingStreamParser() + events = p.feed(OPEN_TAG + "unclosed") + events += p.finalize() + thinking = "".join(t for k, t in events if k == "thinking") + assert thinking == "unclosed" + + def test_text_before_thinking(self): + p = ThinkingStreamParser() + events = p.feed("before" + _wrap("during") + "after") + events += p.finalize() + assert events == [ + ("text", "before"), + ("thinking", "during"), + ("text", "after"), + ] + + def test_empty_thinking_block(self): + p = ThinkingStreamParser() + events = p.feed(_wrap("") + "text") + events += p.finalize() + text = "".join(t for k, t in events if k == "text") + assert text == "text" + + def test_character_by_character(self): + p = ThinkingStreamParser() + full = _wrap("abc") + "xyz" + events = [] + for ch in full: + events += p.feed(ch) + events += p.finalize() + thinking = "".join(t for k, t in events if k == "thinking") + text = "".join(t for k, t in events if k == "text") + assert thinking == "abc" + assert text == "xyz" + + def test_newlines_preserved(self): + p = ThinkingStreamParser() + events = p.feed(_wrap("line1\nline2\n")) + events += p.finalize() + thinking = "".join(t for k, t in events if k == "thinking") + assert thinking == "line1\nline2\n" + + def test_full_thinking_text_property(self): + p = ThinkingStreamParser() + p.feed(_wrap("part1") + "gap" + _wrap("part2")) + assert p.full_thinking_text == "part1part2" + + +class TestLoopDetector: + def test_no_repetition(self): + d = LoopDetector() + assert not d.feed("Normal text without any repeating patterns at all here.") + + def test_short_text(self): + d = LoopDetector() + assert not d.feed("abc" * 10) + + def test_long_repeating_pattern(self): + d = LoopDetector() + pattern = "I need to analyze this carefully now. ok " * 10 + assert d.feed(pattern) + + def test_incremental_detection(self): + d = LoopDetector() + pattern = "I keep repeating this same thought!! " + for _ in range(6): + assert not d.feed(pattern) + for _ in range(6): + if d.feed(pattern): + return + assert False, "Should have detected loop" + + +class TestStripThinkingTags: + def test_basic(self): + assert strip_thinking_tags(_wrap("r") + "answer") == "answer" + + def test_multiline(self): + result = strip_thinking_tags(_wrap("line1\nline2\n") + "answer") + assert result == "answer" + + def test_multiple_blocks(self): + assert strip_thinking_tags(_wrap("a") + "X" + _wrap("b") + "Y") == "XY" + + def test_no_tags(self): + assert strip_thinking_tags("just text") == "just text" diff --git a/thinking_parser.py b/thinking_parser.py new file mode 100644 index 0000000..6e1e32c --- /dev/null +++ b/thinking_parser.py @@ -0,0 +1,119 @@ +"""Incremental parser for thinking XML tags in text stream, with loop detection.""" + +import re + +OPEN_TAG = "<" + "thinking>" +CLOSE_TAG = "" +_THINKING_RE = re.compile(r"<" + r"thinking>.*?", re.DOTALL) + + +class ThinkingStreamParser: + """Parses a stream of text chunks, separating thinking blocks from regular text.""" + + def __init__(self): + self._buffer = "" + self._in_thinking = False + self._thinking_text: list[str] = [] + + def feed(self, chunk: str) -> list[tuple[str, str]]: + """Feed a chunk. Returns list of ("text", content) or ("thinking", content) tuples.""" + self._buffer += chunk + results: list[tuple[str, str]] = [] + + while self._buffer: + if self._in_thinking: + end_idx = self._buffer.find(CLOSE_TAG) + if end_idx != -1: + thinking_content = self._buffer[:end_idx] + if thinking_content: + results.append(("thinking", thinking_content)) + self._thinking_text.append(thinking_content) + self._buffer = self._buffer[end_idx + len(CLOSE_TAG):] + self._in_thinking = False + else: + for i in range(1, min(len(CLOSE_TAG), len(self._buffer) + 1)): + if CLOSE_TAG[:i] == self._buffer[-i:]: + emit = self._buffer[:-i] + if emit: + results.append(("thinking", emit)) + self._thinking_text.append(emit) + self._buffer = self._buffer[-i:] + return results + results.append(("thinking", self._buffer)) + self._thinking_text.append(self._buffer) + self._buffer = "" + else: + start_idx = self._buffer.find(OPEN_TAG) + if start_idx != -1: + text_before = self._buffer[:start_idx] + if text_before: + results.append(("text", text_before)) + self._buffer = self._buffer[start_idx + len(OPEN_TAG):] + self._in_thinking = True + else: + for i in range(1, min(len(OPEN_TAG), len(self._buffer) + 1)): + if OPEN_TAG[:i] == self._buffer[-i:]: + emit = self._buffer[:-i] + if emit: + results.append(("text", emit)) + self._buffer = self._buffer[-i:] + return results + results.append(("text", self._buffer)) + self._buffer = "" + + return results + + def finalize(self) -> list[tuple[str, str]]: + """Flush remaining buffer.""" + results: list[tuple[str, str]] = [] + if self._buffer: + kind = "thinking" if self._in_thinking else "text" + results.append((kind, self._buffer)) + if self._in_thinking: + self._thinking_text.append(self._buffer) + self._buffer = "" + return results + + @property + def full_thinking_text(self) -> str: + return "".join(self._thinking_text) + + @property + def in_thinking(self) -> bool: + return self._in_thinking + + +class LoopDetector: + """Detects repetitive patterns in a stream of text.""" + + WINDOW_SIZE = 2000 + PATTERN_LENGTHS = list(range(20, 51)) + list(range(55, 201, 5)) + MIN_REPEATS = 8 + + def __init__(self): + self._window = "" + + def feed(self, text: str) -> bool: + """Feed text. Returns True if a loop is detected.""" + self._window += text + if len(self._window) > self.WINDOW_SIZE: + self._window = self._window[-self.WINDOW_SIZE:] + if len(self._window) < 200: + return False + return any(self._check_pattern(plen) for plen in self.PATTERN_LENGTHS) + + def _check_pattern(self, pattern_length: int) -> bool: + needed = pattern_length * self.MIN_REPEATS + if len(self._window) < needed: + return False + tail = self._window[-needed:] + pattern = tail[:pattern_length] + return all( + tail[i : i + pattern_length] == pattern + for i in range(pattern_length, needed, pattern_length) + ) + + +def strip_thinking_tags(content: str) -> str: + """Remove all thinking blocks from content.""" + return _THINKING_RE.sub("", content).strip()