nk-kafka-node
Motivations
Working with Kafka in Node can be difficult. There are plenty of actively maintained libraries to use. However, they fall short in a few places:
- In containerized environments where containers come up asynchronously, a few checks need to happen before consumers connect to Kafka and before producers start producing messages.
- When consumers are subscribed to a topic and a producer publishes to the same topic, the topic must be created ahead of time in case the consumer becomes ready before the producer.
- kafka-node simplifies a lot of complexity, but has known issues around the ZooKeeper client.
- Most clients leave you in callback chains as you proactively try to prevent faults.
To address these, this helper library has been created.
Usage
KafkaClient
Internally, nk-kafka-node uses KafkaClient from kafka-node. One client is created per producer or consumer, at the time that producer or consumer is created. You can configure clients with the method configureClient(clientConfig), which accepts the same options as KafkaClient.
The default configuration is:
const clientConfig = {
  kafkaHost: process.env.KAFKA_HOST || 'kafka:9092',
  connectRetryOptions: {
    retries: 20,
    factor: 3,
    minTimeout: 1000,
    maxTimeout: 30000
  }
};
If you only need to set kafkaHost, you can use the environment variable KAFKA_HOST.
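To get a feel for what the default connectRetryOptions mean in practice, the sketch below computes the wait before each retry. It assumes the options follow the exponential backoff of the retry module (delay = minTimeout * factor^attempt, capped at maxTimeout) with no randomization; retryDelay and retrySchedule are hypothetical helper names, not part of nk-kafka-node or kafka-node.

```javascript
// Same values as the default connectRetryOptions shown above.
const connectRetryOptions = { retries: 20, factor: 3, minTimeout: 1000, maxTimeout: 30000 };

// Delay in ms before retry number `attempt` (0-based).
// Assumption: exponential backoff capped at maxTimeout, no randomization.
function retryDelay(attempt, opts) {
  return Math.min(opts.minTimeout * Math.pow(opts.factor, attempt), opts.maxTimeout);
}

// One delay per configured retry.
function retrySchedule(opts) {
  return Array.from({ length: opts.retries }, (_, attempt) => retryDelay(attempt, opts));
}

const schedule = retrySchedule(connectRetryOptions);
const totalMs = schedule.reduce((sum, ms) => sum + ms, 0);

console.log(schedule.slice(0, 6)); // [ 1000, 3000, 9000, 27000, 30000, 30000 ]
console.log(totalMs);              // 520000
```

Under these assumptions the delays hit the 30 second cap by the fifth retry, and a client that never connects waits roughly 520 seconds in total between attempts, not counting the time each attempt itself takes.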
Creating Consumers
After setting your client config, you can create consumers with createConsumer(topic, [groupId], [optionsOverride]). groupId is optional, but recommended. If no groupId is provided, the consumer is assigned a UUID. optionsOverride is merged with the default options, which are the following:
const consumerOptions = {
  autoCommit: true,
  fromOffset: false,
  encoding: 'utf8'
};
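As a rough illustration of how an override and a missing groupId could be resolved, here is a minimal sketch. It assumes a shallow merge in which override keys win, and that the generated group id ends up in the options object; resolveConsumerOptions is a hypothetical name and the library's actual merge logic may differ.

```javascript
const { randomUUID } = require('crypto');

// Defaults as listed above.
const defaultConsumerOptions = {
  autoCommit: true,
  fromOffset: false,
  encoding: 'utf8'
};

// Hypothetical helper: shallow-merge overrides onto the defaults and
// fall back to a generated UUID when no groupId is given (assumption).
function resolveConsumerOptions(groupId, optionsOverride = {}) {
  return {
    ...defaultConsumerOptions,
    ...optionsOverride,
    groupId: groupId || randomUUID()
  };
}

const resolved = resolveConsumerOptions('app-my-topics-consumer', { fromOffset: true });
console.log(resolved.fromOffset); // true
console.log(resolved.autoCommit); // true
```

A shallow merge means that any key you pass replaces the default for that key, while keys you omit keep their default values.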
The consumer resolved by createConsumer extends kafka-node's consumer. It has the same events, methods, and options available to it. createConsumer also checks whether the topics being subscribed to exist, and tries to create them with kafka-node's createTopics.
Note: To take advantage of proactive topic creation, set AUTO_CREATE_TOPICS_ENABLE (or the equivalent environment variable or setting) to true in your Kafka instance. For "ches/kafka" that is KAFKA_AUTO_CREATE_TOPICS_ENABLE=true.
Example:
const Kafka = require('nk-kafka-node');

Kafka.createConsumer([{topic: 'my-topic1'}, {topic: 'my-topic12'}], 'app-my-topics-consumer')
  .then((consumer) => {
    consumer.on('message', (msg) => {
      // Handle message
    });
    consumer.on('error', (err) => {
      // Handle error from connected consumer
      consumer.close(); // Closing on error is a suggestion
    });
  })
  .catch((err) => {
    // Handle error connecting consumer
  });
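Inside the message handler, values may or may not be JSON. The sketch below shows one way to decode them defensively. It assumes each message carries its payload in a value field, as kafka-node messages do; parseMessageValue is a hypothetical helper, not part of this library.

```javascript
// Hypothetical helper: decode the payload of a consumed message.
// Assumption: the payload lives in `msg.value` as a string or Buffer.
function parseMessageValue(msg) {
  const text = Buffer.isBuffer(msg.value)
    ? msg.value.toString('utf8')
    : String(msg.value);
  try {
    // Valid JSON is returned parsed.
    return { isJson: true, data: JSON.parse(text) };
  } catch (err) {
    // Anything else is passed through as plain text.
    return { isJson: false, data: text };
  }
}

// Intended use inside the handler:
// consumer.on('message', (msg) => {
//   const { isJson, data } = parseMessageValue(msg);
// });
```

Returning a flag instead of throwing keeps a single malformed message from surfacing as an unhandled error in the handler.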
Creating and Using Producers
After setting your client config, you can create producers with createProducer(). The producer resolved by createProducer extends kafka-node's producer. It has the same events and methods available to it, plus an extra method called produce. produce ensures that a leader node has been elected before it attempts to publish a set of messages, and it also sets some options.
Example:
const Kafka = require('nk-kafka-node');

Kafka.createProducer()
  .then((producer) => {
    // Messages can be JSON strings, plain strings, or Buffers
    producer.produce('my-topic1', [
      JSON.stringify({
        body: {
          user: 'Jeff',
          rug: null
        }
      }),
      'plain string',
      Buffer.from('boat string')
    ]);
    producer.on('error', (err) => {
      // Handle connected producer error
    });
  })
  .catch((err) => {
    // Handle creating/connecting producer
  });
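The same flow can be written with async/await. This is a sketch only: publishJson is a hypothetical wrapper, the kafka argument stands for anything exposing createProducer() (for example require('nk-kafka-node')), and whether produce returns a promise is not stated here, so the await on it is an assumption that is harmless if it does not.

```javascript
// Hypothetical wrapper around createProducer() and produce().
// `kafka` is injected so the function can be exercised with a stub.
async function publishJson(kafka, topic, payloads) {
  const producer = await kafka.createProducer();

  // Register the error handler before publishing.
  producer.on('error', (err) => {
    console.error('producer error:', err);
  });

  // Serialize each payload to a JSON string.
  const messages = payloads.map((payload) => JSON.stringify(payload));

  // Assumption: awaiting works whether or not produce returns a promise.
  await producer.produce(topic, messages);
  return messages.length;
}

// Intended use:
// publishJson(require('nk-kafka-node'), 'my-topic1', [{ body: { user: 'Jeff', rug: null } }])
//   .catch((err) => { /* Handle creating/connecting producer */ });
```

Passing the library in as an argument keeps the wrapper easy to test without a running Kafka broker.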