// Data conversion controller: parses uploaded CSV files, geocodes their addresses, and saves the results to NocoDB.
const csv = require('csv-parse');
|
|
const { Readable } = require('stream');
|
|
const nocodbService = require('../services/nocodb');
|
|
const { forwardGeocode } = require('../services/geocoding');
|
|
const logger = require('../utils/logger');
|
|
const config = require('../config');
|
|
|
|
/** Pause between consecutive geocoding requests, in milliseconds, to avoid rate limiting. */
const GEOCODE_DELAY_MS = 2000;

/**
 * Lower-cased keys that saveGeocodedData handles explicitly; they are never
 * passed through the CSV column mapping.
 */
const RESERVED_LOCATION_KEYS = new Set([
  'latitude',
  'longitude',
  'geo-location',
  'geocoded_address',
  'geocode_success',
  'address',
  'csv_filename'
]);

/**
 * Lower-cased CSV column name -> NocoDB field name.
 *
 * A Map is used rather than an object literal so that column names such as
 * "constructor" or "__proto__" cannot match inherited Object.prototype members.
 */
const CSV_FIELD_MAPPING = new Map([
  ['first name', 'First Name'],
  ['firstname', 'First Name'],
  ['first_name', 'First Name'],
  ['last name', 'Last Name'],
  ['lastname', 'Last Name'],
  ['last_name', 'Last Name'],
  ['email', 'Email'],
  ['phone', 'Phone'],
  ['unit', 'Unit Number'],
  ['unit number', 'Unit Number'],
  ['unit_number', 'Unit Number'],
  ['support level', 'Support Level'],
  ['support_level', 'Support Level'],
  ['sign', 'Sign'],
  ['sign size', 'Sign Size'],
  ['sign_size', 'Sign Size'],
  ['notes', 'Notes']
]);

/** Lower-cased CSV values that are interpreted as "has a sign". */
const SIGN_TRUE_VALUES = new Set(['true', 'yes', '1', 'y']);

/**
 * Write one Server-Sent Event carrying a JSON payload.
 *
 * @param {object} res - Response object; must provide write().
 * @param {object} payload - JSON-serializable event body.
 * @returns {number} Length of the serialized JSON, for logging.
 */
function sendEvent(res, payload) {
  const json = JSON.stringify(payload);
  res.write(`data: ${json}\n\n`);
  // flush() may not exist on the response, so call it only when present.
  if (typeof res.flush === 'function') {
    res.flush();
  }
  return json.length;
}

/**
 * Transform one geocoded CSV row into the record that is sent to NocoDB.
 *
 * @param {object} location - Geocoded row as produced by processCSV.
 * @param {string} userEmail - Value stored in the created/updated-by fields.
 * @returns {object} Record keyed by NocoDB field names.
 * @throws {Error} If the entry is not an object.
 */
function buildLocationData(location, userEmail) {
  if (!location || typeof location !== 'object') {
    throw new Error('Invalid location entry');
  }

  const importNote = `Imported from CSV: ${location.csv_filename || 'unknown'}`;

  const locationData = {
    'Geo-Location': location['Geo-Location'],
    latitude: parseFloat(location.latitude),
    longitude: parseFloat(location.longitude),
    Address: location.geocoded_address || location.address || location.Address,
    created_by_user: userEmail,
    last_updated_by_user: userEmail
  };

  for (const key of Object.keys(location)) {
    const lowerKey = key.toLowerCase();
    if (RESERVED_LOCATION_KEYS.has(lowerKey)) {
      continue;
    }

    const targetField = CSV_FIELD_MAPPING.get(lowerKey);
    if (!targetField) {
      // Columns without a mapping are dropped.
      continue;
    }

    const value = location[key];

    if (targetField === 'Sign') {
      // Convert to boolean
      locationData[targetField] = SIGN_TRUE_VALUES.has(String(value).toLowerCase());
    } else if (targetField === 'Support Level') {
      // Only levels 1-4 are stored, as a string; anything else is omitted.
      const level = parseInt(value, 10);
      if (level >= 1 && level <= 4) {
        locationData[targetField] = String(level);
      }
    } else if (targetField === 'Notes') {
      // Append the CSV filename info to any existing notes.
      locationData[targetField] = value ? `${value} | ${importNote}` : importNote;
    } else {
      locationData[targetField] = value;
    }
  }

  // If the CSV had no (non-empty) notes column, still record where the row came from.
  if (!locationData.Notes) {
    locationData.Notes = importNote;
  }

  return locationData;
}

