Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
75e6671
v1.3.1:完善tavily协作
GuDaStudio Feb 4, 2026
3297e2c
v1.3.2:允许grok返回英文
GuDaStudio Feb 4, 2026
cc7f175
v1.3.3:完善README
GuDaStudio Feb 11, 2026
c211ab3
v1.3.4:允许传入model参数
GuDaStudio Feb 13, 2026
8976747
v1.3.5:可通过环境变量设置grok模型
GuDaStudio Feb 13, 2026
b502273
v1.3.6:web_fetch新增Firecrawl托底机制。Tavily提取失败时自动降级到Firecrawl Scrape,支持空m…
GuDaStudio Feb 15, 2026
f9d318f
v1.4.0:web_search多信源并行搜索升级。检测到Tavily/Firecrawl API Key时自动作为额外参考信源并行搜索…
GuDaStudio Feb 15, 2026
db82f06
v1.4.1:重构 web_search 工具描述与参数。更新描述以明确搜索功能,移除冗余细节。简化查询参数注释,提升清晰度。
GuDaStudio Feb 15, 2026
a5e6022
v1.4.2:将HTTP客户端超时时间延长至90秒以适配Tavily和Firecrawl搜索;更新额外资源章节标题以提升表述清晰度。
GuDaStudio Feb 15, 2026
1dfe1dd
fix:修正Firecrawl search响应结果提取路径为data.web,与API文档结构一致。
GuDaStudio Feb 15, 2026
66a55d3
v1.5.0:新增search_planning结构化搜索规划工具。基于Thoughtbox设计模式,引导LLM在搜索前完成意图分析→复杂…
GuDaStudio Feb 15, 2026
fb4233f
v1.5.1:基于真实调用反馈优化search_planning引导精度。SubQuery.boundary强制声明兄弟子查询互斥关系;S…
GuDaStudio Feb 15, 2026
29bdbd1
v1.5.2:IntentOutput新增unverified_terms字段,引导LLM显式标记外部分类/排名术语(如CCF-A、For…
GuDaStudio Feb 15, 2026
f96a02f
v1.6.0:增强web_search功能,新增URL提取与描述特性。引入extract_unique_urls工具从搜索结果中采集独立U…
GuDaStudio Feb 16, 2026
30c3e2f
v1.7.0:依赖于grok-4.20-beta的高质量回答,区分回答正文和信源,允许LLM单独调用工具提取信源
GuDaStudio Feb 17, 2026
01d30be
v1.7.1:调整grok搜索提示词
GuDaStudio Feb 17, 2026
b06389f
v1.7.2:修复search_prompt触发Grok安全过滤的问题。重写搜索提示词移除越狱特征语句;修正search方法中search…
GuDaStudio Feb 28, 2026
ea31102
feat: 支持通过 GROK_SSL_VERIFY 环境变量禁用 SSL 证书验证
c00134744 Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
499 changes: 134 additions & 365 deletions README.md

Large diffs are not rendered by default.

525 changes: 133 additions & 392 deletions docs/README_EN.md

Large diffs are not rendered by default.

Binary file added images/wgrok.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/wogrok.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
60 changes: 53 additions & 7 deletions src/grok_search/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def __new__(cls):
def config_file(self) -> Path:
if self._config_file is None:
config_dir = Path.home() / ".config" / "grok-search"
config_dir.mkdir(parents=True, exist_ok=True)
try:
config_dir.mkdir(parents=True, exist_ok=True)
except OSError:
config_dir = Path.cwd() / ".grok-search"
config_dir.mkdir(parents=True, exist_ok=True)
self._config_file = config_dir / "config.json"
return self._config_file

Expand Down Expand Up @@ -81,30 +85,68 @@ def grok_api_key(self) -> str:

@property
def tavily_enabled(self) -> bool:
return os.getenv("TAVILY_ENABLED", "false").lower() in ("true", "1", "yes")
return os.getenv("TAVILY_ENABLED", "true").lower() in ("true", "1", "yes")

@property
def tavily_api_url(self) -> str:
return os.getenv("TAVILY_API_URL", "https://api.tavily.com")

@property
def tavily_api_key(self) -> str | None:
return os.getenv("TAVILY_API_KEY")

@property
def firecrawl_api_url(self) -> str:
return os.getenv("FIRECRAWL_API_URL", "https://api.firecrawl.dev/v2")

@property
def firecrawl_api_key(self) -> str | None:
return os.getenv("FIRECRAWL_API_KEY")

@property
def log_level(self) -> str:
return os.getenv("GROK_LOG_LEVEL", "INFO").upper()

@property
def ssl_verify_enabled(self) -> bool:
"""是否启用 SSL 证书验证,默认启用。设置为 false 可跳过验证(适用于内网自签名证书)"""
return os.getenv("GROK_SSL_VERIFY", "true").lower() not in ("false", "0", "no")

@property
def log_dir(self) -> Path:
log_dir_str = os.getenv("GROK_LOG_DIR", "logs")
if Path(log_dir_str).is_absolute():
return Path(log_dir_str)
user_log_dir = Path.home() / ".config" / "grok-search" / log_dir_str
user_log_dir.mkdir(parents=True, exist_ok=True)
return user_log_dir
log_dir = Path(log_dir_str)
if log_dir.is_absolute():
return log_dir

home_log_dir = Path.home() / ".config" / "grok-search" / log_dir_str
try:
home_log_dir.mkdir(parents=True, exist_ok=True)
return home_log_dir
except OSError:
pass

cwd_log_dir = Path.cwd() / log_dir_str
try:
cwd_log_dir.mkdir(parents=True, exist_ok=True)
return cwd_log_dir
except OSError:
pass

tmp_log_dir = Path("/tmp") / "grok-search" / log_dir_str
tmp_log_dir.mkdir(parents=True, exist_ok=True)
return tmp_log_dir

@property
def grok_model(self) -> str:
if self._cached_model is not None:
return self._cached_model

env_model = os.getenv("GROK_MODEL")
if env_model:
self._cached_model = env_model
return env_model

config_data = self._load_config_file()
file_model = config_data.get("model")
if file_model:
Expand Down Expand Up @@ -145,9 +187,13 @@ def get_config_info(self) -> dict:
"GROK_MODEL": self.grok_model,
"GROK_DEBUG": self.debug_enabled,
"GROK_LOG_LEVEL": self.log_level,
"GROK_SSL_VERIFY": self.ssl_verify_enabled,
"GROK_LOG_DIR": str(self.log_dir),
"TAVILY_API_URL": self.tavily_api_url,
"TAVILY_ENABLED": self.tavily_enabled,
"TAVILY_API_KEY": self._mask_api_key(self.tavily_api_key) if self.tavily_api_key else "未配置",
"FIRECRAWL_API_URL": self.firecrawl_api_url,
"FIRECRAWL_API_KEY": self._mask_api_key(self.firecrawl_api_key) if self.firecrawl_api_key else "未配置",
"config_status": config_status
}

