avro-registry-client
A node client to interact with the avro registry. There are methods for updating your subjects and schemas, but also some convenience methods for encoding and decoding.
Installation
npm install avro-registry-client
Prerequisites
If you're planning on using the kafka consuming/producing functionality, you'll
want to install kafka-node
as well.
You should have a local copy of the Confluent Avro schema registry running, which means you also need Kafka and possibly ZooKeeper running locally. More documentation on how to get that running is forthcoming...sometime.
If you plan on using the .decode()
method, take special note of some of the
caveats described in the description section as it may affect how you need to
restrict schemas in your schema registry.
tl;dr All of your schemas should be unique to each subject/version. This means that you should only be using record types, and each schema should have a name/namespace property that is unique to its subject.
Usage
const kafka = require('kafka-node')
const MY_TOPIC = 'my-topic'
const kafkaProducerClient = new kafka.KafkaClient({
kafkaHost: 'localhost:9092'
})
const kafkaConsumerClient = new kafka.KafkaClient({
kafkaHost: 'localhost:9092'
})
const kafkaProducer = new kafka.Producer(kafkaProducerClient)
const kafkaConsumer = new kafka.Consumer(
kafkaConsumerClient,
[{ topic: MY_TOPIC }],
{ encoding: 'buffer', keyEncoding: 'buffer' }
)
//
;(async function run() {
const registry = require('avro-registry-client')('http://localhost:8081')
await registry.createSubjectVersion('MySubject', {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
})
await registry.createSubjectVersion('MySubjectId', {
name: 'MySubjectId',
type: 'record',
fields: [
{
name: 'id',
type: 'string'
}
]
})
const mySubjectProducer = registry.createProducer(kafkaProducer, MY_TOPIC, {
valueSubject: { subject: 'MySubject', version: 'latest' },
keySubject: { subject: 'MySubjectId', version: 'latest' }
})
registry
.createConsumer(kafkaConsumer, [
{
valueSubject: { subject: 'MySubject', version: 'latest' },
keySubject: { subject: 'MySubjectId', version: 'latest' },
handler({ key, value, message }) {
console.log(key) // { id: '123' }
console.log(value) // { field1: 'my field value' }
console.log(message) // This is the raw message from kafka-node .on('message')
}
}
])
.listen(err => {
// This gets called if there is an error handling the message
})
})()
API
createAvroRegistryClient (registryHost)
(default export)
Creates a registry client whose methods are described below.
Parameters
-
registryHost
(String) - The host of the avro registry
Example
const createAvroRegistryClient = require('avro-registry-client')
const registry = createAvroRegistryClient('http://localhost:8081')
createAvroRegistryClient.COMPATIBILITY_TYPES
An object whose properties are the allowed compatibility types when setting compatibility. Note that you don't have to use this object to put into the compatibility methods' argument. You can just use the raw string value. This object is just helpful for enumeration purposes.
Example
const createAvroRegistryClient = require('avro-registry-client')
const { COMPATIBILITY_TYPES } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')
registry.setGlobalCompatibility(COMPATIBILITY_TYPES.FULL).then(() => {})
createAvroRegistryClient.errors
An object whose values are the known errors that this library will produce when parsing errors from the schema registry responses.
Example
const createAvroRegistryClient = require('avro-registry-client')
const { errors } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')
const invalidSchema = {
// no name property
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
console.log(errors)
/**
* { RouteNotFoundError: [Function: RouteNotFoundError],
* SubjectNotFoundError: [Function: SubjectNotFoundError],
* VersionNotFoundError: [Function: VersionNotFoundError],
* SchemaNotFoundError: [Function: SchemaNotFoundError],
* IncompatibleSchemaError: [Function: IncompatibleSchemaError],
* InvalidSchemaError: [Function: InvalidSchemaError],
* InvalidVersionError: [Function: InvalidVersionError],
* InvalidCompatibilityLevelError: [Function: InvalidCompatibilityLevelError] }
*/
registry.createSubjectVersion('MyInvalidSchema', invalidSchema).catch(err => {
if (err instanceof errors.InvalidSchemaError) {
// do something with it
}
})
registry.createProducer(kafkaProducer, topic, options)
Creates a Kafka Producer that will encode the key and value with avro.
Parameters
-
kafkaProducer
(kafka-node.Producer) - an instance of kafka-node.Producer -
topic
(String) - The topic this producer will publish to -
options
(Object) - An object that contains the keySubject and valueSubject definitions. See example for detailed usage.
Returns
-
(key: any, value: any, sendOptions: Object) => void
- Returns a function that takes in three arguments: key, value, and sendOptions.
Examples
const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaProducer = new kafka.Producer(kafkaClient)
const producer = registry.createProducer(kafkaProducer, 'my-topic', {
valueSubject: { subject: 'MySubject', version: 1 }, // Or pin to a specific version
keySubject: { subject: 'MySubjectId', version: 'latest' }
})
// You can also set the options this way:
const producer2 = registry.createProducer(kafkaProducer, 'my-topic')
.valueSubject('MySubject', 1)
.keySubject('MySubjectId') // You may omit the second argument if you just want 'latest'
// Same api for producer2
producer({ id: '123' }, { field1: 'body' }).then(() => {
console.log('sent!')
})
registry.createConsumer(kafkaConsumer, messageTypes)
Creates a Kafka Consumer that will decode messages with avro.
Parameters
-
kafkaConsumer
(kafka-node.Consumer|kafka-node.ConsumerGroup) - An instance of a kafka-node consumer. Can be a Consumer or ConsumerGroup. Note that when you configure the consumer, you will have already told it which topics to listen to. Also note that the consumer must be configured to handle the keys and values as buffers. See Example for details. -
messageTypes
(Object[]) - An array of message types that this consumer will be able to handle. Message type definitions consist of a valueSubject, a keySubject, and a handler. This is an array because it is possible to receive multiple message types on a single topic. For more details, see the note under registry.decode (possibleSchemas, message).
Example
const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaConsumer = new kafka.Consumer(kafkaClient, [{ topic: 'my-topic' }], {
encoding: 'buffer',
keyEncoding: 'buffer'
})
registry
.createConsumer(kafkaConsumer, [{
valueSubject: { subject: 'MySubject', version: 1 },
keySubject: { subject: 'MySubjectId', version: 'latest' },
handler({ key, value, message }) {
// The `key` is the avro decoded key
// The `value` is the avro decoded value
// The `message` is the raw message from kafka-node .on('message') handler
}
}])
.listen((err) => {
// This will be called on any error that occurs when there is an error
// handling the message. This will also get called if a message comes
// through that you have not setup a message type handler for.
})
// You can also handle passing in message types this way:
registry
.createConsumer(kafkaConsumer)
.messageType({
valueSubject: { subject: 'MySubject', version: 1 },
keySubject: { subject: 'MySubjectId', version: 'latest' },
handler({ key, value, message }) {}
})
.listen((err) => {})
// Or this way
registry
.createConsumer(kafkaConsumer)
.messageType()
.valueSubject('MySubject', 1)
.keySubject('MySubjectId') // No need to pass the second argument if you intend to use the latest version
.handler(({ key, value, message }) => {})
// Calling this breaks out of the messageType definition. Calling .messageType() also does this
.listen((err) => {})
.listen()
returns the result of calling kafkaConsumer.on('message')
, so it
just returns the kafkaConsumer so you can listen for error messages and the
like.
registry.encode (subject, version, value)
Encodes a value with the schema that has the given subject and subject version.
Parameters
-
subject
(String) - The subject of the schema to encode with -
version
(Number|String) - The version of the subject to encode with. Must be a number or the string 'latest'
Returns
-
Promise< Buffer >
- The encoded message
Example
registry.encode('MySchema', 'latest', { field1: 'foobar' }).then(response => {
console.log(response) // <Buffer 00 00 00 00 01 ...>
})
registry.decode (possibleSchemas, message)
Decodes a value with one of the given subjects defined in possibleSchemas.
NOTE!!!!
The reason that you pass in an object (possibleSchemas) as the first argument is
because it is possible to send multiple "Subjects" of messages down a single
pipe, but the avro schema registry encoding only encodes the schema ID into the
message, which by itself isn't enough information to map it to a handler for a
specific subject. The possibleSchemas
is kind of a "seed" to help the decoder
figure out which subject it is mapped to so you can handle them differently.
For example:
You're sending PersonUpdated and PersonCreated encoded messages down a People kafka topic, and you want different behavior depending on whether you get a PersonCreated event or a PersonUpdated event. The messages themselves only have a schema ID encoded in them, which you can't use to lookup which subject the message was intended to be written with (limitations of the schema registry). We can get around that because there is a way to lookup to see if a schema (which we can get by the schema ID) exists in a given subject. The problem with this is that the schema registry allows a schema (with a schema ID) to belong to more than one subject/version, so it would be possible to map the message to the wrong subject.
For the above reason, this library is really only compatible with schema registries in which unique schemas are published to schema versions. With care, this can be done by following these tenets:
- Put the "name" and "namespace" properties on the top-level of your schemas that together are unique to the "Subject".
- Only use "record" types. Simple types in the top level of schemas (such as "string") don't use the "name" and "namespace" properties to calculate the "uniqueness" of a schema, for whatever reason. You can get around this by adding an unused property, like "x-id" or something with a unique value, which will then be used to calculate the "uniqueness" of a schema.
Parameters
-
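possibleSchemas
(Object.<String, Number|String>) - An object whose keys are the candidate subject names and whose values are the version of each subject to decode with (a number or 'latest') -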
message
(Buffer) - The message to decode.
Returns
-
Promise< Any >
- The decoded value
Example
// message being something we got from kafka or wherever
registry.decode({ MySchema: 'latest' }, message).then(response => {
console.log(response)
/**
* { subject: 'MySchema',
* value: MySchema { field1: 'test' } }
*/
})
registry.primeSubjectByVersion (subject, version)
Parameters
-
subject
(String) - The subject to prime -
version
(Number|String) - The version to prime
Returns
Promise<null>
Example
registry.primeSubjectByVersion('MySubject', 1).catch(err => {
console.log('Priming failed', err)
})
registry.primeSubjectByAllVersions (subject)
Parameters
-
subject
(String) - The subject to prime
Returns
Promise<null>
Example
registry.primeSubjectByAllVersions('MySubject').catch(err => {
console.log('Priming failed', err)
})
registry.getSchemaById (id)
Get a schema by its specific id (unique to all schemas within a registry)
Parameters
-
id
(Number) - The id of the schema to fetch
Returns
-
Promise< { schema: String } >
- An object with a "schema" property which is the stringified JSON object of the schema.
Example
registry.getSchemaById(1).then(response => {
console.log(response) // { schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}' }
})
registry.listSubjects ()
Lists all available subjects in the registry
Returns
-
Promise< [ String ] >
- An array of subject names
Example
registry.listSubjects().then(response => {
console.log(response) // [ 'MySubject' ]
})
registry.listSubjectVersions (subject)
Lists the version numbers of the versions under a given subject
Parameters
-
subject
(String) - The subject whose versions we want to check
Returns
-
Promise< [ Number ] >
- An array of version numbers
Example
registry.listSubjectVersions('MySubject').then(response => {
console.log(response) // [ 1 ]
})
registry.deleteSubject (subject)
Deletes all of the versions from a subject
Parameters
-
subject
(String) - The subject to delete
Returns
-
Promise< [ Number ] >
- An array of version numbers that were deleted
Example
registry.deleteSubject('MySubject').then(response => {
console.log(response) // [ 1 ]
})
registry.getSubjectByVersion (subject, version)
Get a schema by its subject and version number
Parameters
-
subject
(String) - The subject to fetch -
version
(Number|String) - The specific version number to fetch, or 'latest' if you want the latest version. There is another method to fetch the latest which just calls this function with 'latest' as the version parameter
Returns
-
Promise< { subject: String, version: Number, id: Number, schema: String } >
- An object with the properties describing the schema.
Example
registry.getSubjectByVersion('MySubject', 1).then(response => {
console.log(response)
/**
* {
* subject: 'MySubject',
* version: 1,
* id: 1,
* schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
* }
*/
})
registry.createSubjectVersion (subject, schema)
Registers a new version of the given subject
Parameters
-
subject
(String) - The subject to add the schema version to -
schema
(Object) - The avro compliant schema to push up
Returns
-
Promise< { id: Number } >
- An object with the unique schema id as the id property.
Example
const schema = {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
registry.createSubjectVersion('MySubject', schema).then(response => {
console.log(response) // { id: 1 }
})
registry.checkSchemaBySubject (subject, schema)
Checks to see if a schema exists in a subject
Parameters
-
subject
(String) - The subject to check -
schema
(Object) - The avro compliant schema to check
Returns
Promise< { subject: String, version: Number, id: Number, schema: String } >
Example
const schema = {
name: 'MySubject',
type: 'record',
fields: [
{
name: 'field1',
type: 'string'
}
]
}
registry.checkSchemaBySubject('MySubject', schema).then(response => {
console.log(response)
/**
* {
* subject: 'MySubject',
* version: 1,
* id: 1,
* schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
* }
*/
})
registry.deleteSubjectVersion (subject, version)
Deletes a version out of the given subject
Parameters
-
subject
(String) - The subject to delete a version from -
version
(Number|String) - The specific version number to delete
Returns
-
Promise< Number >
- The number is the version number that was deleted
Example
registry.deleteSubjectVersion('MySubject', 1).then(response => {
console.log(response) // 1
})
registry.subjectVersionCompatibilityWithSchema (subject, version, schema)
Checks to see if a given schema is compatible with the schema at the given subject/version.
Parameters
-
subject
(String) - The subject whose version the schema is checked against -
version
(Number|String) - The specific version number to check compatibility against -
schema
(Object) - The schema to check
Returns
Promise< { is_compatible: Boolean } >
Example
registry
.subjectVersionCompatibilityWithSchema('MySchema', 1, {
name: 'MySchema',
type: 'record',
fields: [
{
name: 'field2',
type: 'string'
}
]
})
.then(response => {
console.log(response) // { is_compatible: false }
})
registry.setGlobalCompatibility (compatibility)
Sets the default global compatibility level for all subjects (except for the ones that have their own set)
Parameters
-
compatibility
(String) - Has to be 'NONE', 'BACKWARD', 'FORWARD', or 'FULL'
Returns
Promise< { compatibility: String } >
Example
registry.setGlobalCompatibility('FULL').then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.getGlobalCompatibility ()
Gets the default global compatibility level for all subjects (except for the ones that have their own set)
Returns
Promise< { compatibility: String } >
Example
registry.getGlobalCompatibility().then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.setSubjectCompatibility (subject, compatibility)
Sets the compatibility level for a specific subject
Parameters
-
subject
(String) - The subject to set the compatibility level for -
compatibility
(String) - Has to be 'NONE', 'BACKWARD', 'FORWARD', or 'FULL'
Returns
Promise< { compatibility: String } >
Example
registry.setSubjectCompatibility('MySubject', 'FULL').then(response => {
console.log(response) // { compatibility: 'FULL' }
})
registry.getSubjectCompatibility (subject)
Gets the compatibility level for a specific subject
Returns
Promise< { compatibility: String } >
Example
registry.getSubjectCompatibility('MySubject').then(response => {
console.log(response) // { compatibility: 'FULL' }
})