/**
 * Controller for the CSV import workflow: parse an uploaded CSV, geocode each
 * address while streaming progress to the client, and save geocoded rows to NocoDB.
 */
class DataConvertController {
  constructor() {
    // Bind methods to preserve 'this' context
    this.processCSV = this.processCSV.bind(this);
    this.parseCSV = this.parseCSV.bind(this);
    this.saveGeocodedData = this.saveGeocodedData.bind(this);
  }

  /**
   * Process a CSV upload and geocode its addresses, reporting progress as
   * Server-Sent Events.
   *
   * Responds with HTTP 400 JSON when no file was uploaded. Otherwise the
   * response is an event stream whose events have a `type` of:
   * `start`, `progress`, `geocoded`, `error` (file-level with `message`, or
   * per-row with `data`), `complete`, or `fatal_error`.
   *
   * @param {object} req - Request; reads `req.file.buffer` and `req.file.originalname`.
   * @param {object} res - Response used for the event stream.
   * @returns {Promise<void>}
   */
  async processCSV(req, res) {
    try {
      if (!req.file) {
        return res.status(400).json({
          success: false,
          error: 'No file uploaded'
        });
      }

      // Store the filename for later use in notes
      const originalFilename = req.file.originalname;

      // Stop geocoding once the connection is gone; otherwise the loop would
      // keep calling the geocoder for a client that is no longer listening.
      let clientDisconnected = false;
      res.on('close', () => {
        clientDisconnected = true;
      });

      // Set up SSE headers
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no' // Disable Nginx buffering
      });

      const results = await this.parseCSV(req.file.buffer);

      if (!results || results.length === 0) {
        sendEvent(res, { type: 'error', message: 'CSV file is empty or invalid' });
        res.end();
        return;
      }

      // Validate required address field (checked on the first row only)
      const firstRow = results[0];
      const hasAddressField = ['address', 'Address', 'ADDRESS'].some(
        (name) => Object.prototype.hasOwnProperty.call(firstRow, name)
      );

      if (!hasAddressField) {
        sendEvent(res, { type: 'error', message: 'CSV must contain an "address" column' });
        res.end();
        return;
      }

      const total = results.length;
      sendEvent(res, { type: 'start', total: total });

      const processedData = [];
      const errors = [];

      for (let i = 0; i < total; i++) {
        if (clientDisconnected) {
          logger.info(`Client disconnected; stopping geocoding at ${i}/${total}`);
          return;
        }

        const row = results[i];
        const address = row.address || row.Address || row.ADDRESS;

        sendEvent(res, {
          type: 'progress',
          current: i + 1,
          total: total,
          address: address
        });

        let geocoderCalled = false;

        try {
          if (!address) {
            // Nothing to look up; report the row as failed without a request.
            throw new Error('Missing address');
          }

          logger.info(`Geocoding ${i + 1}/${total}: ${address}`);

          geocoderCalled = true;
          const geocodeResult = await forwardGeocode(address);

          if (!geocodeResult || !geocodeResult.coordinates) {
            throw new Error('Geocoding failed - no coordinates returned');
          }

          const { lat, lng } = geocodeResult.coordinates;
          const processedRow = {
            ...row,
            latitude: lat,
            longitude: lng,
            'Geo-Location': `${lat};${lng}`,
            geocoded_address: geocodeResult.formattedAddress || address,
            geocode_success: true,
            csv_filename: originalFilename // Include filename for notes
          };

          processedData.push(processedRow);

          const sentLength = sendEvent(res, {
            type: 'geocoded',
            data: processedRow,
            index: i
          });
          logger.debug(`Sending geocoded update: ${sentLength} chars`);
        } catch (error) {
          logger.error(`Failed to geocode address: ${address}`, error);

          const errorData = {
            index: i,
            address: address,
            error: error.message
          };
          errors.push(errorData);

          const sentLength = sendEvent(res, { type: 'error', data: errorData });
          logger.debug(`Sending error update: ${sentLength} chars`);
        }

        // Rate-limit delay: only needed after a real geocoder request, and
        // only when more rows follow (no point delaying the completion event).
        if (geocoderCalled && i < total - 1) {
          await new Promise((resolve) => setTimeout(resolve, GEOCODE_DELAY_MS));
        }
      }

