const csv = require('csv-parse');
const { Readable } = require('stream');
const nocodbService = require('../services/nocodb');
const { forwardGeocode } = require('../services/geocoding');
const logger = require('../utils/logger');
const config = require('../config');

// In-memory storage for processing results (in production, use Redis or database)
const processingResults = new Map();

// Pause between geocoding requests to avoid rate limiting.
const GEOCODE_DELAY_MS = 2000;

// Stored results are dropped this long after processing if no report was downloaded,
// so abandoned sessions do not accumulate in memory.
const RESULTS_TTL_MS = 60 * 60 * 1000;

// Stored results are dropped this long after a report download.
const REPORT_CLEANUP_DELAY_MS = 60000;

// Column names accepted by the upfront "does this CSV have an address column" check.
// Only these are accepted there because the save and report steps read the original
// address from exactly these columns.
const ADDRESS_HEADER_KEYS = ['address', 'Address', 'ADDRESS'];

// Unit values that already carry their own prefix and must not get "Unit " prepended.
const UNIT_PREFIXES = ['unit', 'apt', '#'];

// Lower-cased CSV column name -> NocoDB field name.
// A Map is used so inherited object keys ("constructor", "__proto__", ...) coming from
// untrusted column names can never be mistaken for a mapping.
const FIELD_MAPPING = new Map([
  ['first name', 'First Name'],
  ['firstname', 'First Name'],
  ['first_name', 'First Name'],
  ['last name', 'Last Name'],
  ['lastname', 'Last Name'],
  ['last_name', 'Last Name'],
  ['email', 'Email'],
  ['phone', 'Phone'],
  ['unit', 'Unit Number'],
  ['unit number', 'Unit Number'],
  ['unit_number', 'Unit Number'],
  ['support level', 'Support Level'],
  ['support_level', 'Support Level'],
  ['sign', 'Sign'],
  ['sign size', 'Sign Size'],
  ['sign_size', 'Sign Size'],
  ['notes', 'Notes']
]);

// Lower-cased record keys that are handled explicitly and never go through FIELD_MAPPING.
const ALREADY_PROCESSED_FIELDS = new Set([
  'latitude',
  'longitude',
  'geo-location',
  'geocoded_address',
  'geocode_success',
  'address',
  'csv_filename'
]);

// Values (lower-cased) that count as "true" for the Sign field.
const TRUTHY_SIGN_VALUES = ['true', 'yes', '1', 'y'];

/** Own-property check that is safe even if the object shadows `hasOwnProperty`. */
function hasOwn(obj, key) {
  return Object.prototype.hasOwnProperty.call(obj, key);
}

/**
 * Write one Server-Sent Events frame and flush it if the response supports flushing.
 *
 * @param {object} res - HTTP response already switched to `text/event-stream`.
 * @param {object} payload - JSON-serializable event body.
 * @returns {string} The serialized JSON that was written (used for size logging).
 */
function sendEvent(res, payload) {
  const json = JSON.stringify(payload);
  res.write(`data: ${json}\n\n`);
  if (typeof res.flush === 'function') {
    res.flush();
  }
  return json;
}

/**
 * Combine a street address with an optional unit value.
 * "Unit " is prepended unless the unit already starts with unit/apt/#.
 *
 * @param {string} addressField - Non-empty address string from the CSV row.
 * @param {*} unitField - Optional unit value from the CSV row.
 * @returns {string} Address string to send to the geocoder.
 */
function buildFullAddress(addressField, unitField) {
  const address = addressField.trim();
  const unit = unitField ? unitField.toString().trim() : '';
  if (!unit) {
    return address;
  }

  const lowerUnit = unit.toLowerCase();
  const hasPrefix = UNIT_PREFIXES.some(prefix => lowerUnit.startsWith(prefix));
  return hasPrefix ? `${unit}, ${address}` : `Unit ${unit}, ${address}`;
}

/**
 * Drop a session's stored results after RESULTS_TTL_MS if they are still present.
 * The timer is unref'd so it never keeps the process alive on its own.
 *
 * @param {string} sessionId - Key into `processingResults`.
 */
function scheduleResultExpiry(sessionId) {
  const timer = setTimeout(() => {
    if (processingResults.delete(sessionId)) {
      logger.info(`Expired unclaimed processing results for session ${sessionId}`);
    }
  }, RESULTS_TTL_MS);

  if (typeof timer.unref === 'function') {
    timer.unref();
  }
}

