Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ctxd login # Prompt for an API key and store it
ctxd status # Check whether an API key is configured
ctxd install-app # Open the app installation page
ctxd search "text:deployment"
ctxd search text:test application:slack
ctxd search application:slack text:test
ctxd fetch doc-123
ctxd profile
ctxd logout # Remove the stored API key
Expand All @@ -54,7 +54,7 @@ from ctxd import Client

client = Client(api_key="<api-key>")

results = client.search("text:deployment application:slack")
results = client.search("application:slack text:deployment")
profile = client.get_profile()
document = client.fetch_document("doc-123")
```
Expand All @@ -65,7 +65,7 @@ API key example:
from ctxd import Client

client = Client(api_key="<api-key>")
results = client.search("text:deployment application:slack")
results = client.search("application:slack text:deployment")
```

Async example:
Expand Down
155 changes: 130 additions & 25 deletions src/ctxd/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import os
import re
import shlex
import sys
import webbrowser
from typing import Sequence
Expand All @@ -31,7 +32,9 @@ def main(argv: Sequence[str] | None = None) -> int:
client = Client()

if args.command == "search":
result = client.search(_normalize_search_query(args.query))
query = _normalize_search_query(args.query)
_validate_search_query(query)
result = client.search(query)
return _emit_result(result.model_dump(), as_json=True)
if args.command == "fetch":
result = client.fetch_document(args.document_uid)
Expand All @@ -58,7 +61,7 @@ def _build_parser() -> argparse.ArgumentParser:
"Examples:\n"
" ctxd login\n"
" ctxd install-app\n"
" ctxd search text:deployment application:slack\n"
" ctxd search application:slack text:deployment\n"
" ctxd fetch doc-123 --json"
),
formatter_class=argparse.RawDescriptionHelpFormatter,
Expand Down Expand Up @@ -112,20 +115,27 @@ def _build_parser() -> argparse.ArgumentParser:
help="Search indexed app content and print JSON results.",
description=(
"Search indexed app content using ctxd DSL. Search output is always JSON. "
"The query can be quoted or passed as separate tokens."
"The query can be passed as separate tokens."
),
epilog=(
"Examples:\n"
" ctxd search text:deployment application:slack\n"
' ctxd search "text:deployment application:slack"'
" ctxd search text:deployment\n"
" ctxd search application:slack text:deployment\n"
"\n"
"Query format:\n"
" application:<app> text:<term>\n"
" application:<app> is optional; omit it to search all connected apps.\n"
" If application:<app> is present, it must be the first token.\n"
" Only one application filter is supported.\n"
" Use one text term; multi-word, quoted, and parenthesized text values are not supported."
Comment on lines +122 to +130

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fostiropoulos lmk what you think

),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
search_parser.add_argument(
"query",
nargs="+",
metavar="QUERY",
help="Search query or DSL tokens, for example: text:deployment application:slack.",
help="Search query tokens, for example: application:slack text:deployment.",
)

fetch_parser = subparsers.add_parser(
Expand Down Expand Up @@ -155,29 +165,116 @@ def _build_parser() -> argparse.ArgumentParser:


def _normalize_search_query(query_tokens: Sequence[str]) -> str:
normalized_tokens = [
_quote_shell_stripped_text_token(token) for token in query_tokens
]
return " ".join(normalized_tokens)
return " ".join(query_tokens)


def _quote_shell_stripped_text_token(token: str) -> str:
if not token.lower().startswith("text:"):
return token
def _validate_search_query(query: str) -> None:
_validate_raw_text_filters(query)
tokens = _split_search_query(query)
_validate_application_filters(tokens)
_validate_boolean_operators(tokens)
_validate_text_filters(tokens)
_validate_application_scoped_search_shape(tokens)

value = token[5:]
if not value or not re.search(r"\s", value):
return token

stripped_value = value.strip()
if (
stripped_value.startswith(("\"", "'", "("))
or stripped_value.endswith(("\"", "'", ")"))
):
return token
def _split_search_query(query: str) -> list[str]:
try:
return shlex.split(query)
except ValueError:
return query.split()


def _validate_raw_text_filters(query: str) -> None:
if re.search(r"(?i)(?:^|\s)text:[\"'()]", query):
raise ValueError(
"Invalid search query: quoted and parenthesized text values are not supported. "
"Use a single text term, for example text:incident."
)


def _validate_application_filters(tokens: Sequence[str]) -> None:
application_positions: list[int] = []
for index, token in enumerate(tokens):
if not token.lower().startswith("application:"):
continue

value = token[len("application:") :]
if not value:
raise ValueError(
"Invalid search query: application: requires an app name, for example application:slack."
)
if value.startswith("("):
raise ValueError(
"Invalid search query: grouped application filters are not supported. "
"Use one leading application filter, for example application:google_drive text:onboarding."
)
application_positions.append(index)

if not application_positions:
return
if application_positions[0] != 0:
raise ValueError(
"Invalid search query: application: must be the first token, "
"for example application:slack text:deployment."
)
if len(application_positions) > 1:
raise ValueError(
"Invalid search query: only one application filter is supported. "
"Omit application: to search all connected apps."
)


escaped_value = stripped_value.replace("\\", "\\\\").replace('"', '\\"')
return f'text:"{escaped_value}"'
def _validate_text_filters(tokens: Sequence[str]) -> None:
text_positions: list[int] = []
for index, token in enumerate(tokens):
if not token.lower().startswith("text:"):
continue

text_positions.append(index)
value = token[len("text:") :]
if value.startswith(("\"", "'", "(")) or value.endswith(("\"", "'", ")")):
raise ValueError(
"Invalid search query: quoted and parenthesized text values are not supported. "
"Use a single text term, for example text:incident."
)
if re.search(r"\s", value):
raise ValueError(
"Invalid search query: multi-word text values are not supported by the current DSL. "
"Use a single text term, for example text:incident."
)
if index + 1 < len(tokens) and not tokens[index + 1].lower().startswith(
("application:", "text:")
):
raise ValueError(
"Invalid search query: multi-word text values are not supported by the current DSL. "
"Use a single text term, for example text:incident."
)

if len(text_positions) > 1:
raise ValueError(
"Invalid search query: only one text filter is supported. "
"Use a single text term, for example text:incident."
)


def _validate_boolean_operators(tokens: Sequence[str]) -> None:
operators = {"AND", "OR"}
for token in tokens:
if token in operators:
raise ValueError(
"Invalid search query: AND/OR clauses are not supported. "
"Use application:<app> text:<term>, or omit application:<app> to search all apps."
)


def _validate_application_scoped_search_shape(tokens: Sequence[str]) -> None:
if not tokens or not tokens[0].lower().startswith("application:"):
return
if len(tokens) != 2 or not tokens[1].lower().startswith("text:"):
raise ValueError(
"Invalid search query: application:<app> must be followed by one text:<term> filter, "
"for example application:slack text:deployment."
)


def _handle_login(args: argparse.Namespace) -> int:
Expand Down Expand Up @@ -311,6 +408,14 @@ def _emit_result(payload: dict, *, as_json: bool) -> int:


def _payload_has_error(payload: dict) -> bool:
if "results" in payload:
return bool(payload.get("error") or payload.get("dsl_parse_error"))
if payload.get("error"):
return True
return bool(payload.get("dsl_parse_error"))
if payload.get("dsl_parse_error"):
return True
return "error" in payload and _document_payload_is_unresolved(payload)


def _document_payload_is_unresolved(payload: dict) -> bool:
return not any(payload.get(key) for key in ("id", "title", "text", "url"))
Loading
Loading