Skip to content

Commit fd967b2

Browse files
committed
fix(snapshot): changed insert to upsert when concurrent identical child workflows are running
1 parent dcf8137 commit fd967b2

File tree

2 files changed

+218
-39
lines changed

2 files changed

+218
-39
lines changed

apps/sim/lib/logs/execution/snapshot/service.test.ts

Lines changed: 200 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,43 @@
1-
import { describe, expect, it } from 'vitest'
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
vi.mock('@sim/db', () => databaseMock)
8+
vi.mock('@sim/db/schema', () => ({}))
9+
vi.mock('@sim/logger', () => loggerMock)
10+
vi.mock('drizzle-orm', () => drizzleOrmMock)
11+
vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') }))
12+
213
import { SnapshotService } from '@/lib/logs/execution/snapshot/service'
314
import type { WorkflowState } from '@/lib/logs/types'
415

16+
const mockState: WorkflowState = {
17+
blocks: {
18+
block1: {
19+
id: 'block1',
20+
name: 'Test Agent',
21+
type: 'agent',
22+
position: { x: 100, y: 200 },
23+
subBlocks: {},
24+
outputs: {},
25+
enabled: true,
26+
horizontalHandles: true,
27+
advancedMode: false,
28+
height: 0,
29+
},
30+
},
31+
edges: [{ id: 'edge1', source: 'block1', target: 'block2' }],
32+
loops: {},
33+
parallels: {},
34+
}
35+
536
describe('SnapshotService', () => {
37+
beforeEach(() => {
38+
vi.clearAllMocks()
39+
})
40+
641
describe('computeStateHash', () => {
742
it.concurrent('should generate consistent hashes for identical states', () => {
843
const service = new SnapshotService()
@@ -62,7 +97,7 @@ describe('SnapshotService', () => {
6297
blocks: {
6398
block1: {
6499
...baseState.blocks.block1,
65-
position: { x: 500, y: 600 }, // Different position
100+
position: { x: 500, y: 600 },
66101
},
67102
},
68103
}
@@ -140,7 +175,7 @@ describe('SnapshotService', () => {
140175
const state2: WorkflowState = {
141176
blocks: {},
142177
edges: [
143-
{ id: 'edge2', source: 'b', target: 'c' }, // Different order
178+
{ id: 'edge2', source: 'b', target: 'c' },
144179
{ id: 'edge1', source: 'a', target: 'b' },
145180
],
146181
loops: {},
@@ -219,7 +254,6 @@ describe('SnapshotService', () => {
219254
const hash = service.computeStateHash(complexState)
220255
expect(hash).toHaveLength(64)
221256

222-
// Should be consistent
223257
const hash2 = service.computeStateHash(complexState)
224258
expect(hash).toBe(hash2)
225259
})
@@ -335,4 +369,166 @@ describe('SnapshotService', () => {
335369
expect(hash1).toHaveLength(64)
336370
})
337371
})
372+
373+
describe('createSnapshotWithDeduplication', () => {
374+
it('should use upsert to insert a new snapshot', async () => {
375+
const service = new SnapshotService()
376+
const workflowId = 'wf-123'
377+
378+
const mockReturning = vi.fn().mockResolvedValue([
379+
{
380+
id: 'generated-uuid-1',
381+
workflowId,
382+
stateHash: 'abc123',
383+
stateData: mockState,
384+
createdAt: new Date('2026-02-19T00:00:00Z'),
385+
},
386+
])
387+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
388+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
389+
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
390+
databaseMock.db.insert = mockInsert
391+
392+
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
393+
394+
expect(mockInsert).toHaveBeenCalled()
395+
expect(mockValues).toHaveBeenCalledWith(
396+
expect.objectContaining({
397+
id: 'generated-uuid-1',
398+
workflowId,
399+
stateData: mockState,
400+
})
401+
)
402+
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith(
403+
expect.objectContaining({
404+
set: expect.any(Object),
405+
})
406+
)
407+
expect(result.snapshot.id).toBe('generated-uuid-1')
408+
expect(result.isNew).toBe(true)
409+
})
410+
411+
it('should detect reused snapshot when returned id differs from generated id', async () => {
412+
const service = new SnapshotService()
413+
const workflowId = 'wf-123'
414+
415+
const mockReturning = vi.fn().mockResolvedValue([
416+
{
417+
id: 'existing-snapshot-id',
418+
workflowId,
419+
stateHash: 'abc123',
420+
stateData: mockState,
421+
createdAt: new Date('2026-02-19T00:00:00Z'),
422+
},
423+
])
424+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
425+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
426+
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
427+
databaseMock.db.insert = mockInsert
428+
429+
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
430+
431+
expect(result.snapshot.id).toBe('existing-snapshot-id')
432+
expect(result.isNew).toBe(false)
433+
})
434+
435+
it('should not throw on concurrent inserts with the same hash', async () => {
436+
const service = new SnapshotService()
437+
const workflowId = 'wf-123'
438+
439+
const mockReturningNew = vi.fn().mockResolvedValue([
440+
{
441+
id: 'generated-uuid-1',
442+
workflowId,
443+
stateHash: 'abc123',
444+
stateData: mockState,
445+
createdAt: new Date('2026-02-19T00:00:00Z'),
446+
},
447+
])
448+
const mockReturningExisting = vi.fn().mockResolvedValue([
449+
{
450+
id: 'existing-snapshot-id',
451+
workflowId,
452+
stateHash: 'abc123',
453+
stateData: mockState,
454+
createdAt: new Date('2026-02-19T00:00:00Z'),
455+
},
456+
])
457+
458+
let callCount = 0
459+
databaseMock.db.insert = vi.fn().mockImplementation(() => ({
460+
values: vi.fn().mockImplementation(() => ({
461+
onConflictDoUpdate: vi.fn().mockImplementation(() => ({
462+
returning: callCount++ === 0 ? mockReturningNew : mockReturningExisting,
463+
})),
464+
})),
465+
}))
466+
467+
const [result1, result2] = await Promise.all([
468+
service.createSnapshotWithDeduplication(workflowId, mockState),
469+
service.createSnapshotWithDeduplication(workflowId, mockState),
470+
])
471+
472+
expect(result1.snapshot.id).toBe('generated-uuid-1')
473+
expect(result1.isNew).toBe(true)
474+
expect(result2.snapshot.id).toBe('existing-snapshot-id')
475+
expect(result2.isNew).toBe(false)
476+
})
477+
478+
it('should pass state_data in the ON CONFLICT SET clause', async () => {
479+
const service = new SnapshotService()
480+
const workflowId = 'wf-123'
481+
482+
let capturedConflictConfig: Record<string, unknown> | undefined
483+
const mockReturning = vi.fn().mockResolvedValue([
484+
{
485+
id: 'generated-uuid-1',
486+
workflowId,
487+
stateHash: 'abc123',
488+
stateData: mockState,
489+
createdAt: new Date('2026-02-19T00:00:00Z'),
490+
},
491+
])
492+
493+
databaseMock.db.insert = vi.fn().mockReturnValue({
494+
values: vi.fn().mockReturnValue({
495+
onConflictDoUpdate: vi.fn().mockImplementation((config: Record<string, unknown>) => {
496+
capturedConflictConfig = config
497+
return { returning: mockReturning }
498+
}),
499+
}),
500+
})
501+
502+
await service.createSnapshotWithDeduplication(workflowId, mockState)
503+
504+
expect(capturedConflictConfig).toBeDefined()
505+
expect(capturedConflictConfig!.target).toBeDefined()
506+
expect(capturedConflictConfig!.set).toBeDefined()
507+
expect(capturedConflictConfig!.set).toHaveProperty('stateData')
508+
})
509+
510+
it('should always call insert, never a separate select for deduplication', async () => {
511+
const service = new SnapshotService()
512+
const workflowId = 'wf-123'
513+
514+
const mockReturning = vi.fn().mockResolvedValue([
515+
{
516+
id: 'generated-uuid-1',
517+
workflowId,
518+
stateHash: 'abc123',
519+
stateData: mockState,
520+
createdAt: new Date('2026-02-19T00:00:00Z'),
521+
},
522+
])
523+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
524+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
525+
databaseMock.db.insert = vi.fn().mockReturnValue({ values: mockValues })
526+
databaseMock.db.select = vi.fn()
527+
528+
await service.createSnapshotWithDeduplication(workflowId, mockState)
529+
530+
expect(databaseMock.db.insert).toHaveBeenCalledTimes(1)
531+
expect(databaseMock.db.select).not.toHaveBeenCalled()
532+
})
533+
})
338534
})

apps/sim/lib/logs/execution/snapshot/service.ts

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { createHash } from 'crypto'
22
import { db } from '@sim/db'
33
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
44
import { createLogger } from '@sim/logger'
5-
import { and, eq, lt, notExists } from 'drizzle-orm'
5+
import { and, eq, lt, notExists, sql } from 'drizzle-orm'
66
import { v4 as uuidv4 } from 'uuid'
77
import type {
88
SnapshotService as ISnapshotService,
@@ -28,58 +28,41 @@ export class SnapshotService implements ISnapshotService {
2828
workflowId: string,
2929
state: WorkflowState
3030
): Promise<SnapshotCreationResult> {
31-
// Hash the position-less state for deduplication (functional equivalence)
3231
const stateHash = this.computeStateHash(state)
3332

34-
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
35-
if (existingSnapshot) {
36-
let refreshedState: WorkflowState = existingSnapshot.stateData
37-
try {
38-
await db
39-
.update(workflowExecutionSnapshots)
40-
.set({ stateData: state })
41-
.where(eq(workflowExecutionSnapshots.id, existingSnapshot.id))
42-
refreshedState = state
43-
} catch (error) {
44-
logger.warn(
45-
`Failed to refresh snapshot stateData for ${existingSnapshot.id}, continuing with existing data`,
46-
error
47-
)
48-
}
49-
50-
logger.info(
51-
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
52-
)
53-
return {
54-
snapshot: { ...existingSnapshot, stateData: refreshedState },
55-
isNew: false,
56-
}
57-
}
58-
59-
// Store the FULL state (including positions) so we can recreate the exact workflow
60-
// Even though we hash without positions, we want to preserve the complete state
6133
const snapshotData: WorkflowExecutionSnapshotInsert = {
6234
id: uuidv4(),
6335
workflowId,
6436
stateHash,
6537
stateData: state,
6638
}
6739

68-
const [newSnapshot] = await db
40+
const [upsertedSnapshot] = await db
6941
.insert(workflowExecutionSnapshots)
7042
.values(snapshotData)
43+
.onConflictDoUpdate({
44+
target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash],
45+
set: {
46+
stateData: sql`excluded.state_data`,
47+
},
48+
})
7149
.returning()
7250

51+
const isNew = upsertedSnapshot.id === snapshotData.id
52+
7353
logger.info(
74-
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
54+
isNew
55+
? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
56+
: `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
7557
)
58+
7659
return {
7760
snapshot: {
78-
...newSnapshot,
79-
stateData: newSnapshot.stateData as WorkflowState,
80-
createdAt: newSnapshot.createdAt.toISOString(),
61+
...upsertedSnapshot,
62+
stateData: upsertedSnapshot.stateData as WorkflowState,
63+
createdAt: upsertedSnapshot.createdAt.toISOString(),
8164
},
82-
isNew: true,
65+
isNew,
8366
}
8467
}
8568

0 commit comments

Comments
 (0)