Skip to content

Commit 6b407ee

Browse files
committed
refactor(parallel): extract shared updateActiveBlockRefCount helper
1 parent 04fbe1d commit 6b407ee

File tree

2 files changed

+44
-31
lines changed

2 files changed

+44
-31
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
TriggerUtils,
2121
} from '@/lib/workflows/triggers/triggers'
2222
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
23+
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
2324
import { getBlock } from '@/blocks'
2425
import type { SerializableExecutionState } from '@/executor/execution/types'
2526
import type {
@@ -329,20 +330,7 @@ export function useWorkflowExecution() {
329330

330331
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
331332
if (!workflowId) return
332-
if (isActive) {
333-
const count = activeBlockRefCounts.get(blockId) ?? 0
334-
activeBlockRefCounts.set(blockId, count + 1)
335-
activeBlocksSet.add(blockId)
336-
} else {
337-
const count = activeBlockRefCounts.get(blockId) ?? 1
338-
const next = count - 1
339-
if (next <= 0) {
340-
activeBlockRefCounts.delete(blockId)
341-
activeBlocksSet.delete(blockId)
342-
} else {
343-
activeBlockRefCounts.set(blockId, next)
344-
}
345-
}
333+
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
346334
setActiveBlocks(workflowId, new Set(activeBlocksSet))
347335
}
348336

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,30 @@ import { useTerminalConsoleStore } from '@/stores/terminal'
55
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
66
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
77

8+
/**
9+
* Updates the active blocks set and ref counts for a single block.
10+
* Ref counting ensures a block stays active until all parallel branches for it complete.
11+
*/
12+
export function updateActiveBlockRefCount(
13+
refCounts: Map<string, number>,
14+
activeSet: Set<string>,
15+
blockId: string,
16+
isActive: boolean
17+
): void {
18+
if (isActive) {
19+
refCounts.set(blockId, (refCounts.get(blockId) ?? 0) + 1)
20+
activeSet.add(blockId)
21+
} else {
22+
const next = (refCounts.get(blockId) ?? 1) - 1
23+
if (next <= 0) {
24+
refCounts.delete(blockId)
25+
activeSet.delete(blockId)
26+
} else {
27+
refCounts.set(blockId, next)
28+
}
29+
}
30+
}
31+
832
export interface WorkflowExecutionOptions {
933
workflowInput?: any
1034
onStream?: (se: StreamingExecution) => Promise<void>
@@ -104,9 +128,12 @@ export async function executeWorkflowWithFullLogging(
104128

105129
switch (event.type) {
106130
case 'block:started': {
107-
const startCount = activeBlockRefCounts.get(event.data.blockId) ?? 0
108-
activeBlockRefCounts.set(event.data.blockId, startCount + 1)
109-
activeBlocksSet.add(event.data.blockId)
131+
updateActiveBlockRefCount(
132+
activeBlockRefCounts,
133+
activeBlocksSet,
134+
event.data.blockId,
135+
true
136+
)
110137
setActiveBlocks(wfId, new Set(activeBlocksSet))
111138

112139
const incomingEdges = workflowEdges.filter(
@@ -119,13 +146,12 @@ export async function executeWorkflowWithFullLogging(
119146
}
120147

121148
case 'block:completed': {
122-
const completeCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
123-
if (completeCount <= 1) {
124-
activeBlockRefCounts.delete(event.data.blockId)
125-
activeBlocksSet.delete(event.data.blockId)
126-
} else {
127-
activeBlockRefCounts.set(event.data.blockId, completeCount - 1)
128-
}
149+
updateActiveBlockRefCount(
150+
activeBlockRefCounts,
151+
activeBlocksSet,
152+
event.data.blockId,
153+
false
154+
)
129155
setActiveBlocks(wfId, new Set(activeBlocksSet))
130156

131157
setBlockRunStatus(wfId, event.data.blockId, 'success')
@@ -156,13 +182,12 @@ export async function executeWorkflowWithFullLogging(
156182
}
157183

158184
case 'block:error': {
159-
const errorCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
160-
if (errorCount <= 1) {
161-
activeBlockRefCounts.delete(event.data.blockId)
162-
activeBlocksSet.delete(event.data.blockId)
163-
} else {
164-
activeBlockRefCounts.set(event.data.blockId, errorCount - 1)
165-
}
185+
updateActiveBlockRefCount(
186+
activeBlockRefCounts,
187+
activeBlocksSet,
188+
event.data.blockId,
189+
false
190+
)
166191
setActiveBlocks(wfId, new Set(activeBlocksSet))
167192

168193
setBlockRunStatus(wfId, event.data.blockId, 'error')

0 commit comments

Comments
 (0)