npm install --save @rjavlonn/custom-rabbit
NestRabbit is based on AMQPConnectionManager.
To have a detailed understanding about how to configure the library feel free to check it out.
...
@Module({
imports: [
NestRabbitModule
],
})
export class ApplicationModule {}
}
The NestRabbitModule being Global, you need to declare it only once in your application module.
To configure the exchanges/queues you need to register your configuration under the NestRabbitConfigurationNamespace.
export default registerAs(NestRabbitConfigurationNamespace, (): NRModuleConfiguration => ({
urls: [
...
],
auto: true,
logging: true,
exchanges: [
...
],
queues: [
...
]
});
Configurations options :
Configuration | Type | Description |
---|---|---|
urls | Array | broker uri to connect to, if one ore more are down, the library will try to connect to at least one. |
auto | boolean | when a consumer is invoked, will ack after the call is done or nack if an exception is raised during the call |
logging | boolean | will automatically log both the notification and the error (if raised during the call) |
exchanges | Array | list of exchanges to assert before starting. if an assertion fails, stop the service. |
queues | Array | list of queues to assert before starting. if an assertion fails, stop the service. |
options | AmqpConnectionManagerOptions | options related to the root library (AmqpConnectionManager) |
For the queues
configuration, the name property is really important. This is the key used to bind your consumers to the configuration.
NestRabbit use @Consume()
decorator to register callbacks for specific queues.
@Controller()
export class FooController {
constructor(
private readonly fooService: FooService,
) {}
@Consume('some.queue')
public async onEvent() {
console.log('A message has been posted on this queue!');
}
...
Important 🚨 :️ Due to the Nest lifecycle, NestRabbit check all methods of all registered controllers.
Do not forget to decorate your class with the @Controller() annotation and register as any HTTP controller.
@Controller()
export class FooConsumer {
private readonly logger: Logger;
constructor(
private readonly medicalDocumentService: MedicalDocumentService,
) {
this.logger = new Logger(this.constructor.name);
}
@Consume('edge-monitoring.document_approved.queue')
@UsePipes(new ValidationPipe({
transform: true,
whitelist: true,
}))
public async onDocumentApproved(
@Context() context: BrokerContext,
@Payload() payload: DocumentMessageDTO,
): Promise<void> {
...
}
The configuration is a mandatory step to :
- connect to the broker
- configure the setup of your queues
- configure the setup of your exchanges
Behind the scene NestRabbit uses
@nestjs/config
meaning the configuration is also compliant with
.env configuration system.
To understand how to properly configure your queues and service, feel free to check the documentation of amqplib or rabbitMQ.
export default registerAs(NestRabbitConfigurationNamespace, (): INRModuleConfiguration => ({
urls: [
process.env.RABBIT_MQ_URI || `amqp://cied:cied@localhost:5672`,
],
exchanges: [
{
name: 'demo-service.exchange',
type: 'direct',
options: {
durable: true,
internal: false,
autoDelete: false,
arguments: [],
},
},
],
queues: [
{
name: 'some.queue',
prefetch: 100,
options: {
deadLetterRoutingKey: 'some.error.route',
deadLetterExchange: 'demo-service.exchange',
},
consumption: {
noAck: false,
},
binding: {
exchange: process.env.RABBIT_CORE_TOPIC_EXCHANGE || 'main.exchange',
routingKey: 'some.foo.route',
},
},
{
name: 'some.errors.queue',
binding: {
exchange: 'demo-service.exchange',
routingKey: 'some.error.route',
},
},
],
}));
In some context you might want to do manual acknowledgment, to do so you can use the @Context()
param decorator.
....
@Consume('some.other.queue')
public async onEvent(@Context() context: BrokerContext) {
console.log('A message has been posted on this queue!');
context.ack();
}
The BrokerContext exposes methods to Ack or Nack messages the same way the the AMQP Channel. Feel free to check the JSDoc of the classe directly from your favorite IDE.
By default the message contains lots of information. You can access the raw message from the context
with the method context.getRawMessage()
;
If you want to map the content directly with a class the same way an HTTP Controller does with the @Body()
decorator
you can use the @Payload()
param decorator. This decorator will try to JSON.parse()
the content of your message.
As for HTTP Controllers the NestRabbit analyses the Pipes decorators, meaning you directly validate and instantiate your payload with a ValidationPipe.
@Consume('test.queue')
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
public async onEvent(
@Context() context: BrokerContext,
@Payload() payload: Foo,
) {
...
}