Skip to content

Commit c11075c

Browse files
committed
fix: retry resume during pause persistence
Fixes #3081.
1 parent e9c4251 commit c11075c

File tree

2 files changed

+232
-97
lines changed

2 files changed

+232
-97
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
2+
3+
vi.mock('@sim/db', () => ({
4+
db: {
5+
transaction: vi.fn(),
6+
},
7+
}))
8+
9+
import { db } from '@sim/db'
10+
11+
/**
 * Retry behaviour of `PauseResumeManager.enqueueOrStartResume`.
 *
 * `db.transaction` is replaced by a bare mock, so each test scripts the outcome of successive
 * transaction attempts and then checks how many attempts the manager made.
 */
describe('PauseResumeManager.enqueueOrStartResume', () => {
  let PauseResumeManager: typeof import('@/lib/workflows/executor/human-in-the-loop-manager').PauseResumeManager

  beforeAll(async () => {
    // NOTE(review): presumably something in the manager's import graph reads this variable at
    // import time, which is why it is stubbed before the dynamic import - confirm.
    vi.stubEnv('NEXT_PUBLIC_APP_URL', 'http://localhost:3000')
    ;({ PauseResumeManager } = await import('@/lib/workflows/executor/human-in-the-loop-manager'))
  })

  beforeEach(() => {
    // Fake timers let the backoff delays between attempts elapse instantly.
    vi.useFakeTimers()
    vi.clearAllMocks()
    // Also drop any queued `...Once` outcomes so a failing test cannot leak them into the next one.
    vi.mocked(db.transaction).mockReset()
  })

  afterEach(() => {
    vi.useRealTimers()
  })

  it('retries when paused execution is not yet persisted', async () => {
    vi.mocked(db.transaction)
      .mockRejectedValueOnce(new Error('Paused execution not found or already resumed'))
      .mockRejectedValueOnce(new Error('Paused execution not found or already resumed'))
      .mockResolvedValueOnce({
        status: 'queued',
        resumeExecutionId: 'exec-1',
        queuePosition: 1,
      } as never)

    const promise = PauseResumeManager.enqueueOrStartResume({
      executionId: 'exec-1',
      contextId: 'ctx-1',
      resumeInput: { ok: true },
      userId: 'user-1',
    })

    await vi.runAllTimersAsync()

    await expect(promise).resolves.toMatchObject({
      status: 'queued',
      resumeExecutionId: 'exec-1',
      queuePosition: 1,
    })
    expect(db.transaction).toHaveBeenCalledTimes(3)
  })

  it('retries when snapshot is not ready yet', async () => {
    vi.mocked(db.transaction)
      .mockRejectedValueOnce(new Error('Snapshot not ready; execution still finalizing pause'))
      .mockResolvedValueOnce({
        status: 'queued',
        resumeExecutionId: 'exec-2',
        queuePosition: 1,
      } as never)

    const promise = PauseResumeManager.enqueueOrStartResume({
      executionId: 'exec-2',
      contextId: 'ctx-2',
      resumeInput: null,
      userId: 'user-2',
    })

    await vi.runAllTimersAsync()

    await expect(promise).resolves.toMatchObject({
      status: 'queued',
      resumeExecutionId: 'exec-2',
    })
    expect(db.transaction).toHaveBeenCalledTimes(2)
  })

  it('does not retry non-transient errors', async () => {
    vi.mocked(db.transaction).mockRejectedValueOnce(
      new Error('Pause point not found for execution')
    )

    const promise = PauseResumeManager.enqueueOrStartResume({
      executionId: 'exec-3',
      contextId: 'ctx-3',
      resumeInput: null,
      userId: 'user-3',
    })

    await expect(promise).rejects.toThrow('Pause point not found for execution')
    expect(db.transaction).toHaveBeenCalledTimes(1)
  })

  it('gives up once the retry budget is exhausted', async () => {
    // Mirrors _RESUME_LOOKUP_MAX_ATTEMPTS in human-in-the-loop-manager.ts, which is not exported.
    const maxAttempts = 8
    const transaction = vi.mocked(db.transaction)
    for (let i = 0; i < maxAttempts; i++) {
      transaction.mockRejectedValueOnce(new Error('Paused execution not found or already resumed'))
    }

    const promise = PauseResumeManager.enqueueOrStartResume({
      executionId: 'exec-4',
      contextId: 'ctx-4',
      resumeInput: null,
      userId: 'user-4',
    })
    // Attach the rejection expectation before draining timers so the rejection is never unhandled.
    const assertion = expect(promise).rejects.toThrow(
      'Paused execution not found or already resumed'
    )

    await vi.runAllTimersAsync()

    await assertion
    expect(db.transaction).toHaveBeenCalledTimes(maxAttempts)
  })
})

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 137 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,22 @@ import type { SerializedConnection } from '@/serializer/types'
1313

