import nodemailer from 'nodemailer';
import prisma from './prisma';
import DOMPurify from 'isomorphic-dompurify';

// Maximum number of delivery attempts before a queue item is marked as failed.
const MAX_SEND_ATTEMPTS = 3;

// A queue item left in 'processing' for longer than this is treated as stuck and retried.
const STUCK_PROCESSING_TIMEOUT_MS = 5 * 60 * 1000;

// Number of queue items fetched per processEmailQueue() run.
const QUEUE_BATCH_SIZE = 500;

// Options passed to every interactive Prisma transaction in this module.
const TRANSACTION_OPTIONS = {
  maxWait: 5000, // Maximum time to wait for the transaction to start (5s)
  timeout: 10000, // Maximum time for the transaction to complete (10s)
};

/**
 * Create a nodemailer transport.
 *
 * Configuration comes from the first enabled `email` row of
 * `notification_delivery_config` (its `provider_config` object) when present,
 * otherwise from the SMTP_HOST / SMTP_PORT / SMTP_SECURE / SMTP_USER /
 * SMTP_PASS environment variables.
 *
 * @returns {Promise<object>} The transport returned by `nodemailer.createTransport`.
 * @throws {Error} "Failed to create email transporter: ..." when the lookup
 *   fails or no user/password is configured; the original error is attached
 *   as `cause`.
 */
async function createTransporter() {
  try {
    // Try to get email configuration from the database first.
    const emailConfig = await prisma.notification_delivery_config.findFirst({
      where: { channel_type: 'email', is_enabled: true },
    });

    let transportConfig;
    if (emailConfig && emailConfig.provider_config) {
      const config = emailConfig.provider_config;
      transportConfig = {
        host: config.host,
        port: config.port || 587,
        secure: config.secure || false,
        auth: {
          user: config.user || config.username,
          pass: config.pass || config.password,
        },
      };
      // The password is deliberately left out of the log output.
      console.log('Using database email config:', {
        host: config.host,
        port: config.port,
        user: config.user || config.username,
      });
    } else {
      // Fallback to environment variables.
      transportConfig = {
        host: process.env.SMTP_HOST || 'smtp.gmail.com',
        // Explicit radix so values are always read as decimal; NaN falls back to 587.
        port: Number.parseInt(process.env.SMTP_PORT, 10) || 587,
        secure: process.env.SMTP_SECURE === 'true',
        auth: {
          user: process.env.SMTP_USER,
          pass: process.env.SMTP_PASS,
        },
      };
      console.log('Using environment email config:', {
        host: transportConfig.host,
        port: transportConfig.port,
        user: transportConfig.auth.user,
        hasPassword: !!transportConfig.auth.pass,
      });
    }

    if (!transportConfig.auth.user || !transportConfig.auth.pass) {
      throw new Error(
        'Email credentials not configured. Please set SMTP_USER and SMTP_PASS or configure via delivery settings.'
      );
    }

    return nodemailer.createTransport(transportConfig);
  } catch (error) {
    console.error('Error creating email transporter:', error);
    // Keep the original error reachable for callers that want the full stack.
    throw new Error(`Failed to create email transporter: ${error.message}`, { cause: error });
  }
}

/**
 * Send a single email notification and record the outcome.
 *
 * The sender address is `provider_config.senderEmail` from the enabled email
 * delivery config when available, otherwise SMTP_FROM, otherwise SMTP_USER.
 * When `notificationId` is given, a row is written to `notification_logs`
 * for both success and failure.
 *
 * @param {object} params
 * @param {string} params.to - Recipient address.
 * @param {string} params.subject - Email subject.
 * @param {string} params.content - HTML body; sanitized before sending.
 * @param {string} [params.callToActionText] - Label of the call-to-action link.
 * @param {string} [params.callToActionUrl] - Target of the call-to-action link (http/https only).
 * @param {*} [params.notificationId] - Notification id used for delivery logging.
 * @returns {Promise<{success: boolean, messageId: *, response: *}>}
 * @throws Re-throws any error raised while building the transport or sending.
 */
