A library for implementing task dispatching with the long-polling approach on an Express server.
npm i express-long-polling
See the complete demo in server.ts, process-task.ts, and submit-task.ts.
import express from 'express'
import cors from 'cors'
import { print } from 'listening-on'
import { LongPollingTaskQueue } from 'express-long-polling'
// express server with CORS enabled and JSON request bodies parsed
const app = express()
app.use(cors())
app.use(express.json())

// task queue shared by the routes below
const taskQueue = new LongPollingTaskQueue({
  pollingInterval: 30_000, // same value as 30 * 1000
})
// client creates a pending task; the request body becomes the task input
app.post('/task', (req, res) => {
  const task = taskQueue.addTask({ input: req.body })
  // reply with the id so the client can ask for the result later
  res.json({ id: task.id })
})
// worker polls for a pending task
app.get('/task', (req, res) => {
  taskQueue.getOrWaitTask('random', req, task => {
    // forward only the id and input of the task to the worker
    const { id, input } = task
    res.json({ id, input })
  })
})
// worker submits task result
app.post('/task/result', (req, res) => {
  // req.body is untrusted and may be missing when no JSON body was parsed
  const { id, output } = req.body ?? {}
  if (typeof id !== 'string') {
    res.status(400).json({ error: 'expect string id in request body' })
    return
  }
  // tell the worker whether the task was still known,
  // instead of always answering 201 and discarding the outcome
  const found = taskQueue.dispatchResult(id, output)
  res.status(found ? 201 : 404)
  res.end()
})
// client gets task result
app.get('/task/result', (req, res) => {
  const { id } = req.query
  // a query value can be missing, an array (?id=a&id=b) or a nested object,
  // but the task queue requires a plain string id
  if (typeof id !== 'string') {
    res.status(400).json({ error: 'expect string id in query' })
    return
  }
  taskQueue.getOrWaitResult(
    id,
    req,
    output => res.json({ status: 'completed', output }),
    /* optional, default will redirect with 307 for the same url */
    () => res.json({ status: 'pending' }),
  )
})
// client deletes completed task
app.delete('/task', (req, res) => {
  const { id } = req.query
  // a query value can be missing, an array or a nested object,
  // but the task queue requires a plain string id
  if (typeof id !== 'string') {
    res.status(400).json({ error: 'expect string id in query' })
    return
  }
  // deleting an unknown task is not treated as an error: respond 200 either way
  taskQueue.deleteTask(id)
  res.end()
})
// port the demo server listens on
const PORT = 8100

// start the server, then print the listening address(es)
app.listen(PORT, () => print(PORT))
import type { Request } from 'express'
/**
 * @description redirect with 307 to let client retry the same request
 *
 * Uses `req.originalUrl` instead of `req.url`: inside a mounted router
 * `req.url` has the mount path stripped, so redirecting to it would send the
 * client to a different path than the one it requested.
 *
 * Does nothing when `req.res` is not set.
 *
 * @param req the request that timed out while waiting
 */
function defaultOnTimeout(req: Request): void {
  req.res?.redirect(307, req.originalUrl)
}
/**
* @description task queue that passes tasks from clients to workers, and
* results from workers back to clients, where both sides wait on express
* requests (long polling)
*
* @template Input type of the task input given to addTask and handed to onTask
* @template Output type of the result given to dispatchResult and handed to onOutput
*/
export class LongPollingTaskQueue<Input, Output> {
/**
* @param options optional settings, all fields have defaults
*/
constructor(options?: {
/**
* NOTE(review): presumably how long a waiting request is held before its
* onTimeout is called, in milliseconds (the usage example passes 30 * 1000)
* — confirm against the implementation
* @default 30 seconds
*/
pollingInterval?: number
})
/**
* @description create task from client
* @param options the task input, plus an optional id
* @returns the id of the created task
*/
addTask(options: {
/**
* fixed id, or a function producing the id
* @default randomUUID
*/
id?: string | (() => string)
/** payload later handed to the worker in getOrWaitTask */
input: Input
}): {
id: string
}
/**
* @description get task from worker
* @param getTask which pending task to pick, 'first' or 'random'
* (the exact selection semantics are not visible in this listing)
* @param req the express request of the polling worker
* @param onTask called with the id and input of the task
* @param onTimeout optional, has the same signature as defaultOnTimeout;
* per the usage notes the default redirects with 307 to the same url
*/
getOrWaitTask(
getTask: 'first' | 'random',
req: Request,
onTask: (task: { id: string; input: Input }) => void,
onTimeout?: typeof defaultOnTimeout,
): void
/**
* @description dispatch result from worker
* @param id id of the task the result belongs to
* @param output result to hand to the client waiting in getOrWaitResult
* @returns true if the task is found and deleted
* @returns false if the task is not found (maybe already deleted)
*/
dispatchResult(id: string, output: Output): boolean
/**
* @description get result from client (dispatched from worker)
* @param id id of the task, as returned by addTask
* @param req the express request of the polling client
* @param onOutput called with the output dispatched by the worker
* @param onTimeout optional, has the same signature as defaultOnTimeout;
* per the usage notes the default redirects with 307 to the same url
*/
getOrWaitResult(
id: string,
req: Request,
onOutput: (output: Output) => void,
onTimeout?: typeof defaultOnTimeout,
): void
/**
* @description delete completed task from client (to release memory)
* @param id id of the task to delete
* @returns true if the task is found and deleted
* @returns false if the task is not found (maybe already deleted)
*/
deleteTask(id: string): boolean
}
This project is licensed under the BSD-2-Clause license.
This is free, libre, and open-source software. It comes down to four essential freedoms [ref]:
- The freedom to run the program as you wish, for any purpose
- The freedom to study how the program works, and change it so it does your computing as you wish
- The freedom to redistribute copies so you can help others
- The freedom to distribute copies of your modified versions to others