Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/reporter/src/hooks/queries/use-tdd-queries.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { tdd } from '../../api/client.js';
import { queryKeys } from '../../lib/query-keys.js';
import { SSE_STATE, useReportDataSSE } from '../use-sse.js';
import { SSE_STATE, useSSEState } from '../use-sse.js';

export function useComparison(id, options = {}) {
return useQuery({
Expand All @@ -14,10 +14,8 @@ export function useComparison(id, options = {}) {
}

export function useReportData(options = {}) {
// Use SSE for real-time updates
let { state: sseState } = useReportDataSSE({
enabled: options.polling !== false,
});
// Read SSE state from the singleton provider
let { state: sseState } = useSSEState();

// SSE is connected - it updates the cache directly, no polling needed
// Fall back to polling only when SSE is not connected
Expand Down
114 changes: 11 additions & 103 deletions src/reporter/src/hooks/use-sse.js
Original file line number Diff line number Diff line change
@@ -1,109 +1,17 @@
import { useQueryClient } from '@tanstack/react-query';
import { useEffect, useRef, useState } from 'react';
import { queryKeys } from '../lib/query-keys.js';
import { useContext } from 'react';
import { SSE_STATE, SSEContext } from '../providers/sse-provider.jsx';

/**
* SSE connection states
*/
export let SSE_STATE = {
CONNECTING: 'connecting',
CONNECTED: 'connected',
DISCONNECTED: 'disconnected',
ERROR: 'error',
};
// Re-export for consumers that import SSE_STATE from here
export { SSE_STATE };

/**
* Hook to manage SSE connection for real-time report data updates
* @param {Object} options
* @param {boolean} [options.enabled=true] - Whether SSE should be enabled
* Read SSE connection state from the singleton provider.
* @returns {{ state: string, error: Error|null }}
*/
export function useReportDataSSE(options = {}) {
let shouldEnable = options.enabled ?? true;

const queryClient = useQueryClient();
const eventSourceRef = useRef(null);
const reconnectAttemptRef = useRef(0);
const reconnectTimerRef = useRef(null);

const [state, setState] = useState(SSE_STATE.DISCONNECTED);
const [error, setError] = useState(null);

useEffect(() => {
if (!shouldEnable) {
// Clean up if disabled
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
setState(SSE_STATE.DISCONNECTED);
return;
}

const connect = () => {
// Don't reconnect if already connected or connecting
// EventSource.CLOSED (2) is well-supported in all modern browsers
if (eventSourceRef.current && eventSourceRef.current.readyState !== 2) {
return;
}

setState(SSE_STATE.CONNECTING);
setError(null);

const eventSource = new EventSource('/api/events');
eventSourceRef.current = eventSource;

eventSource.onopen = () => {
setState(SSE_STATE.CONNECTED);
setError(null);
reconnectAttemptRef.current = 0;
};

eventSource.addEventListener('reportData', event => {
try {
const data = JSON.parse(event.data);
// Update React Query cache directly
queryClient.setQueryData(queryKeys.reportData(), data);
} catch {
// Ignore parse errors
}
});

eventSource.addEventListener('heartbeat', () => {
// Heartbeat received - connection is alive
});

eventSource.onerror = () => {
eventSource.close();
eventSourceRef.current = null;
setState(SSE_STATE.ERROR);
setError(new Error('SSE connection failed'));

// Exponential backoff for reconnection (max 30 seconds)
const attempt = reconnectAttemptRef.current;
const delay = Math.min(1000 * 2 ** attempt, 30000);
reconnectAttemptRef.current = attempt + 1;

reconnectTimerRef.current = window.setTimeout(() => {
connect();
}, delay);
};
};

connect();

return () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
setState(SSE_STATE.DISCONNECTED);
};
}, [shouldEnable, queryClient]);

return { state, error };
export function useSSEState() {
let context = useContext(SSEContext);
if (!context) {
throw new Error('useSSEState must be used within SSEProvider');
}
return context;
}
9 changes: 6 additions & 3 deletions src/reporter/src/main.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ReactDOM from 'react-dom/client';
import AppRouter from './components/app-router.jsx';
import { ToastProvider } from './components/ui/toast.jsx';
import { queryClient } from './lib/query-client.js';
import { SSEProvider } from './providers/sse-provider.jsx';
import './reporter.css';

let initializeReporter = () => {
Expand All @@ -18,9 +19,11 @@ let initializeReporter = () => {
ReactDOM.createRoot(root).render(
<StrictMode>
<QueryClientProvider client={queryClient}>
<ToastProvider>
<AppRouter />
</ToastProvider>
<SSEProvider>
<ToastProvider>
<AppRouter />
</ToastProvider>
</SSEProvider>
</QueryClientProvider>
</StrictMode>
);
Expand Down
157 changes: 157 additions & 0 deletions src/reporter/src/providers/sse-provider.jsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { useQueryClient } from '@tanstack/react-query';
import { createContext, useEffect, useRef, useState } from 'react';
import { queryKeys } from '../lib/query-keys.js';

export let SSE_STATE = {
CONNECTING: 'connecting',
CONNECTED: 'connected',
DISCONNECTED: 'disconnected',
ERROR: 'error',
};

export let SSEContext = createContext(null);

/**
* Singleton SSE provider — manages one EventSource connection for the entire app.
* Updates React Query cache on reportData, comparisonUpdate, comparisonRemoved,
* and summaryUpdate events.
*/
export function SSEProvider({ enabled = true, children }) {
let queryClient = useQueryClient();
let eventSourceRef = useRef(null);
let reconnectAttemptRef = useRef(0);
let reconnectTimerRef = useRef(null);

let [state, setState] = useState(SSE_STATE.DISCONNECTED);
let [error, setError] = useState(null);

useEffect(() => {
if (!enabled) {
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
setState(SSE_STATE.DISCONNECTED);
return;
}

let connect = () => {
// EventSource.CLOSED === 2
if (eventSourceRef.current && eventSourceRef.current.readyState !== 2) {
return;
}

setState(SSE_STATE.CONNECTING);
setError(null);

let eventSource = new EventSource('/api/events');
eventSourceRef.current = eventSource;

eventSource.onopen = () => {
setState(SSE_STATE.CONNECTED);
setError(null);
reconnectAttemptRef.current = 0;
};

// Full report data — sent on initial connection
eventSource.addEventListener('reportData', event => {
try {
let data = JSON.parse(event.data);
queryClient.setQueryData(queryKeys.reportData(), data);
} catch {
// Ignore parse errors
}
});

// Incremental: single comparison added or changed
eventSource.addEventListener('comparisonUpdate', event => {
try {
let comparison = JSON.parse(event.data);
queryClient.setQueryData(queryKeys.reportData(), old => {
if (!old) return old;
let comparisons = old.comparisons || [];
let idx = comparisons.findIndex(c => c.id === comparison.id);
if (idx >= 0) {
comparisons = comparisons.map((c, i) =>
i === idx ? { ...c, ...comparison } : c
);
} else {
comparisons = [...comparisons, comparison];
}
return { ...old, comparisons };
});
} catch {
// Ignore parse errors
}
});

// Incremental: comparison removed
eventSource.addEventListener('comparisonRemoved', event => {
try {
let { id } = JSON.parse(event.data);
queryClient.setQueryData(queryKeys.reportData(), old => {
if (!old?.comparisons) return old;
return {
...old,
comparisons: old.comparisons.filter(c => c.id !== id),
};
});
} catch {
// Ignore parse errors
}
});

// Incremental: summary fields changed
eventSource.addEventListener('summaryUpdate', event => {
try {
let summary = JSON.parse(event.data);
queryClient.setQueryData(queryKeys.reportData(), old => {
if (!old) return old;
return { ...old, ...summary, comparisons: old.comparisons };
});
} catch {
// Ignore parse errors
}
});

eventSource.addEventListener('heartbeat', () => {
// Connection alive
});

eventSource.onerror = () => {
eventSource.close();
eventSourceRef.current = null;
setState(SSE_STATE.ERROR);
setError(new Error('SSE connection failed'));

let attempt = reconnectAttemptRef.current;
let delay = Math.min(1000 * 2 ** attempt, 30000);
reconnectAttemptRef.current = attempt + 1;

reconnectTimerRef.current = window.setTimeout(() => {
connect();
}, delay);
};
};

connect();

return () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
setState(SSE_STATE.DISCONNECTED);
};
}, [enabled, queryClient]);

return (
<SSEContext.Provider value={{ state, error }}>
{children}
</SSEContext.Provider>
);
}
Loading