From ea34ad16b80a1f9049f3af7a7d8a59016863bb5a Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Thu, 17 Sep 2020 13:23:56 -0400 Subject: [PATCH 1/9] feat(AsyncObservable): initial commit --- src/interfaces.ts | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/interfaces.ts b/src/interfaces.ts index 94a45e98..b1f01412 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -13,3 +13,31 @@ export type MonoTypeOperatorAsyncFunction = OperatorAsyncFunction; /** @ignore */ export type BufferLike = string | Buffer | Uint8Array; + +export const SYMBOL_ASYNC_DISPOSABLE = Symbol.for('AsyncDisposable'); + +export interface AsyncSubscription { + [SYMBOL_ASYNC_DISPOSABLE]: () => Promise; +} + +export interface AsyncObserver { + next: (value: T) => Promise; + error: (error: any) => Promise; + complete: () => Promise; +} + +export interface AsyncObservable { + subscribeAsync: (observer: AsyncObserver) => Promise; +} + +export interface AsyncSubject + extends AsyncObserver, + AsyncObservable {} + +export interface AsyncScheduler { + now: () => Date; + scheduleAsync: ( + action: (signal: AbortSignal) => Promise, + dueTime: number + ) => Promise; +} From 5185f18d5229e1f4e19ef7c6ad00c27acffd9461 Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Fri, 18 Sep 2020 17:29:27 -0400 Subject: [PATCH 2/9] feat(AsyncObservable): add first operators --- src/asyncobservable/asyncobservablex.ts | 137 ++++++++++++++++++ src/asyncobservable/asyncobserverx.ts | 55 +++++++ src/asyncobservable/create.ts | 19 +++ src/asyncobservable/operators/map.ts | 80 ++++++++++ src/asyncobservable/subscribesafe.ts | 14 ++ .../subscriptions/asyncsubscriptionx.ts | 38 +++++ src/interfaces.ts | 33 ++++- 7 files changed, 374 insertions(+), 2 deletions(-) create mode 100644 src/asyncobservable/asyncobservablex.ts create mode 100644 src/asyncobservable/asyncobserverx.ts create mode 100644 src/asyncobservable/create.ts create mode 100644 src/asyncobservable/operators/map.ts create mode 100644 src/asyncobservable/subscribesafe.ts create mode 100644 src/asyncobservable/subscriptions/asyncsubscriptionx.ts diff --git a/src/asyncobservable/asyncobservablex.ts b/src/asyncobservable/asyncobservablex.ts new file mode 100644 index 00000000..23d9fefb --- /dev/null +++ b/src/asyncobservable/asyncobservablex.ts @@ -0,0 +1,137 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + PartialAsyncObserver, + SYMBOL_ASYNC_DISPOSABLE, +} from '../interfaces'; +import { AsyncObserverX } from './asyncobserverx'; + +class AutoDetachObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _subscription?: AsyncSubscription; + private _unsubscribing: boolean = false; + private _task?: Promise; + + constructor(observer: AsyncObserver) { + super(); + this._observer = observer; + } + + async assign(subscription: AsyncSubscription) { + let shouldUnsubscribe = false; + if (this._unsubscribing) { + shouldUnsubscribe = true; + } else { + this._subscription = subscription; + } + + if (shouldUnsubscribe) { + await this._subscription![SYMBOL_ASYNC_DISPOSABLE](); + } + } + + async _next(value: T) { + if (this._unsubscribing) { + return; + } + this._task = this._observer.next(value); + try { + await this._task; + } finally { + this._task = undefined; + } + } + + async _error(err: any) { + if (this._unsubscribing) { + return; + } + this._task = this._observer.error(err); + try { + await this._task; + } finally { + await this._finish(); + } + } + + async _complete() { + if (this._unsubscribing) { + return; + } + this._task = this._observer.complete(); + try { + await this._task; + } finally { + await this._finish(); + } + } + + async [SYMBOL_ASYNC_DISPOSABLE]() { + let subscription; + + if (!this._unsubscribing) { + this._unsubscribing = true; + subscription = this._subscription; + } + + if (subscription) { + await subscription[SYMBOL_ASYNC_DISPOSABLE](); + } + } + + private async _finish() { + let subscription; + if (!this._unsubscribing) { + this._unsubscribing = true; + subscription = this._subscription; + } + + this._task = undefined; + + if (subscription) { + await subscription[SYMBOL_ASYNC_DISPOSABLE](); + } + } +} + +export class SafeObserver extends AsyncObserverX { + private _observer: PartialAsyncObserver; + + constructor(observer: PartialAsyncObserver) { + super(); + this._observer = observer; + } + + async _next(value: T) { + if (this._observer.next) { + await this._observer.next(value); + } + } + + async _error(err: any) { + if (this._observer.error) { + await this._observer.error(err); + } else { + throw err; + } + } + + async _complete() { + if (this._observer.complete) { + await this._observer.complete(); + } + } +} + +export abstract class AsyncObservableX implements AsyncObservable { + async subscribeAsync(observer: PartialAsyncObserver): Promise { + const safeObserver = new SafeObserver(observer); + const autoDetachObserver = new AutoDetachObserver(safeObserver); + const subscription = await this._subscribeAsync(autoDetachObserver); + await autoDetachObserver.assign(subscription); + return autoDetachObserver; + } + + abstract _subscribeAsync(observer: AsyncObserver): Promise; +} diff --git a/src/asyncobservable/asyncobserverx.ts b/src/asyncobservable/asyncobserverx.ts new file mode 100644 index 00000000..ad7a327a --- /dev/null +++ b/src/asyncobservable/asyncobserverx.ts @@ -0,0 +1,55 @@ +import { AsyncObserver } from '../interfaces'; + +enum ObserverState { + Idle, + Busy, + Done, +} + +export abstract class AsyncObserverX implements AsyncObserver { + private _state: ObserverState = ObserverState.Idle; + + next(value: T) { + this._tryEnter(); + try { + return this._next(value); + } finally { + this._state = ObserverState.Idle; + } + } + + abstract _next(value: T): Promise; + + error(err: any) { + this._tryEnter(); + try { + return this._error(err); + } finally { + this._state = ObserverState.Done; + } + } + + abstract _error(err: any): Promise; + + complete() { + this._tryEnter(); + try { + return this._complete(); + } finally { + this._state = ObserverState.Done; + } + } + + abstract _complete(): Promise; + + private _tryEnter() { + const old = this._state; + if (old === ObserverState.Idle) { + this._state = ObserverState.Busy; + } else if (old === ObserverState.Busy) { + throw new Error('Observer is already busy'); + } else if (old === ObserverState.Done) { + throw new Error('Observer has already terminated'); + } + } +} diff --git a/src/asyncobservable/create.ts b/src/asyncobservable/create.ts new file mode 100644 index 00000000..48e317cc --- /dev/null +++ b/src/asyncobservable/create.ts @@ -0,0 +1,19 @@ +import { PartialAsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; + +class AnonymousAsyncObservable extends AsyncObservableX { + private _fn: (observer: PartialAsyncObserver) => Promise; + + constructor(fn: (observer: PartialAsyncObserver) => Promise) { + super(); + this._fn = fn; + } + + _subscribeAsync(observer: PartialAsyncObserver) { + return this._fn(observer); + } +} + +export function create(fn: (observer: PartialAsyncObserver) => Promise) { + return new AnonymousAsyncObservable(fn); +} diff --git a/src/asyncobservable/operators/map.ts b/src/asyncobservable/operators/map.ts new file mode 100644 index 00000000..ef00f502 --- /dev/null +++ b/src/asyncobservable/operators/map.ts @@ -0,0 +1,80 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + OperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +class MapObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _selector: (value: T, index: number) => Promise | R; + private _thisArg?: any; + private _index: number = 0; + + constructor( + observer: AsyncObserver, + selector: (value: T, index: number) => Promise | R, + thisArg?: any + ) { + super(); + this._observer = observer; + this._selector = selector; + this._thisArg = thisArg; + } + + async _next(value: T) { + let res; + try { + res = await this._selector.call(this._thisArg, value, this._index++); + } catch (e) { + await this._observer.error(e); + return; + } + + await this._observer.next(res); + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +class MapObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _selector: (value: T, index: number) => Promise | R; + private _thisArg?: any; + + constructor( + source: AsyncObservable, + selector: (value: T, index: number) => Promise | R, + thisArg?: any + ) { + super(); + this._source = source; + this._selector = selector; + this._thisArg = thisArg; + } + + async _subscribeAsync(observer: AsyncObserver): Promise { + return await subscribeSafe( + this._source, + new MapObserver(observer, this._selector, this._thisArg) + ); + } +} + +export function map( + selector: (value: TSource, index: number) => Promise | TResult, + thisArg?: any +): OperatorAsyncObservableFunction { + return function mapOperatorFunction(source: AsyncObservable) { + return new MapObservable(source, selector, thisArg); + }; +} diff --git a/src/asyncobservable/subscribesafe.ts b/src/asyncobservable/subscribesafe.ts new file mode 100644 index 00000000..cbea215b --- /dev/null +++ b/src/asyncobservable/subscribesafe.ts @@ -0,0 +1,14 @@ +import { AsyncObservable, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncSubscriptionX } from './subscriptions/asyncsubscriptionx'; + +export async function subscribeSafe( + observable: AsyncObservable, + observer: AsyncObserver +): Promise { + try { + return await observable.subscribeAsync(observer); + } catch (e) { + await observer.error(e); + return AsyncSubscriptionX.empty(); + } +} diff --git a/src/asyncobservable/subscriptions/asyncsubscriptionx.ts b/src/asyncobservable/subscriptions/asyncsubscriptionx.ts new file mode 100644 index 00000000..8fae60cd --- /dev/null +++ b/src/asyncobservable/subscriptions/asyncsubscriptionx.ts @@ -0,0 +1,38 @@ +import { AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces'; + +const NOOP_PROMISE = Promise.resolve(undefined); + +class EmptySubscription implements AsyncSubscription { + async [SYMBOL_ASYNC_DISPOSABLE](): Promise { + await NOOP_PROMISE; + } +} + +const EMPTY_SUBSCRIPTION = new EmptySubscription(); + +class AnonymousSubscription implements AsyncSubscription { + private _fn?: () => Promise; + + constructor(fn: () => Promise) { + this._fn = fn; + } + + async [SYMBOL_ASYNC_DISPOSABLE](): Promise { + if (this._fn) { + await this._fn!(); + this._fn = undefined; + } else { + await NOOP_PROMISE; + } + } +} + +export class AsyncSubscriptionX { + static create(unsubscribe: () => Promise): AsyncSubscription { + return new AnonymousSubscription(unsubscribe); + } + + static empty(): AsyncSubscription { + return EMPTY_SUBSCRIPTION; + } +} diff --git a/src/interfaces.ts b/src/interfaces.ts index b1f01412..14216107 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,5 +1,6 @@ import { IterableX } from './iterable'; import { AsyncIterableX } from './asynciterable'; +import { AsyncObservableX } from './asyncobservable/asyncobservablex'; export type UnaryFunction = (source: T) => R; @@ -7,6 +8,11 @@ export type OperatorFunction = UnaryFunction, IterableX>; export type OperatorAsyncFunction = UnaryFunction, AsyncIterableX>; +export type OperatorAsyncObservableFunction = UnaryFunction< +AsyncObservable, +AsyncObservableX +>; + export type MonoTypeOperatorFunction = OperatorFunction; export type MonoTypeOperatorAsyncFunction = OperatorAsyncFunction; @@ -20,14 +26,37 @@ export interface AsyncSubscription { [SYMBOL_ASYNC_DISPOSABLE]: () => Promise; } +export interface NextAsyncObserver { + next: (value: T) => Promise; + error?: (err: any) => Promise; + complete?: () => Promise; +} + +export interface ErrorAsyncObserver { + next?: (value: T) => Promise; + error: (err: any) => Promise; + complete?: () => Promise; +} + +export interface CompletionAsyncObserver { + next?: (value: T) => Promise; + error?: (err: any) => Promise; + complete: () => Promise; +} + +export type PartialAsyncObserver = + | NextAsyncObserver + | ErrorAsyncObserver + | CompletionAsyncObserver; + export interface AsyncObserver { next: (value: T) => Promise; - error: (error: any) => Promise; + error: (err: any) => Promise; complete: () => Promise; } export interface AsyncObservable { - subscribeAsync: (observer: AsyncObserver) => Promise; + subscribeAsync: (observer: PartialAsyncObserver) => Promise; } export interface AsyncSubject From 6ea0a6e468defec218947297579fd6a35a258948 Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Tue, 22 Sep 2020 19:05:17 -0400 Subject: [PATCH 3/9] feat(AsyncObservable): finish through scan --- src/abortcontroller.ts | 11 +++ src/asyncobservable/asyncobservablex.ts | 28 +++++- src/asyncobservable/operators/filter.ts | 88 ++++++++++++++++++ src/asyncobservable/operators/map.ts | 36 ++++++-- src/asyncobservable/operators/scan.ts | 117 ++++++++++++++++++++++++ src/asyncobservable/subscribesafe.ts | 5 +- src/interfaces.ts | 7 +- 7 files changed, 278 insertions(+), 14 deletions(-) create mode 100644 src/abortcontroller.ts create mode 100644 src/asyncobservable/operators/filter.ts create mode 100644 src/asyncobservable/operators/scan.ts diff --git a/src/abortcontroller.ts b/src/abortcontroller.ts new file mode 100644 index 00000000..f6ee2af8 --- /dev/null +++ b/src/abortcontroller.ts @@ -0,0 +1,11 @@ +export function createLinkedAbortController(...signals: AbortSignal[]) { + const controller = new AbortController(); + + Array.from(signals).forEach((signal) => { + signal.onabort = () => { + controller.abort(); + }; + }); + + return controller; +} diff --git a/src/asyncobservable/asyncobservablex.ts b/src/asyncobservable/asyncobservablex.ts index 23d9fefb..1fe8962d 100644 --- a/src/asyncobservable/asyncobservablex.ts +++ b/src/asyncobservable/asyncobservablex.ts @@ -2,8 +2,10 @@ import { AsyncObservable, AsyncObserver, AsyncSubscription, + OperatorAsyncObservableFunction, PartialAsyncObserver, SYMBOL_ASYNC_DISPOSABLE, + UnaryFunction, } from '../interfaces'; import { AsyncObserverX } from './asyncobserverx'; @@ -125,13 +127,33 @@ export class SafeObserver extends AsyncObserverX { } export abstract class AsyncObservableX implements AsyncObservable { - async subscribeAsync(observer: PartialAsyncObserver): Promise { + async subscribeAsync( + observer: PartialAsyncObserver, + signal?: AbortSignal + ): Promise { const safeObserver = new SafeObserver(observer); const autoDetachObserver = new AutoDetachObserver(safeObserver); - const subscription = await this._subscribeAsync(autoDetachObserver); + const subscription = await this._subscribeAsync(autoDetachObserver, signal); await autoDetachObserver.assign(subscription); return autoDetachObserver; } - abstract _subscribeAsync(observer: AsyncObserver): Promise; + abstract _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise; + + /** @nocollapse */ + pipe(...operations: UnaryFunction, R>[]): R; + pipe(...operations: OperatorAsyncObservableFunction[]): AsyncObservableX; + pipe(...args: any[]) { + let i = -1; + const n = args.length; + let acc: any = this; + while (++i < n) { + // TODO: Cast using `as` + acc = args[i](acc); + } + return acc; + } } diff --git a/src/asyncobservable/operators/filter.ts b/src/asyncobservable/operators/filter.ts new file mode 100644 index 00000000..71401c1e --- /dev/null +++ b/src/asyncobservable/operators/filter.ts @@ -0,0 +1,88 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + MonoTypeOperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +class FilterObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean; + private _thisArg?: any; + private _signal?: AbortSignal; + private _index: number = 0; + + constructor( + observer: AsyncObserver, + predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any, + signal?: AbortSignal + ) { + super(); + this._observer = observer; + this._predicate = predicate; + this._thisArg = thisArg; + this._signal = signal; + } + + async _next(value: T) { + let shouldYield; + try { + shouldYield = await this._predicate.call(this._thisArg, value, this._index++, this._signal); + } catch (e) { + await this._observer.error(e); + return; + } + if (shouldYield) { + await this._observer.next(value); + } + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +export class FilterObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean; + private _thisArg?: any; + + constructor( + source: AsyncObservable, + predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any + ) { + super(); + this._source = source; + this._predicate = predicate; + this._thisArg = thisArg; + } + + async _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return await subscribeSafe( + this._source, + new FilterObserver(observer, this._predicate, this._thisArg, signal), + signal + ); + } +} + +export function filter( + predicate: (value: TSource, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any +): MonoTypeOperatorAsyncObservableFunction { + return function filterOperatorFunction(source: AsyncObservable) { + return new FilterObservable(source, predicate, thisArg); + }; +} diff --git a/src/asyncobservable/operators/map.ts b/src/asyncobservable/operators/map.ts index ef00f502..ebdb6ca0 100644 --- a/src/asyncobservable/operators/map.ts +++ b/src/asyncobservable/operators/map.ts @@ -10,25 +10,28 @@ import { subscribeSafe } from '../subscribesafe'; class MapObserver extends AsyncObserverX { private _observer: AsyncObserver; - private _selector: (value: T, index: number) => Promise | R; + private _selector: (value: T, index: number, signal?: AbortSignal) => Promise | R; private _thisArg?: any; + private _signal?: AbortSignal; private _index: number = 0; constructor( observer: AsyncObserver, - selector: (value: T, index: number) => Promise | R, - thisArg?: any + selector: (value: T, index: number, signal?: AbortSignal) => Promise | R, + thisArg?: any, + signal?: AbortSignal ) { super(); this._observer = observer; this._selector = selector; this._thisArg = thisArg; + this._signal = signal; } async _next(value: T) { let res; try { - res = await this._selector.call(this._thisArg, value, this._index++); + res = await this._selector.call(this._thisArg, value, this._index++, this._signal); } catch (e) { await this._observer.error(e); return; @@ -48,12 +51,12 @@ class MapObserver extends AsyncObserverX { class MapObservable extends AsyncObservableX { private _source: AsyncObservable; - private _selector: (value: T, index: number) => Promise | R; + private _selector: (value: T, index: number, signal?: AbortSignal) => Promise | R; private _thisArg?: any; constructor( source: AsyncObservable, - selector: (value: T, index: number) => Promise | R, + selector: (value: T, index: number, signal?: AbortSignal) => Promise | R, thisArg?: any ) { super(); @@ -62,14 +65,31 @@ class MapObservable extends AsyncObservableX { this._thisArg = thisArg; } - async _subscribeAsync(observer: AsyncObserver): Promise { + async _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { return await subscribeSafe( this._source, - new MapObserver(observer, this._selector, this._thisArg) + new MapObserver(observer, this._selector, this._thisArg, signal), + signal ); } } +/** + * Projects each element of an async-observable sequence into a new form. + * + * @export + * @template TSource The type of the elements in the source sequence. + * @template TResult The type of the elements in the result sequence, obtained by running the selector + * function for each element in the source sequence. + * @param {((value: TSource, index: number, signal?: AbortSignal) => Promise | TResult)} selector A transform function + * to apply to each source element. + * @param {*} [thisArg] Optional this for binding to the selector. + * @returns {OperatorAsyncObservableFunction} An async-observable sequence whose elements are the result of invoking the transform + * function on each element of source. + */ export function map( selector: (value: TSource, index: number) => Promise | TResult, thisArg?: any diff --git a/src/asyncobservable/operators/scan.ts b/src/asyncobservable/operators/scan.ts new file mode 100644 index 00000000..60b7d9b0 --- /dev/null +++ b/src/asyncobservable/operators/scan.ts @@ -0,0 +1,117 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + OperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +/** + * The options for performing a scan operation, including the callback and the optional seed. + * + * @export + * @interface ScanOptions + * @template T The type of the elements in the source sequence. + * @template R The type of the result for the reducer callback. + */ +export interface ScanOptions { + /** + * The optional seed used for the scan operation. + * + * @type {R} The type of the result + * @memberof ScanOptions + */ + seed?: R; + /** + * The callback used for the scan operation, which passes the accumulator, current value, the + * current index, and an Abort Signal. This returns a result or a Promise containing a result. + * + * @memberof ScanOptions + */ + callback: (accumulator: R, current: T, index: number, signal?: AbortSignal) => R | Promise; +} + +class ScanObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _fn: (acc: R, x: T, index: number, signal?: AbortSignal) => R | Promise; + private _seed?: T | R; + private _hasSeed: boolean; + private _signal?: AbortSignal; + private _index: number; + private _hasValue: boolean; + private _acc: T | R | undefined; + + constructor(observer: AsyncObserver, options: ScanOptions, signal?: AbortSignal) { + super(); + this._observer = observer; + this._fn = options['callback']; + this._hasSeed = options.hasOwnProperty('seed'); + this._seed = options['seed']; + this._signal = signal; + this._index = 0; + this._hasValue = false; + this._acc = this._seed; + } + + async _next(value: T): Promise { + if (this._hasValue || (this._hasValue = this._hasSeed)) { + try { + this._acc = await this._fn( this._acc, value, this._index++, this._signal); + } catch (e) { + await this._observer.error(e); + return; + } + } else { + this._acc = value; + this._hasValue = true; + this._index++; + } + + await this._observer.next( this._acc); + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +export class ScanAsyncObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _options: ScanOptions; + + constructor(source: AsyncObservable, options: ScanOptions) { + super(); + this._source = source; + this._options = options; + } + + _subscribeAsync(observer: AsyncObserver, signal?: AbortSignal): Promise { + return subscribeSafe( + this._source, + new ScanObserver(observer, this._options, signal), + signal + ); + } +} + +/** + * Applies an accumulator function over an async-observable sequence and returns each intermediate result. + * The specified seed value, if given, is used as the initial accumulator value. + * + * @export + * @template T The type of the elements in the source sequence. + * @template R The type of the result of the aggregation. + * @param {ScanOptions} options The options including the accumulator function and seed. + * @returns {OperatorAsyncObservableFunction} An async-enumerable sequence containing the accumulated values. + */ +export function scan(options: ScanOptions): OperatorAsyncObservableFunction { + return function scanOperatorFunction(source: AsyncObservable): AsyncObservableX { + return new ScanAsyncObservable(source, options); + }; +} diff --git a/src/asyncobservable/subscribesafe.ts b/src/asyncobservable/subscribesafe.ts index cbea215b..248735b0 100644 --- a/src/asyncobservable/subscribesafe.ts +++ b/src/asyncobservable/subscribesafe.ts @@ -3,10 +3,11 @@ import { AsyncSubscriptionX } from './subscriptions/asyncsubscriptionx'; export async function subscribeSafe( observable: AsyncObservable, - observer: AsyncObserver + observer: AsyncObserver, + signal?: AbortSignal ): Promise { try { - return await observable.subscribeAsync(observer); + return await observable.subscribeAsync(observer, signal); } catch (e) { await observer.error(e); return AsyncSubscriptionX.empty(); diff --git a/src/interfaces.ts b/src/interfaces.ts index 14216107..00d3e7e6 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -17,6 +17,8 @@ export type MonoTypeOperatorFunction = OperatorFunction; export type MonoTypeOperatorAsyncFunction = OperatorAsyncFunction; +export type MonoTypeOperatorAsyncObservableFunction = OperatorAsyncObservableFunction; + /** @ignore */ export type BufferLike = string | Buffer | Uint8Array; @@ -56,7 +58,10 @@ export interface AsyncObserver { } export interface AsyncObservable { - subscribeAsync: (observer: PartialAsyncObserver) => Promise; + subscribeAsync: ( + observer: PartialAsyncObserver, + signal?: AbortSignal + ) => Promise; } export interface AsyncSubject From cd60088dd0fb5aed750894fc633eb659e7af2601 Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Wed, 23 Sep 2020 21:40:05 -0400 Subject: [PATCH 4/9] feat(schedulers): Add async schedulers --- .../concurrency/asyncschedulerx.ts | 53 +++++++++++++++++++ .../concurrency/immediateasyncscheduler.ts | 40 ++++++++++++++ src/interfaces.ts | 7 +-- 3 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 src/asyncobservable/concurrency/asyncschedulerx.ts create mode 100644 src/asyncobservable/concurrency/immediateasyncscheduler.ts diff --git a/src/asyncobservable/concurrency/asyncschedulerx.ts b/src/asyncobservable/concurrency/asyncschedulerx.ts new file mode 100644 index 00000000..89f7b4cf --- /dev/null +++ b/src/asyncobservable/concurrency/asyncschedulerx.ts @@ -0,0 +1,53 @@ +import { AsyncScheduler, AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces'; + +function normalizeTime(time: number) { + return time < 0 ? 0 : time; +} + +export abstract class AsyncSchedulerX implements AsyncScheduler { + get now() { + return Date.now(); + } + + scheduleNowAsync(action: (signal: AbortSignal) => Promise) { + return this._scheduleAsync(action); + } + + scheduleFutureAsync(action: (signal: AbortSignal) => Promise, dueTime: number) { + const newTime = normalizeTime(dueTime); + + return this._scheduleAsync(async (signal) => { + await this._delay(newTime, signal); + await action(signal); + }); + } + + async _scheduleAsync(action: (signal: AbortSignal) => Promise): Promise { + const cas = new CancellationAsyncSubscription(); + await this._scheduleCoreAsync(action, cas.signal); + return cas; + } + + abstract _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise; + + abstract _delay(dueTime: number, signal: AbortSignal): Promise; +} + +export class CancellationAsyncSubscription implements AsyncSubscription { + private _controller: AbortController; + + constructor() { + this._controller = new AbortController(); + } + + get signal() { + return this._controller.signal; + } + + async [SYMBOL_ASYNC_DISPOSABLE]() { + this._controller.abort(); + } +} diff --git a/src/asyncobservable/concurrency/immediateasyncscheduler.ts b/src/asyncobservable/concurrency/immediateasyncscheduler.ts new file mode 100644 index 00000000..866250b4 --- /dev/null +++ b/src/asyncobservable/concurrency/immediateasyncscheduler.ts @@ -0,0 +1,40 @@ +import { AbortError } from '../../aborterror'; +import { AsyncSchedulerX } from './asyncschedulerx'; + +function delay(dueTime: number, signal: AbortSignal) { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new AbortError()); + } + + const id = setTimeout(() => { + if (signal.aborted) { + reject(new AbortError()); + } else { + resolve(); + } + }, dueTime); + + signal.addEventListener( + 'abort', + () => { + clearTimeout(id); + reject(new AbortError()); + }, + { once: true } + ); + }); +} + +export class ImmediateAsyncScheduler extends AsyncSchedulerX { + async _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise { + await action(signal); + } + + async _delay(dueTime: number, signal: AbortSignal): Promise { + await delay(dueTime, signal); + } +} diff --git a/src/interfaces.ts b/src/interfaces.ts index 00d3e7e6..65fee1b7 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -69,9 +69,10 @@ export interface AsyncSubject AsyncObservable {} export interface AsyncScheduler { - now: () => Date; - scheduleAsync: ( + now: number; + scheduleNowAsync(action: (signal: AbortSignal) => Promise): Promise; + scheduleFutureAsync( action: (signal: AbortSignal) => Promise, dueTime: number - ) => Promise; + ): Promise; } From 2318adbdecbddd8505fcbe7d0977988d7f69f884 Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Wed, 23 Sep 2020 21:45:50 -0400 Subject: [PATCH 5/9] feat(schedulers): Add async schedulers --- src/asyncobservable/concurrency/immediateasyncscheduler.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/asyncobservable/concurrency/immediateasyncscheduler.ts b/src/asyncobservable/concurrency/immediateasyncscheduler.ts index 866250b4..8a0bebc3 100644 --- a/src/asyncobservable/concurrency/immediateasyncscheduler.ts +++ b/src/asyncobservable/concurrency/immediateasyncscheduler.ts @@ -27,6 +27,11 @@ function delay(dueTime: number, signal: AbortSignal) { } export class ImmediateAsyncScheduler extends AsyncSchedulerX { + private static _instance = new ImmediateAsyncScheduler(); + static get instance() { + return ImmediateAsyncScheduler._instance; + } + async _scheduleCoreAsync( action: (signal: AbortSignal) => Promise, signal: AbortSignal From 3fc30fb9b547ef8145a02f8504f2b857f2af66fe Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Wed, 23 Sep 2020 21:49:22 -0400 Subject: [PATCH 6/9] feat(schedulers): Fix create with abortsignal --- src/asyncobservable/create.ts | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/asyncobservable/create.ts b/src/asyncobservable/create.ts index 48e317cc..0d4e3e4c 100644 --- a/src/asyncobservable/create.ts +++ b/src/asyncobservable/create.ts @@ -2,18 +2,25 @@ import { PartialAsyncObserver, AsyncSubscription } from '../interfaces'; import { AsyncObservableX } from './asyncobservablex'; class AnonymousAsyncObservable extends AsyncObservableX { - private _fn: (observer: PartialAsyncObserver) => Promise; + private _fn: ( + observer: PartialAsyncObserver, + signal?: AbortSignal + ) => Promise; - constructor(fn: (observer: PartialAsyncObserver) => Promise) { + constructor( + fn: (observer: PartialAsyncObserver, signal?: AbortSignal) => Promise + ) { super(); this._fn = fn; } - _subscribeAsync(observer: PartialAsyncObserver) { - return this._fn(observer); + _subscribeAsync(observer: PartialAsyncObserver, signal?: AbortSignal) { + return this._fn(observer, signal); } } -export function create(fn: (observer: PartialAsyncObserver) => Promise) { +export function create( + fn: (observer: PartialAsyncObserver, signal?: AbortSignal) => Promise +) { return new AnonymousAsyncObservable(fn); } From 1b1319e28aa035b15461ab9fcf56639dc142518f Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Thu, 24 Sep 2020 17:56:26 -0400 Subject: [PATCH 7/9] feat(scheduler): add more cancellation --- src/abortcontroller.ts | 10 ++- src/asyncobservable/concurrency/_delay.ts | 26 +++++++ .../concurrency/asyncschedulerx.ts | 33 ++++++--- .../concurrency/immediateasyncscheduler.ts | 27 +------ .../concurrency/microtaskscheduler.ts | 21 ++++++ src/asyncobservable/from.ts | 72 +++++++++++++++++++ src/asyncobservable/of.ts | 42 +++++++++++ src/interfaces.ts | 8 ++- 8 files changed, 200 insertions(+), 39 deletions(-) create mode 100644 src/asyncobservable/concurrency/_delay.ts create mode 100644 src/asyncobservable/concurrency/microtaskscheduler.ts create mode 100644 src/asyncobservable/from.ts create mode 100644 src/asyncobservable/of.ts diff --git a/src/abortcontroller.ts b/src/abortcontroller.ts index f6ee2af8..be2f2e1c 100644 --- a/src/abortcontroller.ts +++ b/src/abortcontroller.ts @@ -2,9 +2,13 @@ export function createLinkedAbortController(...signals: AbortSignal[]) { const controller = new AbortController(); Array.from(signals).forEach((signal) => { - signal.onabort = () => { - controller.abort(); - }; + signal.addEventListener( + 'abort', + () => { + controller.abort(); + }, + { once: true } + ); }); return controller; diff --git a/src/asyncobservable/concurrency/_delay.ts b/src/asyncobservable/concurrency/_delay.ts new file mode 100644 index 00000000..d4d7ab3b --- /dev/null +++ b/src/asyncobservable/concurrency/_delay.ts @@ -0,0 +1,26 @@ +import { AbortError } from '../../aborterror'; + +export function delay(dueTime: number, signal: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new AbortError()); + } + + const id = setTimeout(() => { + if (signal.aborted) { + reject(new AbortError()); + } else { + resolve(); + } + }, dueTime); + + signal.addEventListener( + 'abort', + () => { + clearTimeout(id); + reject(new AbortError()); + }, + { once: true } + ); + }); +} diff --git a/src/asyncobservable/concurrency/asyncschedulerx.ts b/src/asyncobservable/concurrency/asyncschedulerx.ts index 89f7b4cf..50db7667 100644 --- a/src/asyncobservable/concurrency/asyncschedulerx.ts +++ b/src/asyncobservable/concurrency/asyncschedulerx.ts @@ -1,3 +1,4 @@ +import { throwIfAborted } from 'ix/aborterror'; import { AsyncScheduler, AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces'; function normalizeTime(time: number) { @@ -9,21 +10,31 @@ export abstract class AsyncSchedulerX implements AsyncScheduler { return Date.now(); } - scheduleNowAsync(action: (signal: AbortSignal) => Promise) { - return this._scheduleAsync(action); + scheduleNowAsync(action: (signal: AbortSignal) => Promise, signal?: AbortSignal) { + return this._scheduleAsync(action, signal); } - scheduleFutureAsync(action: (signal: AbortSignal) => Promise, dueTime: number) { + scheduleFutureAsync( + action: (signal: AbortSignal) => Promise, + dueTime: number, + signal?: AbortSignal + ) { const newTime = normalizeTime(dueTime); - return this._scheduleAsync(async (signal) => { - await this._delay(newTime, signal); - await action(signal); - }); + return this._scheduleAsync(async (innerSignal) => { + await this._delay(newTime, innerSignal); + await action(innerSignal); + }, signal); } - async _scheduleAsync(action: (signal: AbortSignal) => Promise): Promise { + async _scheduleAsync( + action: (signal: AbortSignal) => Promise, + signal?: AbortSignal + ): Promise { + throwIfAborted(signal); + const cas = new CancellationAsyncSubscription(); + cas.link(signal); await this._scheduleCoreAsync(action, cas.signal); return cas; } @@ -47,6 +58,12 @@ export class CancellationAsyncSubscription implements AsyncSubscription { return this._controller.signal; } + link(signal?: AbortSignal) { + if (signal) { + signal.addEventListener('abort', () => this._controller.abort(), { once: true }); + } + } + async [SYMBOL_ASYNC_DISPOSABLE]() { this._controller.abort(); } diff --git a/src/asyncobservable/concurrency/immediateasyncscheduler.ts b/src/asyncobservable/concurrency/immediateasyncscheduler.ts index 8a0bebc3..4dd88891 100644 --- a/src/asyncobservable/concurrency/immediateasyncscheduler.ts +++ b/src/asyncobservable/concurrency/immediateasyncscheduler.ts @@ -1,30 +1,5 @@ -import { AbortError } from '../../aborterror'; import { AsyncSchedulerX } from './asyncschedulerx'; - -function delay(dueTime: number, signal: AbortSignal) { - return new Promise((resolve, reject) => { - if (signal.aborted) { - reject(new AbortError()); - } - - const id = setTimeout(() => { - if (signal.aborted) { - reject(new AbortError()); - } else { - resolve(); - } - }, dueTime); - - signal.addEventListener( - 'abort', - () => { - clearTimeout(id); - reject(new AbortError()); - }, - { once: true } - ); - }); -} +import { delay } from './_delay'; export class ImmediateAsyncScheduler extends AsyncSchedulerX { private static _instance = new ImmediateAsyncScheduler(); diff --git a/src/asyncobservable/concurrency/microtaskscheduler.ts b/src/asyncobservable/concurrency/microtaskscheduler.ts new file mode 100644 index 00000000..17330500 --- /dev/null +++ b/src/asyncobservable/concurrency/microtaskscheduler.ts @@ -0,0 +1,21 @@ +import { AsyncSchedulerX } from './asyncschedulerx'; +import { delay } from './_delay'; + +export class MicroTaskAsyncScheduler extends AsyncSchedulerX { + private static _instance = new MicroTaskAsyncScheduler(); + static get instance() { + return MicroTaskAsyncScheduler._instance; + } + + async _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise { + return new Promise((resolve) => { + resolve(action(signal)); + }); + } + async _delay(dueTime: number, signal: AbortSignal): Promise { + await delay(dueTime, signal); + } +} diff --git a/src/asyncobservable/from.ts b/src/asyncobservable/from.ts new file mode 100644 index 00000000..898fc139 --- /dev/null +++ b/src/asyncobservable/from.ts @@ -0,0 +1,72 @@ +import { AbortError } from '../aborterror'; +import { AsyncScheduler, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; + +export class FromAsyncIterableAsyncObservable extends AsyncObservableX { + private _value: AsyncIterable; + private _scheduler: AsyncScheduler; + + constructor(value: AsyncIterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + // TODO: put a try/catch around each next() call + for await (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} + +export class FromIterableAsyncObservable extends AsyncObservableX { + private _value: Iterable; + private _scheduler: AsyncScheduler; + + constructor(value: Iterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + for await (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} diff --git a/src/asyncobservable/of.ts b/src/asyncobservable/of.ts new file mode 100644 index 00000000..77650f79 --- /dev/null +++ b/src/asyncobservable/of.ts @@ -0,0 +1,42 @@ +import { AbortError } from '../aborterror'; +import { AsyncScheduler, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; +import { ImmediateAsyncScheduler } from './concurrency/immediateasyncscheduler'; + +export class OfAsyncObservable extends AsyncObservableX { + private _value: Iterable; + private _scheduler: AsyncScheduler; + + constructor(value: Iterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + for (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} + +export function of(...args: TSource[]) { + return new OfAsyncObservable(args, ImmediateAsyncScheduler.instance); +} diff --git a/src/interfaces.ts b/src/interfaces.ts index 65fee1b7..01662e26 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -70,9 +70,13 @@ export interface AsyncSubject export interface AsyncScheduler { now: number; - scheduleNowAsync(action: (signal: AbortSignal) => Promise): Promise; + scheduleNowAsync( + action: (signal: AbortSignal) => Promise, + signal?: AbortSignal + ): Promise; scheduleFutureAsync( action: (signal: AbortSignal) => Promise, - dueTime: number + dueTime: number, + signal?: AbortSignal ): Promise; } From 82ec0a275ef2268880db34f943679cd703e567dc Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Thu, 24 Sep 2020 17:56:55 -0400 Subject: [PATCH 8/9] feat(scheduler): add more cancellation --- src/asyncobservable/concurrency/microtaskscheduler.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/asyncobservable/concurrency/microtaskscheduler.ts b/src/asyncobservable/concurrency/microtaskscheduler.ts index 17330500..a5fb9060 100644 --- a/src/asyncobservable/concurrency/microtaskscheduler.ts +++ b/src/asyncobservable/concurrency/microtaskscheduler.ts @@ -15,6 +15,7 @@ export class MicroTaskAsyncScheduler extends AsyncSchedulerX { resolve(action(signal)); }); } + async _delay(dueTime: number, signal: AbortSignal): Promise { await delay(dueTime, signal); } From 0eecdc0dea63a8b853de0ffced43a07d69c5cdba Mon Sep 17 00:00:00 2001 From: Matthew Podwysocki Date: Thu, 24 Sep 2020 18:04:31 -0400 Subject: [PATCH 9/9] feat(scheduler): add more cancellation --- src/asyncobservable/concurrency/microtaskscheduler.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/asyncobservable/concurrency/microtaskscheduler.ts b/src/asyncobservable/concurrency/microtaskscheduler.ts index a5fb9060..da461a4f 100644 --- a/src/asyncobservable/concurrency/microtaskscheduler.ts +++ b/src/asyncobservable/concurrency/microtaskscheduler.ts @@ -11,8 +11,8 @@ export class MicroTaskAsyncScheduler extends AsyncSchedulerX { action: (signal: AbortSignal) => Promise, signal: AbortSignal ): Promise { - return new Promise((resolve) => { - resolve(action(signal)); + return Promise.resolve().then(() => { + return action(signal); }); }