350 lines
11 KiB
TypeScript
350 lines
11 KiB
TypeScript
/**
 * Email message processing worker
 *
 * Processes a single SQS message:
 * 1. Unpack SNS/SES envelope
 * 2. Download raw email from S3
 * 3. Loop detection
 * 4. Parse & sanitize headers
 * 5. Bounce detection & header rewrite
 * 6. Blocklist check
 * 7. Process recipients (rules, SMTP delivery)
 * 8. Mark result in S3 metadata
 */
|
||
|
||
import type { Message } from '@aws-sdk/client-sqs';
|
||
import type { S3Handler } from '../aws/s3.js';
|
||
import type { SQSHandler } from '../aws/sqs.js';
|
||
import type { SESHandler } from '../aws/ses.js';
|
||
import type { DynamoDBHandler } from '../aws/dynamodb.js';
|
||
import type { EmailDelivery } from '../smtp/delivery.js';
|
||
import type { MetricsCollector } from '../metrics.js';
|
||
import {
|
||
parseEmail,
|
||
isProcessedByWorker,
|
||
BounceHandler,
|
||
RulesProcessor,
|
||
BlocklistChecker,
|
||
} from '../email/index.js';
|
||
import { domainToBucketName } from '../config.js';
|
||
import { log } from '../logger.js';
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Processor
|
||
// ---------------------------------------------------------------------------
|
||
/**
 * Processes inbound SES emails that arrive as SQS messages for one domain.
 *
 * The processor wires together the bounce handler, the rules processor and the
 * blocklist checker (all backed by the injected DynamoDB handler) and uses the
 * injected S3 handler and SMTP delivery to fetch and deliver the raw message.
 */
export class MessageProcessor {
  private bounceHandler: BounceHandler;
  private rulesProcessor: RulesProcessor;
  private blocklist: BlocklistChecker;

  /** Optional metrics sink; every metrics call is skipped while this is null. */
  public metrics: MetricsCollector | null = null;

  constructor(
    private s3: S3Handler,
    private sqs: SQSHandler,
    private ses: SESHandler,
    private dynamodb: DynamoDBHandler,
    private delivery: EmailDelivery,
  ) {
    this.bounceHandler = new BounceHandler(dynamodb);
    this.rulesProcessor = new RulesProcessor(dynamodb, ses);
    this.blocklist = new BlocklistChecker(dynamodb);
  }

