Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ In ESM:
```ts
import {
AwaitQueue,
AwaitQueuePushOptions,
AwaitQueueTask,
AwaitQueueTaskDump,
AwaitQueueStoppedError,
Expand All @@ -31,6 +32,7 @@ Using CommonJS:
```ts
const {
AwaitQueue,
AwaitQueuePushOptions,
AwaitQueueTask,
AwaitQueueTaskDump,
AwaitQueueStoppedError,
Expand All @@ -40,6 +42,18 @@ const {

## Types

### `type AwaitQueuePushOptions`

```ts
export type AwaitQueuePushOptions = {
removeOngoingTasksWithSameName?: boolean;
};
```

Options given to `awaitQueue.push()`.

- `removeOngoingTasksWithSameName`: If `true`, all previously enqueued tasks with same name will be removed and will reject with an instance of `AwaitQueueRemovedTaskError`.

### `type AwaitQueueTask`

```ts
Expand Down Expand Up @@ -87,13 +101,14 @@ Number of enqueued pending tasks in the queue (including the running one if any)
#### Method `awaitQueue.push()`

```ts
async push<T>(task: AwaitQueueTask<T>, name?: string): Promise<T>
async push<T>(task: AwaitQueueTask<T>, name?: string, options?: AwaitQueuePushOptions): Promise<T>
```

Accepts a task as argument and enqueues it after pending tasks. Once processed, the `push()` method resolves (or rejects) with the result (or error) returned by the given task.

- `@param task`: Asynchronous or asynchronous function.
- `@param name`: Optional task name (useful for `awaitQueue.dump()` method).
- `@param name`: Optional task name.
- `@param.options`: Options.

#### Method `awaitQueue.stop()`

Expand Down
54 changes: 38 additions & 16 deletions src/AwaitQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Logger } from './Logger';
import type { AwaitQueueTask, AwaitQueueTaskDump } from './types';
import type {
AwaitQueuePushOptions,
AwaitQueueTask,
AwaitQueueTaskDump,
} from './types';
import { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors';

const logger = new Logger('AwaitQueue');
Expand All @@ -12,16 +16,17 @@ type PendingTask<T> = {
executedAt?: number;
completed: boolean;
resolve: (result: T | PromiseLike<T>) => void;
reject: (error: Error) => void;
reject: (
error: Error,
{ canExecuteNextTask }: { canExecuteNextTask: boolean }
) => void;
};

export class AwaitQueue {
// Queue of pending tasks (map of PendingTasks indexed by id).
private readonly pendingTasks: Map<number, PendingTask<any>> = new Map();
// Incrementing PendingTask id.
private nextTaskId = 0;
// Whether stop() method is stopping all pending tasks.
private stopping = false;

constructor() {
logger.debug('constructor()');
Expand All @@ -31,10 +36,14 @@ export class AwaitQueue {
return this.pendingTasks.size;
}

async push<T>(task: AwaitQueueTask<T>, name?: string): Promise<T> {
async push<T>(
task: AwaitQueueTask<T>,
name?: string,
options?: AwaitQueuePushOptions
): Promise<T> {
name = name ?? task.name;

logger.debug(`push() [name:${name}]`);
logger.debug(`push() [name:${name}, options:%o]`, options);

if (typeof task !== 'function') {
throw new TypeError('given task is not a function');
Expand All @@ -47,6 +56,16 @@ export class AwaitQueue {
}

return new Promise<T>((resolve, reject) => {
if (name && options?.removeOngoingTasksWithSameName) {
for (const pendingTask of this.pendingTasks.values()) {
if (pendingTask.name === name) {
pendingTask.reject(new AwaitQueueRemovedTaskError(), {
canExecuteNextTask: false,
});
}
}
}

const pendingTask: PendingTask<T> = {
id: this.nextTaskId++,
task: task,
Expand Down Expand Up @@ -86,7 +105,10 @@ export class AwaitQueue {
void this.execute(nextPendingTask);
}
},
reject: (error: Error) => {
reject: (
error: Error,
{ canExecuteNextTask }: { canExecuteNextTask: boolean }
) => {
// pendingTask.reject() can be called within execute() method if the
// task completed with error. However it may have also been called in
// stop() or remove() methods (before or while being executed) so its
Expand All @@ -109,8 +131,8 @@ export class AwaitQueue {
// Reject the task with the obtained error.
reject(error);

// Execute the next pending task (if any) unless stop() is running.
if (!this.stopping) {
// May execute next pending task (if any).
if (canExecuteNextTask) {
const [nextPendingTask] = this.pendingTasks.values();

// NOTE: During the reject() callback the user app may have interacted
Expand All @@ -137,15 +159,13 @@ export class AwaitQueue {
stop(): void {
logger.debug('stop()');

this.stopping = true;

for (const pendingTask of this.pendingTasks.values()) {
logger.debug(`stop() | stopping task [name:${pendingTask.name}]`);

pendingTask.reject(new AwaitQueueStoppedError());
pendingTask.reject(new AwaitQueueStoppedError(), {
canExecuteNextTask: false,
});
}

this.stopping = false;
}

remove(taskIdx: number): void {
Expand All @@ -159,7 +179,9 @@ export class AwaitQueue {
return;
}

pendingTask.reject(new AwaitQueueRemovedTaskError());
pendingTask.reject(new AwaitQueueRemovedTaskError(), {
canExecuteNextTask: true,
});
}

dump(): AwaitQueueTaskDump[] {
Expand Down Expand Up @@ -193,7 +215,7 @@ export class AwaitQueue {
pendingTask.resolve(result);
} catch (error) {
// Reject the task with its rejected error.
pendingTask.reject(error as Error);
pendingTask.reject(error as Error, { canExecuteNextTask: true });
}
}
}
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export { AwaitQueue } from './AwaitQueue';
export { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors';
export type { AwaitQueueTask, AwaitQueueTaskDump } from './types';
export type {
AwaitQueuePushOptions,
AwaitQueueTask,
AwaitQueueTaskDump,
} from './types';
56 changes: 52 additions & 4 deletions src/tests/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
const executionsCount: Map<string, number> = new Map();
const emitter = new EventEmitter();

const taskA = function (): Promise<void> {
const taskA = async function (): Promise<void> {
const taskName = 'taskA';

return new Promise<void>(resolve => {
Expand All @@ -203,7 +203,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
});
};

const taskB = function (): Promise<void> {
const taskB = async function (): Promise<void> {
const taskName = 'taskB';

return new Promise<void>(resolve => {
Expand All @@ -215,7 +215,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
});
};

const taskC = function (): Promise<void> {
const taskC = async function (): Promise<void> {
const taskName = 'taskC';

return new Promise<void>(resolve => {
Expand All @@ -227,7 +227,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
});
};

const taskD = function (): Promise<void> {
const taskD = async function (): Promise<void> {
const taskName = 'taskD';

return new Promise<void>(resolve => {
Expand Down Expand Up @@ -277,6 +277,54 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
expect(executionsCount.get('taskD')).toBe(1);
}, 1000);

test('removeOngoingTasksWithSameName option removes ongoing tasks with same name', async () => {
const awaitQueue = new AwaitQueue();
const emitter = new EventEmitter();
let removedTaskACount = 0;

const taskA = async function (): Promise<void> {
return new Promise<void>(resolve => {
emitter.on('resolve-task-a', resolve);
});
};

const taskB = async function (): Promise<void> {
return new Promise<void>(resolve => {
emitter.on('resolve-task-b', resolve);
});
};

for (let i = 0; i < 10; ++i) {
awaitQueue
.push(taskA, 'taskA', { removeOngoingTasksWithSameName: false })
.catch(error => {
if (error instanceof AwaitQueueRemovedTaskError) {
++removedTaskACount;
}
});
}

const lastTaskAPromise = awaitQueue.push(taskA, 'taskA', {
removeOngoingTasksWithSameName: true,
});

const taskBPromise = awaitQueue.push(taskB, 'taskB', {
removeOngoingTasksWithSameName: true,
});

expectDumpToContain(awaitQueue, ['taskA', 'taskB']);

emitter.emit('resolve-task-a');

await lastTaskAPromise;

emitter.emit('resolve-task-b');

await taskBPromise;

expect(removedTaskACount).toBe(10);
}, 1000);

async function wait(timeMs: number): Promise<void> {
await new Promise<void>(resolve => {
setTimeout(resolve, timeMs);
Expand Down
4 changes: 4 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export type AwaitQueuePushOptions = {
removeOngoingTasksWithSameName?: boolean;
};

export type AwaitQueueTask<T> = () => T | PromiseLike<T>;

export type AwaitQueueTaskDump = {
Expand Down