export async function sendEmailNotification({
  to,
  subject,
  content,
  callToActionText,
  callToActionUrl,
  notificationId,
}) {
  try {
    const transporter = await createTransporter();

    const htmlContent = createEmailTemplate({ content, callToActionText, callToActionUrl });

    // Resolve the sender address; a failed lookup is non-fatal.
    let senderEmail = process.env.SMTP_FROM || process.env.SMTP_USER;
    try {
      const emailConfig = await prisma.notification_delivery_config.findFirst({
        where: { channel_type: 'email', is_enabled: true },
      });
      if (emailConfig?.provider_config?.senderEmail) {
        senderEmail = emailConfig.provider_config.senderEmail;
      }
    } catch (configError) {
      console.warn(
        'Could not fetch sender email from database config, using environment default',
        configError?.message
      );
    }

    const mailOptions = {
      from: senderEmail,
      to,
      subject,
      html: htmlContent,
      text: stripHtmlTags(content), // Plain text version
    };

    const result = await transporter.sendMail(mailOptions);

    if (notificationId) {
      await logEmailDelivery({
        notificationId,
        recipientEmail: to,
        status: 'sent',
        messageId: result.messageId,
        response: result.response,
      });
    }

    return {
      success: true,
      messageId: result.messageId,
      response: result.response,
    };
  } catch (error) {
    console.error('Error sending email:', error);

    if (notificationId) {
      await logEmailDelivery({
        notificationId,
        recipientEmail: to,
        status: 'failed',
        error: error.message,
      });
    }

    throw error;
  }
}

/**
 * Escape a string for use inside a double-quoted HTML attribute value.
 *
 * @param {*} value - Value to escape; coerced to string.
 * @returns {string} The escaped text.
 */