Expand Down
24 changes: 13 additions & 11 deletions src/grok_search/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@
from pathlib import Path
from .config import config

LOG_DIR = config.log_dir
LOG_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOG_DIR / f"grok_search_{datetime.now().strftime('%Y%m%d')}.log"

logger = logging.getLogger("grok_search")
logger.setLevel(getattr(logging, config.log_level))

file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
file_handler.setLevel(getattr(logging, config.log_level))
logger.setLevel(getattr(logging, config.log_level, logging.INFO))

formatter = logging.Formatter(
_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)
try:
log_dir = config.log_dir
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"grok_search_{datetime.now().strftime('%Y%m%d')}.log"

file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(getattr(logging, config.log_level, logging.INFO))
file_handler.setFormatter(_formatter)
logger.addHandler(file_handler)
except OSError:
logger.addHandler(logging.NullHandler())

async def log_info(ctx, message: str, is_debug: bool = False):
if is_debug:
Expand Down
167 changes: 167 additions & 0 deletions src/grok_search/planning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from pydantic import BaseModel, Field
from typing import Optional, Literal
import uuid


# Phase 1 (intent_analysis) output schema: what the user is actually asking.
# NOTE: no class docstring on purpose — pydantic would surface it as the
# model's JSON-schema description.
class IntentOutput(BaseModel):
    # One-sentence distillation of the user's question.
    core_question: str = Field(description="Distilled core question in one sentence")
    # Coarse category that steers the later decomposition strategy.
    query_type: Literal["factual", "comparative", "exploratory", "analytical"] = Field(
        description="factual=single answer, comparative=A vs B, exploratory=broad understanding, analytical=deep reasoning"
    )
    # How fresh the sources need to be.
    time_sensitivity: Literal["realtime", "recent", "historical", "irrelevant"] = Field(
        description="realtime=today, recent=days/weeks, historical=months+, irrelevant=timeless"
    )
    domain: Optional[str] = Field(default=None, description="Specific domain if identifiable")
    # Allows the model to flag a question built on a wrong assumption.
    premise_valid: Optional[bool] = Field(default=None, description="False if the question contains a flawed assumption")
    ambiguities: Optional[list[str]] = Field(default=None, description="Unresolved ambiguities that may affect search direction")
    # Terms whose definitions live outside the model's training data; each
    # should spawn a prerequisite sub-query in Phase 3 (query_decomposition).
    unverified_terms: Optional[list[str]] = Field(
        default=None,
        description="External classifications, rankings, or taxonomies that may be incomplete or outdated "
        "in training data (e.g., 'CCF-A', 'Fortune 500', 'OWASP Top 10'). "
        "Each should become a prerequisite sub-query in Phase 3."
    )


