Many services use [Kinesis][kinesis] as a message processing system. Testing service code against a Kinesis service layer requires either mocking the Kinesis interface (using something like [aws-sdk-mock][aws-sdk-mock]) or pointing the test code at a provisioned Kinesis instance. LocalStack has published [LocalStack Docker][localstack-docker] so that testing can be done without having to use real AWS resources, making testing even easier to do using tools like [docker-compose][docker-compose] and [dockerode][dockerode].
`kinesis-tools` supports both methods of integrating with Kinesis. For simple unit tests, the `kinesis` helper can be used to provision and link a Kinesis docker container with typical `beforeAll`, `beforeEach`, `afterEach` and `afterAll` hooks.
The helper defines all relevant environment variables and returns a preconfigured
[Kinesis client][kinesis-client] test context. Setup and teardown also create and
destroy streams, as defined in the provided stream name array, between test cases.
The helper also automatically handles port binding differences between regular and
nested Docker environments.
```typescript
export const kinesisTestHooks = <StreamName extends string = string>(
  options: KinesisTestHooks | KinesisTestHooksWithDocker,
): KinesisHooksResults<StreamName> => {}

interface RootHooksConfig {
  streams: ModifiedCreateStreamInput[];
  useUniqueIdentifier?: boolean;
  useLocalStack?: boolean;
  kinesisClientConfig?: CreateKinesisClient;
  localStackConfig?: LocalstackConnectionOptions;
}

// Useful for running with a pre-configured instance of Kinesis
// already running, like inside docker-compose.
export interface KinesisTestHooks extends RootHooksConfig {
  useLocalStack?: false;
  localStackConfig?: undefined;
}

// Use when running unit tests and needing to configure a docker instance
// to run Kinesis.
export interface KinesisTestHooksWithDocker extends RootHooksConfig {
  useLocalKinesis: true;
  kinesisClientConfig?: undefined;
  localStackConfig?: LocalstackConnectionOptions;
}

export interface KinesisHooksState {
  streams: ModifiedCreateStreamInput[];
  kinesisClient: KinesisClient;
  uniqueIdentifier?: string;
}

export interface KinesisTestHooksContext<StreamNames extends string = string> extends KinesisHooksState {
  config: CreateKinesisClient;
  streamNamesMap: Record<StreamNames, string>;
}

export interface KinesisHooksResults<StreamName extends string = string> {
  beforeAll: () => Promise<void>;
  beforeEach: () => Promise<KinesisTestHooksContext<StreamName>>;
  afterEach: (context?: KinesisTestHooksContext<StreamName>) => Promise<void>;
  afterAll: () => Promise<void>;
}
```
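To make the hook lifecycle concrete, here is a minimal sketch of the four-hook pattern using an in-memory stand-in. `FakeKinesis`, `SketchContext`, `SketchHooks`, `sketchHooks` and `runOneTest` are all hypothetical names invented for this illustration. They only mirror the shape of `KinesisHooksResults` and are not the library's implementation, which talks to a real Kinesis endpoint.

```typescript
// Hypothetical in-memory stand-in for a Kinesis endpoint; not part of kinesis-tools.
class FakeKinesis {
  readonly streams = new Set<string>();

  async createStream(name: string): Promise<void> {
    this.streams.add(name);
  }

  async deleteStream(name: string): Promise<void> {
    this.streams.delete(name);
  }
}

// Reduced test context: just the client and the stream names created for the test.
interface SketchContext {
  client: FakeKinesis;
  streamNames: string[];
}

// Same four-hook shape as KinesisHooksResults, reduced to what the sketch needs.
interface SketchHooks {
  beforeAll: () => Promise<void>;
  beforeEach: () => Promise<SketchContext>;
  afterEach: (context?: SketchContext) => Promise<void>;
  afterAll: () => Promise<void>;
}

const sketchHooks = (streamNames: string[]): SketchHooks => {
  let client: FakeKinesis | undefined;
  return {
    // One-time setup: stands in for starting or connecting to Kinesis.
    beforeAll: async () => {
      client = new FakeKinesis();
    },
    // Per-test setup: create every configured stream and hand back a context.
    beforeEach: async () => {
      const current = client;
      if (!current) throw new Error('beforeAll must run before beforeEach');
      for (const name of streamNames) await current.createStream(name);
      return { client: current, streamNames: [...streamNames] };
    },
    // Per-test teardown: destroy the streams so the next test starts clean.
    afterEach: async (context) => {
      if (!context) return;
      for (const name of context.streamNames) await context.client.deleteStream(name);
    },
    // One-time teardown: stands in for stopping the container.
    afterAll: async () => {
      client = undefined;
    },
  };
};

// Runs one test body with the hooks in the order a test runner would call them.
const runOneTest = async (
  hooks: SketchHooks,
  testBody: (context: SketchContext) => Promise<void>,
): Promise<void> => {
  await hooks.beforeAll();
  const context = await hooks.beforeEach();
  try {
    await testBody(context);
  } finally {
    await hooks.afterEach(context);
    await hooks.afterAll();
  }
};
```

In a real test file the test runner calls these hooks itself; `runOneTest` only exists here to show the ordering and that the context returned by `beforeEach` is what gets passed back to `afterEach`.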
The `beforeEach` test context includes the following attributes:
Attribute | Description/Type |
---|---|
kinesisClient | @aws-sdk/client-kinesis { KinesisClient } |
uniqueIdentifier | The unique identifier appended to each stream name. E.g. `abcdef12345` |
streams | A copy of the input stream configuration array. |
streamNamesMap | Map of base stream name to uuid stream name. E.g. `users` to `users-abcdef12345` |
config | The aws kinesis config object, useful for passing to AWS.Kinesis, or other kinesis wrappers. |
If `useUniqueIdentifier` is true, dynamically generated stream names of the form
`<streamNameProvided>-<ulid>` are used, which allows tests to be run in parallel.
The unique stream name can be fetched from the `streamNamesMap` map. Otherwise,
the stream name is the one provided in the `streams` array.
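As an illustration of the naming rule, the sketch below builds a map shaped like `streamNamesMap`. `buildStreamNamesMap` is a hypothetical helper written for this example, not the library's implementation, and it assumes the identifier is passed in rather than generated.

```typescript
// Hypothetical helper: maps each base stream name to the name a test should use.
const buildStreamNamesMap = <StreamName extends string>(
  baseNames: readonly StreamName[],
  uniqueIdentifier?: string,
): Record<StreamName, string> => {
  const map = {} as Record<StreamName, string>;
  for (const baseName of baseNames) {
    // With an identifier the name becomes <base>-<identifier>; without one it is unchanged.
    map[baseName] = uniqueIdentifier ? `${baseName}-${uniqueIdentifier}` : baseName;
  }
  return map;
};

// Unique names: two test files can each own a separate `users` stream.
const withId = buildStreamNamesMap(['users', 'orders'] as const, 'abcdef12345');

// No identifier: the provided name is used as-is.
const withoutId = buildStreamNamesMap(['users'] as const);
```

Test code would then look up `withId.users` instead of hard-coding `users`, so the same test works whether or not unique identifiers are enabled.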
`kinesis.tools` contains some simple kinesis tools to iterate a stream.
The `KinesisIterator` class provides a simple way to get the records from a stream by creating the stream iterator and reusing it to get records.
Attribute | Description/Type |
---|---|
kinesisClient | AWS.Kinesis |
streamName | the name of the kinesis stream |
The init function creates the stream iterator so it can get records from the stream.
`next` fetches the next batch of records from the stream. It automatically updates its position in the stream, and returns the `KinesisIterator`.
Attribute | Description/Type |
---|---|
limit | an optional limit to how many records to return. Max is 10,000 |
The array of records arrays returned from the last `KinesisIterator.next()` call.
The complete list of responses from the last `KinesisIterator.next()` call.
The static `KinesisIterator.newIterator` function creates a new `KinesisIterator`, and calls the init function.
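The create-then-reuse pattern described above can be sketched roughly as follows. This is not the `KinesisIterator` source: `SketchIterator`, `ShardReader`, `SketchRecord` and `fakeClient` are hypothetical names, the client surface is reduced to two calls modeled on the Kinesis `GetShardIterator` and `GetRecords` operations, and a single shard is assumed.

```typescript
// Simplified record shape; the real SDK record type carries more fields.
interface SketchRecord {
  SequenceNumber: string;
  PartitionKey: string;
  Data: Uint8Array;
}

// Hypothetical minimal client surface covering the two operations the pattern needs.
interface ShardReader {
  getShardIterator(streamName: string): Promise<string>;
  getRecords(
    shardIterator: string,
    limit?: number,
  ): Promise<{ Records: SketchRecord[]; NextShardIterator?: string }>;
}

class SketchIterator {
  // Records returned by the most recent next() call.
  records: SketchRecord[] = [];
  private shardIterator: string | undefined;
  private readonly client: ShardReader;
  private readonly streamName: string;

  constructor(client: ShardReader, streamName: string) {
    this.client = client;
    this.streamName = streamName;
  }

  // Convenience factory: construct and initialise in one step.
  static async newIterator(client: ShardReader, streamName: string): Promise<SketchIterator> {
    return new SketchIterator(client, streamName).init();
  }

  // Creates the stream iterator once so later next() calls can reuse it.
  async init(): Promise<this> {
    this.shardIterator = await this.client.getShardIterator(this.streamName);
    return this;
  }

  // Fetches one batch and advances the stored position for the following call.
  async next(limit?: number): Promise<this> {
    if (!this.shardIterator) throw new Error('iterator is not initialised or the shard is closed');
    const response = await this.client.getRecords(this.shardIterator, limit);
    this.records = response.Records;
    this.shardIterator = response.NextShardIterator;
    return this;
  }
}

// In-memory fake used only to exercise the sketch; its "iterator" is just an offset.
const fakeClient = (all: SketchRecord[]): ShardReader => ({
  getShardIterator: async () => '0',
  getRecords: async (shardIterator, limit = all.length) => {
    const start = Number(shardIterator);
    const Records = all.slice(start, start + limit);
    return { Records, NextShardIterator: String(start + Records.length) };
  },
});
```

Because `next` returns the iterator itself, calls can be chained, for example `(await iterator.next(10)).records`.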
`getLambdaTriggerEvents` iterates through a kinesis stream and returns the events that can be sent to a Lambda handler.
```typescript
export const getLambdaTriggerEvents = async (
  options: GetLambdaTriggerEvents,
): Promise<GetLambdaTriggerEventsResults> => {}

export interface GetLambdaTriggerEvents {
  kinesisIterator: KinesisIterator;
  limit?: number;
}

export interface GetLambdaTriggerEventsResults {
  processedRecordCount: number;
  lambdaEvents: KinesisStreamEvent[];
}
```
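For a rough idea of what turning stream records into Lambda events involves, the sketch below converts batches of records into objects shaped like a subset of the Kinesis event payload that Lambda hands to a handler. `toLambdaEvents`, `toEventRecord`, `toBase64` and the `Sketch*` types are hypothetical names for this example. It assumes one event per non-empty batch, which may differ from how `getLambdaTriggerEvents` groups records, and it omits fields such as `eventID` and `eventSourceARN`.

```typescript
// Simplified record shape; the real SDK record type carries more fields.
interface SketchRecord {
  SequenceNumber: string;
  PartitionKey: string;
  Data: Uint8Array;
}

// Subset of the Kinesis event record that a Lambda handler receives.
interface SketchKinesisEventRecord {
  eventSource: 'aws:kinesis';
  eventName: 'aws:kinesis:record';
  kinesis: { partitionKey: string; sequenceNumber: string; data: string };
}

interface SketchKinesisEvent {
  Records: SketchKinesisEventRecord[];
}

// Lambda delivers the record payload as a base64 string.
const toBase64 = (bytes: Uint8Array): string => {
  let binary = '';
  bytes.forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  return btoa(binary);
};

const toEventRecord = (record: SketchRecord): SketchKinesisEventRecord => ({
  eventSource: 'aws:kinesis',
  eventName: 'aws:kinesis:record',
  kinesis: {
    partitionKey: record.PartitionKey,
    sequenceNumber: record.SequenceNumber,
    data: toBase64(record.Data),
  },
});

// Assumption: each non-empty batch of records becomes one event.
const toLambdaEvents = (
  batches: SketchRecord[][],
): { processedRecordCount: number; lambdaEvents: SketchKinesisEvent[] } => {
  const lambdaEvents = batches
    .filter((batch) => batch.length > 0)
    .map((batch) => ({ Records: batch.map((record) => toEventRecord(record)) }));
  const processedRecordCount = lambdaEvents.reduce(
    (count, event) => count + event.Records.length,
    0,
  );
  return { processedRecordCount, lambdaEvents };
};

// Usage: one batch with a single record whose payload is the bytes of "hi", plus an empty batch.
const sketchResult = toLambdaEvents([
  [{ SequenceNumber: '1', PartitionKey: 'user-1', Data: new Uint8Array([104, 105]) }],
  [],
]);
```

Each entry in `sketchResult.lambdaEvents` can then be passed directly to a handler under test, which is the main use of this kind of conversion.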