Skip to content

Commit 4707465

Browse files
committed
improvement(monitoring): add SSE metering to wand, execution-stream, and a2a-message endpoints
1 parent 979cae7 commit 4707465

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

apps/sim/app/api/a2a/serve/[agentId]/route.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
1919
import { SSE_HEADERS } from '@/lib/core/utils/sse'
2020
import { getBaseUrl } from '@/lib/core/utils/urls'
2121
import { markExecutionCancelled } from '@/lib/execution/cancellation'
22-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
22+
import {
23+
decrementSSEConnections,
24+
incrementSSEConnections,
25+
} from '@/lib/monitoring/sse-connections'
2326
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
2427
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
2528
import {
@@ -631,9 +634,11 @@ async function handleMessageStream(
631634
}
632635

633636
const encoder = new TextEncoder()
637+
let messageStreamDecremented = false
634638

635639
const stream = new ReadableStream({
636640
async start(controller) {
641+
incrementSSEConnections('a2a-message')
637642
const sendEvent = (event: string, data: unknown) => {
638643
try {
639644
const jsonRpcResponse = {
@@ -842,9 +847,19 @@ async function handleMessageStream(
842847
})
843848
} finally {
844849
await releaseLock(lockKey, lockValue)
850+
if (!messageStreamDecremented) {
851+
messageStreamDecremented = true
852+
decrementSSEConnections('a2a-message')
853+
}
845854
controller.close()
846855
}
847856
},
857+
cancel() {
858+
if (!messageStreamDecremented) {
859+
messageStreamDecremented = true
860+
decrementSSEConnections('a2a-message')
861+
}
862+
},
848863
})
849864

850865
return new NextResponse(stream, {

apps/sim/app/api/wand/route.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
1010
import { env } from '@/lib/core/config/env'
1111
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
1212
import { generateRequestId } from '@/lib/core/utils/request'
13+
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1314
import { enrichTableSchema } from '@/lib/table/llm/wand'
1415
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
1516
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
@@ -330,10 +331,13 @@ export async function POST(req: NextRequest) {
330331
const encoder = new TextEncoder()
331332
const decoder = new TextDecoder()
332333

334+
let wandStreamClosed = false
333335
const readable = new ReadableStream({
334336
async start(controller) {
337+
incrementSSEConnections('wand')
335338
const reader = response.body?.getReader()
336339
if (!reader) {
340+
decrementSSEConnections('wand')
337341
controller.close()
338342
return
339343
}
@@ -478,6 +482,16 @@ export async function POST(req: NextRequest) {
478482
controller.close()
479483
} finally {
480484
reader.releaseLock()
485+
if (!wandStreamClosed) {
486+
wandStreamClosed = true
487+
decrementSSEConnections('wand')
488+
}
489+
}
490+
},
491+
cancel() {
492+
if (!wandStreamClosed) {
493+
wandStreamClosed = true
494+
decrementSSEConnections('wand')
481495
}
482496
},
483497
})

apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
getExecutionMeta,
88
readExecutionEvents,
99
} from '@/lib/execution/event-buffer'
10+
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1011
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
1112
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
1213

@@ -73,8 +74,10 @@ export async function GET(
7374

7475
let closed = false
7576

77+
let sseDecremented = false
7678
const stream = new ReadableStream<Uint8Array>({
7779
async start(controller) {
80+
incrementSSEConnections('execution-stream-reconnect')
7881
let lastEventId = fromEventId
7982
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
8083

@@ -142,11 +145,20 @@ export async function GET(
142145
controller.close()
143146
} catch {}
144147
}
148+
} finally {
149+
if (!sseDecremented) {
150+
sseDecremented = true
151+
decrementSSEConnections('execution-stream-reconnect')
152+
}
145153
}
146154
},
147155
cancel() {
148156
closed = true
149157
logger.info('Client disconnected from reconnection stream', { executionId })
158+
if (!sseDecremented) {
159+
sseDecremented = true
160+
decrementSSEConnections('execution-stream-reconnect')
161+
}
150162
},
151163
})
152164

0 commit comments

Comments
 (0)