- Implemented a comprehensive health check utility to monitor system dependencies, including NocoDB, SMTP, the Represent API, disk space, and memory usage.
- Created a logger utility using Winston for structured logging with daily rotation and multiple log levels.
- Developed a metrics utility using the Prometheus client to track application performance metrics such as email sends, HTTP requests, and user activity.
- Added a backup script for automated backups of NocoDB data, uploaded files, and environment configurations, with optional S3 support.
- Introduced a toggle script to switch between development (MailHog) and production (ProtonMail) SMTP configurations.
369 lines
8.4 KiB
JavaScript
369 lines
8.4 KiB
JavaScript
const Queue = require('bull');
|
|
const logger = require('./logger');
|
|
const metrics = require('./metrics');
|
|
const emailService = require('../services/email');
|
|
|
|
// Redis connection settings for the Bull email queue.
// Every value can be overridden through REDIS_* environment variables; the
// defaults target a local Redis on the standard port and database 0.
const redisConfig = {
  host: process.env.REDIS_HOST || 'localhost',
  // Always parse as base 10 so values such as "0x10" or "010" are not
  // reinterpreted as hex/other radices.
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
  // An empty REDIS_PASSWORD is treated the same as "no password".
  password: process.env.REDIS_PASSWORD || undefined,
  db: Number.parseInt(process.env.REDIS_DB || '0', 10),

  // Reconnect back-off: 50 ms per attempt, capped at 2 s.
  retryStrategy: (times) => Math.min(times * 50, 2000),

  // Buffer commands issued while the connection is down instead of rejecting them.
  enableOfflineQueue: true,

  maxRetriesPerRequest: 3,
};
|
|
|
|
// Bull queue carrying every outbound email.
// Default job policy: up to 3 attempts with exponential back-off (2 s, 4 s, 8 s, ...),
// a 30 s per-job timeout, and automatic pruning of finished jobs.
const SECONDS_PER_HOUR = 3600;

const emailQueue = new Queue('email-queue', {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    // Completed jobs: keep at most the last 1000, and none older than a day.
    removeOnComplete: { age: 24 * SECONDS_PER_HOUR, count: 1000 },
    // Failed jobs: keep for a week so they can be inspected.
    removeOnFail: { age: 7 * 24 * SECONDS_PER_HOUR },
    timeout: 30000,
  },
});
|
|
|
|
/**
 * Send one email job through the email-service method matching its type.
 *
 * @param {string} type - Job type set by EmailQueueService when the job was added.
 * @param {*} data - Payload passed unchanged to the email service method.
 * @returns {Promise<*>} Whatever the email service method resolves with.
 * @throws {Error} `Unknown email type: <type>` when the type is not recognised.
 */
async function dispatchEmailJob(type, data) {
  switch (type) {
    case 'campaign':
      return emailService.sendCampaignEmail(data);
    case 'verification':
      return emailService.sendVerificationEmail(data);
    case 'login-details':
      return emailService.sendLoginDetails(data);
    case 'broadcast':
      return emailService.sendBroadcast(data);
    case 'response-verification':
      return emailService.sendResponseVerificationEmail(data);
    default:
      throw new Error(`Unknown email type: ${type}`);
  }
}

/**
 * Run metrics/logging work that must never change a job's outcome.
 *
 * Bull retries any job whose processor throws. If a metrics or logging call
 * threw after a successful send, the job would be retried and the same email
 * delivered again. Errors from `record` are therefore logged, not propagated.
 *
 * @param {string|number} jobId - Id of the job being recorded, for the warning log.
 * @param {() => void} record - Bookkeeping callback to run.
 */
function recordEmailJobOutcome(jobId, record) {
  try {
    record();
  } catch (error) {
    logger.warn('Failed to record email job outcome', { jobId, error: error.message });
  }
}

// Process email jobs.
// Only the actual send can fail the job (and so trigger Bull's retry);
// post-send bookkeeping is isolated by recordEmailJobOutcome.
emailQueue.process(async (job) => {
  const { type, data } = job.data;
  // A malformed job may have no `data`; read bookkeeping fields from an empty
  // object so they cannot throw and hide the real error.
  const details = data ?? {};
  const campaignLabel = details.campaignId || 'system';
  const recipient = details.to || details.email;
  const start = Date.now();

  logger.info('Processing email job', {
    jobId: job.id,
    type,
    attempt: job.attemptsMade + 1,
    maxAttempts: job.opts.attempts
  });

  let result;
  try {
    result = await dispatchEmailJob(type, data);
  } catch (error) {
    recordEmailJobOutcome(job.id, () => {
      metrics.recordEmailFailed(campaignLabel, error.code || 'unknown');
      logger.logEmailFailed(recipient, details.campaignId || type, error);
    });
    // Rethrow so Bull marks this attempt as failed and schedules a retry.
    throw error;
  }

  const duration = (Date.now() - start) / 1000;
  recordEmailJobOutcome(job.id, () => {
    metrics.recordEmailSent(campaignLabel, details.representativeLevel || 'unknown');
    metrics.observeEmailSendDuration(campaignLabel, duration);
    logger.logEmailSent(recipient, details.campaignId || type, 'success');
  });

  return result;
});
|
|
|
|
// Queue lifecycle logging.
// Listeners are attached in declaration order: completed, failed, stalled, error.
const emailQueueEventHandlers = {
  completed(job) {
    const { id: jobId, data: { type }, timestamp } = job;
    logger.info('Email job completed', {
      jobId,
      type,
      // Milliseconds since the job was created (includes time spent waiting).
      duration: Date.now() - timestamp
    });
  },

  failed(job, err) {
    const { id: jobId, data: { type }, attemptsMade, opts } = job;
    const maxAttempts = opts.attempts;
    logger.error('Email job failed', {
      jobId,
      type,
      attempt: attemptsMade,
      maxAttempts,
      error: err.message,
      willRetry: attemptsMade < maxAttempts
    });
  },

  stalled(job) {
    const { id: jobId, data: { type } } = job;
    logger.warn('Email job stalled', { jobId, type });
  },

  error(error) {
    logger.error('Email queue error', { error: error.message });
  }
};

