const axios = require('axios');
const logger = require('../utils/logger');

// Delay before the optional initial sync so the rest of the app can start.
const INITIAL_SYNC_DELAY_MS = 5000;

// Domain-extension rewrites applied after the address has been lowercased.
// Keys must therefore be lowercase; mixed-case keys could never match.
// NOTE(review): '.co' is also a real TLD, so valid "*.co" addresses are
// rewritten to "*.ca" — confirm this is intended for this deployment.
const DOMAIN_TYPO_FIXES = {
  '.co': '.ca'
};

// Fixed lists created (or looked up) at startup, in creation order.
// `key` is the property of `this.lists` that receives the resulting list.
const MAIN_LISTS = [
  {
    key: 'allCampaigns',
    config: {
      name: 'Influence - All Campaigns',
      type: 'private',
      optin: 'single',
      tags: ['influence', 'campaigns', 'automated'],
      description: 'All campaign participants from the influence tool'
    }
  },
  {
    key: 'activeCampaigns',
    config: {
      name: 'Influence - Active Campaigns',
      type: 'private',
      optin: 'single',
      tags: ['influence', 'active', 'automated'],
      description: 'Participants in active campaigns only'
    }
  },
  {
    key: 'customRecipients',
    config: {
      name: 'Influence - Custom Recipients',
      type: 'private',
      optin: 'single',
      tags: ['influence', 'custom-recipients', 'automated'],
      description: 'Custom recipients added to campaigns'
    }
  },
  {
    key: 'campaignParticipants',
    config: {
      name: 'Influence - Campaign Participants',
      type: 'private',
      optin: 'single',
      tags: ['influence', 'participants', 'automated'],
      description: 'Users who have participated in sending campaign emails'
    }
  },
  {
    key: 'emailLogs',
    config: {
      name: 'Influence - Email Logs',
      type: 'private',
      optin: 'single',
      tags: ['influence', 'email-logs', 'automated'],
      description: 'All email activity from the public influence service'
    }
  }
];

/**
 * Build a lookup of campaigns keyed by record ID and by campaign slug.
 *
 * @param {Array<object>} campaigns - Campaign records; anything that is not an array yields an empty map.
 * @param {boolean} [debug=false] - Emit the console diagnostics used by the participant sync.
 * @returns {object} Prototype-less object mapping ID and slug to the campaign record.
 */
function buildCampaignMap(campaigns, debug = false) {
  // No prototype, so a slug such as "constructor" cannot hit an inherited key.
  const campaignMap = Object.create(null);

  if (!campaigns || !Array.isArray(campaigns)) {
    if (debug) {
      console.log('⚠️ No campaigns provided for mapping!');
    }
    return campaignMap;
  }

  if (debug) {
    console.log(`🔍 Building campaign map from ${campaigns.length} campaigns`);
  }

  campaigns.forEach((campaign, index) => {
    // Show keys for first campaign to debug
    if (debug && index === 0) {
      console.log('🔍 First campaign keys:', Object.keys(campaign));
    }

    // NocoDB returns 'ID' (all caps) as the system field for record ID
    // This is what the 'Campaign ID' link field in emails table references
    const id = campaign.ID || campaign.Id || campaign.id;
    if (!id) {
      if (debug) {
        console.log('⚠️ Campaign has no ID field! Keys:', Object.keys(campaign));
      }
      return;
    }

    campaignMap[id] = campaign;

    // Also map by slug for fallback lookup
    const slug = campaign['Campaign Slug'];
    if (slug) {
      campaignMap[slug] = campaign;
    }

    if (debug) {
      const title = campaign['Campaign Title'];
      console.log(`🔍 Mapped campaign ID ${id} and slug ${slug}: ${title}`);
    }
  });

  if (debug) {
    console.log(`🔍 Campaign map has ${Object.keys(campaignMap).length} entries`);
  }

  return campaignMap;
}

/**
 * Resolve a campaign by ID first, then by slug.
 *
 * @param {object} campaignMap - Lookup produced by buildCampaignMap.
 * @param {*} campaignId - Value of the record's 'Campaign ID' field.
 * @param {*} campaignSlug - Value of the record's 'Campaign Slug' field.
 * @returns {object|null} The matching campaign, or null when neither key matches.
 */
