Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mcp-server/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Backend API Configuration
API_KEY=your-api-key-here
BACKEND_API_URL=http://localhost:8000
API_KEY=dev-secret-key
11 changes: 11 additions & 0 deletions mcp-server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Python
__pycache__/
*.pyc
*.pyo

# Virtual environment
venv/

# Environment (secrets)
.env
.env.local
68 changes: 68 additions & 0 deletions mcp-server/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Persistent HTTP client for backend API communication.

Uses a module-level client to avoid creating new TCP connections per tool call.
The client is initialized lazily on first use and reused for all subsequent calls.
Concurrent access is serialized via asyncio.Lock to prevent duplicate clients.
"""
import asyncio
from typing import Any, Optional

import httpx

from config import BACKEND_API_URL, API_KEY


# Persistent client reused across all tool calls
_client: Optional[httpx.AsyncClient] = None
_client_lock: asyncio.Lock = asyncio.Lock()


def _get_headers() -> dict[str, str]:
"""Return Authorization header with the configured API_KEY.

Raises ValueError if API_KEY is empty or unset.
"""
if not API_KEY:
raise ValueError(
"No API_KEY configured. Set API_KEY in .env or environment."
)
return {"Authorization": f"Bearer {API_KEY}"}
Comment thread
coderabbitai[bot] marked this conversation as resolved.


async def get_client() -> httpx.AsyncClient:
"""Get or create the persistent HTTP client."""
global _client
async with _client_lock:
if _client is None or _client.is_closed:
_client = httpx.AsyncClient(
base_url=BACKEND_API_URL,
timeout=120.0,
headers=_get_headers(),
)
return _client


async def api_get(path: str, **kwargs: Any) -> dict:
"""Make a GET request to the backend API."""
client = await get_client()
response = await client.get(path, **kwargs)
response.raise_for_status()
return response.json()


async def api_post(path: str, json: dict, **kwargs: Any) -> dict:
"""Make a POST request to the backend API."""
client = await get_client()
response = await client.post(path, json=json, **kwargs)
response.raise_for_status()
return response.json()


async def close_client() -> None:
"""Close the persistent client. Call on server shutdown."""
global _client
async with _client_lock:
local = _client
_client = None
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if local and not local.is_closed:
await local.aclose()
23 changes: 10 additions & 13 deletions mcp-server/config.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
"""
API Configuration - Single Source of Truth for API Versioning
"""MCP server configuration from environment variables."""
import os

