tuna-nest-kafka

0.0.4 • Public • Published

NestJS + KafkaJS

Integration of KafkaJS with NestJS for building event-driven microservices.

Setup

Import and add the TunaNestKafkaModule to the imports array of the module for which you would like to use Kafka.

Synchronous Module Initialization

Register the TunaNestKafkaModule synchronously with the register() method:

@Module({
  imports: [
    TunaNestKafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          config: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

Synchronous Module Initialization (Multiple Instances)

Register several named Kafka instances synchronously with the registerMultiple() method:

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    TunaNestKafkaModule.registerMultiple([
      {
        name: 'tuna-app',
        config: {
          brokers: ['localhost:9092'],
          clientId: 'tuna-client-kafka-1',
        },
        consumer: {
          groupId: 'tuna-nest-kafka-grp1',
          retryNumber: 1,
        },
      },
      {
        name: 'tuna-app-2',
        config: {
          brokers: ['localhost:9092'],
          clientId: 'tuna-client-kafka-2',
        },
        consumer: {
          groupId: 'tuna-nest-kafka-grp3',
          retryNumber: 1,
        },
      },
    ]),
  ]
  ...
})
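The comments in the Consumer section indicate that a handler can name an instance through instanceName and otherwise belongs to the default (first) instance. A minimal sketch of that lookup, assuming instances are identified only by their name; pickInstance and NamedInstance are hypothetical names for illustration, not part of this package:

```typescript
// Local stand-in for a registered instance; not the package's own type.
interface NamedInstance {
  name: string;
}

// Hypothetical helper: return the instance with the requested name, or the
// first registered instance when no name is given.
function pickInstance<T extends NamedInstance>(instances: T[], instanceName?: string): T {
  if (instances.length === 0) {
    throw new Error('No Kafka instances registered');
  }
  if (instanceName === undefined) {
    return instances[0];
  }
  const match = instances.find((instance) => instance.name === instanceName);
  if (!match) {
    throw new Error(`Unknown Kafka instance: ${instanceName}`);
  }
  return match;
}

const registered = [{ name: 'tuna-app' }, { name: 'tuna-app-2' }];
console.log(pickInstance(registered).name); // tuna-app
console.log(pickInstance(registered, 'tuna-app-2').name); // tuna-app-2
```

Failing loudly on an unknown name is a design choice of this sketch; how the package itself reacts to a missing instance is not stated here.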

Asynchronous Module Initialization

Register the TunaNestKafkaModule asynchronously with the registerAsync() method:

@Module({
  imports: [
    TunaNestKafkaModule.registerAsync({
      useFactory: (configService: ConfigService): TunaKafkaOption => {
        return {
          config: {
            brokers: configService.get('BROKER_URL'),
            clientId: configService.get('CLIENT_ID'),
          },
          consumer: {
            groupId: configService.get('GROUP_ID'),
          },
        };
      },
      inject: [ConfigService],
    }),
  ]
  ...
})
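KafkaJS expects brokers to be a list of host:port strings, while values read from environment variables are plain strings. The sketch below shows one way to bridge the two, assuming BROKER_URL holds a comma-separated list; parseBrokers is a hypothetical helper, not part of this package or of KafkaJS:

```typescript
// Hypothetical helper: turn a comma-separated environment value into the
// string array that the KafkaJS `brokers` option expects.
function parseBrokers(raw: string | undefined): string[] {
  if (!raw) {
    throw new Error('BROKER_URL is not set');
  }
  const brokers = raw
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (brokers.length === 0) {
    throw new Error('BROKER_URL does not contain any broker address');
  }
  return brokers;
}

console.log(parseBrokers('localhost:9092, localhost:9093'));
// [ 'localhost:9092', 'localhost:9093' ]
```

Inside the factory, the result could be passed as brokers, for example parseBrokers(configService.get('BROKER_URL')).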

Asynchronous Module Initialization (Multiple Instances)

Register several named Kafka instances asynchronously with the registerMultipleAsync() method:

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    TunaNestKafkaModule.registerMultipleAsync({
      useFactory: (configService: ConfigService): TunaKafkaOptions[] => {
        return [
          {
            name: 'app1',
            config: {
              brokers: configService.get('BROKER_URL'),
              clientId: configService.get('CLIENT_ID'),
            },
            consumer: {
              groupId: configService.get('GROUP_ID'),
            },
          },
        ];
      },
      inject: [ConfigService],
    }),
  ]
  ...
})

The full set of options:

Config                 Options
client                 https://kafka.js.org/docs/configuration
consumer               https://kafka.js.org/docs/consuming#options
producer               https://kafka.js.org/docs/producing#options
serializer
deserializer
consumeFromBeginning   true/false
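The serializer and deserializer options are listed without further detail, so the interface they must satisfy is not known from this page. As a rough illustration of what such a pair usually does, here is a JSON round trip over the Buffer values that Kafka messages carry; serializeJson and deserializeJson are hypothetical names, and their shape is an assumption rather than this package's contract:

```typescript
// Hypothetical serializer: encode a value as UTF-8 JSON bytes.
function serializeJson(value: unknown): Buffer {
  return Buffer.from(JSON.stringify(value), 'utf8');
}

// Hypothetical deserializer: decode a raw message value back into a JS value.
// A null value (for example a tombstone record) is passed through as null.
function deserializeJson(raw: Buffer | string | null): unknown {
  if (raw === null) {
    return null;
  }
  const text = typeof raw === 'string' ? raw : raw.toString('utf8');
  return JSON.parse(text);
}

const encoded = serializeJson({ hero: 'tuna', dragonsKilled: 1 });
console.log(deserializeJson(encoded));
```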

Producing

Send messages back to Kafka.

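import { Inject, Injectable } from '@nestjs/common';
import { RecordMetadata } from 'kafkajs';
// Assumption: KafkaService is exported by this package.
import { KafkaService } from 'tuna-nest-kafka';

const TOPIC_NAME = 'hero.kill.dragon';

@Injectable()
export class Producer {
  constructor(
    // The token matches the `name` given when the module was registered
    @Inject('HERO_SERVICE') private readonly client: KafkaService
  ) {}

  // Sends a single keyed message and returns the broker's record metadata
  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    return this.client.send({
      topic: TOPIC_NAME,
      messages: [
        {
          key: '1',
          value: message
        }
      ]
    });
  }
}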
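The send() call above takes a topic and a list of messages, each with a key and a value. With Kafka's default partitioning, messages that share a key land on the same partition, so keying by an entity id keeps that entity's events in order. A small sketch of building such a list; HeroEvent, OutgoingMessage and toMessages are hypothetical names used only for illustration:

```typescript
// Local stand-in for the { key, value } message shape used in the example above.
interface OutgoingMessage {
  key: string;
  value: string;
}

// Hypothetical domain event.
interface HeroEvent {
  heroId: string;
  action: string;
}

// Key each message by hero id so that events for one hero share a key.
function toMessages(events: HeroEvent[]): OutgoingMessage[] {
  return events.map((event) => ({
    key: event.heroId,
    value: JSON.stringify(event),
  }));
}

const outgoing = toMessages([
  { heroId: 'hero-1', action: 'kill.dragon' },
  { heroId: 'hero-2', action: 'kill.dragon' },
]);
console.log(outgoing.map((message) => message.key)); // [ 'hero-1', 'hero-2' ]
```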

Consumer

@Injectable()
export class AppService {
  getHello(): string {
    return 'Hello World!';
  }

  @KafkaConsumer('tuna_consumer_1') // No options are set here, so this consumer uses the common configuration
  con1(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    throw new Error('Hello World! Error: '); // The handler is retried after it fails
  }

  @KafkaConsumer('tuna_consumer_2', {
    groupId: 'tuna-nest-kafka-grp2', // Custom consumer group
    retryNumber: 2, // Number of retries
    instanceName: 'tuna-app-2' // Target a specific instance when several are registered
  })
  consumer2(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    sleep(10000); // sleep() is a user-supplied helper (not shown) that simulates slow processing
  }

  @KafkaConsumer('tuna_consumer_3', {
    groupId: 'tuna-nest-kafka-grp1', // Belongs to the default instance, or to the first instance when several are registered
  })
  consumer3(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    sleep(10000); // sleep() is a user-supplied helper (not shown) that simulates slow processing
  }
}
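The comments above say that a failing handler is retried and that retryNumber sets the number of retries. How the package implements this is not shown, so the following is only a sketch of one plausible reading: the handler runs once, then up to retryNumber more times, and the last error is rethrown. runWithRetries and Handler are hypothetical names:

```typescript
type Handler = () => void | Promise<void>;

// Hypothetical retry loop: one initial attempt plus up to `retryNumber` retries.
// Resolves with the number of attempts used, or rejects with the last error.
async function runWithRetries(handler: Handler, retryNumber: number): Promise<number> {
  let attempts = 0;
  for (;;) {
    attempts += 1;
    try {
      await handler();
      return attempts;
    } catch (error) {
      if (attempts > retryNumber) {
        throw error;
      }
    }
  }
}

// A handler that fails once and then succeeds.
let calls = 0;
const flaky: Handler = () => {
  calls += 1;
  if (calls < 2) {
    throw new Error('transient failure');
  }
};

runWithRetries(flaky, 2).then((attempts) => {
  console.log(`succeeded after ${attempts} attempts`); // succeeded after 2 attempts
});
```

This sketch retries immediately; a real consumer would normally wait between attempts, and whether this package does so is not stated here.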

Install

npm i tuna-nest-kafka

Version

0.0.4

License

MIT

Collaborators

  • kenryphyam