472 lines
16 KiB
JavaScript
472 lines
16 KiB
JavaScript
import nodemailer from 'nodemailer';
|
|
import prisma from './prisma';
|
|
import DOMPurify from 'isomorphic-dompurify';
|
|
|
|
// Create email transporter with configuration from database or environment
|
|
/**
 * Build a nodemailer transport for outgoing email.
 *
 * The enabled `email` row of `notification_delivery_config` is looked up first; when it
 * carries a `provider_config`, that object supplies host/port/secure/credentials.
 * Otherwise the SMTP_HOST / SMTP_PORT / SMTP_SECURE / SMTP_USER / SMTP_PASS environment
 * variables are used.
 *
 * @returns {Promise<object>} The value returned by `nodemailer.createTransport`.
 * @throws {Error} "Failed to create email transporter: <reason>" when the lookup fails or
 *   no user/password could be resolved. The underlying error is attached as `cause`.
 */
async function createTransporter() {
  try {
    // Try to get email configuration from database
    const emailConfig = await prisma.notification_delivery_config.findFirst({
      where: {
        channel_type: 'email',
        is_enabled: true
      }
    });

    let transportConfig;

    if (emailConfig && emailConfig.provider_config) {
      // Use database configuration
      const config = emailConfig.provider_config;
      transportConfig = {
        host: config.host,
        port: config.port || 587,
        secure: config.secure || false,
        auth: {
          // Both naming schemes (user/pass and username/password) are accepted.
          user: config.user || config.username,
          pass: config.pass || config.password
        }
      };
      // Log the effective port (after defaulting) and never the password.
      console.log('Using database email config:', {
        host: transportConfig.host,
        port: transportConfig.port,
        user: transportConfig.auth.user
      });
    } else {
      // Fallback to environment variables
      transportConfig = {
        host: process.env.SMTP_HOST || 'smtp.gmail.com',
        // Explicit radix; an unset or non-numeric SMTP_PORT yields NaN and falls back to 587.
        port: Number.parseInt(process.env.SMTP_PORT, 10) || 587,
        secure: process.env.SMTP_SECURE === 'true',
        auth: {
          user: process.env.SMTP_USER,
          pass: process.env.SMTP_PASS
        }
      };
      console.log('Using environment email config:', {
        host: transportConfig.host,
        port: transportConfig.port,
        user: transportConfig.auth.user,
        hasPassword: !!transportConfig.auth.pass
      });
    }

    if (!transportConfig.auth.user || !transportConfig.auth.pass) {
      throw new Error('Email credentials not configured. Please set SMTP_USER and SMTP_PASS or configure via delivery settings.');
    }

    return nodemailer.createTransport(transportConfig);
  } catch (error) {
    console.error('Error creating email transporter:', error);
    // Keep the historical message format but preserve the original error for debugging.
    throw new Error('Failed to create email transporter: ' + error.message, { cause: error });
  }
}
|
|
|
|
// Send email notification
|
|
/**
 * Render and send one notification email, recording the outcome when a notification id
 * is supplied.
 *
 * @param {object} params
 * @param {string} params.to - Recipient address passed to the transport as `to`.
 * @param {string} params.subject - Subject line.
 * @param {string} params.content - HTML body; also stripped of tags for the text part.
 * @param {string} [params.callToActionText] - Label for the optional button.
 * @param {string} [params.callToActionUrl] - Target for the optional button.
 * @param {*} [params.notificationId] - When truthy, a delivery log row is written.
 * @returns {Promise<{success: true, messageId: *, response: *}>}
 * @throws Rethrows whatever error occurred while building or sending the message.
 */
export async function sendEmailNotification({ to, subject, content, callToActionText, callToActionUrl, notificationId }) {
  try {
    const mailer = await createTransporter();
    const html = createEmailTemplate({ content, callToActionText, callToActionUrl });

    // Environment default for the sender; a configured senderEmail in the database wins.
    let from = process.env.SMTP_FROM || process.env.SMTP_USER;
    try {
      const deliveryRow = await prisma.notification_delivery_config.findFirst({
        where: { channel_type: 'email', is_enabled: true }
      });
      const configuredSender = deliveryRow?.provider_config?.senderEmail;
      if (configuredSender) {
        from = configuredSender;
      }
    } catch (lookupError) {
      // Best effort: a failed lookup must not block the send.
      console.warn('Could not fetch sender email from database config, using environment default');
    }

    const message = {
      from,
      to,
      subject,
      html,
      text: stripHtmlTags(content) // Plain text version
    };
    const info = await mailer.sendMail(message);

    if (notificationId) {
      await logEmailDelivery({
        notificationId,
        recipientEmail: to,
        status: 'sent',
        messageId: info.messageId,
        response: info.response
      });
    }

    return { success: true, messageId: info.messageId, response: info.response };
  } catch (error) {
    console.error('Error sending email:', error);

    if (notificationId) {
      await logEmailDelivery({
        notificationId,
        recipientEmail: to,
        status: 'failed',
        error: error.message
      });
    }

    throw error;
  }
}
|
|
|
|
// Create HTML email template
|
|
/**
 * Wrap notification content in the standard HTML email shell, optionally followed by a
 * call-to-action button.
 *
 * The content is passed through DOMPurify with a small tag/attribute allow-list, the
 * button label is sanitized with every tag disallowed, and the button URL is accepted
 * only when it parses as an http: or https: URL. The button is rendered only when both
 * a label and an accepted URL are present.
 *
 * @param {object} params
 * @param {string} params.content - HTML body of the notification.
 * @param {string} [params.callToActionText] - Button label.
 * @param {string} [params.callToActionUrl] - Button target.
 * @returns {string} Complete HTML document for the email.
 */
