import { timingSafeEqual } from 'node:crypto';
// NOTE: createGzip and Base64 are not used in this file's visible code; the
// imports are retained so nothing that may rely on them is removed.
import { createGzip, gunzipSync } from 'zlib';

import AWS from 'aws-sdk';
import { config } from 'dotenv';
import express from 'express';
import { Base64 } from 'js-base64';
import { simpleParser } from 'mailparser';
import nodemailer from 'nodemailer';
import { Pool } from 'pg';
import { createLogger, format, transports } from 'winston';

// Load environment variables
config();

// Check Node.js version
const [major] = process.versions.node.split('.').map(Number);
if (major < 22) {
  throw new Error('Node.js 22 or higher required');
}

// Logger setup
const logger = createLogger({
  level: 'info',
  format: format.combine(
    format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
    format.printf(({ timestamp, level, message }) => `${timestamp} ${level.toUpperCase()} ${message}`)
  ),
  transports: [new transports.Console()],
});

const app = express();
app.use(express.json({ limit: '20mb' }));
app.use(express.urlencoded({ limit: '20mb', extended: true }));

const SMTP_HOST = process.env.SMTP_HOST || 'localhost';
const SMTP_PORT = parseInt(process.env.SMTP_PORT || '25', 10);
const API_TOKEN = process.env.API_TOKEN;
const AWS_REGION = process.env.AWS_REGION || 'us-east-1';
const API_KEY = process.env.MAILCOW_API_KEY;
const MAILCOW_API = process.env.MAILCOW_API;

if (!API_TOKEN) {
  logger.warn('API_TOKEN is not set – all requests will be rejected as unauthorized');
}

// PostgreSQL client
const pool = new Pool({
  user: process.env.PGUSER || 'email_user',
  password: process.env.PGPASSWORD || 'email_password',
  host: process.env.PGHOST || 'postgres',
  database: process.env.PGDATABASE || 'email_db',
  port: parseInt(process.env.PGPORT || '5433', 10),
});

// AWS S3 client
const s3Client = new AWS.S3({ region: AWS_REGION });

// Nodemailer transporter
const transporter = nodemailer.createTransport({
  host: SMTP_HOST,
  port: SMTP_PORT,
  secure: false, // Adjust if SMTP requires TLS
  // NOTE(review): certificate verification is disabled for the SMTP hop.
  tls: { rejectUnauthorized: false },
});

/**
 * Check the request's bearer token against API_TOKEN.
 *
 * Fails closed when API_TOKEN is not configured (otherwise the literal header
 * "Bearer undefined" would be accepted) and compares in constant time.
 *
 * @param {import('express').Request} req - Incoming request.
 * @returns {boolean} True only when the Authorization header matches exactly.
 */
function isAuthorized(req) {
  if (!API_TOKEN) {
    return false;
  }
  const supplied = Buffer.from(String(req.headers['authorization'] ?? ''));
  const expected = Buffer.from(`Bearer ${API_TOKEN}`);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return supplied.length === expected.length && timingSafeEqual(supplied, expected);
}

/**
 * Collect every To/Cc/Bcc address from a message parsed by simpleParser.
 *
 * Each header is handled whether it is a single address object or an array of
 * them (presumably what mailparser yields for repeated headers – verify
 * against the installed version). Entries without an `address` are dropped.
 *
 * @param {object} parsed - Result of simpleParser().
 * @returns {string[]} Recipient addresses in To, Cc, Bcc order.
 */
function collectRecipients(parsed) {
  return [parsed.to, parsed.cc, parsed.bcc]
    .flat()
    .filter(Boolean)
    .flatMap((header) => header.value || [])
    .map((entry) => entry.address)
    .filter(Boolean);
}

/**
 * Check whether Mailcow knows the given domain.
 *
 * @param {string} domain - Domain name to look up (case-insensitive).
 * @returns {Promise<boolean>} True when the domain is listed by the Mailcow API.
 * @throws {Error} On network failure, timeout (5 s) or a non-2xx response.
 */
