A TypeScript SDK for interacting with the DeltaBase event sourcing platform.
DeltaBase is an event sourcing platform that allows you to store and process event streams. This SDK provides a clean, typed interface for working with DeltaBase from Node.js applications.
Key features:
- Event Storage: Append, read, and query events in streams
- Event Subscriptions: React to events in real-time with webhooks and other subscription types
- Advanced Querying: Filter and search events across streams
- Event Aggregation: Build state from event streams with built-in aggregation helpers
- Administration: Manage event stores, subscriptions, and platform resources
npm install @deltabase/server
# or
yarn add @deltabase/server
# or
pnpm add @deltabase/server
import { DeltaBase } from '@deltabase/server';
// Initialize the DeltaBase client
const client = new DeltaBase({
apiKey: 'your-api-key',
// For production use
// baseUrl: 'https://api.delta-base.com'
});
// Get an event store client
const eventStore = client.getEventStore('my-event-store');
// Append events to a stream
await eventStore.appendToStream('user-123', [
{
type: 'user.created',
data: { name: 'Alice', email: 'alice@example.com' }
}
]);
// Read events from a stream
const { events } = await eventStore.readStream('user-123');
console.log('User events:', events);
DeltaBase is built around the event sourcing pattern, where:
- All changes to application state are captured as a sequence of immutable events
- Events are stored in an append-only log
- Current state can be reconstructed by replaying events
- Event history provides a complete audit trail of all changes
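The points above can be illustrated without the SDK at all. The following is a minimal sketch of replaying an append-only log to rebuild state; the account domain, the event names, and the `evolve` and `replay` helpers are hypothetical and chosen only for illustration.

```typescript
// Hypothetical event and state types, used only for this illustration.
type AccountEvent =
  | { type: 'account.opened'; data: { owner: string } }
  | { type: 'funds.deposited'; data: { amount: number } }
  | { type: 'funds.withdrawn'; data: { amount: number } };

type AccountState = { owner: string; balance: number };

const initialState: AccountState = { owner: '', balance: 0 };

// Pure function: given the previous state and one event, return the next state.
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'account.opened':
      return { ...state, owner: event.data.owner };
    case 'funds.deposited':
      return { ...state, balance: state.balance + event.data.amount };
    case 'funds.withdrawn':
      return { ...state, balance: state.balance - event.data.amount };
  }
}

// Replaying is a left fold over the log, oldest event first.
function replay(log: readonly AccountEvent[]): AccountState {
  return log.reduce(evolve, initialState);
}

// The log is only ever appended to; existing entries are never edited.
const accountLog: AccountEvent[] = [
  { type: 'account.opened', data: { owner: 'Alice' } },
  { type: 'funds.deposited', data: { amount: 100 } },
  { type: 'funds.withdrawn', data: { amount: 30 } },
];

const currentAccount = replay(accountLog);
```

Because the log is never rewritten, replaying a prefix of it (for example `replay(accountLog.slice(0, 2))`) yields the state as it was at that point in history, which is what makes the audit trail possible.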
A stream is a sequence of related events, typically representing the history of a single entity. Streams are identified by a unique ID, such as user-123 or order-456.
An event represents something that happened in your domain. Events are immutable and include:
- type: A descriptive name for the event (e.g., user.created, order.placed)
- data: The payload of the event, containing relevant information
- Metadata: Additional information like timestamps, correlation IDs, etc.
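As a rough sketch of the shape described above, an event could be modelled like this. The `DomainEvent` type and the metadata field names (`timestamp`, `correlationId`) are assumptions made for illustration; the SDK's own event types may differ.

```typescript
// Hypothetical shape for illustration; the SDK's real event type may differ.
type DomainEvent<T extends string, D> = {
  readonly type: T;
  readonly data: Readonly<D>;
  readonly metadata?: {
    readonly timestamp?: string;     // ISO 8601 string, assumed field name
    readonly correlationId?: string; // assumed field name
  };
};

type UserCreated = DomainEvent<'user.created', { name: string; email: string }>;

const draft: UserCreated = {
  type: 'user.created',
  data: { name: 'Alice', email: 'alice@example.com' },
  metadata: { timestamp: '2023-01-01T00:00:00Z', correlationId: 'corr-1' },
};

// Object.freeze is shallow: it guards the top-level fields at runtime,
// while the readonly modifiers guard the nested ones at compile time.
const userCreated = Object.freeze(draft);
```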
Subscriptions allow you to react to events in real-time. You can subscribe to specific event types or patterns and receive notifications via webhooks or other mechanisms.
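To make the idea of subscribing to a pattern concrete, here is a small sketch of how a filter such as order.* could be matched against event types. It assumes that `*` stands for any run of characters; the `matchesPattern` helper is hypothetical, and the platform's actual matching rules may be different.

```typescript
// Assumption: '*' matches any run of characters; DeltaBase's real rules may differ.
function matchesPattern(pattern: string, eventType: string): boolean {
  const escaped = pattern
    .split('*')
    // Escape regex metacharacters so that '.' in a pattern means a literal dot.
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`).test(eventType);
}

const incoming = ['order.created', 'order.shipped', 'user.created'];

// Keep only the event types that a subscription on 'order.*' would care about.
const orderEventTypes = incoming.filter((t) => matchesPattern('order.*', t));
```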
import { DeltaBase } from '@deltabase/server';
const client = new DeltaBase({ apiKey: 'your-api-key', baseUrl: 'https://api.delta-base.com' });
// Create a new event store
const management = client.getManagement();
await management.createEventStore({
name: 'orders',
description: 'Event store for order events',
});
// Get an event store client
const orderStore = client.getEventStore('orders');
// Append events
await orderStore.appendToStream('order-123', [
{
type: 'order.created',
data: {
customerId: 'cust-456',
items: [{ productId: 'prod-789', quantity: 2, price: 25.99 }],
total: 51.98
}
}
]);
// Read events
const { events } = await orderStore.readStream('order-123');
// Query events
const { events: userOrders } = await orderStore.queryEvents({
type: 'order.created',
fromDate: '2023-01-01T00:00:00Z',
limit: 10
});
// List streams
const { streams } = await orderStore.listStreams({ pattern: 'order-*' });
// Define your state type
type OrderState = {
id: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
status: 'pending' | 'shipped' | 'delivered' | 'canceled';
};
// Define your event types
type OrderEvent =
| { type: 'order.created'; data: { customerId: string; items: Array<{ productId: string; quantity: number; price: number }>; total: number } }
| { type: 'order.shipped'; data: { trackingNumber: string; shippedAt: string } }
| { type: 'order.delivered'; data: { deliveredAt: string } }
| { type: 'order.canceled'; data: { reason: string; canceledAt: string } };
// Aggregate events into state
const { state, currentStreamVersion } = await orderStore.aggregateStream<OrderState, OrderEvent>(
'order-123',
{
initialState: () => ({
id: '',
customerId: '',
items: [],
total: 0,
status: 'pending'
}),
evolve: (state, event) => {
switch (event.type) {
case 'order.created':
return {
...state,
id: event.streamId,
customerId: event.data.customerId,
items: event.data.items,
total: event.data.total
};
case 'order.shipped':
return { ...state, status: 'shipped' };
case 'order.delivered':
return { ...state, status: 'delivered' };
case 'order.canceled':
return { ...state, status: 'canceled' };
default:
return state;
}
}
}
);
console.log('Current order state:', state);
// Get the event bus for a store
const eventBus = client.getEventBus('orders');
// Create a webhook subscription for all order events
const subscription = await eventBus.subscribeWebhook(
'order.*',
'https://example.com/webhooks/orders',
{
headers: { 'X-API-Key': 'webhook-secret' },
retryPolicy: {
maxAttempts: 3,
backoffMinutes: 5
}
}
);
// List subscriptions
const { subscriptions } = await eventBus.listSubscriptions();
// Unsubscribe
await eventBus.unsubscribe(subscription.subscriptionId);
const management = client.getManagement();
// List all event stores
const { eventStores } = await management.listEventStores();
// Get details about a specific event store
const storeDetails = await management.getEventStore('orders');
console.log('Storage used:', storeDetails.statistics?.databaseSizeBytes);
// Update event store settings
await management.updateEventStore('orders', {
description: 'Updated description',
retentionPeriodDays: 90
});
// Delete an event store (use with caution!)
await management.deleteEventStore('unused-store');
The SDK is organized into several main classes:
DeltaBase is the main client class and provides access to all functionality.
const client = new DeltaBase({
apiKey: 'your-api-key',
baseUrl: 'https://api.delta-base.com' // Optional
});
Methods:
- getManagement(): Returns a ManagementClient for administrative operations
- getEventStore(eventStoreId): Returns an EventStore client for the specified store
- getEventBus(eventStoreId): Returns an EventBus client for the specified store
EventStore is the client for working with event streams within a specific event store.
Methods:
- appendToStream(streamId, events, options?): Append events to a stream
- readStream(streamId, options?): Read events from a stream
- aggregateStream(streamId, options): Build state from events using an aggregation function
- queryEvents(options?): Query events across streams with filtering
- queryStreams(options?): Query streams with filtering
- listStreams(options?): Get a list of stream IDs
EventBus is the client for managing event subscriptions.
Methods:
- subscribe(options): Create a subscription to events
- subscribeWebhook(eventFilter, url, options?): Convenient method to create a webhook subscription
- getSubscription(subscriptionId): Get details about a subscription
- listSubscriptions(options?): List all subscriptions
- unsubscribe(subscriptionId): Delete a subscription
ManagementClient is the client for administrative operations on event stores.
Methods:
- createEventStore(options): Create a new event store
- listEventStores(): List all event stores
- getEventStore(eventStoreId): Get details about an event store
- updateEventStore(eventStoreId, settings): Update event store settings
- deleteEventStore(eventStoreId): Delete an event store
You can use the expectedStreamVersion option to implement optimistic concurrency control:
try {
await eventStore.appendToStream(
'user-123',
[{ type: 'user.updated', data: { email: 'new@example.com' } }],
{ expectedStreamVersion: 5n } // Only succeed if the current version is 5
);
} catch (error) {
console.error('Concurrency conflict!', error);
}
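A common way to handle such a conflict is to re-read the current version and try again. The sketch below shows that loop against an in-memory stand-in, not the DeltaBase SDK: `InMemoryStream`, `versionConflict`, and `appendWithRetry` are hypothetical names, and versions are plain numbers here rather than the bigint used in the example above.

```typescript
// In-memory stand-in for a stream; NOT the DeltaBase SDK. All names are hypothetical.
function versionConflict(expected: number, actual: number): Error {
  const error = new Error(`expected version ${expected}, but stream is at ${actual}`);
  error.name = 'VersionConflictError';
  return error;
}

class InMemoryStream {
  private events: unknown[] = [];

  // The version is simply the number of events appended so far.
  get version(): number {
    return this.events.length;
  }

  // Append only if the caller's expected version matches the current one.
  append(newEvents: unknown[], expectedVersion: number): number {
    if (expectedVersion !== this.version) {
      throw versionConflict(expectedVersion, this.version);
    }
    this.events.push(...newEvents);
    return this.version;
  }
}

// On a conflict, re-read the version, rebuild the events, and retry a bounded number of times.
function appendWithRetry(
  stream: InMemoryStream,
  buildEvents: (version: number) => unknown[],
  maxAttempts = 3
): number {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const seenVersion = stream.version;
    try {
      return stream.append(buildEvents(seenVersion), seenVersion);
    } catch (error) {
      const isConflict = error instanceof Error && error.name === 'VersionConflictError';
      // Rethrow anything that is not a conflict, and give up after the last attempt.
      if (!isConflict || attempt === maxAttempts) throw error;
    }
  }
  throw new Error('maxAttempts must be at least 1');
}
```

Rebuilding the events on every attempt matters: a decision made against version 5 may no longer be valid once another writer has moved the stream to version 6.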
Any event can include a transaction ID; events that share the same ID are grouped as related:
await eventStore.appendToStream('order-123', [
{
type: 'order.created',
data: { /* ... */ },
transactionId: 'tx-abc-123'
},
{
type: 'inventory.reserved',
data: { /* ... */ },
transactionId: 'tx-abc-123' // Same transaction ID
}
]);
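On the reading side, a shared transaction ID makes it straightforward to collect related events again. The helper below is a minimal sketch under the assumption that events read back from a store still carry their `transactionId`; `StoredEvent` and `groupByTransaction` are hypothetical names, not part of the SDK.

```typescript
// Hypothetical stored-event shape; only `transactionId` is taken from the example above.
type StoredEvent = { type: string; transactionId?: string };

function groupByTransaction(events: readonly StoredEvent[]): Map<string, StoredEvent[]> {
  const groups = new Map<string, StoredEvent[]>();
  for (const event of events) {
    // Events without a transaction ID belong to no group and are skipped.
    if (event.transactionId === undefined) continue;
    const group = groups.get(event.transactionId) ?? [];
    group.push(event);
    groups.set(event.transactionId, group);
  }
  return groups;
}

const sampleEvents: StoredEvent[] = [
  { type: 'order.created', transactionId: 'tx-abc-123' },
  { type: 'inventory.reserved', transactionId: 'tx-abc-123' },
  { type: 'user.updated' },
];

const byTransaction = groupByTransaction(sampleEvents);
```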
(c) Copyright 2025 nibbio LLC, all rights reserved.