This library is based on node-rdkafka and aims to provide a simple producer
and a simple consumer, both wrapped in promises to allow the use of async / await
with minimal configuration and overhead.
The library is fully written in TypeScript.
Use the s3pweb/alpine-kafka Docker image, which ships with node-rdkafka pre-installed, to shorten build time (the image's version tag matches the bundled node-rdkafka version).
- Producer: by default, node-rdkafka sets request.required.acks to -1. You can override it by setting "request.required.acks" or "acks" in the config object (see the sketch after this list).
- Two different producers are available.
- Producer and consumer are now classes with constructors.
- node-config and a logger are no longer required.
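For example, a producer configuration that overrides the default acks could look like this (a minimal sketch; the broker address is a placeholder):

// Sketch: producer config overriding the default request.required.acks (-1)
{
  "metadata.broker.list": "0.0.0.0:9094",
  "request.required.acks": 1
}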
npm install @s3pweb/simple-kafka-promise
Minimal configuration for the consumer is:
{ "metadata.broker.list": "0.0.0.0:9094", "group.id": "test.group" }
Minimal configuration for the producer is:
{ "metadata.broker.list": "0.0.0.0:9094" }
This project is based on node-rdkafka and supports the same configuration options. See the node-rdkafka documentation for more details.
const KafkaConsumer = require('@s3pweb/simple-kafka-promise').KafkaConsumer
// Create a new instance
const consumer = new KafkaConsumer({ 'metadata.broker.list': '0.0.0.0:9094', 'group.id': 'test.group' }, 1000)
// Connect
await consumer.connect(['topicName'])
// Consume messages
const messagesArray = await consumer.listen(100, true)
// Disconnect the consumer
await consumer.disconnect()
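For continuous consumption, listen() can be called in a loop. A minimal sketch, assuming listen() resolves with an array of node-rdkafka message objects (with value, key, topic, partition and offset fields):

// Sketch: poll messages until the process is asked to stop
let running = true
process.on('SIGTERM', () => { running = false })

while (running) {
  const messages = await consumer.listen(100, true)
  for (const message of messages) {
    // With node-rdkafka, message.value is a Buffer
    console.log(`${message.topic} [${message.partition}] @${message.offset}: ${message.value.toString()}`)
  }
}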
To use it with TypeScript, just change the import to:
import { KafkaConsumer } from '@s3pweb/simple-kafka-promise';
To connect with SSL, use:
const consumer = new KafkaConsumer({
  'metadata.broker.list': [
    'broker1:9093',
    'broker2:9093',
    'broker3:9093'
  ],
  'group.id': 'test-consumer.group',
  'security.protocol': 'ssl',
  'enable.ssl.certificate.verification': true,
  'ssl.ca.location': '/path/to/the/CA/certificate.crt'
})
To connect with SASL, use:
const consumer = new KafkaConsumer({
  'metadata.broker.list': [
    'broker1:9094',
    'broker2:9094',
    'broker3:9094'
  ],
  'group.id': 'test-sasl.group',
  'security.protocol': 'sasl_ssl',
  'sasl.username': 'username',
  'sasl.password': 'password',
  'sasl.mechanisms': 'SCRAM-SHA-256',
  'enable.ssl.certificate.verification': true,
  'ssl.ca.location': '/path/to/the/CA/certificate.crt'
})
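The producer accepts the same security options, since the configuration object is passed through to node-rdkafka. A sketch assuming the SSL setup shown above (the broker list is given here as a comma-separated string, as in the minimal producer configuration):

// Sketch: producer using the same SSL options as the consumer above
const producer = new KafkaProducer({
  'metadata.broker.list': 'broker1:9093,broker2:9093,broker3:9093',
  'security.protocol': 'ssl',
  'enable.ssl.certificate.verification': true,
  'ssl.ca.location': '/path/to/the/CA/certificate.crt'
}, '')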
const KafkaProducer = require('@s3pweb/simple-kafka-promise').KafkaProducer
// Create a new instance
const producer = new KafkaProducer({ 'metadata.broker.list': '0.0.0.0:9094' }, '')
// Connect
await producer.connect(['topicName'])
// Produce some messages
const offset = await producer.sendMessage('topicName', { message: 'My message.' }, 0, null)
// Disconnect
await producer.disconnect()
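To produce several messages, sendMessage() can simply be awaited in a loop. A minimal sketch, assuming the third and fourth arguments are the partition and the message key, as in the example above:

// Sketch: send ten messages to partition 0, keyed by their id
for (let i = 0; i < 10; i += 1) {
  const offset = await producer.sendMessage('topicName', { id: i, message: `Message ${i}` }, 0, `key-${i}`)
  console.log(`Message ${i} written at offset ${offset}`)
}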
To use it with TypeScript, just change the import to:
import { KafkaProducer } from '@s3pweb/simple-kafka-promise';
To produce some messages, take a look at ./examples/producer.js, and to consume some messages, take a look at ./examples/consumer.js.
If you have Docker, you can use ./examples/docker-compose.yaml to start a stack with one Zookeeper and one Kafka broker on your machine. This stack comes with Kafka Manager and Kafdrop for easy monitoring and debugging.
If you want to build a Docker image based on Alpine Linux, you need to add some packages to the base image. See the node-rdkafka documentation for more details.
To create a new release, you only need to add your code (don't forget the mocks and the interfaces) and create a new tag. DO NOT push the code to npm, create a release on GitHub, or send a message about the new release yourself; all of this is done automatically by GitHub Actions.