function findCampaign(campaignMap, campaignId, campaignSlug) {
  if (campaignId && campaignMap[campaignId]) {
    return campaignMap[campaignId];
  }
  // Fall back to the slug even when an ID was present but did not match.
  if (campaignSlug && campaignMap[campaignSlug]) {
    return campaignMap[campaignSlug];
  }
  return null;
}

/**
 * Run `syncOne` sequentially over `items` and tally the outcomes.
 *
 * @param {Array<object>} items - Records to sync; a non-array is treated as empty.
 * @param {function(object, number): Promise<{success: boolean, error?: string}>} syncOne - Syncs one record.
 * @param {function(object): string} describe - Returns the address used to label a failure.
 * @returns {Promise<{total: number, success: number, failed: number, errors: Array<{email: string, error: string}>}>}
 */
async function runBulkSync(items, syncOne, describe) {
  const records = Array.isArray(items) ? items : [];
  const results = { total: records.length, success: 0, failed: 0, errors: [] };

  for (const [index, record] of records.entries()) {
    try {
      const result = await syncOne(record, index);
      if (result.success) {
        results.success++;
      } else {
        results.failed++;
        results.errors.push({ email: describe(record), error: result.error });
      }
    } catch (error) {
      results.failed++;
      results.errors.push({ email: describe(record), error: error.message });
    }
  }

  return results;
}

/**
 * Mirrors Influence campaign participants, custom recipients and email logs
 * into Listmonk lists and subscribers over the Listmonk HTTP API.
 *
 * Sync is active only when LISTMONK_SYNC_ENABLED is 'true' and both
 * LISTMONK_USERNAME and LISTMONK_PASSWORD are set.
 */
class ListmonkService {
  constructor() {
    this.baseURL = process.env.LISTMONK_API_URL || 'http://listmonk:9000/api';
    this.username = process.env.LISTMONK_USERNAME;
    this.password = process.env.LISTMONK_PASSWORD;
    this.lists = {
      allCampaigns: null,
      activeCampaigns: null,
      customRecipients: null,
      campaignParticipants: null,
      emailLogs: null, // For generic email logs (non-campaign)
      campaignLists: {} // Dynamic per-campaign lists, keyed by campaign slug
    };

    // Debug logging for environment variables
    console.log('🔍 Listmonk Environment Variables (Influence):');
    console.log(` LISTMONK_SYNC_ENABLED: ${process.env.LISTMONK_SYNC_ENABLED}`);
    console.log(` LISTMONK_INITIAL_SYNC: ${process.env.LISTMONK_INITIAL_SYNC}`);
    console.log(` LISTMONK_API_URL: ${process.env.LISTMONK_API_URL}`);
    console.log(` LISTMONK_USERNAME: ${this.username ? 'SET' : 'NOT SET'}`);
    console.log(` LISTMONK_PASSWORD: ${this.password ? 'SET' : 'NOT SET'}`);

    this.syncEnabled = process.env.LISTMONK_SYNC_ENABLED === 'true';

    // Additional validation - disable if credentials are missing
    if (this.syncEnabled && (!this.username || !this.password)) {
      logger.warn('Listmonk credentials missing - disabling sync');
      this.syncEnabled = false;
    }

    console.log(` Final syncEnabled: ${this.syncEnabled}`);

    this.lastError = null;
    this.lastErrorTime = null;
  }

  /**
   * Validate and normalise an email address.
   *
   * The address is trimmed and lowercased, checked against a basic
   * `local@domain.tld` pattern, passed through the domain-extension fixes,
   * and rejected if it contains '..' or starts/ends with '.'.
   *
   * @param {*} email - Candidate address.
   * @returns {{valid: boolean, cleaned: (string|null), error: (string|null)}}
   */
  validateAndCleanEmail(email) {
    if (!email || typeof email !== 'string') {
      return { valid: false, cleaned: null, error: 'Email is required' };
    }

    // Trim whitespace and convert to lowercase
    let cleaned = email.trim().toLowerCase();

    // Basic email format validation
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    if (!emailRegex.test(cleaned)) {
      return { valid: false, cleaned: null, error: 'Invalid email format' };
    }

    // Fix common domain extension typos (first match wins)
    for (const [typo, correction] of Object.entries(DOMAIN_TYPO_FIXES)) {
      if (cleaned.endsWith(typo)) {
        const fixedEmail = cleaned.slice(0, -typo.length) + correction;
        logger.warn(`Email validation: Fixed typo in ${email} -> ${fixedEmail}`);
        cleaned = fixedEmail;
        break;
      }
    }

    // Additional validation: check for suspicious patterns
    if (cleaned.includes('..') || cleaned.startsWith('.') || cleaned.endsWith('.')) {
      return { valid: false, cleaned: null, error: 'Invalid email pattern' };
    }

    return { valid: true, cleaned, error: null };
  }