function createEmailTemplate({ content, callToActionText, callToActionUrl }) {
  // Sanitize HTML content to prevent XSS attacks
  const sanitizedContent = DOMPurify.sanitize(content, {
    ALLOWED_TAGS: ['p', 'br', 'strong', 'em', 'u', 'h1', 'h2', 'h3', 'h4', 'ul', 'ol', 'li', 'a', 'div', 'span'],
    ALLOWED_ATTR: ['href', 'target', 'style', 'class'],
    ALLOW_DATA_ATTR: false
  });

  // Button label: no markup allowed at all.
  const sanitizedCtaText = callToActionText ? DOMPurify.sanitize(callToActionText, { ALLOWED_TAGS: [] }) : '';

  // Validate call to action URL (only allow http/https)
  let sanitizedCtaUrl = '';
  if (callToActionUrl) {
    try {
      const url = new URL(callToActionUrl);
      if (url.protocol === 'http:' || url.protocol === 'https:') {
        // The value lands inside a double-quoted HTML attribute, so escape the characters
        // that are significant there ("&" is common in query strings).
        sanitizedCtaUrl = url.href
          .replace(/&/g, '&amp;')
          .replace(/"/g, '&quot;')
          .replace(/'/g, '&#39;')
          .replace(/</g, '&lt;')
          .replace(/>/g, '&gt;');
      } else {
        // Previously dropped silently; surface it like the unparseable case.
        console.warn('Unsupported call to action URL protocol:', url.protocol);
      }
    } catch (e) {
      console.warn('Invalid call to action URL:', callToActionUrl);
    }
  }

  const ctaButton = sanitizedCtaText && sanitizedCtaUrl ? `
      <div style="text-align: center; margin: 30px 0;">
        <a href="${sanitizedCtaUrl}"
           style="background-color: #007bff; color: white; padding: 12px 24px;
                  text-decoration: none; border-radius: 4px; display: inline-block;">
          ${sanitizedCtaText}
        </a>
      </div>
    ` : '';

  return `
    <!DOCTYPE html>
    <html>
    <head>
      <meta charset="utf-8">
      <meta name="viewport" content="width=device-width, initial-scale=1.0">
      <title>Notification</title>
    </head>
    <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; padding: 20px;">
      <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px;">
        ${sanitizedContent}
        ${ctaButton}
      </div>
      <div style="margin-top: 20px; padding: 10px; border-top: 1px solid #eee; font-size: 12px; color: #666; text-align: center;">
        <p>This is an automated notification. Please do not reply to this email.</p>
      </div>
    </body>
    </html>
  `;
}
|
|
|
|
// Strip HTML tags for plain text version
|
|
/**
 * Produce a plain-text rendition of an HTML fragment for the text part of an email.
 *
 * Line breaks and closing block-level tags become a space so neighbouring blocks do not
 * run together, every remaining tag is removed, and whitespace runs collapse to a single
 * space. HTML entities are left untouched.
 *
 * @param {string|null|undefined} html - HTML fragment; null/undefined yield ''.
 * @returns {string} Trimmed single-line text.
 */
function stripHtmlTags(html) {
  if (html == null) {
    return '';
  }

  return String(html)
    // Block boundaries should separate words: "<p>One</p><p>Two</p>" -> "One Two".
    .replace(/<br\s*\/?>|<\/(?:p|div|h[1-6]|li|ul|ol|tr|td|th|table|blockquote)\s*>/gi, ' ')
    .replace(/<[^>]*>/g, '')
    .replace(/\s+/g, ' ')
    .trim();
}
|
|
|
|
// Log email delivery status
|
|
/**
 * Write one row to `notification_logs` describing the outcome of an email send.
 *
 * Failures while writing the row are reported with console.error and not rethrown, so
 * logging can never break the caller.
 *
 * @param {object} params
 * @param {*} params.notificationId - Stored as `notification_id`.
 * @param {string} params.recipientEmail - Address the email was sent to.
 * @param {string} params.status - 'sent' maps to action 'email_delivered'; anything else to 'email_failed'.
 * @param {*} [params.messageId] - Transport message id, stored in metadata.
 * @param {*} [params.response] - Transport response, stored in metadata and details.
 * @param {*} [params.error] - Error text for failed sends.
 * @returns {Promise<void>}
 */
async function logEmailDelivery({ notificationId, recipientEmail, status, messageId, response, error }) {
  try {
    const action = status === 'sent' ? 'email_delivered' : 'email_failed';
    const outcomeText = response || error || '';

    // error_message is only populated for failed sends.
    let errorMessage = null;
    if (status === 'failed') {
      errorMessage = error || response;
    }

    const entry = {
      notification_id: notificationId,
      action,
      channel_type: 'email',
      status,
      details: `Email ${status} to ${recipientEmail}. ${outcomeText}`,
      error_message: errorMessage,
      metadata: {
        recipient_email: recipientEmail,
        message_id: messageId,
        response
      }
    };

    await prisma.notification_logs.create({ data: entry });
  } catch (logError) {
    console.error('Error logging email delivery:', logError);
  }
}
|
|
|
|
// Process notification queue for email sending
|
|
// Maximum number of send attempts per queue item before it is marked failed.
const MAX_EMAIL_ATTEMPTS = 3;

// Limits shared by the status-update transactions: wait up to 5s to start, 10s to finish.
const STATUS_TX_OPTIONS = { maxWait: 5000, timeout: 10000 };

/**
 * Inside an open transaction, close out a notification once none of its recipients are
 * still 'pending': status becomes 'sent' when at least one recipient was sent, otherwise
 * 'failed', and `actual_sent` records the sent count.
 */
async function finalizeNotificationIfDone(tx, notificationId) {
  const pendingRecipients = await tx.notification_recipients.count({
    where: { notification_id: notificationId, status: 'pending' }
  });
  if (pendingRecipients !== 0) {
    return;
  }

  const sentCount = await tx.notification_recipients.count({
    where: { notification_id: notificationId, status: 'sent' }
  });

  await tx.notifications.update({
    where: { id: notificationId },
    data: {
      status: sentCount > 0 ? 'sent' : 'failed',
      sent_at: new Date(),
      actual_sent: sentCount
    }
  });
}

/**
 * Mark the recipient and queue rows as failed with `message` and finalize the parent
 * notification if this was its last pending recipient, all in one transaction.
 */
async function recordEmailFailure(queueItem, recipient, notification, message) {
  await prisma.$transaction(async (tx) => {
    await tx.notification_recipients.update({
      where: { id: recipient.id },
      data: { status: 'failed', error_message: message }
    });

    await tx.notification_queue.update({
      where: { id: queueItem.id },
      data: { status: 'failed', error_message: message, updated_at: new Date() }
    });

    await finalizeNotificationIfDone(tx, notification.id);
  }, STATUS_TX_OPTIONS);
}

/**
 * Process one batch (up to 500 rows) of the notification queue for the email channel.
 *
 * Picks up rows that are 'queued' and due, plus rows stuck in 'processing' whose last
 * attempt is more than five minutes old. For each row with an email recipient it bumps
 * the attempt counter, sends the email, then records the outcome on the recipient,
 * queue and (when no recipients remain pending) notification rows. Rows that already
 * used MAX_EMAIL_ATTEMPTS attempts are marked failed without sending. Rows whose
 * recipient is not an email recipient are left untouched.
 *
 * @returns {Promise<Array<object>>} One entry per handled row:
 *   `{ queueId, recipientId, status: 'sent', recovered }` or
 *   `{ queueId, recipientId, status: 'failed', error }`.
 * @throws Rethrows errors from the initial query or from recording a failure.
 */
export async function processEmailQueue() {
  try {
    // Calculate timeout threshold (5 minutes ago)
    const timeoutThreshold = new Date(Date.now() - 5 * 60 * 1000);

    // Get pending email notifications from queue.
    // Include both 'queued' items AND 'processing' items that are stuck (older than 5 minutes).
    const queueItems = await prisma.notification_queue.findMany({
      where: {
        OR: [
          {
            status: 'queued',
            scheduled_for: {
              lte: new Date()
            }
          },
          {
            status: 'processing',
            last_attempt_at: {
              lt: timeoutThreshold
            }
          }
        ]
      },
      include: {
        notifications: true,
        notification_recipients: true
      },
      orderBy: [
        { priority: 'asc' },
        { created_at: 'desc' }, // Process newer notifications first
        { scheduled_for: 'asc' }
      ],
      take: 500 // Process in batches - increased for better throughput with large recipient lists
    });

    const results = [];
    const totalItems = queueItems.length;
    let processedCount = 0;

    if (totalItems > 0) {
      console.log(`[${new Date().toISOString()}] 📧 Starting to process ${totalItems} queue items...`);
    }

    for (const queueItem of queueItems) {
      const { notifications: notification, notification_recipients: recipient } = queueItem;
      processedCount++;

      // A row without its relations cannot be processed; skip it instead of letting a
      // TypeError abort the rest of the batch.
      if (!recipient || !notification) {
        console.warn(`[${new Date().toISOString()}] ⚠️ Queue item #${queueItem.id} is missing its recipient or notification, skipping`);
        continue;
      }

      if (recipient.channel_type !== 'email' || !recipient.email) {
        continue;
      }

      // Attempts made before this run; > 0 means this run is a retry.
      const currentAttempts = queueItem.attempts || 0;

      try {
        // Check if max retry attempts exceeded
        if (currentAttempts >= MAX_EMAIL_ATTEMPTS) {
          console.log(`[${new Date().toISOString()}] ❌ Queue item #${queueItem.id} exceeded max retry attempts (${currentAttempts}), marking as failed`);

          // Same bookkeeping as any other failure, so the notification is also finalized
          // when this was its last pending recipient.
          await recordEmailFailure(
            queueItem,
            recipient,
            notification,
            `Maximum retry attempts (${currentAttempts}) exceeded`
          );

          results.push({
            queueId: queueItem.id,
            recipientId: recipient.id,
            status: 'failed',
            error: 'Maximum retry attempts exceeded'
          });

          continue; // Skip to next item
        }

        // Log if this is a recovery of a stuck item
        if (queueItem.status === 'processing') {
          console.log(`[${new Date().toISOString()}] 🔄 Recovering stuck queue item #${queueItem.id} (attempt ${currentAttempts + 1}/${MAX_EMAIL_ATTEMPTS})`);
        }

        // Mark as processing and count the attempt. NOTE(review): this is a plain update,
        // not a conditional claim, so two workers running concurrently could both pick up
        // the same row — confirm only one worker processes the queue at a time.
        await prisma.notification_queue.update({
          where: { id: queueItem.id },
          data: {
            status: 'processing',
            last_attempt_at: new Date(),
            attempts: currentAttempts + 1
          }
        });

        // Send the email (external API call - do NOT wrap in transaction)
        await sendEmailNotification({
          to: recipient.email,
          subject: notification.email_subject,
          content: notification.email_content,
          callToActionText: notification.call_to_action_text,
          callToActionUrl: notification.call_to_action_url,
          notificationId: notification.id
        });

        // Update all statuses in a single transaction for consistency
        await prisma.$transaction(async (tx) => {
          await tx.notification_recipients.update({
            where: { id: recipient.id },
            data: {
              status: 'sent',
              sent_at: new Date()
            }
          });

          await tx.notification_queue.update({
            where: { id: queueItem.id },
            data: {
              status: 'completed',
              updated_at: new Date()
            }
          });

          await finalizeNotificationIfDone(tx, notification.id);
        }, STATUS_TX_OPTIONS);

        results.push({
          queueId: queueItem.id,
          recipientId: recipient.id,
          status: 'sent',
          recovered: currentAttempts > 0 // True if this was a retry
        });

        // Log progress every 100 items
        if (processedCount % 100 === 0) {
          console.log(`[${new Date().toISOString()}] ⏳ Progress: ${processedCount}/${totalItems} items processed (${Math.round(processedCount/totalItems*100)}%)`);
        }
      } catch (error) {
        console.error(`[${new Date().toISOString()}] ❌ Error sending email to ${recipient.email}:`, error.message);

        // Update all failure statuses in a single transaction for consistency
        await recordEmailFailure(queueItem, recipient, notification, error.message);

        results.push({
          queueId: queueItem.id,
          recipientId: recipient.id,
          status: 'failed',
          error: error.message
        });
      }
    }

    // Final summary
    if (totalItems > 0) {
      const sent = results.filter(r => r.status === 'sent').length;
      const failed = results.filter(r => r.status === 'failed').length;
      const recovered = results.filter(r => r.recovered).length;

      console.log(`[${new Date().toISOString()}] ✅ Batch complete: ${sent} sent, ${failed} failed out of ${totalItems} total`);
      if (recovered > 0) {
        console.log(`[${new Date().toISOString()}] 🔄 Recovered items: ${recovered}`);
      }
    }

    return results;
  } catch (error) {
    console.error(`[${new Date().toISOString()}] ❌ Error processing email queue:`, error.message);
    console.error(`[${new Date().toISOString()}] 📋 Error stack:`, error.stack);
    throw error;
  }
}