@micro-dev-hub/kafka-event-sourcing

Module kafka-event-sourcing

Introduction

This module facilitates the transmission of messages to Kafka, seamlessly integrated with the Confluent Schema Registry. New schemas are registered as Apache Avro schemas; the registered schema is then used to encode and decode messages, improving the integrity of messages transmitted to the Kafka system.
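
Conceptually, this follows the standard Confluent Schema Registry flow. The sketch below illustrates that flow with the @kafkajs/confluent-schema-registry client directly (not this module's API; the registry address and schema are illustrative):

// Sketch: register an Avro schema, then encode and decode with it.
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";

const registry = new SchemaRegistry({ host: "http://localhost:8081" });

const avroSchema = JSON.stringify({
    type: "record",
    name: "kafkaEventSourcingTest",
    namespace: "examples",
    fields: [{ type: "string", name: "fullName" }]
});

const demo = async () => {
    // Returns the id of an existing identical schema, or registers a new one.
    const { id } = await registry.register({ type: SchemaType.AVRO, schema: avroSchema });

    // Encode before producing; decode after consuming.
    const encoded = await registry.encode(id, { fullName: "Jane Doe" });
    const decoded = await registry.decode(encoded);
    console.log(decoded); // { fullName: 'Jane Doe' }
};

demo();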

Getting started

  • Install package
npm install @micro-dev-hub/kafka-event-sourcing
# yarn add @micro-dev-hub/kafka-event-sourcing  
  • Create producer
// Example: publishing messages
import { KafkaInstance } from "@micro-dev-hub/kafka-event-sourcing";

const clientId = "my-app";
const brokers = ["localhost:9092"];
const schemaRegistry = {host: "http://localhost:8081"};

const kafka = new KafkaInstance(clientId, brokers, schemaRegistry);

const producer = kafka.producer();

const produce = async () => {
    await producer.connect();
    let i = 1;
    let topicCount = 1;

    const schema = `
        {
            "type": "record",
            "name": "kafkaEventSourcingTest",
            "namespace": "examples",
            "fields": [{ "type": "string", "name": "fullName" }]
        }
        `;

    // Send one message per second, cycling through topic-test-1 .. topic-test-10.
    setInterval(async () => {
        try {
            if (topicCount > 10) {
                topicCount = 1;
            }

            // send() registers the schema (or reuses it if already registered),
            // encodes the message with it, and produces to the given topic.
            await producer.send({
                topic: `topic-test-${topicCount}`,
                message: {
                    value: { fullName: `Test ${i} in topic-test-${topicCount}` }
                }
            }, schema);

            console.log(`Test ${i} in topic-test-${topicCount}`);

            i++;
            topicCount++;
        } catch (error) {
            console.log(error);
        }
    }, 1000);
};

produce();
  • Create consumer
// Example: subscribing to messages
import { EachMessagePayload } from "kafkajs";
import { IConsumerHandler, KafkaInstance } from "@micro-dev-hub/kafka-event-sourcing";

const clientId = "my-app";
const brokers = ["localhost:9092"];
const schemaRegistry = {host: "http://localhost:8081"};

const kafka = new KafkaInstance(clientId, brokers, schemaRegistry);

const consumer = kafka.consumer({
  groupId: 'group-1',
  minBytes: 5,
  maxBytes: 1e6,
  maxWaitTimeInMs: 3000,
});

consumer.connect();

const testhandler: IConsumerHandler[] = [
  {
    topics: ['topic-test-1','topic-test-2'],
    schemas: ['kafkaEventSourcingTest',],
    fromBeginning: true,
    handler: async (payload: EachMessagePayload) => {
      console.log(`received ${payload.message.value} from ${payload.topic}`)
    }
  },
  {
    topics: ['topic-test-3',],
    schemas: [],
    fromBeginning: true,
    handler: async (payload: EachMessagePayload) => {
      console.log(`received ${payload.message.value} from ${payload.topic}`)
    }
  },
  {
    topics: ['topic-test-4','topic-test-5'],
    schemas: [],
    fromBeginning: true,
    handler: async (payload: EachMessagePayload) => {
      console.log(`received ${payload.message.value} from ${payload.topic}`)
    }
  },
]

consumer.reads({ autoCommit: true }, testhandler);

Features

KafkaInstance

  • Declare new KafkaInstance(clientId: string, brokers: string[], schemaRegistryAPIClientArgs: SchemaRegistryAPIClientArgs) to create an instance.

  • producer(): Returns a ProducerInstance.

  • consumer(consumerConfig: ConsumerConfig): Returns a ConsumerInstance.

ProducerInstance

  • connect(): Connects the producer to Kafka.

  • disconnect(): Disconnects the producer from Kafka.

  • send(publishMessages: IPublishMessage, schema: string): Registers the schema with the Confluent Schema Registry. If the schema already exists, the identifier of the existing schema is reused; otherwise the new schema is registered and its identifier returned. The method then encodes the message with the registered schema and dispatches it to the Kafka system, as in the sketch below.
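
For example, a one-off publish followed by a clean disconnect might look like the following (broker address, topic, and schema are illustrative):

// Sketch: one-off publish followed by a clean disconnect.
import { KafkaInstance } from "@micro-dev-hub/kafka-event-sourcing";

const kafka = new KafkaInstance("my-app", ["localhost:9092"], { host: "http://localhost:8081" });
const producer = kafka.producer();

const schema = `
    {
        "type": "record",
        "name": "kafkaEventSourcingTest",
        "namespace": "examples",
        "fields": [{ "type": "string", "name": "fullName" }]
    }
    `;

const publishOnce = async () => {
    await producer.connect();
    try {
        // Registers (or reuses) the schema, encodes the message, and produces it.
        await producer.send({
            topic: "topic-test-1",
            message: { value: { fullName: "Jane Doe" } }
        }, schema);
    } finally {
        // Close the broker connection once the message has been sent.
        await producer.disconnect();
    }
};

publishOnce();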

ConsumerInstance

  • connect(): Connects the consumer to Kafka.

  • disconnect(): Disconnects the consumer from Kafka.

  • reads(consumerRunConfig: IConsumerRunConfig, consumerHandlers: IConsumerHandler[]): Consumes messages from Kafka topics using the provided run configuration (consumerRunConfig) and an array of consumer handlers (consumerHandlers). The method iterates through the topics specified in each handler, subscribes the consumer to them, and then starts the consumer. Each handler can carry whatever business logic the application needs, as in the sketch below.
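
Because each entry in consumerHandlers carries its own handler, arbitrary business logic can be plugged in per group of topics. A minimal sketch (saveEvent is a hypothetical placeholder, not part of this module):

// Sketch: a consumer handler carrying custom business logic.
import { EachMessagePayload } from "kafkajs";
import { IConsumerHandler } from "@micro-dev-hub/kafka-event-sourcing";

// Hypothetical persistence step; replace with real application logic.
const saveEvent = async (payload: EachMessagePayload) => {
    console.log(`persisting event from ${payload.topic}`, payload.message.value);
};

const orderHandlers: IConsumerHandler[] = [
    {
        topics: ["topic-test-1"],
        schemas: ["kafkaEventSourcingTest"],
        fromBeginning: false,
        handler: async (payload: EachMessagePayload) => {
            await saveEvent(payload);
        }
    }
];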