/**
 * Format `count / total` as a percentage with one decimal place.
 * Returns '0.0' when total is not positive so the report never prints NaN.
 */
function formatRate(count, total) {
  if (!(total > 0)) {
    return '0.0';
  }
  return ((count / total) * 100).toFixed(1);
}

/**
 * Controller for CSV upload -> geocoding -> NocoDB import, plus report download.
 * Exported as a singleton instance.
 */
class DataConvertController {
  constructor() {
    // Bind methods to preserve 'this' context
    this.processCSV = this.processCSV.bind(this);
    this.parseCSV = this.parseCSV.bind(this);
    this.saveGeocodedData = this.saveGeocodedData.bind(this);
    this.downloadReport = this.downloadReport.bind(this);
  }

  /**
   * Process a CSV upload and geocode each row's address, streaming progress to the
   * client as Server-Sent Events.
   *
   * Events written: `start`, `progress`, `geocoded`, `error` (validation problem or a
   * per-row failure), `complete`, and `fatal_error`. All rows (successful and failed)
   * are stored in `processingResults` under the session id sent in `complete`, so a
   * report can be downloaded afterwards.
   *
   * @param {object} req - Request; reads `req.file.originalname` and `req.file.buffer`.
   * @param {object} res - Response; switched to `text/event-stream`.
   */
  async processCSV(req, res) {
    try {
      if (!req.file) {
        return res.status(400).json({
          success: false,
          error: 'No file uploaded'
        });
      }

      // Store the filename for later use in notes
      const originalFilename = req.file.originalname;
      const sessionId = Date.now().toString(); // Simple session ID for storing results

      // Set up SSE headers
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no' // Disable Nginx buffering
      });

      // Parse CSV
      const results = await this.parseCSV(req.file.buffer);

      if (!results || results.length === 0) {
        sendEvent(res, { type: 'error', message: 'CSV file is empty or invalid' });
        res.end();
        return;
      }

      // Validate required address field
      const firstRow = results[0];
      const hasAddressField = ADDRESS_HEADER_KEYS.some(key => hasOwn(firstRow, key));

      if (!hasAddressField) {
        sendEvent(res, { type: 'error', message: 'CSV must contain an "address" column' });
        res.end();
        return;
      }

      const total = results.length;

      // Send initial progress
      sendEvent(res, { type: 'start', total: total });

      const processedData = []; // Rows that geocoded (SUCCESS or WARNING)
      const allResults = []; // Store ALL results for report generation
      const errors = [];