Change API_VERSION here to update all API calls across the MCP server.
Example: "v1" -> "v2" will change /api/v1/* to /api/v2/*
"""
from dotenv import load_dotenv

# =============================================================================
# API VERSION CONFIGURATION
# =============================================================================
load_dotenv()

API_VERSION = "v1"
API_PREFIX = f"/api/{API_VERSION}"

# =============================================================================
# DERIVED PREFIXES (auto-calculated from version)
# =============================================================================
BACKEND_BASE_URL = os.getenv("BACKEND_API_URL", "http://localhost:8000")
BACKEND_API_URL = f"{BACKEND_BASE_URL}{API_PREFIX}"
API_KEY = os.getenv("API_KEY", "")

# Current versioned API prefix: /api/v1
API_PREFIX = f"/api/{API_VERSION}"
SERVER_NAME = "codeintel-mcp"
SERVER_VERSION = "0.4.0"
196 changes: 196 additions & 0 deletions mcp-server/formatters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""Response formatters that convert API responses to markdown.

Each formatter is a pure function: takes API response dict, returns markdown string.
This makes them independently testable without any HTTP calls.
"""


def format_search_results(result: dict) -> str:
"""Format semantic search results as markdown.

Supports both v1 (count/results) and v2 (total/results) response shapes
so the formatter stays resilient across API versions.
"""
total = result.get("total") or result.get("count", 0)
cached = " (cached)" if result.get("cached") else ""
version = result.get("search_version", "v1")
output = f"# Code Search Results ({version})\n\nFound {total} results{cached}\n\n"

if not result.get("results"):
return output + "No results found.\n"

for idx, res in enumerate(result["results"], 1):
score_raw = res.get("score")
try:
score = float(score_raw) * 100
except (TypeError, ValueError):
score = 0
name = res.get("name", "unknown")
file_path = res.get("file_path", "unknown")
lang = res.get("language", "unknown")
line_start = res.get("line_start", 0)
line_end = res.get("line_end", 0)
code = res.get("code", "")

output += f"## {idx}. {name} ({score:.0f}% match)\n"
output += f"**File:** `{file_path}`\n"

# v2 adds qualified_name and signature
qualified = res.get("qualified_name")
if qualified and qualified != name:
output += f"**Qualified:** `{qualified}`\n"
signature = res.get("signature")
if signature:
output += f"**Signature:** `{signature}`\n"

output += f"**Language:** {lang} | **Lines:** {line_start}-{line_end}\n"

reason = res.get("match_reason")
if reason:
output += f"**Why:** {reason}\n"

output += f"\n```{lang}\n{code}\n```\n\n"

return output


def format_repositories(result: dict) -> str:
"""Format repository listing as markdown."""
output = "# Indexed Repositories\n\n"

if not result.get("repositories"):
return output + "No repositories indexed yet.\n"

for repo in result["repositories"]:
output += f"### {repo.get('name', 'unknown')}\n"
output += f"- **ID:** `{repo.get('id')}`\n"
output += f"- **Status:** {repo.get('status', 'unknown')}\n"
output += f"- **Functions:** {repo.get('file_count', 0):,}\n"
output += f"- **Branch:** {repo.get('branch', 'main')}\n\n"

return output


def format_dependency_graph(result: dict) -> str:
"""Format dependency graph analysis as markdown."""
nodes = result.get("nodes", [])
edges = result.get("edges", [])
metrics = result.get("metrics", {})

output = "# Dependency Graph Analysis\n\n"
output += f"**Total Files:** {len(nodes)}\n"
output += f"**Total Dependencies:** {metrics.get('total_edges', len(edges))}\n"
output += f"**Avg Dependencies per File:** {metrics.get('avg_dependencies', 0):.1f}\n\n"

# Most-imported files (highest number of dependents)
dependent_count: dict[str, int] = {}
for edge in edges:
target = edge.get("target", "")
dependent_count[target] = dependent_count.get(target, 0) + 1

if dependent_count:
sorted_deps = sorted(
dependent_count.items(), key=lambda x: x[1], reverse=True
)[:5]
output += "## Most Critical Files (High Impact)\n\n"
for file, count in sorted_deps:
output += f"- `{file}` - **{count} dependents**\n"
output += "\n"

high_import = [n for n in nodes if n.get("imports", 0) >= 3]
if high_import:
output += "## Files with Most Imports\n\n"
for f in sorted(high_import, key=lambda x: x.get("imports", 0), reverse=True)[:5]:
output += f"- `{f.get('id', '<unknown>')}` - imports {f.get('imports', 0)} files\n"

return output


def format_code_style(result: dict) -> str:
"""Format code style analysis as markdown."""
summary = result.get("summary", {})
output = "# Code Style Analysis\n\n"
output += f"**Files Analyzed:** {summary.get('total_files_analyzed', 0)}\n"
output += f"**Functions:** {summary.get('total_functions', 0)}\n"
output += f"**Async Adoption:** {summary.get('async_adoption', '0%')}\n"
output += f"**Type Hints:** {summary.get('type_hints_usage', '0%')}\n\n"

naming = result.get("naming_conventions", {}).get("functions")
if naming:
output += "## Function Naming Conventions\n\n"
for conv, info in naming.items():
output += f"- **{conv}:** {info.get('percentage', '?')} ({info.get('count', 0)} functions)\n"
output += "\n"

top_imports = result.get("top_imports")
if top_imports:
output += "## Most Common Imports\n\n"
for item in top_imports[:10]:
output += f"- `{item.get('module', '<unknown>')}` (used {item.get('count', 0)}x)\n"

return output


def format_impact_analysis(result: dict) -> str:
"""Format file impact analysis as markdown."""
output = f"# Impact Analysis: {result.get('file', 'unknown')}\n\n"
output += f"**Risk Level:** {result.get('risk_level', 'unknown').upper()}\n"
output += f"**Impact Summary:** {result.get('impact_summary', '')}\n\n"

deps = result.get("direct_dependencies", [])
output += f"## Dependencies ({len(deps)})\n"
output += "Files this file imports:\n"
for dep in deps[:10]:
output += f"- `{dep}`\n"
output += "\n"

dependents = result.get("all_dependents", [])
output += f"## Dependents ({len(dependents)})\n"
output += "Files that would be affected by changes:\n"
for dep in dependents[:15]:
output += f"- `{dep}`\n"

test_files = result.get("test_files")
if test_files:
output += "\n## Related Tests\n"
for test in test_files:
output += f"- `{test}`\n"

return output


def format_repository_insights(result: dict) -> str:
"""Format repository insights as markdown."""
output = f"# Repository Insights: {result.get('name', 'unknown')}\n\n"
output += f"**Status:** {result.get('status', 'unknown')}\n"
output += f"**Functions Indexed:** {result.get('functions_indexed', 0):,}\n"
output += f"**Total Files:** {result.get('total_files', 0)}\n"
output += f"**Total Dependencies:** {result.get('total_dependencies', 0)}\n\n"

metrics = result.get("graph_metrics", {})
critical = metrics.get("most_critical_files")
if critical:
output += "## Most Critical Files\n"
for item in critical[:5]:
output += f"- `{item.get('file', '<unknown>')}` ({item.get('dependents', 0)} dependents)\n"

return output


def format_codebase_dna(result: dict) -> str:
"""Format codebase DNA extraction as markdown."""
dna_markdown = result.get("dna", "")
cached = " (cached)" if result.get("cached") else ""

output = f"# Codebase DNA{cached}\n\n"
output += "**Use this information to write code that matches existing patterns.**\n\n"
output += dna_markdown
output += "\n---\n"
output += "**Instructions:** When generating code for this codebase:\n"
output += "1. Follow the auth patterns shown above\n"
output += "2. Use the service layer structure (singletons in dependencies.py)\n"
output += "3. Match the database conventions (ID types, timestamps, RLS)\n"
output += "4. Use the logging patterns shown\n"
output += "5. Follow the naming conventions\n"

return output
Loading