EventHub
클래스는 Kafka와 Confluent Schema Registry를 통해 데이터를 안정적으로 송신하고 관리할 수 있도록 설계된 라이브러리입니다. Kafka 이벤트 전송 시 Schema Registry를 활용하여 데이터 인코딩 등을 지원합니다.
- Kafka Producer 연결 및 이벤트 송신
- Schema Registry를 통한 데이터 인코딩 지원
-
retry
설정을 통해 Kafka와 Schema Registry 연결의 재시도 횟수 및 시간을 유연하게 제어 - 다양한 데이터 타입에 대한 유효성 검사 및 변환 기능 제공
npm i @seedn-corp/sdn-event-hubs
Kafka와 Schema Registry 정보를 기반으로 EventHub
객체를 생성합니다. kafkaData
와 schemaRegistryData
는 retry
옵션을 포함하여 필요한 설정을 제공합니다.
const EventHub = require('@seedn-corp/sdn-event-hubs');
const kafkaData = {
broker: 'your.kafka.broker:9092',
user: 'your-username',
password: 'your-password',
retry: { retries: 5, initialRetryTime: 300, maxRetryTime: 60000 }, // 선택사항
};
const schemaRegistryData = {
host: 'https://your-schema-registry-url',
username: 'registry-username',
password: 'registry-password',
retry: { retries: 5, initialRetryTimeInSecs: 0.1, maxRetryTimeInSecs: 5, factor: 0.2 }, // 선택사항
};
const eventHub = new EventHub('your-client-id', kafkaData, schemaRegistryData);
Kafka Producer에 연결하여 메시지를 송신할 준비를 합니다.
await eventHub.connectProducer();
Producer 연결을 해제하여 리소스를 정리합니다.
await eventHub.disconnectProducer();
지정된 토픽으로 데이터를 전송합니다. valueObj
는 데이터 스키마, key
는 메시지의 키(필요 시), dataArr
는 전송할 데이터 배열을 나타냅니다.
const topic = 'your-topic';
const valueObj = {
/* your schema object */
};
const key = 'unique-key';
const dataArr = [{ field1: 'value1' }, { field2: 'value2' }];
await eventHub.sendMessage(topic, valueObj, key, dataArr);
에러 정보가 포함된 메시지를 에러 토픽으로 전송합니다. service
는 서비스 이름을, error
객체는 에러 상세 정보를 포함합니다.
const error = {
function: 'functionName',
type: 'errorType',
message: 'errorMessage',
};
await eventHub.sendError(valueObj, 'serviceName', error);
Schema Registry에서 스키마 ID를 가져오거나 없을 경우 새로 등록합니다.
const schemaId = await eventHub.getSchemaRegistryId(valueObj, 'subject-name');
스키마 필드에 맞게 데이터를 정리하고 타입을 검증합니다. 유효한 데이터 타입 변환을 적용하여 최종 데이터 객체를 반환합니다.
const transformedData = await eventHub.setSchemaValue(valueObj, data);
데이터의 타입을 확인하고, 정의된 기본값으로 변환하여 유효성을 검증합니다.
const intValue = eventHub.checkType(value, 'int', 0);
다음은 EventHub
를 사용해 Kafka에 메시지를 송신하는 예제입니다.
(async () => {
const kafkaData = {
/* 설정 정보 */
};
const schemaRegistryData = {
/* 설정 정보 */
};
const eventHub = new EventHub('your-client-id', kafkaData, schemaRegistryData);
await eventHub.connectProducer();
try {
const schema = {
/* 메시지 스키마 */
};
const dataArr = [{ key: 'value' }];
await eventHub.sendMessage('your-topic', schema, { key1: 'key1' }, dataArr);
} catch (error) {
console.error('Error sending message:', error);
} finally {
await eventHub.disconnectProducer();
}
})();
이 라이브러리는 MIT 라이선스를 따릅니다.