89 lines
2.6 KiB
JavaScript
89 lines
2.6 KiB
JavaScript
import { processEmailQueue } from './emailService';
|
|
|
|
// Timer handle returned by setInterval while the processor is scheduled;
// reset to null when the processor is stopped.
let processorInterval = null;

// Lock flag: set to true while a processQueue() pass is in flight so that
// overlapping passes are skipped.
let isProcessing = false;
|
|
|
|
/**
 * Launch the background queue processor.
 *
 * Runs one pass right away, then schedules a further pass every 30 seconds.
 * If a processor is already scheduled, only a notice is logged and nothing
 * else happens.
 */
export function startQueueProcessor() {
  const alreadyScheduled = Boolean(processorInterval);
  if (alreadyScheduled) {
    console.log('Queue processor already running');
    return;
  }

  const pollIntervalMs = 30000;

  console.log('🚀 Starting background queue processor...');

  // First pass happens now rather than after the first timer tick.
  processQueue();

  // Recurring passes; the handle is kept so the timer can be cleared later.
  processorInterval = setInterval(processQueue, pollIntervalMs);

  console.log('✅ Queue processor started (runs every 30 seconds)');
}
|
|
|
|
/**
 * Halt the background queue processor.
 *
 * Clears the scheduled timer and resets the stored handle. Does nothing when
 * no processor is currently scheduled.
 */
export function stopQueueProcessor() {
  // Nothing scheduled: nothing to clear and nothing to log.
  if (!processorInterval) {
    return;
  }

  clearInterval(processorInterval);
  processorInterval = null;
  console.log('⏸️ Queue processor stopped');
}
|
|
|
|
/**
 * Process the queue once (with a lock to prevent concurrent processing).
 *
 * The module-level `isProcessing` flag acts as the lock: when it is already
 * set, this call logs a "skipped" line and returns without touching the queue.
 * Otherwise the flag is set, `processEmailQueue()` is awaited, a summary of the
 * returned results is logged, and the flag is cleared again in `finally`.
 *
 * Failures of `processEmailQueue()` are logged here and not rethrown, so the
 * timer-driven callers that do not await this function do not see a rejection
 * from a failed pass.
 *
 * @returns {Promise<void>} Resolves once the pass has finished or was skipped.
 */
async function processQueue() {
  const timestamp = new Date().toISOString();

  // Skip if already processing
  if (isProcessing) {
    console.log(`[${timestamp}] ⏭️ Queue processor skipped (already running)`);
    return;
  }

  try {
    isProcessing = true;
    const startTime = Date.now();
    console.log(`[${timestamp}] ⚙️ Processing notification queue...`);

    const results = await processEmailQueue();
    const duration = ((Date.now() - startTime) / 1000).toFixed(2);

    if (results && results.length > 0) {
      // Tally all three counters in a single pass over the results.
      let sent = 0;
      let failed = 0;
      let recovered = 0;
      for (const result of results) {
        if (result.status === 'sent') {
          sent += 1;
        }
        if (result.status === 'failed') {
          failed += 1;
        }
        if (result.recovered) {
          recovered += 1;
        }
      }

      let message = `[${new Date().toISOString()}] ✅ Queue processed in ${duration}s: ${sent} sent, ${failed} failed, ${results.length} total`;
      if (recovered > 0) {
        message += ` (${recovered} recovered from timeout)`;
      }
      console.log(message);

      // Log detailed results for debugging
      const successRate = ((sent / results.length) * 100).toFixed(1);
      console.log(`[${new Date().toISOString()}] 📊 Queue stats: Success rate: ${successRate}%`);
    } else {
      console.log(`[${new Date().toISOString()}] 📭 No queued items to process`);
    }
  } catch (error) {
    console.error(`[${new Date().toISOString()}] ❌ Error processing queue:`, error);

    // A rejection value may be null/undefined or a non-Error; reading `.stack`
    // unguarded would throw from inside this handler and turn a logged failure
    // into a rejected promise.
    const stack = error == null ? undefined : error.stack;
    if (stack) {
      console.error(`[${new Date().toISOString()}] 📋 Error stack:`, stack);
    }
  } finally {
    // Always release the lock, even when the pass failed.
    isProcessing = false;
    console.log(`[${new Date().toISOString()}] 🔓 Queue processor lock released`);
  }
}
|
|
|
|
/**
 * Run a queue pass on demand, outside the regular schedule.
 *
 * Delegates to processQueue(), so the same lock applies.
 *
 * @returns {Promise<*>} Settles with whatever processQueue() settles with.
 */
export async function triggerQueueProcessing() {
  const pendingPass = processQueue();
  return pendingPass;
}
|