rx-queue-operator
TypeScript icon, indicating that this package has built-in type declarations

0.1.0 • Public • Published

Queue Operator for Observables

npm i rx-queue-operator

tests coverage version



An Observable typically sends data to all its observers simultaenously:

import { Subject } from 'rxjs'
import { take } from 'rxjs/operators'

const source = new Subject()

source.pipe(take(2)).subscribe(v => console.log('A: ' + v))
source.pipe(take(3)).subscribe(v => console.log('B: ' + v))

source.next(1)
source.next(2)
source.next(3)
source.next(4)

// A: 1, B: 1
// A: 2, B: 2
// B: 3

A queued observable will put its observers in a queue and only send data to the first one until it unsubscribes:

import { queue } from 'rx-queue-operator'

const queued = queue(source)

ququed.pipe(take(2)).subscribe(v => console.log('A: ' + v))
queued.pipe(take(3)).subscribe(v => console.log('B: ' + v))

source.next(1)
source.next(2)
source.next(3)
source.next(4)

// A: 1
// A: 2
// B: 3
// B: 4

Keyed Queue

If you want to partition incoming data based on some key and then form queues for each key (instead of one for the whole observable), then use KeyedQueue utility:

import { KeyedQueue } from 'rx-queue-operator'

const queue = new KeyedQueue<string>(x => x[0])

queue.for('a').pipe(take(2)).subscribe(v => console.log('1: ' + v))
queue.for('a').pipe(take(3)).subscribe(v => console.log('2: ' + v))
queue.for('b').pipe(take(2)).subscribe(v => console.log('3: ' + v))

queue.next('alice')
queue.next('bob')
queue.next('amy')
queue.next('ben')
queue.next('anna')

// 1: alice
// 3: bob
// 1: amy
// 3: ben
// 2: anna

KeyedQueue is an Observer, so you can subscribe it to any other observable. The .for() method will provide a queued observable for a particular key.

Package Sidebar

Install

npm i rx-queue-operator

Weekly Downloads

0

Version

0.1.0

License

MIT

Unpacked Size

47.4 kB

Total Files

24

Last publish

Collaborators

  • lorean.victor