Adds Kafka support to microfleet. Provides a Stream-like API for sending messages to and reading them from a Kafka broker.
For more information, please read the node-rdkafka documentation.
yarn add @microfleet/plugin-kafka
To make use of the plugin, adjust microfleet configuration in the following way:
exports.plugins = [
...,
'kafka',
...
]
exports.kafka = {
// librdkafka configuration
// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
debug: 'consumer,cgrp,topic,fetch',
'metadata.broker.list': 'kafka:9092',
'group.id': 'test-group',
}
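With this configuration in place, the service is bootstrapped as usual. Below is a minimal sketch assuming the standard @microfleet/core entry point; the service name and the extra logger/validator plugins are illustrative, and the same options could equally come from the config files shown above:

const { Microfleet } = require('@microfleet/core')

const service = new Microfleet({
  name: 'kafka-demo', // illustrative service name
  plugins: ['logger', 'validator', 'kafka'],
  kafka: {
    'metadata.broker.list': 'kafka:9092',
    'group.id': 'test-group',
  },
})

// connect the enabled plugins (including kafka) before using service.kafka
await service.connect()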
The Microfleet Kafka plugin extends the service interface with the following methods:
createConsumerStream: initializes a Kafka consumer stream using the provided params and creates a Readable stream. This is a reimplementation of the node-rdkafka ConsumerStream with some add-ons.
Extra parameters:
const streamOpts = {
  checkTopicExists: boolean, // Check whether the consumed topics exist before starting.
  stopOnPartitionsEOF: boolean, // Stop the stream once all assigned partitions have been read.
  offsetQueryTimeout: number, // Timeout in milliseconds for broker offset requests.
  offsetCommitTimeout: number, // Milliseconds to wait for offset commits to be received on stream close.
}
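For example, a consumer stream that drains a topic once and then ends could combine these options with the regular ConsumerStream settings. A sketch, with the topic name, group id and timeouts purely illustrative:

const oneShotConsumer = await service.kafka.createConsumerStream({
  streamOptions: {
    topics: ['my-topic'], // hypothetical topic name
    checkTopicExists: true, // fail early if the topic is missing
    stopOnPartitionsEOF: true, // end the stream once every assigned partition has been read
    offsetQueryTimeout: 5000, // ms to wait for broker offset queries
    offsetCommitTimeout: 5000, // ms to wait for offset commits when the stream closes
  },
  conf: {
    'group.id': 'one-shot-reader',
  },
})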
createProducerStream: initializes a Kafka producer using the provided params and creates a Writable stream. Detailed docs here: https://blizzard.github.io/node-rdkafka/current/ProducerStream.html
For information about the parameters passed to these methods:
- streamOpts: stream options; see the node-rdkafka documentation for ConsumerStream and ProducerStream (https://blizzard.github.io/node-rdkafka/current/).
- conf: librdkafka configuration; see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
- topic: librdkafka topic configuration properties, described on the same CONFIGURATION.md page.
const producerStream = await service.kafka.createProducerStream({
  streamOptions: { objectMode: true, pollInterval: 10 },
  conf: { 'group.id': 'other-group' },
})
const consumerStream = await service.kafka.createConsumerStream({
  // `topic` here is a topic name (string) or list of topic names defined elsewhere
  streamOptions: { topics: topic, streamAsBatch: true, fetchSize: 10 },
  conf: {
    debug: 'consumer',
    'enable.auto.commit': false,
    'client.id': 'someid',
    'group.id': 'other-group',
  },
  topic: {
    // 'earliest' | 'latest': used when there is no committed offset (or it is out of range);
    // 'earliest' starts from the beginning of the partition, 'latest' from the newest messages.
    'auto.offset.reset': 'earliest',
  },
})
// and then
producerStream.write({
topic,
value: Buffer.from(`message at ${Date.now()}`),
}, cb)
// or
producerStream.write(Buffer.from(`message at ${Date.now()}`), cb)
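// Asynchronous producer failures surface as regular stream 'error' events, so it is worth
// attaching a handler; the logging call below assumes the microfleet logger plugin is enabled.
producerStream.on('error', (err) => {
  service.log.error({ err }, 'kafka producer error')
})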
for await (const message of consumerStream) {
  // process message
}
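Note that with streamAsBatch: true each chunk emitted by the stream is an array of messages, and because 'enable.auto.commit' is disabled in the example conf, offsets have to be committed explicitly. A sketch under those assumptions (handleMessage is a hypothetical handler, and the underlying node-rdkafka consumer is assumed to be exposed as consumerStream.consumer, as it is on node-rdkafka's ConsumerStream):

for await (const batch of consumerStream) {
  for (const message of batch) {
    // message.value is a Buffer; topic, partition and offset are on the message as well
    handleMessage(message.value.toString())
  }
  // commit the last message of the batch through the underlying consumer
  consumerStream.consumer.commitMessage(batch[batch.length - 1])
}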