# Phase 2 (complexity_assessment) output schema: how much search effort the
# query warrants. `level` gates which later phases are required — see
# REQUIRED_PHASES below.
class ComplexityOutput(BaseModel):
    level: Literal[1, 2, 3] = Field(
        description="1=simple (1-2 searches), 2=moderate (3-5 searches), 3=complex (6+ searches)"
    )
    # Bounded estimates keep the LLM from proposing runaway plans.
    estimated_sub_queries: int = Field(ge=1, le=20)
    estimated_tool_calls: int = Field(ge=1, le=50)
    justification: str


# Phase 3 (query_decomposition) item: one independently answerable piece of
# the decomposed question. `id` is referenced by SearchTerm.purpose,
# ToolPlanItem.sub_query_id, and sibling `depends_on` lists.
class SubQuery(BaseModel):
    id: str = Field(description="Unique identifier (e.g., 'sq1')")
    goal: str
    expected_output: str = Field(description="What a successful result looks like")
    tool_hint: Optional[str] = Field(default=None, description="Suggested tool: web_search | web_fetch | web_map")
    # Forces explicit non-overlap between sibling sub-queries.
    boundary: str = Field(description="What this sub-query explicitly excludes — MUST state mutual exclusion with sibling sub-queries, not just the broader domain")
    depends_on: Optional[list[str]] = Field(default=None, description="IDs of prerequisite sub-queries")


# One concrete query string within the Phase 4 (search_strategy) output,
# bound to exactly one SubQuery and one execution round.
class SearchTerm(BaseModel):
    term: str = Field(description="Search query string. MUST be ≤8 words. Drop redundant synonyms (e.g., use 'RAG' not 'RAG retrieval augmented generation').")
    purpose: str = Field(description="Single sub-query ID this term serves (e.g., 'sq2'). ONE term per sub-query — do NOT combine like 'sq1+sq2'.")
    round: int = Field(ge=1, description="Execution round: 1=broad discovery, 2+=targeted follow-up refined by round 1 findings")


# Phase 4 (search_strategy) output schema: overall approach plus the
# concrete search terms to execute.
class StrategyOutput(BaseModel):
    approach: Literal["broad_first", "narrow_first", "targeted"] = Field(
        description="broad_first=wide then narrow, narrow_first=precise then expand, targeted=known-item"
    )
    search_terms: list[SearchTerm]
    fallback_plan: Optional[str] = Field(default=None, description="Fallback if primary searches fail")


# Phase 5 (tool_selection) item: which tool serves which sub-query, and why.
class ToolPlanItem(BaseModel):
    sub_query_id: str
    tool: Literal["web_search", "web_fetch", "web_map"]
    reason: str
    params: Optional[dict] = Field(default=None, description="Tool-specific parameters")


# Phase 6 (execution_order) output schema: parallel groups plus the
# sequential tail of sub-query IDs.
class ExecutionOrderOutput(BaseModel):
    parallel: list[list[str]] = Field(description="Groups of sub-query IDs runnable in parallel")
    sequential: list[str] = Field(description="Sub-query IDs that must run in order")
    estimated_rounds: int = Field(ge=1)


# Canonical ordering of the six planning phases; also the set of valid
# `phase` values accepted by PlanningEngine.process_phase.
PHASE_NAMES = [
    "intent_analysis",
    "complexity_assessment",
    "query_decomposition",
    "search_strategy",
    "tool_selection",
    "execution_order",
]

# Minimum phases that must be recorded before a plan of the given complexity
# level (1=simple … 3=complex) counts as complete; level 3 requires all six.
REQUIRED_PHASES: dict[int, set[str]] = {
    1: {"intent_analysis", "complexity_assessment", "query_decomposition"},
    2: {"intent_analysis", "complexity_assessment", "query_decomposition", "search_strategy", "tool_selection"},
    3: set(PHASE_NAMES),
}