  /**
   * Create an axios instance configured with the base URL, basic-auth
   * credentials, a JSON content type and a 10 second timeout.
   *
   * @returns {object} A new axios instance.
   */
  getClient() {
    return axios.create({
      baseURL: this.baseURL,
      auth: {
        username: this.username,
        password: this.password
      },
      headers: {
        'Content-Type': 'application/json'
      },
      timeout: 10000 // 10 second timeout
    });
  }

  /**
   * Return the id of one of the fixed lists, failing with a readable message
   * when that list has not been initialised yet.
   *
   * @param {string} key - Property name in `this.lists` (e.g. 'allCampaigns').
   * @returns {*} The list's `id`.
   * @throws {Error} When the list is not initialised.
   */
  getMainListId(key) {
    const list = this.lists[key];
    if (!list) {
      throw new Error(`Listmonk list "${key}" is not initialized`);
    }
    return list.id;
  }

  /**
   * Test the connection by requesting `/health`.
   *
   * Clears `lastError` on success; records `lastError`/`lastErrorTime` when
   * the request throws.
   *
   * @returns {Promise<boolean>} True only when sync is enabled and the
   *   response body's `data` field is exactly `true`.
   */
  async checkConnection() {
    if (!this.syncEnabled) {
      return false;
    }

    try {
      console.log(`🔍 Testing connection to: ${this.baseURL}`);
      console.log(`🔍 Using credentials: ${this.username}:${this.password ? 'SET' : 'NOT SET'}`);

      const client = this.getClient();
      console.log('🔍 Making request to /health endpoint...');
      const { data } = await client.get('/health');

      console.log('🔍 Response received:', JSON.stringify(data, null, 2));

      if (data.data === true) {
        logger.info('Listmonk connection successful');
        this.lastError = null;
        this.lastErrorTime = null;
        return true;
      }

      console.log('🔍 Health check failed - data.data is not true');
      return false;
    } catch (error) {
      console.log('🔍 Connection error details:', error.message);
      if (error.response) {
        console.log('🔍 Response status:', error.response.status);
        console.log('🔍 Response data:', error.response.data);
      }

      this.lastError = `Listmonk connection failed: ${error.message}`;
      this.lastErrorTime = new Date();
      logger.error(this.lastError);
      return false;
    }
  }

  /**
   * Ensure the fixed lists exist, then best-effort ensure one list per
   * campaign returned by the NocoDB service.
   *
   * Never rejects: failures are recorded in `lastError` and reported through
   * the return value. A failure while preparing campaign-specific lists is
   * logged but does not fail the initialisation.
   *
   * @returns {Promise<boolean>} True when the fixed lists were initialised.
   */
  async initializeLists() {
    if (!this.syncEnabled) {
      logger.info('Listmonk sync is disabled');
      return false;
    }

    try {
      // Check connection first
      const connected = await this.checkConnection();
      if (!connected) {
        throw new Error(`Cannot connect to Listmonk: ${this.lastError || 'Unknown connection error'}`);
      }

      // Create main campaign lists (sequentially, in declaration order)
      for (const { key, config } of MAIN_LISTS) {
        this.lists[key] = await this.ensureList(config);
      }

      logger.info('✅ Listmonk main lists initialized successfully');

      // Initialize campaign-specific lists for all campaigns
      try {
        const nocodbService = require('./nocodb');
        const campaigns = await nocodbService.getAllCampaigns();

        if (campaigns && campaigns.length > 0) {
          logger.info(`🔄 Initializing lists for ${campaigns.length} campaigns...`);

          for (const campaign of campaigns) {
            const slug = campaign['Campaign Slug'];
            const title = campaign['Campaign Title'];
            const status = campaign['Status'];

            if (slug && title) {
              try {
                const campaignList = await this.ensureCampaignList(slug, title);
                if (campaignList) {
                  logger.info(`📋 Initialized list for campaign: ${title} (${status})`);
                }
              } catch (error) {
                logger.warn(`Failed to initialize list for campaign ${title}:`, error.message);
              }
            }
          }

          logger.info(`✅ Campaign lists initialized: ${Object.keys(this.lists.campaignLists).length} lists`);
        }
      } catch (error) {
        // Don't fail the entire initialization if campaign lists fail
        logger.warn('Failed to initialize campaign-specific lists:', error.message);
      }

      return true;
    } catch (error) {
      this.lastError = `Failed to initialize Listmonk lists: ${error.message}`;
      this.lastErrorTime = new Date();
      logger.error(this.lastError);
      return false;
    }
  }

