Skip to content

Commit 336bed3

Browse files
committed
fix(queue): resume replay after observed idle
Track queued replay activity as a single session state so automatic dispatch waits for a real opencode idle event. This prevents queued items from running concurrently while still resuming when idle is emitted before the replay API call returns.
1 parent e294688 commit 336bed3

1 file changed

Lines changed: 42 additions & 21 deletions

File tree

index.ts

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ type EntryOp =
2727
| { kind: "command"; source: string; cmd: string; args: string }
2828
| { kind: "shell"; source: string; shell: string }
2929

30-
type State = { items: Item[]; busy: boolean; flushing: boolean; stopped: boolean; failed: boolean }
30+
type Activity = { kind: "idle" } | { kind: "busy" } | { kind: "sending"; idle: boolean }
31+
type State = { items: Item[]; activity: Activity; stopped: boolean; failed: boolean }
3132

3233
type Op =
3334
| { kind: "list" }
@@ -127,7 +128,7 @@ export const QueuePlugin: Plugin = async ({ client }) => {
127128
const state = (sid: string) => {
128129
let current = sessions.get(sid)
129130
if (!current) {
130-
current = { items: [], busy: false, flushing: false, stopped: false, failed: false }
131+
current = { items: [], activity: { kind: "idle" }, stopped: false, failed: false }
131132
sessions.set(sid, current)
132133
}
133134
return current
@@ -229,16 +230,42 @@ export const QueuePlugin: Plugin = async ({ client }) => {
229230
console.warn("QueuePlugin skipped queued item without replayable content")
230231
}
231232

233+
const advance = (sid: string) => {
234+
const current = state(sid)
235+
if (current.activity.kind !== "idle" || current.stopped || !current.items.length) return
236+
void flush(sid, 1)
237+
}
238+
239+
const settle = (sid: string, resume: boolean) => {
240+
const current = state(sid)
241+
current.activity = { kind: "idle" }
242+
if (current.failed) {
243+
current.failed = false
244+
return
245+
}
246+
247+
if (resume) advance(sid)
248+
}
249+
250+
const idle = (sid: string) => {
251+
const current = state(sid)
252+
if (current.activity.kind === "sending") {
253+
current.activity.idle = true
254+
return
255+
}
256+
if (current.activity.kind === "busy") settle(sid, true)
257+
}
258+
232259
const flush = async (sid: string, count = Infinity) => {
233260
const current = state(sid)
234261
const items = current.items.splice(0, count)
235262
if (!items.length) return { sent: 0, failed: 0 }
236263

237-
current.flushing = true
238264
let failed = 0
239265
const retry: Item[] = []
240266
try {
241267
for (const item of items) {
268+
current.activity = { kind: "sending", idle: false }
242269
try {
243270
await replay(sid, item)
244271
} catch (error) {
@@ -250,17 +277,13 @@ export const QueuePlugin: Plugin = async ({ client }) => {
250277
}
251278
} finally {
252279
if (retry.length) current.items.unshift(...retry)
253-
current.flushing = false
280+
const replayCompleted = current.activity.kind === "sending" && current.activity.idle
281+
if (replayCompleted) settle(sid, count === 1 && failed === 0)
282+
else current.activity = failed ? { kind: "idle" } : { kind: "busy" }
254283
}
255284
return { sent: items.length - failed, failed }
256285
}
257286

258-
const advance = (sid: string) => {
259-
const current = state(sid)
260-
if (!current.items.length || current.busy || current.stopped || current.flushing) return
261-
void flush(sid, 1)
262-
}
263-
264287
const manage = async (sid: string, op: ControlOp) => {
265288
const current = state(sid)
266289

@@ -301,7 +324,7 @@ export const QueuePlugin: Plugin = async ({ client }) => {
301324
if (plan(event)) {
302325
const sid = event.properties.sessionID
303326
const current = sessions.get(sid)
304-
if (!current || (!current.flushing && (current.stopped || !current.items.length))) return
327+
if (!current || (current.activity.kind !== "sending" && (current.stopped || !current.items.length))) return
305328
await no(event.properties.id)
306329
await toast("Declined plan approval to continue queued work", "info")
307330
return
@@ -318,23 +341,21 @@ export const QueuePlugin: Plugin = async ({ client }) => {
318341
}
319342

320343
if (event.type === "session.idle") {
321-
const current = state(event.properties.sessionID)
322-
current.busy = false
323-
if (!current.failed) advance(event.properties.sessionID)
324-
current.failed = false
344+
idle(event.properties.sessionID)
325345
return
326346
}
327347

328348
if (event.type !== "session.status") return
329349

330-
const current = state(event.properties.sessionID)
350+
const sid = event.properties.sessionID
351+
const current = state(sid)
331352
if (event.properties.status.type !== "idle") {
332-
current.busy = true
353+
if (current.activity.kind !== "sending") current.activity = { kind: "busy" }
333354
current.failed = false
334355
return
335356
}
336357

337-
current.busy = false
358+
idle(sid)
338359
},
339360
"command.execute.before": async (input, output) => {
340361
const sid = input.sessionID
@@ -346,7 +367,7 @@ export const QueuePlugin: Plugin = async ({ client }) => {
346367
if (!queued) return
347368

348369
const current = sessions.get(sid)
349-
const shouldQueue = Boolean(current?.busy || current?.stopped)
370+
const shouldQueue = Boolean(current && (current.activity.kind !== "idle" || current.stopped))
350371
if (!shouldQueue) {
351372
for (const part of output.parts) if (part.type === "text") part.text = stripSuffix(part.text)
352373
return
@@ -357,7 +378,7 @@ export const QueuePlugin: Plugin = async ({ client }) => {
357378
}
358379

359380
const current = sessions.get(sid)
360-
const shouldQueue = Boolean(current?.busy || current?.stopped)
381+
const shouldQueue = Boolean(current && (current.activity.kind !== "idle" || current.stopped))
361382
const op = parse(parsePrefix(body), parts.length)
362383

363384
if (control(op)) return stop(await manage(sid, op))
@@ -404,7 +425,7 @@ export const QueuePlugin: Plugin = async ({ client }) => {
404425
return
405426
}
406427

407-
if (!current.busy && !current.stopped) {
428+
if (current.activity.kind === "idle" && !current.stopped) {
408429
if (op.kind === "command") return
409430
if (op.kind === "shell") {
410431
hide(output.message.id, text)

0 commit comments

Comments
 (0)