/*
 * Files
 * Nas-Notification/server/utils/queueProcessor.js
 *
 * 89 lines
 * 2.6 KiB
 * JavaScript
 */

import { processEmailQueue } from './emailService';
// Lock flag: set to true while a processQueue() run is in flight and reset in
// its `finally`, so a run that starts while another is active is skipped.
let isProcessing = false;
// Timer handle returned by setInterval while the background processor is
// running; null when it has not been started or has been stopped.
let processorInterval = null;
/**
 * Start the background queue processor.
 *
 * Runs the queue once immediately, then again every 30 seconds. Calling this
 * while a processor is already running only logs a message and returns.
 *
 * @returns {void}
 */
export function startQueueProcessor() {
  if (processorInterval) {
    console.log('Queue processor already running');
    return;
  }

  // processQueue() is async and nobody awaits it here. Attach a rejection
  // handler so an error that escapes it is logged instead of surfacing as an
  // unhandled promise rejection (which terminates Node by default).
  const runQueue = () => {
    processQueue().catch((error) => {
      console.error(`[${new Date().toISOString()}] ❌ Unhandled queue processor error:`, error);
    });
  };

  console.log('🚀 Starting background queue processor...');
  // Process immediately on start
  runQueue();
  // Then process every 30 seconds
  processorInterval = setInterval(runQueue, 30000);
  console.log('✅ Queue processor started (runs every 30 seconds)');
}
/**
 * Halt the background queue processor.
 *
 * Clears the interval timer and resets the stored handle to null. Does
 * nothing when no processor is currently scheduled.
 *
 * @returns {void}
 */
export function stopQueueProcessor() {
  // Nothing scheduled, nothing to stop.
  if (!processorInterval) {
    return;
  }

  clearInterval(processorInterval);
  processorInterval = null;
  console.log('⏸️ Queue processor stopped');
}
/**
 * Process the notification queue once.
 *
 * Uses the module-level `isProcessing` flag as a lock: if a run is already in
 * flight, this call logs a "skipped" message and returns without doing work.
 * Errors raised by `processEmailQueue()` are logged rather than rethrown, and
 * the lock is always released in `finally`.
 *
 * @returns {Promise<void>} Resolves once the run has finished or been skipped.
 */
async function processQueue() {
  const timestamp = new Date().toISOString();
  // Skip if already processing
  if (isProcessing) {
    console.log(`[${timestamp}] ⏭️ Queue processor skipped (already running)`);
    return;
  }
  try {
    isProcessing = true;
    const startTime = Date.now();
    console.log(`[${timestamp}] ⚙️ Processing notification queue...`);
    const results = await processEmailQueue();
    const duration = ((Date.now() - startTime) / 1000).toFixed(2);
    if (results && results.length > 0) {
      const sent = results.filter(r => r.status === 'sent').length;
      const failed = results.filter(r => r.status === 'failed').length;
      const recovered = results.filter(r => r.recovered).length;
      let message = `[${new Date().toISOString()}] ✅ Queue processed in ${duration}s: ${sent} sent, ${failed} failed, ${results.length} total`;
      if (recovered > 0) {
        message += ` (${recovered} recovered from timeout)`;
      }
      console.log(message);
      // Log detailed results for debugging
      console.log(`[${new Date().toISOString()}] 📊 Queue stats: Success rate: ${((sent/results.length)*100).toFixed(1)}%`);
    } else {
      console.log(`[${new Date().toISOString()}] 📭 No queued items to process`);
    }
  } catch (error) {
    console.error(`[${new Date().toISOString()}] ❌ Error processing queue:`, error);
    // Any value can be thrown or used as a rejection reason, including null
    // and undefined. Read `.stack` defensively: a TypeError raised here would
    // escape this function as a rejection nobody handles.
    const stack = error != null ? error.stack : undefined;
    if (stack) {
      console.error(`[${new Date().toISOString()}] 📋 Error stack:`, stack);
    }
  } finally {
    isProcessing = false;
    console.log(`[${new Date().toISOString()}] 🔓 Queue processor lock released`);
  }
}
/**
 * Run one queue-processing pass on demand, outside the timer schedule.
 *
 * Delegates to `processQueue()` and resolves with whatever that call
 * resolves to; a rejection from it propagates to the caller.
 *
 * @returns {Promise<*>} Settles when the delegated run settles.
 */
export async function triggerQueueProcessing() {
  const outcome = await processQueue();
  return outcome;
}