Skip to content

Commit f9c6647

Browse files
committed
test(ws): add E2E integration test for WebSocket
- Test creates real indexing job and connects via WebSocket - Verifies full event flow: connected → progress → completed - Tests against zustand repo (47 files, ~1182 functions) - Confirms real-time streaming works end-to-end Tested manually - all events received correctly
1 parent bc7b587 commit f9c6647

1 file changed

Lines changed: 167 additions & 0 deletions

File tree

backend/tests/test_ws_e2e.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#!/usr/bin/env python3
2+
"""
3+
End-to-end WebSocket test for playground indexing.
4+
5+
This script:
6+
1. Creates an indexing job via the REST API
7+
2. Connects to the WebSocket endpoint
8+
3. Listens for all events until completion/error
9+
4. Reports what we received
10+
11+
Usage: python3 test_ws_e2e.py
12+
"""
13+
import asyncio
14+
import aiohttp
15+
import json
16+
import sys
17+
from datetime import datetime
18+
19+
# Config
20+
BASE_URL = "http://localhost:8000/api/v1"
21+
WS_URL = "ws://localhost:8000/api/v1"
22+
TEST_REPO = "https://github.com/pmndrs/zustand" # Small, fast to index
23+
24+
25+
def log(msg: str, level: str = "INFO"):
26+
"""Print timestamped log message."""
27+
ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
28+
icon = {"INFO": "ℹ️", "OK": "✅", "ERR": "❌", "WS": "🔌", "EVENT": "📨"}.get(level, "•")
29+
print(f"[{ts}] {icon} {msg}")
30+
31+
32+
async def create_indexing_job(session: aiohttp.ClientSession) -> dict:
33+
"""Create a new indexing job via REST API."""
34+
log("Creating indexing job for zustand...")
35+
36+
async with session.post(
37+
f"{BASE_URL}/playground/index",
38+
json={"github_url": TEST_REPO}
39+
) as resp:
40+
# 202 Accepted is the expected status for async job creation
41+
if resp.status not in (200, 202):
42+
text = await resp.text()
43+
log(f"Failed to create job: {resp.status} - {text}", "ERR")
44+
return None
45+
46+
data = await resp.json()
47+
job_id = data.get("job_id")
48+
log(f"Job created: {job_id} (status: {resp.status})", "OK")
49+
return data
50+
51+
52+
async def listen_websocket(job_id: str) -> list:
53+
"""Connect to WebSocket and collect all events."""
54+
events = []
55+
ws_endpoint = f"{WS_URL}/ws/playground/{job_id}"
56+
57+
log(f"Connecting to WebSocket: {ws_endpoint}", "WS")
58+
59+
async with aiohttp.ClientSession() as session:
60+
try:
61+
async with session.ws_connect(ws_endpoint, timeout=120) as ws:
62+
log("WebSocket connected!", "OK")
63+
64+
async for msg in ws:
65+
if msg.type == aiohttp.WSMsgType.TEXT:
66+
event = json.loads(msg.data)
67+
events.append(event)
68+
69+
event_type = event.get("type", "unknown")
70+
71+
# Log based on event type
72+
if event_type == "connected":
73+
log(f"Server acknowledged connection", "EVENT")
74+
elif event_type == "ping":
75+
log("Received keepalive ping", "EVENT")
76+
elif event_type == "cloning":
77+
repo = event.get("repo_name", "?")
78+
log(f"Cloning: {repo}", "EVENT")
79+
elif event_type == "progress":
80+
pct = event.get("percent", 0)
81+
files = event.get("files_processed", 0)
82+
total = event.get("files_total", 0)
83+
current = event.get("current_file") or ""
84+
funcs = event.get("functions_found", 0)
85+
# Truncate long paths
86+
if current and len(current) > 40:
87+
current = "..." + current[-37:]
88+
log(f"Progress: {pct}% ({files}/{total}) | {funcs} funcs | {current}", "EVENT")
89+
elif event_type == "completed":
90+
stats = event.get("stats", {})
91+
log(f"COMPLETED! Functions: {stats.get('functions_found', '?')}, Time: {stats.get('time_taken_seconds', '?')}s", "OK")
92+
break
93+
elif event_type == "error":
94+
log(f"ERROR: {event.get('message', 'Unknown error')}", "ERR")
95+
break
96+
else:
97+
log(f"Unknown event: {event_type}", "EVENT")
98+
99+
elif msg.type == aiohttp.WSMsgType.ERROR:
100+
log(f"WebSocket error: {ws.exception()}", "ERR")
101+
break
102+
elif msg.type == aiohttp.WSMsgType.CLOSED:
103+
log("WebSocket closed by server", "WS")
104+
break
105+
106+
except asyncio.TimeoutError:
107+
log("WebSocket connection timed out", "ERR")
108+
except Exception as e:
109+
log(f"WebSocket error: {e}", "ERR")
110+
111+
return events
112+
113+
114+
async def main():
115+
"""Run the end-to-end test."""
116+
print("\n" + "="*60)
117+
print(" WebSocket E2E Test - Playground Indexing")
118+
print("="*60 + "\n")
119+
120+
async with aiohttp.ClientSession() as session:
121+
# Step 1: Create job
122+
job_data = await create_indexing_job(session)
123+
if not job_data:
124+
sys.exit(1)
125+
126+
job_id = job_data.get("job_id")
127+
if not job_id:
128+
log("No job_id in response", "ERR")
129+
sys.exit(1)
130+
131+
# Step 2: Listen to WebSocket
132+
print()
133+
events = await listen_websocket(job_id)
134+
135+
# Step 3: Summary
136+
print("\n" + "="*60)
137+
print(" Test Summary")
138+
print("="*60)
139+
140+
event_types = [e.get("type") for e in events]
141+
print(f"\nTotal events received: {len(events)}")
142+
print(f"Event types: {' → '.join(event_types)}")
143+
144+
# Check expected flow
145+
# Note: "cloning" may be skipped if repo was recently cloned
146+
required = ["connected", "completed"]
147+
has_required = all(t in event_types for t in required)
148+
has_progress = "progress" in event_types
149+
150+
print()
151+
if has_required and has_progress:
152+
log("TEST PASSED - Full event flow received!", "OK")
153+
print()
154+
return 0
155+
elif "error" in event_types:
156+
log("TEST COMPLETED WITH ERROR - Error event received (may be expected)", "ERR")
157+
print()
158+
return 1
159+
else:
160+
log(f"TEST INCOMPLETE - Missing events. Got: {event_types}", "ERR")
161+
print()
162+
return 1
163+
164+
165+
if __name__ == "__main__":
166+
exit_code = asyncio.run(main())
167+
sys.exit(exit_code)

0 commit comments

Comments
 (0)