@delta-base/server
0.3.7 • Public • Published

DeltaBase Server SDK

A TypeScript SDK for interacting with the DeltaBase event sourcing platform.

Overview

DeltaBase is an event sourcing platform that allows you to store and process event streams. This SDK provides a clean, typed interface for working with DeltaBase from Node.js applications.

Key features:

  • Event Storage: Append, read, and query events in streams
  • Event Subscriptions: React to events in real time with webhooks and other subscription types
  • Advanced Querying: Filter and search events across streams
  • Event Aggregation: Build state from event streams with built-in aggregation helpers
  • Administration: Manage event stores, subscriptions, and platform resources

Installation

npm install @delta-base/server
# or
yarn add @delta-base/server
# or
pnpm add @delta-base/server

Quick Start

import { DeltaBase } from '@delta-base/server';

// Initialize the DeltaBase client
const client = new DeltaBase({
  apiKey: 'your-api-key',
  // For production use
  // baseUrl: 'https://api.delta-base.com'
});

// Get an event store client
const eventStore = client.getEventStore('my-event-store');

// Append events to a stream
await eventStore.appendToStream('user-123', [
  {
    type: 'user.created',
    data: { name: 'Alice', email: 'alice@example.com' }
  }
]);

// Read events from a stream
const { events } = await eventStore.readStream('user-123');
console.log('User events:', events);
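
Hard-coding the API key, as the snippet above does for brevity, is rarely what you want outside a demo. The following is a minimal sketch of loading the client options from environment variables. The helper name loadDeltaBaseConfig and the variable names DELTABASE_API_KEY and DELTABASE_BASE_URL are assumptions made for illustration; they are not defined by the SDK.

```typescript
// Hypothetical helper: the function and variable names are assumptions, not SDK conventions.
type DeltaBaseConfig = { apiKey: string; baseUrl?: string };

function loadDeltaBaseConfig(env: Record<string, string | undefined>): DeltaBaseConfig {
  const apiKey = env.DELTABASE_API_KEY?.trim();
  if (!apiKey) {
    // Fail fast at startup rather than on the first API call.
    throw new Error('DELTABASE_API_KEY is not set');
  }
  const baseUrl = env.DELTABASE_BASE_URL?.trim();
  // Only include baseUrl when one was provided, since the option is optional.
  return baseUrl ? { apiKey, baseUrl } : { apiKey };
}

// Usage in a Node.js process:
// const client = new DeltaBase(loadDeltaBaseConfig(process.env));
```

Taking the environment as a parameter keeps the helper easy to test without touching the real process environment.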

Key Concepts

Event Sourcing

DeltaBase is built around the event sourcing pattern, where:

  • All changes to application state are captured as a sequence of immutable events
  • Events are stored in an append-only log
  • Current state can be reconstructed by replaying events
  • Event history provides a complete audit trail of all changes
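
To make "reconstructed by replaying events" concrete, here is a small sketch that does not use the SDK at all: state is rebuilt with a plain left fold over an in-memory log. The bank-account domain, the event names, and the evolve and replay helpers are illustrative assumptions.

```typescript
// Illustrative domain: a bank account. None of these names come from the SDK.
type AccountEvent =
  | { type: 'account.opened'; data: { owner: string } }
  | { type: 'funds.deposited'; data: { amount: number } }
  | { type: 'funds.withdrawn'; data: { amount: number } };

type AccountState = { owner: string; balance: number };

const initialState: AccountState = { owner: '', balance: 0 };

// Pure transition function: previous state plus one event gives the next state.
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'account.opened':
      return { ...state, owner: event.data.owner };
    case 'funds.deposited':
      return { ...state, balance: state.balance + event.data.amount };
    case 'funds.withdrawn':
      return { ...state, balance: state.balance - event.data.amount };
  }
}

// Replaying is a left fold over the log, in append order.
function replay(events: readonly AccountEvent[]): AccountState {
  return events.reduce(evolve, initialState);
}

const log: AccountEvent[] = [
  { type: 'account.opened', data: { owner: 'Alice' } },
  { type: 'funds.deposited', data: { amount: 100 } },
  { type: 'funds.withdrawn', data: { amount: 30 } },
];

const current = replay(log); // { owner: 'Alice', balance: 70 }
```

Because evolve never mutates its input, replaying the same log always yields the same state, which is what makes the log usable as an audit trail.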

Streams

A stream is a sequence of related events, typically representing the history of a single entity. Streams are identified by a unique ID, such as user-123 or order-456.
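
If you adopt the category-plus-ID convention seen in user-123 and order-456, a pair of small helpers can keep stream IDs consistent across a codebase. This is only a sketch: toStreamId and parseStreamId are hypothetical helpers, and DeltaBase itself may accept other ID formats.

```typescript
// Hypothetical naming helpers following a "<category>-<entityId>" convention.
function toStreamId(category: string, entityId: string): string {
  if (category.length === 0 || category.includes('-')) {
    throw new Error('category must be non-empty and must not contain "-"');
  }
  if (entityId.length === 0) {
    throw new Error('entityId must be non-empty');
  }
  return `${category}-${entityId}`;
}

// Split on the first "-" only, so entity IDs may themselves contain dashes.
function parseStreamId(streamId: string): { category: string; entityId: string } {
  const sep = streamId.indexOf('-');
  if (sep <= 0 || sep === streamId.length - 1) {
    throw new Error(`not a "<category>-<entityId>" stream ID: ${streamId}`);
  }
  return { category: streamId.slice(0, sep), entityId: streamId.slice(sep + 1) };
}
```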

Events

An event represents something that happened in your domain. Events are immutable and include:

  • type: A descriptive name for the event (e.g., user.created, order.placed)
  • data: The payload of the event, containing relevant information
  • Metadata: Additional information like timestamps, correlation IDs, etc.

Subscriptions

Subscriptions allow you to react to events in real time. You can subscribe to specific event types or patterns and receive notifications via webhooks or other mechanisms.
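
To build intuition for pattern-based subscriptions, the sketch below shows one plausible way a filter such as order.* could be matched against event types. The semantics are an assumption ("*" matches any run of characters and everything else is literal); the platform's actual matching rules may differ, and matchesEventFilter is a hypothetical helper, not part of the SDK.

```typescript
// Assumed semantics: "*" matches any run of characters; all other characters are literal.
function matchesEventFilter(pattern: string, eventType: string): boolean {
  const source = pattern
    .split('*')
    // Escape regex metacharacters so "." in "order.created" only matches a literal dot.
    .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${source}$`).test(eventType);
}

matchesEventFilter('order.*', 'order.created'); // true
matchesEventFilter('order.*', 'user.created');  // false
```

A helper like this can be handy for checking locally which of your event types a filter would select before creating the subscription.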

Usage Examples

Working with Event Stores

import { DeltaBase } from '@delta-base/server';

const client = new DeltaBase({ apiKey: 'your-api-key', baseUrl: 'https://api.delta-base.com' });

// Create a new event store
const management = client.getManagement();
await management.createEventStore({
  name: 'orders',
  description: 'Event store for order events',
});

// Get an event store client
const orderStore = client.getEventStore('orders');

// Append events
await orderStore.appendToStream('order-123', [
  {
    type: 'order.created',
    data: {
      customerId: 'cust-456',
      items: [{ productId: 'prod-789', quantity: 2, price: 25.99 }],
      total: 51.98
    }
  }
]);

// Read events
const { events } = await orderStore.readStream('order-123');

// Query events
const { events: createdOrders } = await orderStore.queryEvents({
  type: 'order.created',
  fromDate: '2023-01-01T00:00:00Z',
  limit: 10
});

// List streams
const { streams } = await orderStore.listStreams({ pattern: 'order-*' });

Aggregating State from Events

// Define your state type
type OrderState = {
  id: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  total: number;
  status: 'pending' | 'shipped' | 'delivered' | 'canceled';
};

// Define your event types
type OrderEvent =
  | { type: 'order.created'; data: { customerId: string; items: Array<{ productId: string; quantity: number; price: number }>; total: number } }
  | { type: 'order.shipped'; data: { trackingNumber: string; shippedAt: string } }
  | { type: 'order.delivered'; data: { deliveredAt: string } }
  | { type: 'order.canceled'; data: { reason: string; canceledAt: string } };

// Aggregate events into state
const { state, currentStreamVersion } = await orderStore.aggregateStream<OrderState, OrderEvent>(
  'order-123',
  {
    initialState: () => ({
      id: '',
      customerId: '',
      items: [],
      total: 0,
      status: 'pending'
    }),
    evolve: (state, event) => {
      switch (event.type) {
        case 'order.created':
          return {
            ...state,
            id: event.streamId,
            customerId: event.data.customerId,
            items: event.data.items,
            total: event.data.total
          };
        case 'order.shipped':
          return { ...state, status: 'shipped' };
        case 'order.delivered':
          return { ...state, status: 'delivered' };
        case 'order.canceled':
          return { ...state, status: 'canceled' };
        default:
          return state;
      }
    }
  }
);

console.log('Current order state:', state);

Setting Up Event Subscriptions

// Get the event bus for a store
const eventBus = client.getEventBus('orders');

// Create a webhook subscription for all order events
const subscription = await eventBus.subscribeWebhook(
  'order.*',
  'https://example.com/webhooks/orders',
  {
    headers: { 'X-API-Key': 'webhook-secret' },
    retryPolicy: {
      maxAttempts: 3,
      backoffMinutes: 5
    }
  }
);

// List subscriptions
const { subscriptions } = await eventBus.listSubscriptions();

// Unsubscribe
await eventBus.unsubscribe(subscription.subscriptionId);
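
On the receiving side, the endpoint should check the X-API-Key header configured above before trusting a delivery. The following sketch shows that check as framework-independent functions. The delivery body format is an assumption (JSON is assumed here), and isAuthorizedWebhook and handleDelivery are hypothetical helpers rather than SDK functions.

```typescript
import { timingSafeEqual } from 'node:crypto';

type IncomingHeaders = Record<string, string | string[] | undefined>;

// Compare the received X-API-Key header with the expected secret using a timing-safe comparison.
function isAuthorizedWebhook(headers: IncomingHeaders, expectedSecret: string): boolean {
  const received = headers['x-api-key']; // Node.js lowercases incoming header names
  if (typeof received !== 'string') return false;
  const a = Buffer.from(received);
  const b = Buffer.from(expectedSecret);
  // timingSafeEqual throws on unequal lengths, so check the length first.
  return a.length === b.length && timingSafeEqual(a, b);
}

// Decide the HTTP status for one delivery. Assumes the body is JSON.
function handleDelivery(
  headers: IncomingHeaders,
  rawBody: string,
  expectedSecret: string
): { status: number; event?: unknown } {
  if (!isAuthorizedWebhook(headers, expectedSecret)) return { status: 401 };
  try {
    return { status: 200, event: JSON.parse(rawBody) };
  } catch {
    return { status: 400 };
  }
}
```

Keeping the logic free of any HTTP framework means it can be called from a node:http handler, Express, or a serverless function alike.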

Administrative Operations

const management = client.getManagement();

// List all event stores
const { eventStores } = await management.listEventStores();

// Get details about a specific event store
const storeDetails = await management.getEventStore('orders');
console.log('Storage used:', storeDetails.statistics?.databaseSizeBytes);

// Update event store settings
await management.updateEventStore('orders', {
  description: 'Updated description',
  retentionPeriodDays: 90
});

// Delete an event store (use with caution!)
await management.deleteEventStore('unused-store');

API Reference

The SDK is organized into several main classes:

DeltaBase

The main client class that provides access to all functionality.

const client = new DeltaBase({
  apiKey: 'your-api-key',
  baseUrl: 'https://api.delta-base.com' // Optional
});

Methods:

  • getManagement(): Returns a ManagementClient for administrative operations
  • getEventStore(eventStoreId): Returns an EventStore client for the specified store
  • getEventBus(eventStoreId): Returns an EventBus client for the specified store

EventStore

For working with event streams within a specific event store.

Methods:

  • appendToStream(streamId, events, options?): Append events to a stream
  • readStream(streamId, options?): Read events from a stream
  • aggregateStream(streamId, options): Build state from events using an aggregation function
  • queryEvents(options?): Query events across streams with filtering
  • queryStreams(options?): Query streams with filtering
  • listStreams(options?): Get a list of stream IDs

EventBus

For managing event subscriptions.

Methods:

  • subscribe(options): Create a subscription to events
  • subscribeWebhook(eventFilter, url, options?): Convenience method to create a webhook subscription
  • getSubscription(subscriptionId): Get details about a subscription
  • listSubscriptions(options?): List all subscriptions
  • unsubscribe(subscriptionId): Delete a subscription

ManagementClient

For administrative operations on event stores.

Methods:

  • createEventStore(options): Create a new event store
  • listEventStores(): List all event stores
  • getEventStore(eventStoreId): Get details about an event store
  • updateEventStore(eventStoreId, settings): Update event store settings
  • deleteEventStore(eventStoreId): Delete an event store

Advanced Topics

Optimistic Concurrency Control

Pass the expectedStreamVersion option to appendToStream to implement optimistic concurrency control. The append succeeds only if the stream's current version matches the expected one:

try {
  await eventStore.appendToStream(
    'user-123',
    [{ type: 'user.updated', data: { email: 'new@example.com' } }],
    { expectedStreamVersion: 5n } // Only succeed if the current version is 5
  );
} catch (error) {
  console.error('Concurrency conflict!', error);
}
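
In practice a conflict is usually handled by re-reading the stream and retrying rather than only logging. The sketch below illustrates that loop against an in-memory stand-in so that it runs on its own. InMemoryStream, VersionConflictError, and appendWithRetry are all hypothetical; the error that the real SDK raises on a version mismatch is not specified here, so adapt the conflict check accordingly.

```typescript
// In-memory stand-in for one stream, used only to make the sketch runnable.
type NewEvent = { type: string; data: unknown };

class VersionConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'VersionConflictError';
  }
}

class InMemoryStream {
  private events: NewEvent[] = [];

  get version(): bigint {
    return BigInt(this.events.length);
  }

  // Reject the append when the caller's expected version is stale.
  async append(events: NewEvent[], expectedStreamVersion: bigint): Promise<bigint> {
    if (expectedStreamVersion !== this.version) {
      throw new VersionConflictError(`expected ${expectedStreamVersion}, found ${this.version}`);
    }
    this.events.push(...events);
    return this.version;
  }
}

// Re-read the version and rebuild the events whenever another writer got there first.
async function appendWithRetry(
  stream: InMemoryStream,
  buildEvents: (currentVersion: bigint) => NewEvent[],
  maxAttempts = 3
): Promise<bigint> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const version = stream.version;
    try {
      return await stream.append(buildEvents(version), version);
    } catch (error) {
      const isConflict = error instanceof Error && error.name === 'VersionConflictError';
      // Give up on non-conflict errors or once the attempts are exhausted.
      if (!isConflict || attempt === maxAttempts) throw error;
    }
  }
  throw new Error('maxAttempts must be at least 1');
}
```

With the real client, the same shape would typically re-run aggregateStream on each attempt and pass the returned currentStreamVersion as expectedStreamVersion, so that decisions are always based on fresh state.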

Transaction IDs

Any event can include a transaction ID that groups it with related events:

await eventStore.appendToStream('order-123', [
  {
    type: 'order.created',
    data: { /* ... */ },
    transactionId: 'tx-abc-123'
  },
  {
    type: 'inventory.reserved',
    data: { /* ... */ },
    transactionId: 'tx-abc-123' // Same transaction ID
  }
]);
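
Rather than repeating the ID by hand on every event, a batch can be stamped in one place. This is a sketch under stated assumptions: withTransactionId is a hypothetical helper, and using a random UUID as the transaction ID is one possible choice, not an SDK requirement.

```typescript
import { randomUUID } from 'node:crypto';

// Stamp every event in a batch with one shared transaction ID.
// If no ID is supplied, a random UUID is generated once for the whole batch.
function withTransactionId<E extends { type: string; data: unknown }>(
  events: readonly E[],
  transactionId: string = randomUUID()
): Array<E & { transactionId: string }> {
  return events.map((event) => ({ ...event, transactionId }));
}

// Usage with the client from the examples above:
// await eventStore.appendToStream('order-123', withTransactionId([
//   { type: 'order.created', data: { /* ... */ } },
//   { type: 'inventory.reserved', data: { /* ... */ } }
// ]));
```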

License

(c) Copyright 2025 nibbio LLC, all rights reserved.

Collaborators

  • enrique.ruvalcaba-nibbio
  • robinsantiago
  • fernando-barroso