Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions src/vaultctl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import builtins
import json
import re
import sys
Expand Down Expand Up @@ -205,6 +206,7 @@ def list_cmd(vctx: VaultContext, filter_pattern: str | None) -> None:
@click.option(
"--prompt", "use_prompt", is_flag=True, default=False, help="Read pattern from stdin (avoids shell history)."
)
@click.option("--context", "-c", is_flag=True, default=False, help="Show parent object context for each match.")
@pass_ctx
def search(
vctx: VaultContext,
Expand All @@ -213,6 +215,7 @@ def search(
show_match: bool,
fixed_string: bool,
use_prompt: bool,
context: bool,
) -> None:
"""Search vault for keys whose values match PATTERN (regex or fixed string).

Expand Down Expand Up @@ -260,7 +263,9 @@ def search(
sys.exit(1)

try:
matches = search_values(data, pattern, include_values=show_match, fixed_string=fixed_string)
matches = search_values(
data, pattern, include_values=show_match, include_context=context, fixed_string=fixed_string
)
except (re.error, ValueError):
click.echo("Error: Invalid regex pattern. Check syntax.", err=True)
sys.exit(1)
Expand All @@ -274,7 +279,10 @@ def search(
err=True,
)

_print_search_results(matches, show_match=show_match)
if context:
_print_context_results(matches, show_match=show_match)
else:
_print_search_results(matches, show_match=show_match)


def _print_search_results(matches: list[SearchMatch], *, show_match: bool) -> None:
Expand All @@ -288,6 +296,57 @@ def _print_search_results(matches: list[SearchMatch], *, show_match: bool) -> No
click.echo(line)


def _print_context_results(matches: list[SearchMatch], *, show_match: bool) -> None:
"""Format and print search results grouped by parent context."""
# Collect matched field names per (key, parent_path) for highlighting
groups: dict[tuple[str, str], list[SearchMatch]] = {}
for match in matches:
group_key = (match.key, match.parent_path)
groups.setdefault(group_key, []).append(match)

first = True
for (top_key, parent_path), group_matches in groups.items():
if not first:
click.echo()
first = False

# Build the full header path
header = f"{top_key}.{parent_path}" if parent_path else top_key

ctx = group_matches[0].context
if ctx is None:
# No parent context (top-level string match) -- fall back to flat display
for match in group_matches:
line = f" {match.key}.{match.path}" if match.path else f" {match.key}"
if show_match and match.value is not None:
display = match.value if len(match.value) <= 80 else match.value[:77] + "..."
line += f" = {display}"
click.echo(line)
continue

click.echo(f"{header}:")

# Determine which field names were matched
matched_fields: builtins.set[str] = builtins.set()
for m in group_matches:
# The field name is the last segment of the path after parent_path
suffix = m.path[len(parent_path) :].lstrip(".") if m.path and parent_path else m.path
matched_fields.add(suffix)

for field_name, field_value in ctx.items():
if field_name in matched_fields:
# Show matched field value (truncated)
if show_match:
display_val = str(field_value)
else:
sval = str(field_value)
display_val = sval[:4] + "..." if len(sval) > 4 else sval
else:
# Redact non-matched fields unless --show-match
display_val = str(field_value) if show_match else "****"
click.echo(f" {field_name}: {display_val}")


@main.command()
@click.argument("key")
@click.option("--field", default=None, help="Access a specific field of a structured entry.")
Expand Down
26 changes: 25 additions & 1 deletion src/vaultctl/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import re
from collections.abc import Callable
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any

# Maximum recursion depth to prevent runaway traversal.
Expand All @@ -36,6 +36,8 @@ class SearchMatch:
key: str
path: str = ""
value: str | None = None
parent_path: str = ""
context: dict[str, Any] | None = field(default=None, hash=False, compare=False)


def _compile_pattern(pattern: str, *, fixed_string: bool = False, flags: int = 0) -> Callable[[str], bool]:
Expand Down Expand Up @@ -82,6 +84,7 @@ def search_values(
pattern: str,
*,
include_values: bool = False,
include_context: bool = False,
max_depth: int = MAX_DEPTH,
fixed_string: bool = False,
) -> list[SearchMatch]:
Expand All @@ -96,6 +99,8 @@ def search_values(
include_values: If True, populate ``SearchMatch.value`` with the
matched string. **Security-sensitive** -- caller must gate this
behind explicit user consent.
include_context: If True, populate ``SearchMatch.context`` with the
parent dict of the matched value and set ``parent_path``.
max_depth: Maximum nesting depth for recursive traversal.
fixed_string: If True, use literal substring matching instead of regex.

Expand All @@ -117,6 +122,9 @@ def search_values(
matcher=matcher,
matches=matches,
include_values=include_values,
include_context=include_context,
parent_dict=None,
parent_path_prefix="",
depth=0,
max_depth=max_depth,
)
Expand All @@ -132,6 +140,9 @@ def _search_node(
matcher: Callable[[str], bool],
matches: list[SearchMatch],
include_values: bool,
include_context: bool,
parent_dict: dict[str, Any] | None,
parent_path_prefix: str,
depth: int,
max_depth: int,
) -> None:
Expand All @@ -141,11 +152,18 @@ def _search_node(

if isinstance(node, str):
if matcher(node):
ctx: dict[str, Any] | None = None
p_path = ""
if include_context and parent_dict is not None:
ctx = dict(parent_dict)
p_path = parent_path_prefix
matches.append(
SearchMatch(
key=top_key,
path=current_path,
value=node if include_values else None,
parent_path=p_path,
context=ctx,
)
)
elif isinstance(node, dict):
Expand All @@ -158,6 +176,9 @@ def _search_node(
matcher=matcher,
matches=matches,
include_values=include_values,
include_context=include_context,
parent_dict=node,
parent_path_prefix=current_path,
depth=depth + 1,
max_depth=max_depth,
)
Expand All @@ -171,6 +192,9 @@ def _search_node(
matcher=matcher,
matches=matches,
include_values=include_values,
include_context=include_context,
parent_dict=parent_dict,
parent_path_prefix=parent_path_prefix,
depth=depth + 1,
max_depth=max_depth,
)
Expand Down
27 changes: 27 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,30 @@ def test_search_pattern_too_long(runner, cli_env):
result = runner.invoke(main, ["search", long_pattern])
assert result.exit_code == 1
assert "too long" in result.output


def test_search_context_shows_parent_object(runner, cli_env):
"""--context should show the parent dict with redacted sibling fields."""
result = runner.invoke(main, ["search", "admin", "--context"])
assert result.exit_code == 0
# Should show the parent path as header
assert "db_creds:" in result.output
# Matched field should show truncated value
assert "admi..." in result.output
# Non-matched fields should be redacted
assert "****" in result.output


def test_search_context_with_show_match(runner, cli_env):
"""--context --show-match should show all field values in cleartext."""
result = runner.invoke(main, ["search", "admin", "--context", "--show-match"])
assert result.exit_code == 0
assert "s3cret" in result.output
assert "****" not in result.output


def test_search_context_top_level_string(runner, cli_env):
"""--context on a top-level string match falls back to flat display."""
result = runner.invoke(main, ["search", "test_value", "--context"])
assert result.exit_code == 0
assert "test_key" in result.output
82 changes: 82 additions & 0 deletions tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,85 @@ def test_search_match_is_frozen(self) -> None:
match = SearchMatch(key="test", path="a.b", value="val")
with pytest.raises(AttributeError):
match.key = "changed" # type: ignore[misc]


class TestSearchContext:
"""Tests for include_context parameter in search_values."""

def test_context_contains_parent_dict(self) -> None:
"""With include_context, matches should include the parent dict."""
data = {
"creds": {
"type": "usernamePassword",
"username": "admin",
"password": "s3cret",
}
}
matches = search_values(data, "admin", include_context=True)
assert len(matches) == 1
assert matches[0].context is not None
assert matches[0].context["type"] == "usernamePassword"
assert matches[0].context["username"] == "admin"
assert matches[0].context["password"] == "s3cret"

def test_context_parent_path(self) -> None:
"""parent_path should be the path to the parent dict, not the field."""
data = {"jenkins": {"domains": [{"credentials": [{"username": "deploy", "token": "tok123"}]}]}}
matches = search_values(data, "tok123", include_context=True)
assert len(matches) == 1
assert matches[0].parent_path == "domains[0].credentials[0]"
assert matches[0].path == "domains[0].credentials[0].token"

def test_context_not_included_by_default(self) -> None:
"""Without include_context, context should be None."""
data = {"creds": {"username": "admin", "password": "s3cret"}}
matches = search_values(data, "admin")
assert len(matches) == 1
assert matches[0].context is None
assert matches[0].parent_path == ""

def test_context_top_level_string_no_parent(self) -> None:
"""Top-level string values have no parent dict, context should be None."""
data = {"api_key": "secret123"}
matches = search_values(data, "secret", include_context=True)
assert len(matches) == 1
assert matches[0].context is None

def test_context_nested_list_of_dicts(self) -> None:
"""Context works for matches inside lists of dicts."""
data = {
"entries": [
{"id": "first", "value": "aaa"},
{"id": "second", "value": "bbb"},
]
}
matches = search_values(data, "bbb", include_context=True)
assert len(matches) == 1
assert matches[0].context is not None
assert matches[0].context["id"] == "second"
assert matches[0].parent_path == "[1]"

def test_context_multiple_matches_same_parent(self) -> None:
"""Multiple matches in the same parent dict share the same parent_path."""
data = {
"creds": {
"username": "admin",
"password": "admin123",
}
}
matches = search_values(data, "admin", include_context=True)
assert len(matches) == 2
# Both share the same parent path (the creds dict itself)
for m in matches:
assert m.parent_path == ""
assert m.context is not None
assert "username" in m.context
assert "password" in m.context

def test_context_with_include_values(self) -> None:
"""include_context and include_values can be combined."""
data = {"creds": {"username": "admin", "password": "s3cret"}}
matches = search_values(data, "admin", include_context=True, include_values=True)
assert len(matches) == 1
assert matches[0].value == "admin"
assert matches[0].context is not None
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading