A more configurable EventSource implementation that runs in browsers, NodeJS, and workers. The default browser EventSource API is too limited. Event Source Plus fixes that.
- Use any HTTP method
- Send custom headers
- Optionally change headers when retrying
- Pass data as body or query params
- Runs in browsers, NodeJS, and workers
- First-class TypeScript support
- Automatic retry with exponential backoff and hooks for customizing behavior
- Multiple retry strategies for use in realtime applications vs LLM applications
- ESM and CommonJS support
# npm
npm i event-source-plus
# pnpm
pnpm i event-source-plus
import { EventSourcePlus } from "event-source-plus";
const eventSource = new EventSourcePlus("https://example.com");
eventSource.listen({
onMessage(message) {
console.log(message);
},
});
When working with LLMs it may be preferable to set the retry strategy to on-error. For details on why, see the description of the retry strategies below.
const eventSource = new EventSourcePlus("https://example.com", {
retryStrategy: "on-error",
});
The listen() method returns a controller that you can use to interact with the active event stream.
Use the abort() method to cancel requests.
const controller = eventSource.listen({
onMessage(message) {
console.log(message);
},
});
controller.abort();
The abort() method can also be used inside listen hooks:
let msgCount = 0;
const controller = eventSource.listen({
onMessage(message) {
msgCount++;
if (msgCount >= 20) {
controller.abort();
return;
}
console.log(message);
},
onResponse({ response }) {
if (response.status === 409) {
controller.abort();
return;
}
},
});
You can register a listener via onAbort(). Use this if you want to trigger some logic whenever an event stream is closed.
const controller = eventSource.listen({
onMessage(message) {
console.log(message);
},
});
controller.onAbort(() => {
console.log("The event stream was closed");
});
controller.abort();
The reconnect() method is used to reset the connection. If a connection is currently open it will forcibly abort the old request and open a new one.
const controller = eventSource.listen({
onMessage(message) {
console.log(message);
},
});
// close the connection
controller.abort();
// reopen the connection
controller.reconnect();
Let's say we expect the server to send a heartbeat message every 20 seconds. With the reconnect() method we can force the connection to reopen if we haven't received a message within 20 seconds.
let timeout = setTimeout(() => controller.reconnect(), 20000);
const controller = eventSource.listen({
onMessage(message) {
// cancel the existing timer because we have received a message
clearTimeout(timeout);
// set a new timer where we forcibly reopen the connection
// if we haven't received a message in 20000ms
timeout = setTimeout(() => controller.reconnect(), 20000);
console.log(message);
},
});
Be aware that in this example you have to call both clearTimeout() and abort() to permanently close the connection. Otherwise the pending timeout will reopen the connection even though you aborted it.
clearTimeout(timeout);
controller.abort();
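The heartbeat pattern above can be wrapped in a small helper so that the timer is always reset and cleared in one place. This is a minimal sketch, not part of the library: createWatchdog and its timers parameter are hypothetical names, and the timers argument exists only so the helper can be exercised without real timers.

```javascript
// Hypothetical helper (not part of event-source-plus): calls `onTimeout`
// if `reset()` is not called again within `ms` milliseconds.
function createWatchdog(
  onTimeout,
  ms,
  timers = {
    set: (fn, delay) => setTimeout(fn, delay),
    clear: (handle) => clearTimeout(handle),
  },
) {
  let handle;
  return {
    // (Re)start the countdown; call this whenever a message arrives.
    reset() {
      timers.clear(handle);
      handle = timers.set(onTimeout, ms);
    },
    // Stop the countdown for good; call this before aborting the stream.
    stop() {
      timers.clear(handle);
      handle = undefined;
    },
  };
}
```

With a helper like this, onMessage would call reset(), the timeout callback would call controller.reconnect(), and closing the stream permanently would mean calling stop() followed by controller.abort().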
The EventSourcePlus constructor allows you to pass additional fetch options such as method, body, and headers.
const eventSource = new EventSourcePlus("https://example.com", {
method: "post",
body: JSON.stringify({ message: "hello world" }),
headers: {
"Content-Type": "application/json",
},
});
You can also pass in a custom fetch implementation, which is useful for environments that don't natively support fetch.
const eventSource = new EventSourcePlus("https://example.com", {
fetch: myCustomFetch,
});
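As an illustration of what a custom fetch might look like, the sketch below wraps an existing fetch function and logs each request before delegating to it. The names withLogging, baseFetch, and log are hypothetical; only the fetch option itself comes from the library.

```javascript
// Hypothetical wrapper: returns a fetch-compatible function that logs the
// request method and URL, then delegates to `baseFetch` unchanged.
function withLogging(baseFetch, log = console.log) {
  return function loggingFetch(input, init) {
    const method = (init && init.method) || input.method || "GET";
    // `input` may be a string, a URL, or a Request object.
    const url = typeof input === "string" ? input : (input.url ?? String(input));
    log(`${method.toUpperCase()} ${url}`);
    return baseFetch(input, init);
  };
}
```

The wrapped function could then be passed as the fetch option, for example withLogging(fetch) in an environment that already provides a global fetch.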
Headers can be set by passing an object or a function. The function may return a header object or a promise that resolves to a header object.
// object syntax //
const eventSource = new EventSourcePlus("https://example.com", {
// this value will remain the same for every request
headers: {
Authorization: "some-token",
},
});
// function syntax //
function getHeaders() {
return {
Authorization: "some-token",
};
}
const eventSource = new EventSourcePlus("https://example.com", {
// this function will rerun every time a request is sent
headers: getHeaders,
});
// async function syntax //
async function getHeaders() {
const token = await getSomeToken();
return {
Authorization: token,
};
}
const eventSource = new EventSourcePlus("https://example.com", {
// this function will rerun every time a request is sent
headers: getHeaders,
});
The function syntax is especially useful when dealing with authentication because it allows you to always get a fresh auth token. This is usually a pain point when working with other SSE client libraries.
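To make the benefit concrete, the sketch below reads the token at call time, so a token that was refreshed between retries is picked up on the next request. The tokenStore object and the Bearer prefix are assumptions made for this example.

```javascript
// Hypothetical in-memory token holder; a real app might update this
// from a login flow or a refresh-token endpoint.
const tokenStore = { token: "initial-token" };

// Evaluated on every request, so it always sees the latest token.
function getHeaders() {
  return {
    Authorization: `Bearer ${tokenStore.token}`,
  };
}
```

With the object syntax, the header value would instead be fixed at the moment the EventSourcePlus instance was created.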
By default this library will automatically retry the request indefinitely, with exponential backoff maxing out at 30 seconds. Both of these values can be adjusted when initializing the EventSourcePlus class.
const eventSource = new EventSourcePlus("https://example.com", {
// automatically retry up to 100 times (default is 'undefined')
maxRetryCount: 100,
// set exponential backoff to max out at 10000 ms (default is "30000")
maxRetryInterval: 10000,
});
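For intuition, capped exponential backoff can be sketched as follows. This is not the library's actual implementation: the 1000 ms starting delay and the doubling factor are assumptions, and only the 30000 ms default cap comes from the text above.

```javascript
// Illustrative only: delay before retry number `attempt` (0-based),
// doubling from an assumed base delay and capped at `maxRetryInterval`.
function retryDelay(attempt, maxRetryInterval = 30000, baseDelay = 1000) {
  return Math.min(baseDelay * 2 ** attempt, maxRetryInterval);
}

// Delays for the first seven retries with the default cap.
const delays = [0, 1, 2, 3, 4, 5, 6].map((n) => retryDelay(n));
// → [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Lowering maxRetryInterval makes the delay flatten out sooner, which keeps reconnect latency bounded at the cost of more frequent requests during a long outage.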
Additionally, you can abort the request inside listen hooks using the EventSourceController returned by listen():
// abort the request if we receive 10 server errors
let errCount = 0;
const controller = eventSource.listen({
onMessage(data) {},
onResponseError({ request, response, options }) {
errCount++;
if (errCount >= 10) {
controller.abort();
}
},
});
This library has two retry strategies: always and on-error.
always is the default. It will always attempt to keep the connection open after it has been closed. This is useful for most realtime applications, which need to keep a persistent connection with the backend.
on-error will only retry if an error occurred. If an event stream was successfully received by the client, it will not reconnect after the connection is closed. This is useful for short-lived streams that have a fixed length (for example, LLM response streams), since it means you no longer need to listen for a "DONE" event to close the connection.
To change the retry strategy, update the retryStrategy option:
const eventSource = new EventSourcePlus("https://example.com", {
retryStrategy: "on-error",
});
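The difference between the two strategies can be summarized as a small decision function. This is a sketch of the behavior described above, not the library's code: shouldReconnect and its parameters are hypothetical, and it ignores maxRetryCount and manual aborts.

```javascript
// `endedWithError` is true when the request failed or the stream broke,
// and false when the server closed a successfully received stream.
function shouldReconnect(retryStrategy, endedWithError) {
  if (retryStrategy === "always") return true;
  if (retryStrategy === "on-error") return endedWithError;
  throw new Error(`Unknown retry strategy: ${retryStrategy}`);
}
```

Under this model, an LLM response stream that finishes normally ends the session with on-error, whereas always would immediately open a new connection.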
The listen() method has the following hooks:
- onMessage
- onRequest
- onRequestError
- onResponse
- onResponseError
The only required hook is onMessage.
onMessage is called whenever a new Server-Sent Event is received from the server.
eventSource.listen({
onMessage(message) {
console.log(message);
},
});
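For context, each message corresponds to one block of the text/event-stream wire format. The sketch below parses a single block following the Server-Sent Events format. The returned field names (id, event, data) are assumptions about the message shape, and the library's own parser may differ.

```javascript
// Parses one SSE block (the text between two blank lines).
function parseSseBlock(block) {
  const message = { id: undefined, event: "message", data: "" };
  const dataLines = [];
  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line === "" || line.startsWith(":")) continue; // blank line or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "data") dataLines.push(value);
    else if (field === "event") message.event = value;
    else if (field === "id") message.id = value;
  }
  // Multiple data lines are joined with newlines.
  message.data = dataLines.join("\n");
  return message;
}

const parsed = parseSseBlock('event: greeting\ndata: {"text":"hello"}\nid: 42');
// parsed.event is "greeting", parsed.data is '{"text":"hello"}', parsed.id is "42"
```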
onRequest is called as soon as a request is constructed. This allows you to modify the request or do simple logging.
eventSource.listen({
onRequest({ request, options }) {
console.log(request, options);
// add current time query search params
options.query = options.query || {};
options.query.t = new Date();
},
});
onRequestError will be called when the request fails.
eventSource.listen({
async onRequestError({ request, options, error }) {
console.log(`[request error]`, request, error);
},
});
Some example errors might be "Connection refused" or "Failed to parse URL".
onResponse will be called after receiving a response from the server.
eventSource.listen({
async onResponse({ request, response, options }) {
console.log(`Received status code: ${response.status}`);
},
});
onResponseError will fire if one of the following conditions has been met:
- response.ok is not true (i.e. the server returned an error status code)
- The Content-Type header sent by the server doesn't include text/event-stream
eventSource.listen({
async onResponseError({ request, response, options }) {
console.log(
`[response error]`,
request,
response.status,
response.body,
);
},
});
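The two conditions listed above can be expressed as a single predicate. This is a sketch for illustration, with isResponseError as a hypothetical name; it is not how the library necessarily performs the check.

```javascript
// Returns true when either documented condition holds. `response` only
// needs an `ok` flag and a `headers.get()` method, like a fetch Response.
function isResponseError(response) {
  const contentType = response.headers.get("Content-Type") || "";
  return !response.ok || !contentType.includes("text/event-stream");
}
```

Note that a 200 response with a JSON body counts as an error under this rule, which is worth keeping in mind when an endpoint falls back to JSON for error payloads.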
Under the hood, this library makes use of the following APIs:
- Fetch
- ReadableStream, specifically the getReader() method
This means that you can use EventSourcePlus in any environment that supports those features, including:
- All modern browsers
- NodeJS v16.5.0 or greater
  - node-fetch-native is used to backport Fetch to Node v16.5. In other cases the native Node Fetch implementation is used.
- Any server runtime that also has support for these APIs
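To check an unfamiliar runtime, a feature-detection helper along these lines may be useful. It is a sketch with a hypothetical name, supportsRequiredApis. Because the library can supply its own fetch on older Node versions, a false result does not necessarily mean the library is unusable there.

```javascript
// Checks a global-like object for the APIs listed above.
function supportsRequiredApis(scope = globalThis) {
  return (
    typeof scope.fetch === "function" &&
    typeof scope.ReadableStream === "function" &&
    typeof scope.ReadableStream.prototype.getReader === "function"
  );
}
```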
Pull requests and issue reports are welcome.
Before submitting a PR please ensure that you have run the following commands and there are no errors.
pnpm run lint
pnpm run format
(For VSCode users, "formatOnSave" is set to true, so the formatting step may be unnecessary.)
Integration tests and unit tests get run by CI.