@rjavlonn/custom-rabbit
1.0.1 • Public • Published

NestRabbit

Installation

npm install --save @rjavlonn/custom-rabbit

Usage

Initialisation

NestRabbit is built on AMQPConnectionManager.
For a detailed understanding of how to configure the library, see its documentation.

...

@Module({
    imports: [
        NestRabbitModule
    ],
})
export class ApplicationModule {}

NestRabbitModule is global, so you only need to import it once, in your application module.

Configuration

To configure the exchanges/queues you need to register your configuration under the NestRabbitConfigurationNamespace.

export default registerAs(NestRabbitConfigurationNamespace, (): NRModuleConfiguration => ({
    urls: [
        ...
    ],
    auto: true,
    logging: true,
    exchanges: [
        ...
    ],
    queues: [
        ...
    ]
}));

Configuration options:

| Configuration | Type | Description |
| --- | --- | --- |
| urls | Array | Broker URIs to connect to. If one or more are down, the library will try to connect to at least one of them. |
| auto | boolean | When a consumer is invoked, ack after the call completes, or nack if an exception is raised during the call. |
| logging | boolean | Automatically log both the notification and the error (if one is raised during the call). |
| exchanges | Array | List of exchanges to assert before starting. If an assertion fails, the service stops. |
| queues | Array | List of queues to assert before starting. If an assertion fails, the service stops. |
| options | AmqpConnectionManagerOptions | Options passed to the root library (AmqpConnectionManager). |
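
The `auto` behaviour can be pictured as a wrapper around each consumer. The following is only a sketch of that idea, not NestRabbit's implementation: `AckLike` and `withAutoAck` are hypothetical names, and the real `nack` call may take arguments that are not shown here.

```typescript
// Hypothetical minimal context: only the two calls this sketch needs.
interface AckLike {
  ack(): void;
  nack(): void;
}

// Wraps a consumer so the message is acked once the consumer resolves
// and nacked if the consumer throws (the behaviour described for `auto`).
function withAutoAck<T>(
  handler: (payload: T) => Promise<void>,
): (payload: T, context: AckLike) => Promise<void> {
  return async (payload: T, context: AckLike): Promise<void> => {
    try {
      await handler(payload);
    } catch (error) {
      context.nack();
      throw error; // rethrown so the caller can still report the failure
    }
    context.ack();
  };
}
```

Rethrowing after the nack is a choice made for this sketch, so that whatever invokes the wrapper can still log the error.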

In the queues configuration, the name property matters: it is the key used to bind your consumers to the configuration.
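
To illustrate the role of `name`, here is a small sketch of a lookup by queue name. `QueueConfig` is a trimmed-down, hypothetical shape and `resolveQueue` is not part of the library; how NestRabbit itself reacts to an unknown name is not documented here, so throwing is simply this sketch's choice.

```typescript
// Hypothetical, trimmed-down shape of one entry in `queues`.
interface QueueConfig {
  name: string;
  prefetch?: number;
}

// Returns the configuration whose `name` matches the string passed to
// @Consume(), or throws when no queue was configured under that name.
function resolveQueue(queues: QueueConfig[], consumedName: string): QueueConfig {
  const matches = queues.filter((queue) => queue.name === consumedName);
  if (matches.length === 0) {
    throw new Error(`No queue configured under the name "${consumedName}"`);
  }
  return matches[0];
}

const configuredQueues: QueueConfig[] = [{ name: 'some.queue', prefetch: 100 }];

// The same string would be used in @Consume('some.queue').
const resolved = resolveQueue(configuredQueues, 'some.queue');
```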

Subscribing

NestRabbit uses the @Consume() decorator to register callbacks for specific queues.

@Controller()
export class FooController {

    constructor(
        private readonly fooService: FooService,
    ) {}

    @Consume('some.queue')
    public async onEvent() {
        console.log('A message has been posted on this queue!');
    }
    ...

Important 🚨: Due to the Nest lifecycle, NestRabbit checks all methods of all registered controllers.
Do not forget to decorate your class with the @Controller() annotation and register it like any HTTP controller.

@Controller()
export class FooConsumer {
    private readonly logger: Logger;

    constructor(
        private readonly medicalDocumentService: MedicalDocumentService,
    ) {
        this.logger = new Logger(this.constructor.name);
    }

    @Consume('edge-monitoring.document_approved.queue')
    @UsePipes(new ValidationPipe({
        transform: true,
        whitelist: true,
    }))
    public async onDocumentApproved(
        @Context() context: BrokerContext,
        @Payload() payload: DocumentMessageDTO,
    ): Promise<void> {
        ...
    }
}

Configuration

Configuration is a mandatory step to:

  • connect to the broker
  • configure the setup of your queues
  • configure the setup of your exchanges

Behind the scenes, NestRabbit uses @nestjs/config, so the configuration is also compatible with
the .env configuration system.

To understand how to properly configure your queues and service, see the documentation of amqplib or RabbitMQ.

export default registerAs(NestRabbitConfigurationNamespace, (): INRModuleConfiguration => ({
    urls: [
        process.env.RABBIT_MQ_URI || `amqp://cied:cied@localhost:5672`,
    ],
    exchanges: [
        {
            name: 'demo-service.exchange',
            type: 'direct',
            options: {
                durable: true,
                internal: false,
                autoDelete: false,
                arguments: [],
            },
        },
    ],
    queues: [
        {
            name: 'some.queue',
            prefetch: 100,
            options: {
                deadLetterRoutingKey: 'some.error.route',
                deadLetterExchange: 'demo-service.exchange',
            },
            consumption: {
                noAck: false,
            },
            binding: {
                exchange: process.env.RABBIT_CORE_TOPIC_EXCHANGE || 'main.exchange',
                routingKey: 'some.foo.route',
            },
        },
        {
            name: 'some.errors.queue',
            binding: {
                exchange: 'demo-service.exchange',
                routingKey: 'some.error.route',
            },
        },
    ],
}));
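
Because queues refer to exchanges by name (in `binding.exchange` and `deadLetterExchange`), a typo is easy to miss. The helper below is a hypothetical sketch, not something the library provides, that lists the exchanges referenced by queues but absent from `exchanges`. The interfaces are trimmed-down assumptions based on the example above.

```typescript
// Hypothetical, trimmed-down shapes mirroring the configuration example.
interface ExchangeConfig {
  name: string;
}

interface BoundQueueConfig {
  name: string;
  options?: { deadLetterExchange?: string };
  binding?: { exchange: string; routingKey: string };
}

// Lists every exchange a queue refers to (binding or dead letter)
// that is missing from the `exchanges` array.
function findUndeclaredExchanges(
  exchanges: ExchangeConfig[],
  queues: BoundQueueConfig[],
): string[] {
  const declared = exchanges.map((exchange) => exchange.name);
  const missing: string[] = [];
  for (const queue of queues) {
    const referenced = [queue.binding?.exchange, queue.options?.deadLetterExchange];
    for (const exchange of referenced) {
      if (
        exchange !== undefined &&
        declared.indexOf(exchange) === -1 &&
        missing.indexOf(exchange) === -1
      ) {
        missing.push(exchange);
      }
    }
  }
  return missing;
}

// Same names as the example above, with the environment fallbacks applied.
const undeclared = findUndeclaredExchanges(
  [{ name: 'demo-service.exchange' }],
  [
    {
      name: 'some.queue',
      options: { deadLetterExchange: 'demo-service.exchange' },
      binding: { exchange: 'main.exchange', routingKey: 'some.foo.route' },
    },
    {
      name: 'some.errors.queue',
      binding: { exchange: 'demo-service.exchange', routingKey: 'some.error.route' },
    },
  ],
);
// undeclared is ['main.exchange']
```

An entry in the result is not necessarily an error: an exchange such as `main.exchange` may be asserted by another service, in which case it only needs to exist on the broker before the queue is bound.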

Acknowledgement

In some contexts you may want to acknowledge messages manually. To do so, use the @Context() param decorator.

....
    @Consume('some.other.queue')
    public async onEvent(@Context() context: BrokerContext) {
        console.log('A message has been posted on this queue!');
        context.ack();
    }

BrokerContext API

The BrokerContext exposes methods to ack or nack messages in the same way as the AMQP Channel. Feel free to check the JSDoc of the class directly from your favorite IDE.

Message Content

By default the message contains a lot of information. You can access the raw message from the context with context.getRawMessage().

If you want to map the content directly to a class, the same way an HTTP controller does with the @Body() decorator,
you can use the @Payload() param decorator. This decorator will try to JSON.parse() the content of your message.
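
As a rough picture of that parsing step, here is a sketch with a hypothetical `parsePayload` helper. It is not the decorator's actual code, and returning `undefined` on invalid JSON is an assumption made for this sketch; the library may behave differently.

```typescript
// Anything with toString() works here; a Node.js Buffer (the type amqplib
// uses for message content) satisfies this shape.
interface ContentLike {
  toString(): string;
}

// Parses the message body as JSON and returns `undefined`
// when the body is not valid JSON.
function parsePayload(content: ContentLike): unknown {
  try {
    return JSON.parse(content.toString());
  } catch (error) {
    return undefined;
  }
}

// Stand-in for a message body received from the broker.
const body: ContentLike = { toString: () => '{"id": 42}' };
const parsed = parsePayload(body) as { id: number };
```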

Transformation/Validation

As with HTTP controllers, NestRabbit honours pipe decorators, so you can validate and instantiate your payload directly with a ValidationPipe.

    @Consume('test.queue')
    @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
    public async onEvent(
        @Context() context: BrokerContext,
        @Payload() payload: Foo,
    ) {
        ...
    }
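
To make the effect of `transform: true` and `whitelist: true` concrete, the sketch below approximates it by hand. In a real application ValidationPipe does this work through decorators on the payload class, so none of this code is needed. The fields of `Foo` and the `toFoo` helper are hypothetical.

```typescript
// Hypothetical payload class standing in for `Foo`.
class Foo {
  constructor(
    public readonly id: number,
    public readonly label: string,
  ) {}
}

// Hand-rolled approximation of `transform` + `whitelist`: checks the fields,
// ignores unknown properties, and returns a real Foo instance.
function toFoo(plain: unknown): Foo {
  if (typeof plain !== 'object' || plain === null) {
    throw new Error('payload must be an object');
  }
  const candidate = plain as { id?: unknown; label?: unknown };
  if (typeof candidate.id !== 'number' || typeof candidate.label !== 'string') {
    throw new Error('payload must contain a numeric id and a string label');
  }
  // Only the known fields are copied, so `extra` below is left out.
  return new Foo(candidate.id, candidate.label);
}

const foo = toFoo({ id: 1, label: 'demo', extra: 'dropped' });
```

The consumer then receives an instance of the class rather than a plain object, with only the declared fields present.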

License

MIT

Unpacked Size

200 kB

Total Files

51

Collaborators

  • rjavlonn