# Kafka Client Module

Kafka client implementation for the aranode flow engine.
## Table of Contents

- [Installation](#installation)
- [Usage](#usage)
- [API](#api)
## Installation
To install this module in your aranode project, add the package with the following command:

```sh
$ yarn add @a-mehrabi/aranode-kafka-client
```
After that, include it in your custom modules like the following:

path: `.env`

```sh
ARANODE_CUSTOM_MODULES=@a-mehrabi/aranode-kafka-client
```
## Usage
To use the Kafka client, you need three descriptions:

- Kafka client config description (`kafkaClientConfig`)
- Kafka producer adapter description (`kafkaProducer`)
- Kafka consumer adapter description (`kafkaConsumer`)
### Kafka client config

The Kafka client config description lets you define the configuration of the Kafka client, including `clientId`, `brokers`, and so on. Producer and consumer configs are not defined here.
```yaml
version: 1
kind: config
name: config-name
config:
  kafka:
    clientId: client-id
    brokers:
      - localhost:9092
    retry:
      retries: 8
      initialRetryTime: 100
      maxRetryTime: 2
      factor: 3
      multiplier: 4
```
### Kafka Producer

To publish messages to a Kafka server, you have to create an adapter description. The Kafka producer adapter description sets a handler for the specified Kafka client.

Flow example:
```yaml
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
  start:
    transform:
      path: 'dist/producer-transformer.js'
    nextNode: kafka
  kafka:
    port:
      name: kafka-client-out-port
      type: outbound
    terminal: true
```
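The flow above runs `dist/producer-transformer.js` before handing the message to the Kafka port. The transformer contract is defined by the aranode flow engine, so the names and types below are hypothetical; the sketch only illustrates the kind of work such a transformer does, namely shaping a flow payload into a Kafka-style key/value message.

```typescript
// Hypothetical sketch of the logic inside dist/producer-transformer.js:
// shape a flow payload into a Kafka-style key/value message. The actual
// transformer signature expected by the aranode flow engine may differ.
interface FlowPayload {
  id: string;
  body: Record<string, unknown>;
}

interface KafkaMessage {
  key: string;
  value: string; // Kafka values are bytes on the wire; JSON text here
}

function toKafkaMessage(payload: FlowPayload): KafkaMessage {
  return {
    key: payload.id, // the key determines partition assignment
    value: JSON.stringify(payload.body),
  };
}

console.log(toKafkaMessage({ id: "order-1", body: { amount: 42 } }));
```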
Configs example:

path: `adapter.yml`

```yaml
version: 1
kind: adapter
name: adapter-name
adapter:
  producer:
    client: kafka-client-config-name
    topic: topic-name
```

path: `bind.yml`

```yaml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: outbound
      port: port-name
      adapter: adapter-name
```
Kafka Consumer
kafka consumer allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers.
flow example:
```yaml
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
  start:
    nextNode: consumeData
  consumeData:
    transform:
      path: 'dist/consumer-transformer.js'
    terminal: true
```
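Here `dist/consumer-transformer.js` runs for each consumed message. Again, the transformer contract is defined by the aranode flow engine, so the names and types below are hypothetical; the sketch only shows the typical shape of the logic, decoding a consumed Kafka message value back into a domain object.

```typescript
// Hypothetical sketch of the logic inside dist/consumer-transformer.js:
// decode a consumed Kafka message value (JSON text) into a domain object.
// The actual transformer signature expected by the aranode flow engine
// may differ.
interface ConsumedMessage {
  topic: string;
  partition: number;
  value: string; // JSON-encoded payload
}

function fromKafkaMessage(msg: ConsumedMessage): Record<string, unknown> {
  const body = JSON.parse(msg.value) as Record<string, unknown>;
  // Keep the source topic alongside the decoded fields
  return { topic: msg.topic, ...body };
}

console.log(fromKafkaMessage({ topic: "topic-name", partition: 0, value: '{"amount":42}' }));
```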
Configs example:

path: `adapter.yml`

```yaml
version: 1
kind: adapter
name: consumer-adapter-name
adapter:
  consumer:
    client: kafka-client-config-name
    groupId: group-id
    topic: topic-name
    fromBeginning: true
```

path: `bind.yml`

```yaml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: inbound
      port: port-name
      adapter: consumer-adapter-name
```
## API

### kafkaClientConfig

Type: config description

Options:

- `clientId` (string), required

  Kafka client id

- `brokers` (string[]), required

  The Kafka client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata.

- `retry` ([k: string]: number), optional

  The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers).

  - `retries` (number), optional, default = `5`

    Max number of retries per call

  - `initialRetryTime` (number), optional, default = `300`

    Initial value used to calculate the retry time in milliseconds (this is still randomized following the randomization factor)

  - `maxRetryTime` (number), optional, default = `30000`

    Maximum wait time for a retry in milliseconds

  - `factor` (number), optional, default = `0.2`

    Randomization factor

  - `multiplier` (number), optional, default = `2`

    Exponential factor
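The interaction between `retries`, `initialRetryTime`, `maxRetryTime`, and `multiplier` can be sketched with a small calculation. The exact backoff algorithm is internal to the underlying Kafka client, so this is only an illustration of the nominal schedule; `factor` then randomizes each delay by roughly ±`factor` before it is applied.

```typescript
// Illustrative sketch (not the client's actual implementation) of the
// nominal retry schedule implied by the retry options. `factor` would
// then add jitter of roughly +/- factor to each delay.
function retrySchedule(
  retries: number,
  initialRetryTime: number,
  maxRetryTime: number,
  multiplier: number,
): number[] {
  const delays: number[] = [];
  let delay = initialRetryTime;
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(delay, maxRetryTime)); // capped at maxRetryTime
    delay *= multiplier; // exponential growth between attempts
  }
  return delays;
}

// With the defaults (retries = 5, initialRetryTime = 300,
// maxRetryTime = 30000, multiplier = 2):
console.log(retrySchedule(5, 300, 30000, 2)); // [ 300, 600, 1200, 2400, 4800 ]
```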
### kafkaProducer

Type: adapter description

Options:

- `client` (string), required

  Kafka client config name

- `topic` (string), required, default = `null`

  Topic name

- `retry` ([k: string]: number), optional

  The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers).

  - `retries` (number), optional, default = `5`

    Max number of retries per call

  - `initialRetryTime` (number), optional, default = `300`

    Initial value used to calculate the retry time in milliseconds (this is still randomized following the randomization factor)

  - `maxRetryTime` (number), optional, default = `30000`

    Maximum wait time for a retry in milliseconds

  - `factor` (number), optional, default = `0.2`

    Randomization factor

  - `multiplier` (number), optional, default = `2`

    Exponential factor

- `metadataMaxAge` (number), optional, default = `300000`

  The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes, to proactively discover any new brokers or partitions

- `allowAutoTopicCreation` (boolean), optional, default = `true`

  Allow topic creation when querying metadata for non-existent topics

- `idempotent` (boolean), optional, default = `false`

  Experimental. If enabled, the producer will ensure each message is written exactly once. Acks must be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER.

- `transactionalId` (string), optional

- `transactionTimeout` (number), optional, default = `60000`

  The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the request will fail with an InvalidTransactionTimeout error.

- `maxInFlightRequests` (number), optional, default = `null`

  Max number of requests that may be in progress at any time. If falsy, there is no limit.
### kafkaConsumer

Type: adapter description

Options:

- `client` (string), required

  Kafka client config name

- `groupId` (string), required

  Group id

- `topic` (string), required

  Topic name

- `fromBeginning` (boolean), required

  The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, `fromBeginning` defines the behavior of the consumer group.

- `sessionTimeout` (number), optional, default = `30000`

  Timeout in milliseconds used to detect consumer failures