for (const [eventName, handler] of Object.entries(emailQueueEventHandlers)) {
  emailQueue.on(eventName, handler);
}
|
|
|
|
// Publish the number of pending (waiting + active) email jobs every 10 seconds.
const QUEUE_METRICS_INTERVAL_MS = 10000;
let queueMetricsUpdateInFlight = false;

/**
 * Read the queue's job counts and report waiting + active as the queue-size metric.
 * Failures are logged as warnings; the next tick simply tries again.
 */
async function updateEmailQueueSizeMetric() {
  // Skip a tick while the previous lookup is still pending. The Redis client
  // buffers commands while disconnected (enableOfflineQueue), so unguarded
  // ticks could otherwise pile up during an outage.
  if (queueMetricsUpdateInFlight) return;
  queueMetricsUpdateInFlight = true;
  try {
    const counts = await emailQueue.getJobCounts();
    metrics.setEmailQueueSize(counts.waiting + counts.active);
  } catch (error) {
    logger.warn('Failed to update queue metrics', { error: error.message });
  } finally {
    queueMetricsUpdateInFlight = false;
  }
}

// unref() so this timer alone does not keep the Node.js process alive
// (e.g. scripts or tests that load this module and then want to exit).
setInterval(updateEmailQueueSizeMetric, QUEUE_METRICS_INTERVAL_MS).unref();
|
|
|
|
/**
 * Build a unique Bull job id from a descriptive prefix, the current time and
 * a random suffix.
 *
 * Bull does not add a job whose id already exists. Ids built only from the
 * prefix and Date.now() would silently drop a second email queued for the same
 * key within the same millisecond (for example, one campaign emailing several
 * representatives in a loop).
 *
 * @param {string} prefix - Human-readable part of the id, e.g. `campaign-42`.
 * @returns {string} `<prefix>-<epoch ms>-<short random base-36 suffix>`.
 */
function buildEmailJobId(prefix) {
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  return `${prefix}-${Date.now()}-${randomSuffix}`;
}

/**
 * Add one email job to the queue.
 *
 * @param {string} type - Email type; must be one the queue processor handles.
 * @param {object} data - Payload forwarded to the email service when the job runs.
 * @param {number} priority - Bull priority (1 is the highest).
 * @param {string} jobIdPrefix - Prefix passed to buildEmailJobId.
 * @returns {Promise<object>} The Bull job that was added.
 */
function enqueueEmail(type, data, priority, jobIdPrefix) {
  return emailQueue.add(
    { type, data },
    { priority, jobId: buildEmailJobId(jobIdPrefix) }
  );
}

/**
 * Email Queue Service
 * Provides methods to enqueue different types of emails, plus helpers to
 * inspect and control the underlying Bull queue.
 *
 * The enqueue methods only add a job; the email is sent later by the queue
 * processor. Each resolves with `{ jobId, queued: true }`.
 */
class EmailQueueService {
  /**
   * Send campaign email (to representative).
   * @param {object} emailData - Payload for the campaign send; `campaignId` and
   *   `to` are also used for the job id and the log entry.
   * @returns {Promise<{jobId: string, queued: boolean}>}
   */
  async sendCampaignEmail(emailData) {
    // Normal priority.
    const job = await enqueueEmail('campaign', emailData, 2, `campaign-${emailData.campaignId}`);

    logger.info('Campaign email queued', {
      jobId: job.id,
      campaignId: emailData.campaignId,
      recipient: emailData.to
    });

    return { jobId: job.id, queued: true };
  }

  /**
   * Send email verification.
   * @param {object} emailData - Payload for the verification send; `email` is
   *   also used for the job id and the log entry.
   * @returns {Promise<{jobId: string, queued: boolean}>}
   */
  async sendVerificationEmail(emailData) {
    // High priority - user waiting.
    const job = await enqueueEmail('verification', emailData, 1, `verification-${emailData.email}`);

    logger.info('Verification email queued', {
      jobId: job.id,
      email: emailData.email
    });

    return { jobId: job.id, queued: true };
  }

