Strongly consider starting with @most/core. It is the foundation of the upcoming most 2.0, has improved documentation, new features, better tree-shaking build characteristics, and simpler APIs. Updating from @most/core to most 2.0 will be non-breaking and straightforward.
You can keep using most 1.x, and update to either @most/core or most 2.0 when you're ready. See the upgrade guide for more information.
- Reading these docs
- API Notes
- Creating streams
- Handling errors
- recoverWith, alias flatMapError
- throwError
- Transforming streams
- Filtering streams
- Transducer support
- Slicing streams
- Looping
- Adapting fluent APIs
- Consuming streams
- Combining streams
- Combining higher order streams
- switchLatest, alias switch
- join
- mergeConcurrently
- Awaiting promises
- awaitPromises, alias await
- Rate limiting streams
- Delaying streams
- Sharing stream
You'll see diagrams like the following:
stream1: -a-b-c-d->
stream2: -a--b---c|
stream3: -abc-def-X
These are timeline diagrams that try to give a simple, representative notion of how a stream behaves over time. Time proceeds from left to right, using letters and symbols to indicate certain things:
- -: an instant in time where no event occurs
- letters (a, b, c, d, etc.): an event at an instant in time
- |: end of stream
- X: an error occurred at an instant in time
- >: stream continues infinitely
- Typically, > means you can assume that a stream will continue to repeat some common pattern infinitely
stream: a|
A stream that emits a and then ends immediately.
stream: a-b---|
A stream that emits a, then b, and some time later ends.
stream: a-b-X
A stream that emits a, then b, then fails.
stream: abc-def->
A stream that emits a, then b, then c, then nothing, then d, then e, then f, and then continues infinitely.
Most.js implements a subset of the draft ES Observable proposal:
- stream[Symbol.observable]() -> Observable returns a compatible observable with a subscribe method that other implementations can consume.
- most.from(observable) -> Stream coerces a compliant observable (one that provides [Symbol.observable]()) to a most.js stream.
- stream.forEach(f) -> Promise is fully compatible with the draft ES Observable forEach API.
- stream.subscribe(observer) -> Subscription subscribes to a most.js Stream using the draft ES Observable subscribe API.
This allows most.js to interoperate seamlessly with other implementations, such as RxJS 5 and Kefir.
Consult the documentation of other libraries for specifics. Any functions and methods that accept draft ES Observables should accept most.js Streams seamlessly.
Use most.from to coerce any observable to a most.js stream:
import { from } from 'most'
const mostStream = from(anyObservable)
You can use most.from in other creative ways as well:
// Placeholder: any function that returns an observable
const functionThatReturnsAnObservable = a => { /* return an observable */ }
// Using chain (aka flatMap)
const mostStream = //...
// Use .map.chain
mostStream.map(functionThatReturnsAnObservable).chain(from)
.observe(b => console.log(b))
// Or use function composition, using your favorite FP lib
// (assuming a right-to-left compose, so that from is applied last)
mostStream.chain(compose(from, functionThatReturnsAnObservable))
A similar approach works with other higher order operations such as join and switch.
mostStream.map(functionThatReturnsAnObservable).map(from).join()...
mostStream.map(functionThatReturnsAnObservable).map(from).switch()...
Or with merge, combine, etc. by coercing first:
const arrayOfObservables = [...]
most.mergeArray(arrayOfObservables.map(from))
most.combineArray(combineFunction, arrayOfObservables.map(from))
Alias: most.of
most.of(x): x|
Create a stream containing only x.
const stream = most.of('hello');
stream.observe(x => console.log(x)); // logs hello
// Use `just` for destructured ES6 import
import { just } from 'most';
const stream = just('hello');
stream.observe(x => console.log(x));
promise: ----a
most.fromPromise(promise): ----a|
Create a stream containing the outcome of a promise. If the promise fulfills, the stream will contain the promise's value. If the promise rejects, the stream will be in an error state with the promise's rejection reason as its error. See recoverWith for error recovery.
observable: -a--b--c--c-->
most.from(observable): -a--b--c--c-->
most.from([1,2,3,4]): 1234|
Create a stream containing all items from an Iterable or Observable.
The observable must provide minimal draft ES observable compliance as per the es-observable draft: it must have a [Symbol.observable]() method that returns an object with a well-behaved .subscribe() method.
The iterable can be an Array, Array-like, or anything that supports the iterable protocol or iterator protocol, such as a generator. Providing a finite iterable, such as an Array, creates a finite stream. Providing an infinite iterable, such as an infinite generator, creates an infinite stream.
Note: from will fail fast by throwing a TypeError synchronously when passed a value that is not an Iterable, Iterator, or Observable. This indicates an invalid use of from, which should be fixed/prevented rather than handled at runtime.
// Logs 1 2 3 4
most.from([1,2,3,4])
.forEach(console.log.bind(console));
// Strings are Array-like, this works
// Logs a b c d
most.from('abcd')
.forEach(console.log.bind(console));
function* numbers() {
for (let i = 0;; ++i) {
yield i;
}
}
// Create an infinite stream of numbers
var stream = most.from(numbers());
// Limit the stream to the first 100 numbers
stream.take(100)
.forEach(console.log.bind(console));
Note: periodic's second argument (x) is deprecated. To create a periodic stream with a specific value, use constant(x, periodic(period)).
most.periodic(2): x-x-x-x-x-x-> (x === undefined)
most.periodic(5, a): a----a----a->
Create an infinite stream containing events that arrive every period milliseconds, and whose value is undefined.
most.empty(): |
Create an already-ended stream containing no events.
most.never(): ---->
Create a stream that contains no events and never ends.
Build an infinite stream by computing successive items iteratively. Conceptually, the stream will contain: [initial, f(initial), f(f(initial)), ...]
// An infinite stream of all integers >= 0, ie
// 0, 1, 2, 3, 4, 5, ...
most.iterate(function(x) {
return x + 1;
}, 0);
The iterating function may return a promise. This allows most.iterate to be used to build asynchronous streams of future values. For example:
// An infinite stream of all integers >= 0, each delayed
// by 1 more millisecond than the previous.
// IOW, a stream that decelerates as it produces values:
// 0 (immediately)
// 1 (1 millisecond after 0)
// 2 (2 milliseconds after 1)
// 3 (3 milliseconds after 2)
// ... etc
// Take only the first 10
most.iterate(x => delay(x + 1), 0).take(10);
// Simple promise delay helper: fulfills with y after y milliseconds
const delay = y =>
new Promise(resolve => setTimeout(resolve, y, y));
Build a stream by computing successive items. Whereas reduce tears down a stream to a final value, unfold builds up a stream from a seed value.
The unfolding function accepts a seed value and must return a tuple: {value:*, seed:*, done:boolean}, or a promise for a tuple. Returning a promise allows most.unfold to be used to build asynchronous streams of future values.
- tuple.value will be emitted as an event.
- tuple.seed will be passed to the next invocation of the unfolding function.
- tuple.done can be used to stop unfolding. When tuple.done == true, unfolding will stop. Additionally, when tuple.done == true:
  - tuple.value (deprecated) will be used as the stream's end signal value. In future versions, tuple.value will be ignored when tuple.done is true
  - tuple.seed will be ignored
Note that if the unfolding function never returns a tuple with tuple.done == true, the stream will be infinite.
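To make the tuple contract concrete, here is a minimal sketch of a finite unfolding function. The names countUpTo and unfoldToArray are illustrative assumptions and not part of most.js; unfoldToArray only mimics the synchronous (non-promise) case, so the tuple handling can be checked without the library.

```javascript
// Hypothetical helper: builds an unfolding function that counts from the
// seed up to `limit`, then signals completion with `done: true`.
const countUpTo = limit => seed =>
  seed > limit
    ? { done: true } // seed is ignored once done is true
    : { value: seed, seed: seed + 1, done: false }

// Illustrative stand-in for most.unfold (synchronous tuples only):
// collects the emitted values into an array instead of a stream.
const unfoldToArray = (f, seed) => {
  const values = []
  let tuple = f(seed)
  while (!tuple.done) {
    values.push(tuple.value)
    tuple = f(tuple.seed)
  }
  return values
}

console.log(unfoldToArray(countUpTo(3), 1)) // [ 1, 2, 3 ]
```

Under the same assumptions, passing the function to the library as most.unfold(countUpTo(3), 1) should produce a stream containing 1, 2, 3 that then ends.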
const urlPrefix = 'product/'
const fetch = url => {
// ... fetch content and return a promise
return Promise.resolve('...')
}
// Unfold an infinite stream of products, producing a stream of:
// [fetch('product/1'), fetch('product/2'), fetch('product/3'), ...]
most.unfold(id =>
fetch(urlPrefix + id).then(content => {
return { value: content, seed: id + 1 }
}), 1)
Build a stream by running an asynchronous generator: a generator which yields promises.
When the generator yields a promise, the promise's fulfillment value will be added to the stream. If the promise rejects, an exception will be thrown in the generator. You can use try/catch to handle the exception.
function delayPromise(ms, value) {
return new Promise(resolve => setTimeout(() => resolve(value), ms));
}
function* countdown(delay, start) {
for(let i = start; i > 0; --i) {
yield delayPromise(delay, i);
}
}
// Logs
// 3 (after 1 second)
// 2 (after 1 more second)
// 1 (after 1 more second)
most.generate(countdown, 1000, 3)
.observe(x => console.log(x))
source: -a--b-c---d->
most.fromEvent(eventType, source): -a--b-c---d->
Create a stream containing events from the provided EventTarget, such as a DOM element, or EventEmitter. This provides a simple way to coerce existing event sources into streams.
When passing an EventTarget, you can provide useCapture as the 3rd parameter, and it will be passed through to addEventListener and removeEventListener. When not provided, useCapture defaults to false.
When the stream ends (for example, by using take, takeUntil, etc.), it will automatically be disconnected from the event source. For example, in the case of DOM events, the underlying DOM event listener will be removed automatically.
Notes on EventEmitter
- When the source event has more than one argument, all the arguments will be aggregated into an array in the resulting Stream.
- EventEmitters and EventTargets, such as DOM nodes, behave differently in that EventEmitter allows events to be delivered in the same tick as a listener is added. When using EventEmitter, most.fromEvent will ensure asynchronous event delivery, thereby preventing hazards of "maybe sync, maybe async" (aka zalgo) event delivery.
const clicks = most.fromEvent('click', document.querySelector('.the-button'));
// We can do some event delegation by applying a filter to the stream
// in conjunction with e.target.matches this will allow only events with
// .the-button class to be processed
var container = document.querySelector('.container');
most.fromEvent('click', container)
.filter(e => e.target.matches('.the-button'))
.forEach(event => { /* do something with event */ })
// Using preventDefault
const form = document.querySelector('form');
most.fromEvent('submit', form)
.tap(e => e.preventDefault())
.map(parseForm)
.map(JSON.stringify)
.forEach(event => { /* do something with JSON data */ })
// Using event delegation with Element.matches
// This allows only events with the .toggle-button class
// It also only calls preventDefault on allowed events
const container = document.querySelector('.container');
most.fromEvent('click', container)
.filter(e => e.target.matches('.toggle-button'))
.tap(e => e.preventDefault())
.forEach(event => { /* do something with event */ })
Create a new stream containing x followed by all events in stream.
stream: a-b-c-d->
stream.startWith(x): xa-b-c-d->
Create a new stream containing all events in stream1 followed by all events in stream2.
stream1: -a-b-c|
stream2: -d-e-f->
stream1.concat(stream2): -a-b-c-d-e-f->
Note that this effectively timeshifts events from stream2 past the end time of stream1. In contrast, other operations such as combine, merge, chain preserve event arrival times, allowing events from the multiple combined streams to interleave.
Alias: flatMapError
Recover from a stream failure by calling a function to create a new stream.
stream: -a-b-c-X
f(X): -d-e-f->
stream.recoverWith(f): -a-b-c-d-e-f->
When a stream fails with an error, the error will be passed to f. f must return a new stream to replace the error.
const fetch = url => {
// ... fetch content and return a promise
return Promise.resolve('...')
}
const stream = most.fromPromise(fetch('http://myapi.com/things'));
// Try to process data from the real API, but fall back
// to some default data if that fails.
stream.map(JSON.parse)
.recoverWith(e => most.of(defaultData))
.forEach(processData);
Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.
most.throwError(X): X
Create a new stream by applying f to each event of the input stream.
stream: -a-b-c-d->
stream.map(f): -f(a)-f(b)-f(c)-f(d)->
// Logs 2 3 4 5
most.from([1,2,3,4])
.map(function(x) {
return x + 1;
})
.forEach(console.log.bind(console));
Create a new stream by replacing each event of the input stream with x.
stream: -a-b-c-d->
stream.constant(x): -x-x-x-x->
// Logs 1 1 1 1
most.from([1,2,3,4])
.constant(1)
.forEach(console.log.bind(console));
Create a new stream containing incrementally accumulated results, starting with the provided initial value.
function f(accumulated, x) -> newAccumulated
stream: -1-2-3->
stream.scan(add, 0): 01-3-6->
Unlike reduce which produces a single, final result, scan emits incremental results. The resulting stream is of the same proportion as the original. For example, if the original contains 10 events, the resulting stream will contain 11 (the initial value, followed by 10 incremental events). If the original stream is infinite, the resulting stream will be infinite.
// Logs a ab abc abcd
most.from(['a', 'b', 'c', 'd'])
.scan((string, letter) => string + letter, '')
.forEach(s => console.log(s));
// Maintain a sliding window of (up to) 3 values in an array
// A stream containing all integers >= 0
const numbers = most.iterate(x => x + 1, 0);
const nextWindow = (slidingWindow, x) =>
slidingWindow.concat(x).slice(-3)
// Logs
// []
// [0]
// [0,1]
// [0,1,2]
// [1,2,3]
// [2,3,4]
// ... etc ...
numbers.scan(nextWindow, [])
.take(10)
.forEach(array => console.log(array));
Alias: flatMap
Transform each event in stream into a stream, and then merge it into the resulting stream. Note that f must return a stream.
function f(x) -> Stream
stream: -a----b----c|
f(a): 1--2--3|
f(b): 1----2----3|
f(c): 1-2-3|
stream.chain(f): -1--2-13---2-1-233|
Note the difference between concatMap and chain: concatMap concatenates, while chain merges.
// Logs: 1 2 1 1 2 1 1 2 2 2
most.from([1, 2])
.chain(x => most.periodic(x * 10).take(5).constant(x))
.observe(x => console.log(x));
Replace the end signal with a new stream returned by f. Note that f must return a stream.
function f(x) -> Stream
stream: -a-b-c-d-e-f->
stream.take(4): -a-b-c-d|end
f(end): 1-2-3-4-5->
stream.continueWith(f): -a-b-c-d-1-2-3-4-5->
most.periodic(10, 'x')
.take(4)
.continueWith(() => most.iterate(x => x + 1, 1).take(5))
.observe(x => console.log(x));
// Logs: x 4 times... ends and then logs 1, 2, 3, 4, 5
Transform each event in stream into a stream, and then concatenate it onto the end of the resulting stream. Note that f must return a stream.
The mapping function f is applied lazily. That is, f is called only once it is time to concatenate a new stream.
function f(x) -> Stream
stream: -a----b----c|
f(a): 1--2--3|
f(b): 1----2----3|
f(c): 1-2-3|
stream.concatMap(f): -1--2--31----2----31-2-3|
f called lazily: ^ ^ ^
Note the difference between concatMap and chain: concatMap concatenates, while chain merges.
// Logs: 1 1 1 1 1 2 2 2 2 2
most.from([1, 2])
.concatMap(x => most.periodic(x * 10).take(5).constant(x))
.observe(console.log.bind(console));
Apply the latest function in streamOfFunctions to the latest value in stream.
streamOfFunctions: --f---------g--------h------>
stream: -a-------b-------c-------d-->
streamOfFunctions.ap(stream): --fa-----fb-gb---gc--hc--hd->
In effect, ap applies a time-varying function to a time-varying value.
Materialize event timestamps, transforming Stream<X> into Stream<{ time:number, value:X }>
// Logs
// { time: 1418740004055, value: 'hello' }
// { time: 1418740004065, value: 'hello' }
// { time: 1418740004075, value: 'hello' }
// { time: 1418740004085, value: 'hello' }
// ... etc
most.periodic(10).constant('hello')
.timestamp()
.take(10)
.observe(console.log.bind(console));
Perform a side-effect for each event in stream.
stream: -a-b-c-d->
stream.tap(f): -a-b-c-d->
For each event in stream, f is called, but the value of its result is ignored. If f fails (ie throws), then the returned stream will also fail. The stream returned by tap will contain the same events as the original stream.
Create a stream containing only events for which predicate returns truthy.
stream: -1-2-3-4->
stream.filter(even): ---2---4->
Create a new stream with adjacent repeated events removed.
stream: -1-2-2-3-4-4-5->
stream.skipRepeats(): -1-2---3-4---5->
Note that === is used to identify duplicate items. To use a different comparison, use skipRepeatsWith
Create a new stream with adjacent repeated events removed, using the provided equals function.
stream: -a-b-B-c-D-d-e->
stream.skipRepeatsWith(equalsIgnoreCase): -a-b---c-D---e->
The equals function should accept two values and return truthy if the two values are equal, or falsy if they are not equal.
function equals(a, b) -> boolean
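As an illustration of such an equals function, the sketch below defines the equalsIgnoreCase comparison used in the diagram and checks it against a plain-array model. Both the implementation of equalsIgnoreCase and the helper skipRepeatsWithArray are assumptions made for this example; the array helper is not a most.js API, only a model of how adjacent repeats are dropped.

```javascript
// Hypothetical equals function: case-insensitive string comparison
const equalsIgnoreCase = (a, b) =>
  String(a).toLowerCase() === String(b).toLowerCase()

// Illustrative array model of skipRepeatsWith: keep an item only when
// it is not "equal" to the most recently kept item.
const skipRepeatsWithArray = (equals, items) =>
  items.reduce(
    (kept, x) =>
      kept.length > 0 && equals(kept[kept.length - 1], x)
        ? kept
        : kept.concat([x]),
    []
  )

console.log(
  skipRepeatsWithArray(equalsIgnoreCase, ['a', 'b', 'B', 'c', 'D', 'd', 'e'])
) // [ 'a', 'b', 'c', 'D', 'e' ]
```

With the library itself, the same function would be passed as stream.skipRepeatsWith(equalsIgnoreCase).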
Create a new stream by passing items through the provided transducer.
Transducers are composable transformations. They may map, filter, add items to, drop items from, or otherwise transform an event stream. The primary benefit of transducers is that they are composable and reusable across any data structures that support them (see note on performance below)
Most.js supports any transducer that implements the de facto JavaScript transducer protocol. For example, two popular transducer libraries are transducers-js and transducers.js.
// Create a transducer that slices, filters, and maps
import transducers from 'transducers-js'
var transducer = transducers.comp(
transducers.take(4),
transducers.filter(x => x % 2 === 0),
transducers.map(x => x + 1)
)
// Logs 3 5
most.from([1,2,3,4,5,6,7,8,9])
.transduce(transducer)
.observe(x => console.log(x))
Note on transducer performance: Transducers perform single-pass transformation. For many data structures, this can provide a significant performance improvement. However, most.js's builtin combinators currently outperform popular transducer libraries. The primary benefit of using transducers with most.js is reusability and portability.
Create a new stream containing only events where start <= index < end, where index is the ordinal index of an event in stream.
stream: -a-b-c-d-e-f->
stream.slice(1, 4): ---b-c-d|
stream: -a-b-c|
stream.slice(1, 4): ---b-c|
If stream contains fewer than start events, the returned stream will be empty.
Create a new stream containing at most n events from stream.
stream: -a-b-c-d-e-f->
stream.take(3): -a-b-c|
stream: -a-b|
stream.take(3): -a-b|
If stream contains fewer than n events, the returned stream will be effectively equivalent to stream.
Create a new stream that omits the first n events from stream.
stream: -a-b-c-d-e-f->
stream.skip(3): -------d-e-f->
stream: -a-b-c-d-e|
stream.skip(3): -------d-e|
stream: -a-b-c|
stream.skip(3): ------|
If stream contains fewer than n events, the returned stream will be empty.
Create a new stream containing all events until predicate returns false.
stream: -2-4-5-6-8->
stream.takeWhile(even): -2-4-|
Create a new stream containing all events after predicate returns false.
stream: -2-4-5-6-8->
stream.skipWhile(even): -----5-6-8->
Create a new stream containing all events before and including when the predicate returns true.
stream: -1-2-3-4-5-6-8->
stream.skipAfter(even): -1-2|
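The three predicate-based operations above are easy to confuse, so the following sketch models them side by side on plain arrays. The helper names (firstIndex, takeWhileArray, skipWhileArray, skipAfterArray) are hypothetical and not part of most.js; they only mirror the timeline diagrams, ignoring time.

```javascript
const even = x => x % 2 === 0

// Index of the first item whose predicate result matches `wanted`, or -1
const firstIndex = (predicate, wanted, items) =>
  items.findIndex(x => Boolean(predicate(x)) === wanted)

// takeWhile: everything before the first item that fails the predicate
const takeWhileArray = (p, items) => {
  const i = firstIndex(p, false, items)
  return i === -1 ? items.slice() : items.slice(0, i)
}

// skipWhile: the first item that fails the predicate and everything after it
const skipWhileArray = (p, items) => {
  const i = firstIndex(p, false, items)
  return i === -1 ? [] : items.slice(i)
}

// skipAfter: everything up to and including the first item that passes
const skipAfterArray = (p, items) => {
  const i = firstIndex(p, true, items)
  return i === -1 ? items.slice() : items.slice(0, i + 1)
}

console.log(takeWhileArray(even, [2, 4, 5, 6, 8]))       // [ 2, 4 ]
console.log(skipWhileArray(even, [2, 4, 5, 6, 8]))       // [ 5, 6, 8 ]
console.log(skipAfterArray(even, [1, 2, 3, 4, 5, 6, 8])) // [ 1, 2 ]
```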
Alias: takeUntil
Create a new stream containing all events until endSignal emits an event.
stream: -a-b-c-d-e-f->
endSignal: ------z->
stream.until(endSignal): -a-b-c|
If endSignal is empty or never emits an event, then the returned stream will be effectively equivalent to stream.
// Log mouse events until the user clicks. Note that DOM event handlers will
// automatically be unregistered.
most.fromEvent('mousemove', document)
.until(most.fromEvent('click', document))
.forEach(mouseEvent => console.log(mouseEvent));
Alias: skipUntil
Create a new stream containing all events after startSignal emits its first event.
stream: -a-b-c-d-e-f->
startSignal: ------z->
stream.since(startSignal): -------d-e-f->
If startSignal is empty or never emits an event, then the returned stream will be effectively equivalent to never().
// Start logging mouse events when the user clicks.
most.fromEvent('mousemove', document)
.since(most.fromEvent('click', document))
.forEach(mouseEvent => console.log(mouseEvent));
Create a new stream containing only events that occur during a dynamic time window.
stream: -a-b-c-d-e-f-g->
timeWindow: -----s
s: -----t
stream.during(timeWindow): -----c-d-e-|
This is similar to slice, but uses time signals rather than indices to limit the stream.
// After the first click, log mouse move events for 1 second.
// Note that DOM event handlers will automatically be unregistered.
const start = most.fromEvent('click', document);
const end = most.of().delay(1000);
// Map the first click to a stream containing a 1 second delay
// The click represents the window start time, after which
// the window will be open for 1 second.
const timeWindow = start.constant(end);
most.fromEvent('mousemove', document)
.during(timeWindow)
.forEach(mouseEvent => console.log(mouseEvent));
Create a feedback loop that emits one value and feeds back another to be used in the next iteration.
It allows you to maintain and update a "state" (aka feedback, aka seed for the next iteration) while emitting a different value. In contrast, scan feeds back and emits the same value.
// Average an array of values
const average = values =>
values.reduce((sum, x) => sum + x, 0) / values.length
const stream = most.iterate(x => x + 1, 0)
// Emit the simple (ie windowed) moving average of the 10 most recent values
stream.loop((values, x) => {
values.push(x);
values = values.slice(-10); // Keep up to 10 most recent
const avg = average(values);
// Return { seed, value } pair.
// seed will feed back into next iteration
// value will be propagated
return { seed: values, value: avg };
}, [])
.take(10)
.observe(avg => console.log(avg));
transform (stream: Stream<A>) -> R
Use a functional API in fluent style.
Functional APIs allow for the highest degree of modularity via external packages, such as @most/hold, without the risks of modifying prototypes.
Note that the transform function may return any type (not just a Stream), and .thru(transform) will also return that type.
If you prefer using fluent APIs, thru allows using those functional APIs in a fluent style. For example:
import { hold } from '@most/hold'
import { periodic } from 'most'
periodic(10, 1)
.take(5)
.scan((total, increment) => total + increment, 0)
.thru(hold)
.observe(x => console.log(x))
rather than mixing functional and fluent:
import { hold } from '@most/hold'
import { periodic } from 'most'
hold(periodic(10, 1)
.take(5)
.scan((total, increment) => total + increment, 0))
.observe(x => console.log(x))
Note also that the function passed to thru is not restricted to returning a stream. For example:
import { from } from 'most'
const lastAsPromise = (stream) =>
stream.reduce((_, x) => x)
// lastAsPromise returns a Promise, so
// stream.thru(lastAsPromise) *also* returns a Promise
// logs 3
from([1, 2, 3])
.thru(lastAsPromise) // returns a Promise
.then(x => console.log(x))
Multiple arguments should be handled via partial application of the function passed to thru, using bind or a currying or partial application utility from your favorite functional programming library.
// Via curried transform
const transform = curry((x, y, z, stream) => /* ... */)
const stream2 = stream1.thru(transform(x, y, z))
// Via partial application
const transform = (x, y, z, stream) => { /* ... */ }
// Partially apply with partial application helper
const stream2 = stream1.thru(partial(transform, x, y, z))
// Or, partially apply with bind
const stream3 = stream1.thru(transform.bind(null, x, y, z))
Reduce a stream, returning a promise for the ultimate result.
stream: -1-2-3-4-|
stream.reduce(sum, 0): 10
The returned promise will fulfill with the final reduced result, or will reject if a failure occurs while reducing the stream.
The reduce function (f above)
TODO: Example
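As a hedged sketch of a reducing function, the snippet below defines the sum reducer named in the diagram and runs it over a plain array. Array.prototype.reduce is used here only as a stand-in that applies the reducer in the same left-to-right order; it is not the most.js implementation.

```javascript
// Reducer from the diagram: combines the accumulated result with the next event
const sum = (accumulated, x) => accumulated + x

// Array stand-in for the stream -1-2-3-4-|
const result = [1, 2, 3, 4].reduce(sum, 0)

console.log(result) // 10
```

With most.js, the corresponding call would be most.from([1, 2, 3, 4]).reduce(sum, 0), which returns a promise for 10 rather than the value itself.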
Alias: forEach
Start consuming events from stream, processing each with f. The returned promise will fulfill after all the events have been consumed, or will reject if the stream fails and the error is not handled.
The forEach alias is compatible with the draft ES Observable proposal forEach. Read more about Observable interop here.
// Log mouse movements until the user clicks, then stop.
most.fromEvent('mousemove', document)
.takeUntil(most.fromEvent('click', document))
.observe(x => console.log(x))
.then(() => console.log('All done'))
Start consuming events from stream. This can be useful in some cases where you don't want or need to process the terminal events--e.g. when all processing has been done via upstream side-effects. Most times, however, you'll use observe to consume and process terminal events.
The returned promise will fulfill after all the events have been consumed, or will reject if the stream fails and the error is not handled.
Draft ES Observable compatible subscribe. Start consuming events from stream by providing an Observer object.
type Observer = {
// Receives the next value in the sequence
next(value) => void
// Receives the sequence error
error(errorValue) => void
// Receives the sequence completion signal
// The completionValue parameter is deprecated
complete(completionValue) => void
}
Returns a Subscription object that can be used to unsubscribe from the stream of events.
type Subscription = {
// Cancels the subscription
unsubscribe() => void
}
Read more about draft ES Observable interop here.
Both forEach and subscribe are supported in the draft ES Observable proposal, and the following behave similarly:
stream.forEach(handleEvent).then(handleEnd, handleError)
stream.subscribe({
next: handleEvent,
complete: handleEnd,
error: handleError
})
However, there are also some important differences.
forEach
- returns a Promise, which can be transformed further using .then,
- integrates easily into existing asynchronous code that uses promises
- encourages declarative programming using until, take, takeWhile, etc.
subscribe
- returns a Subscription,
- allows imperative unsubscription in cases where declarative isn't possible
Create a new stream containing events from stream1 and stream2.
stream1: -a--b----c--->
stream2: --w---x-y--z->
stream1.merge(stream2): -aw-b-x-yc-z->
Merging multiple streams creates a new stream containing all events from the input streams without affecting the arrival time of the events. You can think of the events from the input streams simply being interleaved into the new, merged stream. A merged stream ends when all of its input streams have ended.
In contrast to concat, merge preserves the arrival times of events. That is, it creates a new stream where events from stream1 and stream2 can interleave.
Array form of merge. Create a new Stream containing all events from all streams in arrayOfStreams.
s1: -a--b----c--->
s2: --w---x-y--z->
s3: ---1---2----3>
most.mergeArray([s1, s2, s3]): -aw1b-x2yc-z3>
See merge for more details.
Create a new stream that emits the set of latest event values from all input streams whenever a new event arrives on any input stream.
stream1: -0--1----2--->
stream2: --3---4-5--6->
stream1.combine(add, stream2): --3-4-5-67-8->
Combining creates a new stream by applying a function to the most recent event from each stream whenever a new event arrives on any one stream. Combining must wait for at least one event to arrive on all input streams before it can produce any events.
A combined stream has the same proportion as the max of the proportions of its input streams. To put it in imperative terms: combine ends after all its inputs have ended.
// Add the current value of two inputs
// Updates the result whenever *either* of the inputs changes!
// Create a stream from an <input> value
const fromInput = input =>
most.fromEvent('change', input)
.map(e => e.target.value)
.map(Number)
// Add two numbers
const add = (x, y) => x + y
// Create streams for the current value of x and y
const xStream = fromInput(document.querySelector('input.x'))
const yStream = fromInput(document.querySelector('input.y'))
// Create a result stream by adding x and y
// This always adds the latest value of x and y
const resultStream = most.combine(add, xStream, yStream)
const resultNode = document.querySelector('.result')
resultStream.observe(z => {
resultNode.textContent = z
})
Array form of combine. Create a new stream that emits the set of latest event values from all input streams whenever a new event arrives on any input stream.
s1: -0--1----2->
s2: --3---4-5-->
s3: ---2---1--->
most.combineArray(add3, [s1, s2, s3]): ---56-7678->
See combine for more details.
Create a new stream by combining sampled values from many input streams.
s1: -1-----2-----3->
s2: -1---2---3---4->
sampler: -a-a-a-a-a-a-a->
sampler.sample(add, s1, s2): -2-2-3-4-5-5-7->
s1: -1----2----3->
s2: -1-2-3-4-5-6->
sampler: -a--a--a--a-->
sampler.sample(add, s1, s2): -2--3--6--7-->
While combine produces a value whenever an event arrives on any of its inputs, sample produces a value only when an event arrives on the sampler.
// Add the current value of two inputs
// Updates only when the user clicks a button
// Create a stream from an <input> value
const fromInput = input =>
most.fromEvent('change', input)
.map(e => e.target.value)
.map(Number)
// Add two numbers
const add = (x, y) => x + y
// Create streams for the current value of x and y
const xStream = fromInput(document.querySelector('input.x'))
const yStream = fromInput(document.querySelector('input.y'))
const click = most.fromEvent('click', document.querySelector('.button'))
// Create a result stream by adding the values of x and y
// at the time the button was clicked.
// NOTE: add() is NOT called when x and y change, but rather
// only when the button is clicked.
const resultStream = most.sample(add, click, xStream, yStream)
const resultNode = document.querySelector('.result')
resultStream.observe(z => {
resultNode.textContent = z
})
When an event arrives on sampler, emit the latest event value from values. Effectively equivalent to sampler.sample(identity, values);
values: -1---2-3---4-5---6-7---8->
sampler: ---a---a---a---a---a---a->
values.sampleWith(sampler): ---1---3---4---5---7---8->
values: -1----2----3----4----5--->
sampler: -a-a-a-a-a-a-a-a-a-a-a-a->
values.sampleWith(sampler): -1-1-1-2-2-3-3-3-4-4-5-5->
Sampling can "smooth" an erratic source, or can act as a dynamic throttle to speed or slow events from one stream using another.
// Log mouse position whenever the user presses a key
most.fromEvent('mousemove', document)
.sampleWith(most.fromEvent('keydown', document))
.observe(console.log.bind(console));
Create a new stream by applying a function to corresponding pairs of events from the input streams.
stream1: -1--2--3--4->
stream2: -1---2---3---4->
stream1.zip(add, stream2): -2---4---6---8->
Zipping correlates by index corresponding events from two or more input streams. Note that zipping a "fast" stream and a "slow" stream will cause buffering. Events from the fast stream must be buffered in memory until an event at the corresponding index arrives on the slow stream.
A zipped stream ends when any one of its input streams ends.
const add = (x, y) => x + y
// Logs 5 7 9
// In other words: add(1, 4) add(2, 5) add(3, 6)
most.from([1,2,3])
.zip(add, most.from([4,5,6,7,8]))
.forEach(x => console.log(x))
A stream zipped with a stream created by most.periodic will emit events in intervals.
// Logs new sum every second
most.from([1, 2, 3, 4])
.zip(v => v, most.periodic(1000))
.scan((result, y) => result + y, 0)
.forEach(x => console.log(x))
Alias: switch
Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.
s: -a-b-c-d-e-f->
t: -1-2-3-4-5-6->
stream: -s-----t----->
stream.switchLatest(): -a-b-c-4-5-6->
TODO: Example
Given a higher-order stream, return a new stream that merges all the inner streams as they arrive.
s: ---a---b---c---d-->
t: -1--2--3--4--5--6->
stream: -s------t--------->
stream.join(): ---a---b--4c-5-d6->
TODO: Example
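The diagram above can be reproduced with a small model in plain JavaScript. It is a sketch of the merging rule only, not the library's implementation: `parseTimeline` and `joinModel` are hypothetical helpers, and inner streams are modelled as arrays of `{ time, value }` on a shared clock, as in the diagram.

```javascript
// Hypothetical helper: '-a-b-' -> [{ time: 1, value: 'a' }, { time: 3, value: 'b' }]
const parseTimeline = diagram =>
  [...diagram]
    .map((value, time) => ({ time, value }))
    .filter(e => /[a-z0-9]/i.test(e.value))

// Model of stream.join(): every inner stream is merged from the moment
// it arrives, and none of them is ever dropped.
const joinModel = outer =>
  outer
    .flatMap(({ time, value: inner }) => inner.filter(e => e.time >= time))
    .sort((a, b) => a.time - b.time)

const s = parseTimeline('---a---b---c---d--')
const t = parseTimeline('-1--2--3--4--5--6-')
// The higher-order stream: s arrives at time 1, t arrives at time 8
const stream = [{ time: 1, value: s }, { time: 8, value: t }]

const joined = joinModel(stream)
console.log(joined.map(e => e.value).join('')) // ab4c5d6
```

Events 1, 2 and 3 from t are missing because they occur before t arrives on the outer stream.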
Given a higher-order stream, return a new stream that merges inner streams as they arrive, up to the specified concurrency. Once that many streams are being merged, newly arriving streams are queued and merged only after an existing one ends.
s: --a--b--c--d--e-->
t: --x------y|
u: -1--2--3--4--5--6>
stream: -s--t--u--------->
stream.mergeConcurrently(2): --a--b--cy4d-5e-6>
Note that u is only merged after t ends, due to the concurrency level of 2.
Note also that stream.mergeConcurrently(Infinity) is equivalent to stream.join().
To control concurrency, mergeConcurrently must maintain an internal queue of newly arrived streams. If new streams arrive faster than the concurrency level allows them to be merged, the internal queue will grow without bound.
Deprecated alias: await
Given a stream of promises, i.e. Stream<Promise<X>>, return a new stream containing the fulfillment values, i.e. Stream<X>.
promise p: ---1
promise q: ------2
promise r: -3
stream: -p---q---r->
stream.awaitPromises(): ---1--2--3->
Event times may be delayed. However, event order is always preserved, regardless of promise fulfillment order.
To create a stream that merges promises in fulfillment order, use stream.chain(most.fromPromise). Note the difference:
promise p: --1
promise q: --------2
promise r: ------3
stream: -p-q-r----->
stream.chain(most.fromPromise): --1---3-2-->
stream.awaitPromises(): --1-----23->
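The difference between the two orderings can be sketched with a small model in plain JavaScript. It does not use real promises or most.js: each event carries a hypothetical `{ fulfilledAt, value }` record that stands in for a promise, and `awaitPromisesModel` and `chainFromPromiseModel` are illustrative names. The model places 2 and 3 at the same instant for awaitPromises, whereas the diagram draws them in adjacent columns.

```javascript
// The stream from the diagram above: p, q and r arrive at times 1, 3 and 5
const events = [
  { time: 1, value: { fulfilledAt: 2, value: 1 } }, // p
  { time: 3, value: { fulfilledAt: 8, value: 2 } }, // q
  { time: 5, value: { fulfilledAt: 6, value: 3 } }  // r
]

// Model of awaitPromises: a result is never emitted before the results
// of the events that came before it, so event order is preserved.
const awaitPromisesModel = evs => {
  let previous = -Infinity
  return evs.map(({ time, value: promise }) => {
    previous = Math.max(previous, time, promise.fulfilledAt)
    return { time: previous, value: promise.value }
  })
}

// Model of chain(most.fromPromise): each result is emitted as soon as
// its own promise fulfills, so results appear in fulfillment order.
const chainFromPromiseModel = evs =>
  evs
    .map(({ time, value: promise }) => ({
      time: Math.max(time, promise.fulfilledAt),
      value: promise.value
    }))
    .sort((a, b) => a.time - b.time)

const inEventOrder = awaitPromisesModel(events)
const inFulfillmentOrder = chainFromPromiseModel(events)

console.log(inEventOrder.map(e => e.value))       // [ 1, 2, 3 ]
console.log(inFulfillmentOrder.map(e => e.value)) // [ 1, 3, 2 ]
```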
If a promise rejects, the stream will be in an error state with the rejected promise's reason as its error. See recoverWith for error recovery. For example:
promise p: ---1
promise q: ------X
promise r: -3
stream: -p---q---r->
stream.awaitPromises(): ---1--X
If a promise remains pending forever, the stream will never produce any events beyond that promise. Use a promise timeout or race in such cases to ensure that all promises either fulfill or reject. For example:
promise p: ---1
promise q: ----------->
promise r: -3
stream: -p---q---r->
stream.awaitPromises(): ---1------->
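One way to apply a promise timeout is a small wrapper built on Promise.race. The following is a minimal sketch, not part of most.js: `withTimeout` is a hypothetical helper name, and the 100 ms and 5000 ms limits are arbitrary.

```javascript
// Hypothetical helper: reject if `promise` has not settled within `ms` milliseconds
const withTimeout = (promise, ms) => {
  let timer
  const timeout = new Promise((resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms)
  })
  // Whichever settles first wins; the timer is cleared either way
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

// A promise that stays pending forever now rejects instead
const neverSettles = new Promise(() => {})
withTimeout(neverSettles, 100)
  .catch(e => console.log(e.message)) // Timed out after 100ms

// With most.js, promises could be wrapped before they are awaited, e.g.:
//   stream.map(p => withTimeout(p, 5000)).awaitPromises()
```

A timed-out promise rejects, so the stream fails with the timeout error rather than stalling. See recoverWith if the stream should continue after such a failure.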
const urls = ['http://...', 'http://...', 'http://...']
const fetchContent = url => {
  // ... fetch url and return a promise for it ...
  return Promise.resolve(url)
}
const streamOfPromises = most.from(urls).map(fetchContent)
const streamOfContent = streamOfPromises.awaitPromises()
streamOfContent.forEach(content => console.log(content))
Wait for a burst of events to subside and emit only the last event in the burst.
stream: abcd----abcd---->
stream.debounce(2): -----d-------d-->
s1: abcd----abcd---->
s2: ------------|
s1.until(s2).debounce(2): -----d------d|
If the stream ends while there is a pending debounced event (e.g. via until, see example above), the pending event will be emitted just before the stream ends.
Debouncing can be extremely useful when dealing with bursts of similar events, for example, debouncing keypress events before initiating a remote search query in a browser application.
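The two diagrams above can be reproduced with a small model in plain JavaScript. It is a sketch of the debounce rule only, not the library's implementation: `parseTimeline` and `debounceModel` are hypothetical helpers, streams are arrays of `{ time, value }`, and the optional `end` argument is the time at which the source ends. How an event landing exactly when the quiet period expires is treated is a modelling choice here.

```javascript
// Hypothetical helper: 'ab--' -> [{ time: 0, value: 'a' }, { time: 1, value: 'b' }]
const parseTimeline = diagram =>
  [...diagram]
    .map((value, time) => ({ time, value }))
    .filter(e => /[a-z0-9]/i.test(e.value))

// Model of stream.debounce(period)
const debounceModel = (period, events, end = Infinity) =>
  events
    .filter(e => e.time < end)
    .flatMap((e, i, kept) => {
      const next = i + 1 < kept.length ? kept[i + 1].time : Infinity
      const due = e.time + period
      // A later event inside the quiet period replaces this one
      if (next < due) return []
      // A pending event is flushed when the source ends
      return [{ time: Math.min(due, end), value: e.value }]
    })

const source = parseTimeline('abcd----abcd----')

const debounced = debounceModel(2, source)
console.log(debounced) // d at time 5 and d at time 13

const debouncedUntil = debounceModel(2, source, 12)
console.log(debouncedUntil) // d at time 5 and d at time 12
```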
const searchInput = document.querySelector('[name="search-text"]');
const searchText = most.fromEvent('input', searchInput);
// Logs the current value of the searchInput, only after the
// user stops typing for 500 milliseconds
searchText.debounce(500)
  .map(e => e.target.value)
  .observe(x => console.log(x));
Limit the rate of events to at most one per throttlePeriod.
stream: abcd----abcd---->
stream.throttle(2): a-c-----a-c----->
In contrast to debounce, throttle simply drops events that occur more often than throttlePeriod, whereas debounce waits for a "quiet period".
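The throttle diagram above can be reproduced with a small model in plain JavaScript. It is a sketch of the rule only, not the library's implementation: `parseTimeline` and `throttleModel` are hypothetical helpers, and streams are arrays of `{ time, value }`.

```javascript
// Hypothetical helper: 'ab--' -> [{ time: 0, value: 'a' }, { time: 1, value: 'b' }]
const parseTimeline = diagram =>
  [...diagram]
    .map((value, time) => ({ time, value }))
    .filter(e => /[a-z0-9]/i.test(e.value))

// Model of stream.throttle(period): an event passes through immediately,
// then further events are dropped until `period` has elapsed.
const throttleModel = (period, events) => {
  let nextAllowed = -Infinity
  return events.filter(e => {
    if (e.time < nextAllowed) return false
    nextAllowed = e.time + period
    return true
  })
}

const source = parseTimeline('abcd----abcd----')
const throttled = throttleModel(2, source)
console.log(throttled.map(e => `${e.value}@${e.time}`).join(' ')) // a@0 c@2 a@8 c@10
```

Unlike debounce, the first event of each burst is emitted immediately and nothing is held back until a quiet period.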
Timeshift a stream by delayTime in milliseconds.
stream: -a-b-c-d->
stream.delay(1): --a-b-c-d->
stream.delay(5): ------a-b-c-d->
Delaying a stream timeshifts all the events by the same amount. Delaying doesn't change the time between events.
Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.
stream: -a-b-c-d->
stream.multicast(): -a-b-c-d->
Using multicast allows you to build up a stream of maps, filters, and other transformations, and then share it efficiently with multiple observers.