  /**
   * Return the list whose name equals `listConfig.name`, creating it if absent.
   *
   * @param {{name: string}} listConfig - Payload posted to `/lists` when the list must be created.
   * @returns {Promise<object>} The existing or newly created list.
   * @throws Re-throws any request error after logging it.
   */
  async ensureList(listConfig) {
    try {
      const client = this.getClient();

      // Request every list: the endpoint is paginated, and a match on a later
      // page would otherwise be missed and a duplicate list created.
      const { data: listsResponse } = await client.get('/lists', {
        params: { per_page: 'all' }
      });
      const allLists = listsResponse.data.results || [];
      const existingList = allLists.find(list => list.name === listConfig.name);

      if (existingList) {
        logger.info(`📋 Found existing list: ${listConfig.name}`);
        return existingList;
      }

      // Create new list
      const { data: createResponse } = await client.post('/lists', listConfig);
      logger.info(`📋 Created new list: ${listConfig.name}`);
      return createResponse.data;
    } catch (error) {
      logger.error(`Failed to ensure list ${listConfig.name}:`, error.message);
      throw error;
    }
  }

  /**
   * Ensure a list exists for a specific campaign and cache it by slug.
   *
   * @param {string} campaignSlug - Cache key, also added to the list's tags.
   * @param {string} campaignTitle - Used in the list name and description.
   * @returns {Promise<object|null>} The list, or null when sync is disabled or the request fails.
   */
  async ensureCampaignList(campaignSlug, campaignTitle) {
    if (!this.syncEnabled) {
      return null;
    }

    // Check if we already have this campaign list cached
    if (this.lists.campaignLists[campaignSlug]) {
      return this.lists.campaignLists[campaignSlug];
    }

    try {
      const listConfig = {
        name: `Campaign: ${campaignTitle}`,
        type: 'private',
        optin: 'single',
        tags: ['influence', 'campaign', campaignSlug, 'automated'],
        description: `Participants who sent emails for the "${campaignTitle}" campaign`
      };

      const list = await this.ensureList(listConfig);
      this.lists.campaignLists[campaignSlug] = list;

      logger.info(`✅ Campaign list created/found for: ${campaignTitle}`);
      return list;
    } catch (error) {
      logger.error(`Failed to ensure campaign list for ${campaignSlug}:`, error.message);
      return null;
    }
  }

