iris
Kafka and Avro mashup
A Kafka library that validates produced messages and decodes consumed messages using schemas defined in Confluent's Schema Registry.
Under the hood it uses the excellent node-rdkafka library, wrapping a producer's produce method and a consumer's consume method with encodings that work with a Schema Registry backed Kafka deployment. It uses avsc to handle Avro encoding/decoding of JSON messages.
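To make "encodings that work with a schema registry backed Kafka deployment" concrete, here is a minimal sketch of the standard Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. That iris frames messages exactly this way is an assumption, and `frame` and `unframe` are hypothetical helper names, not part of the iris API.

```javascript
// Sketch of the Confluent wire format (assumed, not taken from iris' source):
// [magic byte 0][4-byte big-endian schema ID][Avro-encoded payload]
const MAGIC_BYTE = 0;

// Prepend the 5-byte header to an already Avro-encoded payload.
function frame(schemaId, avroPayload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(MAGIC_BYTE, 0);
  header.writeUInt32BE(schemaId, 1);
  return Buffer.concat([header, avroPayload]);
}

// Split a framed message back into its schema ID and raw Avro payload.
function unframe(buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Not a Confluent-framed message');
  }
  return {
    schemaId: buffer.readUInt32BE(1),
    payload: buffer.subarray(5)
  };
}

// The payload bytes here are arbitrary placeholders, not real Avro output.
const framed = frame(42, Buffer.from([0x0c, 0x73]));
console.log(unframe(framed).schemaId);
```

A consumer would use the recovered schema ID to look up the writer schema in the registry before decoding the payload with avsc.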
Install
npm install @parkhub/iris
Usage
Give it the URL for your schema registry, initialize it, and create clients!
Make sure to brush up on your Kafka knowledge before tweaking producer/consumer options!
import iris from '@parkhub/iris';

const registryUrl = 'http://registry:8081';
const brokerList = 'kafka-1:9092,kafka-2:9092';
const schemaCfgs = [
  {
    topic: 'TestTopic'
  }, {
    topic: 'OtherTopic',
    version: '1.2'
  }
];

async function startApp() {
  // Load the schemas for the configured topics from the registry.
  const kafka = await iris({ registryUrl, brokerList, schemaCfgs })
    .initialize();

  const consumer = await kafka.createConsumer({
    groupId: 'consumer-group',
    topicCfgs: {
      'consume.callback.max.messages': 100
    }
  }).connect();

  // The handler receives the decoded message plus its metadata.
  consumer.subscribe(['TestTopic'], (data) => {
    const { message, topic, schemaId } = data;
    console.log('Message received', JSON.stringify(message, null, 4));
    console.log('Message from topic', topic);
    console.log('SchemaId used to parse message', schemaId);
  });

  const producer = await kafka.createProducer({
    'client.id': 'kafka',
    'dr_cb': true
  }).connect();

  const message = {
    name: 'satsuki',
    age: 19
  };

  // The message is validated against the topic's schema before it is sent.
  producer.produce('TestTopic', null, message);

  return kafka;
}

startApp()
  .then(async (kafka) => {
    await kafka.disconnectAllClients();
  })
  .catch(err => console.error(err));
API
The API for iris clients closely follows that of node-rdkafka, except that a few methods have been promisified!
For both producers and consumers, these methods are:
- getMetadata
- connect
- disconnect
They take the same arguments as described in the node-rdkafka API docs, minus the callback if you're using the promise API.
HOWEVER, because node-rdkafka uses these methods for some internal magic, you also have the option of using the callback API. Just pass in the callback along with each method's arguments.
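One common way to offer both styles is to check whether the last argument is a function. The sketch below illustrates that pattern only; `promisifyOptional` and `fakeClient` are hypothetical names, and this is not necessarily how iris implements it.

```javascript
// Hypothetical helper: wraps a Node-style callback method so it can be
// called with a trailing callback or, without one, as a promise.
function promisifyOptional(method, context) {
  return function wrapped(...args) {
    const last = args[args.length - 1];

    // Callback style: pass everything straight through.
    if (typeof last === 'function') {
      return method.apply(context, args);
    }

    // Promise style: supply our own callback and settle the promise from it.
    return new Promise((resolve, reject) => {
      method.call(context, ...args, (err, result) => {
        if (err) {
          reject(err);
        } else {
          resolve(result);
        }
      });
    });
  };
}

// Stand-in for a client with a callback-style connect(); not node-rdkafka.
const fakeClient = {
  connect(options, cb) {
    cb(null, { connected: true, options });
  }
};

const connect = promisifyOptional(fakeClient.connect, fakeClient);

connect({}, (err, info) => console.log('callback style:', info.connected));
connect({}).then(info => console.log('promise style:', info.connected));
```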
All producer/consumer clients listen to the same events, take the same configurations (with a few exceptions listed below), and behave the same way. Make sure to take a look at the node-rdkafka configurations for more detail!
Consumer API
createConsumer({ groupId, Kafka Consumer Configurations, topicCfgs(valid Kafka Consumer Topic Configurations) })
Differences
This method closely follows the Kafka Consumer Configurations, except that groupId is used here instead of 'group.id'. Any other valid configuration can be passed to the consumer using the same semantics as the node-rdkafka configurations. The same applies to the topicCfgs object and topic configurations.
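As an illustration of that mapping, the sketch below turns a createConsumer-style options object into the two arguments that node-rdkafka's `KafkaConsumer` constructor takes (global config, then topic config). `toRdkafkaConsumerArgs` is a hypothetical helper written for this example; iris' internal handling may differ.

```javascript
// Hypothetical helper: rename groupId to 'group.id', split out topicCfgs,
// and pass every other key through untouched.
function toRdkafkaConsumerArgs({ groupId, topicCfgs = {}, ...rest }) {
  const globalConfig = { 'group.id': groupId, ...rest };
  return [globalConfig, topicCfgs];
}

const [globalConfig, topicConfig] = toRdkafkaConsumerArgs({
  groupId: 'consumer-group',
  'enable.auto.commit': false,
  topicCfgs: { 'auto.offset.reset': 'earliest' }
});

console.log(globalConfig);
console.log(topicConfig);
```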
Only the [standard consumer api] is supported at the moment. I've also combined the "subscribe" steps, so subscribe takes an array of topics and the handler. You don't need to call subscribe, then consume, then listen for the 'data' event. Everything is done when you call the subscribe method.
A consumer handler will receive the following structure:
{
  message: 'Decoded Message',
  topic: 'Topic the message came from',
  schemaId: 'The schemaId used to encode the message',
  key: 'Key for this Kafka message',
  size: 'Size of message in bytes',
  partition: 'Partition the message was on',
  offset: 'Offset the message was read from'
}
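The combined subscribe flow and the handler payload can be sketched as follows. This is a rough illustration under stated assumptions: `subscribeAndConsume`, `FakeConsumer`, and `decodeJson` are hypothetical names, the fake consumer only mimics node-rdkafka's subscribe/consume/'data' sequence, and JSON stands in for Avro decoding.

```javascript
const { EventEmitter } = require('events');

// Hypothetical wrapper: performs subscribe + consume + 'data' listening in
// one call and hands the handler the structure described above.
function subscribeAndConsume(consumer, topics, handler, decode) {
  consumer.subscribe(topics);
  consumer.on('data', (raw) => {
    const { schemaId, message } = decode(raw.value);
    handler({
      message,
      topic: raw.topic,
      schemaId,
      key: raw.key,
      size: raw.size,
      partition: raw.partition,
      offset: raw.offset
    });
  });
  consumer.consume();
}

// Stand-in for a Kafka consumer so the sketch runs without a broker.
class FakeConsumer extends EventEmitter {
  subscribe(topics) { this.topics = topics; }
  consume() { this.flowing = true; }
}

// Stand-in decoder: parses JSON and reports a fixed, made-up schema ID.
const decodeJson = buf => ({ schemaId: 1, message: JSON.parse(buf.toString()) });

const fake = new FakeConsumer();
const received = [];
subscribeAndConsume(fake, ['TestTopic'], data => received.push(data), decodeJson);

// Simulate one incoming message.
const value = Buffer.from(JSON.stringify({ name: 'satsuki', age: 19 }));
fake.emit('data', {
  value, size: value.length, topic: 'TestTopic', key: null, partition: 0, offset: 7
});

console.log(received[0]);
```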
import iris from '@parkhub/iris';

// Using async/await
(async function startConsumer() {
  const consumer = await iris.createConsumer({
    groupId: 'consumer-group',
    topicCfgs: {
      'consume.callback.max.messages': 100
    }
  });

  await consumer.connect();

  // subscribe wires up the handler; no separate consume call is needed.
  const handler = data => console.log(data);
  consumer.subscribe(['MY_TOPIC'], handler);

  console.log('DONE!');
})();

// Using Promises
iris.createConsumer({ connection: 'kafka:9092', groupId: 'MY_GROUP_ID' })
  .then(consumer => {
    const handler = message => console.log(message);
    consumer.subscribe(['MY_TOPIC'], handler);
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));
Producer API
createProducer(Kafka Producer Configurations)
Differences
The only difference is that iris only supports [standard-api producer] clients. Everything else remains the same.
import iris from '@parkhub/iris';

// Using async/await
(async function startProducer() {
  const producer = await iris.createProducer({
    'client.id': 'kafka',
    'dr_cb': true
  });

  await producer.connect();

  producer.produce('TestTopic', null, 'message');

  await producer.disconnect();

  console.log('DONE!');
}());

// Using Promises
iris
  .createProducer({ 'client.id': 'kafka:9092', dr_cb: true })
  .then((producer) => {
    producer.produce('TestTopic', null, 'message');
    return producer.disconnect();
  })
  .then(() => console.log('DONE!'))
  .catch(err => console.log('ERROR!', err));
---
Development Guide
This section explains how to get started developing for iris.
Requirements
- Must have the latest version of Docker installed.
Downloading
git clone git@github.com:parkhub/iris.git
Building
docker-compose up iris-integration
Running Tests
Integration Tests
When you run docker-compose up iris-integration, it will actually run the integration tests. As you make changes to the project, the tests will rerun.
Unit Tests
To run the unit tests, first install the packages locally by running npm install. Then all you need to do is run the test command.
npm start test
Creating a Commit
We use semantic-release to manage our releases. If you haven't worked with it before, please take a look at their project to understand more about how it works.
- First, I like to run the validate command before going through the commit process, because if validation fails while you're committing, you will have to go through the commit process again. To run the validate command, simply run this:
npm start validate
- To start a new release, make sure you have added your files to git and then run this command:
npm start commit
This will take you through the release process. Follow the directions and read the steps thoroughly.
- After you have committed your code and it passes the linter, you can push your branch up to GitHub and create a pull request.