# Record of one completed planning phase, as stored in PlanningSession.phases.
class PhaseRecord(BaseModel):
    phase: str  # phase name; one of PHASE_NAMES
    thought: str  # caller's free-form reasoning for this phase
    data: dict | list | None = None  # structured phase output, if any
    confidence: float = 1.0  # caller's self-reported confidence


class PlanningSession:
    """Per-session planning state: which phases have been recorded so far."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        # Recorded phases keyed by phase name.
        self.phases: dict[str, PhaseRecord] = {}
        # Set once a complexity_assessment phase with a valid level arrives.
        self.complexity_level: int | None = None

    @property
    def completed_phases(self) -> list[str]:
        # Report completed phases in canonical order, not insertion order.
        done = []
        for name in PHASE_NAMES:
            if name in self.phases:
                done.append(name)
        return done

    def required_phases(self) -> set[str]:
        # An unset or falsy level is treated as the strictest tier (3).
        level = self.complexity_level or 3
        return REQUIRED_PHASES.get(level, REQUIRED_PHASES[3])

    def is_complete(self) -> bool:
        # Never complete before complexity has been assessed; afterwards,
        # complete once every required phase has been recorded.
        if self.complexity_level is None:
            return False
        return all(name in self.phases for name in self.required_phases())

    def build_executable_plan(self) -> dict:
        # Flatten each record down to its structured data payload.
        plan: dict = {}
        for name, record in self.phases.items():
            plan[name] = record.data
        return plan


class PlanningEngine:
    """Stateful coordinator for multi-phase search-planning sessions.

    NOTE(review): `_sessions` grows without bound for the life of the
    process — acceptable for a short-lived server, but worth pruning or
    bounding in a long-running one.
    """

    def __init__(self):
        # session_id -> PlanningSession; entries are never removed.
        self._sessions: dict[str, PlanningSession] = {}

    def process_phase(
        self,
        phase: str,
        thought: str,
        session_id: str = "",
        is_revision: bool = False,
        revises_phase: str = "",
        confidence: float = 1.0,
        phase_data: dict | list | None = None,
    ) -> dict:
        """Record (or revise) one planning phase and report session progress.

        Args:
            phase: Phase to record; must be one of PHASE_NAMES.
            thought: Caller's free-form reasoning for this phase.
            session_id: Existing session to continue; empty starts a new one.
            is_revision: With a non-empty revises_phase, overwrite that
                previously recorded phase instead of recording `phase`.
            revises_phase: Target phase name of a revision.
            confidence: Caller's self-reported confidence.
            phase_data: Structured output of the phase, if any.

        Returns:
            Progress dict (session_id, completed_phases, complexity_level,
            plan_complete, optionally phases_remaining) plus the
            executable_plan once all required phases are present. For an
            unknown phase name, a dict with only an "error" key.
        """
        # A revision targets revises_phase; otherwise we record `phase`.
        target = revises_phase if is_revision and revises_phase else phase
        # Validate BEFORE touching the registry: the original created and
        # registered a session first, so every invalid call leaked an empty
        # session into _sessions.
        if target not in PHASE_NAMES:
            return {"error": f"Unknown phase: {target}. Valid: {', '.join(PHASE_NAMES)}"}

        session = self._get_or_create_session(session_id)

        session.phases[target] = PhaseRecord(
            phase=target, thought=thought, data=phase_data, confidence=confidence
        )

        # Complexity assessment gates which later phases are required.
        if target == "complexity_assessment" and isinstance(phase_data, dict):
            level = phase_data.get("level")
            if level in (1, 2, 3):
                session.complexity_level = level

        complete = session.is_complete()
        result: dict = {
            "session_id": session.session_id,
            "completed_phases": session.completed_phases,
            "complexity_level": session.complexity_level,
            "plan_complete": complete,
        }

        remaining = [
            p for p in PHASE_NAMES
            if p in session.required_phases() and p not in session.phases
        ]
        if remaining:
            result["phases_remaining"] = remaining

        if complete:
            result["executable_plan"] = session.build_executable_plan()

        return result

    def _get_or_create_session(self, session_id: str) -> PlanningSession:
        """Return the existing session, or register a new one (random 12-hex id if blank)."""
        if session_id and session_id in self._sessions:
            return self._sessions[session_id]
        sid = session_id if session_id else uuid.uuid4().hex[:12]
        session = PlanningSession(sid)
        self._sessions[sid] = session
        return session


# Module-level singleton engine shared by importers of this module.
engine = PlanningEngine()
Loading