const Queue = require('bull'); const logger = require('./logger'); const metrics = require('./metrics'); const emailService = require('../services/email'); // Configure Redis connection for Bull const redisConfig = { host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), password: process.env.REDIS_PASSWORD || undefined, db: parseInt(process.env.REDIS_DB || '0'), // Retry strategy for connection failures retryStrategy: (times) => { const delay = Math.min(times * 50, 2000); return delay; }, // Enable offline queue enableOfflineQueue: true, maxRetriesPerRequest: 3 }; // Create email queue const emailQueue = new Queue('email-queue', { redis: redisConfig, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 // Start with 2 seconds, then 4, 8, etc. }, removeOnComplete: { age: 24 * 3600, // Keep completed jobs for 24 hours count: 1000 // Keep last 1000 completed jobs }, removeOnFail: { age: 7 * 24 * 3600 // Keep failed jobs for 7 days }, timeout: 30000 // 30 seconds timeout per job } }); // Process email jobs emailQueue.process(async (job) => { const { type, data } = job.data; const start = Date.now(); logger.info('Processing email job', { jobId: job.id, type, attempt: job.attemptsMade + 1, maxAttempts: job.opts.attempts }); try { let result; switch (type) { case 'campaign': result = await emailService.sendCampaignEmail(data); break; case 'verification': result = await emailService.sendVerificationEmail(data); break; case 'login-details': result = await emailService.sendLoginDetails(data); break; case 'broadcast': result = await emailService.sendBroadcast(data); break; case 'response-verification': result = await emailService.sendResponseVerificationEmail(data); break; default: throw new Error(`Unknown email type: ${type}`); } const duration = (Date.now() - start) / 1000; // Record metrics metrics.recordEmailSent( data.campaignId || 'system', data.representativeLevel || 'unknown' ); metrics.observeEmailSendDuration( data.campaignId || 'system', duration ); logger.logEmailSent( data.to || data.email, data.campaignId || type, 'success' ); return result; } catch (error) { const duration = (Date.now() - start) / 1000; // Record failure metrics metrics.recordEmailFailed( data.campaignId || 'system', error.code || 'unknown' ); logger.logEmailFailed( data.to || data.email, data.campaignId || type, error ); // Throw error to trigger retry throw error; } }); // Queue event handlers emailQueue.on('completed', (job, result) => { logger.info('Email job completed', { jobId: job.id, type: job.data.type, duration: Date.now() - job.timestamp }); }); emailQueue.on('failed', (job, err) => { logger.error('Email job failed', { jobId: job.id, type: job.data.type, attempt: job.attemptsMade, maxAttempts: job.opts.attempts, error: err.message, willRetry: job.attemptsMade < job.opts.attempts }); }); emailQueue.on('stalled', (job) => { logger.warn('Email job stalled', { jobId: job.id, type: job.data.type }); }); emailQueue.on('error', (error) => { logger.error('Email queue error', { error: error.message }); }); // Update queue size metric every 10 seconds setInterval(async () => { try { const counts = await emailQueue.getJobCounts(); const queueSize = counts.waiting + counts.active; metrics.setEmailQueueSize(queueSize); } catch (error) { logger.warn('Failed to update queue metrics', { error: error.message }); } }, 10000); /** * Email Queue Service * Provides methods to enqueue different types of emails */ class EmailQueueService { /** * Send campaign email (to representative) */ async sendCampaignEmail(emailData) { const job = await emailQueue.add( { type: 'campaign', data: emailData }, { priority: 2, // Normal priority jobId: `campaign-${emailData.campaignId}-${Date.now()}` } ); logger.info('Campaign email queued', { jobId: job.id, campaignId: emailData.campaignId, recipient: emailData.to }); return { jobId: job.id, queued: true }; } /** * Send email verification */ async sendVerificationEmail(emailData) { const job = await emailQueue.add( { type: 'verification', data: emailData }, { priority: 1, // High priority - user waiting jobId: `verification-${emailData.email}-${Date.now()}` } ); logger.info('Verification email queued', { jobId: job.id, email: emailData.email }); return { jobId: job.id, queued: true }; } /** * Send login details */ async sendLoginDetails(emailData) { const job = await emailQueue.add( { type: 'login-details', data: emailData }, { priority: 1, // High priority jobId: `login-${emailData.email}-${Date.now()}` } ); logger.info('Login details email queued', { jobId: job.id, email: emailData.email }); return { jobId: job.id, queued: true }; } /** * Send broadcast to users */ async sendBroadcast(emailData) { const job = await emailQueue.add( { type: 'broadcast', data: emailData }, { priority: 3, // Lower priority - batch operation jobId: `broadcast-${Date.now()}-${Math.random().toString(36).substr(2, 9)}` } ); logger.info('Broadcast email queued', { jobId: job.id, recipientCount: emailData.recipients?.length || 1 }); return { jobId: job.id, queued: true }; } /** * Send response verification email */ async sendResponseVerificationEmail(emailData) { const job = await emailQueue.add( { type: 'response-verification', data: emailData }, { priority: 2, jobId: `response-verification-${emailData.email}-${Date.now()}` } ); logger.info('Response verification email queued', { jobId: job.id, email: emailData.email }); return { jobId: job.id, queued: true }; } /** * Get job status */ async getJobStatus(jobId) { const job = await emailQueue.getJob(jobId); if (!job) { return { status: 'not_found' }; } const state = await job.getState(); const progress = job.progress(); return { jobId: job.id, status: state, progress, attempts: job.attemptsMade, data: job.data, createdAt: job.timestamp, processedAt: job.processedOn, finishedAt: job.finishedOn, failedReason: job.failedReason }; } /** * Get queue statistics */ async getQueueStats() { const counts = await emailQueue.getJobCounts(); const jobs = { waiting: await emailQueue.getWaiting(0, 10), active: await emailQueue.getActive(0, 10), completed: await emailQueue.getCompleted(0, 10), failed: await emailQueue.getFailed(0, 10) }; return { counts, samples: { waiting: jobs.waiting.map(j => ({ id: j.id, type: j.data.type })), active: jobs.active.map(j => ({ id: j.id, type: j.data.type })), completed: jobs.completed.slice(0, 5).map(j => ({ id: j.id, type: j.data.type })), failed: jobs.failed.slice(0, 5).map(j => ({ id: j.id, type: j.data.type, reason: j.failedReason })) } }; } /** * Clean old jobs */ async cleanQueue(grace = 24 * 3600 * 1000) { const cleaned = await emailQueue.clean(grace, 'completed'); logger.info('Queue cleaned', { removedJobs: cleaned.length }); return { cleaned: cleaned.length }; } /** * Pause queue */ async pauseQueue() { await emailQueue.pause(); logger.warn('Email queue paused'); return { paused: true }; } /** * Resume queue */ async resumeQueue() { await emailQueue.resume(); logger.info('Email queue resumed'); return { resumed: true }; } /** * Get queue instance (for advanced operations) */ getQueue() { return emailQueue; } } module.exports = new EmailQueueService();