Module: "urx/src/pipe"#

Stream values can be transformed and controlled by piping through operators. urx includes several operators like map, filter, scan, and throttleTime. The withLatestFrom operator allows the combination of values from other streams.

const foo = stream<number>()
// create an emitter that first adds 2 to the passed value, then multiplies the result by 2
const bar = pipe(foo, map(value => value + 2), map(value => value * 2))
subscribe(bar, value => console.log(value))
publish(foo, 2) // outputs 8

Implementing Custom Operators#

To implement your own operators, implement the Operator interface.
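The Operator interface itself is not reproduced on this page. As a minimal sketch, assuming an operator is a function that receives a done callback and returns a handler for incoming values, a custom operator might look like the following. OperatorSketch and greaterThan are hypothetical names used only for illustration; check the actual Operator definition before relying on this shape.

```typescript
// Hypothetical stand-in for the Operator interface (assumed shape, not copied from urx).
type OperatorSketch<I, O = I> = (done: (value: O) => void) => (value: I) => void

// A custom operator that forwards only values greater than a minimum.
function greaterThan(min: number): OperatorSketch<number> {
  return done => value => {
    if (value > min) done(value)
  }
}

// Drive the operator by hand, without a stream, to see what it forwards.
const forwarded: number[] = []
const handle = greaterThan(10)(value => forwarded.push(value))
const inputs = [5, 15, 8, 20]
inputs.forEach(value => handle(value))
console.log(forwarded) // [15, 20]
```

If the assumed shape holds, such an operator would be passed to pipe in the same position as map or filter.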

Functions#

debounceTime#

▸ debounceTime<T>(interval: number): Operator<T>

Defined in urx/src/pipe.ts:289

Debounces flowing values at the provided interval in milliseconds. See "Throttle vs. debounce" on Stack Overflow for a comparison of the two techniques.

const foo = stream<number>()
publish(foo, 1)
setTimeout(() => publish(foo, 2), 20)
setTimeout(() => publish(foo, 3), 20)
subscribe(pipe(foo, debounceTime(50)), value => {
  console.log(value) // 3
})

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| interval | number |

Returns: Operator<T>


defaultComparator#

▸ defaultComparator<T>(previous: T, next: T): boolean

Defined in urx/src/pipe.ts:112

The default Comparator for distinctUntilChanged and duc.

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| previous | T |
| next | T |

Returns: boolean


distinctUntilChanged#

▸ distinctUntilChanged<T>(comparator?: Comparator<T>): Operator<T>

Defined in urx/src/pipe.ts:130

Filters out values that are identical to the previously emitted value. Pass an optional Comparator if you need to compare non-primitive values.

const foo = stream<number>()
subscribe(
pipe(foo, distinctUntilChanged()),
console.log
) // will be called only once
publish(foo, 42)
publish(foo, 42)
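To illustrate why non-primitive values need a custom comparator, here is a small array-based sketch. It assumes a comparator returns true when two values should be treated as equal, mirroring the defaultComparator signature. ComparatorSketch, Point2D, samePoint, and dropConsecutiveDuplicates are hypothetical names; the helper is a stand-in for the operator's filtering, not its implementation.

```typescript
// Assumed shape: returns true when the two values count as equal.
type ComparatorSketch<T> = (previous: T, next: T) => boolean

interface Point2D { x: number; y: number }

// Compares points field by field instead of by object identity.
const samePoint: ComparatorSketch<Point2D> = (previous, next) =>
  previous.x === next.x && previous.y === next.y

// Array-based stand-in: drops a value when it equals the previously kept one.
function dropConsecutiveDuplicates<T>(items: T[], isEqual: ComparatorSketch<T>): T[] {
  const result: T[] = []
  for (const item of items) {
    if (result.length === 0 || !isEqual(result[result.length - 1], item)) {
      result.push(item)
    }
  }
  return result
}

const points: Point2D[] = [{ x: 1, y: 1 }, { x: 1, y: 1 }, { x: 2, y: 1 }, { x: 1, y: 1 }]
const byReference = dropConsecutiveDuplicates(points, (a, b) => a === b)
const byValue = dropConsecutiveDuplicates(points, samePoint)
console.log(byReference.length) // 4, every object literal is a distinct reference
console.log(byValue.length) // 3, only the consecutive duplicate is dropped
```

Following the signature above, a comparator like samePoint would be passed as distinctUntilChanged(samePoint).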

Type parameters:#

Name
T

Parameters:#

| Name | Type | Default value |
| --- | --- | --- |
| comparator | Comparator<T> | defaultComparator |

Returns: Operator<T>


filter#

▸ filter<T>(predicate: (value: T) => boolean): Operator<T>

Defined in urx/src/pipe.ts:156

Filters out values for which the predicate does not return a truthy value.

const foo = stream<number>()
subscribe(
  pipe(foo, filter(value => value % 2 === 0)),
  console.log
) // will be called only with even values
publish(foo, 2)
publish(foo, 3)
publish(foo, 4)
publish(foo, 5)

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| predicate | (value: T) => boolean |

Returns: Operator<T>


map#

▸ map<T, K>(project: (value: T) => K): Operator<T, K>

Defined in urx/src/pipe.ts:176

Maps values using the provided project function.

const foo = stream<number>()
subscribe(
  pipe(foo, map(value => value * 2)),
  console.log
) // 4, 6
publish(foo, 2)
publish(foo, 3)

Type parameters:#

Name
T
K

Parameters:#

| Name | Type |
| --- | --- |
| project | (value: T) => K |

Returns: Operator<T, K>


mapTo#

▸ mapTo<T>(value: T): Operator<any, T>

Defined in urx/src/pipe.ts:194

Maps values to the hard-coded value.

const foo = stream<number>()
subscribe(
  pipe(foo, mapTo(3)),
  console.log
) // 3, 3
publish(foo, 1)
publish(foo, 2)

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| value | T |

Returns: Operator<any, T>


pipe#

▸ pipe<T>(s: Emitter<T>): Emitter<T>

Defined in urx/src/pipe.ts:76

Creates a new emitter from the passed one by piping its values through one or more operators. Operators can perform various actions, such as filtering values, pulling values from other emitters, or computing new values.

const foo = stream<number>()
// create an emitter that first adds 2 to the passed value, then multiplies the result by 2
const bar = pipe(foo, map(value => value + 2), map(value => value * 2))
subscribe(bar, value => console.log(value))
publish(foo, 2) // outputs 8

Sharing Subscription Calculations#

pipe acts as a proxy for the source emitter, and re-runs the operators for each subscription to the derived emitter. Use streamFromEmitter or statefulStreamFromEmitter to avoid that.
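The effect described above can be sketched with a toy model. None of the helpers below (toyStream, toyMap) are urx APIs, and the model is not urx's implementation; it only shows how a proxy-style derived emitter ends up running its projection once per subscriber for every published value.

```typescript
// Toy model only: these types and helpers are hypothetical, not part of urx.
type Subscriber<T> = (value: T) => void
type ToyEmitter<T> = (subscriber: Subscriber<T>) => void

function toyStream<T>() {
  const subscribers: Subscriber<T>[] = []
  const emitter: ToyEmitter<T> = subscriber => {
    subscribers.push(subscriber)
  }
  const publish = (value: T) => subscribers.forEach(s => s(value))
  return { emitter, publish }
}

// Proxy-style derivation: every subscription to the result wraps the source separately.
function toyMap<T, K>(origin: ToyEmitter<T>, project: (value: T) => K): ToyEmitter<K> {
  return subscriber => origin(value => subscriber(project(value)))
}

let projectCalls = 0
const numbers = toyStream<number>()
const doubled = toyMap(numbers.emitter, value => {
  projectCalls++
  return value * 2
})

const received: number[] = []
doubled(value => received.push(value))
doubled(value => received.push(value))
numbers.publish(21)
console.log(projectCalls) // 2, the projection ran once per subscriber
console.log(received) // [42, 42]
```

When the projection is expensive, this per-subscriber cost is the reason to convert the derived emitter into a stream that computes each value once.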

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |

Returns: Emitter<T>

▸ pipe<T, O1>(s: Emitter<T>, o1: O<T, O1>): Emitter<O1>

Defined in urx/src/pipe.ts:77

Type parameters:#

Name
T
O1

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| o1 | O<T, O1> |

Returns: Emitter<O1>

▸ pipe<T, O1, O2>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>]): Emitter<O2>

Defined in urx/src/pipe.ts:78

Type parameters:#

Name
T
O1
O2

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>] |

Returns: Emitter<O2>

▸ pipe<T, O1, O2, O3>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>, O<O2, O3>]): Emitter<O3>

Defined in urx/src/pipe.ts:79

Type parameters:#

Name
T
O1
O2
O3

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>, O<O2, O3>] |

Returns: Emitter<O3>

▸ pipe<T, O1, O2, O3, O4>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>]): Emitter<O4>

Defined in urx/src/pipe.ts:80

Type parameters:#

Name
T
O1
O2
O3
O4

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>] |

Returns: Emitter<O4>

▸ pipe<T, O1, O2, O3, O4, O5>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>]): Emitter<O5>

Defined in urx/src/pipe.ts:81

Type parameters:#

Name
T
O1
O2
O3
O4
O5

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>] |

Returns: Emitter<O5>

▸ pipe<T, O1, O2, O3, O4, O5, O6>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>, O<O5, O6>]): Emitter<O6>

Defined in urx/src/pipe.ts:82

Type parameters:#

Name
T
O1
O2
O3
O4
O5
O6

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>, O<O5, O6>] |

Returns: Emitter<O6>

▸ pipe<T, O1, O2, O3, O4, O5, O6, O7>(s: Emitter<T>, ...o: [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>, O<O5, O6>, O<O6, O7>]): Emitter<O7>

Defined in urx/src/pipe.ts:83

Type parameters:#

Name
T
O1
O2
O3
O4
O5
O6
O7

Parameters:#

| Name | Type |
| --- | --- |
| s | Emitter<T> |
| ...o | [O<T, O1>, O<O1, O2>, O<O2, O3>, O<O3, O4>, O<O4, O5>, O<O5, O6>, O<O6, O7>] |

Returns: Emitter<O7>


scan#

▸ scan<T, K>(scanner: (current: T, value: K) => T, initial: T): Operator<K, T>

Defined in urx/src/pipe.ts:213

Works like Array#reduce. Applies an accumulator function to the values of the emitter and outputs each intermediate result, starting from the initial value.

const foo = stream<number>()
subscribe(
  pipe(foo, scan((acc, value) => acc + value, 2)),
  console.log
) // 3, 5
publish(foo, 1)
publish(foo, 2)
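The comparison with Array#reduce can be made concrete with plain arrays. The sketch below uses no urx APIs; it records the accumulator after every step, which is the sequence of intermediate results the description above refers to. The variable names are illustrative only.

```typescript
const published = [1, 2]
const initial = 2

// Array#reduce yields only the final accumulated value.
const finalTotal = published.reduce((acc, value) => acc + value, initial)

// Recording the accumulator after every step gives the intermediate results.
const intermediate: number[] = []
published.reduce((acc, value) => {
  const next = acc + value
  intermediate.push(next)
  return next
}, initial)

console.log(finalTotal) // 5
console.log(intermediate) // [3, 5]
```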

Type parameters:#

Name
T
K

Parameters:#

| Name | Type |
| --- | --- |
| scanner | (current: T, value: K) => T |
| initial | T |

Returns: Operator<K, T>


skip#

▸ skip<T>(times: number): Operator<T>

Defined in urx/src/pipe.ts:233

Skips the specified number of values from the emitter.

const foo = stream<number>()
subscribe(
  pipe(foo, skip(2)),
  console.log
) // 3, 4
publish(foo, 1) // skipped
publish(foo, 2) // skipped
publish(foo, 3)
publish(foo, 4)

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| times | number |

Returns: Operator<T>


throttleTime#

▸ throttleTime<T>(interval: number): Operator<T>

Defined in urx/src/pipe.ts:255

Throttles flowing values at the provided interval in milliseconds. See "Throttle vs. debounce" on Stack Overflow for a comparison of the two techniques.

const foo = stream<number>()
publish(foo, 1)
setTimeout(() => publish(foo, 2), 20)
setTimeout(() => publish(foo, 3), 20)
subscribe(pipe(foo, throttleTime(50)), value => {
  console.log(value) // 3
})

Type parameters:#

Name
T

Parameters:#

| Name | Type |
| --- | --- |
| interval | number |

Returns: Operator<T>


withLatestFrom#

▸ withLatestFrom<T, R1>(...s: [Emitter<R1>]): Operator<T, [T, R1]>

Defined in urx/src/pipe.ts:327

Combines the source Emitter with the latest values from the specified Emitters into an array. Outputs only when the source Emitter emits. See combineLatest for a transformer that outputs when any of the emitters emit.

const foo = stream<number>()
const bar = stream<number>()
subscribe(
  pipe(
    foo,
    withLatestFrom(bar)
  ),
  ([foo, bar]) => console.log({ foo, bar })
)
publish(foo, 1) // nothing happens, bar has not emitted yet
publish(bar, 1) // still nothing
publish(foo, 2) // logs { foo: 2, bar: 1 }
publish(bar, 2)
publish(foo, 3) // logs { foo: 3, bar: 2 }

Type parameters:#

Name
T
R1

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>] |

Returns: Operator<T, [T, R1]>

▸ withLatestFrom<T, R1, R2>(...s: [Emitter<R1>, Emitter<R2>]): Operator<T, [T, R1, R2]>

Defined in urx/src/pipe.ts:328

Type parameters:#

Name
T
R1
R2

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>] |

Returns: Operator<T, [T, R1, R2]>

▸ withLatestFrom<T, R1, R2, R3>(...s: [Emitter<R1>, Emitter<R2>, Emitter<R3>]): Operator<T, [T, R1, R2, R3]>

Defined in urx/src/pipe.ts:329

Type parameters:#

Name
T
R1
R2
R3

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>, Emitter<R3>] |

Returns: Operator<T, [T, R1, R2, R3]>

▸ withLatestFrom<T, R1, R2, R3, R4>(...s: [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>]): Operator<T, [T, R1, R2, R3, R4]>

Defined in urx/src/pipe.ts:330

Type parameters:#

Name
T
R1
R2
R3
R4

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>] |

Returns: Operator<T, [T, R1, R2, R3, R4]>

▸ withLatestFrom<T, R1, R2, R3, R4, R5>(...s: [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>]): Operator<T, [T, R1, R2, R3, R4, R5]>

Defined in urx/src/pipe.ts:331

Type parameters:#

Name
T
R1
R2
R3
R4
R5

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>] |

Returns: Operator<T, [T, R1, R2, R3, R4, R5]>

▸ withLatestFrom<T, R1, R2, R3, R4, R5, R6>(...s: [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>, Emitter<R6>]): Operator<T, [T, R1, R2, R3, R4, R5, R6]>

Defined in urx/src/pipe.ts:332

Type parameters:#

Name
T
R1
R2
R3
R4
R5
R6

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>, Emitter<R6>] |

Returns: Operator<T, [T, R1, R2, R3, R4, R5, R6]>

▸ withLatestFrom<T, R1, R2, R3, R4, R5, R6, R7>(...s: [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>, Emitter<R6>, Emitter<R7>]): Operator<T, [T, R1, R2, R3, R4, R5, R6, R7]>

Defined in urx/src/pipe.ts:333

Type parameters:#

Name
T
R1
R2
R3
R4
R5
R6
R7

Parameters:#

| Name | Type |
| --- | --- |
| ...s | [Emitter<R1>, Emitter<R2>, Emitter<R3>, Emitter<R4>, Emitter<R5>, Emitter<R6>, Emitter<R7>] |

Returns: Operator<T, [T, R1, R2, R3, R4, R5, R6, R7]>