  /**
   * Sync one campaign email record as a Listmonk subscriber.
   *
   * The subscriber is added to the "all campaigns" and "campaign participants"
   * lists, to the "active campaigns" list when the campaign's Status is
   * 'active', and to the campaign-specific list when slug and title are known.
   *
   * @param {object} emailData - Email record keyed by NocoDB column titles.
   * @param {object|null|undefined} campaignData - Campaign record, if resolved.
   * @returns {Promise<{success: boolean, subscriberId?: *, error?: string}>}
   */
  async syncCampaignParticipant(emailData, campaignData) {
    // Map NocoDB field names (column titles) to properties
    // Try User fields first (new Campaign Emails table), fall back to Sender fields (old table)
    const userEmail = emailData['User Email'] || emailData['Sender Email'] || emailData.sender_email;
    const userName = emailData['User Name'] || emailData['Sender Name'] || emailData.sender_name;
    const userPostalCode = emailData['User Postal Code'] || emailData['Postal Code'] || emailData.postal_code;
    const createdAt = emailData['CreatedAt'] || emailData.created_at;
    const sentTo = emailData['Sent To'] || emailData.sent_to;
    const recipientEmail = emailData['Recipient Email'];
    const recipientName = emailData['Recipient Name'];

    if (!this.syncEnabled || !userEmail) {
      return { success: false, error: 'Sync disabled or no email provided' };
    }

    // Validate and clean the email address
    const emailValidation = this.validateAndCleanEmail(userEmail);
    if (!emailValidation.valid) {
      logger.warn(`Skipping invalid email: ${userEmail} - ${emailValidation.error}`);
      return { success: false, error: emailValidation.error };
    }

    try {
      const subscriberLists = [
        this.getMainListId('allCampaigns'),
        this.getMainListId('campaignParticipants')
      ];

      // Add to active campaigns list if campaign is active
      const campaignStatus = campaignData?.Status;
      if (campaignStatus === 'active') {
        subscriberLists.push(this.getMainListId('activeCampaigns'));
      }

      // Add to campaign-specific list
      const campaignSlug = campaignData?.['Campaign Slug'];
      const campaignTitle = campaignData?.['Campaign Title'];

      if (campaignSlug && campaignTitle) {
        const campaignList = await this.ensureCampaignList(campaignSlug, campaignTitle);
        if (campaignList) {
          subscriberLists.push(campaignList.id);
          logger.info(`📧 Added ${emailValidation.cleaned} to campaign list: ${campaignTitle}`);
        }
      }

      const subscriberData = {
        email: emailValidation.cleaned,
        name: userName || emailValidation.cleaned,
        status: 'enabled',
        lists: subscriberLists,
        attribs: {
          last_campaign: campaignTitle || 'Unknown',
          campaign_slug: campaignSlug || null,
          last_sent: createdAt ? new Date(createdAt).toISOString() : new Date().toISOString(),
          postal_code: userPostalCode || null,
          sent_to_representatives: sentTo || null,
          last_recipient_email: recipientEmail || null,
          last_recipient_name: recipientName || null
        }
      };

      const result = await this.upsertSubscriber(subscriberData);
      return { success: true, subscriberId: result.id };
    } catch (error) {
      logger.error('Failed to sync campaign participant:', error.message);
      return { success: false, error: error.message };
    }
  }

  /**
   * Sync one custom recipient record as a Listmonk subscriber on the
   * "custom recipients" list and, when known, the campaign-specific list.
   *
   * @param {object} recipientData - Recipient record keyed by NocoDB column titles.
   * @param {object|null|undefined} campaignData - Campaign record, if resolved.
   * @returns {Promise<{success: boolean, subscriberId?: *, error?: string}>}
   */
  async syncCustomRecipient(recipientData, campaignData) {
    // Map NocoDB field names (column titles) to properties
    const email = recipientData['Recipient Email'];
    const name = recipientData['Recipient Name'];
    const title = recipientData['Recipient Title'];
    const organization = recipientData['Recipient Organization'];
    const phone = recipientData['Recipient Phone'];
    const createdAt = recipientData['CreatedAt'];

    if (!this.syncEnabled || !email) {
      return { success: false, error: 'Sync disabled or no email provided' };
    }

    // Validate and clean the email address
    const emailValidation = this.validateAndCleanEmail(email);
    if (!emailValidation.valid) {
      logger.warn(`Skipping invalid recipient email: ${email} - ${emailValidation.error}`);
      return { success: false, error: emailValidation.error };
    }

    try {
      const subscriberLists = [this.getMainListId('customRecipients')];

      // Add to campaign-specific list
      const campaignSlug = campaignData?.['Campaign Slug'];
      const campaignTitle = campaignData?.['Campaign Title'];

      if (campaignSlug && campaignTitle) {
        const campaignList = await this.ensureCampaignList(campaignSlug, campaignTitle);
        if (campaignList) {
          subscriberLists.push(campaignList.id);
          logger.info(`📧 Added recipient ${emailValidation.cleaned} to campaign list: ${campaignTitle}`);
        }
      }

      const subscriberData = {
        email: emailValidation.cleaned,
        name: name || emailValidation.cleaned,
        status: 'enabled',
        lists: subscriberLists,
        attribs: {
          campaign: campaignTitle || 'Unknown',
          campaign_slug: campaignSlug || null,
          title: title || null,
          organization: organization || null,
          phone: phone || null,
          added_date: createdAt ? new Date(createdAt).toISOString() : null,
          recipient_type: 'custom'
        }
      };

      const result = await this.upsertSubscriber(subscriberData);
      return { success: true, subscriberId: result.id };
    } catch (error) {
      logger.error('Failed to sync custom recipient:', error.message);
      return { success: false, error: error.message };
    }
  }

