diff --git a/devex-ui/src/components/TraceViewer.tsx b/devex-ui/src/components/TraceViewer.tsx index 8f473504..1ff0df6f 100644 --- a/devex-ui/src/components/TraceViewer.tsx +++ b/devex-ui/src/components/TraceViewer.tsx @@ -15,7 +15,7 @@ import { import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Button } from "@/components/ui/button"; -import { searchTraces, fetchTracesByCorrelation, fetchTraceById } from "@/data"; +import { searchTraces, fetchTracesByCorrelation } from "@/data"; interface TraceNode { id: string; @@ -45,6 +45,26 @@ interface SearchResult { display: string; } +// Helper function to identify nodes related to the selected node +const getRelatedNodeIds = (selected: TraceNode | null, all: TraceNode[]): Set => { + if (!selected) return new Set(); + + const relatedIds = new Set(); + + if (selected.type === 'Command') { + // highlight events caused by this command + all.forEach(node => { + if (node.causationId === selected.id) relatedIds.add(node.id); + }); + } else if (selected.type === 'Event' && selected.causationId) { + // highlight the command that caused this event + const cmd = all.find(n => n.id === selected.causationId && n.type === 'Command'); + if (cmd) relatedIds.add(cmd.id); + } + + return relatedIds; +}; + export const TraceViewer = () => { const [params, setParams] = useSearchParams(); const [searchQuery, setSearchQuery] = useState(() => params.get('q') || ""); @@ -123,6 +143,26 @@ export const TraceViewer = () => { try { // Load traces for the selected correlation ID const correlationId = selectedResult.correlationId; + + // If correlationId is undefined, perform fallback search instead + if (!correlationId) { + console.warn('CorrelationId is undefined, performing fallback search instead'); + const searchResults = await searchTraces(resultId); + if (searchResults.length > 0) { + // Find the exact match if possible + const exactMatch = searchResults.find(t => t.id === resultId); + const traceToShow = exactMatch || searchResults[0]; + + // Set the trace and edges + setTraces([traceToShow]); + setEdges([]); + setSelectedNode(traceToShow); + } else { + console.warn('No traces found for ID:', resultId); + } + return; + } + const { traces: filteredTraces, edges: newEdges } = await fetchTracesByCorrelation(correlationId); setTraces(filteredTraces); @@ -168,6 +208,9 @@ export const TraceViewer = () => { return selectedTraceId === nodeId; }; + // Calculate related node IDs based on selected node + const relatedNodeIds = getRelatedNodeIds(selectedNode, traces); + return (
@@ -254,17 +297,28 @@ export const TraceViewer = () => {
{traces.filter(t => t.level === level).map((node, index) => (
+ {/* Selection state logic: + 1. If node.id === selectedNode.id: orange border (primary selection) + 2. If node is related to selected node: yellow border (related) + 3. Otherwise: default slate border + 4. Background color based on node type (Command, Event, Snapshot) + */}
setSelectedNode(node)} + onClick={() => { + setSelectedNode(node); + setSelectedTraceId(node.id); // for URL sync and selection logic + }} >
{getNodeIcon(node.type)} @@ -278,10 +332,13 @@ export const TraceViewer = () => {
- {/* Arrow to next level if there's a causation relationship */} - {edges.some(e => e.from === node.id) && ( + {/* Arrow to next level based on edge type */} + {edges.some(e => e.from === node.id && e.type === 'causation') && ( )} + {edges.some(e => e.from === node.id && e.type === 'snapshot') && ( + + )}
))}
@@ -388,6 +445,10 @@ export const TraceViewer = () => { Causation Flow
+
+ + Snapshot Flow +
diff --git a/devex-ui/src/data/apiService.ts b/devex-ui/src/data/apiService.ts index ef6dc940..f69a0531 100644 --- a/devex-ui/src/data/apiService.ts +++ b/devex-ui/src/data/apiService.ts @@ -2,6 +2,9 @@ import { apiClient, API_CONFIG } from './api'; import type { Event, Command, CommandResult, CommandSchema } from './types'; import type { LogLine } from './mockLogs'; +import { isMock } from '@/config/apiMode'; +import { findTracesByCorrelationId, searchTracesFullText, traceStore } from '@/mocks/stores/trace.store'; +import { generateEdges } from '@/graph/edgeUtils'; // Events API export const fetchEvents = async (tenantId: string, limit = 50): Promise => { @@ -58,17 +61,40 @@ export const fetchRecentCommands = async (limit = 10) => { }; // Traces API +// Helper function to normalize trace data from API +function normalizeTrace(raw: any) { + return { + id: raw.id, + type: raw.type || 'Event', + subtype: raw.subtype || raw.type, + timestamp: raw.timestamp || raw.created_at, + correlationId: raw.correlationId || raw.correlation_id, + causationId: raw.causationId || raw.causation_id, + aggregateId: raw.aggregateId || raw.aggregate_id, + tenantId: raw.tenantId || raw.tenant_id, + level: raw.level ?? 0 + }; +} + export const searchTraces = async (query: string) => { - return apiClient.get(`${API_CONFIG.endpoints.traces}/search`, { query }); + if (isMock) return searchTracesFullText(query); + const raw = await apiClient.get(`${API_CONFIG.endpoints.traces}/search`, { query }); + return raw.map(normalizeTrace); }; export const fetchTracesByCorrelation = async (correlationId: string) => { - return apiClient.get(`${API_CONFIG.endpoints.traces}/correlation/${correlationId}`); + if (isMock) { + const traces = findTracesByCorrelationId(correlationId); + const edges = generateEdges(traces); + return { traces, edges }; + } + const raw = await apiClient.get(`${API_CONFIG.endpoints.traces}/correlation/${correlationId}`); + return { + traces: raw.traces.map(normalizeTrace), + edges: raw.edges + }; }; -export const fetchTraceById = async (traceId: string) => { - return apiClient.get(`${API_CONFIG.endpoints.traces}/${traceId}`); -}; export const fetchLogs = (tenant: string, limit=50) => apiClient.get(API_CONFIG.endpoints.logs, { tenant_id: tenant, limit: limit+'' }); diff --git a/devex-ui/src/graph/edgeUtils.ts b/devex-ui/src/graph/edgeUtils.ts index 183219a0..6bb49d9f 100644 --- a/devex-ui/src/graph/edgeUtils.ts +++ b/devex-ui/src/graph/edgeUtils.ts @@ -1,5 +1,5 @@ //devex-ui/src/graph/edgeUtils.ts -export interface Edge { from:string; to:string; type:'causation' } +export interface Edge { from:string; to:string; type:'causation' | 'snapshot' } export function generateEdges( traces:{ id:string; causationId?:string }[] @@ -12,4 +12,4 @@ export function generateEdges( seen.add(k); return [{ from:t.causationId, to:t.id, type:'causation' }]; }); -} \ No newline at end of file +} diff --git a/src/api/routes/index.ts b/src/api/routes/index.ts index c79c83b2..c5109570 100644 --- a/src/api/routes/index.ts +++ b/src/api/routes/index.ts @@ -5,6 +5,7 @@ import metricsRoutes from './metrics'; import commandsRoutes from './commands'; import eventsRoutes from './events'; import logsRoutes from './logs'; +import traceRoutes from './traces'; import accessLogMiddleware from '../middlewares/accessLog'; /** @@ -32,6 +33,9 @@ export const registerRoutes = (app: Express): void => { // Register logs routes app.use(logsRoutes); + + // Register trace routes + app.use(traceRoutes); }; export default registerRoutes; diff --git a/src/api/routes/traces.ts b/src/api/routes/traces.ts new file mode 100644 index 00000000..adac2358 --- /dev/null +++ b/src/api/routes/traces.ts @@ -0,0 +1,284 @@ +import { Router } from 'express'; +import pool from '../db'; + +const router = Router(); + +// Helper function to convert an event row to a TraceNode +const rowToTraceNode = (row: any) => ({ + id: row.id, + type: 'Event', + subtype: row.type, + timestamp: row.created_at, + correlationId: row.correlation_id, + causationId: row.causation_id, + aggregateId: row.aggregate_id, + tenantId: row.tenant_id, + level: 0 +}); + +// Helper function to convert a command row to a TraceNode +const rowToTraceNodeCommand = (row: any) => { + // Log the raw command row for debugging + console.log(`Processing command row:`, JSON.stringify(row, null, 2)); + + // Try to get correlationId from different possible locations + const correlationId = row.metadata?.correlationId || row.metadata?.['correlationId'] || row.correlationId; + + return { + id: row.id, + type: 'Command', + subtype: row.type, + timestamp: row.created_at, + correlationId: correlationId, + causationId: undefined, + aggregateId: row.payload?.aggregateId, + tenantId: row.tenant_id, + level: 0 + }; +}; + +// Helper function to convert a snapshot row to a TraceNode +const rowToTraceNodeSnapshot = (row: any) => ({ + id: `snapshot-${row.id}`, + type: 'Snapshot', + subtype: row.type, + timestamp: row.updated_at, + correlationId: null, + causationId: null, + aggregateId: row.id, + tenantId: row.tenant_id, + level: 0 +}); + +// Helper function to assign levels to trace nodes based on causation and snapshot relationships +const assignLevels = (nodes: any[], edges: any[] = []): any[] => { + const idToNode = new Map(nodes.map(n => [n.id, n])); + const levels = new Map(); + + // Create a map of snapshot backlinks (to -> from) + const snapshotBacklinks = new Map(); + for (const edge of edges) { + if (edge.type === 'snapshot') { + snapshotBacklinks.set(edge.to, edge.from); + } + } + + function computeLevel(id: string): number { + if (levels.has(id)) return levels.get(id)!; + + const node = idToNode.get(id); + if (!node) { + levels.set(id, 0); + return 0; + } + + const parentId = node.causationId || (snapshotBacklinks.get(node.id) ?? null); + if (!parentId || !idToNode.has(parentId)) { + levels.set(id, 0); + return 0; + } + + const parentLevel = computeLevel(parentId); + const level = parentLevel + 1; + levels.set(id, level); + return level; + } + + // Compute levels for all nodes + nodes.forEach(node => computeLevel(node.id)); + + // Return a new array with updated levels + return nodes.map(node => ({ + ...node, + level: levels.get(node.id) || 0 + })); +}; + +// Helper function to generate edges between traces +const generateEdges = (traces: { id: string; type: string; causationId?: string; aggregateId?: string }[]) => { + const seen = new Set(); + const edges = []; + + // Create a map of aggregate IDs to their last event + const aggregateIdToLastEventMap = new Map(); + + // First pass: collect causation edges and build aggregate map + for (const trace of traces) { + // Add causation edges for events + if ((trace.type === 'Event' || trace.type === 'Command') && trace.causationId) { + const k = trace.causationId + '→' + trace.id; + if (!seen.has(k)) { + seen.add(k); + edges.push({ from: trace.causationId, to: trace.id, type: 'causation' as const }); + } + } + + // Track the last event for each aggregate ID + if (trace.type === 'Event' && trace.aggregateId) { + aggregateIdToLastEventMap.set(trace.aggregateId, trace.id); + } + } + + // Second pass: add snapshot edges + for (const trace of traces) { + // Add edges from the last event of an aggregate to its snapshot + if (trace.type === 'Snapshot' && trace.aggregateId && aggregateIdToLastEventMap.has(trace.aggregateId)) { + const lastEventId = aggregateIdToLastEventMap.get(trace.aggregateId)!; + const k = lastEventId + '→' + trace.id; + if (!seen.has(k)) { + seen.add(k); + edges.push({ from: lastEventId, to: trace.id, type: 'snapshot' as const }); + } + } + } + + return edges; +}; + +// GET /api/traces/search?query=... +// Search traces by id, correlation_id, causation_id, aggregate_id, or type +router.get('/api/traces/search', async (req, res) => { + const query = req.query.query as string; + if (!query) return res.status(400).json({ error: 'query parameter is required' }); + + const client = await pool.connect(); + try { + // Search events + const eventsResult = await client.query( + `SELECT * FROM infra.event_metadata + WHERE id::text ILIKE $1 + OR correlation_id::text ILIKE $1 + OR causation_id::text ILIKE $1 + OR aggregate_id::text ILIKE $1 + OR type ILIKE $1 + ORDER BY created_at DESC + LIMIT 50`, + [`%${query}%`] + ); + + // Search commands + const commandsResult = await client.query( + `SELECT * FROM infra.commands + WHERE id::text ILIKE $1 + OR metadata->>'correlationId'::text ILIKE $1 + OR type ILIKE $1 + ORDER BY created_at DESC + LIMIT 25`, + [`%${query}%`] + ); + + // Search snapshots (aggregates) + const snapshotsResult = await client.query( + `SELECT * FROM infra.aggregates + WHERE id::text ILIKE $1 + OR type ILIKE $1 + ORDER BY updated_at DESC + LIMIT 25`, + [`%${query}%`] + ); + + // Combine all results - Commands should appear before Events at the same level + const traces = [ + ...commandsResult.rows.map(rowToTraceNodeCommand), + ...eventsResult.rows.map(rowToTraceNode), + ...snapshotsResult.rows.map(rowToTraceNodeSnapshot) + ]; + + // Generate edges between nodes + const edges = generateEdges(traces); + + // Assign proper levels to nodes based on causation and snapshot relationships + const tracesWithLevels = assignLevels(traces, edges); + + res.json(tracesWithLevels); + } catch (error) { + console.error('Error searching traces:', error); + res.status(500).json({ error: 'Failed to search traces' }); + } finally { + client.release(); + } +}); + +// GET /api/traces/correlation/:correlationId +// Fetch all traces with the same correlation_id +router.get('/api/traces/correlation/:correlationId', async (req, res) => { + const { correlationId } = req.params; + if (!correlationId) return res.status(400).json({ error: 'correlationId is required' }); + + const client = await pool.connect(); + try { + // Fetch events with matching correlation_id + const eventsResult = await client.query( + `SELECT * FROM infra.event_metadata + WHERE correlation_id::text = $1 + ORDER BY created_at ASC`, + [correlationId] + ); + + // Fetch commands with matching correlationId in metadata + // Try different ways the correlationId might be stored + const commandsResult = await client.query( + `SELECT * FROM infra.commands + WHERE metadata->>'correlationId' = $1 + OR metadata->>'correlation_id'::text = $1 + OR id::text = $1 + ORDER BY created_at ASC`, + [correlationId] + ); + + // Log the raw command results for debugging + console.log(`Raw command results for correlationId ${correlationId}:`, JSON.stringify(commandsResult.rows, null, 2)); + + // Get all events and commands + const events = eventsResult.rows.map(rowToTraceNode); + const commands = commandsResult.rows.map(rowToTraceNodeCommand); + + // Log the processed commands for debugging + console.log(`Processed commands for correlationId ${correlationId}:`, JSON.stringify(commands, null, 2)); + + // Extract aggregate IDs from events and commands to fetch relevant snapshots + const aggregateIds = new Set(); + [...events, ...commands].forEach(item => { + if (item.aggregateId) { + aggregateIds.add(item.aggregateId); + } + }); + + // Fetch snapshots for the involved aggregates + let snapshots: any[] = []; + if (aggregateIds.size > 0) { + const snapshotsResult = await client.query( + `SELECT * FROM infra.aggregates + WHERE id::text = ANY($1::text[]) + ORDER BY updated_at DESC`, + [Array.from(aggregateIds)] + ); + snapshots = snapshotsResult.rows.map(rowToTraceNodeSnapshot); + } + + // Combine all results - Commands should appear before Events at the same level + const traces = [...commands, ...events, ...snapshots]; + + // Generate edges between nodes + const edges = generateEdges(traces); + + // Assign proper levels to nodes based on causation and snapshot relationships + const tracesWithLevels = assignLevels(traces, edges); + + // Log the number of commands and events for debugging + console.log(`Found ${commands.length} commands and ${events.length} events for correlationId: ${correlationId}`); + + // Log the final response for debugging + console.log(`Final response for correlationId ${correlationId}:`, JSON.stringify({ traces: tracesWithLevels, edges }, null, 2)); + + res.json({ traces: tracesWithLevels, edges }); + } catch (error) { + console.error('Error fetching traces by correlation:', error); + res.status(500).json({ error: 'Failed to fetch traces by correlation' }); + } finally { + client.release(); + } +}); + + +export default router; diff --git a/src/core/system/aggregates/system.aggregate.ts b/src/core/system/aggregates/system.aggregate.ts index 65f915f5..d3a4b1fa 100644 --- a/src/core/system/aggregates/system.aggregate.ts +++ b/src/core/system/aggregates/system.aggregate.ts @@ -126,7 +126,7 @@ export class SystemAggregate extends BaseAggregate { payload, { userId: cmd.metadata?.userId, - correlationId: cmd.metadata?.correlationId, + correlationId: cmd.metadata?.correlationId ?? cmd.id, causationId: cmd.id, } ); @@ -154,7 +154,7 @@ export class SystemAggregate extends BaseAggregate { payload, { userId: cmd.metadata?.userId, - correlationId: cmd.metadata?.correlationId, + correlationId: cmd.metadata?.correlationId ?? cmd.id, causationId: cmd.id, } ); @@ -195,7 +195,7 @@ export class SystemAggregate extends BaseAggregate { payload, { userId: cmd.metadata?.userId, - correlationId: cmd.metadata?.correlationId, + correlationId: cmd.metadata?.correlationId ?? cmd.id, causationId: cmd.id, } ); @@ -229,7 +229,7 @@ export class SystemAggregate extends BaseAggregate { payload, { userId: cmd.metadata?.userId, - correlationId: cmd.metadata?.correlationId, + correlationId: cmd.metadata?.correlationId ?? cmd.id, causationId: cmd.id, } );