      // Process each address with progress updates
      for (let i = 0; i < total; i++) {
        const row = results[i];
        const rowNumber = i + 1;

        // Extract address - first non-empty of the supported column names
        const addressField = row.address || row.Address || row.ADDRESS ||
          row.street_address || row['Street Address'] ||
          row.full_address || row['Full Address'];

        // Extract unit number if available
        const unitField = row.unit || row.Unit || row.UNIT ||
          row.unit_number || row['Unit Number'] || row.unit_no;

        if (!addressField || addressField.trim() === '') {
          logger.warn(`Row ${rowNumber}: Empty or missing address field`);

          allResults.push({
            ...row,
            latitude: '',
            longitude: '',
            'Geo-Location': '',
            geocoded_address: '',
            geocode_success: false,
            geocode_status: 'FAILED',
            geocode_error: 'Missing address field',
            csv_filename: originalFilename,
            row_number: rowNumber
          });

          errors.push({
            index: i,
            address: 'No address provided',
            error: 'Missing address field'
          });

          sendEvent(res, {
            type: 'progress',
            current: rowNumber,
            total: total,
            currentAddress: 'No address - skipping',
            status: 'failed'
          });

          continue; // No geocoder call was made, so no rate-limit delay is needed
        }

        // Construct full address with unit if available
        const address = buildFullAddress(addressField, unitField);

        sendEvent(res, {
          type: 'progress',
          current: rowNumber,
          total: total,
          currentAddress: address,
          status: 'processing'
        });

        try {
          logger.info(`Geocoding ${rowNumber}/${total}: ${address}`);

          const geocodeResult = await forwardGeocode(address);

          if (!geocodeResult || !geocodeResult.coordinates) {
            throw new Error('Geocoding failed - no coordinates returned');
          }

          const validation = geocodeResult.validation;
          const isMalformed = validation && validation.isMalformed;
          const confidence = validation ? validation.confidence : 100;
          // Tolerate a validation object without a warnings array instead of
          // failing an otherwise successful geocode.
          const warnings = validation && Array.isArray(validation.warnings)
            ? validation.warnings
            : [];

          const { lat, lng } = geocodeResult.coordinates;

          const processedRow = {
            ...row,
            latitude: lat,
            longitude: lng,
            'Geo-Location': `${lat};${lng}`,
            geocoded_address: geocodeResult.formattedAddress || address,
            geocode_success: true,
            geocode_status: isMalformed ? 'WARNING' : 'SUCCESS',
            geocode_error: '',
            confidence_score: confidence,
            warnings: warnings.join('; '),
            is_malformed: isMalformed,
            provider: geocodeResult.provider || 'Unknown',
            csv_filename: originalFilename,
            row_number: rowNumber
          };

          processedData.push(processedRow);
          allResults.push(processedRow);

          logger.info(`Successfully geocoded: ${address} (Confidence: ${confidence}%)`);

          sendEvent(res, {
            type: 'geocoded',
            data: processedRow,
            index: i,
            status: isMalformed ? 'warning' : 'success',
            confidence: confidence,
            warnings: warnings
          });
        } catch (error) {
          logger.error(`Failed to geocode address: ${address}`, error.message);

          // Create error row with original data plus error info
          allResults.push({
            ...row,
            latitude: '',
            longitude: '',
            'Geo-Location': '',
            geocoded_address: '',
            geocode_success: false,
            geocode_status: 'FAILED',
            geocode_error: error.message,
            confidence_score: 0,
            warnings: '',
            is_malformed: false,
            csv_filename: originalFilename,
            row_number: rowNumber
          });

          const errorData = {
            index: i,
            address: address,
            error: error.message
          };
          errors.push(errorData);

          const errorJson = sendEvent(res, { type: 'error', data: errorData });
          logger.debug(`Sending error update: ${errorJson.length} chars`);
        }

        // Add delay to avoid rate limiting; pointless after the final row
        if (i < total - 1) {
          await new Promise(resolve => setTimeout(resolve, GEOCODE_DELAY_MS));
        }
      }

      // Store processing results for report generation
      const successful = processedData.filter(r => r.geocode_status === 'SUCCESS').length;
      const warningCount = processedData.filter(r => r.geocode_status === 'WARNING').length;
      const failed = errors.length;
      const malformed = processedData.filter(r => r.is_malformed).length;

      processingResults.set(sessionId, {
        filename: originalFilename,
        timestamp: new Date().toISOString(),
        allResults: allResults,
        summary: {
          total: total,
          successful: successful,
          warnings: warningCount,
          failed: failed,
          malformed: malformed
        }
      });
      scheduleResultExpiry(sessionId);

