A NestJS module wrapper for node-rdkafka.
npm i nestjs-kafka-module
Requirements:
Dependency | Min version | Max version |
---|---|---|
Node.js | 16 | 20 |
NestJS | 8 | 10 |
Initialize a KafkaModule with configuration for a consumer, producer or adminClient respectively. A full list of configuration options for each of them can be found in node-rdkafka's Configuration section.
app.module.ts
import { Module } from "@nestjs/common";
import { KafkaModule } from "nestjs-kafka-module";

@Module({
  imports: [
    KafkaModule.forRoot({
      consumer: {
        conf: {
          "group.id": "kafka_consumer",
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
      producer: {
        conf: {
          "client.id": "kafka_producer",
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
      adminClient: {
        conf: {
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
    }),
  ],
})
export class AppModule {}
cats.service.ts
import { Injectable, Inject } from "@nestjs/common";
import { KafkaConsumer, Producer, IAdminClient } from "node-rdkafka";
import { KAFKA_ADMIN_CLIENT_PROVIDER } from "nestjs-kafka-module";

@Injectable()
export class CatsService {
  constructor(
    private readonly kafkaConsumer: KafkaConsumer,
    private readonly kafkaProducer: Producer,
    @Inject(KAFKA_ADMIN_CLIENT_PROVIDER)
    private readonly kafkaAdminClient: IAdminClient
  ) {
    /* Trying to get an instance of a provider without defining a dedicated configuration will result in an error. */
  }
}
It is not mandatory to define a configuration for every one of consumer, producer and adminClient; you're free to define just what you need. Keep in mind the table below, which shows which Provider will be available in your context based on the defined configuration:
Configuration | Provider |
---|---|
consumer | KafkaConsumer |
producer | Producer |
adminClient | KAFKA_ADMIN_CLIENT_PROVIDER |
In the example folder you can find examples of Nest applications that use this library.
It is possible to configure the module dynamically using the forRootAsync method, passing, for instance, a ConfigService as shown below:
import { Module } from "@nestjs/common";
// Assumes ConfigService comes from @nestjs/config and is available for injection in this context.
import { ConfigService } from "@nestjs/config";
import { KafkaModule } from "nestjs-kafka-module";

@Module({
  imports: [
    KafkaModule.forRootAsync({
      useFactory: (configService: ConfigService) => {
        const groupId = configService.get("group_id");
        const brokerList = configService.get("metadata_broker_list");
        const clientId = configService.get("client_id");
        return {
          consumer: {
            conf: {
              "group.id": groupId,
              "metadata.broker.list": brokerList,
            },
          },
          producer: {
            conf: {
              "client.id": clientId,
              "metadata.broker.list": brokerList,
            },
          },
          adminClient: {
            conf: {
              "metadata.broker.list": brokerList,
            },
          },
        };
      },
      inject: [ConfigService],
    }),
  ],
})
export class ApplicationModule {}
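Since the factory reads its values at startup, it can be useful to fail fast when a setting is missing rather than pass `undefined` down to the client. The following is a minimal sketch, not part of this library: `requireSetting`, `buildKafkaOptions` and `KafkaOptionsSketch` are hypothetical names, and the setting keys are illustrative.

```typescript
// Hypothetical shape mirroring the options object returned by the factory.
interface KafkaOptionsSketch {
  consumer: { conf: Record<string, string> };
  producer: { conf: Record<string, string> };
  adminClient: { conf: Record<string, string> };
}

// Any function that looks up a setting by key (for example a thin wrapper around a ConfigService).
type SettingLookup = (key: string) => string | undefined;

// Reads a required setting and throws if it is missing or empty.
function requireSetting(get: SettingLookup, key: string): string {
  const value = get(key);
  if (value === undefined || value === "") {
    throw new Error(`Missing Kafka setting: ${key}`);
  }
  return value;
}

// Builds the consumer, producer and adminClient configuration from the lookup.
function buildKafkaOptions(get: SettingLookup): KafkaOptionsSketch {
  const groupId = requireSetting(get, "group_id");
  const brokerList = requireSetting(get, "metadata_broker_list");
  const clientId = requireSetting(get, "client_id");
  return {
    consumer: {
      conf: { "group.id": groupId, "metadata.broker.list": brokerList },
    },
    producer: {
      conf: { "client.id": clientId, "metadata.broker.list": brokerList },
    },
    adminClient: {
      conf: { "metadata.broker.list": brokerList },
    },
  };
}
```

Under these assumptions, the factory body could shrink to `useFactory: (configService: ConfigService) => buildKafkaOptions((key) => configService.get<string>(key))`.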
By default, a connection attempt is made automatically during KafkaModule initialization. This implies that if the broker is unavailable (temporarily down or not accessible) during startup, the NestJS initialization may fail.
It is possible to change this behavior using the autoConnect flag on KafkaConsumer and Producer, as shown below:
KafkaModule.forRoot({
  consumer: {
    autoConnect: false,
    conf: {
      "group.id": "nestjs-rdkafka-test",
      "metadata.broker.list": "127.0.0.1:9092",
    },
  },
  producer: {
    autoConnect: false,
    conf: {
      "metadata.broker.list": "127.0.0.1:9092",
    },
  },
});
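With autoConnect disabled, the application has to connect the clients itself at some later point. The sketch below shows one way to do that by wrapping the callback-style `connect` of a node-rdkafka client in a Promise. `ConnectableClient` and `connectClient` are hypothetical names; the interface is a minimal structural view that is assumed to match the `isConnected()` and `connect(metadataOptions, cb)` methods of node-rdkafka's KafkaConsumer and Producer.

```typescript
// Minimal structural view of a node-rdkafka client (assumption: KafkaConsumer and Producer satisfy it).
interface ConnectableClient {
  isConnected(): boolean;
  connect(
    metadataOptions?: object,
    cb?: (err: unknown, data?: unknown) => void
  ): unknown;
}

// Resolves once the client reports a successful connection, rejects with the connection error otherwise.
function connectClient(client: ConnectableClient): Promise<void> {
  if (client.isConnected()) {
    // Already connected: nothing to do.
    return Promise.resolve();
  }
  return new Promise<void>((resolve, reject) => {
    client.connect({}, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
```

A service could then call `await connectClient(this.kafkaProducer)` from a lifecycle hook or before first use, and decide for itself whether a failed attempt should be retried or reported.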
All clients are automatically disconnected from Kafka in onModuleDestroy. You can also disconnect manually by calling:
await this.consumer?.disconnect();
await this.producer?.disconnect();
await this.adminClient?.disconnect();
nestjs-kafka-module is MIT licensed.