docker/email_api/email_api/app.js

300 lines
10 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express from 'express';
import { Pool } from 'pg';
import AWS from 'aws-sdk';
import nodemailer from 'nodemailer';
import { simpleParser } from 'mailparser';
import { Base64 } from 'js-base64';
import { createGzip, gunzipSync } from 'zlib';
import { createLogger, format, transports } from 'winston';
import { config } from 'dotenv';
// Load environment variables (dotenv) before any configuration is read.
config();

// Refuse to start on a Node.js runtime older than major version 22.
const [major] = process.versions.node
  .split('.')
  .map((part) => Number(part));

if (major < 22) {
  throw new Error('Node.js 22 or higher required');
}
// Logger setup: console-only, one "<timestamp> <LEVEL> <message>" line per entry.
const formatLogLine = (entry) => {
  const { timestamp, level, message } = entry;
  return `${timestamp} ${level.toUpperCase()} ${message}`;
};

const logger = createLogger({
  level: 'info',
  format: format.combine(
    format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
    format.printf(formatLogLine),
  ),
  transports: [new transports.Console()],
});
// Express application; JSON and URL-encoded request bodies are capped at 20 MB.
const app = express();
app.use(express.json({ limit: '20mb' }));
app.use(express.urlencoded({ limit: '20mb', extended: true }));

// Outbound SMTP relay (defaults: localhost:25).
const SMTP_HOST = process.env.SMTP_HOST || 'localhost';
const SMTP_PORT = Number.parseInt(process.env.SMTP_PORT || '25', 10);

// Bearer token expected in the Authorization header of incoming requests.
const API_TOKEN = process.env.API_TOKEN;

// Region handed to the S3 client (default: us-east-1).
const AWS_REGION = process.env.AWS_REGION || 'us-east-1';

// Mailcow API key and base URL used for the domain and mailbox lookups.
const API_KEY = process.env.MAILCOW_API_KEY;
const MAILCOW_API = process.env.MAILCOW_API;
// PostgreSQL connection pool; every setting falls back to a built-in default
// when the matching PG* environment variable is unset or empty.
const pool = new Pool({
  host: process.env.PGHOST || 'postgres',
  port: Number.parseInt(process.env.PGPORT || '5433', 10),
  database: process.env.PGDATABASE || 'email_db',
  user: process.env.PGUSER || 'email_user',
  password: process.env.PGPASSWORD || 'email_password',
});

// AWS S3 client bound to the configured region.
const s3Client = new AWS.S3({ region: AWS_REGION });

// Nodemailer transporter for the SMTP relay.
// NOTE(review): `rejectUnauthorized: false` turns off TLS certificate
// verification for this connection - confirm that this is intended.
const transporter = nodemailer.createTransport({
  host: SMTP_HOST,
  port: SMTP_PORT,
  secure: false, // Adjust if SMTP requires TLS
  tls: { rejectUnauthorized: false },
});
/**
 * Check whether the Mailcow API lists the given domain.
 *
 * Issues `GET ${MAILCOW_API}/get/domain/all` with the `X-API-Key` header and a
 * 5 second timeout, then compares `domain_name` case-insensitively.
 *
 * @param {string} domain - Domain name to look for.
 * @returns {Promise<boolean>} `true` when a listed entry matches, else `false`.
 * @throws {Error} On a network failure, timeout, non-2xx status or a response
 *   body of an unexpected shape. The error is logged before it is rethrown.
 */