  /**
   * Send login details.
   * @param {object} emailData - Payload for the login-details send; `email` is
   *   also used for the job id and the log entry.
   * @returns {Promise<{jobId: string, queued: boolean}>}
   */
  async sendLoginDetails(emailData) {
    // High priority.
    const job = await enqueueEmail('login-details', emailData, 1, `login-${emailData.email}`);

    logger.info('Login details email queued', {
      jobId: job.id,
      email: emailData.email
    });

    return { jobId: job.id, queued: true };
  }

  /**
   * Send broadcast to users.
   * @param {object} emailData - Broadcast payload; `recipients` (if an array)
   *   is only used for the logged recipient count.
   * @returns {Promise<{jobId: string, queued: boolean}>}
   */
  async sendBroadcast(emailData) {
    // Lower priority - batch operation.
    const job = await enqueueEmail('broadcast', emailData, 3, 'broadcast');

    logger.info('Broadcast email queued', {
      jobId: job.id,
      recipientCount: emailData.recipients?.length || 1
    });

    return { jobId: job.id, queued: true };
  }

  /**
   * Send response verification email.
   * @param {object} emailData - Payload for the response-verification send;
   *   `email` is also used for the job id and the log entry.
   * @returns {Promise<{jobId: string, queued: boolean}>}
   */
  async sendResponseVerificationEmail(emailData) {
    const job = await enqueueEmail(
      'response-verification',
      emailData,
      2,
      `response-verification-${emailData.email}`
    );

    logger.info('Response verification email queued', {
      jobId: job.id,
      email: emailData.email
    });

    return { jobId: job.id, queued: true };
  }

  /**
   * Get job status.
   * @param {string|number} jobId - Id returned by one of the enqueue methods.
   * @returns {Promise<object>} `{ status: 'not_found' }` when the job does not
   *   exist, otherwise its state, progress, attempts, data, timestamps and
   *   failure reason.
   */
  async getJobStatus(jobId) {
    const job = await emailQueue.getJob(jobId);

    if (!job) {
      return { status: 'not_found' };
    }

    const state = await job.getState();
    const progress = job.progress();

    return {
      jobId: job.id,
      status: state,
      progress,
      attempts: job.attemptsMade,
      data: job.data,
      createdAt: job.timestamp,
      processedAt: job.processedOn,
      finishedAt: job.finishedOn,
      failedReason: job.failedReason
    };
  }

  /**
   * Get queue statistics.
   * @returns {Promise<{counts: object, samples: object}>} Job counts per state
   *   plus small samples of waiting/active jobs and up to 5 completed/failed jobs.
   */
  async getQueueStats() {
    // The lookups are independent, so run them concurrently.
    const [counts, waiting, active, completed, failed] = await Promise.all([
      emailQueue.getJobCounts(),
      emailQueue.getWaiting(0, 10),
      emailQueue.getActive(0, 10),
      emailQueue.getCompleted(0, 10),
      emailQueue.getFailed(0, 10)
    ]);

    // Defensively drop empty entries in case a job was removed (e.g. by
    // removeOnComplete / removeOnFail) while the sample was being read.
    const present = (jobs) => jobs.filter(Boolean);
    const summarize = (j) => ({ id: j.id, type: j.data.type });

    return {
      counts,
      samples: {
        waiting: present(waiting).map(summarize),
        active: present(active).map(summarize),
        completed: present(completed).slice(0, 5).map(summarize),
        failed: present(failed)
          .slice(0, 5)
          .map((j) => ({ id: j.id, type: j.data.type, reason: j.failedReason }))
      }
    };
  }

  /**
   * Clean old jobs.
   * Removes completed jobs older than `grace` milliseconds.
   * @param {number} [grace=86400000] - Minimum age in ms of jobs to remove (default 24 h).
   * @returns {Promise<{cleaned: number}>} Number of jobs removed.
   */
  async cleanQueue(grace = 24 * 3600 * 1000) {
    const cleaned = await emailQueue.clean(grace, 'completed');
    logger.info('Queue cleaned', { removedJobs: cleaned.length });
    return { cleaned: cleaned.length };
  }

  /**
   * Pause queue.
   * @returns {Promise<{paused: boolean}>}
   */
  async pauseQueue() {
    await emailQueue.pause();
    logger.warn('Email queue paused');
    return { paused: true };
  }

  /**
   * Resume queue.
   * @returns {Promise<{resumed: boolean}>}
   */
  async resumeQueue() {
    await emailQueue.resume();
    logger.info('Email queue resumed');
    return { resumed: true };
  }

  /**
   * Get queue instance (for advanced operations).
   * @returns {object} The underlying Bull queue.
   */
  getQueue() {
    return emailQueue;
  }
}
|
|
|
|
// Export one shared EmailQueueService instance; callers use it directly
// rather than constructing their own.
const emailQueueService = new EmailQueueService();
module.exports = emailQueueService;
|