From 3e4521e66cf47971186e59804f169aee0720d9d5 Mon Sep 17 00:00:00 2001 From: Fred Thiele <8555720+f3rdy@users.noreply.github.com> Date: Sat, 14 Mar 2026 11:27:03 +0100 Subject: [PATCH] feat(search): add --context flag to show parent object of matches When searching vault values, --context displays the parent dict of each match with sibling fields redacted by default. Combine with --show-match to reveal all field values. Matches are grouped by parent path. --- src/vaultctl/cli.py | 63 ++++++++++++++++++++++++++++++-- src/vaultctl/search.py | 26 +++++++++++++- tests/test_cli.py | 27 ++++++++++++++ tests/test_search.py | 82 ++++++++++++++++++++++++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 196 insertions(+), 4 deletions(-) diff --git a/src/vaultctl/cli.py b/src/vaultctl/cli.py index be4a6ae..e2203f1 100644 --- a/src/vaultctl/cli.py +++ b/src/vaultctl/cli.py @@ -2,6 +2,7 @@ from __future__ import annotations +import builtins import json import re import sys @@ -205,6 +206,7 @@ def list_cmd(vctx: VaultContext, filter_pattern: str | None) -> None: @click.option( "--prompt", "use_prompt", is_flag=True, default=False, help="Read pattern from stdin (avoids shell history)." ) +@click.option("--context", "-c", is_flag=True, default=False, help="Show parent object context for each match.") @pass_ctx def search( vctx: VaultContext, @@ -213,6 +215,7 @@ def search( show_match: bool, fixed_string: bool, use_prompt: bool, + context: bool, ) -> None: """Search vault for keys whose values match PATTERN (regex or fixed string). @@ -260,7 +263,9 @@ def search( sys.exit(1) try: - matches = search_values(data, pattern, include_values=show_match, fixed_string=fixed_string) + matches = search_values( + data, pattern, include_values=show_match, include_context=context, fixed_string=fixed_string + ) except (re.error, ValueError): click.echo("Error: Invalid regex pattern. Check syntax.", err=True) sys.exit(1) @@ -274,7 +279,10 @@ def search( err=True, ) - _print_search_results(matches, show_match=show_match) + if context: + _print_context_results(matches, show_match=show_match) + else: + _print_search_results(matches, show_match=show_match) def _print_search_results(matches: list[SearchMatch], *, show_match: bool) -> None: @@ -288,6 +296,57 @@ def _print_search_results(matches: list[SearchMatch], *, show_match: bool) -> No click.echo(line) +def _print_context_results(matches: list[SearchMatch], *, show_match: bool) -> None: + """Format and print search results grouped by parent context.""" + # Collect matched field names per (key, parent_path) for highlighting + groups: dict[tuple[str, str], list[SearchMatch]] = {} + for match in matches: + group_key = (match.key, match.parent_path) + groups.setdefault(group_key, []).append(match) + + first = True + for (top_key, parent_path), group_matches in groups.items(): + if not first: + click.echo() + first = False + + # Build the full header path + header = f"{top_key}.{parent_path}" if parent_path else top_key + + ctx = group_matches[0].context + if ctx is None: + # No parent context (top-level string match) -- fall back to flat display + for match in group_matches: + line = f" {match.key}.{match.path}" if match.path else f" {match.key}" + if show_match and match.value is not None: + display = match.value if len(match.value) <= 80 else match.value[:77] + "..." + line += f" = {display}" + click.echo(line) + continue + + click.echo(f"{header}:") + + # Determine which field names were matched + matched_fields: builtins.set[str] = builtins.set() + for m in group_matches: + # The field name is the last segment of the path after parent_path + suffix = m.path[len(parent_path) :].lstrip(".") if m.path and parent_path else m.path + matched_fields.add(suffix) + + for field_name, field_value in ctx.items(): + if field_name in matched_fields: + # Show matched field value (truncated) + if show_match: + display_val = str(field_value) + else: + sval = str(field_value) + display_val = sval[:4] + "..." if len(sval) > 4 else sval + else: + # Redact non-matched fields unless --show-match + display_val = str(field_value) if show_match else "****" + click.echo(f" {field_name}: {display_val}") + + @main.command() @click.argument("key") @click.option("--field", default=None, help="Access a specific field of a structured entry.") diff --git a/src/vaultctl/search.py b/src/vaultctl/search.py index b49f44c..2d71b9e 100644 --- a/src/vaultctl/search.py +++ b/src/vaultctl/search.py @@ -9,7 +9,7 @@ import re from collections.abc import Callable -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any # Maximum recursion depth to prevent runaway traversal. @@ -36,6 +36,8 @@ class SearchMatch: key: str path: str = "" value: str | None = None + parent_path: str = "" + context: dict[str, Any] | None = field(default=None, hash=False, compare=False) def _compile_pattern(pattern: str, *, fixed_string: bool = False, flags: int = 0) -> Callable[[str], bool]: @@ -82,6 +84,7 @@ def search_values( pattern: str, *, include_values: bool = False, + include_context: bool = False, max_depth: int = MAX_DEPTH, fixed_string: bool = False, ) -> list[SearchMatch]: @@ -96,6 +99,8 @@ def search_values( include_values: If True, populate ``SearchMatch.value`` with the matched string. **Security-sensitive** -- caller must gate this behind explicit user consent. + include_context: If True, populate ``SearchMatch.context`` with the + parent dict of the matched value and set ``parent_path``. max_depth: Maximum nesting depth for recursive traversal. fixed_string: If True, use literal substring matching instead of regex. @@ -117,6 +122,9 @@ def search_values( matcher=matcher, matches=matches, include_values=include_values, + include_context=include_context, + parent_dict=None, + parent_path_prefix="", depth=0, max_depth=max_depth, ) @@ -132,6 +140,9 @@ def _search_node( matcher: Callable[[str], bool], matches: list[SearchMatch], include_values: bool, + include_context: bool, + parent_dict: dict[str, Any] | None, + parent_path_prefix: str, depth: int, max_depth: int, ) -> None: @@ -141,11 +152,18 @@ def _search_node( if isinstance(node, str): if matcher(node): + ctx: dict[str, Any] | None = None + p_path = "" + if include_context and parent_dict is not None: + ctx = dict(parent_dict) + p_path = parent_path_prefix matches.append( SearchMatch( key=top_key, path=current_path, value=node if include_values else None, + parent_path=p_path, + context=ctx, ) ) elif isinstance(node, dict): @@ -158,6 +176,9 @@ def _search_node( matcher=matcher, matches=matches, include_values=include_values, + include_context=include_context, + parent_dict=node, + parent_path_prefix=current_path, depth=depth + 1, max_depth=max_depth, ) @@ -171,6 +192,9 @@ def _search_node( matcher=matcher, matches=matches, include_values=include_values, + include_context=include_context, + parent_dict=parent_dict, + parent_path_prefix=parent_path_prefix, depth=depth + 1, max_depth=max_depth, ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 82eb549..04ed4be 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -451,3 +451,30 @@ def test_search_pattern_too_long(runner, cli_env): result = runner.invoke(main, ["search", long_pattern]) assert result.exit_code == 1 assert "too long" in result.output + + +def test_search_context_shows_parent_object(runner, cli_env): + """--context should show the parent dict with redacted sibling fields.""" + result = runner.invoke(main, ["search", "admin", "--context"]) + assert result.exit_code == 0 + # Should show the parent path as header + assert "db_creds:" in result.output + # Matched field should show truncated value + assert "admi..." in result.output + # Non-matched fields should be redacted + assert "****" in result.output + + +def test_search_context_with_show_match(runner, cli_env): + """--context --show-match should show all field values in cleartext.""" + result = runner.invoke(main, ["search", "admin", "--context", "--show-match"]) + assert result.exit_code == 0 + assert "s3cret" in result.output + assert "****" not in result.output + + +def test_search_context_top_level_string(runner, cli_env): + """--context on a top-level string match falls back to flat display.""" + result = runner.invoke(main, ["search", "test_value", "--context"]) + assert result.exit_code == 0 + assert "test_key" in result.output diff --git a/tests/test_search.py b/tests/test_search.py index 15575ef..041818a 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -264,3 +264,85 @@ def test_search_match_is_frozen(self) -> None: match = SearchMatch(key="test", path="a.b", value="val") with pytest.raises(AttributeError): match.key = "changed" # type: ignore[misc] + + +class TestSearchContext: + """Tests for include_context parameter in search_values.""" + + def test_context_contains_parent_dict(self) -> None: + """With include_context, matches should include the parent dict.""" + data = { + "creds": { + "type": "usernamePassword", + "username": "admin", + "password": "s3cret", + } + } + matches = search_values(data, "admin", include_context=True) + assert len(matches) == 1 + assert matches[0].context is not None + assert matches[0].context["type"] == "usernamePassword" + assert matches[0].context["username"] == "admin" + assert matches[0].context["password"] == "s3cret" + + def test_context_parent_path(self) -> None: + """parent_path should be the path to the parent dict, not the field.""" + data = {"jenkins": {"domains": [{"credentials": [{"username": "deploy", "token": "tok123"}]}]}} + matches = search_values(data, "tok123", include_context=True) + assert len(matches) == 1 + assert matches[0].parent_path == "domains[0].credentials[0]" + assert matches[0].path == "domains[0].credentials[0].token" + + def test_context_not_included_by_default(self) -> None: + """Without include_context, context should be None.""" + data = {"creds": {"username": "admin", "password": "s3cret"}} + matches = search_values(data, "admin") + assert len(matches) == 1 + assert matches[0].context is None + assert matches[0].parent_path == "" + + def test_context_top_level_string_no_parent(self) -> None: + """Top-level string values have no parent dict, context should be None.""" + data = {"api_key": "secret123"} + matches = search_values(data, "secret", include_context=True) + assert len(matches) == 1 + assert matches[0].context is None + + def test_context_nested_list_of_dicts(self) -> None: + """Context works for matches inside lists of dicts.""" + data = { + "entries": [ + {"id": "first", "value": "aaa"}, + {"id": "second", "value": "bbb"}, + ] + } + matches = search_values(data, "bbb", include_context=True) + assert len(matches) == 1 + assert matches[0].context is not None + assert matches[0].context["id"] == "second" + assert matches[0].parent_path == "[1]" + + def test_context_multiple_matches_same_parent(self) -> None: + """Multiple matches in the same parent dict share the same parent_path.""" + data = { + "creds": { + "username": "admin", + "password": "admin123", + } + } + matches = search_values(data, "admin", include_context=True) + assert len(matches) == 2 + # Both share the same parent path (the creds dict itself) + for m in matches: + assert m.parent_path == "" + assert m.context is not None + assert "username" in m.context + assert "password" in m.context + + def test_context_with_include_values(self) -> None: + """include_context and include_values can be combined.""" + data = {"creds": {"username": "admin", "password": "s3cret"}} + matches = search_values(data, "admin", include_context=True, include_values=True) + assert len(matches) == 1 + assert matches[0].value == "admin" + assert matches[0].context is not None diff --git a/uv.lock b/uv.lock index 0163156..0e1e980 100644 --- a/uv.lock +++ b/uv.lock @@ -1059,7 +1059,7 @@ wheels = [ [[package]] name = "vaultctl" -version = "0.6.0" +version = "0.9.0" source = { editable = "." } dependencies = [ { name = "click" },