This library was generated with Nx.
Run `nx build centrifugo-plugin` to build the library.
Run `nx test centrifugo-plugin` to execute the unit tests via Jest.
Your backend application should provide an endpoint to generate authentication tokens. In your service layer you should add a method for publishing messages to channels.
// [Controller.ts]
import { Body, Controller, Post } from '@nestjs/common';
import { Service } from './Service';
// Request body accepted by the publish endpoint of the controller below.
interface IPublish {
// Name of the channel the message is published to.
channel: string;
// Message payload forwarded to the service's publish method.
message: string;
}
/**
 * Example controller exposing the Centrifugo helper endpoints under the
 * `centrifuge` route prefix. Both handlers delegate directly to the service.
 */
@Controller('centrifuge')
export class DefaultController {
  constructor(private readonly service: Service) {}

  /**
   * POST `centrifuge/token`: creates an authentication token for the user
   * described in the request body.
   *
   * The body is typed structurally (instead of `any`) so the call into the
   * service is checked by the compiler.
   *
   * @param user Identity of the user the token is issued for.
   * @returns Whatever the service's `createToken` resolves to.
   */
  @Post('/token')
  create(@Body() user: { id: string; name: string }) {
    return this.service.createToken(user);
  }

  // example
  /**
   * POST `centrifuge/publish`: publishes `data.message` to `data.channel`.
   *
   * @param data Channel name and message to publish.
   * @returns The promise returned by the service's `publish`.
   */
  @Post('/publish')
  publish(@Body() data: IPublish) {
    return this.service.publish(data.channel, data.message);
  }
}
// [Service.ts]
import { Injectable, Logger } from '@nestjs/common';
import { CentrifugoAuthClient, CentrifugoClient, CentrifugoPlugin } from "@team_seki/centrifugo-plugin"
// Tag prepended to every log line written by the service below.
const prefix = '[centrifuge-service]'
// User data required to create a token: `id` is sent as the token's userId,
// `name` is sent as `extraInfo.userName`.
interface IUser {
id: string
name: string
}
// Expiration passed as the second argument when creating a token.
const TOKEN_EXPIRATION_IN_SECONDS = 60 * 60 * 3; // 3 hours
/**
 * Example service wrapping the Centrifugo plugin: it creates authentication
 * tokens and publishes messages to channels.
 */
@Injectable()
export class Service {
  // Both clients are obtained once from the plugin and never reassigned.
  private readonly authClient: CentrifugoAuthClient
  private readonly client: CentrifugoClient

  constructor() {
    const centrifugoPlugin = new CentrifugoPlugin()
    this.authClient = centrifugoPlugin.getAuthClient()
    this.client = centrifugoPlugin.getClient()
  }

  /**
   * Create a centrifuge token for the given user.
   *
   * The token is requested with `user.id` as the user id, `user.name` as
   * `extraInfo.userName`, and an expiration of TOKEN_EXPIRATION_IN_SECONDS.
   *
   * @param user User data
   * @returns The value resolved by the auth client's `createToken`
   *          (NOTE(review): exact token shape is not visible here — confirm).
   * @throws Rethrows any error raised by the auth client after logging it.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async createToken(user: IUser): Promise<any> {
    try {
      return await this.authClient.createToken(
        {
          userId: user.id,
          extraInfo: {
            userName: user.name
          }
        },
        TOKEN_EXPIRATION_IN_SECONDS
      )
    } catch (error) {
      const message = 'Unexpected error creating centrifuge token'
      // The user payload goes to the debug level only; the error itself is
      // logged at error level before being propagated to the caller.
      Logger.debug(`${prefix} ${message}`, { user })
      Logger.error(`${prefix} ${message}`, error)
      throw error
    }
  }

  // publish example
  /**
   * Publish a message to a channel.
   *
   * The client call is awaited so that a failure rejects the returned
   * promise instead of being dropped (awaiting is harmless if the client
   * call turns out to be synchronous).
   *
   * @param channel Channel to publish to.
   * @param message Message payload.
   */
  async publish(channel: string, message:string): Promise<void> {
    await this.client.publish(channel, message)
  }
}
export default Service
// HTTP client used for the token and publish requests to the backend.
private request = axios.create({
  baseURL: 'http://localhost:8080',
  timeout: 10000,
});
// Handle of the periodic publish timer; undefined until componentDidMount
// starts it. Typed from setInterval itself rather than `any`.
private intervalId?: ReturnType<typeof setInterval>;
// Component state: the list of users received over the subscription.
override state: IState = {
  users: []
};
/**
 * Connects to Centrifugo over websocket, subscribes to the "users" channel
 * and starts a timer that calls `publish` every 5 seconds.
 */
