A flexible queue management system for MemberJunction applications that enables background task processing, job scheduling, and asynchronous execution with database persistence.
The @memberjunction/queue
package provides a robust framework for implementing persistent queues in MemberJunction applications. It offers:
- Database-backed task persistence
- Automatic queue creation and management
- Concurrent task processing with configurable limits
- Heartbeat monitoring for process health
- Type-safe task definitions
- Extensible queue implementations
npm install @memberjunction/queue
This package requires the following MemberJunction packages:
-
@memberjunction/core
- Core functionality and entity management -
@memberjunction/global
- Global utilities and class registration -
@memberjunction/core-entities
- Entity type definitions -
@memberjunction/ai
- AI functionality (for AI-related queues) -
@memberjunction/aiengine
- AI Engine integration
Additional dependencies:
-
uuid
- For generating unique identifiers
The TaskBase
class represents an individual task in a queue:
export class TaskBase {
constructor(
taskRecord: QueueTaskEntity,
data: any,
options: TaskOptions
)
// Properties
ID: string // Unique task identifier
Status: TaskStatus // Current task status
Data: any // Task payload data
Options: TaskOptions // Task configuration
TaskRecord: QueueTaskEntity // Database entity
}
Available task statuses:
export const TaskStatus = {
Pending: 'Pending',
InProgress: 'InProgress',
Complete: 'Complete',
Failed: 'Failed',
Cancelled: 'Cancelled',
} as const;
The abstract QueueBase
class serves as the foundation for all queue implementations:
export abstract class QueueBase {
constructor(
QueueRecord: QueueEntity,
QueueTypeID: string,
ContextUser: UserInfo
)
// Public methods
AddTask(task: TaskBase): boolean
FindTask(ID: string): TaskBase
// Protected abstract method to implement
protected abstract ProcessTask(
task: TaskBase,
contextUser: UserInfo
): Promise<TaskResult>
}
The QueueManager
is a singleton that manages all active queues:
export class QueueManager {
// Singleton access
static get Instance(): QueueManager
// Static methods
static async Config(contextUser: UserInfo): Promise<void>
static async AddTask(
QueueType: string,
data: any,
options: any,
contextUser: UserInfo
): Promise<TaskBase | undefined>
// Instance methods
async AddTask(
QueueTypeID: string,
data: any,
options: any,
contextUser: UserInfo
): Promise<TaskBase | undefined>
}
Structure returned by task processing:
export class TaskResult {
success: boolean // Whether task completed successfully
userMessage: string // User-friendly message
output: any // Task output data
exception: any // Exception details if failed
}
Create a custom queue by extending QueueBase
:
import { QueueBase, TaskBase, TaskResult } from '@memberjunction/queue';
import { RegisterClass } from '@memberjunction/global';
import { UserInfo } from '@memberjunction/core';
// Register your queue with a specific queue type name
@RegisterClass(QueueBase, 'Email Notification')
export class EmailNotificationQueue extends QueueBase {
protected async ProcessTask(
task: TaskBase,
contextUser: UserInfo
): Promise<TaskResult> {
try {
// Extract task data
const { recipient, subject, body } = task.Data;
// Implement your email sending logic here
console.log(`Sending email to ${recipient}`);
console.log(`Subject: ${subject}`);
// Simulate email sending
await this.sendEmail(recipient, subject, body);
// Return success result
return {
success: true,
userMessage: 'Email sent successfully',
output: { sentAt: new Date() },
exception: null
};
} catch (error) {
// Return failure result
return {
success: false,
userMessage: `Failed to send email: ${error.message}`,
output: null,
exception: error
};
}
}
private async sendEmail(to: string, subject: string, body: string) {
// Your email service integration here
}
}
import { QueueManager } from '@memberjunction/queue';
import { UserInfo } from '@memberjunction/core';
// Initialize queue manager (typically done once at app startup)
await QueueManager.Config(contextUser);
// Add a task using queue type name
const task = await QueueManager.AddTask(
'Email Notification', // Queue type name
{ // Task data
recipient: 'user@example.com',
subject: 'Welcome to MemberJunction',
body: 'Thank you for joining!'
},
{ // Task options
priority: 1
},
contextUser
);
if (task) {
console.log(`Task created with ID: ${task.ID}`);
}
The package includes built-in queues for AI operations:
import { AIActionQueue, EntityAIActionQueue } from '@memberjunction/queue';
// These queues are automatically registered and available for use
// Add an AI action task
const aiTask = await QueueManager.AddTask(
'AI Action',
{
actionName: 'GenerateText',
prompt: 'Write a product description',
parameters: { maxTokens: 100 }
},
{},
contextUser
);
// Add an entity-specific AI action
const entityAITask = await QueueManager.AddTask(
'Entity AI Action',
{
entityName: 'Products',
entityID: 123,
actionName: 'GenerateDescription'
},
{},
contextUser
);
The queue system requires the following database tables:
Stores definitions of different queue types (e.g., "Email Notification", "AI Action")
Tracks active queue instances with process information:
- Queue type reference
- Process details (PID, platform, hostname)
- Heartbeat timestamp
- Network information
Stores individual tasks:
- Queue reference
- Task status
- Task data (JSON)
- Task options (JSON)
- Output and error information
The QueueManager automatically captures process information for monitoring:
- Process ID (PID)
- Platform and version
- Working directory
- Network interfaces
- Operating system details
- User information
- Heartbeat timestamps
This information helps track queue health and enables failover scenarios.
Queue behavior can be configured through the implementation:
export class CustomQueue extends QueueBase {
private _maxTasks = 5; // Maximum concurrent tasks
private _checkInterval = 500; // Check interval in milliseconds
// Override these values in your constructor
constructor(QueueRecord: QueueEntity, QueueTypeID: string, ContextUser: UserInfo) {
super(QueueRecord, QueueTypeID, ContextUser);
// Customize queue behavior
this._maxTasks = 10;
this._checkInterval = 1000;
}
}
- Task Data Structure: Keep task data serializable as JSON
- Error Handling: Always return proper TaskResult with error details
-
Queue Registration: Use
@RegisterClass
decorator for automatic registration - Idempotency: Design tasks to be safely retryable
- Resource Cleanup: Clean up resources in finally blocks
- Monitoring: Check heartbeat timestamps for queue health
The queue system integrates seamlessly with:
- Entity System: Use entities for task data and processing
- User Context: All operations respect user permissions
- Global Registry: Automatic queue discovery via class registration
- AI Engine: Built-in support for AI task processing
# Build the package
npm run build
# Development mode with auto-reload
npm run start
# TypeScript compilation only
npm run build
ISC