Skip to content

Commit cf2a497

Browse files
committed
fix(sse): fix memory leaks in SSE stream cleanup and add memory telemetry
1 parent 4cfe8be commit cf2a497

File tree

8 files changed

+154
-20
lines changed

8 files changed

+154
-20
lines changed

apps/sim/app/api/a2a/serve/[agentId]/route.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
1919
import { SSE_HEADERS } from '@/lib/core/utils/sse'
2020
import { getBaseUrl } from '@/lib/core/utils/urls'
2121
import { markExecutionCancelled } from '@/lib/execution/cancellation'
22+
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
2223
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
2324
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
2425
import {
@@ -1016,16 +1017,22 @@ async function handleTaskResubscribe(
10161017
let pollTimeoutId: ReturnType<typeof setTimeout> | null = null
10171018

10181019
const abortSignal = request.signal
1019-
abortSignal.addEventListener('abort', () => {
1020-
isCancelled = true
1021-
if (pollTimeoutId) {
1022-
clearTimeout(pollTimeoutId)
1023-
pollTimeoutId = null
1024-
}
1025-
})
1020+
abortSignal.addEventListener(
1021+
'abort',
1022+
() => {
1023+
isCancelled = true
1024+
if (pollTimeoutId) {
1025+
clearTimeout(pollTimeoutId)
1026+
pollTimeoutId = null
1027+
}
1028+
},
1029+
{ once: true }
1030+
)
10261031

1032+
let sseDecremented = false
10271033
const stream = new ReadableStream({
10281034
async start(controller) {
1035+
incrementSSEConnections('a2a-resubscribe')
10291036
const sendEvent = (event: string, data: unknown): boolean => {
10301037
if (isCancelled || abortSignal.aborted) return false
10311038
try {
@@ -1047,6 +1054,10 @@ async function handleTaskResubscribe(
10471054
clearTimeout(pollTimeoutId)
10481055
pollTimeoutId = null
10491056
}
1057+
if (!sseDecremented) {
1058+
sseDecremented = true
1059+
decrementSSEConnections('a2a-resubscribe')
1060+
}
10501061
}
10511062

10521063
if (
@@ -1165,6 +1176,10 @@ async function handleTaskResubscribe(
11651176
clearTimeout(pollTimeoutId)
11661177
pollTimeoutId = null
11671178
}
1179+
if (!sseDecremented) {
1180+
sseDecremented = true
1181+
decrementSSEConnections('a2a-resubscribe')
1182+
}
11681183
},
11691184
})
11701185

apps/sim/app/api/mcp/events/route.ts

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { getSession } from '@/lib/auth'
1414
import { SSE_HEADERS } from '@/lib/core/utils/sse'
1515
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
1616
import { mcpPubSub } from '@/lib/mcp/pubsub'
17+
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1718
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
1819

1920
const logger = createLogger('McpEventsSSE')
@@ -41,10 +42,24 @@ export async function GET(request: NextRequest) {
4142

4243
const encoder = new TextEncoder()
4344
const unsubscribers: Array<() => void> = []
45+
let cleaned = false
46+
47+
const cleanup = () => {
48+
if (cleaned) return
49+
cleaned = true
50+
for (const unsub of unsubscribers) {
51+
unsub()
52+
}
53+
decrementSSEConnections('mcp-events')
54+
logger.info(`SSE connection closed for workspace ${workspaceId}`)
55+
}
4456

4557
const stream = new ReadableStream({
4658
start(controller) {
59+
incrementSSEConnections('mcp-events')
60+
4761
const send = (eventName: string, data: Record<string, unknown>) => {
62+
if (cleaned) return
4863
try {
4964
controller.enqueue(
5065
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
@@ -82,6 +97,10 @@ export async function GET(request: NextRequest) {
8297

8398
// Heartbeat to keep the connection alive
8499
const heartbeat = setInterval(() => {
100+
if (cleaned) {
101+
clearInterval(heartbeat)
102+
return
103+
}
85104
try {
86105
controller.enqueue(encoder.encode(': heartbeat\n\n'))
87106
} catch {
@@ -91,20 +110,24 @@ export async function GET(request: NextRequest) {
91110
unsubscribers.push(() => clearInterval(heartbeat))
92111

93112
// Cleanup when client disconnects
94-
request.signal.addEventListener('abort', () => {
95-
for (const unsub of unsubscribers) {
96-
unsub()
97-
}
98-
try {
99-
controller.close()
100-
} catch {
101-
// Already closed
102-
}
103-
logger.info(`SSE connection closed for workspace ${workspaceId}`)
104-
})
113+
request.signal.addEventListener(
114+
'abort',
115+
() => {
116+
cleanup()
117+
try {
118+
controller.close()
119+
} catch {
120+
// Already closed
121+
}
122+
},
123+
{ once: true }
124+
)
105125

106126
logger.info(`SSE connection opened for workspace ${workspaceId}`)
107127
},
128+
cancel() {
129+
cleanup()
130+
},
108131
})
109132

110133
return new Response(stream, { headers: SSE_HEADERS })

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
2222
import { processInputFileFields } from '@/lib/execution/files'
2323
import { preprocessExecution } from '@/lib/execution/preprocessing'
2424
import { LoggingSession } from '@/lib/logs/execution/logging-session'
25+
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
2526
import {
2627
cleanupExecutionBase64Cache,
2728
hydrateUserFilesWithBase64,
@@ -763,6 +764,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
763764
const encoder = new TextEncoder()
764765
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
765766
let isStreamClosed = false
767+
let sseDecremented = false
766768

767769
const eventWriter = createExecutionEventWriter(executionId)
768770
setExecutionMeta(executionId, {
@@ -773,6 +775,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
773775

774776
const stream = new ReadableStream<Uint8Array>({
775777
async start(controller) {
778+
incrementSSEConnections('workflow-execute')
776779
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
777780

778781
const sendEvent = (event: ExecutionEvent) => {
@@ -1147,6 +1150,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11471150
if (executionId) {
11481151
await cleanupExecutionBase64Cache(executionId)
11491152
}
1153+
if (!sseDecremented) {
1154+
sseDecremented = true
1155+
decrementSSEConnections('workflow-execute')
1156+
}
11501157
if (!isStreamClosed) {
11511158
try {
11521159
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -1155,9 +1162,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11551162
}
11561163
}
11571164
},
1158-
cancel() {
1165+
async cancel() {
11591166
isStreamClosed = true
11601167
logger.info(`[${requestId}] Client disconnected from SSE stream`)
1168+
timeoutController.abort()
1169+
timeoutController.cleanup()
1170+
await eventWriter.close().catch(() => {})
1171+
if (!sseDecremented) {
1172+
sseDecremented = true
1173+
decrementSSEConnections('workflow-execute')
1174+
}
11611175
},
11621176
})
11631177

apps/sim/instrumentation-node.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,7 @@ async function initializeOpenTelemetry() {
160160

161161
/**
 * Registration entry point for Node-side instrumentation.
 *
 * Awaits OpenTelemetry initialization first, then lazily loads the memory
 * telemetry module and starts its periodic reporting. The telemetry module is
 * pulled in with a dynamic import so it is only loaded once this function runs.
 */
export async function register() {
  await initializeOpenTelemetry()

  const memoryTelemetry = await import('./lib/monitoring/memory-telemetry')
  memoryTelemetry.startMemoryTelemetry()
}

apps/sim/lib/execution/event-buffer.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const EVENT_LIMIT = 1000
1010
const RESERVE_BATCH = 100
1111
const FLUSH_INTERVAL_MS = 15
1212
const FLUSH_MAX_BATCH = 200
13+
const MAX_PENDING_EVENTS = 1000
1314

1415
function getEventsKey(executionId: string) {
1516
return `${REDIS_PREFIX}${executionId}:events`
@@ -184,6 +185,15 @@ export function createExecutionEventWriter(executionId: string): ExecutionEventW
184185
stack: error instanceof Error ? error.stack : undefined,
185186
})
186187
pending = batch.concat(pending)
188+
if (pending.length > MAX_PENDING_EVENTS) {
189+
const dropped = pending.length - MAX_PENDING_EVENTS
190+
pending = pending.slice(-MAX_PENDING_EVENTS)
191+
logger.warn('Dropped oldest pending events due to sustained Redis failure', {
192+
executionId,
193+
dropped,
194+
remaining: pending.length,
195+
})
196+
}
187197
}
188198
}
189199

apps/sim/lib/execution/isolated-vm.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,9 +716,15 @@ function spawnWorker(): Promise<WorkerInfo> {
716716

717717
proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message))
718718

719+
const MAX_STDERR_SIZE = 64 * 1024
719720
let stderrData = ''
720721
proc.stderr?.on('data', (data: Buffer) => {
721-
stderrData += data.toString()
722+
if (stderrData.length < MAX_STDERR_SIZE) {
723+
stderrData += data.toString()
724+
if (stderrData.length > MAX_STDERR_SIZE) {
725+
stderrData = stderrData.slice(0, MAX_STDERR_SIZE)
726+
}
727+
}
722728
})
723729

724730
const startTimeout = setTimeout(() => {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Periodic memory telemetry for diagnosing heap growth in production.
3+
* Logs process.memoryUsage() every 60s alongside active SSE connection
4+
* counts, enabling correlation between connection leaks and memory spikes.
5+
*/
6+
7+
import { createLogger } from '@sim/logger'
8+
import {
9+
getActiveSSEConnectionCount,
10+
getActiveSSEConnectionsByRoute,
11+
} from '@/lib/monitoring/sse-connections'
12+
13+
const logger = createLogger('MemoryTelemetry')

/** Bytes per mebibyte; memory figures are reported in whole MB. */
const MB = 1024 * 1024

/** Snapshot interval used when the caller omits `intervalMs` or passes an unusable value. */
const DEFAULT_INTERVAL_MS = 60_000

/** Largest delay Node.js timers accept (2^31 - 1 ms); anything larger is coerced to 1 ms. */
const MAX_TIMER_DELAY_MS = 2_147_483_647

/** Ensures at most one telemetry timer is scheduled by this module instance. */
let started = false

/**
 * Converts a requested interval into a delay that is safe to pass to `setInterval`.
 *
 * Node.js silently rewrites a delay that is NaN, below 1, or above 2^31 - 1 to
 * 1 ms, which would turn a misconfigured interval into a log flood. Unusable
 * values fall back to the default; oversized values are clamped to the maximum.
 */
function normalizeIntervalMs(intervalMs: number): number {
  if (!Number.isFinite(intervalMs) || intervalMs < 1) return DEFAULT_INTERVAL_MS
  return Math.min(intervalMs, MAX_TIMER_DELAY_MS)
}

/**
 * Starts periodic memory telemetry. Subsequent calls are no-ops.
 *
 * Every interval, logs a snapshot of `process.memoryUsage()` (heap used, heap
 * total, RSS, external — rounded to MB), process uptime in minutes, and the
 * active SSE connection counts (total and per route).
 *
 * @param intervalMs - Delay between snapshots in milliseconds. Defaults to
 *   60 seconds. Non-finite values and values below 1 fall back to the default;
 *   values above the timer maximum are clamped.
 */
export function startMemoryTelemetry(intervalMs = DEFAULT_INTERVAL_MS) {
  if (started) return
  started = true

  const effectiveIntervalMs = normalizeIntervalMs(intervalMs)
  if (effectiveIntervalMs !== intervalMs) {
    logger.warn('Invalid memory telemetry interval; using a safe value instead', {
      requestedMs: intervalMs,
      effectiveMs: effectiveIntervalMs,
    })
  }

  const timer = setInterval(() => {
    const mem = process.memoryUsage()
    logger.info('Memory snapshot', {
      heapUsedMB: Math.round(mem.heapUsed / MB),
      heapTotalMB: Math.round(mem.heapTotal / MB),
      rssMB: Math.round(mem.rss / MB),
      externalMB: Math.round(mem.external / MB),
      uptimeMin: Math.round(process.uptime() / 60),
      activeSSEConnections: getActiveSSEConnectionCount(),
      sseByRoute: getActiveSSEConnectionsByRoute(),
    })
  }, effectiveIntervalMs)

  // Do not let the telemetry timer keep the process alive on its own.
  timer.unref()
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Tracks active SSE connections by route for memory leak diagnostics.
3+
* Logged alongside periodic memory telemetry to correlate connection
4+
* counts with heap growth.
5+
*/
6+
7+
/** Shape of `globalThis` once the shared SSE connection registry is attached. */
type GlobalWithSSERegistry = typeof globalThis & {
  __simSSEConnections?: Map<string, number>
}

const globalRef = globalThis as GlobalWithSSERegistry

/**
 * Active connection count per route name.
 *
 * The map is anchored on `globalThis` rather than kept purely in module scope
 * so that every instance of this module within the same process reads and
 * writes the same counts.
 * NOTE(review): presumably the bundler can instantiate this module more than
 * once (e.g. once for the instrumentation entry that reads the counts and again
 * for the route handlers that update them) — confirm against the build output.
 */
const connections: Map<string, number> =
  globalRef.__simSSEConnections ?? (globalRef.__simSSEConnections = new Map<string, number>())

/**
 * Records that one more SSE connection is open for `route`.
 *
 * @param route - Label identifying the SSE endpoint.
 */
export function incrementSSEConnections(route: string) {
  connections.set(route, (connections.get(route) ?? 0) + 1)
}

/**
 * Records that one SSE connection for `route` has closed.
 *
 * The route's entry is removed once its count reaches zero, and decrementing a
 * route with no recorded connections is a no-op, so counts never go negative.
 *
 * @param route - Label identifying the SSE endpoint.
 */
export function decrementSSEConnections(route: string) {
  const count = (connections.get(route) ?? 0) - 1
  if (count <= 0) connections.delete(route)
  else connections.set(route, count)
}

/**
 * @returns The total number of active SSE connections across all routes.
 */
export function getActiveSSEConnectionCount(): number {
  let total = 0
  for (const count of connections.values()) total += count
  return total
}

/**
 * @returns A plain-object snapshot mapping each route with at least one active
 *   connection to its current count.
 */
export function getActiveSSEConnectionsByRoute(): Record<string, number> {
  return Object.fromEntries(connections)
}

0 commit comments

Comments
 (0)