This commit is contained in:
Andreas Knuth 2025-07-07 14:53:41 -05:00
parent b556ac8283
commit 493743e8aa
3 changed files with 313 additions and 15 deletions

View File

@ -3,7 +3,7 @@ version: '3.8'
services: services:
email-api: email-api:
container_name: email-api container_name: email-api
image: node:18-slim image: node:22-slim
restart: unless-stopped restart: unless-stopped
network_mode: host network_mode: host
volumes: volumes:
@ -27,8 +27,7 @@ services:
- PGDATABASE=${PGDATABASE:-email_db} - PGDATABASE=${PGDATABASE:-email_db}
- PGPORT=${PGPORT:-5432} - PGPORT=${PGPORT:-5432}
command: > command: >
bash -c "npm install express aws-sdk nodemailer emailjs-mime-parser emailjs-addressparser js-base64 winston dotenv axios pg && bash -c "npm install && node app.js"
node app.js"
depends_on: depends_on:
- postgres - postgres
@ -42,8 +41,8 @@ services:
- POSTGRES_PASSWORD=${PGPASSWORD:-email_password} - POSTGRES_PASSWORD=${PGPASSWORD:-email_password}
- POSTGRES_DB=${PGDATABASE:-email_db} - POSTGRES_DB=${PGDATABASE:-email_db}
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - email_postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql - ./init.sql:/docker-entrypoint-initdb.d/init.sql
volumes: volumes:
postgres_data: email_postgres_data:

View File