  /**
   * Best-effort human-readable message for any thrown value.
   * Safe for `null`/`undefined` and non-Error rejections.
   */
  private static errorMessage(err: unknown): string {
    if (err instanceof Error) return err.message;
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const message = (err as { message: unknown }).message;
      if (message !== undefined && message !== null) return String(message);
    }
    return String(err);
  }

  /** Stack trace of a thrown value, or undefined when it carries none. */
  private static errorStack(err: unknown): string | undefined {
    return err instanceof Error ? err.stack : undefined;
  }

  /**
   * Process one email message from queue.
   *
   * @param domain       Domain this worker is responsible for.
   * @param message      SQS message whose body is an SES event, optionally
   *                     wrapped in an SNS notification.
   * @param receiveCount Receive count of the message; forwarded to the S3
   *                     download call.
   * @returns true  → delete from queue.
   *          false → leave in queue for retry.
   */
  async processMessage(
    domain: string,
    message: Message,
    receiveCount: number,
  ): Promise<boolean> {
    const workerName = `worker-${domain}`;

    try {
      // 1. UNPACK (SNS → SES)
      const body = JSON.parse(message.Body ?? '{}');
      // The SES event payload is untyped JSON; fields are read defensively below.
      let sesMsg: any;

      if (body.Message && body.Type) {
        // SNS Notification wrapper
        const snsContent = body.Message;
        sesMsg = typeof snsContent === 'string' ? JSON.parse(snsContent) : snsContent;
      } else {
        sesMsg = body;
      }

      // 2. EXTRACT DATA
      const mail = sesMsg.mail ?? {};
      const receipt = sesMsg.receipt ?? {};
      const messageId: string | undefined = mail.messageId;

      // Ignore SES setup notifications
      if (messageId === 'AMAZON_SES_SETUP_NOTIFICATION') {
        log('ℹ️ Received Amazon SES Setup Notification. Ignoring.', 'INFO', workerName);
        return true;
      }

      const fromAddr: string = mail.source ?? '';
      const recipients: string[] = receipt.recipients ?? [];

      if (!messageId) {
        log('❌ Error: No messageId in event payload', 'ERROR', workerName);
        return true;
      }

      // Domain validation
      if (recipients.length === 0) {
        log('⚠ Warning: No recipients in event', 'WARNING', workerName);
        return true;
      }

      // The domain is whatever follows the last '@'. An address without a
      // domain can never match this worker, so it is dropped like the other
      // malformed-payload cases instead of throwing and being retried.
      const firstRecipient = recipients[0] ?? '';
      const atIndex = firstRecipient.lastIndexOf('@');
      const recipientDomain = atIndex === -1 ? '' : firstRecipient.slice(atIndex + 1);
      if (recipientDomain === '') {
        log(
          `⚠ Warning: Malformed recipient address (no domain): ${firstRecipient}`,
          'WARNING',
          workerName,
        );
        return true;
      }

      if (recipientDomain.toLowerCase() !== domain.toLowerCase()) {
        log(
          `⚠ Security: Ignored message for ${recipientDomain} ` +
            `(I am worker for ${domain})`,
          'WARNING',
          workerName,
        );
        return true;
      }

      // Compact log
      const recipientsStr =
        recipients.length === 1
          ? recipients[0]
          : `${recipients.length} recipients`;
      log(
        `📧 Processing: ${messageId.slice(0, 20)}... -> ${recipientsStr}`,
        'INFO',
        workerName,
      );

      // 3. DOWNLOAD FROM S3
      const rawBytes = await this.s3.getEmail(domain, messageId, receiveCount);
      if (rawBytes === null) return false; // retry later

      // 4. LOOP DETECTION
      // This parse of the untouched bytes is also the fallback used when the
      // bounce/marker logic below fails.
      const originalParsed = await parseEmail(rawBytes);
      const skipRules = isProcessedByWorker(originalParsed);
      if (skipRules) {
        log('🔄 Loop prevention: Already processed by worker', 'INFO', workerName);
      }

      // 5. PARSING & BOUNCE LOGIC
      let finalRawBytes = rawBytes;
      let fromAddrFinal = fromAddr;
      let isBounce = false;
      let parsedFinal = originalParsed;

      try {
        // Separate parse so the bounce handler never touches `originalParsed`.
        const parsed = await parseEmail(rawBytes);
        const subject = parsed.subject ?? '(no subject)';

        // Bounce header rewriting
        const bounceResult = await this.bounceHandler.applyBounceLogic(
          parsed,
          rawBytes,
          subject,
          workerName,
        );

        isBounce = bounceResult.isBounce;
        finalRawBytes = bounceResult.rawBytes;

        if (bounceResult.modified) {
          log(' ✨ Bounce detected & headers rewritten via DynamoDB', 'INFO', workerName);
          fromAddrFinal = bounceResult.fromAddr;
          this.metrics?.incrementBounce(domain, 'rewritten');
        }

        // Add processing marker for non-processed emails
        if (!skipRules) {
          finalRawBytes = addProcessedHeader(finalRawBytes);
        }

        // Re-parse after modifications for rules processing
        parsedFinal = await parseEmail(finalRawBytes);
      } catch (err: unknown) {
        log(
          `⚠ Parsing/Logic Error: ${MessageProcessor.errorMessage(err)}. Sending original.`,
          'WARNING',
          workerName,
        );
        log(
          `Full error: ${MessageProcessor.errorStack(err) ?? MessageProcessor.errorMessage(err)}`,
          'ERROR',
          workerName,
        );
        // Roll back every piece of state the try block may have changed, so
        // that the original message really is what gets delivered.
        fromAddrFinal = fromAddr;
        isBounce = false;
        finalRawBytes = rawBytes;
        parsedFinal = originalParsed;
      }

      // 6. BLOCKLIST CHECK
      const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders(
        recipients,
        fromAddrFinal,
        workerName,
      );

      // 7. PROCESS RECIPIENTS
      log(`📤 Sending to ${recipients.length} recipient(s)...`, 'INFO', workerName);

      const successful: string[] = [];
      const failedPermanent: string[] = [];
      const failedTemporary: string[] = [];
      const blockedRecipients: string[] = [];

      // Built once; it does not depend on the individual recipient.
      const metricsCallback = (action: 'autoreply' | 'forward', dom: string): void => {
        if (action === 'autoreply') this.metrics?.incrementAutoreply(dom);
        else if (action === 'forward') this.metrics?.incrementForward(dom);
      };

      for (const recipient of recipients) {
        // Blocked?
        if (blockedByRecipient[recipient]) {
          log(
            `🗑 Silently dropping message for ${recipient} (Sender blocked)`,
            'INFO',
            workerName,
          );
          blockedRecipients.push(recipient);
          this.metrics?.incrementBlocked(domain);
          continue;
        }

        // Process rules (OOO, Forwarding) — not for bounces or already forwarded
        if (!isBounce && !skipRules) {
          await this.rulesProcessor.processRulesForRecipient(
            recipient,
            parsedFinal,
            finalRawBytes,
            domain,
            workerName,
            metricsCallback,
          );
        }

        // SMTP delivery; the middle tuple element (error detail) is not used here.
        const [success, , isPerm] = await this.delivery.sendToRecipient(
          fromAddrFinal,
          recipient,
          finalRawBytes,
          workerName,
        );

        if (success) {
          successful.push(recipient);
          this.metrics?.incrementProcessed(domain, 'success');
        } else if (isPerm) {
          failedPermanent.push(recipient);
          this.metrics?.incrementProcessed(domain, 'permanent_failure');
        } else {
          failedTemporary.push(recipient);
          this.metrics?.incrementProcessed(domain, 'temporary_failure');
        }
      }

      // 8. RESULT & CLEANUP
      // A recipient counts as handled when it was delivered, permanently
      // rejected, or blocked; only temporary failures keep the message queued.
      const totalHandled =
        successful.length + failedPermanent.length + blockedRecipients.length;

      if (totalHandled === recipients.length) {
        if (blockedRecipients.length === recipients.length) {
          // All blocked
          try {
            await this.s3.markAsBlocked(
              domain,
              messageId,
              blockedRecipients,
              fromAddrFinal,
              workerName,
            );
            await this.s3.deleteBlockedEmail(domain, messageId, workerName);
          } catch (err: unknown) {
            log(
              `⚠ Failed to handle blocked email: ${MessageProcessor.errorMessage(err)}`,
              'ERROR',
              workerName,
            );
            return false;
          }
        } else if (successful.length > 0) {
          await this.s3.markAsProcessed(
            domain,
            messageId,
            workerName,
            failedPermanent.length > 0 ? failedPermanent : undefined,
          );
        } else if (failedPermanent.length > 0) {
          await this.s3.markAsAllInvalid(
            domain,
            messageId,
            failedPermanent,
            workerName,
          );
        }

        // Summary
        const parts: string[] = [];
        if (successful.length) parts.push(`${successful.length} OK`);
        if (failedPermanent.length) parts.push(`${failedPermanent.length} invalid`);
        if (blockedRecipients.length) parts.push(`${blockedRecipients.length} blocked`);

        log(`✅ Completed (${parts.join(', ')})`, 'SUCCESS', workerName);
        return true;
      } else {
        // Temporary failures remain
        log(
          `🔄 Temp failure (${failedTemporary.length} failed), will retry`,
          'WARNING',
          workerName,
        );
        return false;
      }
    } catch (err: unknown) {
      log(
        `❌ CRITICAL WORKER ERROR: ${MessageProcessor.errorMessage(err)}`,
        'ERROR',
        workerName,
      );
      log(MessageProcessor.errorStack(err) ?? '', 'ERROR', workerName);
      return false;
    }
  }
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Helpers
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/**
|
||
* Add X-SES-Worker-Processed header to raw email bytes using Buffer manipulation.
|
||
* More robust and memory efficient than toString().
|
||
*/
|
||
/**
 * Add an `X-SES-Worker-Processed: delivered` header to raw email bytes.
 *
 * Works directly on the Buffer, so the message body is never decoded or
 * re-encoded. The header is inserted as the last header line, immediately
 * before the blank line that separates headers from body, using the same
 * line ending as that separator (CRLF or bare LF).
 *
 * @param raw Raw email bytes; not modified.
 * @returns A new Buffer containing the message with the extra header. When no
 *          header/body separator is found, the header line (CRLF-terminated)
 *          is prepended to the input instead.
 */
function addProcessedHeader(raw: Buffer): Buffer {
  const marker = 'X-SES-Worker-Processed: delivered';

  // The header section ends at the first blank line: "\r\n\r\n", or "\n\n"
  // for Unix-style messages. Whichever occurs first wins. The two patterns
  // can never start at the same offset.
  const crlfEnd = raw.indexOf('\r\n\r\n');
  const lfEnd = raw.indexOf('\n\n');
  const useCrlf = crlfEnd !== -1 && (lfEnd === -1 || crlfEnd < lfEnd);
  const headerEndIndex = useCrlf ? crlfEnd : lfEnd;

  // No separator found (broken mail?): put the header at the very top.
  if (headerEndIndex === -1) {
    return Buffer.concat([Buffer.from(`${marker}\r\n`, 'utf-8'), raw]);
  }

  // Insert the header BEFORE the blank line. The existing line break at
  // `headerEndIndex` then terminates the new header line.
  const eol = useCrlf ? '\r\n' : '\n';
  return Buffer.concat([
    raw.subarray(0, headerEndIndex),
    Buffer.from(`${eol}${marker}`, 'utf-8'),
    raw.subarray(headerEndIndex),
  ]);
}
|