Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions spec/asynciterable/buffercountortime-spec.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import '../asynciterablehelpers.js';
import { of, concat } from 'ix/asynciterable/index.js';
import { bufferCountOrTime, delay } from 'ix/asynciterable/operators/index.js';
import { of, concat, toArray, interval } from 'ix/asynciterable/index.js';
import { bufferCountOrTime, delay, take } from 'ix/asynciterable/operators/index.js';

test('buffer count behaviour', async () => {
const result: number[][] = [];
const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

await of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
.pipe(bufferCountOrTime(5, 10))
.forEach((buf) => {
result.push(buf);
});
const res = source.pipe(bufferCountOrTime(5, 10));

expect(result).toEqual([
expect(await toArray(res)).toEqual([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 0],
]);
});

test('buffer time behaviour', async () => {
const result: number[][] = [];
const seq = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));
await seq.pipe(bufferCountOrTime(5, 10)).forEach((buf) => {
result.push(buf);
});
const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));

expect(result).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
const res = source.pipe(bufferCountOrTime(5, 10));

expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
});

test('continues the timer in the background', async () => {
const source = interval(10);

const res = source.pipe(bufferCountOrTime(3, 45));

expect(await toArray(res.pipe(take(2)))).toEqual([[0, 1, 2], [3]]);
});
33 changes: 33 additions & 0 deletions spec/asynciterable/buffercountwithdebounce-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import '../asynciterablehelpers.js';
import { of, concat, interval, toArray } from 'ix/asynciterable/index.js';
import { bufferCountWithDebounce, delay, take } from 'ix/asynciterable/operators/index.js';

test('buffer count behaviour', async () => {
const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

const res = source.pipe(bufferCountWithDebounce(5, 10));

expect(await toArray(res)).toEqual([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 0],
]);
});

test('buffer time behaviour', async () => {
const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));

const res = source.pipe(bufferCountWithDebounce(5, 10));

expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
});

test('fills buffers', async () => {
const source = interval(10);

const res = source.pipe(bufferCountWithDebounce(3, 45));

expect(await toArray(res.pipe(take(2)))).toEqual([
[0, 1, 2],
[3, 4, 5],
]);
});
3 changes: 3 additions & 0 deletions src/asynciterable/operators/buffercountortime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AsyncIterableX, interval, concat, of } from '../index.js';
import { map } from './map.js';
import { merge } from '../merge.js';
import { wrapWithAbort } from './withabort.js';
import type { bufferCountWithDebounce } from './buffercountwithdebounce.js'; // Used only in jsdoc

const timerEvent = {};
const ended = {};
Expand Down Expand Up @@ -45,6 +46,8 @@ class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {
* Projects each element of an async-iterable sequence into consecutive buffers
* which are emitted when either the threshold count or time is met.
*
* @see https://github.com/ReactiveX/IxJS/pull/380 for the difference between {@link bufferCountOrTime} and {@link bufferCountWithDebounce}.
*
* @template TSource The type of elements in the source sequence.
* @param {number} count The size of the buffer.
* @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer
Expand Down
90 changes: 90 additions & 0 deletions src/asynciterable/operators/buffercountwithdebounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { OperatorAsyncFunction } from '../../interfaces.js';
import { AsyncIterableX, concat, of } from '../index.js';
import { merge } from '../merge.js';
import { wrapWithAbort } from './withabort.js';
import { AsyncSink } from '../asyncsink.js';
import type { bufferCountOrTime } from './buffercountortime.js'; // Used only in jsdoc
import { sleep } from '../_sleep.js';

const timeoutEvent = Symbol('BufferCountWithDebounce:TimeoutEvent');
const endedEvent = Symbol('BufferCountWithDebounce:EndedEvent');

class BufferCountWithDebounce<TSource> extends AsyncIterableX<TSource[]> {
constructor(
private readonly _source: AsyncIterable<TSource>,
private readonly _bufferSize: number,
private readonly _maxWaitTime: number
) {
super();
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];

const timeoutSink = new AsyncSink<typeof timeoutEvent>();
const merged = merge(concat(this._source, of(endedEvent)), timeoutSink);

let abortController: AbortController | undefined;

try {
for await (const item of wrapWithAbort(merged, signal)) {
if (!abortController) {
abortController = new AbortController();

sleep(this._maxWaitTime, abortController.signal, true)
.then(() => {
timeoutSink.write(timeoutEvent);
})
.catch(() => {
// Ignore the error if the abort signal is triggered
});
}

if (item === endedEvent) {
break;
}

if (item !== timeoutEvent) {
buffer.push(item);
}

if (buffer.length >= this._bufferSize || (item === timeoutEvent && buffer.length > 0)) {
abortController.abort();
abortController = undefined;

yield buffer.slice();
buffer.length = 0;
}
}

if (buffer.length) {
yield buffer;
}
} finally {
abortController?.abort();
}
}
}

/**
* Projects each element of an async-iterable sequence into consecutive buffers
* which are emitted when either the threshold count or time is met.
*
* @see https://github.com/ReactiveX/IxJS/pull/380 for the difference between {@link bufferCountOrTime} and {@link bufferCountWithDebounce}.
*
* @template TSource The type of elements in the source sequence.
* @param {number} count The size of the buffer.
* @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer
* @returns {OperatorAsyncFunction<TSource, TSource[]>} An operator which returns an async-iterable sequence
* of buffers
*/
export function bufferCountWithDebounce<TSource>(
count: number,
time: number
): OperatorAsyncFunction<TSource, TSource[]> {
return function bufferOperatorFunction(
source: AsyncIterable<TSource>
): AsyncIterableX<TSource[]> {
return new BufferCountWithDebounce<TSource>(source, count, time);
};
}
1 change: 1 addition & 0 deletions src/asynciterable/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './batch.js';
export * from './buffer.js';
export * from './buffercountortime.js';
export * from './buffercountwithdebounce.js';
export * from './catcherror.js';
export * from './combinelatestwith.js';
export * from './concatall.js';
Expand Down