override async componentDidMount() {
  const wsEndpoint = "ws://localhost:8000/connection/websocket";
  const centrifuge = new Centrifuge(wsEndpoint, {
    getToken: this.getToken // function for getting/refreshing the token
  });

  centrifuge.on('connecting', function (ctx) {
    console.log(`connecting: ${ctx.code}, ${ctx.reason}`);
  }).on('connected', function (ctx) {
    console.log(`connected over ${ctx.transport}`);
  }).on('disconnected', function (ctx) {
    console.log(`disconnected: ${ctx.code}, ${ctx.reason}`);
  }).connect();

  // subscribe to channel
  const sub = centrifuge.newSubscription("users");
  sub.on('publication', (ctx) => {
    // Log outside the state updater: updaters must stay free of side effects.
    console.log(`receiving message: ${JSON.stringify(ctx.data)}`)
    // Build a new array instead of pushing into the previous state, so the
    // update stays correct even if React invokes the updater more than once.
    this.setState(({ users }) => ({ users: [...users, ctx.data] }))
  }).on('subscribing', function (ctx) {
    console.log(`subscribing: ${ctx.code}, ${ctx.reason}`);
  }).on('subscribed', function (ctx) {
    console.log('subscribed', ctx);
  }).on('unsubscribed', function (ctx) {
    console.log(`unsubscribed: ${ctx.code}, ${ctx.reason}`);
  }).subscribe();

  // Periodically publish; `publish` clears this timer itself once done.
  this.intervalId = setInterval(this.publish, 5000);
}
// get an authentication token
// Posts fixed example credentials to the backend token endpoint and resolves
// with the `access_token` field of the response body.
// NOTE(review): id/name look swapped compared with the SSE example
// ({ id: '123', name: 'admin' }) — confirm which is intended.
getToken = async ():Promise<string> => {
  const credentials = { id: 'admin', name: '123'};
  const { data } = await this.request.post('/centrifuge/token', credentials);
  return data.access_token;
}
// publish example
// Timer callback: publishes a timestamped user name to the 'users' channel
// until 10 users are held in state, then stops the timer.
publish = async () => {
  const collected = this.state.users.length;

  // Enough users received: cancel the periodic timer and do nothing else.
  if (collected >= 10) {
    clearInterval(this.intervalId);
    return;
  }

  const payload = {
    channel: 'users',
    message: `user_${new Date().getTime()}`
  };
  await this.request.post('/centrifuge/publish', payload);
}
import React from 'react'
import axios from 'axios'
// Props type of the Main component below; the example declares no props.
interface IProps {}
// State type of the Main component below; the example declares no state fields.
interface IState {}
/**
 * Example component that receives Centrifugo messages over unidirectional
 * SSE. On mount it fetches a token from the backend, opens an EventSource
 * subscribed to the 'notifications' and 'tasks' channels and logs what it
 * receives; on unmount it closes the connection.
 */
export default class Main extends React.Component<IProps, IState> {
  centrifugoSseUrl = 'http://localhost:8000/connection/uni_sse'
  backendUrl = 'http://localhost:8080'

  // Open SSE connection, kept so it can be closed when the component unmounts.
  private eventSource?: EventSource

  override async componentDidMount() {
    const url = new URL(this.centrifugoSseUrl)
    const token = await this.getToken()

    // The connect command (token + subscriptions) travels as a JSON query parameter.
    url.searchParams.append("cf_connect", JSON.stringify({
      'token': token,
      'subs': {
        'notifications': {}, // subscription to 'notifications' channel
        'tasks': {} // subscription to 'tasks' channel
      }
    }))

    const eventSource = new EventSource(url)
    this.eventSource = eventSource

    eventSource.onopen = () => {
      console.log('onopen')
    }

    eventSource.onmessage = (event) => {
      console.log(`receiving data: ${event.data}`)
    }

    eventSource.onerror = (event) => {
      console.log(`Receiving error: ${JSON.stringify(event)}`)
      // TODO: reconnect when the token expires
    }
  }

  // Close the SSE connection so it does not outlive the component.
  override componentWillUnmount() {
    if (this.eventSource) {
      this.eventSource.close()
      this.eventSource = undefined
    }
  }

  // get an authentication token
  // Posts a fixed example user to the backend token endpoint and resolves
  // with the `access_token` field of the response body.
  getToken = async (): Promise<string> => {
    const request = axios.create({
      baseURL: this.backendUrl,
      timeout: 5000
    })

    const user = { id: '123', name: 'admin'}
    const response = await request.post('/centrifuge/token', user)
    return response.data.access_token
  }

  override render() {
    return (
      ...
    )
  }
}