/** * DynamoDB operations handler * * Tables: * - email-rules → OOO / Forward rules per address * - ses-outbound-messages → Bounce info (MessageId → original sender) * - email-blocked-senders → Blocked patterns per address */ import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; import { DynamoDBDocumentClient, GetCommand, BatchGetCommand, } from '@aws-sdk/lib-dynamodb'; import { config } from '../config.js'; import { log } from '../logger.js'; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- export interface EmailRule { email_address: string; ooo_active?: boolean; ooo_message?: string; ooo_content_type?: string; forwards?: string[]; [key: string]: unknown; } export interface BounceInfo { original_source: string; bounceType: string; bounceSubType: string; bouncedRecipients: string[]; timestamp: string; } // --------------------------------------------------------------------------- // Handler // --------------------------------------------------------------------------- export class DynamoDBHandler { private docClient: DynamoDBDocumentClient; public available = false; constructor() { const raw = new DynamoDBClient({ region: config.awsRegion }); this.docClient = DynamoDBDocumentClient.from(raw, { marshallOptions: { removeUndefinedValues: true }, }); this.initialize(); } // ----------------------------------------------------------------------- // Init // ----------------------------------------------------------------------- private initialize(): void { // We just mark as available; actual connectivity is tested on first call. // The Python version tested table_status, but that's a DescribeTable call // which is heavy and not needed – the first GetItem will tell us. this.available = true; log('✓ DynamoDB client initialized'); } /** * Verify tables exist by doing a cheap GetItem on each. * Called once during startup. */ async verifyTables(): Promise { try { await Promise.all([ this.docClient.send( new GetCommand({ TableName: config.rulesTable, Key: { email_address: '__probe__' } }), ), this.docClient.send( new GetCommand({ TableName: config.messagesTable, Key: { MessageId: '__probe__' } }), ), this.docClient.send( new GetCommand({ TableName: config.blockedTable, Key: { email_address: '__probe__' } }), ), ]); this.available = true; log('✓ DynamoDB tables connected successfully'); return true; } catch (err: any) { log(`⚠ DynamoDB not fully available: ${err.message ?? err}`, 'WARNING'); this.available = false; return false; } } // ----------------------------------------------------------------------- // Email rules // ----------------------------------------------------------------------- async getEmailRules(emailAddress: string): Promise { if (!this.available) return null; try { const resp = await this.docClient.send( new GetCommand({ TableName: config.rulesTable, Key: { email_address: emailAddress }, }), ); return (resp.Item as EmailRule) ?? null; } catch (err: any) { if (err.name !== 'ResourceNotFoundException') { log(`⚠ DynamoDB error for ${emailAddress}: ${err.message ?? err}`, 'ERROR'); } return null; } } // ----------------------------------------------------------------------- // Bounce info // ----------------------------------------------------------------------- async getBounceInfo( messageId: string, workerName = 'unified', ): Promise { if (!this.available) return null; for (let attempt = 0; attempt < config.bounceLookupRetries; attempt++) { try { const resp = await this.docClient.send( new GetCommand({ TableName: config.messagesTable, Key: { MessageId: messageId }, }), ); if (resp.Item) { return { original_source: (resp.Item.original_source as string) ?? '', bounceType: (resp.Item.bounceType as string) ?? 'Unknown', bounceSubType: (resp.Item.bounceSubType as string) ?? 'Unknown', bouncedRecipients: (resp.Item.bouncedRecipients as string[]) ?? [], timestamp: (resp.Item.timestamp as string) ?? '', }; } if (attempt < config.bounceLookupRetries - 1) { log( ` Bounce record not found yet, retrying in ${config.bounceLookupDelay}s ` + `(attempt ${attempt + 1}/${config.bounceLookupRetries})...`, 'INFO', workerName, ); await sleep(config.bounceLookupDelay * 1000); } else { log( `⚠ No bounce record found after ${config.bounceLookupRetries} attempts ` + `for Message-ID: ${messageId}`, 'WARNING', workerName, ); return null; } } catch (err: any) { log( `⚠ DynamoDB Error (attempt ${attempt + 1}/${config.bounceLookupRetries}): ` + `${err.message ?? err}`, 'ERROR', workerName, ); if (attempt < config.bounceLookupRetries - 1) { await sleep(config.bounceLookupDelay * 1000); } else { return null; } } } return null; } // ----------------------------------------------------------------------- // Blocked senders // ----------------------------------------------------------------------- async getBlockedPatterns(emailAddress: string): Promise { if (!this.available) return []; try { const resp = await this.docClient.send( new GetCommand({ TableName: config.blockedTable, Key: { email_address: emailAddress }, }), ); return (resp.Item?.blocked_patterns as string[]) ?? []; } catch (err: any) { log(`⚠ Error getting block list for ${emailAddress}: ${err.message ?? err}`, 'ERROR'); return []; } } async batchGetBlockedPatterns( emailAddresses: string[], ): Promise> { const empty: Record = {}; for (const a of emailAddresses) empty[a] = []; if (!this.available || emailAddresses.length === 0) return empty; try { const keys = emailAddresses.map((a) => ({ email_address: a })); const resp = await this.docClient.send( new BatchGetCommand({ RequestItems: { [config.blockedTable]: { Keys: keys }, }, }), ); const items = resp.Responses?.[config.blockedTable] ?? []; const result: Record = { ...empty }; for (const item of items) { const addr = item.email_address as string; result[addr] = (item.blocked_patterns as string[]) ?? []; } return result; } catch (err: any) { log(`⚠ Batch blocklist check error: ${err.message ?? err}`, 'ERROR'); return empty; } } } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); }