@streamerson/consumer

0.0.40 • Public • Published

Typescript Consumer/Producer for Redis Streams

This package is part of the larger @streamerson monorepo, which contains tools for interacting with Redis streams in event-oriented architectures. This member of the monorepo provides a high-level interface for consuming and producing events (stream messages). The core idea is to bind handlers to events, where a handler optionally returns a message to be sent to another stream. This pattern deliberately resembles the Request->Handler pattern found in many web servers, to capture some of that functional style while working with streams.

A Foreword on Naming and Purpose

There are several reasons this package is called @streamerson/consumer rather than @streamerson/consumer-producer: brevity, clarity (does it produce consumers?), and the fact that, in the @streamerson architecture, consuming a stream very often means producing a message on another one. That is not always the case, though; this package supports multiple use-cases:

  • Consume a stream message and do nothing (if there is no handler)
  • Consume a stream message and do something (in a handler)
  • Consume a stream message, do something, and produce a message on another stream
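
The three use-cases above can be sketched as a single dispatch step. This is a minimal in-memory illustration, not the package's implementation: the names StreamMessage, MessageHandler, Outcome and consume are hypothetical, and reusing the incoming messageType on the outgoing message is an assumption made only to keep the sketch short.

```typescript
// Hypothetical shapes for illustration; these are not the package's real types.
type StreamMessage = { messageType: string; payload: Record<string, unknown> };
type MessageHandler = (msg: StreamMessage) => Record<string, unknown> | undefined;

type Outcome =
    | { kind: 'ignored' }                              // no handler registered
    | { kind: 'handled' }                              // handler ran, returned nothing
    | { kind: 'produced'; outgoing: StreamMessage };   // handler returned a payload

function consume(handlers: Map<string, MessageHandler>, msg: StreamMessage): Outcome {
    const handler = handlers.get(msg.messageType);
    if (handler === undefined) return { kind: 'ignored' };
    const result = handler(msg);
    if (result === undefined) return { kind: 'handled' };
    // Assumption: the outgoing message keeps the incoming messageType.
    return { kind: 'produced', outgoing: { messageType: msg.messageType, payload: result } };
}

const seenTypes: string[] = [];
const handlerMap = new Map<string, MessageHandler>();
handlerMap.set('audit', (msg) => { seenTypes.push(msg.messageType); return undefined; });
handlerMap.set('hello', (msg) => ({ howdy: `there, ${String(msg.payload['name'])}` }));

const ignored = consume(handlerMap, { messageType: 'unknown', payload: {} });
const handled = consume(handlerMap, { messageType: 'audit', payload: {} });
const produced = consume(handlerMap, { messageType: 'hello', payload: { name: 'Ada' } });
```

Modelling the result as a tagged union keeps the "do nothing", "do something" and "produce a message" cases explicit for the caller.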

Installation

  • yarn add @streamerson/consumer

Example

The following example creates a consumer that reads from a Redis stream called my-stream-topic and listens for messages with the messageType "hello". It binds an event handler to "hello"; the handler's return value is sent along the bidirectional channel to whoever may be listening:

consumer-with-framework.example.ts

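import { Topic } from '@streamerson/core';
import { StreamConsumer } from '@streamerson/consumer';

// Consume from 'my-stream-topic'; bidirectional mode sends handler results back out.
const consumer = new StreamConsumer({
    topic: new Topic('my-stream-topic'),
    bidirectional: true
});

// Bind a handler to the 'hello' messageType; the returned object is the reply payload.
consumer.registerStreamEvent<{ name: string }>('hello', async (e) => {
    return {
        howdy: `there, ${e.payload.name}`
    };
});

// Connect and start listening for messages.
await consumer.connectAndListen();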

Consumer-Producers

The @streamerson/consumer module is intended as an abstraction layer over the low-level components in the @streamerson/core modules. The intention is to provide a feature-rich, usable layer for application-layer consumers of events. I am going to keep mentioning that the feature-richness of the @streamerson/consumer is the reason for its existence (over just using the core modules), so let's take a look at some of those features:

Features

  • Hides the underlying Streams and EventEmitters behind functional interfaces
    • Allows access to these underlying constructs as an escape hatch
  • Creates an "Event Type" -> "Event Handler" contract familiar to developers
  • Supports dynamic switching of these event handlers
  • Supports configurable bidirectional / unidirectional modes
  • Supports message routing (a->b->c->...->x) modes for stream pipelines
  • Supports consuming from multiple streams at once (fan-in multiple Topics)
  • Remembers its streams so you don't have to reference them in operations
  • Supports configuration for providing your own Redis client, logger, etc.
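
To make the "Event Type" -> "Event Handler" contract and the dynamic switching of handlers more concrete, here is a small self-contained sketch. It is only an in-memory model: HandlerRegistry, EventHandler and Payload are hypothetical names, and the real StreamConsumer (registerStreamEvent / deregisterStreamEvent) may behave differently in its details.

```typescript
// Hypothetical in-memory model; not the package's StreamConsumer.
type Payload = Record<string, unknown>;
type EventHandler = (payload: Payload) => Payload | undefined;

class HandlerRegistry {
    private readonly handlers = new Map<string, EventHandler>();

    // Bind (or re-bind) a handler to a message type.
    register(messageType: string, handler: EventHandler): void {
        this.handlers.set(messageType, handler);
    }

    // Remove the binding; later messages of this type are not handled.
    deregister(messageType: string): void {
        this.handlers.delete(messageType);
    }

    // Look up the handler for a message type and run it if one is bound.
    dispatch(messageType: string, payload: Payload): Payload | undefined {
        const handler = this.handlers.get(messageType);
        return handler ? handler(payload) : undefined;
    }
}

const registry = new HandlerRegistry();

registry.register('greet', () => ({ greeting: 'hello' }));
const v1 = registry.dispatch('greet', {});   // handled by the first handler

registry.register('greet', () => ({ greeting: 'howdy' }));
const v2 = registry.dispatch('greet', {});   // handled by the replacement

registry.deregister('greet');
const v3 = registry.dispatch('greet', {});   // no handler bound any more
```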

To understand the motivation for these features, first, let's look at a side-by-side of a low-level featureless consumer written for your benefit (dear Reader), and beside it, the @streamerson/consumer for comparison.

Examples in Depth

Drop Down to see Low-Level "Bidi Stream Processor" Example Code

consumer-without-framework.example.ts

import {MappedStreamEvent, StreamingDataSource, Topic} from '@streamerson/core';
import {Transform} from 'stream';

const streamTopic = new Topic('my-stream-topic');

const channels = {
    read: new StreamingDataSource(),
    write: new StreamingDataSource()
}

await Promise.all([
    channels.read.connect(),
    channels.write.connect()
]);

const [readableStream, writableStream] = [
    channels.read.getReadStream({
        stream: streamTopic.consumerKey()
    }),
    channels.write.getWriteStream({
        stream: streamTopic.producerKey()
    }),
];

const transform = new Transform({
    objectMode: true,
    transform: function (e: MappedStreamEvent, _, cb) {
        switch(e.messageType as string) {
            case 'hello':
                this.push(({
                    ...e,
                    payload: {
                        hello: 'world!  I just saw a message: \r\n\r\n' + JSON.stringify(e.payload, null, 2)
                    }
                } as MappedStreamEvent));
                cb();
                break;
            default:
                this.push(({
                    ...e,
                    payload: {
                        error: 'Unknown message type',
                        statusCode: 400
                    }
                } as MappedStreamEvent));
                cb();
                break;
        }
    }
});

readableStream.pipe(transform).pipe(writableStream);

You'll notice the code in the dropdown above is kind of grossly low-level (it is concerned with streams, Transforms, etc.) and requires assembly. Luckily, the @streamerson/consumer comes with lots of features out of the box: it conceals the configuration burden behind reusable interfaces and allows a declarative approach to the more imperative components in the monorepo. Let's take a look:

Drop Down to see High-Level "@streamerson/consumer" Example Code

Hopefully the high-level version (compare consumer-with-framework.example.ts in the Example section above) seems cleaner, less concerned with low-level details, and easier to understand from the perspective of someone doing service development. The handler for each event resembles, in principle, the handler for a web request, and is routed by its MessageType in much the same way that a web request is routed by its path. The metadata of the stream message is visible to the handler (much like the Request objects many developers know fondly), as is the payload of that message (again, much like the Body of a Request). This familiarity is intentional, and it is why the @streamerson/consumer is the "blessed path" over using the lower-level modules directly (as in the low-level example code above).

API

_API.md

🏭 StreamConsumer

Methods

⚙️ bindStreamEvents

Method Type
bindStreamEvents (topic: Topic) => void

⚙️ setOutgoingChannel

Method Type
setOutgoingChannel (channel: StreamingDataSource) => void

⚙️ registerStreamEvent

Bind a MessageType to a handler function

Method Type
registerStreamEvent <T extends PayloadVariety = Record<string, NonNullablePrimitive>, R extends void | PayloadVariety = Record<string, NonNullablePrimitive>>(typeKey: keyof EventMap, handle: HandlerLogicFunction<...>) => void

Parameters:

  • typeKey: the MessageType to bind
  • handle: the handler function to bind to the MessageType

⚙️ deregisterStreamEvent

Method Type
deregisterStreamEvent (typeKey: keyof EventMap) => void

⚙️ addStream

Method Type
addStream (key: string) => void

⚙️ hasStream

Method Type
hasStream (key: string) => boolean

⚙️ removeStream

Method Type
removeStream (key: string) => void

⚙️ cacheComposite

Method Type
cacheComposite (cacheKey: string) => { key: string; shard: string; }

Message Acknowledgement / Tracking

The @streamerson/consumer does not support server-side acknowledgement of messages. That is, the stream in Redis does not know who has read what, or who last read which message, and two consumer modules reading from the same key would get the same messages. (If this is disappointing, please read on for some appointment.) This makes the module appropriate for fan-in streams, event-broadcasting use-cases, etc.

So if you want a once-only processing architecture (in which one or many readers each operate on different stream messages), this particular package is not it... the @streamerson/consumer-group package ⭐ is ⭐ though, so you're in luck!

If you found yourself in this section because you are wondering about acknowledgement, then you might be looking for a consumer group of one member (or more), which "automagically" checks each message back in using the Redis XACK command, meaning that the consumer group is aware of which messages have been processed. @streamerson/consumer instances are stream-position aware using a cursor, but they do not acknowledge a message as "complete" in Redis.

(Footnote: it is possible to have multiple consumer groups, each with independent message-processed awareness over the same set of messages, resulting in exactly-twice [or N-times] processing per message. I can think of only a few use-cases for this, but it's a cool idea.)

TLDR:

  • This consumer deliberately does not mark a message "processed" -- after all, there could be other readers in a fanout or broadcast scenario. This also makes the whole process faster.
  • If you want a stateless, once-only delivery of messages to a single consumer that marks its messages as processed in Redis when complete, look at the @streamerson/consumer-group package.
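
The difference between cursor-based reading and consumer-group reading can be illustrated with a small in-memory model. This is a sketch under stated assumptions, with no Redis calls: CursorReader and ConsumerGroupModel are hypothetical names, readNext loosely mirrors what XREADGROUP does when asked for never-delivered entries, and ack loosely mirrors XACK removing an entry from the group's pending list.

```typescript
// In-memory stand-ins for illustration only; no Redis calls are made here.
type StreamEntry = { id: number; data: string };

// Cursor-style reader: each reader tracks its own position,
// so every reader sees every entry (broadcast / fan-out behaviour).
class CursorReader {
    private cursor = 0;
    private readonly entries: readonly StreamEntry[];

    constructor(entries: readonly StreamEntry[]) {
        this.entries = entries;
    }

    read(): StreamEntry[] {
        const fresh = this.entries.filter((entry) => entry.id > this.cursor);
        for (const entry of fresh) this.cursor = entry.id;
        return fresh;
    }
}

// Group-style reader: the group tracks delivery and acknowledgement,
// so each entry is handed to only one member.
class ConsumerGroupModel {
    private lastDelivered = 0;
    private readonly pending = new Map<number, string>(); // entry id -> consumer name
    private readonly entries: readonly StreamEntry[];

    constructor(entries: readonly StreamEntry[]) {
        this.entries = entries;
    }

    // Hand out the next entry that no group member has received yet.
    readNext(consumer: string): StreamEntry | undefined {
        const next = this.entries.find((entry) => entry.id > this.lastDelivered);
        if (next === undefined) return undefined;
        this.lastDelivered = next.id;
        this.pending.set(next.id, consumer);
        return next;
    }

    // Mark an entry as processed by removing it from the pending list.
    ack(id: number): boolean {
        return this.pending.delete(id);
    }

    pendingCount(): number {
        return this.pending.size;
    }
}

const entries: StreamEntry[] = [
    { id: 1, data: 'a' },
    { id: 2, data: 'b' },
    { id: 3, data: 'c' },
];

// Two independent cursor readers both receive all three entries.
const broadcastA = new CursorReader(entries).read();
const broadcastB = new CursorReader(entries).read();

// Two group members receive different entries; only the first is acknowledged.
const group = new ConsumerGroupModel(entries);
const first = group.readNext('worker-a');
const second = group.readNext('worker-b');
if (first) group.ack(first.id);
const stillPending = group.pendingCount();
```

In this model the cursor readers never touch shared state, which is why they suit broadcast scenarios, while the group has to record every delivery until it is acknowledged.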

Stream Recovery / Cursor Iteration

!warning! Some of the following may not be fully implemented but will be in a 1.0 version: !warning!

The default behavior of the Consumer is to come alive and listen only for new messages. However, a cursor parameter allows the consumer to begin reading from a historical point in the stream. The cursor is advanced automatically as the client iterates across stream entries from Redis, but it can also be supplied manually.

If you want to implement recovery at the process level with the @streamerson/consumer module, such that a reader can die, come back alive, and not miss any messages, some configuration is required. This is by design: these modules cannot know the identity of a reader on a stream, so storing its last position on the stream (statelessly) requires some bespoke value. Luckily, this is supported in configuration through the recoveryKey constructor parameter.

This key specifies at which key in Redis to store the iterators for a given client-- meaning that in a multi-reader scenario, to have per-reader recovery, you would want to give each of these readers a unique recoveryKey driven by environment or build configuration.
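
The idea behind a per-reader recoveryKey can be sketched without Redis. In the illustration below, a Map stands in for Redis key storage, and RecoverableReader, positionStore and poll are hypothetical names; how the package actually persists its iterators is not shown here and may differ.

```typescript
// In-memory stand-ins: `positionStore` plays the role of Redis key storage.
type PositionedEntry = { id: number; data: string };

class RecoverableReader {
    private cursor: number;
    private readonly positionStore: Map<string, number>;
    private readonly recoveryKey: string;

    constructor(
        positionStore: Map<string, number>,
        recoveryKey: string,
        log: readonly PositionedEntry[],
    ) {
        this.positionStore = positionStore;
        this.recoveryKey = recoveryKey;
        // Resume from the saved position if one exists; otherwise start at the
        // end of the log so that only new entries are seen (the default behavior).
        const newest = log.reduce((max, entry) => Math.max(max, entry.id), 0);
        this.cursor = positionStore.get(recoveryKey) ?? newest;
    }

    // Return unseen entries and persist the new position under the recovery key.
    poll(log: readonly PositionedEntry[]): string[] {
        const out: string[] = [];
        for (const entry of log) {
            if (entry.id <= this.cursor) continue;
            out.push(entry.data);
            this.cursor = entry.id;
            this.positionStore.set(this.recoveryKey, entry.id);
        }
        return out;
    }
}

const positionStore = new Map<string, number>();
const log: PositionedEntry[] = [{ id: 1, data: 'a' }];

const reader = new RecoverableReader(positionStore, 'reader-1', log);
log.push({ id: 2, data: 'b' });
const beforeCrash = reader.poll(log);        // ['b']

// The process "dies" here; entries keep arriving while it is down.
log.push({ id: 3, data: 'c' }, { id: 4, data: 'd' });

// A restarted reader with the same recoveryKey resumes where it left off.
const restarted = new RecoverableReader(positionStore, 'reader-1', log);
const afterRestart = restarted.poll(log);    // ['c', 'd']

// A reader with a different recoveryKey has no saved position and sees only new entries.
const newcomer = new RecoverableReader(positionStore, 'reader-2', log).poll(log); // []
```

Because the saved position is keyed by recoveryKey, two readers sharing a key would overwrite each other's position, which is why each reader needs its own.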

If this seems like a pain, it's potentially because you are crossing the threshold from a consumer to a consumer group when you begin caring about tracking the state of individual readers on a given stream. Much of that process is handled out of the box by Redis when we use the consumer group API, which is implemented in the @streamerson/consumer-group module. A richer explanation of this difference can be found above.

Collaborators

  • oliverio