async function domainExists(domain) {
  try {
    const wanted = domain.toLowerCase();
    const response = await fetch(`${MAILCOW_API}/get/domain/all`, {
      headers: { 'X-API-Key': API_KEY },
      signal: AbortSignal.timeout(5000),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }

    const payload = await response.json();
    if (Array.isArray(payload)) {
      return payload.some((entry) => entry?.domain_name?.toLowerCase() === wanted);
    }

    // An empty JSON object is treated as "no domains" instead of crashing on
    // `.some` (Mailcow appears to answer `{}` for an empty result set -
    // TODO confirm against the deployed version).
    const isEmptyObject =
      payload !== null && typeof payload === 'object' && Object.keys(payload).length === 0;
    if (isEmptyObject) {
      return false;
    }

    // Anything else is not a domain list; surface it rather than guess.
    throw new Error('Unexpected response shape from domain listing');
  } catch (error) {
    logger.error(`Error checking domain '${domain}': ${error.message}`);
    throw error;
  }
}
/**
 * Check whether a mailbox `localPart@domain` exists according to Mailcow.
 *
 * First asks `domainExists(domain)`; when the domain is unknown the mailbox
 * lookup is skipped. Otherwise issues
 * `GET ${MAILCOW_API}/get/mailbox/all/<domain>` (5 second timeout) and compares
 * `local_part` case-insensitively.
 *
 * @param {string} domain - Domain the mailbox belongs to.
 * @param {string} localPart - Part of the address before the `@`.
 * @returns {Promise<boolean>} `true` when a listed mailbox matches.
 * @throws {Error} Whatever `domainExists` throws, or on a network failure,
 *   timeout, non-2xx status or unexpected response shape of the mailbox
 *   lookup (logged before it is rethrown).
 */
async function inboxExists(domain, localPart) {
  if (!(await domainExists(domain))) {
    logger.info(`Domain '${domain}' unknown skip mailbox lookup`);
    return false;
  }

  try {
    const wanted = localPart.toLowerCase();
    // The domain originates from the request path, so it is escaped before it
    // becomes a URL path segment.
    const url = `${MAILCOW_API}/get/mailbox/all/${encodeURIComponent(domain)}`;
    const response = await fetch(url, {
      headers: { 'X-API-Key': API_KEY },
      signal: AbortSignal.timeout(5000),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }

    const payload = await response.json();
    if (Array.isArray(payload)) {
      return payload.some((mailbox) => mailbox?.local_part?.toLowerCase() === wanted);
    }

    // An empty JSON object is treated as "no mailboxes" instead of crashing on
    // `.some` (Mailcow appears to answer `{}` for an empty result set -
    // TODO confirm against the deployed version).
    const isEmptyObject =
      payload !== null && typeof payload === 'object' && Object.keys(payload).length === 0;
    if (isEmptyObject) {
      return false;
    }

    throw new Error('Unexpected response shape from mailbox listing');
  } catch (error) {
    logger.error(`Error checking inbox '${localPart}@${domain}': ${error.message}`);
    throw error;
  }
}
/**
 * Record the processing outcome of one stored email in PostgreSQL.
 *
 * Upserts a row into `email_statuses` keyed by (domain, s3_key); on conflict
 * the status, timestamp, processor, sender and recipients are overwritten.
 * The timestamp is the current time in whole seconds since the Unix epoch.
 * Database errors are logged and reported through the return value, never thrown.
 *
 * @param {string} domain - Domain the email belongs to.
 * @param {string} key - S3 key of the email.
 * @param {string} status - Outcome label to store.
 * @param {string} [processor='rest-api'] - Name of the component recording the status.
 * @param {?string} [fromAddr=null] - Sender address.
 * @param {string[]} [toAddrs=[]] - Recipient addresses.
 * @returns {Promise<boolean>} `true` when the row was written, `false` on error.
 */
async function markEmailAsProcessed(domain, key, status, processor = 'rest-api', fromAddr = null, toAddrs = []) {
  // Joined with '\n' so the statement text is the same as a multi-line literal.
  const upsertSql = [
    'INSERT INTO email_statuses (domain, s3_key, status, timestamp, processor, from_addr, to_addrs)',
    'VALUES ($1, $2, $3, $4, $5, $6, $7)',
    'ON CONFLICT (domain, s3_key) DO UPDATE SET',
    'status = EXCLUDED.status,',
    'timestamp = EXCLUDED.timestamp,',
    'processor = EXCLUDED.processor,',
    'from_addr = EXCLUDED.from_addr,',
    'to_addrs = EXCLUDED.to_addrs',
  ].join('\n');

  let succeeded;
  try {
    const epochSeconds = Math.floor(Date.now() / 1000);
    const values = [domain, key, status, epochSeconds, processor, fromAddr, toAddrs];
    await pool.query(upsertSql, values);
    logger.info(`Marked ${domain}/${key} as ${status} in database`);
    succeeded = true;
  } catch (error) {
    logger.error(`Error marking ${domain}/${key} in database: ${error.message}`);
    succeeded = false;
  }
  return succeeded;
}
/**
 * POST /process/:domain - forward one stored email to the local mailboxes of
 * `:domain`.
 *
 * Request: `Authorization: Bearer <API_TOKEN>` plus a body with
 *   - `email_content` (string): base64 of the raw message, gzip-compressed
 *     first when `compressed` is truthy;
 *   - `compressed` (optional), `s3_key`, `request_id` (optional).
 *
 * Responses:
 *   - 401 when the token is missing/wrong or the server has no API_TOKEN set;
 *   - 400 when `email_content` is not a string, or the message has no To/Cc/Bcc
 *     recipients (recorded as `noRecipients`);
 *   - 404 when no recipient maps to an existing inbox of the domain (recorded
 *     as `unknownUser`);
 *   - 200 with `forwarded_to` after the message was handed to SMTP (recorded
 *     as `true`);
 *   - 500 on any other failure (recorded as `error`).
 */
app.post('/process/:domain', async (req, res) => {
  const { domain } = req.params;

  // Fail closed: with API_TOKEN unset the expected header would be the literal
  // "Bearer undefined", which any caller could send.
  if (!API_TOKEN || req.headers['authorization'] !== `Bearer ${API_TOKEN}`) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const data = req.body;
  // Reject a missing/non-string email_content up front instead of letting the
  // decoder throw and surface as a 500.
  if (!data || typeof data !== 'object' || typeof data.email_content !== 'string') {
    return res.status(400).json({ error: 'Invalid payload' });
  }

  const requestId = data.request_id || 'no-request-id';
  // Log the payload with the (potentially huge) email body replaced by its length.
  const payloadSummary = { ...data, email_content: data.email_content.length };
  logger.info(`[${requestId}] INCOMING POST /process/${domain}: payload_summary=${JSON.stringify(payloadSummary)}`);

  let recipients = [];
  let fromAddr = `lambda@${domain}`;
  try {
    // Decode straight to bytes. Going through a text string (as Base64.decode
    // does) corrupts gzip data and any 8-bit message content.
    const decoded = Buffer.from(data.email_content, 'base64');
    const emailBytes = data.compressed ? gunzipSync(decoded) : decoded;

    const parsed = await simpleParser(emailBytes);
    fromAddr = parsed.from?.value?.[0]?.address || fromAddr;
    recipients = [
      ...(parsed.to?.value || []),
      ...(parsed.cc?.value || []),
      ...(parsed.bcc?.value || []),
    ]
      .map((entry) => entry.address)
      .filter(Boolean);

    if (!recipients.length) {
      await markEmailAsProcessed(domain, data.s3_key, 'noRecipients', 'rest-api', fromAddr, []);
      return res.status(400).json({ error: 'No recipients' });
    }

    // Keep only recipients on this domain whose inbox exists.
    const domainLower = domain.toLowerCase();
    const validRecipients = [];
    for (const addr of recipients) {
      // Split on the LAST '@' so a quoted local part containing '@' keeps its
      // domain intact.
      const at = addr.lastIndexOf('@');
      if (at === -1 || addr.slice(at + 1).toLowerCase() !== domainLower) {
        continue;
      }
      if (await inboxExists(domain, addr.slice(0, at))) {
        validRecipients.push(addr);
      } else {
        logger.info(`Skipping non-existent inbox: ${addr}`);
      }
    }

    if (!validRecipients.length) {
      logger.info(`[${requestId}] No valid inboxes for ${domain} skip.`);
      await markEmailAsProcessed(domain, data.s3_key, 'unknownUser', 'rest-api', fromAddr, recipients);
      return res.status(404).json({ message: 'No valid inboxes skipped' });
    }

    // Relay the original message bytes unchanged.
    await transporter.sendMail({
      from: fromAddr,
      to: validRecipients,
      raw: emailBytes,
    });

    await markEmailAsProcessed(domain, data.s3_key, 'true', 'rest-api', fromAddr, validRecipients);
    res.status(200).json({
      message: 'Email forwarded',
      forwarded_to: validRecipients,
    });
  } catch (error) {
    logger.error(`[${requestId}] Error in /process/${domain}: ${error.message}`);
    // fromAddr already holds the parsed sender, or the lambda@<domain> fallback
    // when parsing never got that far.
    await markEmailAsProcessed(domain, data.s3_key, 'error', 'rest-api', fromAddr, recipients);
    res.status(500).json({ error: 'Internal server error' });
  }
});
/**
 * GET /stats/:domain - summarize the processing status of every object in the
 * domain's S3 bucket.
 *
 * The bucket name is the domain with dots replaced by dashes plus `-emails`.
 * Each listed object is matched by key against the `email_statuses` rows of the
 * domain and counted per status. For the non-success statuses
 * (`unknownDomain`, `unknownUser`, `noRecipients`, `error`) a detail entry
 * `{ key, from, to }` is added, taking sender/recipients from the database row
 * and falling back to the parsed message when the row lacks them. An object
 * that cannot be fetched or parsed is logged and left out of `details`.
 *
 * Responses: 401 when the token is missing/wrong or the server has no
 * API_TOKEN set, 200 with the summary, 500 on a database or S3 listing failure.
 */
app.get('/stats/:domain', async (req, res) => {
  const { domain } = req.params;

  // Fail closed: with API_TOKEN unset the expected header would be the literal
  // "Bearer undefined", which any caller could send.
  if (!API_TOKEN || req.headers['authorization'] !== `Bearer ${API_TOKEN}`) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const bucket = `${domain.replaceAll('.', '-')}-emails`;
  let total = 0;
  const counts = { true: 0, unknownDomain: 0, unknownUser: 0, noRecipients: 0, error: 0 };
  const details = { unknownDomain: [], unknownUser: [], noRecipients: [], error: [] };

  try {
    // Fetch statuses from database
    const { rows } = await pool.query(
      'SELECT s3_key, status, from_addr, to_addrs FROM email_statuses WHERE domain = $1',
      [domain]
    );
    // A Map keeps arbitrary S3 keys from colliding with Object.prototype members.
    const statusByKey = new Map(
      rows.map((row) => [
        row.s3_key,
        { status: row.status, from: row.from_addr, to: row.to_addrs || [] },
      ])
    );

    // Walk every page of the bucket listing.
    let continuationToken;
    do {
      const page = await s3Client
        .listObjectsV2({ Bucket: bucket, ContinuationToken: continuationToken })
        .promise();
      continuationToken = page.NextContinuationToken;

      for (const { Key: key } of page.Contents || []) {
        total += 1;
        const statusInfo = statusByKey.get(key);
        const status = statusInfo?.status ?? 'none';

        // Own-property checks only, so a status such as "constructor" is ignored.
        if (Object.hasOwn(counts, status)) {
          counts[status] += 1;
        }
        if (!Object.hasOwn(details, status)) {
          continue;
        }

        let from = statusInfo.from || null;
        let to = statusInfo.to;
        // Download and parse the message only when the database row lacks the
        // sender or the recipients.
        if (!from || !to?.length) {
          try {
            const objData = await s3Client.getObject({ Bucket: bucket, Key: key }).promise();
            const parsed = await simpleParser(objData.Body);
            from = from || parsed.from?.value?.[0]?.address || null;
            if (!to?.length) {
              to = [
                ...(parsed.to?.value || []),
                ...(parsed.cc?.value || []),
                ...(parsed.bcc?.value || []),
              ]
                .map((entry) => entry.address)
                .filter(Boolean);
            }
          } catch (error) {
            logger.error(`Error parsing ${bucket}/${key}: ${error.message}`);
            continue;
          }
        }
        details[status].push({ key, from, to });
      }
    } while (continuationToken);

    const result = {
      domain,
      total_messages: total,
      successful: counts.true,
      wrong_domain: counts.unknownDomain,
      unknown_user: counts.unknownUser,
      no_recipients: counts.noRecipients,
      errors: counts.error,
      details,
    };
    logger.info(`Stats for ${domain}: ${JSON.stringify(result)}`);
    res.status(200).json(result);
  } catch (error) {
    logger.error(`Error in /stats/${domain}: ${error.message}`);
    res.status(500).json({ error: 'Internal server error' });
  }
});
// Start the HTTP server on port 5000, bound to all IPv4 interfaces, and log
// once it is accepting connections.
app.listen(5000, '0.0.0.0', function onListening() {
  logger.info('Server running on http://0.0.0.0:5000');
});