      // Send completion
      const completeJson = sendEvent(res, {
        type: 'complete',
        processed: processedData.length,
        successful: successful,
        warnings: warningCount,
        errors: errors.length,
        malformed: malformed,
        total: total,
        sessionId: sessionId // Include session ID for report download
      });
      logger.info(`Sending completion message: ${completeJson.length} chars`);
      res.end();
    } catch (error) {
      logger.error('CSV processing error:', error);

      // If the failure happened before the SSE stream was opened, answer with a
      // regular JSON error instead of an event frame.
      if (!res.headersSent) {
        return res.status(500).json({
          success: false,
          error: 'Failed to process CSV file'
        });
      }

      sendEvent(res, {
        type: 'fatal_error',
        message: 'Failed to process CSV file',
        error: error.message
      });
      res.end();
    }
  }

  /**
   * Parse a CSV buffer into an array of row objects keyed by the header row.
   * Empty lines are skipped and values are trimmed.
   *
   * @param {Buffer} buffer - Raw CSV content.
   * @returns {Promise<object[]>} Parsed rows; rejects on a parser error.
   */
  async parseCSV(buffer) {
    return new Promise((resolve, reject) => {
      const results = [];
      const stream = Readable.from(buffer);

      stream
        .pipe(csv.parse({
          columns: true,
          skip_empty_lines: true,
          trim: true
        }))
        .on('data', (data) => results.push(data))
        .on('error', reject)
        .on('end', () => resolve(results));
    });
  }

  /**
   * Save geocoded records to NocoDB, transforming each one to the locations table
   * structure. Records are saved one at a time; a failure on one record is counted
   * and reported without aborting the rest.
   *
   * @param {object} req - Request; reads `req.body.data` (array of records) and
   *   `req.session.userEmail`.
   * @param {object} res - Response; receives `{ success, results }` where `results`
   *   holds `success`/`failed` counts and an `errors` list.
   */
  async saveGeocodedData(req, res) {
    try {
      const { data } = req.body;

      if (!data || !Array.isArray(data)) {
        return res.status(400).json({
          success: false,
          error: 'Invalid data format'
        });
      }

      const results = {
        success: 0,
        failed: 0,
        errors: []
      };

      // Process each location
      for (const location of data) {
        const isRecord = location !== null && typeof location === 'object';

        // Preserve original address, don't overwrite with geocoded address.
        // Resolved before the try so the catch block can always report it.
        const originalAddress = isRecord
          ? (location.address || location.Address || location.ADDRESS)
          : undefined;

        // Declared outside the try so the catch block can log whatever was built.
        let locationData;

        try {
          if (!isRecord) {
            throw new Error('Invalid location record');
          }

          const geocodedAddress = location.geocoded_address;

          // Transform to match locations table structure
          locationData = {
            'Geo-Location': location['Geo-Location'],
            latitude: parseFloat(location.latitude),
            longitude: parseFloat(location.longitude),
            Address: originalAddress, // Always use the original address from CSV
            created_by_user: req.session.userEmail || 'csv_import',
            last_updated_by_user: req.session.userEmail || 'csv_import'
          };

          // Track if geocoded address differs from original. A missing original
          // address is treated as an empty string rather than crashing.
          const addressDiffers = Boolean(geocodedAddress) &&
            String(geocodedAddress).toLowerCase() !== String(originalAddress || '').toLowerCase();

          const importNote = `Imported from CSV: ${location.csv_filename || 'unknown'}`;

          // Process all fields from CSV
          Object.keys(location).forEach(key => {
            const lowerKey = key.toLowerCase();

            // Skip already processed fields
            if (ALREADY_PROCESSED_FIELDS.has(lowerKey)) {
              return;
            }

            // Only mapped fields are copied over
            const targetField = FIELD_MAPPING.get(lowerKey);
            if (!targetField) {
              return;
            }

            const value = location[key];

            if (targetField === 'Sign') {
              // Convert to boolean
              locationData[targetField] = TRUTHY_SIGN_VALUES.includes(String(value).toLowerCase());
            } else if (targetField === 'Support Level') {
              // Ensure it's a string number 1-4; anything else is dropped
              const level = parseInt(value, 10);
              if (level >= 1 && level <= 4) {
                locationData[targetField] = String(level);
              }
            } else if (targetField === 'Notes') {
              // Build notes with existing content, CSV info, and geocoding info
              const noteParts = [];
              if (value) {
                noteParts.push(value);
              }
              noteParts.push(importNote);
              if (addressDiffers) {
                noteParts.push(`Geocoded as: ${geocodedAddress}`);
              }
              locationData[targetField] = noteParts.join(' | ');
            } else {
              locationData[targetField] = value;
            }
          });

          // If no notes field was found in CSV, add the CSV import info and geocoding info
          if (!locationData['Notes']) {
            const noteParts = [importNote];
            if (addressDiffers) {
              noteParts.push(`Geocoded as: ${geocodedAddress}`);
            }
            locationData['Notes'] = noteParts.join(' | ');
          }

          // Create location in NocoDB
          await nocodbService.create(config.nocodb.tableId, locationData);

          results.success++;
          logger.debug(`Successfully saved location: ${locationData.Address}`);
        } catch (error) {
          logger.error('Failed to save location:', error);
          logger.error('Location data:', locationData);
          results.failed++;
          results.errors.push({
            address: originalAddress,
            error: error.message
          });
        }
      }

      res.json({
        success: true,
        results: results
      });
    } catch (error) {
      logger.error('Save geocoded data error:', error);
      res.status(500).json({
        success: false,
        error: 'Failed to save locations'
      });
    }
  }

  /**
   * Generate the text report for a processed session and send it as a download.
   * Stored results are deleted one minute after the download.
   *
   * @param {object} req - Request; reads `req.params.sessionId`.
   * @param {object} res - Response; receives the report as a `text/plain` attachment,
   *   or 404 when the session is unknown or has already been removed.
   */
  async downloadReport(req, res) {
    try {
      const { sessionId } = req.params;

      if (!sessionId || !processingResults.has(sessionId)) {
        return res.status(404).json({
          success: false,
          error: 'Processing results not found or expired'
        });
      }

      const { filename, timestamp, allResults, summary } = processingResults.get(sessionId);

      // Generate comprehensive report content
      const reportContent = this.generateComprehensiveReport(allResults, filename, timestamp, summary);

      // Set headers for text download
      const reportFilename = `geocoding-report-${sessionId}.txt`;
      res.setHeader('Content-Type', 'text/plain');
      res.setHeader('Content-Disposition', `attachment; filename="${reportFilename}"`);
      res.setHeader('Cache-Control', 'no-cache');

      logger.info(`Generating comprehensive report for session ${sessionId}: ${allResults.length} records`);

      res.send(reportContent);

      // Clean up stored results after download (optional)
      setTimeout(() => {
        processingResults.delete(sessionId);
        logger.info(`Cleaned up processing results for session ${sessionId}`);
      }, REPORT_CLEANUP_DELAY_MS); // Delete after 1 minute
    } catch (error) {
      logger.error('Download report error:', error);
      res.status(500).json({
        success: false,
        error: 'Failed to generate report'
      });
    }
  }

  /**
   * Build the plain-text processing report: summary, malformed addresses, failed
   * addresses, low-confidence successes, rates, and recommendations.
   *
   * @param {object[]} results - All processed rows (successful and failed).
   * @param {string} originalFilename - Name of the uploaded CSV.
   * @param {string} timestamp - Processing timestamp to print in the header.
   * @param {object} summary - Counts: `total`, `successful`, `warnings`, `failed`, `malformed`.
   * @returns {string} Report text.
   */
  generateComprehensiveReport(results, originalFilename, timestamp, summary) {
    const originalAddressOf = (result) =>
      result.address || result.Address || result.ADDRESS || 'N/A';

    let report = `Geocoding Processing Report\n`;
    report += `Generated: ${timestamp}\n`;
    report += `Original File: ${originalFilename}\n`;
    report += `================================\n\n`;

    report += `Summary:\n`;
    report += `- Total Addresses: ${summary.total}\n`;
    report += `- Successfully Geocoded: ${summary.successful}\n`;
    report += `- Warnings (Low Confidence): ${summary.warnings}\n`;
    report += `- Failed: ${summary.failed}\n`;
    report += `- Potentially Malformed: ${summary.malformed}\n\n`;

    // Section for malformed addresses requiring review
    const malformedResults = results.filter(r => r.is_malformed);
    if (malformedResults.length > 0) {
      report += `ADDRESSES REQUIRING REVIEW (Potentially Malformed):\n`;
      report += `================================================\n`;
      malformedResults.forEach((result, index) => {
        report += `\n${index + 1}. Original: ${originalAddressOf(result)}\n`;
        report += ` Result: ${result.geocoded_address || 'N/A'}\n`;
        report += ` Confidence: ${result.confidence_score || 0}%\n`;
        if (result.warnings) {
          report += ` Warnings: ${result.warnings}\n`;
        }
        report += ` Coordinates: ${result.latitude || 'N/A'}, ${result.longitude || 'N/A'}\n`;
        report += ` Row: ${result.row_number}\n`;
      });
      report += `\n`;
    }

    // Failed addresses section
    const failedResults = results.filter(r => r.geocode_status === 'FAILED');
    if (failedResults.length > 0) {
      report += `FAILED GEOCODING ATTEMPTS:\n`;
      report += `========================\n`;
      failedResults.forEach((result, index) => {
        report += `\n${index + 1}. Address: ${originalAddressOf(result)}\n`;
        report += ` Error: ${result.geocode_error}\n`;
        report += ` Row: ${result.row_number}\n`;
      });
      report += `\n`;
    }

    // Successful geocoding with low confidence
    const lowConfidenceResults = results.filter(r =>
      r.geocode_status === 'SUCCESS' && r.confidence_score && r.confidence_score < 75
    );
    if (lowConfidenceResults.length > 0) {
      report += `LOW CONFIDENCE SUCCESSFUL GEOCODING:\n`;
      report += `==================================\n`;
      lowConfidenceResults.forEach((result, index) => {
        report += `\n${index + 1}. Original: ${originalAddressOf(result)}\n`;
        report += ` Result: ${result.geocoded_address}\n`;
        report += ` Confidence: ${result.confidence_score}%\n`;
        if (result.warnings) {
          report += ` Warnings: ${result.warnings}\n`;
        }
        report += ` Row: ${result.row_number}\n`;
      });
      report += `\n`;
    }

    // Summary statistics
    report += `DETAILED STATISTICS:\n`;
    report += `==================\n`;
    report += `Success Rate: ${formatRate(summary.successful, summary.total)}%\n`;
    report += `Warning Rate: ${formatRate(summary.warnings, summary.total)}%\n`;
    report += `Failure Rate: ${formatRate(summary.failed, summary.total)}%\n`;
    report += `Malformed Rate: ${formatRate(summary.malformed, summary.total)}%\n\n`;

    // Recommendations
    report += `RECOMMENDATIONS:\n`;
    report += `===============\n`;
    if (summary.malformed > 0) {
      report += `- Review ${summary.malformed} addresses marked as potentially malformed\n`;
    }
    if (summary.failed > 0) {
      report += `- Check ${summary.failed} failed addresses for formatting issues\n`;
    }
    if (summary.warnings > 0) {
      report += `- Verify ${summary.warnings} low confidence results manually\n`;
    }
    report += `- Consider using more specific address formats for better results\n`;
    report += `- Ensure addresses include proper directional indicators (NW, SW, etc.)\n`;

    return report;
  }

  /**
   * Build a CSV version of the report: `#`-prefixed metadata lines, a header row with
   * the key status/address/coordinate columns first, then one line per result row.
   *
   * @param {object[]} allResults - All processed rows.
   * @param {string} originalFilename - Name of the uploaded CSV.
   * @param {string} timestamp - Processing timestamp for the metadata block.
   * @param {object} summary - Counts: `total`, `successful`, `failed`.
   * @returns {string} CSV text, or a plain message when there are no rows.
   */
  generateReportCSV(allResults, originalFilename, timestamp, summary) {
    if (!allResults || allResults.length === 0) {
      return 'No data available for report generation';
    }

    // Get all unique field names from the results
    const allFields = new Set();
    allResults.forEach(row => {
      Object.keys(row).forEach(field => allFields.add(field));
    });

    // Define the header order - put important fields first
    const priorityHeaders = [
      'geocode_status',
      'geocode_error',
      'address',
      'Address',
      'geocoded_address',
      'latitude',
      'longitude',
      'Geo-Location'
    ];
    const hiddenHeaders = ['geocode_success', 'csv_filename'];

    const otherHeaders = Array.from(allFields)
      .filter(field => !priorityHeaders.includes(field) && !hiddenHeaders.includes(field))
      .sort();

    const headers = [...priorityHeaders.filter(h => allFields.has(h)), ...otherHeaders];

    // Generate CSV header with metadata
    let csvContent = `# Geocoding Processing Report\n`;
    csvContent += `# Original File: ${originalFilename}\n`;
    csvContent += `# Processed: ${timestamp}\n`;
    csvContent += `# Total Records: ${summary.total}\n`;
    csvContent += `# Successful: ${summary.successful}\n`;
    csvContent += `# Failed: ${summary.failed}\n`;
    csvContent += `# \n`;

    // Add CSV headers
    csvContent += headers.map(header => this.escapeCSVField(header)).join(',') + '\n';

    // Add data rows
    allResults.forEach(row => {
      const values = headers.map(header => {
        const value = row[header];
        return this.escapeCSVField(value !== undefined && value !== null ? String(value) : '');
      });
      csvContent += values.join(',') + '\n';
    });

    return csvContent;
  }

  /**
   * Escape one CSV field: null/undefined become an empty string; values containing a
   * comma, double quote, or line break are wrapped in quotes with inner quotes doubled.
   *
   * @param {*} field - Value to escape.
   * @returns {string} CSV-safe field text.
   */
  escapeCSVField(field) {
    if (field === null || field === undefined) return '';

    const stringField = String(field);

    // If field contains comma, quote, or newline, wrap in quotes and escape quotes
    if (stringField.includes(',') || stringField.includes('"') ||
        stringField.includes('\n') || stringField.includes('\r')) {
      return '"' + stringField.replace(/"/g, '""') + '"';
    }

    return stringField;
  }
}

module.exports = new DataConvertController();