kafka-pipe
A functional/fluent utility for Kafka, built on top of kafka-node.
Install
npm install @specialblend/kafka-pipe
Classes
- PipeConsumer: Callable kafka Consumer with pipe and error helper methods
- PipeProducer: Callable kafka Producer. When the instance is called directly, it acts like PipeProducer.send
- PipeSender: Callable kafka PipeProducer which allows presetting a destination topic and options
- PipeTransformer: Consumer/producer mixin that pipes messages from sourceTopic into a transformer function, and sends the result to destinationTopic, or to deadLetterTopic on error
- Client: Kafka Client
Constants
- createConsumer ⇒ PipeConsumer: Curried factory of PipeConsumer
- createProducer ⇒ PipeProducer: Curried factory of PipeProducer
- createSender ⇒ PipeSender: Curried factory of PipeSender
- createTransformer ⇒ PipeTransformer: Curried factory of PipeTransformer
PipeConsumer
Callable kafka Consumer with pipe and error helper methods
Kind: global class
pipeConsumer.pipe(handler) ⇒ PipeConsumer
Pipe incoming messages to the provided handler
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | message handler function |
pipeConsumer.error(handler) ⇒ PipeConsumer
Alias for this.on('error')
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | error handler function |
pipeConsumer.__call__(handler) ⇒ PipeConsumer
Makes the instance callable; alias of this.pipe
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | message handler function |
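To illustrate the callable, chainable shape described above, here is a minimal sketch. It is not the library's implementation: `makeCallableConsumer` is a hypothetical helper, and a plain Node.js `EventEmitter` stands in for a kafka consumer that fires `message` and `error` events.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not part of kafka-pipe): wraps any emitter that
// fires 'message' and 'error' events in a callable, chainable object.
function makeCallableConsumer(emitter) {
    // Calling the object directly is an alias for .pipe(handler).
    const callable = (handler) => callable.pipe(handler);
    callable.pipe = (handler) => {
        emitter.on('message', handler);
        return callable; // return self so calls can be chained
    };
    callable.error = (handler) => {
        emitter.on('error', handler);
        return callable;
    };
    return callable;
}

// Usage with a plain EventEmitter standing in for a kafka consumer.
const source = new EventEmitter();
const received = [];
const failures = [];

makeCallableConsumer(source)((message) => received.push(message.value))
    .error((err) => failures.push(err.message));

source.emit('message', { topic: 'example.topic', value: 'hello' });
source.emit('error', new Error('connection lost'));
```

Because each method returns the same object, message and error handlers can be registered in one chained expression.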
PipeProducer
Callable kafka Producer. When the instance is called directly, it acts like PipeProducer.send
Kind: global class
- PipeProducer
  - new PipeProducer(client, options)
  - .send(payload) ⇒ Promise.<*>
  - .__call__(payload) ⇒ Promise.<*>
new PipeProducer(client, options)
Create a PipeProducer
Param | Type | Description |
---|---|---|
client | Client | kafka client |
options | Object | options |
pipeProducer.send(payload) ⇒ Promise.<*>
Send a payload
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
pipeProducer.__call__(payload) ⇒ Promise.<*>
Makes the instance callable; alias of this.send
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
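Since send returns a Promise, one plausible way to get that behaviour from a callback-style producer is to promisify it. The sketch below assumes exactly that and nothing more about the library's internals: `fakeProducer` is a hypothetical in-memory stand-in, and the `{ topic, messages }` payload shape follows kafka-node's convention rather than anything stated in this document.

```javascript
const { promisify } = require('util');

// Hypothetical in-memory stand-in for a producer with a
// callback-style send(payloads, callback) method.
const fakeProducer = {
    sent: [],
    send(payloads, callback) {
        this.sent.push(...payloads);
        // Report success asynchronously, as a network client would.
        setImmediate(() => callback(null, { delivered: payloads.length }));
    },
};

// Wrap the callback API so that send returns a Promise.
const send = promisify(fakeProducer.send.bind(fakeProducer));

// Assumed payload shape: an array of { topic, messages } objects.
const pending = send([{ topic: 'example.topic', messages: ['hello'] }]);
```

The returned Promise resolves with whatever the callback receives as its result, or rejects with its error.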
PipeSender
Callable kafka PipeProducer which allows presetting a destination topic and options
Kind: global class
- PipeSender
  - new PipeSender(client, topic, payloadOptions, producerOptions)
  - .send(messages) ⇒ Promise.<*>
  - .__call__(payload) ⇒ Promise.<*>
new PipeSender(client, topic, payloadOptions, producerOptions)
Curry topic and payload options
Param | Type | Description |
---|---|---|
client | Client | kafka client |
topic | String | kafka topic name |
payloadOptions | Object | options to include with outgoing payloads |
producerOptions | Object | producer options |
pipeSender.send(messages) ⇒ Promise.<*>
Send messages to the preset topic, with the preset options
Kind: instance method of PipeSender
Returns: Promise.<*> - returned Promise
Param | Type | Description |
---|---|---|
messages | Array.<String> | an array of messages to send |
pipeSender.__call__(payload) ⇒ Promise.<*>
Makes the instance callable; alias of this.send
Kind: instance method of PipeSender
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
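The idea of presetting a topic and payload options can be sketched as a small closure. This is an illustration under stated assumptions, not the library's code: `presetSender` and `recordingSend` are hypothetical names, and merging the options into a `{ topic, messages }` payload is an assumed behaviour.

```javascript
// Hypothetical helper (not the library's code): fixes the topic and
// payload options once, and returns a function that only needs messages.
const presetSender = (send, topic, payloadOptions = {}) =>
    (messages) => send([{ ...payloadOptions, topic, messages }]);

// Stand-in send function that records what it was asked to deliver.
const outbox = [];
const recordingSend = async (payloads) => {
    outbox.push(...payloads);
    return payloads.length;
};

// Preset the topic and options once, then send only the messages.
const sendToAudit = presetSender(recordingSend, 'example.audit', { partition: 0 });
const auditResult = sendToAudit(['first', 'second']);
```

Callers then deal only with message arrays, while the topic and options stay in one place.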
PipeTransformer
Consumer/producer mixin that pipes messages from sourceTopic into a transformer function, and sends the result to destinationTopic, or to deadLetterTopic on error
new PipeTransformer(transformer, client, sourceTopic, destinationTopic, deadLetterTopic)
Create a PipeTransformer
Param | Type | Description |
---|---|---|
transformer | function | the transformer function |
client | Client | kafka Client |
sourceTopic | String | name of topic to read from |
destinationTopic | String | name of topic to send to |
deadLetterTopic | String | name of topic to send failed payloads |
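The routing described for PipeTransformer can be sketched without a broker. In this sketch `routeMessage` is a hypothetical function, plain arrays stand in for the destination and dead-letter topics, and forwarding the original message unchanged on failure is an assumption rather than documented behaviour.

```javascript
// Hypothetical routing function (not the library's code): run the
// transformer, then send the result onward or divert the input on failure.
async function routeMessage(message, transformer, sendToDestination, sendToDeadLetter) {
    let result;
    try {
        result = await transformer(message);
    } catch (err) {
        // Assumption: the original message is forwarded unchanged on failure.
        return sendToDeadLetter(message);
    }
    return sendToDestination(result);
}

// Arrays stand in for the destination and dead-letter topics.
const destination = [];
const deadLetter = [];
const parseJson = (message) => JSON.parse(message.value);
const toDestination = (result) => destination.push(result);
const toDeadLetter = (message) => deadLetter.push(message);

const routed = Promise.all([
    routeMessage({ value: '{"id": 1}' }, parseJson, toDestination, toDeadLetter),
    routeMessage({ value: 'not json' }, parseJson, toDestination, toDeadLetter),
]);
```

Only transformer failures are diverted here; a failure while sending to the destination would surface as a rejected Promise instead.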
Client
Kafka Client
new Client(kafkaHost, options)
Create a kafka Client
Param | Type | Description |
---|---|---|
kafkaHost | String | kafka host |
options | Object | options |
createConsumer ⇒ PipeConsumer
Curried factory of PipeConsumer
Kind: global constant
createProducer ⇒ PipeProducer
Curried factory of PipeProducer
Kind: global constant
createSender ⇒ PipeSender
Curried factory of PipeSender
Kind: global constant
createTransformer ⇒ PipeTransformer
Curried factory of PipeTransformer
Kind: global constant
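For readers unfamiliar with currying, the sketch below shows what a curried factory allows in general. It does not reproduce the argument order or behaviour of the factories above: `curry` and `createRoute` are hypothetical, and the host and topic values are placeholders.

```javascript
// Generic curry helper: collects arguments until fn's declared arity is met.
const curry = (fn) => {
    const collect = (...args) =>
        (args.length >= fn.length
            ? fn(...args)
            : (...more) => collect(...args, ...more));
    return collect;
};

// Hypothetical factory used only for illustration; its argument order
// is not taken from kafka-pipe.
const createRoute = curry((host, topic, handler) => ({ host, topic, handler }));

// Supply the arguments in stages...
const onLocalhost = createRoute('localhost:9092');
const ordersRoute = onLocalhost('example.orders', () => {});

// ...or all at once.
const sameShape = createRoute('localhost:9092', 'example.orders', () => {});
```

Partial application lets shared configuration be fixed once and reused across several instances.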