The main purpose of this package is to provide core functionality for building event-driven architectures in TypeScript applications.
EventBus
provides methods to make it possible to extend event routing for specific integrations and enable acknowledgement mechanisms for message brokers.
TStarting from this version, the core package supports not only events but also tasks and any custom "handlable" objects through a generic abstraction system.
- Installation
- Core Concepts
- Event Handlers
- Task Processors
- Event Bus
- Core Definitions
- Scoped Handlers with Context
- Working with Results
First, let's install the package using your preferred package manager:
# Using npm
npm install @event-driven-architecture/core
# Using yarn
yarn add @event-driven-architecture/core
# Using pnpm
pnpm add @event-driven-architecture/core
# Using bun
bun add @event-driven-architecture/core
The event-driven architecture is built around the concept of Handlables - objects that can be processed by handlers. The package provides two main implementations:
- Events - Represent things that have happened in your system
- Tasks - Represent work that needs to be done
All processable objects implement the Handlable
interface:
interface Handlable<TPayload extends object = object> {
readonly payload: Readonly<TPayload>;
}
The generic Handler
interface processes any handlable and optionally returns a result:
interface Handler<THandlable extends Handlable, TResult = unknown, TContext = unknown> {
handle(handlable: THandlable, context?: TContext): TResult | Promise<TResult>;
}
Events implement the Event
interface, which extends Handlable
:
import { Event } from '@event-driven-architecture/core';
interface UserCreatedEventPayload {
userId: string;
}
export class UserCreatedEvent implements Event<UserCreatedEventPayload> {
constructor(public readonly payload: UserCreatedEventPayload) {}
}
Event handlers implement the EventHandler
interface:
import { EventHandler } from '@event-driven-architecture/core';
import { UserCreatedEvent } from './events/user-created.event';
export class UserCreatedEventHandler implements EventHandler<UserCreatedEvent> {
handle(event: UserCreatedEvent): void {
const { userId } = event.payload;
// Handle the event
console.log(`User created with ID: ${userId}`);
}
}
Tasks implement the Task
interface, which also extends Handlable
:
import { Task } from '@event-driven-architecture/core';
interface CalculateOrderTotalPayload {
orderId: string;
items: Array<{ price: number; quantity: number }>;
}
export class CalculateOrderTotalTask implements Task<CalculateOrderTotalPayload> {
constructor(public readonly payload: CalculateOrderTotalPayload) {}
}
Task processors implement the TaskProcessor
interface and can return results:
import { TaskProcessor } from '@event-driven-architecture/core';
import { CalculateOrderTotalTask } from './tasks/calculate-order-total.task';
interface OrderTotal {
orderId: string;
total: number;
}
export class CalculateOrderTotalProcessor implements TaskProcessor<CalculateOrderTotalTask, OrderTotal> {
handle(task: CalculateOrderTotalTask): OrderTotal {
const { orderId, items } = task.payload;
const total = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
return { orderId, total };
}
}
Handlers are registered through an implementation of HandlerRegister
. A handler is uniquely identified by:
- The handlable class it processes (using
handles
property) - Optional routing metadata – a free-form object for differentiation
// singleton handler (single instance provided by you)
handlerRegister.addHandler({ handles: UserCreatedEvent, routingMetadata: { v: 1 } }, new UserCreatedEventHandler());
// scoped / transient handler (register by class, a fresh instance created per invocation)
handlerRegister.addScopedHandler(
{ handles: CalculateOrderTotalTask }, // no metadata – acts as a catch-all
CalculateOrderTotalProcessor,
);
Important: Before publishing events or tasks, you must register a publisher with the EventBus using the setPublisher()
method. Attempting to publish without a registered publisher will throw a PublisherNotSetException
.
To publish handlables, use the EventBus
:
import { EventBus } from '@event-driven-architecture/core';
import { UserCreatedEvent } from './events/user-created.event';
import { CalculateOrderTotalTask } from './tasks/calculate-order-total.task';
class UserService {
constructor(private readonly eventBus: EventBus) {}
createUser(userId: string): void {
// Business logic...
// Publish event
this.eventBus.publish(new UserCreatedEvent({ userId }));
}
}
class OrderService {
constructor(private readonly eventBus: EventBus) {}
processOrder(orderId: string): void {
// Publish task
this.eventBus.publish(
new CalculateOrderTotalTask({
orderId,
items: [{ price: 10, quantity: 2 }],
}),
);
}
}
To use external message brokers, you need to set up a publisher that implements the Publisher
interface:
import { EventBus, Handlable, Publisher } from '@event-driven-architecture/core';
class MyCustomPublisher implements Publisher {
publish<T extends Handlable>(handlable: T): void {
// Send to message broker
console.log('Publishing:', handlable);
}
publishAll(handlables: Handlable[]): void {
// Send all to message broker
console.log('Publishing all:', handlables);
}
}
class AppBootstrap {
constructor(
private readonly eventBus: EventBus,
private readonly customPublisher: MyCustomPublisher,
) {}
initialize() {
// Set the publisher for the event bus
this.eventBus.setPublisher(this.customPublisher);
}
}
When processing handlables synchronously, the EventBus returns HandlingResult
objects that encapsulate both successful results and errors using the Result pattern. This eliminates the need to catch exceptions and provides a more predictable error handling experience:
// Single handler - returns HandlingResult<TResult>
const result = await eventBus.synchronouslyConsumeByStrictlySingleHandler(
new CalculateOrderTotalTask({ orderId: '123', items: [{ price: 10, quantity: 2 }] }),
{ routingMetadata: { v: 1 } },
);
// Check if the operation was successful
if (result.isSuccess()) {
console.log('Order total:', result.getValueOrThrow().total);
} else {
console.error('Error processing task:', result.getErrorOrNull());
}
// Multiple handlers - returns HandlingResult<TResult>[]
const results = await eventBus.synchronouslyConsumeByMultipleHandlers(new UserCreatedEvent({ userId: '123' }), {
routingMetadata: { v: 1 },
});
results.forEach((result) => {
if (result.isSuccess()) {
console.log('Handler result:', result.getValueOrThrow());
} else {
console.error('Handler failed:', result.getErrorOrNull());
}
});
The HandlingResult
class provides several methods for working with results and errors:
const result = await eventBus.synchronouslyConsumeByStrictlySingleHandler(new SomeTask());
// Check result status
if (result.isSuccess()) {
// Safe to access value
const value = result.getValueOrThrow();
} else if (result.isError()) {
// Handle error case
const error = result.getErrorOrNull();
console.error('Processing failed:', error);
}
// Alternative: Get value or null (doesn't throw)
const valueOrNull = result.getValueOrNull();
if (valueOrNull !== null) {
// Process successful result
}
// Get value or throw the contained error
try {
const value = result.getValueOrThrow();
// Process value
} catch (error) {
// Handle the original error that caused the failure
}
The event bus can return the following error types in failed HandlingResult
objects:
-
HandlerNotFoundException
- No handlers found for the given handlable and routing metadata -
MultipleHandlersFoundException
- Multiple handlers found when exactly one was expected -
HandlerThrownException
- A handler threw an exception during execution (contains the original error)
When consuming handlables, you can pass a request-scoped context alongside routing metadata:
const result = await eventBus.synchronouslyConsumeByStrictlySingleHandler(new UserCreatedEvent({ userId: '123' }), {
routingMetadata: { v: 1 },
context: { requestId: '456' },
});
The event-driven module provides several key definitions:
Handlable - Base interface for all processable objects. Contains a read-only payload with information.
Event - Specialization of Handlable that represents things that have happened in your application.
Task - Specialization of Handlable that represents work that needs to be done.
Handler - Generic interface for processing handlables. Can optionally return results and receive context.
Event Handler (EventHandler) - Specialization of Handler for processing events. Typically returns void.
Task Processor (TaskProcessor) - Specialization of Handler for processing tasks. Can return results.
Event Bus (EventBus) - Core interface for the event bus. Responsible for publishing handlables and routing them to appropriate handlers.
Publisher - Interface for publishing handlables to external systems. Must be registered with the EventBus using setPublisher()
.
Handler Register (HandlerRegister) - Interface for registering handlers and retrieving handler signatures.
HandlingResult - Result wrapper that encapsulates both successful results and errors from synchronous handler execution. Uses the Result pattern to eliminate exception throwing and provide predictable error handling. Contains methods like isSuccess()
, isError()
, getValueOrThrow()
, and getErrorOrNull()
.
You can create scoped handlers that receive context information:
import { EventHandler, TaskProcessor } from '@event-driven-architecture/core';
import { UserCreatedEvent } from './events/user-created.event';
import { CalculateOrderTotalTask } from './tasks/calculate-order-total.task';
interface EventContext {
requestId: string;
}
export class ScopedUserCreatedEventHandler implements EventHandler<UserCreatedEvent, EventContext> {
handle(event: UserCreatedEvent, context: EventContext): void {
// Access request context
console.log('Request context:', context);
// Handle the event with access to request context
const { userId } = event.payload;
console.log(`User created with ID: ${userId} in request: ${context.requestId}`);
}
}
export class ScopedCalculateOrderTotalProcessor
implements TaskProcessor<CalculateOrderTotalTask, OrderTotal, EventContext>
{
handle(task: CalculateOrderTotalTask, context: EventContext): OrderTotal {
console.log('Processing order in request:', context.requestId);
const { orderId, items } = task.payload;
const total = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
return { orderId, total };
}
}
The core module uses generics extensively for type safety. Here's how to leverage them:
You can specify the expected result type when creating an EventBus instance:
import { BaseEventBus, EventBus, HandlingResult } from '@event-driven-architecture/core';
interface ProcessingValue {
// ... typed structure
}
const eventBus: EventBus<MyEvent, ProcessingValue> = new BaseEventBus<MyEvent, ProcessingValue>(handlerRegister);
// When consuming:
const result: HandlingResult<ProcessingValue> = await eventBus.synchronouslyConsumeByStrictlySingleHandler(myEvent);
if (result.isSuccess()) {
const value: ProcessingValue = result.getValueOrThrow();
}
Handlers can be typed for specific payloads and results:
class TypedHandler implements Handler<MyEvent, ProcessingValue> {
handle(event: MyEvent): ProcessingValue {
return {
// expected structure
};
}
}
This ensures compile-time checks for integrations.
The snippet below shows how the main pieces plug together with the new API:
import {
BaseEventBus,
BaseHandlerRegister,
Event,
EventBus,
EventHandler,
Handlable,
HandlerRegister,
Publisher,
Task,
TaskProcessor,
} from '@event-driven-architecture/core';
// 1. Define an event
export class UserCreatedEvent implements Event<{ userId: string }> {
constructor(public readonly payload: { userId: string }) {}
}
// 2. Define a task
export class CalculateOrderTotalTask implements Task<{ orderId: string; total: number }> {
constructor(public readonly payload: { orderId: string; total: number }) {}
}
// 3. Implement handlers
class UserCreatedHandler implements EventHandler<UserCreatedEvent> {
handle(event: UserCreatedEvent): void {
console.log('User created (v=1):', event.payload.userId);
}
}
class OrderTotalProcessor implements TaskProcessor<CalculateOrderTotalTask, { calculatedTotal: number }> {
handle(task: CalculateOrderTotalTask): { calculatedTotal: number } {
console.log('Calculating total for order:', task.payload.orderId);
return { calculatedTotal: task.payload.total * 1.1 }; // Add 10% tax
}
}
// 4. Implement a publisher
const inMemoryPublisher: Publisher = {
publish: (handlable) => console.log('Published', handlable),
publishAll: (handlables) => console.log('Published many', handlables),
};
// 5. Wire everything together
const register: HandlerRegister = new BaseHandlerRegister();
register.addHandler({ handles: UserCreatedEvent, routingMetadata: { v: 1 } }, new UserCreatedHandler());
register.addHandler({ handles: CalculateOrderTotalTask }, new OrderTotalProcessor());
const eventBus = new BaseEventBus(register);
eventBus.setPublisher(inMemoryPublisher);
// 6. Emit and consume handlables
const event = new UserCreatedEvent({ userId: '1' });
const task = new CalculateOrderTotalTask({ orderId: 'order-1', total: 100 });
// Consume event (returns void in HandlingResult)
const eventResult = await eventBus.synchronouslyConsumeByStrictlySingleHandler(event, {
routingMetadata: { v: 1 },
});
if (eventResult.isSuccess()) {
// log: User created (v=1): 1
console.log('Event processed successfully');
} else {
console.error('Event processing failed:', eventResult.getErrorOrNull());
}
// Consume task (returns calculated result in HandlingResult)
const taskResult = await eventBus.synchronouslyConsumeByStrictlySingleHandler(task);
if (taskResult.isSuccess()) {
console.log('Task result:', taskResult.getValueOrThrow().calculatedTotal); // 110
} else {
console.error('Task processing failed:', taskResult.getErrorOrNull());
}
// Or publish to forward to the configured publisher
eventBus.publish(event);
eventBus.publish(task);