function escapeHtmlAttribute(value) {
  return String(value)
    .replace(/&/g, '&amp;')
    .replace(/"/g, '&quot;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;');
}

/**
 * Build the email body from sanitized content and an optional call to action.
 *
 * The call to action is rendered only when both a non-empty text and a valid
 * http/https URL are supplied. It is emitted as a link so that the validated
 * URL actually reaches the recipient.
 *
 * @param {object} params
 * @param {string} params.content - HTML content; restricted to an allow-list of tags/attributes.
 * @param {string} [params.callToActionText] - Link label; all markup is stripped.
 * @param {string} [params.callToActionUrl] - Link target; anything but http/https is dropped.
 * @returns {string} The assembled email body.
 */
function createEmailTemplate({ content, callToActionText, callToActionUrl }) {
  // Sanitize HTML content to prevent XSS attacks.
  const sanitizedContent = DOMPurify.sanitize(content, {
    ALLOWED_TAGS: [
      'p', 'br', 'strong', 'em', 'u', 'h1', 'h2', 'h3', 'h4',
      'ul', 'ol', 'li', 'a', 'div', 'span',
    ],
    ALLOWED_ATTR: ['href', 'target', 'style', 'class'],
    ALLOW_DATA_ATTR: false,
  });

  const sanitizedCtaText = callToActionText
    ? DOMPurify.sanitize(callToActionText, { ALLOWED_TAGS: [] })
    : '';

  // Only http/https targets are allowed; anything else leaves the URL empty.
  let sanitizedCtaUrl = '';
  if (callToActionUrl) {
    try {
      const url = new URL(callToActionUrl);
      if (url.protocol === 'http:' || url.protocol === 'https:') {
        sanitizedCtaUrl = url.href;
      }
    } catch (e) {
      console.warn('Invalid call to action URL:', callToActionUrl);
    }
  }

  const ctaButton =
    sanitizedCtaText && sanitizedCtaUrl
      ? `\n<a href="${escapeHtmlAttribute(sanitizedCtaUrl)}">${sanitizedCtaText}</a>\n`
      : '';

  // Joined with '\n' so the emitted text keeps the template's exact line layout.
  return [
    ' Notification',
    `${sanitizedContent} ${ctaButton}`,
    '',
    'This is an automated notification. Please do not reply to this email.',
    '',
    '',
  ].join('\n');
}

/**
 * Produce a plain-text version of an HTML string by removing tags and
 * collapsing runs of whitespace.
 *
 * @param {string|null|undefined} html - Markup to strip; null/undefined yield ''.
 * @returns {string} The text with tags removed, whitespace collapsed and trimmed.
 */
function stripHtmlTags(html) {
  return String(html ?? '')
    .replace(/<[^>]*>/g, '')
    .replace(/\s+/g, ' ')
    .trim();
}

/**
 * Write one delivery record to `notification_logs`.
 *
 * Logging is best-effort: a failure to write the record is reported on the
 * console and never propagated to the caller.
 *
 * @param {object} params
 * @param {*} params.notificationId - Id stored as `notification_id`.
 * @param {string} params.recipientEmail - Recipient address.
 * @param {string} params.status - 'sent' or 'failed'.
 * @param {*} [params.messageId] - Message id reported by the transport.
 * @param {string} [params.response] - Transport response text.
 * @param {string} [params.error] - Error message for failed sends.
 * @returns {Promise<void>}
 */
async function logEmailDelivery({ notificationId, recipientEmail, status, messageId, response, error }) {
  const wasSent = status === 'sent';
  try {
    await prisma.notification_logs.create({
      data: {
        notification_id: notificationId,
        action: wasSent ? 'email_delivered' : 'email_failed',
        channel_type: 'email',
        status,
        details: `Email ${status} to ${recipientEmail}. ${response || error || ''}`,
        error_message: status === 'failed' ? (error || response) : null,
        metadata: {
          recipient_email: recipientEmail,
          message_id: messageId,
          response: response,
        },
      },
    });
  } catch (logError) {
    console.error('Error logging email delivery:', logError);
  }
}

/**
 * Inside a transaction, close out a notification once none of its recipients
 * is still 'pending': the notification becomes 'sent' when at least one
 * recipient was sent, otherwise 'failed', and `actual_sent` is set to the
 * number of sent recipients.
 *
 * @param {object} tx - Prisma transaction client.
 * @param {*} notificationId - Id of the notification to check.
 * @returns {Promise<void>}
 */
async function finalizeNotificationIfComplete(tx, notificationId) {
  const pendingRecipients = await tx.notification_recipients.count({
    where: { notification_id: notificationId, status: 'pending' },
  });
  if (pendingRecipients !== 0) {
    return;
  }

  const sentCount = await tx.notification_recipients.count({
    where: { notification_id: notificationId, status: 'sent' },
  });

  await tx.notifications.update({
    where: { id: notificationId },
    data: {
      status: sentCount > 0 ? 'sent' : 'failed',
      sent_at: new Date(),
      actual_sent: sentCount,
    },
  });
}

/**
 * Process one batch of the notification queue for email delivery.
 *
 * Picks up items that are 'queued' and due, plus items stuck in 'processing'
 * for more than five minutes. Items whose recipient is not an email recipient
 * with an address are left untouched. Each remaining item is sent
 * sequentially; items that already used all attempts are marked failed
 * without sending.
 *
 * @returns {Promise<Array<object>>} One entry per handled item:
 *   `{ queueId, recipientId, status: 'sent', recovered }` or
 *   `{ queueId, recipientId, status: 'failed', error }`.
 * @throws Re-throws any error raised outside the per-item send handling
 *   (e.g. the queue query or a status-update transaction).
 */
export async function processEmailQueue() {
  try {
    const timeoutThreshold = new Date(Date.now() - STUCK_PROCESSING_TIMEOUT_MS);

    // Fetch due 'queued' items AND 'processing' items that look stuck.
    const queueItems = await prisma.notification_queue.findMany({
      where: {
        OR: [
          { status: 'queued', scheduled_for: { lte: new Date() } },
          { status: 'processing', last_attempt_at: { lt: timeoutThreshold } },
        ],
      },
      include: {
        notifications: true,
        notification_recipients: true,
      },
      orderBy: [
        { priority: 'asc' },
        { created_at: 'desc' }, // Process newer notifications first
        { scheduled_for: 'asc' },
      ],
      take: QUEUE_BATCH_SIZE,
    });

    const results = [];
    const totalItems = queueItems.length;
    let processedCount = 0;

    if (totalItems > 0) {
      console.log(`[${new Date().toISOString()}] 📧 Starting to process ${totalItems} queue items...`);
    }

    for (const queueItem of queueItems) {
      const { notifications: notification, notification_recipients: recipient } = queueItem;
      processedCount++;

      if (recipient.channel_type === 'email' && recipient.email) {
        try {
          // Attempts already recorded for this item BEFORE the current run.
          const currentAttempts = queueItem.attempts || 0;

          if (currentAttempts >= MAX_SEND_ATTEMPTS) {
            console.log(`[${new Date().toISOString()}] ❌ Queue item #${queueItem.id} exceeded max retry attempts (${currentAttempts}), marking as failed`);

            await prisma.notification_queue.update({
              where: { id: queueItem.id },
              data: {
                status: 'failed',
                error_message: `Maximum retry attempts (${currentAttempts}) exceeded`,
                updated_at: new Date(),
              },
            });

            await prisma.notification_recipients.update({
              where: { id: recipient.id },
              data: {
                status: 'failed',
                error_message: `Maximum retry attempts (${currentAttempts}) exceeded`,
              },
            });

            results.push({
              queueId: queueItem.id,
              recipientId: recipient.id,
              status: 'failed',
              error: 'Maximum retry attempts exceeded',
            });
            continue; // Skip to next item
          }

          if (queueItem.status === 'processing') {
            console.log(`[${new Date().toISOString()}] 🔄 Recovering stuck queue item #${queueItem.id} (attempt ${currentAttempts + 1}/${MAX_SEND_ATTEMPTS})`);
          }

          // Mark as processing and count this attempt before sending.
          // NOTE(review): this update is unconditional, so it does not by itself
          // stop two concurrent workers from picking the same item; confirm
          // only one worker runs this at a time.
          await prisma.notification_queue.update({
            where: { id: queueItem.id },
            data: {
              status: 'processing',
              last_attempt_at: new Date(),
              attempts: currentAttempts + 1,
            },
          });

          // Send the email (external call, deliberately outside any transaction).
          await sendEmailNotification({
            to: recipient.email,
            subject: notification.email_subject,
            content: notification.email_content,
            callToActionText: notification.call_to_action_text,
            callToActionUrl: notification.call_to_action_url,
            notificationId: notification.id,
          });

          // Update all statuses in a single transaction for consistency.
          await prisma.$transaction(async (tx) => {
            await tx.notification_recipients.update({
              where: { id: recipient.id },
              data: { status: 'sent', sent_at: new Date() },
            });

            await tx.notification_queue.update({
              where: { id: queueItem.id },
              data: { status: 'completed', updated_at: new Date() },
            });

            await finalizeNotificationIfComplete(tx, notification.id);
          }, TRANSACTION_OPTIONS);

          results.push({
            queueId: queueItem.id,
            recipientId: recipient.id,
            status: 'sent',
            // True if this was a retry, i.e. at least one earlier attempt was recorded.
            recovered: currentAttempts > 0,
          });

          // Log progress every 100 items.
          if (processedCount % 100 === 0) {
            console.log(`[${new Date().toISOString()}] ⏳ Progress: ${processedCount}/${totalItems} items processed (${Math.round(processedCount/totalItems*100)}%)`);
          }
        } catch (error) {
          console.error(`[${new Date().toISOString()}] ❌ Error sending email to ${recipient.email}:`, error.message);

          // Update all failure statuses in a single transaction for consistency.
          await prisma.$transaction(async (tx) => {
            await tx.notification_recipients.update({
              where: { id: recipient.id },
              data: { status: 'failed', error_message: error.message },
            });

            await tx.notification_queue.update({
              where: { id: queueItem.id },
              data: {
                status: 'failed',
                error_message: error.message,
                updated_at: new Date(),
              },
            });

            await finalizeNotificationIfComplete(tx, notification.id);
          }, TRANSACTION_OPTIONS);

          results.push({
            queueId: queueItem.id,
            recipientId: recipient.id,
            status: 'failed',
            error: error.message,
          });
        }
      }
    }

    // Final summary.
    if (totalItems > 0) {
      const sent = results.filter((r) => r.status === 'sent').length;
      const failed = results.filter((r) => r.status === 'failed').length;
      const recovered = results.filter((r) => r.recovered).length;

      console.log(`[${new Date().toISOString()}] ✅ Batch complete: ${sent} sent, ${failed} failed out of ${totalItems} total`);
      if (recovered > 0) {
        console.log(`[${new Date().toISOString()}] 🔄 Recovered items: ${recovered}`);
      }
    }

    return results;
  } catch (error) {
    console.error(`[${new Date().toISOString()}] ❌ Error processing email queue:`, error.message);
    console.error(`[${new Date().toISOString()}] 📋 Error stack:`, error.stack);
    throw error;
  }
}