Mesh is a command-based WebSocket framework for real-time applications. It uses Redis to coordinate connections, rooms, presence, and shared state across application instances, with built-in support for structured commands, latency tracking, and automatic reconnection.
- Quickstart
- Who is this for?
- Distributed messaging architecture
- Redis channel subscriptions
- Rooms
- Presence
- Record subscriptions
- Command middleware
- Latency tracking and connection liveness
- Comparison
npm install @prsm/mesh
Here's the fastest way to get a server and client connected.
import { MeshServer } from "@prsm/mesh/server";
const server = new MeshServer({
port: 8080,
redisOptions: { host: "localhost", port: 6379 },
});
server.registerCommand("echo", async (ctx) => {
return `echo: ${ctx.payload}`;
});
import { MeshClient } from "@prsm/mesh/client";
const client = new MeshClient("ws://localhost:8080");
await client.connect();
const response = await client.command("echo", "Hello!");
console.log(response); // "echo: Hello!"
Mesh supports multiple real-time patterns—choose where to go next based on your use case:
-
Pub/Sub messaging (e.g. chat, notifications): → Redis channel subscriptions
-
Real-time presence tracking (e.g. who's online, who's in what room): → Presence
-
Granular, versioned data sync (e.g. user profiles, dashboards): → Record subscriptions
-
Identify users, store connection info, or manage rooms:
→ Metadata and Room metadata -
Control access or validate inputs across commands:
→ Command middleware -
Use Mesh with an existing Express server:
→ @prsm/mesh-express
Want to see how messages flow across servers?
→ Distributed messaging architecture
The 95% of real-world apps that need real-time sync—but don't want to build their own protocol, state engine, or distributed infrastructure. If you're building something like the examples below, Mesh is probably a great fit:
Collaborative apps
- Live cursors
- Shared whiteboards or form editors
- Session co-browsing
Dashboards and control panels
- IoT device dashboards
- Live analytics or system monitors
- Stock tickers
Real-time social features
- Chat, presence indicators
- Typing indicators
- Notification feeds
Turn-based or async games
- Multiplayer puzzles
- Card games
- Structured multiplayer game state (e.g. inventories, turns, positions, shared resources)
The diagram below shows how Mesh handles communication in a distributed setup. It uses Redis to track which connections belong to which rooms, determine their host servers, and route messages either locally or across servers via pub/sub.
sequenceDiagram
autonumber
participant ClientA
participant ServerA
participant Redis
participant ServerB
participant ClientB
ClientA->>ServerA: connect()
ServerA->>Redis: register connection ID + instance ID
ServerA->>Redis: add connection to room "room1"
ClientB->>ServerB: connect()
ServerB->>Redis: register connection ID + instance ID
ServerB->>Redis: add connection to room "room1"
ClientA->>ServerA: command("broadcastRoom", { roomName: "room1", payload })
ServerA->>Redis: getRoomConnectionIds("room1")
ServerA->>Redis: getInstanceIdsForConnections([...])
alt Local delivery
ServerA->>ClientA: send(payload)
end
alt Remote delivery
ServerA->>Redis: publish(pubsub channel for ServerB, payload)
Redis-->>ServerB: message received
ServerB->>ClientB: send(payload)
end
Mesh lets clients subscribe to Redis pub/sub channels and receive messages directly over their WebSocket connection. When subscribing, clients can optionally request recent message history.
Expose the channels you want to allow subscriptions to:
server.exposeChannel("notifications:global");
server.exposeChannel(/^chat:.+$/);
// return false to disallow subscription, or true to allow
server.exposeChannel(/^private:chat:.+$/, async (conn, channel) => {
// per-client guarding
const valid = await isPremiumUser(conn);
return valid;
});
To publish messages to a channel (which subscribed clients will receive), use the publishToChannel
method. You can optionally store a history of recent messages in Redis.
// publish to 'notifications:global' without history
await server.publishToChannel(
"notifications:global",
JSON.stringify({ alert: "Red alert!" })
);
// publish a chat message and keep the last 50 messages in history
await server.publishToChannel(
"chat:room1",
JSON.stringify({ type: "user-message", user: "1", text: "Hi" }),
50 // store in Redis history
);
The history
parameter tells Mesh to store the message in a Redis list (history:<channel>
) and trim the list to the specified size, ensuring only the most recent messages are kept. Clients subscribing with the historyLimit
option will receive these historical messages upon connection.
const { success, history } = await client.subscribeChannel(
"chat:room1",
(message) => {
console.log("Live message:", message);
},
{ historyLimit: 3 }
);
if (success) {
console.log("Recent messages:", history); // ["msg3", "msg2", "msg1"]
}
Unsubscribe when no longer needed:
await client.unsubscribeChannel("chat:room1");
This feature is great for:
- Real-time chat and collaboration
- Live system dashboards
- Pub/sub messaging across distributed server instances
- Notification feeds with instant context
Mesh supports rooms as a first-class concept for organizing connections into logical groups. Clients can join and leave rooms using simple built-in commands.
Room membership is automatically tracked across server instances using Redis, and cleaned up when connections disconnect.
Use the joinRoom
method on the client:
const { success, present } = await client.joinRoom("room:lobby");
By default, this joins the room without subscribing to presence.
To automatically receive presence updates when users join or leave:
const { success, present } = await client.joinRoom("room:lobby", (update) => {
if (update.type === "join") {
console.log("User joined:", update.connectionId);
} else {
console.log("User left:", update.connectionId);
}
});
This behaves identically to calling subscribePresence(...)
yourself, but is more convenient. The present
array includes the list of currently connected users at the time of join.
[!NOTE] If you don’t pass a callback, the
present
array still reflects who is currently in the room — even though no real-time presence tracking is active.
Call leaveRoom(...)
to exit the room:
await client.leaveRoom("room:lobby");
If presence was subscribed via joinRoom(...)
, it will automatically be unsubscribed when leaving.
This ensures room and presence subscriptions remain in sync without extra work.
Mesh also exposes room utilities on the server for custom behavior:
server.addToRoom(roomName, connection)
server.removeFromRoom(roomName, connection)
-
server.isInRoom(roomName, connection)
→boolean
-
server.getRoomMembers(roomName)
→string[]
server.removeFromAllRooms(connection)
server.clearRoom(roomName)
These can be used to implement custom commands or manage room state manually if needed.
You can guard room joins using command middleware, just like any other command. The built-in room join command is "mesh/join-room", and the payload contains a roomName
string:
server.addMiddleware(async (ctx) => {
if (ctx.command === "mesh/join-room") {
const { roomName } = ctx.payload;
const meta = await server.connectionManager.getMetadata(ctx.connection);
if (!meta?.canJoinRooms) {
throw new Error("Access denied");
}
if (roomName.startsWith("admin:") && !meta.isAdmin) {
throw new Error("Admins only");
}
}
});
This gives you full flexibility to enforce auth, roles, or custom logic per room.
Mesh provides a built-in presence system that tracks which connections are present in specific rooms and notifies clients when connections join or leave. This is ideal for building features like "who's online" indicators, or any real-time awareness of other users.
[!NOTE] Presence only tracks connection IDs, not metadata. You must join them explicitly if you want to show e.g. usernames, avatars, emails, etc.
Enable presence tracking for specific rooms using exact names or regex patterns:
// track presence for all rooms matching a pattern
server.trackPresence(/^room:.*$/);
// track presence for a specific room
server.trackPresence("lobby");
// guard who can see presence.
// clients who attempt to subscribe to the presence of this room
// will be rejected if the guard returns false
server.trackPresence("admin-room", async (conn, roomName) => {
const meta = await server.connectionManager.getMetadata(conn);
return meta?.isAdmin === true;
});
// custom TTL
server.trackPresence("game-room", { ttl: 60_000 }); // ms
// guard and TTL
server.trackPresence("vip-room", {
ttl: 30_000,
guard: async (conn, roomName) => {
const meta = await server.connectionManager.getMetadata(conn);
return meta?.isVIP === true;
}
});
When presence tracking is enabled for a room, Mesh automatically:
- Detects and records the connection IDs of clients joining the room.
- Emits real‐time “join” and “leave” events to subscribed clients.
- Automatically refreshes each connection’s presence using a configurable TTL as long as the client remains active.
- Cleans up expired or disconnected entries to maintain an up-to-date presence list.
// get all connections currently present in a room
const connectionIds = await server.presenceManager.getPresentConnections("lobby");
Subscribe to presence updates for a room:
const { success, present } = await client.subscribePresence(
"lobby",
(update) => {
if (update.type === "join") {
console.log("User joined:", update.connectionId);
} else if (update.type === "leave") {
console.log("User left:", update.connectionId);
}
}
);
// initial list of present connections
console.log("Currently present:", present); // ["conn1", "conn2", ...]
Unsubscribe when no longer needed:
await client.unsubscribePresence("lobby");
Presence is most useful when combined with connection metadata. For example:
// server: set user metadata when they connect
server.onConnection(async (connection) => {
// maybe from an auth token or session
await server.connectionManager.setMetadata(connection, {
userId: "user123",
username: "Alice",
avatar: "https://example.com/avatar.png"
});
});
// client: subscribe to presence and resolve metadata
const { success, present } = await client.subscribePresence(
"lobby",
async (update) => {
// fetch metadata for the connection that joined/left.
//
// since clients cannot access `getAllMetadataForRoom()` directly (it's just a server API),
// you can expose it via a custom command like `get-user-metadata`:
const metadata = await client.command("get-user-metadata", {
connectionId: update.connectionId
});
if (update.type === "join") {
console.log(`${metadata.username} joined the lobby`);
} else if (update.type === "leave") {
console.log(`${metadata.username} left the lobby`);
}
}
);
// initial presence - fetch metadata for all present connections
const allMetadata = await Promise.all(
present.map(connectionId =>
client.command("get-user-metadata", { connectionId })
)
);
console.log("Users in lobby:", allMetadata);
You can associate data like user IDs, tokens, or custom attributes with a connection using the setMetadata
method. This metadata is stored in Redis, making it ideal for identifying users, managing permissions, or persisting session-related data across a distributed setup. Since it lives in Redis, it’s accessible from any server instance.
Metadata can be any JSON-serializable object, including nested structures. Updates fully replace the previous value—partial updates (patches) are not supported. While there is no hard size limit, large metadata objects may impact Redis performance.
server.registerCommand("authenticate", async (ctx) => {
// maybe do some actual authentication here
const { userId } = ctx.payload;
const token = encode({
sub: userId,
iat: Date.now(),
exp: Date.now() + 3600,
});
await server.connectionManager.setMetadata(ctx.connection, {
userId,
token,
});
return { success: true };
});
Get metadata for a specific connection:
const metadata = await server.connectionManager.getMetadata(connectionId);
// { userId, token }
Get all metadata for all connections:
const metadata = await server.connectionManager.getAllMetadata();
// [{ [connectionId]: { userId, token } }, ...]
Get all metadata for all connections in a specific room:
const metadata = await server.connectionManager.getAllMetadataForRoom(roomName);
// [{ [connectionId]: { userId, token } }, ...]
Similar to connection metadata, Mesh allows you to associate arbitrary data with rooms. This is useful for storing room-specific information like topics, settings, or ownership details. Room metadata is also stored in Redis and accessible across all server instances.
Room metadata can also be any JSON-serializable object. Updates replace the entire object—partial merges only occur when using updateMetadata, which shallowly merges the new fields. Like connection metadata, there’s no enforced size limit, but keeping room metadata small is recommended for performance.
// set metadata for a room
await server.roomManager.setMetadata("lobby", {
topic: "General Discussion",
maxUsers: 50,
});
// get metadata for a specific room
const lobbyMeta = await server.roomManager.getMetadata("lobby");
// { topic: "General Discussion", maxUsers: 50 }
// update metadata (merges with existing data)
await server.roomManager.updateMetadata("lobby", {
topic: "Updated Topic", // overwrites existing topic
private: false, // adds new field
});
const updatedLobbyMeta = await server.roomManager.getMetadata("lobby");
// { topic: "Updated Topic", maxUsers: 50, private: false }
// get metadata for all rooms
const allRoomMeta = await server.roomManager.getAllMetadata();
// { lobby: { topic: "Updated Topic", maxUsers: 50, private: false }, otherRoom: { ... } }
Room metadata is removed when clearRoom(roomName)
is called.
Mesh supports subscribing to individual records stored in Redis. When a record changes, clients receive either the full value or a JSON patch describing the update—depending on the selected mode (full
or patch
).
Subscriptions work across multiple server instances, support versioning for consistency, and scale efficiently. Each client can choose its preferred mode independently.
Expose records using exact IDs or regex patterns. You can add optional per-client guard logic:
server.exposeRecord("user:123");
server.exposeRecord(/^product:\d+$/);
server.exposeRecord(/^private:.+$/, async (conn, recordId) => {
const meta = await server.connectionManager.getMetadata(conn);
return !!meta?.userId;
});
To allow clients to subscribe and also modify records, use exposeWritableRecord
. This also accepts optional guard functions to control write access:
// allow any connected client to write to cursor records
server.exposeWritableRecord(/^cursor:user:\d+$/);
// allow only authenticated users to write to their profile
server.exposeWritableRecord(/^profile:user:\d+$/, async (conn, recordId) => {
const meta = await server.connectionManager.getMetadata(conn);
const recordUserId = recordId.split(":").pop();
return meta?.userId === recordUserId; // check if user ID matches record ID
});
Important: Records exposed via exposeWritableRecord
are automatically readable (subscribable) by clients. You don't need to call exposeRecord
for the same pattern. However, if you want different guards for reading and writing, you can expose the same pattern with both methods, each with its own guard.
Use publishRecordUpdate()
from the server to update the stored value, increment the version, generate a patch, and broadcast to all subscribed clients (both read-only and writable).
await server.publishRecordUpdate("user:123", {
name: "Alice",
email: "alice@example.com",
});
// later...
await server.publishRecordUpdate("user:123", {
name: "Alice",
email: "alice@updated.com",
status: "active",
});
If a record has been exposed as writable via exposeWritableRecord
on the server (and any guard function passes), clients can publish updates using the publishRecordUpdate
method:
const userId = "123";
const success = await client.publishRecordUpdate(`cursor:user:${userId}`, {
x: 100,
y: 250,
timestamp: Date.now(),
});
if (success) {
console.log("Cursor position updated successfully.");
} else {
console.error("Failed to update cursor position (maybe permission denied?).");
}
This client-initiated update will be processed by the server, which then uses the same publishRecordUpdate
mechanism internally to persist the change and broadcast it (as a full value or patch) to all other subscribed clients. The method returns true
if the server accepted the write, and false
if it was rejected (e.g., due to a failed guard).
Note: When a client publishes an update to a record using publishRecordUpdate
, it will also receive that update through its subscription callback just like any other client. This ensures consistency and simplifies update handling. If your app logic already applies local updates optimistically, you may choose to ignore redundant self-updates in your callback.
[!NOTE]
When a client publishes an update to a record using publishRecordUpdate, it will also receive the resulting update through its own subscription callback—just like any other client. This ensures consistency and avoids special-casing updates based on origin.This behavior is always true, unless the client unsubscribed from the record before the update was broadcast. If your client logic applies optimistic updates locally, you may choose to ignore the echoed update if it's redundant.
In full
mode, the client receives the entire updated record every time. This is simpler to use and ideal for small records or when patching isn't needed.
let userProfile = {};
const { success, record, version } = await client.subscribeRecord(
"user:123",
(update) => {
// update contains { recordId, full, version }
userProfile = update.full;
console.log(
`Received full update for ${update.recordId} v${update.version}:`,
update.full
);
}
);
if (success) {
userProfile = record;
}
In patch
mode, the client receives only changes as JSON patches and must apply them locally. This is especially useful for large records that only change in small ways over time.
[!NOTE]
Patch mode only works for records that are objects or arrays. Primitive values like strings, numbers, or booleans aren't representable as JSON patches, so you should use full mode for those cases.
import { applyPatch } from "@prsm/mesh/client";
let productData = {};
const { success, record, version } = await client.subscribeRecord(
"product:456",
(update) => {
// update contains { recordId, patch?, full?, version }
if (update.patch) {
// normally you’ll receive `patch`, but if the client falls out of sync,
// the server will send a full update instead to resynchronize.
applyPatch(productData, update.patch);
console.log(`Applied patch for ${update.recordId} v${update.version}`);
} else {
productData = update.full;
console.log(
`Received full (resync) for ${update.recordId} v${update.version}`
);
}
},
{ mode: "patch" }
);
if (success) {
productData = record;
}
await client.unsubscribeRecord("user:123");
await client.unsubscribeRecord("product:456");
Every update includes a version
. Clients track the current version and, in patch
mode, expect version === localVersion + 1
. If a gap is detected (missed patch), the client will automatically be sent a full record update to resync. It does this by transparently unsubscribing and resubscribing to the record.
This system allows fine-grained, real-time synchronization of distributed state with minimal overhead.
Some of this was mentioned above, but to reiterate:
Mesh handles all diffing on the server.
When you update a record - whether from the server or a client - the server:
- Applies the new value
- Computes a JSON patch between the old and new values.
- Broadcasts the patch (or full value if needed) to all subscribed clients.
- Increments and attaches a version number.
Clients simply apply the patch. If any patch is missed (e.g. due to disconnect or version mismatch), the server automatically sends a full record to resynchronize the client.
This means:
- Clients don't generate diffs.
- Clients don't compare old vs. new data.
- You never need to guess what changed.
All you do is apply the update you receive.
This makes real-time sync easier, safer, and dramatically more scalable—no complex client logic or custom diffing is required.
Mesh allows you to define middleware functions that run before your command handlers. This is useful for tasks like authentication, validation, logging, or modifying the context before the main command logic executes.
Middleware can be applied globally to all commands or specifically to individual commands.
Global Middleware:
Applied to every command received by the server.
server.addMiddleware(async (ctx) => {
console.log(`Received command: ${ctx.command} from ${ctx.connection.id}`);
});
server.addMiddleware(async (ctx) => {
const metadata = await server.connectionManager.getMetadata(ctx.connection);
if (!metadata?.userId) {
throw new Error("Unauthorized");
}
// add to context for downstream handler access
ctx.user = { id: metadata.userId };
});
Command-Specific Middleware:
Applied only to the specified command, running after any global middleware.
const validateProfileUpdate = async (ctx) => {
const { name, email } = ctx.payload;
if (typeof name !== "string" || name.length === 0) {
throw new Error("Invalid name");
}
if (typeof email !== "string" || !email.includes("@")) {
throw new Error("Invalid email");
}
};
server.registerCommand(
"update-profile",
async (ctx) => {
// ..
return { success: true };
},
[validateProfileUpdate]
);
Middleware functions receive the same MeshContext
object as command handlers and can be asynchronous. If a middleware function throws an error, the execution chain stops, and the error is sent back to the client.
Mesh includes a built-in ping/pong system to track latency and detect dead connections. This is implemented at the application level (not via raw WebSocket protocol ping()
frames) to allow for:
- Accurate round-trip latency measurement from server to client.
- Graceful connection closure and multi-instance Redis cleanup.
- Fine-tuned control using configurable missed ping/pong thresholds.
By default, the server sends periodic ping
commands. Clients respond with pong
. If the server misses more than maxMissedPongs
consecutive responses, the connection is considered stale and is closed cleanly. This ensures all connection metadata and room membership are safely cleaned up across distributed instances.
You can configure the server like so:
const server = new MeshServer({
port: 8080,
redisOptions: { host: "localhost", port: 6379 },
pingInterval: 30000, // ms between ping commands
latencyInterval: 5000, // ms between latency checks
maxMissedPongs: 1, // how many consecutive pongs can be missed before closing (default: 1)
});
With the default maxMissedPongs
value of 1, a client has roughly 2 * pingInterval time to respond before being disconnected.
On the client, Mesh automatically handles incoming ping
commands by responding with a pong
, and resets its internal missed pings counter. If the server stops sending ping
messages (e.g. due to a dropped connection), the client will increment its missed pings counter. Once the counter exceeds maxMissedPings
, the client will attempt to reconnect if shouldReconnect
is enabled.
Client-side configuration looks like this:
const client = new MeshClient("ws://localhost:8080", {
pingTimeout: 30000, // ms between ping timeout checks - should match the server's pingInterval
maxMissedPings: 1, // how many consecutive pings can be missed before reconnecting (default: 1)
shouldReconnect: true, // auto-reconnect when connection is lost
reconnectInterval: 2000, // ms between reconnection attempts
maxReconnectAttempts: 5, // give up after 5 tries (or Infinity by default)
});
Together, this system provides end-to-end connection liveness guarantees without relying on low-level WebSocket protocol ping
/pong
frames, which do not offer cross-instance cleanup or latency tracking. The configurable thresholds on both sides allow for fine-tuning the balance between responsiveness and tolerance for network latency.
Mesh | Socket.IO | Colyseus | Deepstream.io | ws (+ custom) | uWebSockets.js | |
---|---|---|---|---|---|---|
Command API (RPC) | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ |
Raw Events Support | ✅ | ✅ | ✅ | ✅ | ✅ | |
Room Support | ✅ | ✅ | ✅ | ✅ | ||
Presence Tracking | ✅ Built-in | ✅ | ✅ | ❌ | ❌ | |
Redis Scaling | ✅ Native | ✅ With adapter | ✅ | ✅ | ✅ If added | ❌ |
Connection Metadata | ✅ Redis-backed | ✅ Records | ❌ | ❌ | ||
Latency Tracking | ✅ Built-in | ❌ | ❌ | ❌ | ❌ | |
Automatic Reconnect | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Redis Pub/Sub | ✅ Client subscription | ❌ | ✅ | ❌ | ❌ | |
History on Subscribe | ✅ Optional Redis-backed | ❌ | ❌ | ❌ | ||
Record Subscriptions | ✅ Versioned + Patchable | ❌ | ❌ | ❌ | ❌ | |
Typescript-First | ✅ Yes, mostly | ✅ | ❌ | |||
Scalability | ✅ Horizontal via Redis | ✅ Horizontal via Redis Adapter | ✅ | ✅ | ✅ But no sync | |
Target Use Case | Real-time/generic async | Real-time apps, chat | Multiplayer games | Pub/Sub, IoT | Anything (low-level) | Anything (perf-focused) |
Ease of Use | ✅ Minimal API |