async function domainExists(domain) {
  try {
    const response = await fetch(`${MAILCOW_API}/get/domain/all`, {
      headers: { 'X-API-Key': API_KEY },
      signal: AbortSignal.timeout(5000),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    const domains = await response.json();
    const wanted = domain.toLowerCase();
    return domains.some((d) => d.domain_name?.toLowerCase() === wanted);
  } catch (error) {
    logger.error(`Error checking domain '${domain}': ${error.message}`);
    throw error;
  }
}

/**
 * Check whether a mailbox exists on a Mailcow domain.
 *
 * @param {string} domain - Domain the mailbox belongs to.
 * @param {string} localPart - Part of the address before the "@".
 * @returns {Promise<boolean>} False when the domain or the mailbox is unknown.
 * @throws {Error} On network failure, timeout (5 s) or a non-2xx response.
 */
async function inboxExists(domain, localPart) {
  if (!(await domainExists(domain))) {
    logger.info(`Domain '${domain}' unknown – skip mailbox lookup`);
    return false;
  }
  try {
    // The domain originates from the request URL, so escape it for the path.
    const response = await fetch(`${MAILCOW_API}/get/mailbox/all/${encodeURIComponent(domain)}`, {
      headers: { 'X-API-Key': API_KEY },
      signal: AbortSignal.timeout(5000),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    const mailboxes = await response.json();
    const wanted = localPart.toLowerCase();
    return mailboxes.some((m) => m.local_part?.toLowerCase() === wanted);
  } catch (error) {
    logger.error(`Error checking inbox '${localPart}@${domain}': ${error.message}`);
    throw error;
  }
}

/**
 * Upsert the processing status of one stored email into `email_statuses`.
 *
 * Database errors are logged and reported through the return value rather
 * than thrown, so status bookkeeping never fails the calling request.
 *
 * @param {string} domain - Domain the email belongs to.
 * @param {string} key - S3 object key of the email.
 * @param {string} status - Status label to store.
 * @param {string} [processor='rest-api'] - Name of the component writing the status.
 * @param {string|null} [fromAddr=null] - Sender address.
 * @param {string[]} [toAddrs=[]] - Recipient addresses.
 * @returns {Promise<boolean>} True on success, false when the query failed.
 */
async function markEmailAsProcessed(domain, key, status, processor = 'rest-api', fromAddr = null, toAddrs = []) {
  try {
    await pool.query(
      `INSERT INTO email_statuses (domain, s3_key, status, timestamp, processor, from_addr, to_addrs)
       VALUES ($1, $2, $3, $4, $5, $6, $7)
       ON CONFLICT (domain, s3_key) DO UPDATE SET
         status = EXCLUDED.status,
         timestamp = EXCLUDED.timestamp,
         processor = EXCLUDED.processor,
         from_addr = EXCLUDED.from_addr,
         to_addrs = EXCLUDED.to_addrs`,
      [domain, key, status, Math.floor(Date.now() / 1000), processor, fromAddr, toAddrs]
    );
    logger.info(`Marked ${domain}/${key} as ${status} in database`);
    return true;
  } catch (error) {
    logger.error(`Error marking ${domain}/${key} in database: ${error.message}`);
    return false;
  }
}

// Process endpoint: decode the posted message, keep only recipients whose
// mailbox exists on :domain, forward the original bytes over SMTP and record
// the outcome in the database.
app.post('/process/:domain', async (req, res) => {
  const { domain } = req.params;
  if (!isAuthorized(req)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const data = req.body;
  if (!data) {
    return res.status(400).json({ error: 'Invalid payload' });
  }

  const requestId = data.request_id || 'no-request-id';
  // Log the payload with the (potentially huge) email body reduced to its length.
  const payloadSummary = Object.fromEntries(
    Object.entries(data)
      .filter(([k, v]) => k !== 'email_content' || typeof v === 'string')
      .map(([k, v]) => [k, k === 'email_content' ? v.length : v])
  );
  logger.info(`[${requestId}] INCOMING POST /process/${domain}: payload_summary=${JSON.stringify(payloadSummary)}`);

  let recipients = [];
  let fromAddr = `lambda@${domain}`;

  try {
    if (typeof data.email_content !== 'string') {
      throw new TypeError('email_content must be a base64 string');
    }

    // Decode straight to bytes. Going through a text string would corrupt
    // gzip data and any 8-bit content in the message.
    const decoded = Buffer.from(data.email_content, 'base64');
    const emailBuffer = data.compressed ? gunzipSync(decoded) : decoded;

    const parsed = await simpleParser(emailBuffer);
    fromAddr = parsed.from?.value?.[0]?.address || fromAddr;
    recipients = collectRecipients(parsed);

    if (!recipients.length) {
      await markEmailAsProcessed(domain, data.s3_key, 'noRecipients', 'rest-api', fromAddr, []);
      return res.status(400).json({ error: 'No recipients' });
    }

    // Filter valid recipients
    const validRecipients = [];
    for (const addr of recipients) {
      // Split at the last "@" so a local part containing "@" stays intact.
      const at = addr.lastIndexOf('@');
      const local = at === -1 ? addr : addr.slice(0, at);
      const dom = at === -1 ? '' : addr.slice(at + 1);
      if (!dom || dom.toLowerCase() !== domain.toLowerCase()) {
        continue;
      }
      if (await inboxExists(domain, local)) {
        validRecipients.push(addr);
      } else {
        logger.info(`Skipping non-existent inbox: ${addr}`);
      }
    }

    if (!validRecipients.length) {
      logger.info(`[${requestId}] No valid inboxes for ${domain} – skip.`);
      await markEmailAsProcessed(domain, data.s3_key, 'unknownUser', 'rest-api', fromAddr, recipients);
      return res.status(404).json({ message: 'No valid inboxes – skipped' });
    }

    // Send the original message unchanged; the envelope decides who receives it.
    await transporter.sendMail({
      envelope: { from: fromAddr, to: validRecipients },
      raw: emailBuffer,
    });

    // Mark as processed
    await markEmailAsProcessed(domain, data.s3_key, 'true', 'rest-api', fromAddr, validRecipients);
    res.status(200).json({ message: 'Email forwarded', forwarded_to: validRecipients });
  } catch (error) {
    logger.error(`[${requestId}] Error in /process/${domain}: ${error.message}`);
    await markEmailAsProcessed(domain, data.s3_key, 'error', 'rest-api', fromAddr, recipients);
    res.status(500).json({ error: 'Internal server error' });
  }
});

// Stats endpoint: count the objects in the domain's S3 bucket by the status
// recorded in the database and list sender/recipients for every
// non-successful message.
app.get('/stats/:domain', async (req, res) => {
  const { domain } = req.params;
  if (!isAuthorized(req)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const bucket = `${domain.replaceAll('.', '-')}-emails`;
  let total = 0;
  const counts = { true: 0, unknownDomain: 0, unknownUser: 0, noRecipients: 0, error: 0 };
  const details = { unknownDomain: [], unknownUser: [], noRecipients: [], error: [] };

  try {
    // Fetch statuses from database
    const { rows } = await pool.query(
      'SELECT s3_key, status, from_addr, to_addrs FROM email_statuses WHERE domain = $1',
      [domain]
    );
    // A Map keeps arbitrary S3 keys (e.g. "__proto__") from colliding with
    // Object.prototype members.
    const statusByKey = new Map(
      rows.map((row) => [row.s3_key, { status: row.status, from: row.from_addr, to: row.to_addrs || [] }])
    );

    // List S3 objects
    let continuationToken;
    do {
      const page = await s3Client
        .listObjectsV2({ Bucket: bucket, ContinuationToken: continuationToken })
        .promise();
      continuationToken = page.NextContinuationToken;

      for (const { Key: key } of page.Contents || []) {
        total += 1;
        const info = statusByKey.get(key) ?? { status: 'none', from: null, to: [] };
        const { status } = info;

        // Own-property checks: a stored status such as "toString" must not
        // match members inherited from Object.prototype.
        if (Object.hasOwn(counts, status)) {
          counts[status] += 1;
        }
        if (!Object.hasOwn(details, status)) {
          continue;
        }

        let from = info.from || null;
        let to = info.to.length ? info.to : [];

        // Only download and parse the message when the database row lacks
        // the sender or the recipients.
        if (!from || !to.length) {
          try {
            const objData = await s3Client.getObject({ Bucket: bucket, Key: key }).promise();
            const parsed = await simpleParser(objData.Body);
            from ||= parsed.from?.value?.[0]?.address || null;
            if (!to.length) {
              to = collectRecipients(parsed);
            }
          } catch (error) {
            // Best effort: keep whatever the database provided.
            logger.error(`Error parsing ${bucket}/${key}: ${error.message}`);
          }
        }

        details[status].push({ key, from, to });
      }
    } while (continuationToken);

    const result = {
      domain,
      total_messages: total,
      successful: counts.true,
      wrong_domain: counts.unknownDomain,
      unknown_user: counts.unknownUser,
      no_recipients: counts.noRecipients,
      errors: counts.error,
      details,
    };
    logger.info(`Stats for ${domain}: ${JSON.stringify(result)}`);
    res.status(200).json(result);
  } catch (error) {
    logger.error(`Error in /stats/${domain}: ${error.message}`);
    res.status(500).json({ error: 'Internal server error' });
  }
});

// Start server
app.listen(5000, '0.0.0.0', () => {
  logger.info('Server running on http://0.0.0.0:5000');
});