/** * Email message processing worker * * Processes a single SQS message: * 1. Unpack SNS/SES envelope * 2. Download raw email from S3 * 3. Loop detection * 4. Parse & sanitize headers * 5. Bounce detection & header rewrite * 6. Blocklist check * 7. Process recipients (rules, SMTP delivery) * 8. Mark result in S3 metadata */ import type { Message } from '@aws-sdk/client-sqs'; import type { S3Handler } from '../aws/s3.js'; import type { SQSHandler } from '../aws/sqs.js'; import type { SESHandler } from '../aws/ses.js'; import type { DynamoDBHandler } from '../aws/dynamodb.js'; import type { EmailDelivery } from '../smtp/delivery.js'; import type { MetricsCollector } from '../metrics.js'; import { parseEmail, isProcessedByWorker, BounceHandler, RulesProcessor, BlocklistChecker, } from '../email/index.js'; import { domainToBucketName } from '../config.js'; import { log } from '../logger.js'; // --------------------------------------------------------------------------- // Processor // --------------------------------------------------------------------------- export class MessageProcessor { private bounceHandler: BounceHandler; private rulesProcessor: RulesProcessor; private blocklist: BlocklistChecker; public metrics: MetricsCollector | null = null; constructor( private s3: S3Handler, private sqs: SQSHandler, private ses: SESHandler, private dynamodb: DynamoDBHandler, private delivery: EmailDelivery, ) { this.bounceHandler = new BounceHandler(dynamodb); this.rulesProcessor = new RulesProcessor(dynamodb, ses); this.blocklist = new BlocklistChecker(dynamodb); } /** * Process one email message from queue. * Returns true → delete from queue. * Returns false → leave in queue for retry. */ async processMessage( domain: string, message: Message, receiveCount: number, ): Promise { const workerName = `worker-${domain}`; try { // 1. UNPACK (SNS → SES) const body = JSON.parse(message.Body ?? '{}'); let sesMsg: any; if (body.Message && body.Type) { // SNS Notification wrapper const snsContent = body.Message; sesMsg = typeof snsContent === 'string' ? JSON.parse(snsContent) : snsContent; } else { sesMsg = body; } // 2. EXTRACT DATA const mail = sesMsg.mail ?? {}; const receipt = sesMsg.receipt ?? {}; const messageId: string | undefined = mail.messageId; // Ignore SES setup notifications if (messageId === 'AMAZON_SES_SETUP_NOTIFICATION') { log('ℹ️ Received Amazon SES Setup Notification. Ignoring.', 'INFO', workerName); return true; } const fromAddr: string = mail.source ?? ''; const recipients: string[] = receipt.recipients ?? []; if (!messageId) { log('❌ Error: No messageId in event payload', 'ERROR', workerName); return true; } // Domain validation if (recipients.length === 0) { log('⚠ Warning: No recipients in event', 'WARNING', workerName); return true; } const recipientDomain = recipients[0].split('@')[1]; if (recipientDomain.toLowerCase() !== domain.toLowerCase()) { log( `⚠ Security: Ignored message for ${recipientDomain} ` + `(I am worker for ${domain})`, 'WARNING', workerName, ); return true; } // Compact log const recipientsStr = recipients.length === 1 ? recipients[0] : `${recipients.length} recipients`; log( `📧 Processing: ${messageId.slice(0, 20)}... -> ${recipientsStr}`, 'INFO', workerName, ); // 3. DOWNLOAD FROM S3 const rawBytes = await this.s3.getEmail(domain, messageId, receiveCount); if (rawBytes === null) return false; // retry later // 4. LOOP DETECTION const tempParsed = await parseEmail(rawBytes); const skipRules = isProcessedByWorker(tempParsed); if (skipRules) { log('🔄 Loop prevention: Already processed by worker', 'INFO', workerName); } // 5. PARSING & BOUNCE LOGIC let finalRawBytes = rawBytes; let fromAddrFinal = fromAddr; let isBounce = false; try { const parsed = await parseEmail(rawBytes); const subject = parsed.subject ?? '(no subject)'; // Bounce header rewriting const bounceResult = await this.bounceHandler.applyBounceLogic( parsed, rawBytes, subject, workerName, ); isBounce = bounceResult.isBounce; finalRawBytes = bounceResult.rawBytes; if (bounceResult.modified) { log(' ✨ Bounce detected & headers rewritten via DynamoDB', 'INFO', workerName); fromAddrFinal = bounceResult.fromAddr; this.metrics?.incrementBounce(domain, 'rewritten'); } else { fromAddrFinal = fromAddr; } // Add processing marker for non-processed emails if (!skipRules) { finalRawBytes = addProcessedHeader(finalRawBytes); } // Re-parse after modifications for rules processing var parsedFinal = await parseEmail(finalRawBytes); } catch (err: any) { log( `⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original.`, 'WARNING', workerName, ); log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName); fromAddrFinal = fromAddr; isBounce = false; var parsedFinal = await parseEmail(rawBytes); } // 6. BLOCKLIST CHECK const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders( recipients, fromAddrFinal, workerName, ); // 7. PROCESS RECIPIENTS log(`📤 Sending to ${recipients.length} recipient(s)...`, 'INFO', workerName); const successful: string[] = []; const failedPermanent: string[] = []; const failedTemporary: string[] = []; const blockedRecipients: string[] = []; for (const recipient of recipients) { // Blocked? if (blockedByRecipient[recipient]) { log( `🗑 Silently dropping message for ${recipient} (Sender blocked)`, 'INFO', workerName, ); blockedRecipients.push(recipient); this.metrics?.incrementBlocked(domain); continue; } // Process rules (OOO, Forwarding) — not for bounces or already forwarded if (!isBounce && !skipRules) { const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => { if (action === 'autoreply') this.metrics?.incrementAutoreply(dom); else if (action === 'forward') this.metrics?.incrementForward(dom); }; await this.rulesProcessor.processRulesForRecipient( recipient, parsedFinal, finalRawBytes, domain, workerName, metricsCallback, ); } // SMTP delivery const [success, error, isPerm] = await this.delivery.sendToRecipient( fromAddrFinal, recipient, finalRawBytes, workerName, ); if (success) { successful.push(recipient); this.metrics?.incrementProcessed(domain, 'success'); } else if (isPerm) { failedPermanent.push(recipient); this.metrics?.incrementProcessed(domain, 'permanent_failure'); } else { failedTemporary.push(recipient); this.metrics?.incrementProcessed(domain, 'temporary_failure'); } } // 8. RESULT & CLEANUP const totalHandled = successful.length + failedPermanent.length + blockedRecipients.length; if (totalHandled === recipients.length) { if (blockedRecipients.length === recipients.length) { // All blocked try { await this.s3.markAsBlocked( domain, messageId, blockedRecipients, fromAddrFinal, workerName, ); await this.s3.deleteBlockedEmail(domain, messageId, workerName); } catch (err: any) { log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName); return false; } } else if (successful.length > 0) { await this.s3.markAsProcessed( domain, messageId, workerName, failedPermanent.length > 0 ? failedPermanent : undefined, ); } else if (failedPermanent.length > 0) { await this.s3.markAsAllInvalid( domain, messageId, failedPermanent, workerName, ); } // Summary const parts: string[] = []; if (successful.length) parts.push(`${successful.length} OK`); if (failedPermanent.length) parts.push(`${failedPermanent.length} invalid`); if (blockedRecipients.length) parts.push(`${blockedRecipients.length} blocked`); log(`✅ Completed (${parts.join(', ')})`, 'SUCCESS', workerName); return true; } else { // Temporary failures remain log( `🔄 Temp failure (${failedTemporary.length} failed), will retry`, 'WARNING', workerName, ); return false; } } catch (err: any) { log(`❌ CRITICAL WORKER ERROR: ${err.message ?? err}`, 'ERROR', workerName); log(err.stack ?? '', 'ERROR', workerName); return false; } } } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- /** * Add X-SES-Worker-Processed header to raw email bytes. */ function addProcessedHeader(raw: Buffer): Buffer { const str = raw.toString('utf-8'); const sep = str.match(/\r?\n\r?\n/); if (!sep || sep.index === undefined) return raw; const before = str.slice(0, sep.index); const after = str.slice(sep.index); return Buffer.from( `${before}\r\nX-SES-Worker-Processed: delivered${after}`, 'utf-8', ); }