npm i rx-queue-operator
An Observable typically sends data to all its observers simultaenously:
import { Subject } from 'rxjs'
import { take } from 'rxjs/operators'
const source = new Subject()
source.pipe(take(2)).subscribe(v => console.log('A: ' + v))
source.pipe(take(3)).subscribe(v => console.log('B: ' + v))
source.next(1)
source.next(2)
source.next(3)
source.next(4)
// A: 1, B: 1
// A: 2, B: 2
// B: 3
A queued observable will put its observers in a queue and only send data to the first one until it unsubscribes:
import { queue } from 'rx-queue-operator'
const queued = queue(source)
ququed.pipe(take(2)).subscribe(v => console.log('A: ' + v))
queued.pipe(take(3)).subscribe(v => console.log('B: ' + v))
source.next(1)
source.next(2)
source.next(3)
source.next(4)
// A: 1
// A: 2
// B: 3
// B: 4
If you want to partition incoming data based on some key and then form queues for each key (instead of one for the whole
observable), then use KeyedQueue
utility:
import { KeyedQueue } from 'rx-queue-operator'
const queue = new KeyedQueue<string>(x => x[0])
queue.for('a').pipe(take(2)).subscribe(v => console.log('1: ' + v))
queue.for('a').pipe(take(3)).subscribe(v => console.log('2: ' + v))
queue.for('b').pipe(take(2)).subscribe(v => console.log('3: ' + v))
queue.next('alice')
queue.next('bob')
queue.next('amy')
queue.next('ben')
queue.next('anna')
// 1: alice
// 3: bob
// 1: amy
// 3: ben
// 2: anna
KeyedQueue
is an Observer
, so you can subscribe it to any other observable. The .for()
method will provide a queued observable for a particular
key.