admin 4d8b9effd0 feat(blog): add detailed update on Influence and Map app developments since August
A bunch of udpates to the listmonk sync to add influence to it
2025-10-25 12:45:35 -06:00

833 lines
34 KiB
JavaScript

const axios = require('axios');
const logger = require('../utils/logger');
class ListmonkService {
constructor() {
this.baseURL = process.env.LISTMONK_API_URL || 'http://listmonk:9000/api';
this.username = process.env.LISTMONK_USERNAME;
this.password = process.env.LISTMONK_PASSWORD;
this.lists = {
allCampaigns: null,
activeCampaigns: null,
customRecipients: null,
campaignParticipants: null,
emailLogs: null, // For generic email logs (non-campaign)
campaignLists: {} // Dynamic per-campaign lists
};
// Debug logging for environment variables
console.log('🔍 Listmonk Environment Variables (Influence):');
console.log(` LISTMONK_SYNC_ENABLED: ${process.env.LISTMONK_SYNC_ENABLED}`);
console.log(` LISTMONK_INITIAL_SYNC: ${process.env.LISTMONK_INITIAL_SYNC}`);
console.log(` LISTMONK_API_URL: ${process.env.LISTMONK_API_URL}`);
console.log(` LISTMONK_USERNAME: ${this.username ? 'SET' : 'NOT SET'}`);
console.log(` LISTMONK_PASSWORD: ${this.password ? 'SET' : 'NOT SET'}`);
this.syncEnabled = process.env.LISTMONK_SYNC_ENABLED === 'true';
// Additional validation - disable if credentials are missing
if (this.syncEnabled && (!this.username || !this.password)) {
logger.warn('Listmonk credentials missing - disabling sync');
this.syncEnabled = false;
}
console.log(` Final syncEnabled: ${this.syncEnabled}`);
this.lastError = null;
this.lastErrorTime = null;
}
// Validate and clean email address
validateAndCleanEmail(email) {
if (!email || typeof email !== 'string') {
return { valid: false, cleaned: null, error: 'Email is required' };
}
// Trim whitespace and convert to lowercase
let cleaned = email.trim().toLowerCase();
// Basic email format validation
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(cleaned)) {
return { valid: false, cleaned: null, error: 'Invalid email format' };
}
// Check for common typos in domain extensions
const commonTypos = {
'.co': '.ca',
'.cA': '.ca',
'.Ca': '.ca',
'.cOM': '.com',
'.coM': '.com',
'.cOm': '.com',
'.neT': '.net',
'.nEt': '.net',
'.ORg': '.org',
'.oRg': '.org'
};
// Fix common domain extension typos
for (const [typo, correction] of Object.entries(commonTypos)) {
if (cleaned.endsWith(typo)) {
const fixedEmail = cleaned.slice(0, -typo.length) + correction;
logger.warn(`Email validation: Fixed typo in ${email} -> ${fixedEmail}`);
cleaned = fixedEmail;
break;
}
}
// Additional validation: check for suspicious patterns
if (cleaned.includes('..') || cleaned.startsWith('.') || cleaned.endsWith('.')) {
return { valid: false, cleaned: null, error: 'Invalid email pattern' };
}
return { valid: true, cleaned, error: null };
}
// Create axios instance with auth
getClient() {
return axios.create({
baseURL: this.baseURL,
auth: {
username: this.username,
password: this.password
},
headers: {
'Content-Type': 'application/json'
},
timeout: 10000 // 10 second timeout
});
}
// Test connection to Listmonk
async checkConnection() {
if (!this.syncEnabled) {
return false;
}
try {
console.log(`🔍 Testing connection to: ${this.baseURL}`);
console.log(`🔍 Using credentials: ${this.username}:${this.password ? 'SET' : 'NOT SET'}`);
const client = this.getClient();
console.log('🔍 Making request to /health endpoint...');
const { data } = await client.get('/health');
console.log('🔍 Response received:', JSON.stringify(data, null, 2));
if (data.data === true) {
logger.info('Listmonk connection successful');
this.lastError = null;
this.lastErrorTime = null;
return true;
}
console.log('🔍 Health check failed - data.data is not true');
return false;
} catch (error) {
console.log('🔍 Connection error details:', error.message);
if (error.response) {
console.log('🔍 Response status:', error.response.status);
console.log('🔍 Response data:', error.response.data);
}
this.lastError = `Listmonk connection failed: ${error.message}`;
this.lastErrorTime = new Date();
logger.error(this.lastError);
return false;
}
}
// Initialize all lists on startup
async initializeLists() {
if (!this.syncEnabled) {
logger.info('Listmonk sync is disabled');
return false;
}
try {
// Check connection first
const connected = await this.checkConnection();
if (!connected) {
throw new Error(`Cannot connect to Listmonk: ${this.lastError || 'Unknown connection error'}`);
}
// Create main campaign lists
this.lists.allCampaigns = await this.ensureList({
name: 'Influence - All Campaigns',
type: 'private',
optin: 'single',
tags: ['influence', 'campaigns', 'automated'],
description: 'All campaign participants from the influence tool'
});
this.lists.activeCampaigns = await this.ensureList({
name: 'Influence - Active Campaigns',
type: 'private',
optin: 'single',
tags: ['influence', 'active', 'automated'],
description: 'Participants in active campaigns only'
});
this.lists.customRecipients = await this.ensureList({
name: 'Influence - Custom Recipients',
type: 'private',
optin: 'single',
tags: ['influence', 'custom-recipients', 'automated'],
description: 'Custom recipients added to campaigns'
});
this.lists.campaignParticipants = await this.ensureList({
name: 'Influence - Campaign Participants',
type: 'private',
optin: 'single',
tags: ['influence', 'participants', 'automated'],
description: 'Users who have participated in sending campaign emails'
});
this.lists.emailLogs = await this.ensureList({
name: 'Influence - Email Logs',
type: 'private',
optin: 'single',
tags: ['influence', 'email-logs', 'automated'],
description: 'All email activity from the public influence service'
});
logger.info('✅ Listmonk main lists initialized successfully');
// Initialize campaign-specific lists for all campaigns
try {
const nocodbService = require('./nocodb');
const campaigns = await nocodbService.getAllCampaigns();
if (campaigns && campaigns.length > 0) {
logger.info(`🔄 Initializing lists for ${campaigns.length} campaigns...`);
for (const campaign of campaigns) {
const slug = campaign['Campaign Slug'];
const title = campaign['Campaign Title'];
const status = campaign['Status'];
if (slug && title) {
try {
const campaignList = await this.ensureCampaignList(slug, title);
if (campaignList) {
logger.info(`📋 Initialized list for campaign: ${title} (${status})`);
}
} catch (error) {
logger.warn(`Failed to initialize list for campaign ${title}:`, error.message);
}
}
}
logger.info(`✅ Campaign lists initialized: ${Object.keys(this.lists.campaignLists).length} lists`);
}
} catch (error) {
logger.warn('Failed to initialize campaign-specific lists:', error.message);
// Don't fail the entire initialization if campaign lists fail
}
return true;
} catch (error) {
this.lastError = `Failed to initialize Listmonk lists: ${error.message}`;
this.lastErrorTime = new Date();
logger.error(this.lastError);
return false;
}
}
// Ensure a list exists, create if not
async ensureList(listConfig) {
try {
const client = this.getClient();
// First, try to find existing list by name
const { data: listsResponse } = await client.get('/lists');
const existingList = listsResponse.data.results.find(list => list.name === listConfig.name);
if (existingList) {
logger.info(`📋 Found existing list: ${listConfig.name}`);
return existingList;
}
// Create new list
const { data: createResponse } = await client.post('/lists', listConfig);
logger.info(`📋 Created new list: ${listConfig.name}`);
return createResponse.data;
} catch (error) {
logger.error(`Failed to ensure list ${listConfig.name}:`, error.message);
throw error;
}
}
// Ensure a list exists for a specific campaign
async ensureCampaignList(campaignSlug, campaignTitle) {
if (!this.syncEnabled) {
return null;
}
// Check if we already have this campaign list cached
if (this.lists.campaignLists[campaignSlug]) {
return this.lists.campaignLists[campaignSlug];
}
try {
const listConfig = {
name: `Campaign: ${campaignTitle}`,
type: 'private',
optin: 'single',
tags: ['influence', 'campaign', campaignSlug, 'automated'],
description: `Participants who sent emails for the "${campaignTitle}" campaign`
};
const list = await this.ensureList(listConfig);
this.lists.campaignLists[campaignSlug] = list;
logger.info(`✅ Campaign list created/found for: ${campaignTitle}`);
return list;
} catch (error) {
logger.error(`Failed to ensure campaign list for ${campaignSlug}:`, error.message);
return null;
}
}
// Sync a campaign participant to Listmonk
async syncCampaignParticipant(emailData, campaignData) {
// Map NocoDB field names (column titles) to properties
// Try User fields first (new Campaign Emails table), fall back to Sender fields (old table)
const userEmail = emailData['User Email'] || emailData['Sender Email'] || emailData.sender_email;
const userName = emailData['User Name'] || emailData['Sender Name'] || emailData.sender_name;
const userPostalCode = emailData['User Postal Code'] || emailData['Postal Code'] || emailData.postal_code;
const createdAt = emailData['CreatedAt'] || emailData.created_at;
const sentTo = emailData['Sent To'] || emailData.sent_to;
const recipientEmail = emailData['Recipient Email'];
const recipientName = emailData['Recipient Name'];
if (!this.syncEnabled || !userEmail) {
return { success: false, error: 'Sync disabled or no email provided' };
}
// Validate and clean the email address
const emailValidation = this.validateAndCleanEmail(userEmail);
if (!emailValidation.valid) {
logger.warn(`Skipping invalid email: ${userEmail} - ${emailValidation.error}`);
return { success: false, error: emailValidation.error };
}
try {
const subscriberLists = [this.lists.allCampaigns.id, this.lists.campaignParticipants.id];
// Add to active campaigns list if campaign is active
const campaignStatus = campaignData?.Status;
if (campaignStatus === 'active') {
subscriberLists.push(this.lists.activeCampaigns.id);
}
// Add to campaign-specific list
const campaignSlug = campaignData?.['Campaign Slug'];
const campaignTitle = campaignData?.['Campaign Title'];
if (campaignSlug && campaignTitle) {
const campaignList = await this.ensureCampaignList(campaignSlug, campaignTitle);
if (campaignList) {
subscriberLists.push(campaignList.id);
logger.info(`📧 Added ${emailValidation.cleaned} to campaign list: ${campaignTitle}`);
}
}
const subscriberData = {
email: emailValidation.cleaned,
name: userName || emailValidation.cleaned,
status: 'enabled',
lists: subscriberLists,
attribs: {
last_campaign: campaignTitle || 'Unknown',
campaign_slug: campaignSlug || null,
last_sent: createdAt ? new Date(createdAt).toISOString() : new Date().toISOString(),
postal_code: userPostalCode || null,
sent_to_representatives: sentTo || null,
last_recipient_email: recipientEmail || null,
last_recipient_name: recipientName || null
}
};
const result = await this.upsertSubscriber(subscriberData);
return { success: true, subscriberId: result.id };
} catch (error) {
logger.error('Failed to sync campaign participant:', error.message);
return { success: false, error: error.message };
}
}
// Sync a custom recipient to Listmonk
async syncCustomRecipient(recipientData, campaignData) {
// Map NocoDB field names (column titles) to properties
const email = recipientData['Recipient Email'];
const name = recipientData['Recipient Name'];
const title = recipientData['Recipient Title'];
const organization = recipientData['Recipient Organization'];
const phone = recipientData['Recipient Phone'];
const createdAt = recipientData['CreatedAt'];
const campaignId = recipientData['Campaign ID'];
if (!this.syncEnabled || !email) {
return { success: false, error: 'Sync disabled or no email provided' };
}
// Validate and clean the email address
const emailValidation = this.validateAndCleanEmail(email);
if (!emailValidation.valid) {
logger.warn(`Skipping invalid recipient email: ${email} - ${emailValidation.error}`);
return { success: false, error: emailValidation.error };
}
try {
const subscriberLists = [this.lists.customRecipients.id];
// Add to campaign-specific list
const campaignSlug = campaignData?.['Campaign Slug'];
const campaignTitle = campaignData?.['Campaign Title'];
if (campaignSlug && campaignTitle) {
const campaignList = await this.ensureCampaignList(campaignSlug, campaignTitle);
if (campaignList) {
subscriberLists.push(campaignList.id);
logger.info(`📧 Added recipient ${emailValidation.cleaned} to campaign list: ${campaignTitle}`);
}
}
const subscriberData = {
email: emailValidation.cleaned,
name: name || emailValidation.cleaned,
status: 'enabled',
lists: subscriberLists,
attribs: {
campaign: campaignTitle || 'Unknown',
campaign_slug: campaignSlug || null,
title: title || null,
organization: organization || null,
phone: phone || null,
added_date: createdAt ? new Date(createdAt).toISOString() : null,
recipient_type: 'custom'
}
};
const result = await this.upsertSubscriber(subscriberData);
return { success: true, subscriberId: result.id };
} catch (error) {
logger.error('Failed to sync custom recipient:', error.message);
return { success: false, error: error.message };
}
}
// Sync an email log entry to Listmonk (generic public emails)
async syncEmailLog(emailData) {
// Map NocoDB field names for the Email Logs table
const senderEmail = emailData['Sender Email'];
const senderName = emailData['Sender Name'];
const recipientEmail = emailData['Recipient Email'];
const postalCode = emailData['Postal Code'];
const createdAt = emailData['CreatedAt'];
const sentAt = emailData['Sent At'];
if (!this.syncEnabled || !senderEmail) {
return { success: false, error: 'Sync disabled or no email provided' };
}
// Validate and clean the email address
const emailValidation = this.validateAndCleanEmail(senderEmail);
if (!emailValidation.valid) {
logger.warn(`Skipping invalid email log: ${senderEmail} - ${emailValidation.error}`);
return { success: false, error: emailValidation.error };
}
try {
const subscriberData = {
email: emailValidation.cleaned,
name: senderName || emailValidation.cleaned,
status: 'enabled',
lists: [this.lists.emailLogs.id],
attribs: {
last_sent: sentAt ? new Date(sentAt).toISOString() : (createdAt ? new Date(createdAt).toISOString() : new Date().toISOString()),
postal_code: postalCode || null,
last_recipient_email: recipientEmail || null,
source: 'email_logs'
}
};
const result = await this.upsertSubscriber(subscriberData);
return { success: true, subscriberId: result.id };
} catch (error) {
logger.error('Failed to sync email log:', error.message);
return { success: false, error: error.message };
}
}
// Upsert subscriber (create or update)
async upsertSubscriber(subscriberData) {
try {
const client = this.getClient();
// Try to find existing subscriber by email
const { data: searchResponse } = await client.get('/subscribers', {
params: { query: `subscribers.email = '${subscriberData.email}'` }
});
if (searchResponse.data.results && searchResponse.data.results.length > 0) {
// Update existing subscriber
const existingSubscriber = searchResponse.data.results[0];
const subscriberId = existingSubscriber.id;
// Merge lists (don't remove existing ones)
const existingLists = existingSubscriber.lists.map(l => l.id);
const newLists = [...new Set([...existingLists, ...subscriberData.lists])];
// Merge attributes
const mergedAttribs = {
...existingSubscriber.attribs,
...subscriberData.attribs
};
const updateData = {
...subscriberData,
lists: newLists,
attribs: mergedAttribs
};
const { data: updateResponse } = await client.put(`/subscribers/${subscriberId}`, updateData);
logger.info(`Updated subscriber: ${subscriberData.email}`);
return updateResponse.data;
} else {
// Create new subscriber
const { data: createResponse } = await client.post('/subscribers', subscriberData);
logger.info(`Created new subscriber: ${subscriberData.email}`);
return createResponse.data;
}
} catch (error) {
logger.error(`Failed to upsert subscriber ${subscriberData.email}:`, error.message);
throw error;
}
}
// Bulk sync campaign participants
async bulkSyncCampaignParticipants(emails, campaigns) {
if (!this.syncEnabled) {
return { total: 0, success: 0, failed: 0, errors: [] };
}
const results = {
total: emails.length,
success: 0,
failed: 0,
errors: []
};
// Create a map of campaign IDs to campaign data for quick lookup
const campaignMap = {};
if (campaigns && Array.isArray(campaigns)) {
console.log(`🔍 Building campaign map from ${campaigns.length} campaigns`);
campaigns.forEach((campaign, index) => {
// Show keys for first campaign to debug
if (index === 0) {
console.log('🔍 First campaign keys:', Object.keys(campaign));
}
// NocoDB returns 'ID' (all caps) as the system field for record ID
// This is what the 'Campaign ID' link field in emails table references
const id = campaign.ID || campaign.Id || campaign.id;
if (id) {
campaignMap[id] = campaign;
// Also map by slug for fallback lookup
const slug = campaign['Campaign Slug'];
if (slug) {
campaignMap[slug] = campaign;
}
const title = campaign['Campaign Title'];
console.log(`🔍 Mapped campaign ID ${id} and slug ${slug}: ${title}`);
} else {
console.log('⚠️ Campaign has no ID field! Keys:', Object.keys(campaign));
}
});
console.log(`🔍 Campaign map has ${Object.keys(campaignMap).length} entries`);
} else {
console.log('⚠️ No campaigns provided for mapping!');
}
for (const email of emails) {
try {
// Try to find campaign data by Campaign ID (link field) or Campaign Slug (text field)
const campaignId = email['Campaign ID'];
const campaignSlug = email['Campaign Slug'];
const campaignData = campaignId ? campaignMap[campaignId] : (campaignSlug ? campaignMap[campaignSlug] : null);
// Debug first email
if (emails.indexOf(email) === 0) {
console.log('🔍 First email keys:', Object.keys(email));
console.log('🔍 First email Campaign ID field:', email['Campaign ID']);
console.log('🔍 First email Campaign Slug field:', email['Campaign Slug']);
}
if (!campaignData && (campaignId || campaignSlug)) {
console.log(`⚠️ Campaign not found - ID: ${campaignId}, Slug: ${campaignSlug}. Available IDs:`, Object.keys(campaignMap).slice(0, 10));
}
const result = await this.syncCampaignParticipant(email, campaignData);
if (result.success) {
results.success++;
} else {
results.failed++;
const emailAddr = email['Sender Email'] || email.sender_email || 'unknown';
results.errors.push({
email: emailAddr,
error: result.error
});
}
} catch (error) {
results.failed++;
const emailAddr = email['Sender Email'] || email.sender_email || 'unknown';
results.errors.push({
email: emailAddr,
error: error.message
});
}
}
logger.info(`Bulk sync completed: ${results.success} succeeded, ${results.failed} failed`);
return results;
}
// Bulk sync custom recipients
async bulkSyncCustomRecipients(recipients, campaigns) {
if (!this.syncEnabled) {
return { total: 0, success: 0, failed: 0, errors: [] };
}
const results = {
total: recipients.length,
success: 0,
failed: 0,
errors: []
};
// Create a map of campaign IDs to campaign data for quick lookup
const campaignMap = {};
if (campaigns && Array.isArray(campaigns)) {
campaigns.forEach(campaign => {
// NocoDB returns 'ID' (all caps) as the system field for record ID
const id = campaign.ID || campaign.Id || campaign.id;
if (id) {
campaignMap[id] = campaign;
// Also map by slug for fallback lookup
const slug = campaign['Campaign Slug'];
if (slug) {
campaignMap[slug] = campaign;
}
}
});
}
for (const recipient of recipients) {
try {
// Try to find campaign data by Campaign ID or Campaign Slug
const campaignId = recipient['Campaign ID'];
const campaignSlug = recipient['Campaign Slug'];
const campaignData = campaignId ? campaignMap[campaignId] : (campaignSlug ? campaignMap[campaignSlug] : null);
const result = await this.syncCustomRecipient(recipient, campaignData);
if (result.success) {
results.success++;
} else {
results.failed++;
const emailAddr = recipient['Recipient Email'] || 'unknown';
results.errors.push({
email: emailAddr,
error: result.error
});
}
} catch (error) {
results.failed++;
const emailAddr = recipient['Recipient Email'] || 'unknown';
results.errors.push({
email: emailAddr,
error: error.message
});
}
}
logger.info(`Bulk sync custom recipients completed: ${results.success} succeeded, ${results.failed} failed`);
return results;
}
// Bulk sync email logs
async bulkSyncEmailLogs(emailLogs) {
if (!this.syncEnabled) {
return { total: 0, success: 0, failed: 0, errors: [] };
}
const results = {
total: emailLogs.length,
success: 0,
failed: 0,
errors: []
};
for (const emailLog of emailLogs) {
try {
const result = await this.syncEmailLog(emailLog);
if (result.success) {
results.success++;
} else {
results.failed++;
const emailAddr = emailLog['Sender Email'] || 'unknown';
results.errors.push({
email: emailAddr,
error: result.error
});
}
} catch (error) {
results.failed++;
const emailAddr = emailLog['Sender Email'] || 'unknown';
results.errors.push({
email: emailAddr,
error: error.message
});
}
}
logger.info(`Bulk sync email logs completed: ${results.success} succeeded, ${results.failed} failed`);
return results;
}
// Get list statistics
async getListStats() {
if (!this.syncEnabled) {
return null;
}
try {
const client = this.getClient();
const { data: listsResponse } = await client.get('/lists');
const stats = {};
const influenceLists = listsResponse.data.results.filter(list =>
list.tags && list.tags.includes('influence')
);
for (const list of influenceLists) {
stats[list.name] = {
name: list.name,
subscriber_count: list.subscriber_count || 0,
id: list.id
};
}
return stats;
} catch (error) {
logger.error('Failed to get list stats:', error.message);
return null;
}
}
// Get sync status
getSyncStatus() {
return {
enabled: this.syncEnabled,
connected: this.lastError === null,
lastError: this.lastError,
lastErrorTime: this.lastErrorTime,
listsInitialized: Object.values(this.lists).every(list => list !== null)
};
}
}
// Create singleton instance
const listmonkService = new ListmonkService();
// Initialize lists on startup if enabled
if (listmonkService.syncEnabled) {
listmonkService.initializeLists()
.then(async () => {
logger.info('✅ Listmonk service initialized successfully');
// Optional initial sync (only if explicitly enabled)
if (process.env.LISTMONK_INITIAL_SYNC === 'true') {
logger.info('🔄 Performing initial Listmonk sync for influence system...');
// Use setTimeout to delay initial sync to let app fully start
setTimeout(async () => {
try {
const nocodbService = require('./nocodb');
// Sync existing campaign participants
try {
// Use campaignEmails table (not emails) to get proper User Email/Name fields
const emailsData = await nocodbService.getAll(nocodbService.tableIds.campaignEmails);
const emails = emailsData?.list || [];
console.log('🔍 Initial sync - fetched campaign emails:', emails?.length || 0);
if (emails && emails.length > 0) {
console.log('🔍 First email full data:', JSON.stringify(emails[0], null, 2));
}
if (emails && emails.length > 0) {
const campaigns = await nocodbService.getAllCampaigns();
console.log('🔍 Campaigns fetched:', campaigns?.length || 0);
if (campaigns && campaigns.length > 0) {
console.log('🔍 First campaign full data:', JSON.stringify(campaigns[0], null, 2));
}
const emailResults = await listmonkService.bulkSyncCampaignParticipants(emails, campaigns);
logger.info(`📧 Initial campaign participants sync: ${emailResults.success} succeeded, ${emailResults.failed} failed`);
} else {
logger.warn('No campaign participants found for initial sync');
}
} catch (emailError) {
logger.warn('Initial campaign participants sync failed:', {
message: emailError.message,
stack: emailError.stack
});
}
// Sync existing custom recipients
try {
const recipientsData = await nocodbService.getAll(nocodbService.tableIds.customRecipients);
const recipients = recipientsData?.list || [];
console.log('🔍 Initial sync - fetched custom recipients:', recipients?.length || 0);
if (recipients && recipients.length > 0) {
const campaigns = await nocodbService.getAllCampaigns();
const recipientResults = await listmonkService.bulkSyncCustomRecipients(recipients, campaigns);
logger.info(`📋 Initial custom recipients sync: ${recipientResults.success} succeeded, ${recipientResults.failed} failed`);
} else {
logger.warn('No custom recipients found for initial sync');
}
} catch (recipientError) {
logger.warn('Initial custom recipients sync failed:', {
message: recipientError.message,
stack: recipientError.stack
});
}
logger.info('✅ Initial Listmonk sync completed');
} catch (error) {
logger.error('Initial Listmonk sync failed:', {
message: error.message,
stack: error.stack
});
}
}, 5000); // Wait 5 seconds for app to fully start
}
})
.catch(error => {
logger.error('Failed to initialize Listmonk service:', error.message);
});
}
module.exports = listmonkService;