From b07a1b269f528b96c7ef45e27582c20bd062be90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Mon, 22 Sep 2025 12:53:28 +0200 Subject: [PATCH 1/3] Add push() options ## Details - Add `{ removeOngoingTasksWithSameName }` options to `push()` method. --- README.md | 19 +++++++++++-- src/AwaitQueue.ts | 54 ++++++++++++++++++++++++++----------- src/index.ts | 6 ++++- src/tests/test.ts | 69 ++++++++++++++++++++++++++++++++++++++++++++--- src/types.ts | 4 +++ 5 files changed, 129 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index a0f3e8d..7620d40 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ In ESM: ```ts import { AwaitQueue, + AwaitQueuePushOptions, AwaitQueueTask, AwaitQueueTaskDump, AwaitQueueStoppedError, @@ -31,6 +32,7 @@ Using CommonJS: ```ts const { AwaitQueue, + AwaitQueuePushOptions, AwaitQueueTask, AwaitQueueTaskDump, AwaitQueueStoppedError, @@ -40,6 +42,18 @@ const { ## Types +### `type AwaitQueuePushOptions` + +```ts +export type AwaitQueuePushOptions = { + removeOngoingTasksWithSameName?: boolean; +}; +``` + +Options given to `awaitQueue.push()`. + +- `removeOngoingTasksWithSameName`: If `true`, all previously enqueued tasks with same name will be aborted and will reject with an instance of `AwaitQueueRemovedTaskError`. + ### `type AwaitQueueTask` ```ts @@ -87,13 +101,14 @@ Number of enqueued pending tasks in the queue (including the running one if any) #### Method `awaitQueue.push()` ```ts -async push(task: AwaitQueueTask, name?: string): Promise +async push(task: AwaitQueueTask, name?: string, options?: AwaitQueuePushOptions): Promise ``` Accepts a task as argument and enqueues it after pending tasks. Once processed, the `push()` method resolves (or rejects) with the result (or error) returned by the given task. - `@param task`: Asynchronous or asynchronous function. -- `@param name`: Optional task name (useful for `awaitQueue.dump()` method). +- `@param name`: Optional task name. +- `@param.options`: Options. #### Method `awaitQueue.stop()` diff --git a/src/AwaitQueue.ts b/src/AwaitQueue.ts index 0be1ef4..424ad2d 100644 --- a/src/AwaitQueue.ts +++ b/src/AwaitQueue.ts @@ -1,5 +1,9 @@ import { Logger } from './Logger'; -import type { AwaitQueueTask, AwaitQueueTaskDump } from './types'; +import type { + AwaitQueuePushOptions, + AwaitQueueTask, + AwaitQueueTaskDump, +} from './types'; import { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors'; const logger = new Logger('AwaitQueue'); @@ -12,7 +16,10 @@ type PendingTask = { executedAt?: number; completed: boolean; resolve: (result: T | PromiseLike) => void; - reject: (error: Error) => void; + reject: ( + error: Error, + { canExecuteNextTask }: { canExecuteNextTask: boolean } + ) => void; }; export class AwaitQueue { @@ -20,8 +27,6 @@ export class AwaitQueue { private readonly pendingTasks: Map> = new Map(); // Incrementing PendingTask id. private nextTaskId = 0; - // Whether stop() method is stopping all pending tasks. - private stopping = false; constructor() { logger.debug('constructor()'); @@ -31,10 +36,14 @@ export class AwaitQueue { return this.pendingTasks.size; } - async push(task: AwaitQueueTask, name?: string): Promise { + async push( + task: AwaitQueueTask, + name?: string, + options?: AwaitQueuePushOptions + ): Promise { name = name ?? task.name; - logger.debug(`push() [name:${name}]`); + logger.debug(`push() [name:${name}, options:%o]`, options); if (typeof task !== 'function') { throw new TypeError('given task is not a function'); @@ -47,6 +56,16 @@ export class AwaitQueue { } return new Promise((resolve, reject) => { + if (name && options?.removeOngoingTasksWithSameName) { + for (const pendingTask of this.pendingTasks.values()) { + if (pendingTask.name === name) { + pendingTask.reject(new AwaitQueueRemovedTaskError(), { + canExecuteNextTask: false, + }); + } + } + } + const pendingTask: PendingTask = { id: this.nextTaskId++, task: task, @@ -86,7 +105,10 @@ export class AwaitQueue { void this.execute(nextPendingTask); } }, - reject: (error: Error) => { + reject: ( + error: Error, + { canExecuteNextTask }: { canExecuteNextTask: boolean } + ) => { // pendingTask.reject() can be called within execute() method if the // task completed with error. However it may have also been called in // stop() or remove() methods (before or while being executed) so its @@ -109,8 +131,8 @@ export class AwaitQueue { // Reject the task with the obtained error. reject(error); - // Execute the next pending task (if any) unless stop() is running. - if (!this.stopping) { + // May execute next pending task (if any). + if (canExecuteNextTask) { const [nextPendingTask] = this.pendingTasks.values(); // NOTE: During the reject() callback the user app may have interacted @@ -137,15 +159,13 @@ export class AwaitQueue { stop(): void { logger.debug('stop()'); - this.stopping = true; - for (const pendingTask of this.pendingTasks.values()) { logger.debug(`stop() | stopping task [name:${pendingTask.name}]`); - pendingTask.reject(new AwaitQueueStoppedError()); + pendingTask.reject(new AwaitQueueStoppedError(), { + canExecuteNextTask: false, + }); } - - this.stopping = false; } remove(taskIdx: number): void { @@ -159,7 +179,9 @@ export class AwaitQueue { return; } - pendingTask.reject(new AwaitQueueRemovedTaskError()); + pendingTask.reject(new AwaitQueueRemovedTaskError(), { + canExecuteNextTask: true, + }); } dump(): AwaitQueueTaskDump[] { @@ -193,7 +215,7 @@ export class AwaitQueue { pendingTask.resolve(result); } catch (error) { // Reject the task with its rejected error. - pendingTask.reject(error as Error); + pendingTask.reject(error as Error, { canExecuteNextTask: true }); } } } diff --git a/src/index.ts b/src/index.ts index 54ed4ae..d541fbc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,7 @@ export { AwaitQueue } from './AwaitQueue'; export { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors'; -export type { AwaitQueueTask, AwaitQueueTaskDump } from './types'; +export type { + AwaitQueuePushOptions, + AwaitQueueTask, + AwaitQueueTaskDump, +} from './types'; diff --git a/src/tests/test.ts b/src/tests/test.ts index 80911bf..6b349f9 100644 --- a/src/tests/test.ts +++ b/src/tests/test.ts @@ -191,7 +191,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing' const executionsCount: Map = new Map(); const emitter = new EventEmitter(); - const taskA = function (): Promise { + const taskA = async function (): Promise { const taskName = 'taskA'; return new Promise(resolve => { @@ -203,7 +203,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing' }); }; - const taskB = function (): Promise { + const taskB = async function (): Promise { const taskName = 'taskB'; return new Promise(resolve => { @@ -215,7 +215,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing' }); }; - const taskC = function (): Promise { + const taskC = async function (): Promise { const taskName = 'taskC'; return new Promise(resolve => { @@ -227,7 +227,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing' }); }; - const taskD = function (): Promise { + const taskD = async function (): Promise { const taskName = 'taskD'; return new Promise(resolve => { @@ -277,6 +277,67 @@ test('new task does not lead to next task execution if a stopped one is ongoing' expect(executionsCount.get('taskD')).toBe(1); }, 1000); +test('removeOngoingTasksWithSameName option cancels ongoing tasks with same name', async () => { + const awaitQueue = new AwaitQueue(); + const executionsCount: Map = new Map(); + const emitter = new EventEmitter(); + let removedTaskACount = 0; + + const taskA = async function (): Promise { + const taskName = 'taskA'; + + return new Promise(resolve => { + let executionCount = executionsCount.get(taskName) ?? 0; + + executionsCount.set(taskName, ++executionCount); + + emitter.on('resolve-task-a', resolve); + }); + }; + + const taskB = async function (): Promise { + const taskName = 'taskB'; + + return new Promise(resolve => { + let executionCount = executionsCount.get(taskName) ?? 0; + + executionsCount.set(taskName, ++executionCount); + + emitter.on('resolve-task-b', resolve); + }); + }; + + for (let i = 0; i < 10; ++i) { + awaitQueue + .push(taskA, 'taskA', { removeOngoingTasksWithSameName: false }) + .catch(error => { + if (error instanceof AwaitQueueRemovedTaskError) { + ++removedTaskACount; + } + }); + } + + const lastTaskAPromise = awaitQueue.push(taskA, 'taskA', { + removeOngoingTasksWithSameName: true, + }); + + const taskBPromise = awaitQueue.push(taskB, 'taskB', { + removeOngoingTasksWithSameName: true, + }); + + expectDumpToContain(awaitQueue, ['taskA', 'taskB']); + + emitter.emit('resolve-task-a'); + + await lastTaskAPromise; + + emitter.emit('resolve-task-b'); + + await taskBPromise; + + expect(removedTaskACount).toBe(10); +}, 1000); + async function wait(timeMs: number): Promise { await new Promise(resolve => { setTimeout(resolve, timeMs); diff --git a/src/types.ts b/src/types.ts index 9143c0d..c14dfef 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,7 @@ +export type AwaitQueuePushOptions = { + removeOngoingTasksWithSameName?: boolean; +}; + export type AwaitQueueTask = () => T | PromiseLike; export type AwaitQueueTaskDump = { From d1de2bc807bb56b80bb8c115e767f36180c3f042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Mon, 22 Sep 2025 12:56:56 +0200 Subject: [PATCH 2/3] cosmetic --- README.md | 2 +- src/tests/test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7620d40..74b3f09 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ export type AwaitQueuePushOptions = { Options given to `awaitQueue.push()`. -- `removeOngoingTasksWithSameName`: If `true`, all previously enqueued tasks with same name will be aborted and will reject with an instance of `AwaitQueueRemovedTaskError`. +- `removeOngoingTasksWithSameName`: If `true`, all previously enqueued tasks with same name will be removed and will reject with an instance of `AwaitQueueRemovedTaskError`. ### `type AwaitQueueTask` diff --git a/src/tests/test.ts b/src/tests/test.ts index 6b349f9..e562a45 100644 --- a/src/tests/test.ts +++ b/src/tests/test.ts @@ -277,7 +277,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing' expect(executionsCount.get('taskD')).toBe(1); }, 1000); -test('removeOngoingTasksWithSameName option cancels ongoing tasks with same name', async () => { +test('removeOngoingTasksWithSameName option removes ongoing tasks with same name', async () => { const awaitQueue = new AwaitQueue(); const executionsCount: Map = new Map(); const emitter = new EventEmitter(); From 846fc3acd26f101564971b98889304460b4490fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Mon, 22 Sep 2025 12:58:54 +0200 Subject: [PATCH 3/3] simplify test --- src/tests/test.ts | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/tests/test.ts b/src/tests/test.ts index e562a45..ae1277c 100644 --- a/src/tests/test.ts +++ b/src/tests/test.ts @@ -279,30 +279,17 @@ test('new task does not lead to next task execution if a stopped one is ongoing' test('removeOngoingTasksWithSameName option removes ongoing tasks with same name', async () => { const awaitQueue = new AwaitQueue(); - const executionsCount: Map = new Map(); const emitter = new EventEmitter(); let removedTaskACount = 0; const taskA = async function (): Promise { - const taskName = 'taskA'; - return new Promise(resolve => { - let executionCount = executionsCount.get(taskName) ?? 0; - - executionsCount.set(taskName, ++executionCount); - emitter.on('resolve-task-a', resolve); }); }; const taskB = async function (): Promise { - const taskName = 'taskB'; - return new Promise(resolve => { - let executionCount = executionsCount.get(taskName) ?? 0; - - executionsCount.set(taskName, ++executionCount); - emitter.on('resolve-task-b', resolve); }); };