O qq-framework-kafka visa disponibilizar interfaces de acesso ao Kafka.
npm install @qq-framework/kafka --save
- Pré requisitos
- @qq-framework/http
- @qq-framewok/basic >3.0.0
O nestJS é o framework base para as aplicações do Mercantil. Através do seu módulo CQRS disponibiliza um broker local de eventos por onde é possível "disparar" e "escutar" eventos da aplicação. Essa abordagem local é prática mas trás algumas desvantagens, como a falta de rastreabilidade dos eventos gerados e também a impossibilidade de escalar os handlers através de múltiplos nós das aplicações.
O Kafka pode ser definido como o broker de eventos da aplicação realizando os seguintes passos:
- Adicione o serviço Kafka à sua aplicação, informando o nome da sua aplicação e também a lista de brokers do Kafka aos quais deseja se conectar.
// main.js
import { createKafkaMicroservice } from '@qq-framework/kafka'
setEnv()
async function bootstrap() {
app = await NestFactory.create(AppModule, {
bufferLogs: false,
})
...
app.connectMicroservice(createKafkaMicroservice(process.env.npm_package_name, process.env.KAFKA_BROKERS.split(',')))
app.startAllMicroservices()
...
}
- Importe o KafkaModule nos módulos da sua aplicação que necessitarão de comunicação com o Kafka, seja para leitura ou para escrita nos tópicos.
import { KafkaModule, KafkaProducerService } from '@qq-framework/kafka'
setEnv()
@Module({
imports: [
KafkaModule.forRoot(process.env.npm_package_name, process.env.KAFKA_BROKERS.split(','))),
...
],
controllers: [...],
providers: [...],
})
export class CoreModule {}
- Estenda todos os seus eventos da classe
AbstractKafkaEvent
do módulo@qq-framework/kafka
import { AbstractKafkaEvent } from '@qq-framework/kafka'
import { KafkaTopics } from '../../../../../kafka.config'
export interface UsuarioCadastradoEventProps {
id: string
nome: string
email: string
perfilId: string
admin: boolean
ativo: boolean
hashRecuperacaoSenha: string
categoriaIds?: string[]
fornecedorId?: string
}
export class UsuarioCadastradoEvent extends AbstractKafkaEvent<UsuarioCadastradoEventProps> {
constructor(props: UsuarioCadastradoEventProps) {
super(UsuarioCadastradoEvent.name, props, KafkaTopics.USUARIO_CADASTRADO, props.id)
}
}
- Injete na sua classe de serviço o
KafkaDomainEventPublisher
import { DomainEventException, DomainEventPublisherService, AggregateRoot } from '@qq-framework/basic'
export class CriarUsuarioUseCase extends AbstractUseCase<
CriarUsuarioUsecaseProps,
CriarUsuarioUseCaseExceptions,
void
> {
constructor(
@Inject('KafkaDomainEventPublisher') private readonly publisher: DomainEventPublisherService<AggregateRoot>
) {
super({ processName: CriarUsuarioUseCase.name })
}
...
- Publique os eventos do agragado através do método
publish
...
const publish = await this.publisher.publish(usuario.value)
if (publish.isFailure()) return R.failure(publish.error)
...
- Por ser o Kafka um broker externo, a leitura dos seus eventos acontecerá através das classes de
controller
e não através doshandlers
, utilizando a anotação @MessagePatter(...)
...
@MessagePattern(KafkaTopics.CADASTRAR_CATEGORIA)
public async criarCategoriaKafka(@Payload() message: CampoCategoriaImportadaEventDTO) {
const result = await this.cadastrarCategoriaFacade.execute({
nome: message.nome,
supercategorias: message.supercategorias,
campo: message.campo,
})
return super.buildResponse({
result,
successStatusCode: 201,
})
}
- Quando não há um agregado envolvido em sua transação, o disparo de um evento pode ser realizado por um servido de domínio. Nesses casos você deve estender a classe
AbstractServiceDomain
injetando oKafkaEventBus
.
import { EventBus } from '@qq-framework/basic'
import { AbstractServiceDomain } from '@qq-framework/ddd'
export class CadastrarCampoCategoriaService extends AbstractServiceDomain {
constructor(
@Inject('KafkaEventBus') eventBus: EventBus,
@Inject('CategoriaRepository') private readonly categoriaRepository: CategoriaRepository
) {
super(eventBus)
}
...
}
- Seu evento então poderá ser disparado através do método
publish
...
super.publish(
new CampoCategoriaImportadoEvent({
nome: nomeCategoria,
campo: {
nome: categoria.campo.nome.toLocaleLowerCase(),
tipo: categoria.campo.tipo,
dominio: categoria.campo.dominio?.split(';'),
tamanho: categoria.campo.tamanho,
precisao: categoria.campo.precisao,
prefixo: categoria.campo.prefixo,
sufixo: categoria.campo.sufixo,
default: categoria.campo.default,
ordem: categoria.campo.ordem,
textoExplicativo: categoria.campo.textoExplicativo,
textoExemplo: categoria.campo.textoExemplo,
},
supercategorias,
}),
{ generateLog: false }
)
...
As aplicações desenvolvidas utilizando o framework do Mercantil têm por padrão a gravação de logs de execução de todos os processos da camada de aplicação. Estes logs registram o início e o final de cada execução e se essas execuções finalizaram com sucesso ou erro.
Inicialmente esses logs eram sempre gerados em banco de dados, mas através dos passos seguidos aqui, estes logs poderão ser enviados para o Kafka para um processamento centralizado.
-
Pré requisitos
- "@qq-framework/ddd": "^3.2.1"
- "@qq-framework/kafka": "^3.1.3"
- "@qq-framework/basic": "^3.1.0"
-
Adicione o módulo
@qq-framework/kafka
ao módulo da sua aplicação -
Nas suas classes de serviço de aplicação, injete o
ApplicationProcessKafkaLoggerService
e o passe para o construtor da super classe através do parâmetroapplicationProcessLoggerServiceImpl
import {
ApplicationProcessLoggerService,
} from '@qq-framework/basic'
...
constructor(
@Inject('ApplicationProcessKafkaLoggerService')
applicationProcessLoggerService: ApplicationProcessLoggerService
) {
super({
processName: AlterarCampoGenericoUsecase.name,
applicationProcessLoggerServiceImpl: applicationProcessLoggerService,
})
}
...
- Feito isso, os logs de start, sucess e error passarão a ser gerados através de aventos no tópico
qq-log-aplicacao.log-gerado