const csv = require('csv-parse'); const { Readable } = require('stream'); const nocodbService = require('../services/nocodb'); const { forwardGeocode } = require('../services/geocoding'); const logger = require('../utils/logger'); const config = require('../config'); // In-memory storage for processing results (in production, use Redis or database) const processingResults = new Map(); class DataConvertController { constructor() { // Bind methods to preserve 'this' context this.processCSV = this.processCSV.bind(this); this.parseCSV = this.parseCSV.bind(this); this.saveGeocodedData = this.saveGeocodedData.bind(this); this.downloadReport = this.downloadReport.bind(this); this.scanAndGeocode = this.scanAndGeocode.bind(this); } // Process CSV upload and geocode addresses with SSE progress updates async processCSV(req, res) { try { if (!req.file) { return res.status(400).json({ success: false, error: 'No file uploaded' }); } // Store the filename for later use in notes const originalFilename = req.file.originalname; const sessionId = Date.now().toString(); // Simple session ID for storing results // Set up SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' // Disable Nginx buffering }); // Parse CSV const results = await this.parseCSV(req.file.buffer); if (!results || results.length === 0) { res.write(`data: ${JSON.stringify({ type: 'error', message: 'CSV file is empty or invalid' })}\n\n`); res.end(); return; } // Validate required address field const hasAddressField = results[0].hasOwnProperty('address') || results[0].hasOwnProperty('Address') || results[0].hasOwnProperty('ADDRESS'); if (!hasAddressField) { res.write(`data: ${JSON.stringify({ type: 'error', message: 'CSV must contain an "address" column' })}\n\n`); res.end(); return; } // Send initial progress res.write(`data: ${JSON.stringify({ type: 'start', total: results.length })}\n\n`); res.flush && res.flush(); // Process all addresses const processedData = []; const allResults = []; // Store ALL results for report generation const errors = []; const total = results.length; // Process each address with progress updates for (let i = 0; i < results.length; i++) { const row = results[i]; // Extract address - with better validation const addressField = row.address || row.Address || row.ADDRESS || row.street_address || row['Street Address'] || row.full_address || row['Full Address']; // Extract unit number if available const unitField = row.unit || row.Unit || row.UNIT || row.unit_number || row['Unit Number'] || row.unit_no; if (!addressField || addressField.trim() === '') { logger.warn(`Row ${i + 1}: Empty or missing address field`); const errorRow = { ...row, latitude: '', longitude: '', 'Geo-Location': '', geocoded_address: '', geocode_success: false, geocode_status: 'FAILED', geocode_error: 'Missing address field', csv_filename: originalFilename, row_number: i + 1 }; allResults.push(errorRow); errors.push({ index: i, address: 'No address provided', error: 'Missing address field' }); // Send progress update res.write(`data: ${JSON.stringify({ type: 'progress', current: i + 1, total: total, currentAddress: 'No address - skipping', status: 'failed' })}\n\n`); res.flush && res.flush(); continue; // Skip to next row } // Construct full address with unit if available let address = addressField.trim(); if (unitField && unitField.toString().trim()) { const unit = unitField.toString().trim(); // Add unit prefix if it doesn't already exist if (!unit.toLowerCase().startsWith('unit') && !unit.toLowerCase().startsWith('apt') && !unit.toLowerCase().startsWith('#')) { address = `Unit ${unit}, ${address}`; } else { address = `${unit}, ${address}`; } } // Send progress update res.write(`data: ${JSON.stringify({ type: 'progress', current: i + 1, total: total, currentAddress: address, status: 'processing' })}\n\n`); res.flush && res.flush(); try { logger.info(`Geocoding ${i + 1}/${total}: ${address}`); // Geocode the address const geocodeResult = await forwardGeocode(address); if (geocodeResult && geocodeResult.coordinates) { // Check if result is malformed const isMalformed = geocodeResult.validation && geocodeResult.validation.isMalformed; // Use combined confidence for best overall assessment const confidence = geocodeResult.combinedConfidence !== undefined ? geocodeResult.combinedConfidence : (geocodeResult.validation ? geocodeResult.validation.confidence : 100); const warnings = geocodeResult.validation ? geocodeResult.validation.warnings : []; const processedRow = { ...row, latitude: geocodeResult.coordinates.lat, longitude: geocodeResult.coordinates.lng, 'Geo-Location': `${geocodeResult.coordinates.lat};${geocodeResult.coordinates.lng}`, geocoded_address: geocodeResult.formattedAddress || address, geocode_success: true, geocode_status: isMalformed ? 'WARNING' : 'SUCCESS', geocode_error: '', confidence_score: confidence, provider_confidence: geocodeResult.providerConfidence || null, validation_confidence: geocodeResult.validation ? geocodeResult.validation.confidence : null, warnings: warnings.join('; '), is_malformed: isMalformed, provider: geocodeResult.provider || 'Unknown', csv_filename: originalFilename, row_number: i + 1 }; processedData.push(processedRow); allResults.push(processedRow); // Send success update with status const successMessage = { type: 'geocoded', data: processedRow, index: i, status: isMalformed ? 'warning' : 'success', confidence: confidence, warnings: warnings }; const successJson = JSON.stringify(successMessage); logger.info(`Successfully geocoded: ${address} (Confidence: ${confidence}%)`); res.write(`data: ${successJson}\n\n`); res.flush && res.flush(); } else { throw new Error('Geocoding failed - no coordinates returned'); } } catch (error) { logger.error(`Failed to geocode address: ${address}`, error.message); // Create error row with original data plus error info const errorRow = { ...row, latitude: '', longitude: '', 'Geo-Location': '', geocoded_address: '', geocode_success: false, geocode_status: 'FAILED', geocode_error: error.message, confidence_score: 0, warnings: '', is_malformed: false, csv_filename: originalFilename, row_number: i + 1 }; allResults.push(errorRow); const errorData = { index: i, address: address, error: error.message }; errors.push(errorData); // Send error update const errorMessage = { type: 'error', data: errorData }; const errorJson = JSON.stringify(errorMessage); logger.debug(`Sending error update: ${errorJson.length} chars`); res.write(`data: ${errorJson}\n\n`); res.flush && res.flush(); // Ensure data is sent immediately } // Add delay to avoid rate limiting await new Promise(resolve => setTimeout(resolve, 2000)); } // Store processing results for report generation const successful = processedData.filter(r => r.geocode_status === 'SUCCESS').length; const warnings = processedData.filter(r => r.geocode_status === 'WARNING').length; const failed = errors.length; const malformed = processedData.filter(r => r.is_malformed).length; processingResults.set(sessionId, { filename: originalFilename, timestamp: new Date().toISOString(), allResults: allResults, summary: { total: total, successful: successful, warnings: warnings, failed: failed, malformed: malformed } }); // Send completion const completeMessage = { type: 'complete', processed: processedData.length, successful: successful, warnings: warnings, errors: errors.length, malformed: malformed, total: total, sessionId: sessionId // Include session ID for report download }; const completeJson = JSON.stringify(completeMessage); logger.info(`Sending completion message: ${completeJson.length} chars`); res.write(`data: ${completeJson}\n\n`); res.flush && res.flush(); // Ensure data is sent immediately res.end(); } catch (error) { logger.error('CSV processing error:', error); res.write(`data: ${JSON.stringify({ type: 'fatal_error', message: 'Failed to process CSV file', error: error.message })}\n\n`); res.end(); } } // Parse CSV buffer into array of objects async parseCSV(buffer) { return new Promise((resolve, reject) => { const results = []; const stream = Readable.from(buffer); stream .pipe(csv.parse({ columns: true, skip_empty_lines: true, trim: true })) .on('data', (data) => results.push(data)) .on('error', reject) .on('end', () => resolve(results)); }); } // Enhanced save method that transforms data to match locations table structure async saveGeocodedData(req, res) { try { const { data } = req.body; if (!data || !Array.isArray(data)) { return res.status(400).json({ success: false, error: 'Invalid data format' }); } const results = { success: 0, failed: 0, errors: [] }; // Process each location for (const location of data) { try { // Transform to match locations table structure // Preserve original address, don't overwrite with geocoded address const originalAddress = location.address || location.Address || location.ADDRESS; const geocodedAddress = location.geocoded_address; const locationData = { 'Geo-Location': location['Geo-Location'], latitude: parseFloat(location.latitude), longitude: parseFloat(location.longitude), Address: originalAddress, // Always use the original address from CSV 'Geocode Confidence': location.confidence_score || null, // Add confidence score 'Geocode Provider': location.provider || null, // Add provider name created_by_user: req.session.userEmail || 'csv_import', last_updated_by_user: req.session.userEmail || 'csv_import' }; // Track if geocoded address differs from original const addressDiffers = geocodedAddress && geocodedAddress.toLowerCase() !== originalAddress.toLowerCase(); // Map CSV fields to NocoDB fields const fieldMapping = { 'first name': 'First Name', 'firstname': 'First Name', 'first_name': 'First Name', 'last name': 'Last Name', 'lastname': 'Last Name', 'last_name': 'Last Name', 'email': 'Email', 'phone': 'Phone', 'unit': 'Unit Number', 'unit number': 'Unit Number', 'unit_number': 'Unit Number', 'support level': 'Support Level', 'support_level': 'Support Level', 'sign': 'Sign', 'sign size': 'Sign Size', 'sign_size': 'Sign Size', 'notes': 'Notes' }; // Process all fields from CSV Object.keys(location).forEach(key => { const lowerKey = key.toLowerCase(); // Skip already processed fields if (['latitude', 'longitude', 'geo-location', 'geocoded_address', 'geocode_success', 'address', 'csv_filename', 'confidence_score', 'provider_confidence', 'validation_confidence', 'warnings', 'is_malformed', 'provider', 'row_number', 'geocode_status', 'geocode_error'].includes(lowerKey)) { return; } // Check if we have a mapping for this field if (fieldMapping[lowerKey]) { const targetField = fieldMapping[lowerKey]; // Special handling for certain fields if (targetField === 'Sign') { // Convert to boolean locationData[targetField] = ['true', 'yes', '1', 'y'].includes(String(location[key]).toLowerCase()); } else if (targetField === 'Support Level') { // Ensure it's a string number 1-4 const level = parseInt(location[key]); if (level >= 1 && level <= 4) { locationData[targetField] = String(level); } } else if (targetField === 'Notes') { // Build notes with existing content, CSV info, and geocoding info const noteParts = []; // Add existing notes if present if (location[key]) { noteParts.push(location[key]); } // Add CSV import info noteParts.push(`Imported from CSV: ${location.csv_filename || 'unknown'}`); // Add geocoded address if it differs from original if (addressDiffers) { noteParts.push(`Geocoded as: ${geocodedAddress}`); } // Add confidence information if available if (location.confidence_score !== undefined && location.confidence_score !== null) { noteParts.push(`Geocode confidence: ${location.confidence_score}%`); } // Add provider information if available if (location.provider) { noteParts.push(`Provider: ${location.provider}`); } // Add warnings if present if (location.warnings && location.warnings.trim()) { noteParts.push(`Warnings: ${location.warnings}`); } locationData[targetField] = noteParts.join(' | '); } else { locationData[targetField] = location[key]; } } }); // If no notes field was found in CSV, add the CSV import info and geocoding info if (!locationData['Notes']) { const noteParts = [`Imported from CSV: ${location.csv_filename || 'unknown'}`]; // Add geocoded address if it differs from original if (addressDiffers) { noteParts.push(`Geocoded as: ${geocodedAddress}`); } // Add confidence information if available if (location.confidence_score !== undefined && location.confidence_score !== null) { noteParts.push(`Geocode confidence: ${location.confidence_score}%`); } // Add provider information if available if (location.provider) { noteParts.push(`Provider: ${location.provider}`); } // Add warnings if present if (location.warnings && location.warnings.trim()) { noteParts.push(`Warnings: ${location.warnings}`); } locationData['Notes'] = noteParts.join(' | '); } // Create location in NocoDB const result = await nocodbService.create(config.nocodb.tableId, locationData); results.success++; logger.debug(`Successfully saved location: ${locationData.Address}`); } catch (error) { logger.error('Failed to save location:', error); logger.error('Location data:', locationData); results.failed++; results.errors.push({ address: location.address || location.Address || location.ADDRESS, error: error.message }); } } res.json({ success: true, results: results }); } catch (error) { logger.error('Save geocoded data error:', error); res.status(500).json({ success: false, error: 'Failed to save locations' }); } } // Generate and download processing report async downloadReport(req, res) { try { const { sessionId } = req.params; const format = req.query.format || 'csv'; // Default to CSV, support 'txt' for backward compatibility if (!sessionId || !processingResults.has(sessionId)) { return res.status(404).json({ success: false, error: 'Processing results not found or expired' }); } const results = processingResults.get(sessionId); const { filename, timestamp, allResults, summary } = results; let reportContent, contentType, fileExtension; if (format === 'csv') { // Generate CSV report reportContent = this.generateReportCSV(allResults, filename, timestamp, summary); contentType = 'text/csv'; fileExtension = 'csv'; } else { // Generate text report (backward compatibility) reportContent = this.generateComprehensiveReport(allResults, filename, timestamp, summary); contentType = 'text/plain'; fileExtension = 'txt'; } // Set headers for download const reportFilename = `geocoding-report-${sessionId}.${fileExtension}`; res.setHeader('Content-Type', contentType); res.setHeader('Content-Disposition', `attachment; filename="${reportFilename}"`); res.setHeader('Cache-Control', 'no-cache'); logger.info(`Generating ${format.toUpperCase()} report for session ${sessionId}: ${allResults.length} records`); res.send(reportContent); // Clean up stored results after download (optional) setTimeout(() => { processingResults.delete(sessionId); logger.info(`Cleaned up processing results for session ${sessionId}`); }, 60000); // Delete after 1 minute } catch (error) { logger.error('Download report error:', error); res.status(500).json({ success: false, error: 'Failed to generate report' }); } } // Generate comprehensive text report generateComprehensiveReport(results, originalFilename, timestamp, summary) { let report = `Geocoding Processing Report\n`; report += `Generated: ${timestamp}\n`; report += `Original File: ${originalFilename}\n`; report += `================================\n\n`; report += `Summary:\n`; report += `- Total Addresses: ${summary.total}\n`; report += `- Successfully Geocoded: ${summary.successful}\n`; report += `- Warnings (Low Confidence): ${summary.warnings}\n`; report += `- Failed: ${summary.failed}\n`; report += `- Potentially Malformed: ${summary.malformed}\n\n`; // Section for malformed addresses requiring review const malformedResults = results.filter(r => r.is_malformed); if (malformedResults.length > 0) { report += `ADDRESSES REQUIRING REVIEW (Potentially Malformed):\n`; report += `================================================\n`; malformedResults.forEach((result, index) => { const originalAddress = result.address || result.Address || result.ADDRESS || 'N/A'; report += `\n${index + 1}. Original: ${originalAddress}\n`; report += ` Result: ${result.geocoded_address || 'N/A'}\n`; report += ` Confidence: ${result.confidence_score || 0}%\n`; if (result.warnings) { report += ` Warnings: ${result.warnings}\n`; } report += ` Coordinates: ${result.latitude || 'N/A'}, ${result.longitude || 'N/A'}\n`; report += ` Row: ${result.row_number}\n`; }); report += `\n`; } // Failed addresses section const failedResults = results.filter(r => r.geocode_status === 'FAILED'); if (failedResults.length > 0) { report += `FAILED GEOCODING ATTEMPTS:\n`; report += `========================\n`; failedResults.forEach((result, index) => { const originalAddress = result.address || result.Address || result.ADDRESS || 'N/A'; report += `\n${index + 1}. Address: ${originalAddress}\n`; report += ` Error: ${result.geocode_error}\n`; report += ` Row: ${result.row_number}\n`; }); report += `\n`; } // Successful geocoding with low confidence const lowConfidenceResults = results.filter(r => r.geocode_status === 'SUCCESS' && r.confidence_score && r.confidence_score < 75 ); if (lowConfidenceResults.length > 0) { report += `LOW CONFIDENCE SUCCESSFUL GEOCODING:\n`; report += `==================================\n`; lowConfidenceResults.forEach((result, index) => { const originalAddress = result.address || result.Address || result.ADDRESS || 'N/A'; report += `\n${index + 1}. Original: ${originalAddress}\n`; report += ` Result: ${result.geocoded_address}\n`; report += ` Confidence: ${result.confidence_score}%\n`; if (result.warnings) { report += ` Warnings: ${result.warnings}\n`; } report += ` Row: ${result.row_number}\n`; }); report += `\n`; } // Summary statistics report += `DETAILED STATISTICS:\n`; report += `==================\n`; report += `Success Rate: ${((summary.successful / summary.total) * 100).toFixed(1)}%\n`; report += `Warning Rate: ${((summary.warnings / summary.total) * 100).toFixed(1)}%\n`; report += `Failure Rate: ${((summary.failed / summary.total) * 100).toFixed(1)}%\n`; report += `Malformed Rate: ${((summary.malformed / summary.total) * 100).toFixed(1)}%\n\n`; // Recommendations report += `RECOMMENDATIONS:\n`; report += `===============\n`; if (summary.malformed > 0) { report += `- Review ${summary.malformed} addresses marked as potentially malformed\n`; } if (summary.failed > 0) { report += `- Check ${summary.failed} failed addresses for formatting issues\n`; } if (summary.warnings > 0) { report += `- Verify ${summary.warnings} low confidence results manually\n`; } report += `- Consider using more specific address formats for better results\n`; report += `- Ensure addresses include proper directional indicators (NW, SW, etc.)\n`; return report; } // Generate CSV content for the report generateReportCSV(allResults, originalFilename, timestamp, summary) { if (!allResults || allResults.length === 0) { return 'No data available for report generation'; } // Get all unique field names from the results const allFields = new Set(); allResults.forEach(row => { Object.keys(row).forEach(field => allFields.add(field)); }); // Define the header order - put important fields first const priorityHeaders = [ 'geocode_status', 'geocode_error', 'address', 'Address', 'geocoded_address', 'latitude', 'longitude', 'Geo-Location' ]; const otherHeaders = Array.from(allFields).filter(field => !priorityHeaders.includes(field) && !['geocode_success', 'csv_filename'].includes(field) ).sort(); const headers = [...priorityHeaders.filter(h => allFields.has(h)), ...otherHeaders]; // Generate CSV header with metadata let csvContent = `# Geocoding Processing Report\n`; csvContent += `# Original File: ${originalFilename}\n`; csvContent += `# Processed: ${timestamp}\n`; csvContent += `# Total Records: ${summary.total}\n`; csvContent += `# Successful: ${summary.successful}\n`; csvContent += `# Failed: ${summary.failed}\n`; csvContent += `# \n`; // Add CSV headers csvContent += headers.map(header => this.escapeCSVField(header)).join(',') + '\n'; // Add data rows allResults.forEach(row => { const values = headers.map(header => { const value = row[header]; return this.escapeCSVField(value !== undefined && value !== null ? String(value) : ''); }); csvContent += values.join(',') + '\n'; }); return csvContent; } // Escape CSV fields properly escapeCSVField(field) { if (field === null || field === undefined) return ''; const stringField = String(field); // If field contains comma, quote, or newline, wrap in quotes and escape quotes if (stringField.includes(',') || stringField.includes('"') || stringField.includes('\n') || stringField.includes('\r')) { return '"' + stringField.replace(/"/g, '""') + '"'; } return stringField; } // Scan NocoDB database for records missing geo-location data and geocode them async scanAndGeocode(req, res) { try { const sessionId = Date.now().toString(); // Set up SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' }); logger.info(`Starting database scan for missing geo-location data (session: ${sessionId})`); // Send initial status res.write(`data: ${JSON.stringify({ type: 'status', message: 'Scanning database for records missing geo-location data...', sessionId: sessionId })}\n\n`); res.flush && res.flush(); // Fetch all records from NocoDB let allRecords = []; let offset = 0; const limit = 100; // Process in batches let hasMoreRecords = true; while (hasMoreRecords) { try { const response = await nocodbService.getAll(config.nocodb.tableId, { limit, offset }); if (response && response.list && response.list.length > 0) { allRecords.push(...response.list); offset += limit; // Send progress update res.write(`data: ${JSON.stringify({ type: 'scanning', message: `Fetched ${allRecords.length} records from database...`, count: allRecords.length })}\n\n`); res.flush && res.flush(); // Check if we've fetched all records hasMoreRecords = response.list.length === limit; } else { hasMoreRecords = false; } } catch (error) { logger.error('Error fetching records from NocoDB:', error); res.write(`data: ${JSON.stringify({ type: 'error', message: `Error fetching records: ${error.message}` })}\n\n`); res.end(); return; } } logger.info(`Database scan complete: found ${allRecords.length} total records`); // Filter records that need geocoding const recordsNeedingGeocode = allRecords.filter(record => { // Check if record is missing geo-location data const hasGeoLocation = record['Geo-Location'] && record['Geo-Location'].trim() !== '' && record['Geo-Location'] !== 'null'; const hasCoordinates = (record.latitude && record.longitude) || (record.Latitude && record.Longitude); const hasAddress = record.Address || record.address || record.ADDRESS; return !hasGeoLocation && !hasCoordinates && hasAddress; }); const totalToGeocode = recordsNeedingGeocode.length; logger.info(`Found ${totalToGeocode} records needing geocoding`); // Send summary res.write(`data: ${JSON.stringify({ type: 'scan_complete', message: `Scan complete: ${totalToGeocode} records need geocoding`, total: allRecords.length, needingGeocode: totalToGeocode })}\n\n`); res.flush && res.flush(); if (totalToGeocode === 0) { res.write(`data: ${JSON.stringify({ type: 'complete', message: 'No records found that need geocoding. All records already have location data!', results: { success: 0, failed: 0, skipped: allRecords.length } })}\n\n`); res.end(); return; } // Process geocoding const results = { success: 0, failed: 0, errors: [], sessionId: sessionId }; const allResults = []; let processedCount = 0; for (const record of recordsNeedingGeocode) { try { processedCount++; const address = record.Address || record.address || record.ADDRESS; // Send progress update res.write(`data: ${JSON.stringify({ type: 'progress', current: processedCount, total: totalToGeocode, currentAddress: address, status: 'processing' })}\n\n`); res.flush && res.flush(); logger.info(`Geocoding ${processedCount}/${totalToGeocode}: ${address} (Record ID: ${record.id || record.Id || record.ID})`); // Geocode the address const geocodeResult = await forwardGeocode(address); if (geocodeResult && geocodeResult.coordinates) { // Check if result is malformed const isMalformed = geocodeResult.validation && geocodeResult.validation.isMalformed; // Use combined confidence for best overall assessment const confidence = geocodeResult.combinedConfidence !== undefined ? geocodeResult.combinedConfidence : (geocodeResult.validation ? geocodeResult.validation.confidence : 100); const warnings = geocodeResult.validation ? geocodeResult.validation.warnings : []; // Update the record in NocoDB const updateData = { 'Geo-Location': `${geocodeResult.coordinates.lat};${geocodeResult.coordinates.lng}`, latitude: geocodeResult.coordinates.lat, longitude: geocodeResult.coordinates.lng, 'Geocode Confidence': confidence, 'Geocode Provider': geocodeResult.provider || 'Unknown', last_updated_by_user: req.session?.userEmail || 'scan_geocode' }; // Update the record in NocoDB await nocodbService.update(config.nocodb.tableId, record.id || record.Id || record.ID, updateData); const processedRecord = { id: record.id || record.Id || record.ID, address: address, latitude: geocodeResult.coordinates.lat, longitude: geocodeResult.coordinates.lng, confidence_score: confidence, provider: geocodeResult.provider || 'Unknown', status: isMalformed ? 'WARNING' : 'SUCCESS', warnings: warnings.join('; ') }; allResults.push(processedRecord); results.success++; // Send success update const successMessage = { type: 'geocoded', data: processedRecord, index: processedCount - 1, status: isMalformed ? 'warning' : 'success', confidence: confidence, warnings: warnings }; logger.info(`✓ Successfully geocoded and updated: ${address} (Confidence: ${confidence}%)`); res.write(`data: ${JSON.stringify(successMessage)}\n\n`); res.flush && res.flush(); } else { throw new Error('Geocoding failed - no coordinates returned'); } } catch (error) { logger.error(`Failed to geocode record ${processedCount}/${totalToGeocode}:`, error.message); const errorRecord = { id: record.id || record.Id || record.ID, address: record.Address || record.address || record.ADDRESS, error: error.message, status: 'ERROR' }; allResults.push(errorRecord); results.failed++; results.errors.push({ address: errorRecord.address, error: error.message }); // Send error update res.write(`data: ${JSON.stringify({ type: 'error', data: errorRecord, index: processedCount - 1, message: `Failed to geocode: ${errorRecord.address}` })}\n\n`); res.flush && res.flush(); } // Rate limiting to be nice to geocoding APIs if (processedCount < totalToGeocode) { await new Promise(resolve => setTimeout(resolve, 500)); // 0.5 second delay between requests } } // Calculate summary statistics for report const successful = allResults.filter(r => r.status === 'SUCCESS').length; const warnings = allResults.filter(r => r.status === 'WARNING').length; const failed = allResults.filter(r => r.status === 'ERROR').length; const malformed = allResults.filter(r => r.warnings && r.warnings.includes('malformed')).length; const total = successful + warnings + failed; // Transform scan results to match CSV processing format for report generation const transformedResults = allResults.map(result => ({ // Original format fields address: result.address, Address: result.address, geocoded_address: result.address, // For scan, this is the same latitude: result.latitude, longitude: result.longitude, 'Geo-Location': result.latitude && result.longitude ? `${result.latitude};${result.longitude}` : '', confidence_score: result.confidence_score, provider: result.provider, // Status mapping geocode_success: result.status !== 'ERROR', geocode_status: result.status, geocode_error: result.error || '', is_malformed: result.warnings && result.warnings.includes('malformed'), warnings: result.warnings || '', // Scan-specific fields record_id: result.id, source: 'database_scan', row_number: result.id // Use record ID as row number for scan })); // Store results for potential report download processingResults.set(sessionId, { filename: 'database_scan', timestamp: new Date().toISOString(), allResults: transformedResults, summary: { total: total, successful: successful, warnings: warnings, failed: failed, malformed: malformed } }); // Send completion message logger.info(`Database scan and geocoding completed: ${results.success} successful, ${results.failed} failed`); res.write(`data: ${JSON.stringify({ type: 'complete', message: `Scan and geocode completed! Successfully updated ${results.success} records, ${results.failed} failed.`, results: results, sessionId: sessionId })}\n\n`); res.end(); } catch (error) { logger.error('Database scan error:', error); res.write(`data: ${JSON.stringify({ type: 'error', message: `Database scan failed: ${error.message}` })}\n\n`); res.end(); } } } module.exports = new DataConvertController();