@qq-framework/kafka
TypeScript icon, indicating that this package has built-in type declarations

3.6.6 • Public • Published

qq-framework-kafka

Introdução

O qq-framework-kafka visa disponibilizar interfaces de acesso ao Kafka.

Intalação

npm install @qq-framework/kafka --save

Definindo o Kafka como broker de evento da aplicação

  • Pré requisitos
    • @qq-framework/http
    • @qq-framewok/basic >3.0.0

O nestJS é o framework base para as aplicações do Mercantil. Através do seu módulo CQRS disponibiliza um broker local de eventos por onde é possível "disparar" e "escutar" eventos da aplicação. Essa abordagem local é prática mas trás algumas desvantagens, como a falta de rastreabilidade dos eventos gerados e também a impossibilidade de escalar os handlers através de múltiplos nós das aplicações.

O Kafka pode ser definido como o broker de eventos da aplicação realizando os seguintes passos:

  1. Adicione o serviço Kafka à sua aplicação, informando o nome da sua aplicação e também a lista de brokers do Kafka aos quais deseja se conectar.
// main.js

import { createKafkaMicroservice } from '@qq-framework/kafka'
setEnv()
async function bootstrap() {
    app = await NestFactory.create(AppModule, {
        bufferLogs: false,
    })
    ...
    app.connectMicroservice(createKafkaMicroservice(process.env.npm_package_name, process.env.KAFKA_BROKERS.split(',')))

    app.startAllMicroservices()
    ...
}    
  1. Importe o KafkaModule nos módulos da sua aplicação que necessitarão de comunicação com o Kafka, seja para leitura ou para escrita nos tópicos.
import { KafkaModule, KafkaProducerService } from '@qq-framework/kafka'

setEnv()

@Module({
    imports: [
        KafkaModule.forRoot(process.env.npm_package_name, process.env.KAFKA_BROKERS.split(','))),
        ...
    ],
    controllers: [...],
    providers: [...],
})
export class CoreModule {}
  1. Estenda todos os seus eventos da classe AbstractKafkaEvent do módulo @qq-framework/kafka
import { AbstractKafkaEvent } from '@qq-framework/kafka'
import { KafkaTopics } from '../../../../../kafka.config'

export interface UsuarioCadastradoEventProps {
    id: string
    nome: string
    email: string
    perfilId: string
    admin: boolean
    ativo: boolean
    hashRecuperacaoSenha: string
    categoriaIds?: string[]
    fornecedorId?: string
}

export class UsuarioCadastradoEvent extends AbstractKafkaEvent<UsuarioCadastradoEventProps> {
    constructor(props: UsuarioCadastradoEventProps) {
        super(UsuarioCadastradoEvent.name, props, KafkaTopics.USUARIO_CADASTRADO, props.id)
    }
}
  1. Injete na sua classe de serviço o KafkaDomainEventPublisher
import { DomainEventException, DomainEventPublisherService, AggregateRoot } from '@qq-framework/basic'

export class CriarUsuarioUseCase extends AbstractUseCase<
    CriarUsuarioUsecaseProps,
    CriarUsuarioUseCaseExceptions,
    void
> {
    constructor(
        @Inject('KafkaDomainEventPublisher') private readonly publisher: DomainEventPublisherService<AggregateRoot>
    ) {
        super({ processName: CriarUsuarioUseCase.name })
    }

    ...
  1. Publique os eventos do agragado através do método publish
...

const publish = await this.publisher.publish(usuario.value)
if (publish.isFailure()) return R.failure(publish.error)

...
  1. Por ser o Kafka um broker externo, a leitura dos seus eventos acontecerá através das classes de controllere não através dos handlers, utilizando a anotação @MessagePatter(...)
...

@MessagePattern(KafkaTopics.CADASTRAR_CATEGORIA)
public async criarCategoriaKafka(@Payload() message: CampoCategoriaImportadaEventDTO) {
    const result = await this.cadastrarCategoriaFacade.execute({
        nome: message.nome,
        supercategorias: message.supercategorias,
        campo: message.campo,
        })

    return super.buildResponse({
        result,
        successStatusCode: 201,
    })
}
  1. Quando não há um agregado envolvido em sua transação, o disparo de um evento pode ser realizado por um servido de domínio. Nesses casos você deve estender a classe AbstractServiceDomain injetando o KafkaEventBus.
import { EventBus } from '@qq-framework/basic'
import { AbstractServiceDomain } from '@qq-framework/ddd'

export class CadastrarCampoCategoriaService extends AbstractServiceDomain {
    constructor(
        @Inject('KafkaEventBus') eventBus: EventBus,
        @Inject('CategoriaRepository') private readonly categoriaRepository: CategoriaRepository
    ) {
        super(eventBus)
    }

    ...
}
  1. Seu evento então poderá ser disparado através do método publish
...

super.publish(
    new CampoCategoriaImportadoEvent({
        nome: nomeCategoria,
        campo: {
            nome: categoria.campo.nome.toLocaleLowerCase(),
            tipo: categoria.campo.tipo,
            dominio: categoria.campo.dominio?.split(';'),
            tamanho: categoria.campo.tamanho,
            precisao: categoria.campo.precisao,
            prefixo: categoria.campo.prefixo,
            sufixo: categoria.campo.sufixo,
            default: categoria.campo.default,
            ordem: categoria.campo.ordem,
            textoExplicativo: categoria.campo.textoExplicativo,
            textoExemplo: categoria.campo.textoExemplo,
        },
        supercategorias,
    }),
    { generateLog: false }
)

...

Definindo o Kafka como broker para os logs da aplicação

As aplicações desenvolvidas utilizando o framework do Mercantil têm por padrão a gravação de logs de execução de todos os processos da camada de aplicação. Estes logs registram o início e o final de cada execução e se essas execuções finalizaram com sucesso ou erro.

Inicialmente esses logs eram sempre gerados em banco de dados, mas através dos passos seguidos aqui, estes logs poderão ser enviados para o Kafka para um processamento centralizado.

  • Pré requisitos

    • "@qq-framework/ddd": "^3.2.1"
    • "@qq-framework/kafka": "^3.1.3"
    • "@qq-framework/basic": "^3.1.0"
  • Adicione o módulo @qq-framework/kafka ao módulo da sua aplicação

  • Nas suas classes de serviço de aplicação, injete o ApplicationProcessKafkaLoggerService e o passe para o construtor da super classe através do parâmetro applicationProcessLoggerServiceImpl

import {
    ApplicationProcessLoggerService,
} from '@qq-framework/basic'

...

constructor(
        @Inject('ApplicationProcessKafkaLoggerService')
        applicationProcessLoggerService: ApplicationProcessLoggerService
    ) {
        super({
            processName: AlterarCampoGenericoUsecase.name,
            applicationProcessLoggerServiceImpl: applicationProcessLoggerService,
        })
    }

...
  • Feito isso, os logs de start, sucess e error passarão a ser gerados através de aventos no tópico qq-log-aplicacao.log-gerado

Readme

Keywords

none

Package Sidebar

Install

npm i @qq-framework/kafka

Weekly Downloads

16

Version

3.6.6

License

ISC

Unpacked Size

382 kB

Total Files

76

Last publish

Collaborators

  • carlos.castro.qq
  • guilhermegotin
  • fn_qq
  • juniordeitch
  • 6porto
  • lealhugui