      const completeLength = sendEvent(res, {
        type: 'complete',
        processed: processedData.length,
        errors: errors.length,
        total: total
      });
      logger.info(`Sending completion message: ${completeLength} chars`);

      res.end();
    } catch (error) {
      logger.error('CSV processing error:', error);
      res.write(`data: ${JSON.stringify({
        type: 'fatal_error',
        message: 'Failed to process CSV file',
        error: error.message
      })}\n\n`);
      res.end();
    }
  }

  /**
   * Parse a CSV buffer into an array of row objects keyed by the header row.
   *
   * @param {Buffer} buffer - Raw CSV file contents.
   * @returns {Promise<object[]>} Parsed rows; rejects on a parser error.
   */
  async parseCSV(buffer) {
    return new Promise((resolve, reject) => {
      const records = [];

      Readable.from(buffer)
        .pipe(csv.parse({
          // Strip a leading UTF-8 byte order mark so it does not become part
          // of the first column name (e.g. "\ufeffaddress").
          bom: true,
          columns: true,
          skip_empty_lines: true,
          trim: true
        }))
        .on('data', (record) => records.push(record))
        .on('error', reject)
        .on('end', () => resolve(records));
    });
  }

  /**
   * Save geocoded rows to NocoDB, transforming each one to match the
   * locations table structure.
   *
   * Each row is saved independently: a failure is counted and reported in
   * `results.errors` and the remaining rows are still processed.
   *
   * @param {object} req - Request; reads `req.body.data` (array of rows) and
   *   `req.session.userEmail` (falls back to 'csv_import').
   * @param {object} res - Response; receives `{ success, results }`, or a
   *   400/500 JSON error.
   * @returns {Promise<void>}
   */
  async saveGeocodedData(req, res) {
    try {
      const { data } = req.body || {};

      if (!data || !Array.isArray(data)) {
        return res.status(400).json({
          success: false,
          error: 'Invalid data format'
        });
      }

      const userEmail = (req.session && req.session.userEmail) || 'csv_import';

      const results = {
        success: 0,
        failed: 0,
        errors: []
      };

      for (const location of data) {
        // Declared outside the try block so the catch handler can log it;
        // it stays undefined if building the record itself failed.
        let locationData;

        try {
          locationData = buildLocationData(location, userEmail);

          await nocodbService.create(config.nocodb.tableId, locationData);
          results.success++;
          logger.debug(`Successfully saved location: ${locationData.Address}`);
        } catch (error) {
          logger.error('Failed to save location:', error);
          logger.error('Location data:', locationData);
          results.failed++;

          const source = location || {};
          results.errors.push({
            address: source.address || source.Address || source.geocoded_address,
            error: error.message
          });
        }
      }

      res.json({
        success: true,
        results: results
      });
    } catch (error) {
      logger.error('Save geocoded data error:', error);
      res.status(500).json({
        success: false,
        error: 'Failed to save locations'
      });
    }
  }
}
|
|
|
|
// Export one shared controller instance. Its handler methods are bound in the
// constructor, so they keep `this` when detached from the instance.
const dataConvertController = new DataConvertController();

module.exports = dataConvertController;
|