@ -0,0 +1,300 @@
import express from 'express';
import { Pool } from 'pg';
import AWS from 'aws-sdk';
import nodemailer from 'nodemailer';
import { MailParser } from 'mailparser';
import { Base64 } from 'js-base64';
import { createGzip, gunzipSync } from 'zlib';
import { createLogger, format, transports } from 'winston';
import { config } from 'dotenv';
// Load environment variables
config();
// Check Node.js version
const [major] = process.versions.node.split('.').map(Number);
if (major < 22) {
throw new Error('Node.js 22 or higher required');
}
// Logger setup
const logger = createLogger({
level: 'info',
format: format.combine(
format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
format.printf(({ timestamp, level, message }) => `${timestamp} ${level.toUpperCase()} ${message}`)
),
transports: [new transports.Console()]
});
const app = express();
app.use(express.json());
const SMTP_HOST = process.env.SMTP_HOST || 'localhost';
const SMTP_PORT = parseInt(process.env.SMTP_PORT || '25', 10);
const API_TOKEN = process.env.API_TOKEN;
const AWS_REGION = process.env.AWS_REGION || 'us-east-1';
const API_KEY = process.env.MAILCOW_API_KEY;
const MAILCOW_API = process.env.MAILCOW_API;
// PostgreSQL client
const pool = new Pool({
user: process.env.PGUSER || 'email_user',
password: process.env.PGPASSWORD || 'email_password',
host: process.env.PGHOST || 'postgres',
database: process.env.PGDATABASE || 'email_db',
port: parseInt(process.env.PGPORT || '5432', 10)
});
// AWS S3 client
const s3Client = new AWS.S3({ region: AWS_REGION });
// Nodemailer transporter
const transporter = nodemailer.createTransport({
host: SMTP_HOST,
port: SMTP_PORT,
secure: false // Adjust if SMTP requires TLS
});
// Utility to check if domain exists
async function domainExists(domain) {
try {
const response = await fetch(`${MAILCOW_API}/get/domain/all`, {
headers: { 'X-API-Key': API_KEY },
signal: AbortSignal.timeout(5000)
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const domains = await response.json();
return domains.some(d => d.domain_name?.toLowerCase() === domain.toLowerCase());
} catch (error) {
logger.error(`Error checking domain '${domain}': ${error.message}`);
throw error;
}
}
// Utility to check if inbox exists
async function inboxExists(domain, localPart) {
if (!(await domainExists(domain))) {
logger.info(`Domain '${domain}' unknown skip mailbox lookup`);
return false;
}
try {
const response = await fetch(`${MAILCOW_API}/get/mailbox/all/${domain}`, {
headers: { 'X-API-Key': API_KEY },
signal: AbortSignal.timeout(5000)
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const mailboxes = await response.json();
return mailboxes.some(m => m.local_part?.toLowerCase() === localPart.toLowerCase());
} catch (error) {
logger.error(`Error checking inbox '${localPart}@${domain}': ${error.message}`);
throw error;
}
}
// Utility to mark email as processed in PostgreSQL
async function markEmailAsProcessed(domain, key, status, processor = 'rest-api', fromAddr = null, toAddrs = []) {
try {
await pool.query(
`INSERT INTO email_statuses (domain, s3_key, status, timestamp, processor, from_addr, to_addrs)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (domain, s3_key) DO UPDATE SET
status = EXCLUDED.status,
timestamp = EXCLUDED.timestamp,
processor = EXCLUDED.processor,
from_addr = EXCLUDED.from_addr,
to_addrs = EXCLUDED.to_addrs`,
[domain, key, status, Math.floor(Date.now() / 1000), processor, fromAddr, toAddrs]
);
logger.info(`Marked ${domain}/${key} as ${status} in database`);
return true;
} catch (error) {
logger.error(`Error marking ${domain}/${key} in database: ${error.message}`);
return false;
}
}
// Process endpoint
app.post('/process/:domain', async (req, res) => {
const { domain } = req.params;
const auth = req.headers.get('authorization');
if (auth !== `Bearer ${API_TOKEN}`) {
return res.status(401).json({ error: 'Unauthorized' });
}
const data = req.body;
if (!data) {
return res.status(400).json({ error: 'Invalid payload' });
}
const requestId = data.request_id || 'no-request-id';
const payloadSummary = Object.fromEntries(
Object.entries(data)
.filter(([k, v]) => k !== 'email_content' || typeof v === 'string')
.map(([k, v]) => [k, k === 'email_content' ? v.length : v])
);
logger.info(`[${requestId}] INCOMING POST /process/${domain}: payload_summary=${JSON.stringify(payloadSummary)}`);
try {
// Decode and parse email
const content = data.email_content;
const compressed = data.compressed || false;
const raw = Base64.decode(content);
const emailBytes = compressed ? gunzipSync(Buffer.from(raw, 'binary')).toString('binary') : raw;
const parser = new MailParser();
const emailBuffer = Buffer.from(emailBytes, 'binary');
await new Promise((resolve, reject) => {
parser.on('error', reject);
parser.on('end', resolve);
parser.write(emailBuffer);
parser.end();
});
const fromAddr = parser.from?.value[0]?.address || `lambda@${domain}`;
const recipients = [
...(parser.to?.value || []),
...(parser.cc?.value || []),
...(parser.bcc?.value || [])
].map(addr => addr.address).filter(Boolean);
if (!recipients.length) {
await markEmailAsProcessed(domain, data.s3_key, 'noRecipients', 'rest-api', fromAddr, []);
return res.status(400).json({ error: 'No recipients' });
}
// Filter valid recipients
const validRecipients = [];
for (const addr of recipients) {
const [local, dom] = addr.split('@');
if (!dom || dom.toLowerCase() !== domain.toLowerCase()) {
continue;
}
if (await inboxExists(domain, local)) {
validRecipients.push(addr);
} else {
logger.info(`Skipping non-existent inbox: ${addr}`);
}
}
if (!validRecipients.length) {
logger.info(`[${requestId}] No valid inboxes for ${domain} skip.`);
await markEmailAsProcessed(domain, data.s3_key, 'unknownUser', 'rest-api', fromAddr, recipients);
return res.status(404).json({ message: 'No valid inboxes skipped' });
}
// Send email
await transporter.sendMail({
from: fromAddr,
to: validRecipients,
raw: emailBytes
});
// Mark as processed
await markEmailAsProcessed(domain, data.s3_key, 'true', 'rest-api', fromAddr, validRecipients);
res.status(200).json({
message: 'Email forwarded',
forwarded_to: validRecipients
});
} catch (error) {
logger.error(`[${requestId}] Error in /process/${domain}: ${error.message}`);
await markEmailAsProcessed(domain, data.s3_key, 'error', 'rest-api', fromAddr, recipients);
res.status(500).json({ error: 'Internal server error' });
}
});
// Stats endpoint
app.get('/stats/:domain', async (req, res) => {
const { domain } = req.params;
const auth = req.headers.get('authorization');
if (auth !== `Bearer ${API_TOKEN}`) {
return res.status(401).json({ error: 'Unauthorized' });
}
const bucket = domain.replace(/\./g, '-') + '-emails';
let total = 0;
const counts = { true: 0, unknownDomain: 0, unknownUser: 0, noRecipients: 0, error: 0 };
const details = { unknownDomain: [], unknownUser: [], noRecipients: [], error: [] };
try {
// Fetch statuses from database
const { rows: domainStatuses } = await pool.query(
'SELECT s3_key, status, from_addr, to_addrs FROM email_statuses WHERE domain = $1',
[domain]
);
const statusMap = domainStatuses.reduce((acc, row) => {
acc[row.s3_key] = { status: row.status, from: row.from_addr, to: row.to_addrs || [] };
return acc;
}, {});
// List S3 objects
let continuationToken;
do {
const params = { Bucket: bucket, ContinuationToken: continuationToken };
const data = await s3Client.listObjectsV2(params).promise();
continuationToken = data.NextContinuationToken;
for (const obj of data.Contents || []) {
const key = obj.Key;
total += 1;
const statusInfo = statusMap[key] || { status: 'none' };
const status = statusInfo.status;
if (status in counts) {
counts[status] += 1;
}
if (status in details) {
try {
const objData = await s3Client.getObject({ Bucket: bucket, Key: key }).promise();
const parser = new MailParser();
await new Promise((resolve, reject) => {
parser.on('error', reject);
parser.on('end', resolve);
parser.write(objData.Body);
parser.end();
});
const fromAddr = parser.from?.value[0]?.address || null;
const toAddrs = [
...(parser.to?.value || []),
...(parser.cc?.value || []),
...(parser.bcc?.value || [])
].map(addr => addr.address).filter(Boolean);
details[status].push({
key,
from: statusInfo.from || fromAddr,
to: statusInfo.to || toAddrs
});
} catch (error) {
logger.error(`Error parsing ${bucket}/${key}: ${error.message}`);
}
}
}
} while (continuationToken);
const result = {
domain,
total_messages: total,
successful: counts.true,
wrong_domain: counts.unknownDomain,
unknown_user: counts.unknownUser,
no_recipients: counts.noRecipients,
errors: counts.error,
details
};
logger.info(`Stats for ${domain}: ${JSON.stringify(result)}`);
res.status(200).json(result);
} catch (error) {
logger.error(`Error in /stats/${domain}: ${error.message}`);
res.status(500).json({ error: 'Internal server error' });
}
});
// Start server
app.listen(5000, '0.0.0.0', () => {
logger.info('Server running on http://0.0.0.0:5000');
});

View File

@ -1,16 +1,15 @@
{ {
"name": "email-api", "name": "email-api",
"version": "1.0.0", "version": "1.0.0",
"type": "module",
"dependencies": { "dependencies": {
"express": "^4.18.2", "express": "^4.19.2",
"aws-sdk": "^2.1490.0", "aws-sdk": "^2.1650.0",
"nodemailer": "^6.9.7", "nodemailer": "^6.9.14",
"emailjs-mime-parser": "^2.1.0", "mailparser": "^3.7.1",
"emailjs-addressparser": "^2.0.0", "js-base64": "^3.7.7",
"js-base64": "^3.7.5", "winston": "^3.13.1",
"winston": "^3.11.0", "dotenv": "^16.4.5",
"dotenv": "^16.3.1", "pg": "^8.12.0"
"axios": "^1.6.2",
"pg": "^8.11.3"
} }
} }