  /**
   * Sync one generic (non-campaign) email log entry as a Listmonk subscriber
   * on the "email logs" list.
   *
   * @param {object} emailData - Email log record keyed by NocoDB column titles.
   * @returns {Promise<{success: boolean, subscriberId?: *, error?: string}>}
   */
  async syncEmailLog(emailData) {
    // Map NocoDB field names for the Email Logs table
    const senderEmail = emailData['Sender Email'];
    const senderName = emailData['Sender Name'];
    const recipientEmail = emailData['Recipient Email'];
    const postalCode = emailData['Postal Code'];
    const createdAt = emailData['CreatedAt'];
    const sentAt = emailData['Sent At'];

    if (!this.syncEnabled || !senderEmail) {
      return { success: false, error: 'Sync disabled or no email provided' };
    }

    // Validate and clean the email address
    const emailValidation = this.validateAndCleanEmail(senderEmail);
    if (!emailValidation.valid) {
      logger.warn(`Skipping invalid email log: ${senderEmail} - ${emailValidation.error}`);
      return { success: false, error: emailValidation.error };
    }

    try {
      // Prefer 'Sent At', then 'CreatedAt', then the current time.
      const lastSentSource = sentAt || createdAt;
      const lastSent = lastSentSource
        ? new Date(lastSentSource).toISOString()
        : new Date().toISOString();

      const subscriberData = {
        email: emailValidation.cleaned,
        name: senderName || emailValidation.cleaned,
        status: 'enabled',
        lists: [this.getMainListId('emailLogs')],
        attribs: {
          last_sent: lastSent,
          postal_code: postalCode || null,
          last_recipient_email: recipientEmail || null,
          source: 'email_logs'
        }
      };

      const result = await this.upsertSubscriber(subscriberData);
      return { success: true, subscriberId: result.id };
    } catch (error) {
      logger.error('Failed to sync email log:', error.message);
      return { success: false, error: error.message };
    }
  }