1414
const logger = createLogger('HumanInTheLoopManager')
1515

16+
/** Maximum number of times the resume lookup transaction is attempted before giving up. */
const _RESUME_LOOKUP_MAX_ATTEMPTS = 8
/** Delay, in milliseconds, before the first retry; doubled after each retry. */
const _RESUME_LOOKUP_INITIAL_DELAY_MS = 25
/** Upper bound, in milliseconds, on the delay between two consecutive attempts. */
const _RESUME_LOOKUP_MAX_DELAY_MS = 400
19+
20+
/**
 * Resolves after roughly `ms` milliseconds.
 *
 * @param ms - Delay passed straight to `setTimeout`.
 * @returns A promise that resolves with no value once the timer fires; it never rejects.
 */
function _sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}
23+
24+
/**
 * Decides whether a failed resume lookup should be retried.
 *
 * The decision is made purely on the error text: a failure is treated as transient when its
 * message contains `Paused execution not found` or
 * `Snapshot not ready; execution still finalizing pause`.
 *
 * @param error - Whatever was thrown; may be an `Error`, an object carrying a string `message`,
 *   or any other value.
 * @returns `true` when the message matches one of the retryable failures, otherwise `false`.
 */
function _is_transient_resume_lookup_error(error: unknown): boolean {
  let msg: string
  if (error instanceof Error) {
    msg = error.message
  } else if (
    typeof error === 'object' &&
    error !== null &&
    typeof (error as { message?: unknown }).message === 'string'
  ) {
    // Error-shaped objects that are not `Error` instances would otherwise stringify to
    // "[object Object]" and never match.
    msg = (error as { message: string }).message
  } else {
    msg = String(error)
  }

  return (
    msg.includes('Paused execution not found') ||
    msg.includes('Snapshot not ready; execution still finalizing pause')
  )
}
31+
1632
interface ResumeQueueEntrySummary {
1733
id: string
1834
pausedExecutionId: string
@@ -162,120 +178,144 @@ export class PauseResumeManager {
162178
/**
 * Registers a resume request for one pause point of a paused execution.
 *
 * All bookkeeping happens inside a single database transaction that selects the paused
 * execution row `FOR UPDATE`:
 * - if another resume for the same parent execution is already `claimed`, the request is
 *   inserted as `pending`, the pause point is marked `queued`, and the 1-based queue position
 *   is returned;
 * - otherwise the request is inserted as `claimed`, the pause point is marked `resuming`, and
 *   the data needed to start the resume is returned.
 *
 * The whole transaction is retried with a doubling, capped delay while it fails with an error
 * accepted by `_is_transient_resume_lookup_error`, up to `_RESUME_LOOKUP_MAX_ATTEMPTS` attempts.
 *
 * NOTE(review): 'Paused execution not found or already resumed' is retried even when the
 * execution really has been resumed, so that case only fails after the full retry budget.
 *
 * @param args - Execution id, pause context id, resume input and requesting user id.
 * @returns A `queued` result with `queuePosition`, or a `starting` result with the resume entry.
 * @throws The last transaction error when it is not transient or the attempts are exhausted.
 */
static async enqueueOrStartResume(args: EnqueueResumeArgs): Promise<EnqueueResumeResult> {
  const { executionId, contextId, resumeInput, userId } = args

  let delayMs = _RESUME_LOOKUP_INITIAL_DELAY_MS
  // No loop condition on purpose: every iteration either returns the transaction result or
  // throws, and the final attempt always rethrows from the catch block.
  for (let attempt = 1; ; attempt++) {
    try {
      return await db.transaction(async (tx) => {
        const pausedExecution = await tx
          .select()
          .from(pausedExecutions)
          .where(eq(pausedExecutions.executionId, executionId))
          .for('update')
          .limit(1)
          .then((rows) => rows[0])

        if (!pausedExecution) {
          throw new Error('Paused execution not found or already resumed')
        }

        const pausePoints = pausedExecution.pausePoints as Record<string, any>
        const pausePoint = pausePoints?.[contextId]
        if (!pausePoint) {
          throw new Error('Pause point not found for execution')
        }
        if (pausePoint.resumeStatus !== 'paused') {
          throw new Error('Pause point already resumed or in progress')
        }
        if (!pausePoint.snapshotReady) {
          throw new Error('Snapshot not ready; execution still finalizing pause')
        }

        // A `claimed` entry means another resume of this execution is currently in flight.
        const activeResume = await tx
          .select({ id: resumeQueue.id })
          .from(resumeQueue)
          .where(
            and(
              eq(resumeQueue.parentExecutionId, executionId),
              inArray(resumeQueue.status, ['claimed'] as const)
            )
          )
          .limit(1)
          .then((rows) => rows[0])

        const resumeExecutionId = executionId
        const now = new Date()

        if (activeResume) {
          const [entry] = await tx
            .insert(resumeQueue)
            .values({
              id: randomUUID(),
              pausedExecutionId: pausedExecution.id,
              parentExecutionId: executionId,
              newExecutionId: resumeExecutionId,
              contextId,
              resumeInput: resumeInput ?? null,
              status: 'pending',
              queuedAt: now,
            })
            .returning({ id: resumeQueue.id, queuedAt: resumeQueue.queuedAt })

          await tx
            .update(pausedExecutions)
            .set({
              pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"queued"'::jsonb)`,
            })
            .where(eq(pausedExecutions.id, pausedExecution.id))

          pausePoint.resumeStatus = 'queued'

          // Count the pending entries queued strictly before this one.
          const [positionRow = { position: 0 }] = await tx
            .select({ position: sql<number>`count(*)` })
            .from(resumeQueue)
            .where(
              and(
                eq(resumeQueue.parentExecutionId, executionId),
                eq(resumeQueue.status, 'pending'),
                lt(resumeQueue.queuedAt, entry.queuedAt)
              )
            )

          return {
            status: 'queued',
            resumeExecutionId,
            queuePosition: Number(positionRow.position ?? 0) + 1,
          }
        }

        const resumeEntryId = randomUUID()
        await tx.insert(resumeQueue).values({
          id: resumeEntryId,
          pausedExecutionId: pausedExecution.id,
          parentExecutionId: executionId,
          newExecutionId: resumeExecutionId,
          contextId,
          resumeInput: resumeInput ?? null,
          status: 'claimed',
          queuedAt: now,
          claimedAt: now,
        })

        await tx
          .update(pausedExecutions)
          .set({
            pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"resuming"'::jsonb)`,
          })
          .where(eq(pausedExecutions.id, pausedExecution.id))

        pausePoint.resumeStatus = 'resuming'

        return {
          status: 'starting',
          resumeExecutionId,
          resumeEntryId,
          pausedExecution,
          contextId,
          resumeInput,
          userId,
        }
      })
    } catch (error) {
      const isLastAttempt = attempt >= _RESUME_LOOKUP_MAX_ATTEMPTS
      if (isLastAttempt || !_is_transient_resume_lookup_error(error)) {
        throw error
      }
      logger.warn(
        `Transient resume lookup failure; retrying (attempt ${attempt}/${_RESUME_LOOKUP_MAX_ATTEMPTS})`,
        {
          executionId,
          contextId,
          delayMs,
          error: error instanceof Error ? error.message : String(error),
        }
      )
      await _sleep(delayMs)
      // Exponential backoff, capped so late attempts do not wait unboundedly long.
      delayMs = Math.min(delayMs * 2, _RESUME_LOOKUP_MAX_DELAY_MS)
    }
  }
}
280320

281321
static async startResumeExecution(args: StartResumeExecutionArgs): Promise<void> {

0 commit comments

Comments
 (0)