@castore/dam
TypeScript icon, indicating that this package has built-in type declarations

1.25.3 • Public • Published

Dam

Data maintenance & migration tooling for the Castore library.

📥 Installation

# npm
npm install --save-dev @castore/dam

# yarn
yarn add --dev @castore/dam

This package has @castore/core as peer dependency, so you will have to install it as well:

# npm
npm install @castore/core

# yarn
yarn add @castore/core

👩‍💻 Usage

@castore/dam exposes a series of utils that scan past events and re-publish them in message channels – or "pour them" as in "pouring water from a container to another" 🫗.

Those utils are typically very useful for data maintenance and migration. They publish messages with the replay option enabled and can be rate limited to limit impact on production traffic. They are the following:

pourEventStoreAggregateIds

Pour all the aggregate ids of an event store in a provided AggregateExistsMessageChannel. Aggregate ids are published in the order in which they are retrieved (by default, ordered by their initial timestamps).

import { pourEventStoreAggregateIds } from '@castore/dam';

// 👇 ...or AggregateExistsMessageBus
const maintenanceMessageQueue = new AggregateExistsMessageQueue({
  sourceEventStores: [pokemonEventStore],
  ...
});

const results = await pourEventStoreAggregateIds({
  eventStore: pokemonEventStore,
  messageChannel: maintenanceMessageQueue,
  // 👇 Optional `listAggregateIds` options (except "pageToken")
  options: {
    limit: 100,
    initialEventAfter: '2020-01-01T00:00:00.000Z',
    initialEventBefore: '2023-01-01T00:00:00.000Z',
    reverse: false,
  },
  // 👇 Optional rate limit (messages/second)
  rateLimit: 100,
});

const {
  // 👇 Count of poured aggregate ids
  pouredAggregateIdCount,
  // 👇 Infos about first/last scanned aggregates (potentially undefined)
  firstScannedAggregate,
  lastScannedAggregate,
} = results;

pourAggregateEvents

Pour all the events of a specific aggregate in a provided NotificationMessageChannel. Events are published in the order in which they are retrieved (by default, ordered by their timestamps).

import { pourAggregateEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
  sourceEventStores: [pokemonEventStore],
  ...
});

const results = await pourAggregateEvents({
  eventStore: pokemonEventStore,
  messageChannel: maintenanceMessageQueue,
  aggregateId: 'pikachu1',
  // 👇 Optional `getEvents` options
  options: {
    minVersion: 1,
    maxVersion: 10,
    limit: 5,
    reverse: false,
  },
  // 👇 Optional `timestamp` filters
  filters: {
    from: '2020-01-01T00:00:00.000Z',
    to: '2023-01-01T00:00:00.000Z',
  },
  // 👇 Optional rate limit (messages/second)
  rateLimit: 100,
});

const {
  // 👇 Count of poured events
  pouredEventCount,
  // 👇 Infos about first/last scanned events (potentially undefined)
  firstPouredEvent,
  lastPouredEvent,
} = results;

pourEventStoreEvents

Pour all the events of an event store in a provided NotificationMessageChannel. Events are published in the order of their timestamps (independently of their aggregate).

import { pourEventStoreEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
  sourceEventStores: [pokemonEventStore],
  ...
});

const results = await pourEventStoreEvents({
  eventStore: pokemonEventStore,
  messageChannel: maintenanceMessageQueue,
  // 👇 Optional `timestamp` filters
  filters: {
    from: '2020-01-01T00:00:00.000Z',
    to: '2023-01-01T00:00:00.000Z',
  },
  // 👇 Optional rate limit (messages/second)
  rateLimit: 100,
});

const {
  // 👇 Count of poured events
  pouredEventCount,
  // 👇 Infos about first/last scanned aggregates (potentially undefined)
  firstScannedAggregate,
  lastScannedAggregate,
} = results;

pourEventStoreCollectionEvents

Pour all the events of a collection of event stores in a provided NotificationMessageChannel. Events are published in the order of their timestamps (independently of their aggregate and event store).

import { pourEventStoreEvents } from '@castore/dam';

// 👇 ...or NotificationMessageBus
const maintenanceMessageQueue = new NotificationMessageQueue({
  sourceEventStores: [pokemonEventStore, trainerEventStore],
  // ...
});

const results = await pourEventStoreCollectionEvents({
  eventStores: [pokemonEventStore, trainerEventStore],
  messageChannel: maintenanceMessageQueue,
  // 👇 Optional `timestamp` filters
  filters: {
    from: '2020-01-01T00:00:00.000Z',
    to: '2023-01-01T00:00:00.000Z',
  },
  // 👇 Optional rate limit (messages/second)
  rateLimit: 100,
});

const {
  // 👇 Count of poured events
  pouredEventCount,
  // 👇 Infos about first/last scanned aggregates (potentially undefined)
  scans: {
    // 👇 By event store id
    POKEMONS: { firstScannedAggregate, lastScannedAggregate },
    TRAINERS: { firstScannedAggregate, lastScannedAggregate },
  },
} = results;

Package Sidebar

Install

npm i @castore/dam

Weekly Downloads

2

Version

1.25.3

License

MIT

Unpacked Size

281 kB

Total Files

109

Last publish

Collaborators

  • thomasaribart
  • valentinbeggi
  • charlesgery
  • julietteff