From 493743e8aacc8fb689b48f18e00388f36437b928 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Mon, 7 Jul 2025 14:53:41 -0500 Subject: [PATCH] updated --- email_api/docker-compose.yml | 9 +- email_api/email_api/app.js | 300 +++++++++++++++++++++++++++++++ email_api/email_api/package.json | 19 +- 3 files changed, 313 insertions(+), 15 deletions(-) diff --git a/email_api/docker-compose.yml b/email_api/docker-compose.yml index 9d90da8..7527534 100644 --- a/email_api/docker-compose.yml +++ b/email_api/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.8' services: email-api: container_name: email-api - image: node:18-slim + image: node:22-slim restart: unless-stopped network_mode: host volumes: @@ -27,8 +27,7 @@ services: - PGDATABASE=${PGDATABASE:-email_db} - PGPORT=${PGPORT:-5432} command: > - bash -c "npm install express aws-sdk nodemailer emailjs-mime-parser emailjs-addressparser js-base64 winston dotenv axios pg && - node app.js" + bash -c "npm install && node app.js" depends_on: - postgres @@ -42,8 +41,8 @@ services: - POSTGRES_PASSWORD=${PGPASSWORD:-email_password} - POSTGRES_DB=${PGDATABASE:-email_db} volumes: - - postgres_data:/var/lib/postgresql/data + - email_postgres_data:/var/lib/postgresql/data - ./init.sql:/docker-entrypoint-initdb.d/init.sql volumes: - postgres_data: \ No newline at end of file + email_postgres_data: \ No newline at end of file diff --git a/email_api/email_api/app.js b/email_api/email_api/app.js index e69de29..edaa69a 100644 --- a/email_api/email_api/app.js +++ b/email_api/email_api/app.js @@ -0,0 +1,300 @@ +import express from 'express'; +import { Pool } from 'pg'; +import AWS from 'aws-sdk'; +import nodemailer from 'nodemailer'; +import { MailParser } from 'mailparser'; +import { Base64 } from 'js-base64'; +import { createGzip, gunzipSync } from 'zlib'; +import { createLogger, format, transports } from 'winston'; +import { config } from 'dotenv'; + +// Load environment variables +config(); + +// Check Node.js version +const [major] = process.versions.node.split('.').map(Number); +if (major < 22) { + throw new Error('Node.js 22 or higher required'); +} + +// Logger setup +const logger = createLogger({ + level: 'info', + format: format.combine( + format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), + format.printf(({ timestamp, level, message }) => `${timestamp} ${level.toUpperCase()} ${message}`) + ), + transports: [new transports.Console()] +}); + +const app = express(); +app.use(express.json()); + +const SMTP_HOST = process.env.SMTP_HOST || 'localhost'; +const SMTP_PORT = parseInt(process.env.SMTP_PORT || '25', 10); +const API_TOKEN = process.env.API_TOKEN; +const AWS_REGION = process.env.AWS_REGION || 'us-east-1'; +const API_KEY = process.env.MAILCOW_API_KEY; +const MAILCOW_API = process.env.MAILCOW_API; + +// PostgreSQL client +const pool = new Pool({ + user: process.env.PGUSER || 'email_user', + password: process.env.PGPASSWORD || 'email_password', + host: process.env.PGHOST || 'postgres', + database: process.env.PGDATABASE || 'email_db', + port: parseInt(process.env.PGPORT || '5432', 10) +}); + +// AWS S3 client +const s3Client = new AWS.S3({ region: AWS_REGION }); + +// Nodemailer transporter +const transporter = nodemailer.createTransport({ + host: SMTP_HOST, + port: SMTP_PORT, + secure: false // Adjust if SMTP requires TLS +}); + +// Utility to check if domain exists +async function domainExists(domain) { + try { + const response = await fetch(`${MAILCOW_API}/get/domain/all`, { + headers: { 'X-API-Key': API_KEY }, + signal: AbortSignal.timeout(5000) + }); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + const domains = await response.json(); + return domains.some(d => d.domain_name?.toLowerCase() === domain.toLowerCase()); + } catch (error) { + logger.error(`Error checking domain '${domain}': ${error.message}`); + throw error; + } +} + +// Utility to check if inbox exists +async function inboxExists(domain, localPart) { + if (!(await domainExists(domain))) { + logger.info(`Domain '${domain}' unknown – skip mailbox lookup`); + return false; + } + + try { + const response = await fetch(`${MAILCOW_API}/get/mailbox/all/${domain}`, { + headers: { 'X-API-Key': API_KEY }, + signal: AbortSignal.timeout(5000) + }); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + const mailboxes = await response.json(); + return mailboxes.some(m => m.local_part?.toLowerCase() === localPart.toLowerCase()); + } catch (error) { + logger.error(`Error checking inbox '${localPart}@${domain}': ${error.message}`); + throw error; + } +} + +// Utility to mark email as processed in PostgreSQL +async function markEmailAsProcessed(domain, key, status, processor = 'rest-api', fromAddr = null, toAddrs = []) { + try { + await pool.query( + `INSERT INTO email_statuses (domain, s3_key, status, timestamp, processor, from_addr, to_addrs) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (domain, s3_key) DO UPDATE SET + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp, + processor = EXCLUDED.processor, + from_addr = EXCLUDED.from_addr, + to_addrs = EXCLUDED.to_addrs`, + [domain, key, status, Math.floor(Date.now() / 1000), processor, fromAddr, toAddrs] + ); + logger.info(`Marked ${domain}/${key} as ${status} in database`); + return true; + } catch (error) { + logger.error(`Error marking ${domain}/${key} in database: ${error.message}`); + return false; + } +} + +// Process endpoint +app.post('/process/:domain', async (req, res) => { + const { domain } = req.params; + const auth = req.headers.get('authorization'); + if (auth !== `Bearer ${API_TOKEN}`) { + return res.status(401).json({ error: 'Unauthorized' }); + } + + const data = req.body; + if (!data) { + return res.status(400).json({ error: 'Invalid payload' }); + } + + const requestId = data.request_id || 'no-request-id'; + const payloadSummary = Object.fromEntries( + Object.entries(data) + .filter(([k, v]) => k !== 'email_content' || typeof v === 'string') + .map(([k, v]) => [k, k === 'email_content' ? v.length : v]) + ); + + logger.info(`[${requestId}] INCOMING POST /process/${domain}: payload_summary=${JSON.stringify(payloadSummary)}`); + + try { + // Decode and parse email + const content = data.email_content; + const compressed = data.compressed || false; + const raw = Base64.decode(content); + const emailBytes = compressed ? gunzipSync(Buffer.from(raw, 'binary')).toString('binary') : raw; + + const parser = new MailParser(); + const emailBuffer = Buffer.from(emailBytes, 'binary'); + await new Promise((resolve, reject) => { + parser.on('error', reject); + parser.on('end', resolve); + parser.write(emailBuffer); + parser.end(); + }); + + const fromAddr = parser.from?.value[0]?.address || `lambda@${domain}`; + const recipients = [ + ...(parser.to?.value || []), + ...(parser.cc?.value || []), + ...(parser.bcc?.value || []) + ].map(addr => addr.address).filter(Boolean); + + if (!recipients.length) { + await markEmailAsProcessed(domain, data.s3_key, 'noRecipients', 'rest-api', fromAddr, []); + return res.status(400).json({ error: 'No recipients' }); + } + + // Filter valid recipients + const validRecipients = []; + for (const addr of recipients) { + const [local, dom] = addr.split('@'); + if (!dom || dom.toLowerCase() !== domain.toLowerCase()) { + continue; + } + if (await inboxExists(domain, local)) { + validRecipients.push(addr); + } else { + logger.info(`Skipping non-existent inbox: ${addr}`); + } + } + + if (!validRecipients.length) { + logger.info(`[${requestId}] No valid inboxes for ${domain} – skip.`); + await markEmailAsProcessed(domain, data.s3_key, 'unknownUser', 'rest-api', fromAddr, recipients); + return res.status(404).json({ message: 'No valid inboxes – skipped' }); + } + + // Send email + await transporter.sendMail({ + from: fromAddr, + to: validRecipients, + raw: emailBytes + }); + + // Mark as processed + await markEmailAsProcessed(domain, data.s3_key, 'true', 'rest-api', fromAddr, validRecipients); + + res.status(200).json({ + message: 'Email forwarded', + forwarded_to: validRecipients + }); + } catch (error) { + logger.error(`[${requestId}] Error in /process/${domain}: ${error.message}`); + await markEmailAsProcessed(domain, data.s3_key, 'error', 'rest-api', fromAddr, recipients); + res.status(500).json({ error: 'Internal server error' }); + } +}); + +// Stats endpoint +app.get('/stats/:domain', async (req, res) => { + const { domain } = req.params; + const auth = req.headers.get('authorization'); + if (auth !== `Bearer ${API_TOKEN}`) { + return res.status(401).json({ error: 'Unauthorized' }); + } + + const bucket = domain.replace(/\./g, '-') + '-emails'; + let total = 0; + const counts = { true: 0, unknownDomain: 0, unknownUser: 0, noRecipients: 0, error: 0 }; + const details = { unknownDomain: [], unknownUser: [], noRecipients: [], error: [] }; + + try { + // Fetch statuses from database + const { rows: domainStatuses } = await pool.query( + 'SELECT s3_key, status, from_addr, to_addrs FROM email_statuses WHERE domain = $1', + [domain] + ); + + const statusMap = domainStatuses.reduce((acc, row) => { + acc[row.s3_key] = { status: row.status, from: row.from_addr, to: row.to_addrs || [] }; + return acc; + }, {}); + + // List S3 objects + let continuationToken; + do { + const params = { Bucket: bucket, ContinuationToken: continuationToken }; + const data = await s3Client.listObjectsV2(params).promise(); + continuationToken = data.NextContinuationToken; + + for (const obj of data.Contents || []) { + const key = obj.Key; + total += 1; + + const statusInfo = statusMap[key] || { status: 'none' }; + const status = statusInfo.status; + if (status in counts) { + counts[status] += 1; + } + + if (status in details) { + try { + const objData = await s3Client.getObject({ Bucket: bucket, Key: key }).promise(); + const parser = new MailParser(); + await new Promise((resolve, reject) => { + parser.on('error', reject); + parser.on('end', resolve); + parser.write(objData.Body); + parser.end(); + }); + const fromAddr = parser.from?.value[0]?.address || null; + const toAddrs = [ + ...(parser.to?.value || []), + ...(parser.cc?.value || []), + ...(parser.bcc?.value || []) + ].map(addr => addr.address).filter(Boolean); + details[status].push({ + key, + from: statusInfo.from || fromAddr, + to: statusInfo.to || toAddrs + }); + } catch (error) { + logger.error(`Error parsing ${bucket}/${key}: ${error.message}`); + } + } + } + } while (continuationToken); + + const result = { + domain, + total_messages: total, + successful: counts.true, + wrong_domain: counts.unknownDomain, + unknown_user: counts.unknownUser, + no_recipients: counts.noRecipients, + errors: counts.error, + details + }; + logger.info(`Stats for ${domain}: ${JSON.stringify(result)}`); + res.status(200).json(result); + } catch (error) { + logger.error(`Error in /stats/${domain}: ${error.message}`); + res.status(500).json({ error: 'Internal server error' }); + } +}); + +// Start server +app.listen(5000, '0.0.0.0', () => { + logger.info('Server running on http://0.0.0.0:5000'); +}); \ No newline at end of file diff --git a/email_api/email_api/package.json b/email_api/email_api/package.json index ac52674..523e4a6 100644 --- a/email_api/email_api/package.json +++ b/email_api/email_api/package.json @@ -1,16 +1,15 @@ { "name": "email-api", "version": "1.0.0", + "type": "module", "dependencies": { - "express": "^4.18.2", - "aws-sdk": "^2.1490.0", - "nodemailer": "^6.9.7", - "emailjs-mime-parser": "^2.1.0", - "emailjs-addressparser": "^2.0.0", - "js-base64": "^3.7.5", - "winston": "^3.11.0", - "dotenv": "^16.3.1", - "axios": "^1.6.2", - "pg": "^8.11.3" + "express": "^4.19.2", + "aws-sdk": "^2.1650.0", + "nodemailer": "^6.9.14", + "mailparser": "^3.7.1", + "js-base64": "^3.7.7", + "winston": "^3.13.1", + "dotenv": "^16.4.5", + "pg": "^8.12.0" } } \ No newline at end of file