@chronark/upstash-kafka
1.1.1 • Public • Published

upstash-kafka

Serverless Kafka client for Upstash

This project is written for the Deno runtime, then transpiled to Node and published as a package on npm.

Requirements

Either Deno 1.x or Node 14.x and higher.

Installation

Deno

import { Kafka } from "https://deno.land/x/upstash_kafka/mod.ts"

Node

npm install @chronark/upstash-kafka
yarn add @chronark/upstash-kafka
pnpm add @chronark/upstash-kafka

You get the idea.

Quickstart

Auth

  1. Go to Upstash and select your database.
  2. Copy the REST API secrets at the bottom of the page.

import { Kafka } from "@chronark/upstash-kafka";

const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});
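Secrets are usually read from the environment rather than written into source. A minimal sketch, assuming the three values are stored under the same names as the placeholders above; `kafkaConfigFromEnv` is a hypothetical helper and not part of this package:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// Assumption: the env variable names match the placeholders used above.
type KafkaConfig = { url: string; username: string; password: string };

function kafkaConfigFromEnv(
  env: Record<string, string | undefined>,
): KafkaConfig {
  // Fail early with a clear message if a secret is missing or empty.
  const read = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`);
    }
    return value;
  };

  return {
    url: read("UPSTASH_KAFKA_REST_URL"),
    username: read("UPSTASH_KAFKA_REST_USERNAME"),
    password: read("UPSTASH_KAFKA_REST_PASSWORD"),
  };
}
```

In Node the result could be passed along as `new Kafka(kafkaConfigFromEnv(process.env))`; in Deno, `Deno.env.toObject()` would supply the same kind of record.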

Produce a single message

const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const res = await p.produce("<my.topic>", message);

Options can be passed as a third argument:

const res = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
});
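Because objects are serialized with `JSON.stringify` on the way in, whoever reads the message later has to parse the value again. A rough sketch of that decoding step, assuming the payload arrives as a string; `decodeValue` is a hypothetical helper, not something this package exports:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// Assumption: the payload is available to the reader as a string.
function decodeValue(raw: string): unknown {
  try {
    // Values produced from objects were written with JSON.stringify.
    return JSON.parse(raw);
  } catch {
    // Not valid JSON (for example a plain text message): return it unchanged.
    return raw;
  }
}
```

Note that a plain string which happens to be valid JSON, such as `"123"`, would come back as a number with this approach, so it only suits topics where that ambiguity is acceptable.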

Produce multiple messages

The same options from the example above can be set for every message.

const p = kafka.producer();
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: "my message",
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
]);
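When many values go to the same topic, the array can be built programmatically. A small sketch; `toBatch` and the `OutgoingMessage` type are hypothetical names that only mirror the `topic` and `value` fields shown above:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// The shape mirrors the { topic, value } entries from the example above.
type OutgoingMessage = { topic: string; value: string };

function toBatch(topic: string, values: string[]): OutgoingMessage[] {
  // One entry per value, all addressed to the same topic.
  return values.map((value) => ({ topic, value }));
}
```

The result could then be handed over as `await p.produceMany(toBatch("my.topic", ["first", "second"]))`.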

Consume

The first time a consumer is created, it has to discover the group coordinator by asking the Kafka brokers and then join the consumer group. This takes some time to complete, so a newly created consumer instance may return empty results until consumer group coordination has finished.

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
});
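One way to deal with the empty results during group coordination is to retry a few times before giving up. A minimal sketch; `consumeWithRetry`, the attempt count, and the delay are all illustrative assumptions rather than anything the package provides:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// `consumeOnce` is any function that performs a single consume call.
async function consumeWithRetry<T>(
  consumeOnce: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 1000,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await consumeOnce();
    if (messages.length > 0) {
      return messages;
    }
    // Wait before the next attempt, but not after the last one.
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  // Still nothing: the topic may simply be empty.
  return [];
}
```

It could wrap the call above as `consumeWithRetry(() => c.consume({ ... }))`. An empty result after all attempts does not distinguish between a slow coordinator and a topic without messages.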

More examples can be found in the docstrings.

Commit manually

While consume can handle committing automatically, you can also use Consumer.commit to manually commit.

const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
});

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  });
}
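Committing after every message means one commit call per message. If a whole batch is processed first, it may be enough to commit only the highest offset seen for each partition. The sketch below computes those offsets; `highestOffsets` is a hypothetical helper, and it assumes that committing an offset also covers the earlier offsets of the same partition, as is usual for Kafka:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// Assumption: committing an offset covers all earlier ones in that partition.
type TopicPartitionOffset = { topic: string; partition: number; offset: number };

function highestOffsets(
  messages: TopicPartitionOffset[],
): TopicPartitionOffset[] {
  const latest = new Map<string, TopicPartitionOffset>();
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`;
    const seen = latest.get(key);
    // Keep only the largest offset per topic and partition.
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset });
    }
  }
  return Array.from(latest.values());
}
```

Each returned entry has the same shape as the `offset` object in the example above, so it could be passed to `c.commit` once per partition after the batch has been handled.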

Fetch

You can also manage offsets manually by using Consumer.fetch.

const c = kafka.consumer();
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
});
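With manual offset management the caller has to remember where to continue. A small sketch of that bookkeeping, assuming each fetched message carries a numeric `offset` (as in the commit example) and that the next fetch should start one past the last message seen; `nextFetchOffset` is a hypothetical helper:

```typescript
// Hypothetical helper, not part of @chronark/upstash-kafka.
// Assumption: every fetched message exposes a numeric `offset`.
function nextFetchOffset(
  current: number,
  messages: { offset: number }[],
): number {
  let next = current;
  for (const message of messages) {
    // Continue one past the highest offset received so far.
    next = Math.max(next, message.offset + 1);
  }
  return next;
}
```

If a fetch returns nothing, the offset stays where it was, so the same position is requested again on the next call.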

Examples

There is a minimal working example application available in /example as well as various examples in the docstrings of each method.

Contributing

Setup

  1. Create a Kafka instance on Upstash (see the Upstash docs).
  2. Create the following topics: blue, red, green (see the Upstash docs).
  3. Create a .env file with your Kafka secrets: cp .env.example .env

Running tests

make test

Building for node

make build

A /npm folder will be created with the built Node module. As part of the build process, the tests are run against your installed Node version. To disable this, configure the build pipeline in /cmd/build.ts:

// ...
await build({
  test: false, // <-- add this
  // ... remaining config
});

License

MIT