This package has been deprecated

Author message:

This package has been moved to @nestjs/bull. It won't be maintained here anymore, consider updating your dependencies accordingly. (https://www.npmjs.com/package/@nestjs/bull)

nest-bull

0.9.0 • Public • Published

Description

This is a Bull module for Nest 6.

Installation

$ npm i --save nest-bull bull
$ npm i --save-dev @types/bull

Quick Start

import {Body, Controller, Get, Module, Param, Post} from '@nestjs/common';
import {DoneCallback, Job, Queue} from 'bull';
import {BullModule, InjectQueue} from 'nest-bull';
 
@Controller()
export class AppController {
 
  constructor(
    @InjectQueue('store') readonly queue: Queue,
  ) {}
 
  @Post()
  async addJob( @Body() value: any ) {
    const job: Job = await this.queue.add(value);
    return job.id;
  }
 
  @Get(':id')
  async getJob( @Param('id') id: string ) {
    return await this.queue.getJob(id);
  }
}
 
@Module({
  imports: [
    BullModule.register({
      name: 'store',
      options: {
        redis: {
          port: 6379,
        },
      },
      processors: [
        (job: Job, done: DoneCallback) => { done(null, job.data); },
      ],
    }),
  ],
  controllers: [
    AppController,
  ],
})
export class ApplicationModule {}

Decorators

This module provides some decorators that will help you to set up your queue listeners.

@Processor()

The @Processor() class decorator is mandatory if you plan to use this package's decorators.

It accepts an optional QueueDecoratorOptions argument:

export interface QueueDecoratorOptions {
  name?: string; // Name of the queue
}

@Process()

The @Process() method decorator flags a method as a processing function for the queued jobs.

It accepts an optional QueueProcessDecoratorOptions argument:

export interface QueueProcessDecoratorOptions {
  name?: string; // Name of the job
  concurrency?: number; // Concurrency of the job
}

Whenever a job matching the configured name (if any) is queued, it will be processed by the decorated method. The method is expected to have the following signature: (job: Job, done?: DoneCallback): any
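As a rough illustration of the two calling conventions this signature allows, the sketch below writes one handler that returns its result and one that reports through the callback. JobLike and Done are hypothetical local stand-ins for Bull's Job and DoneCallback, modeling only the fields used here, so the snippet runs without Redis. In a real processor these would be methods of a @Processor() class, each carrying @Process().

```typescript
// Hypothetical stand-ins for Bull's Job and DoneCallback (assumption: only
// the `data` field and the (error, value) callback shape are modeled).
interface JobLike<T> {
  data: T;
}
type Done<R> = (error?: Error | null, value?: R) => void;

// Style 1: omit the callback and return the result.
// Bull also accepts a returned Promise in this style.
function twice(job: JobLike<number>): number {
  return job.data * 2;
}

// Style 2: report the result, or an error, through the callback.
function thrice(job: JobLike<number>, done: Done<number>): void {
  if (!Number.isFinite(job.data)) {
    done(new Error('job.data must be a finite number'));
    return;
  }
  done(null, job.data * 3);
}
```

Picking one style per handler keeps it unambiguous: a handler that both returns a value and calls the callback reports its outcome twice.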

@OnQueueEvent()

The @OnQueueEvent() method decorator flags a method as an event listener for the related queue.

It requires a BullQueueEvent argument:

export type BullQueueEvent = 
  | 'error'
  | 'waiting'
  | 'active'
  | 'stalled'
  | 'progress'
  | 'completed'
  | 'failed'
  | 'paused'
  | 'resumed'
  | 'cleaned'
  | 'drained'
  | 'removed'
  | 'global:error'
  | 'global:waiting'
  | 'global:active'
  | 'global:stalled'
  | 'global:progress'
  | 'global:completed'
  | 'global:failed'
  | 'global:paused'
  | 'global:resumed'
  | 'global:cleaned'
  | 'global:drained'
  | 'global:removed';

You can also use the BullQueueEvents and BullQueueGlobalEvents enums.

Fortunately, there is a shorthand decorator for each of the Bull events:

  • @OnQueueError()
  • @OnQueueWaiting()
  • @OnQueueActive()
  • @OnQueueStalled()
  • @OnQueueProgress()
  • @OnQueueCompleted()
  • @OnQueueFailed()
  • @OnQueuePaused()
  • @OnQueueResumed()
  • @OnQueueCleaned()
  • @OnQueueDrained()
  • @OnQueueRemoved()
  • @OnGlobalQueueError()
  • @OnGlobalQueueWaiting()
  • @OnGlobalQueueActive()
  • @OnGlobalQueueStalled()
  • @OnGlobalQueueProgress()
  • @OnGlobalQueueCompleted()
  • @OnGlobalQueueFailed()
  • @OnGlobalQueuePaused()
  • @OnGlobalQueueResumed()
  • @OnGlobalQueueCleaned()
  • @OnGlobalQueueDrained()
  • @OnGlobalQueueRemoved()

If you need more details about those events, head straight to Bull's reference doc.
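The naming pattern behind the shorthand decorators can be sketched as a small helper. shorthandDecoratorName is hypothetical and not exported by nest-bull. It only illustrates how an event name from the BullQueueEvent union maps to the decorator names listed above.

```typescript
// Local event names, copied from the BullQueueEvent union.
type LocalEvent =
  | 'error' | 'waiting' | 'active' | 'stalled' | 'progress' | 'completed'
  | 'failed' | 'paused' | 'resumed' | 'cleaned' | 'drained' | 'removed';

// Every local event also has a 'global:' counterpart.
type AnyEvent = LocalEvent | `global:${LocalEvent}`;

// Hypothetical helper (an illustration, not part of nest-bull): returns the
// name of the shorthand decorator that corresponds to a queue event.
function shorthandDecoratorName(event: AnyEvent): string {
  const prefix = 'global:';
  const isGlobal = event.startsWith(prefix);
  const local = isGlobal ? event.slice(prefix.length) : event;
  const capitalized = local.charAt(0).toUpperCase() + local.slice(1);
  return `On${isGlobal ? 'Global' : ''}Queue${capitalized}`;
}
```

For instance, 'completed' maps to OnQueueCompleted and 'global:failed' maps to OnGlobalQueueFailed.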

Example

Here is an example of how this package's decorators can be used.

import {Logger} from '@nestjs/common';
import {Job, DoneCallback} from 'bull';
import {BullQueueEvents, OnQueueActive, OnQueueEvent, Process, Processor} from 'nest-bull';
import {NumberService} from './number.service';
 
@Processor()
export class MyQueue {
  private readonly logger = new Logger('MyQueue');
 
  constructor(private readonly service: NumberService) {}
 
  @Process({ name: 'twice' })
  processTwice(job: Job<number>) {
    return this.service.twice(job.data);
  }
 
  @Process({ name: 'thrice' })
  processThrice(job: Job<number>, callback: DoneCallback) {
    callback(null, this.service.thrice(job.data));
  }
 
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }
 
  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }
}

Separate processes

This module allows you to run your job handlers in forked processes. To do so, add the filesystem path of one or more files exporting your processor function to the processors property of the BullModule options. You can read more on this subject in Bull's documentation.

Please note that because your function runs in a forked process, Nest's dependency injection won't be available to it.
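Because the injector is out of reach in the fork, one option is to construct whatever the processor needs by hand at the top of the processor file. The sketch below assumes a plain NumberService class with no Nest-managed dependencies. JobLike and Done are hypothetical stand-ins for Bull's Job and DoneCallback so the snippet runs on its own.

```typescript
// Hypothetical stand-ins for Bull's Job and DoneCallback (assumption: only
// the `data` field and the (error, value) callback shape are modeled).
interface JobLike<T> {
  data: T;
}
type Done<R> = (error?: Error | null, value?: R) => void;

// A plain class with no Nest decorators, so it can be built without DI.
class NumberService {
  twice(n: number): number {
    if (!Number.isFinite(n)) {
      throw new RangeError('expected a finite number');
    }
    return n * 2;
  }
}

// Created once per forked process, by hand, instead of being injected.
const service = new NumberService();

// In a real processor file this function would be the default export.
function processor(job: JobLike<number>, done: Done<number>): void {
  let result: number;
  try {
    result = service.twice(job.data);
  } catch (error) {
    // Report the failure through the callback instead of throwing.
    done(error instanceof Error ? error : new Error(String(error)));
    return;
  }
  done(null, result);
}
```

Anything that normally comes from Nest providers, such as configuration or database clients, would have to be set up the same way inside the processor file.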

Example

// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from 'nest-bull';
import { join } from 'path';
 
@Module({
  imports: [
    BullModule.register({
      processors: [ join(__dirname, 'processor.ts') ]
    }) 
  ]
})
export class AppModule {}

// processor.ts
import { Job, DoneCallback } from 'bull';
 
export default function(job: Job, cb: DoneCallback) {
  cb(null, 'It works');
}

Version

0.9.0

License

MIT

Collaborators

  • fwoelffel