  /**
   * Create the subscriber, or update the existing one with the same email.
   *
   * On update, list memberships are unioned with the existing ones and
   * attributes are merged with the new values taking precedence.
   *
   * @param {{email: string, lists: Array, attribs: object}} subscriberData
   * @returns {Promise<object>} The `data` field of the create/update response.
   * @throws Re-throws any request error after logging it.
   */
  async upsertSubscriber(subscriberData) {
    try {
      const client = this.getClient();

      // The `query` parameter is an SQL expression and the validation regex
      // permits apostrophes, so double any single quote before embedding.
      const escapedEmail = String(subscriberData.email).replace(/'/g, "''");

      // Try to find existing subscriber by email
      const { data: searchResponse } = await client.get('/subscribers', {
        params: {
          query: `subscribers.email = '${escapedEmail}'`
        }
      });

      const matches = searchResponse.data.results;
      if (matches && matches.length > 0) {
        // Update existing subscriber
        const existingSubscriber = matches[0];
        const subscriberId = existingSubscriber.id;

        // Merge lists (don't remove existing ones)
        const existingLists = (existingSubscriber.lists || []).map(l => l.id);
        const newLists = [...new Set([...existingLists, ...subscriberData.lists])];

        // Merge attributes
        const mergedAttribs = {
          ...existingSubscriber.attribs,
          ...subscriberData.attribs
        };

        // NOTE(review): `status` (and `name`) from subscriberData overwrite the
        // existing subscriber's values here — confirm that is intended for
        // subscribers whose current status is not 'enabled'.
        const updateData = {
          ...subscriberData,
          lists: newLists,
          attribs: mergedAttribs
        };

        const { data: updateResponse } = await client.put(`/subscribers/${subscriberId}`, updateData);
        logger.info(`Updated subscriber: ${subscriberData.email}`);
        return updateResponse.data;
      }

      // Create new subscriber
      const { data: createResponse } = await client.post('/subscribers', subscriberData);
      logger.info(`Created new subscriber: ${subscriberData.email}`);
      return createResponse.data;
    } catch (error) {
      logger.error(`Failed to upsert subscriber ${subscriberData.email}:`, error.message);
      throw error;
    }
  }

  /**
   * Sync many campaign email records, one at a time.
   *
   * @param {Array<object>} emails - Campaign email records.
   * @param {Array<object>} campaigns - Campaign records used to resolve each email's campaign.
   * @returns {Promise<{total: number, success: number, failed: number, errors: Array}>}
   *   All-zero totals when sync is disabled.
   */
  async bulkSyncCampaignParticipants(emails, campaigns) {
    if (!this.syncEnabled) {
      return { total: 0, success: 0, failed: 0, errors: [] };
    }

    const campaignMap = buildCampaignMap(campaigns, true);

    // Label failures with the same fields syncCampaignParticipant reads.
    const describe = email =>
      email['User Email'] || email['Sender Email'] || email.sender_email || 'unknown';

    const results = await runBulkSync(
      emails,
      (email, index) => {
        // Try to find campaign data by Campaign ID (link field) or Campaign Slug (text field)
        const campaignId = email['Campaign ID'];
        const campaignSlug = email['Campaign Slug'];
        const campaignData = findCampaign(campaignMap, campaignId, campaignSlug);

        // Debug first email
        if (index === 0) {
          console.log('🔍 First email keys:', Object.keys(email));
          console.log('🔍 First email Campaign ID field:', email['Campaign ID']);
          console.log('🔍 First email Campaign Slug field:', email['Campaign Slug']);
        }

        if (!campaignData && (campaignId || campaignSlug)) {
          console.log(
            `⚠️ Campaign not found - ID: ${campaignId}, Slug: ${campaignSlug}. Available IDs:`,
            Object.keys(campaignMap).slice(0, 10)
          );
        }

        return this.syncCampaignParticipant(email, campaignData);
      },
      describe
    );

    logger.info(`Bulk sync completed: ${results.success} succeeded, ${results.failed} failed`);
    return results;
  }

  /**
   * Sync many custom recipient records, one at a time.
   *
   * @param {Array<object>} recipients - Custom recipient records.
   * @param {Array<object>} campaigns - Campaign records used to resolve each recipient's campaign.
   * @returns {Promise<{total: number, success: number, failed: number, errors: Array}>}
   *   All-zero totals when sync is disabled.
   */
  async bulkSyncCustomRecipients(recipients, campaigns) {
    if (!this.syncEnabled) {
      return { total: 0, success: 0, failed: 0, errors: [] };
    }

    const campaignMap = buildCampaignMap(campaigns);

    const results = await runBulkSync(
      recipients,
      recipient => {
        // Try to find campaign data by Campaign ID or Campaign Slug
        const campaignData = findCampaign(
          campaignMap,
          recipient['Campaign ID'],
          recipient['Campaign Slug']
        );
        return this.syncCustomRecipient(recipient, campaignData);
      },
      recipient => recipient['Recipient Email'] || 'unknown'
    );

    logger.info(`Bulk sync custom recipients completed: ${results.success} succeeded, ${results.failed} failed`);
    return results;
  }

  /**
   * Sync many generic email log records, one at a time.
   *
   * @param {Array<object>} emailLogs - Email log records.
   * @returns {Promise<{total: number, success: number, failed: number, errors: Array}>}
   *   All-zero totals when sync is disabled.
   */
  async bulkSyncEmailLogs(emailLogs) {
    if (!this.syncEnabled) {
      return { total: 0, success: 0, failed: 0, errors: [] };
    }

    const results = await runBulkSync(
      emailLogs,
      emailLog => this.syncEmailLog(emailLog),
      emailLog => emailLog['Sender Email'] || 'unknown'
    );

    logger.info(`Bulk sync email logs completed: ${results.success} succeeded, ${results.failed} failed`);
    return results;
  }

  /**
   * Fetch subscriber counts for every list tagged 'influence'.
   *
   * @returns {Promise<object|null>} Map of list name to `{ name, subscriber_count, id }`,
   *   or null when sync is disabled or the request fails.
   */
  async getListStats() {
    if (!this.syncEnabled) {
      return null;
    }

    try {
      const client = this.getClient();
      // Request every list so that lists beyond the first page are counted.
      const { data: listsResponse } = await client.get('/lists', {
        params: { per_page: 'all' }
      });

      const stats = {};
      const influenceLists = (listsResponse.data.results || []).filter(
        list => list.tags && list.tags.includes('influence')
      );

      for (const list of influenceLists) {
        stats[list.name] = {
          name: list.name,
          subscriber_count: list.subscriber_count || 0,
          id: list.id
        };
      }

      return stats;
    } catch (error) {
      logger.error('Failed to get list stats:', error.message);
      return null;
    }
  }

  /**
   * Report the current sync status.
   *
   * `connected` only reflects that no error is currently recorded; it is also
   * true before any connection attempt has been made.
   *
   * @returns {{enabled: boolean, connected: boolean, lastError: (string|null), lastErrorTime: (Date|null), listsInitialized: boolean}}
   */
  getSyncStatus() {
    return {
      enabled: this.syncEnabled,
      connected: this.lastError === null,
      lastError: this.lastError,
      lastErrorTime: this.lastErrorTime,
      listsInitialized: Object.values(this.lists).every(list => list !== null)
    };
  }
}

// Create singleton instance
const listmonkService = new ListmonkService();

/**
 * One-off sync of existing campaign participants and custom recipients.
 * Each phase is isolated so a failure in one does not prevent the other;
 * this function never rejects.
 */
async function runInitialSync() {
  try {
    const nocodbService = require('./nocodb');

    // Sync existing campaign participants
    try {
      // Use campaignEmails table (not emails) to get proper User Email/Name fields
      const emailsData = await nocodbService.getAll(nocodbService.tableIds.campaignEmails);
      const emails = emailsData?.list || [];
      console.log('🔍 Initial sync - fetched campaign emails:', emails?.length || 0);

      if (emails && emails.length > 0) {
        console.log('🔍 First email full data:', JSON.stringify(emails[0], null, 2));

        const campaigns = await nocodbService.getAllCampaigns();
        console.log('🔍 Campaigns fetched:', campaigns?.length || 0);
        if (campaigns && campaigns.length > 0) {
          console.log('🔍 First campaign full data:', JSON.stringify(campaigns[0], null, 2));
        }

        const emailResults = await listmonkService.bulkSyncCampaignParticipants(emails, campaigns);
        logger.info(`📧 Initial campaign participants sync: ${emailResults.success} succeeded, ${emailResults.failed} failed`);
      } else {
        logger.warn('No campaign participants found for initial sync');
      }
    } catch (emailError) {
      logger.warn('Initial campaign participants sync failed:', {
        message: emailError.message,
        stack: emailError.stack
      });
    }

    // Sync existing custom recipients
    try {
      const recipientsData = await nocodbService.getAll(nocodbService.tableIds.customRecipients);
      const recipients = recipientsData?.list || [];
      console.log('🔍 Initial sync - fetched custom recipients:', recipients?.length || 0);

      if (recipients && recipients.length > 0) {
        const campaigns = await nocodbService.getAllCampaigns();
        const recipientResults = await listmonkService.bulkSyncCustomRecipients(recipients, campaigns);
        logger.info(`📋 Initial custom recipients sync: ${recipientResults.success} succeeded, ${recipientResults.failed} failed`);
      } else {
        logger.warn('No custom recipients found for initial sync');
      }
    } catch (recipientError) {
      logger.warn('Initial custom recipients sync failed:', {
        message: recipientError.message,
        stack: recipientError.stack
      });
    }

    logger.info('✅ Initial Listmonk sync completed');
  } catch (error) {
    logger.error('Initial Listmonk sync failed:', {
      message: error.message,
      stack: error.stack
    });
  }
}

// Initialize lists on startup if enabled
if (listmonkService.syncEnabled) {
  listmonkService.initializeLists()
    .then(initialized => {
      // initializeLists reports failure through its return value (it has
      // already logged the cause), so do not claim success or start a sync.
      if (!initialized) {
        logger.warn('Listmonk service initialization failed - skipping initial sync');
        return;
      }

      logger.info('✅ Listmonk service initialized successfully');

      // Optional initial sync (only if explicitly enabled)
      if (process.env.LISTMONK_INITIAL_SYNC === 'true') {
        logger.info('🔄 Performing initial Listmonk sync for influence system...');

        // Use setTimeout to delay initial sync to let app fully start
        setTimeout(runInitialSync, INITIAL_SYNC_DELAY_MS);
      }
    })
    .catch(error => {
      logger.error('Failed to initialize Listmonk service:', error.message);
    });
}

module.exports = listmonkService;