diff --git a/spec/asynciterable/buffercountortime-spec.ts b/spec/asynciterable/buffercountortime-spec.ts index 7eb2d7ed..f196ddc3 100644 --- a/spec/asynciterable/buffercountortime-spec.ts +++ b/spec/asynciterable/buffercountortime-spec.ts @@ -1,28 +1,30 @@ import '../asynciterablehelpers.js'; -import { of, concat } from 'ix/asynciterable/index.js'; -import { bufferCountOrTime, delay } from 'ix/asynciterable/operators/index.js'; +import { of, concat, toArray, interval } from 'ix/asynciterable/index.js'; +import { bufferCountOrTime, delay, take } from 'ix/asynciterable/operators/index.js'; test('buffer count behaviour', async () => { - const result: number[][] = []; + const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0); - await of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) - .pipe(bufferCountOrTime(5, 10)) - .forEach((buf) => { - result.push(buf); - }); + const res = source.pipe(bufferCountOrTime(5, 10)); - expect(result).toEqual([ + expect(await toArray(res)).toEqual([ [1, 2, 3, 4, 5], [6, 7, 8, 9, 0], ]); }); test('buffer time behaviour', async () => { - const result: number[][] = []; - const seq = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11))); - await seq.pipe(bufferCountOrTime(5, 10)).forEach((buf) => { - result.push(buf); - }); + const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11))); - expect(result).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]); + const res = source.pipe(bufferCountOrTime(5, 10)); + + expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]); +}); + +test('continues the timer in the background', async () => { + const source = interval(10); + + const res = source.pipe(bufferCountOrTime(3, 45)); + + expect(await toArray(res.pipe(take(2)))).toEqual([[0, 1, 2], [3]]); }); diff --git a/spec/asynciterable/buffercountwithdebounce-spec.ts b/spec/asynciterable/buffercountwithdebounce-spec.ts new file mode 100644 index 00000000..47fb418e --- /dev/null +++ b/spec/asynciterable/buffercountwithdebounce-spec.ts @@ -0,0 +1,33 @@ +import '../asynciterablehelpers.js'; +import { of, concat, interval, toArray } from 'ix/asynciterable/index.js'; +import { bufferCountWithDebounce, delay, take } from 'ix/asynciterable/operators/index.js'; + +test('buffer count behaviour', async () => { + const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0); + + const res = source.pipe(bufferCountWithDebounce(5, 10)); + + expect(await toArray(res)).toEqual([ + [1, 2, 3, 4, 5], + [6, 7, 8, 9, 0], + ]); +}); + +test('buffer time behaviour', async () => { + const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11))); + + const res = source.pipe(bufferCountWithDebounce(5, 10)); + + expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]); +}); + +test('fills buffers', async () => { + const source = interval(10); + + const res = source.pipe(bufferCountWithDebounce(3, 45)); + + expect(await toArray(res.pipe(take(2)))).toEqual([ + [0, 1, 2], + [3, 4, 5], + ]); +}); diff --git a/src/asynciterable/operators/buffercountortime.ts b/src/asynciterable/operators/buffercountortime.ts index cea17c1a..37a47da0 100644 --- a/src/asynciterable/operators/buffercountortime.ts +++ b/src/asynciterable/operators/buffercountortime.ts @@ -3,6 +3,7 @@ import { AsyncIterableX, interval, concat, of } from '../index.js'; import { map } from './map.js'; import { merge } from '../merge.js'; import { wrapWithAbort } from './withabort.js'; +import type { bufferCountWithDebounce } from './buffercountwithdebounce.js'; // Used only in jsdoc const timerEvent = {}; const ended = {}; @@ -45,6 +46,8 @@ class BufferCountOrTime extends AsyncIterableX { * Projects each element of an async-iterable sequence into consecutive buffers * which are emitted when either the threshold count or time is met. * + * @see https://github.com/ReactiveX/IxJS/pull/380 for the difference between {@link bufferCountOrTime} and {@link bufferCountWithDebounce}. + * * @template TSource The type of elements in the source sequence. * @param {number} count The size of the buffer. * @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer diff --git a/src/asynciterable/operators/buffercountwithdebounce.ts b/src/asynciterable/operators/buffercountwithdebounce.ts new file mode 100644 index 00000000..91f477e2 --- /dev/null +++ b/src/asynciterable/operators/buffercountwithdebounce.ts @@ -0,0 +1,90 @@ +import { OperatorAsyncFunction } from '../../interfaces.js'; +import { AsyncIterableX, concat, of } from '../index.js'; +import { merge } from '../merge.js'; +import { wrapWithAbort } from './withabort.js'; +import { AsyncSink } from '../asyncsink.js'; +import type { bufferCountOrTime } from './buffercountortime.js'; // Used only in jsdoc +import { sleep } from '../_sleep.js'; + +const timeoutEvent = Symbol('BufferCountWithDebounce:TimeoutEvent'); +const endedEvent = Symbol('BufferCountWithDebounce:EndedEvent'); + +class BufferCountWithDebounce extends AsyncIterableX { + constructor( + private readonly _source: AsyncIterable, + private readonly _bufferSize: number, + private readonly _maxWaitTime: number + ) { + super(); + } + + async *[Symbol.asyncIterator](signal?: AbortSignal) { + const buffer: TSource[] = []; + + const timeoutSink = new AsyncSink(); + const merged = merge(concat(this._source, of(endedEvent)), timeoutSink); + + let abortController: AbortController | undefined; + + try { + for await (const item of wrapWithAbort(merged, signal)) { + if (!abortController) { + abortController = new AbortController(); + + sleep(this._maxWaitTime, abortController.signal, true) + .then(() => { + timeoutSink.write(timeoutEvent); + }) + .catch(() => { + // Ignore the error if the abort signal is triggered + }); + } + + if (item === endedEvent) { + break; + } + + if (item !== timeoutEvent) { + buffer.push(item); + } + + if (buffer.length >= this._bufferSize || (item === timeoutEvent && buffer.length > 0)) { + abortController.abort(); + abortController = undefined; + + yield buffer.slice(); + buffer.length = 0; + } + } + + if (buffer.length) { + yield buffer; + } + } finally { + abortController?.abort(); + } + } +} + +/** + * Projects each element of an async-iterable sequence into consecutive buffers + * which are emitted when either the threshold count or time is met. + * + * @see https://github.com/ReactiveX/IxJS/pull/380 for the difference between {@link bufferCountOrTime} and {@link bufferCountWithDebounce}. + * + * @template TSource The type of elements in the source sequence. + * @param {number} count The size of the buffer. + * @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer + * @returns {OperatorAsyncFunction} An operator which returns an async-iterable sequence + * of buffers + */ +export function bufferCountWithDebounce( + count: number, + time: number +): OperatorAsyncFunction { + return function bufferOperatorFunction( + source: AsyncIterable + ): AsyncIterableX { + return new BufferCountWithDebounce(source, count, time); + }; +} diff --git a/src/asynciterable/operators/index.ts b/src/asynciterable/operators/index.ts index 64ad5e51..d5112486 100644 --- a/src/asynciterable/operators/index.ts +++ b/src/asynciterable/operators/index.ts @@ -1,6 +1,7 @@ export * from './batch.js'; export * from './buffer.js'; export * from './buffercountortime.js'; +export * from './buffercountwithdebounce.js'; export * from './catcherror.js'; export * from './combinelatestwith.js'; export * from './concatall.js';