Notification Queue System
Overview
The notification system now uses a background queue processor that runs automatically every 30 seconds to send queued emails.
How It Works
1. Queue Creation
When you create a notification:
POST /api/public/create-notification
↓
Creates notification → status: "sending"
Creates recipients → status: "pending"
Creates queue items → status: "queued"
↓
Returns immediately (non-blocking) ✅
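The creation step above can be sketched roughly as follows. This is a minimal illustration, not the actual handler: the in-memory store, the createStore and createNotification helpers, and the recipient_id field are all assumptions made for the example, while the status values and scheduled_for come from this document.

```javascript
// Hypothetical in-memory stand-in for the three tables. The real system
// writes these rows to a database instead.
function createStore() {
  return { notifications: [], recipients: [], queue: [], nextId: 1 };
}

// Creates the notification, its recipients and one queue item per recipient,
// then returns right away without sending anything.
function createNotification(store, { title, emails, scheduledFor = new Date() }) {
  const notification = { id: store.nextId++, title, status: 'sending', actual_sent: 0 };
  store.notifications.push(notification);

  for (const email of emails) {
    const recipient = {
      id: store.nextId++,
      notification_id: notification.id,
      email,
      status: 'pending',
    };
    store.recipients.push(recipient);

    store.queue.push({
      id: store.nextId++,
      notification_id: notification.id,
      recipient_id: recipient.id, // assumed link between queue item and recipient
      status: 'queued',
      scheduled_for: scheduledFor,
    });
  }

  // Sending happens later, in the background processor.
  return { id: notification.id, status: notification.status, queued: emails.length };
}
```

Because no email is sent inside createNotification, the response time depends only on the number of rows written, not on SMTP latency.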
2. Background Processing
A background worker runs automatically:
Server starts
↓
Queue Processor Plugin initializes
↓
Processes queue every 30 seconds
↓
Sends emails → Updates status → Repeats
3. Status Updates
notification.status: "sending" → "sent" (when all recipients done)
recipients.status: "pending" → "sent" or "failed"
queue.status: "queued" → "processing" → "completed" or "failed"
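The status rules above can be written down as a small sketch. The helper names canTransition and summarizeNotification are hypothetical, and treating a "failed" recipient as done is an assumption based on the phrase "when all recipients done".

```javascript
// Allowed queue status transitions, taken from the list above.
const QUEUE_TRANSITIONS = {
  queued: ['processing'],
  processing: ['completed', 'failed'],
  completed: [],
  failed: [],
};

// Returns true when a queue item may move from one status to another.
function canTransition(from, to) {
  const next = Object.hasOwn(QUEUE_TRANSITIONS, from) ? QUEUE_TRANSITIONS[from] : [];
  return next.includes(to);
}

// Derives the notification status from its recipients' statuses.
// Assumption: a recipient is "done" once it is either sent or failed.
function summarizeNotification(recipientStatuses) {
  const counts = { pending: 0, sent: 0, failed: 0 };
  for (const status of recipientStatuses) {
    if (!Object.hasOwn(counts, status)) {
      throw new Error(`Unknown recipient status: ${status}`);
    }
    counts[status] += 1;
  }
  const allDone = recipientStatuses.length > 0 && counts.pending === 0;
  return { status: allDone ? 'sent' : 'sending', actual_sent: counts.sent, ...counts };
}
```

A notification that stays on "sending" therefore points to at least one recipient that is still "pending".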
Components
1. Queue Processor (server/utils/queueProcessor.js)
- Runs every 30 seconds
- Processes up to 50 jobs per batch
- Prevents concurrent processing (lock mechanism)
- Logs activity for monitoring
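One way the interval and lock could fit together is sketched below. This is an assumption-laden illustration, not the contents of server/utils/queueProcessor.js: createQueueProcessor, tick and the isProcessing flag are hypothetical names, and the lock shown only protects a single Node.js process.

```javascript
// Minimal interval-based processor with an in-process lock.
// processBatch is any async function that handles one batch of jobs.
function createQueueProcessor(processBatch, intervalMs = 30000) {
  let timer = null;
  let isProcessing = false; // lock flag: true while a batch is running

  async function tick() {
    if (isProcessing) {
      // The previous batch is still running, so skip this run.
      return { skipped: true };
    }
    isProcessing = true;
    try {
      return await processBatch();
    } catch (err) {
      console.error('Queue processing failed:', err);
      return { error: err };
    } finally {
      isProcessing = false; // always release the lock
    }
  }

  return {
    tick,
    start() {
      if (!timer) timer = setInterval(tick, intervalMs);
    },
    stop() {
      if (timer) {
        clearInterval(timer);
        timer = null;
      }
    },
  };
}
```

A boolean flag like this is enough when a single server runs the processor. With several servers, each process would hold its own flag, which is one reason the upgrade section lists "Multiple servers".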
2. Nitro Plugin (server/plugins/queueProcessor.js)
- Auto-starts when server starts
- Runs in background
- Gracefully shuts down with server
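The start and shutdown behaviour could look roughly like this. The function names startQueueProcessor and queueProcessorPlugin are hypothetical, and the sketch assumes the plugin relies on Nitro's "close" hook for shutdown; the actual plugin file may differ.

```javascript
// Hypothetical stand-in for the processor in server/utils/queueProcessor.js.
// Returns a function that stops the interval.
function startQueueProcessor(processQueue, intervalMs = 30000) {
  console.log('🚀 Starting background queue processor...');
  const timer = setInterval(processQueue, intervalMs);
  console.log(`✅ Queue processor started (runs every ${intervalMs / 1000} seconds)`);
  return () => clearInterval(timer);
}

// Plugin body: start on server boot, stop when Nitro emits its "close" hook.
function queueProcessorPlugin(nitroApp, processQueue = async () => {}) {
  const stop = startQueueProcessor(processQueue);
  nitroApp.hooks.hook('close', () => stop());
}

// In a Nitro project the plugin file would end with something like:
//   export default defineNitroPlugin((nitroApp) => queueProcessorPlugin(nitroApp, processQueue));
```

Clearing the interval on shutdown keeps the timer from holding the Node.js process open after the server has closed.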
3. Email Service (server/utils/emailService.js)
- processEmailQueue() - Processes queued emails
- sendEmailNotification() - Sends individual emails
- Updates all statuses (notification, recipient, queue)
Queue Flow
┌─────────────────────────────────────────┐
│ Create Notification API │
│ - Creates notification (sending) │
│ - Creates recipients (pending) │
│ - Creates queue items (queued) │
│ - Returns immediately ✅ │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Background Queue Processor │
│ (Runs every 30 seconds) │
│ │
│ 1. Find queued items (scheduled_for <= now) │
│ 2. For each item: │
│ - Mark as "processing" │
│ - Send email via SMTP │
│ - Update recipient status │
│ - Update queue status │
│ - Update notification status │
│ 3. Log results │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Email Sent! │
│ - notification.status → "sent" │
│ - recipient.status → "sent" │
│ - queue.status → "completed" │
│ - notification.actual_sent = count │
└─────────────────────────────────────────┘
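The processing box in the flow above can be sketched as one function. This is an illustration against a hypothetical in-memory store (plain arrays for the three tables); processQueueBatch, the sendEmail callback, the recipient_id link and the error field are assumptions, not the real processEmailQueue() implementation.

```javascript
// Processes one batch of due queue items.
// store: { notifications: [], recipients: [], queue: [] } (in-memory stand-in)
// sendEmail: async function that sends to one recipient and throws on failure
async function processQueueBatch(store, sendEmail, { take = 50, now = new Date() } = {}) {
  // 1. Find queued items that are due (scheduled_for <= now), up to the batch size.
  const due = store.queue
    .filter((item) => item.status === 'queued' && item.scheduled_for <= now)
    .slice(0, take);

  let sent = 0;
  let failed = 0;

  // 2. Handle each item in turn.
  for (const item of due) {
    item.status = 'processing';
    const recipient = store.recipients.find((r) => r.id === item.recipient_id);

    try {
      await sendEmail(recipient);
      recipient.status = 'sent';
      item.status = 'completed';
      sent += 1;
    } catch (err) {
      recipient.status = 'failed';
      item.status = 'failed';
      item.error = String(err && err.message ? err.message : err); // assumed error column
      failed += 1;
    }

    // Update the parent notification once no recipient is pending any more.
    const notification = store.notifications.find((n) => n.id === item.notification_id);
    const siblings = store.recipients.filter((r) => r.notification_id === notification.id);
    notification.actual_sent = siblings.filter((r) => r.status === 'sent').length;
    if (siblings.every((r) => r.status !== 'pending')) {
      notification.status = 'sent';
    }
  }

  // 3. Log results in the same shape as the console output shown under Monitoring.
  console.log(`✅ Queue processed: ${sent} sent, ${failed} failed, ${due.length} total`);
  return { sent, failed, total: due.length };
}
```

Items scheduled for the future stay "queued" and are picked up by a later run.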
Current Implementation
✅ What We Have:
- Non-blocking API - Returns immediately after queuing
- Background processor - Runs every 30 seconds
- Auto-start on server boot - Via Nitro plugin
- Concurrent processing prevention - Lock mechanism
- Status updates - All tables updated correctly
- Error handling - Failed emails marked as failed
- Logging - Console logs for monitoring
❌ What We Don't Have (Yet):
- BullMQ - Not using a production queue system
- Redis - No distributed queue
- Worker pools - Single-threaded processing
- Job retries - No automatic retry on failure
- Priority queues - All jobs equal priority
- Rate limiting - No per-provider rate limits
Manual Queue Processing
You can manually trigger queue processing via API:
POST /api/notifications/queue/process
This is useful for:
- Testing
- Debugging
- Force processing without waiting
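From a script or test, the manual trigger could be called like this. The helper name triggerQueueProcessing and the injectable fetchImpl parameter are hypothetical; the sketch assumes Node.js 18+ (global fetch) and makes no assumption about the response body or about any authentication the endpoint may require.

```javascript
// Sends POST /api/notifications/queue/process to the given server.
// fetchImpl can be replaced in tests; it defaults to the global fetch.
async function triggerQueueProcessing(baseUrl, fetchImpl = globalThis.fetch) {
  const url = new URL('/api/notifications/queue/process', baseUrl);
  const response = await fetchImpl(url, { method: 'POST' });
  if (!response.ok) {
    throw new Error(`Queue processing request failed with HTTP ${response.status}`);
  }
  // The response shape is not documented here, so return the raw text.
  return response.text();
}

// Example (assumes the server listens on localhost:3000):
//   triggerQueueProcessing('http://localhost:3000').then(console.log);
```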
Monitoring
Console Logs:
🚀 Starting background queue processor...
✅ Queue processor started (runs every 30 seconds)
⚙️ Processing notification queue...
✅ Queue processed: 5 sent, 0 failed, 5 total
Check Queue Status:
-- See queued items
SELECT * FROM notification_queue WHERE status = 'queued';
-- See processing status
SELECT
n.id,
n.title,
n.status,
COUNT(nr.id) as total_recipients,
SUM(CASE WHEN nr.status = 'sent' THEN 1 ELSE 0 END) as sent,
SUM(CASE WHEN nr.status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN nr.status = 'pending' THEN 1 ELSE 0 END) as pending
FROM notifications n
LEFT JOIN notification_recipients nr ON n.id = nr.notification_id
GROUP BY n.id, n.title, n.status;
Configuration
Queue Processing Interval:
Change in server/utils/queueProcessor.js:
// Current: 30 seconds
processorInterval = setInterval(processQueue, 30000);
// Faster: 10 seconds
processorInterval = setInterval(processQueue, 10000);
// Slower: 1 minute
processorInterval = setInterval(processQueue, 60000);
Batch Size:
Change in server/utils/emailService.js:
// Current: 50 jobs per batch
take: 50
// Larger batch: 100 jobs
take: 100
// Smaller batch: 10 jobs
take: 10
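When tuning the interval and batch size together, it helps to know the upper bound they imply. The helper below is a hypothetical sketch of that arithmetic; it assumes every batch finishes within one interval, so real throughput can be lower.

```javascript
// Upper bound on emails drained from the queue for a given configuration.
function maxThroughput(batchSize, intervalMs) {
  const batchesPerMinute = 60000 / intervalMs;
  const perMinute = batchSize * batchesPerMinute;
  return { perMinute, perDay: perMinute * 60 * 24 };
}

// Current settings: 50 jobs every 30 seconds.
console.log(maxThroughput(50, 30000)); // { perMinute: 100, perDay: 144000 }
```

If the SMTP provider enforces its own sending limits, those limits rather than this bound will determine the practical ceiling.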
Future: Upgrading to BullMQ
If you want a production-grade queue system:
Install BullMQ:
npm install bullmq ioredis
Benefits:
- ✅ Persistent queue (survives server restart)
- ✅ Automatic retries with exponential backoff
- ✅ Priority queues
- ✅ Concurrent workers
- ✅ Rate limiting per provider
- ✅ Job scheduling
- ✅ Web UI for monitoring
- ✅ Distributed processing
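Several of these benefits map onto BullMQ job and worker options. The sketch below only builds the option objects so it can run without Redis; the queue name, job name, Redis address and the specific numbers are assumptions chosen for illustration, not a recommended configuration.

```javascript
// Assumed local Redis instance.
const connection = { host: '127.0.0.1', port: 6379 };

// Per-job options: retries with exponential backoff and a priority.
// In BullMQ a lower priority number means the job is processed sooner.
function emailJobOptions({ priority = 1 } = {}) {
  return {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    priority,
  };
}

// Worker options: concurrent processing and a rate limit
// (at most 10 jobs per 1000 ms in this example).
const workerOptions = {
  connection,
  concurrency: 5,
  limiter: { max: 10, duration: 1000 },
};

// With bullmq installed and Redis running, these would be used like:
//   const { Queue, Worker } = require('bullmq');
//   const queue = new Queue('notifications', { connection });
//   await queue.add('send-email', { recipientId: 1 }, emailJobOptions());
//   new Worker('notifications', async (job) => { /* send job.data */ }, workerOptions);
```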
When to Upgrade:
- High volume (>1000 emails/day)
- Multiple servers
- Need guaranteed delivery
- Need advanced retry logic
- Need monitoring dashboard
Troubleshooting
Queue not processing?
- Check server logs for errors
- Check if plugin is loaded:
grep "queue processor" logs
- Manually trigger:
POST /api/notifications/queue/process
Emails not sending?
- Check SMTP credentials in delivery settings
- Check notification_queue table for errors
- Check notification_recipients for failed status
- Review email service logs
Status stuck on "sending"?
- Check if recipients are still "pending"
- Manually trigger queue processing
- Check for errors in notification_queue table
Summary
Current System:
- Simple, lightweight queue using database + setInterval
- Runs every 30 seconds automatically
- Non-blocking API responses
- Suitable for low-medium volume
When to upgrade:
- Need higher reliability
- Higher volume
- Multiple servers
- Advanced features
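The current system works well for low-to-medium volume. At the default settings (50 jobs per batch, one batch every 30 seconds) it can drain at most 100 queued emails per minute, assuming each batch finishes within the interval.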