email-amazon/email-worker-nodejs/dynamodb.ts

231 lines
7.4 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* DynamoDB operations handler
*
* Tables:
* - email-rules → OOO / Forward rules per address
* - ses-outbound-messages → Bounce info (MessageId → original sender)
* - email-blocked-senders → Blocked patterns per address
*/
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
DynamoDBDocumentClient,
GetCommand,
BatchGetCommand,
} from '@aws-sdk/lib-dynamodb';
import { config } from '../config.js';
import { log } from '../logger.js';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
 * Shape of one item read from the rules table by `DynamoDBHandler.getEmailRules`.
 *
 * NOTE(review): the per-field descriptions below are inferred from the field
 * names and the file header ("OOO / Forward rules per address") — confirm
 * against whatever writes this table.
 */
export interface EmailRule {
// Address the rule belongs to; this is the key attribute used for the lookup.
email_address: string;
// Presumably whether the out-of-office auto-reply is switched on — TODO confirm.
ooo_active?: boolean;
// Presumably the out-of-office reply body — TODO confirm.
ooo_message?: string;
// Presumably the content type of `ooo_message` (e.g. text vs. HTML) — TODO confirm.
ooo_content_type?: string;
// Presumably the addresses incoming mail is forwarded to — TODO confirm.
forwards?: string[];
// Any additional attributes stored on the item are passed through untyped.
[key: string]: unknown;
}
/**
 * Bounce details returned by `DynamoDBHandler.getBounceInfo`, built from an
 * item in the outbound-messages table (looked up by `MessageId`).
 *
 * Every field is always present: when the stored item lacks an attribute,
 * `getBounceInfo` substitutes the fallback noted on each field.
 */
export interface BounceInfo {
// Per the file header, the original sender of the bounced message; '' when missing.
original_source: string;
// Bounce type as stored on the item; 'Unknown' when missing.
bounceType: string;
// Bounce sub-type as stored on the item; 'Unknown' when missing.
bounceSubType: string;
// Recipients recorded on the item as bounced; [] when missing.
bouncedRecipients: string[];
// Timestamp string as stored on the item (format not visible here); '' when missing.
timestamp: string;
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
/**
 * Read-only access to the DynamoDB tables used by the email worker:
 * the rules table, the outbound-messages (bounce) table and the
 * blocked-senders table (table names come from `config`).
 *
 * All lookups are best-effort: failures are logged and reported to the caller
 * as "nothing found" (`null`, `[]` or an all-empty map) rather than thrown.
 */
export class DynamoDBHandler {
  /** Maximum number of keys DynamoDB accepts in a single BatchGetItem call. */
  private static readonly BATCH_GET_MAX_KEYS = 100;
  /** How many times one batch is sent while DynamoDB keeps returning UnprocessedKeys. */
  private static readonly BATCH_GET_MAX_ROUNDS = 5;
  /** Base delay (ms) for the exponential backoff between those rounds. */
  private static readonly BATCH_GET_BACKOFF_MS = 50;

  private readonly docClient: DynamoDBDocumentClient;

  /** When false, every lookup short-circuits and returns its "nothing found" value. */
  public available = false;

  constructor() {
    const raw = new DynamoDBClient({ region: config.awsRegion });
    this.docClient = DynamoDBDocumentClient.from(raw, {
      marshallOptions: { removeUndefinedValues: true },
    });
    this.initialize();
  }

  // -----------------------------------------------------------------------
  // Init
  // -----------------------------------------------------------------------

  /** Marks the handler as usable without performing any network call. */
  private initialize(): void {
    // We just mark as available; actual connectivity is tested on first call.
    // The Python version tested table_status, but that's a DescribeTable call
    // which is heavy and not needed — the first GetItem will tell us.
    this.available = true;
    log('✓ DynamoDB client initialized');
  }

  /**
   * Verify tables exist by doing a cheap GetItem on each.
   * Intended to be called once during startup.
   *
   * Updates `available` to reflect the outcome.
   *
   * @returns `true` when all three probes succeed, `false` if any of them fails.
   */
  async verifyTables(): Promise<boolean> {
    try {
      await Promise.all([
        this.docClient.send(
          new GetCommand({ TableName: config.rulesTable, Key: { email_address: '__probe__' } }),
        ),
        this.docClient.send(
          new GetCommand({ TableName: config.messagesTable, Key: { MessageId: '__probe__' } }),
        ),
        this.docClient.send(
          new GetCommand({ TableName: config.blockedTable, Key: { email_address: '__probe__' } }),
        ),
      ]);
      this.available = true;
      log('✓ DynamoDB tables connected successfully');
      return true;
    } catch (err: unknown) {
      log(`⚠ DynamoDB not fully available: ${DynamoDBHandler.errorText(err)}`, 'WARNING');
      this.available = false;
      return false;
    }
  }

  // -----------------------------------------------------------------------
  // Email rules
  // -----------------------------------------------------------------------

  /**
   * Fetch the rule item stored for one address.
   *
   * @param emailAddress Key value to look up in the rules table.
   * @returns The stored item, or `null` when there is none, the handler is
   *          unavailable, or the lookup failed.
   */
  async getEmailRules(emailAddress: string): Promise<EmailRule | null> {
    if (!this.available) return null;
    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.rulesTable,
          Key: { email_address: emailAddress },
        }),
      );
      return (resp.Item as EmailRule) ?? null;
    } catch (err: unknown) {
      // A missing table is deliberately not logged; every other failure is.
      const errName = (err as { name?: unknown } | null | undefined)?.name;
      if (errName !== 'ResourceNotFoundException') {
        log(
          `⚠ DynamoDB error for ${emailAddress}: ${DynamoDBHandler.errorText(err)}`,
          'ERROR',
        );
      }
      return null;
    }
  }

  // -----------------------------------------------------------------------
  // Bounce info
  // -----------------------------------------------------------------------

  /**
   * Look up the bounce record for an outbound message, retrying while the
   * record is not there yet.
   *
   * Up to `config.bounceLookupRetries` attempts are made, waiting
   * `config.bounceLookupDelay` seconds between them; both a missing item and
   * a failed request count as an attempt.
   *
   * @param messageId  Value of the `MessageId` key in the messages table.
   * @param workerName Passed through to `log` with every message.
   * @returns The bounce details (missing attributes replaced by defaults), or
   *          `null` when nothing was found, the handler is unavailable, or all
   *          attempts failed.
   */
  async getBounceInfo(
    messageId: string,
    workerName = 'unified',
  ): Promise<BounceInfo | null> {
    if (!this.available) return null;

    for (let attempt = 0; attempt < config.bounceLookupRetries; attempt++) {
      const isLastAttempt = attempt >= config.bounceLookupRetries - 1;
      try {
        const resp = await this.docClient.send(
          new GetCommand({
            TableName: config.messagesTable,
            Key: { MessageId: messageId },
          }),
        );
        if (resp.Item) {
          return {
            original_source: (resp.Item.original_source as string) ?? '',
            bounceType: (resp.Item.bounceType as string) ?? 'Unknown',
            bounceSubType: (resp.Item.bounceSubType as string) ?? 'Unknown',
            bouncedRecipients: (resp.Item.bouncedRecipients as string[]) ?? [],
            timestamp: (resp.Item.timestamp as string) ?? '',
          };
        }
        if (isLastAttempt) {
          log(
            `⚠ No bounce record found after ${config.bounceLookupRetries} attempts ` +
              `for Message-ID: ${messageId}`,
            'WARNING',
            workerName,
          );
          return null;
        }
        log(
          ` Bounce record not found yet, retrying in ${config.bounceLookupDelay}s ` +
            `(attempt ${attempt + 1}/${config.bounceLookupRetries})...`,
          'INFO',
          workerName,
        );
      } catch (err: unknown) {
        log(
          `⚠ DynamoDB Error (attempt ${attempt + 1}/${config.bounceLookupRetries}): ` +
            `${DynamoDBHandler.errorText(err)}`,
          'ERROR',
          workerName,
        );
        if (isLastAttempt) return null;
      }
      // Reached only when another attempt follows (not found yet, or request failed).
      await sleep(config.bounceLookupDelay * 1000);
    }
    // Only reachable when config.bounceLookupRetries allows no attempt at all.
    return null;
  }

  // -----------------------------------------------------------------------
  // Blocked senders
  // -----------------------------------------------------------------------

  /**
   * Fetch the blocked-sender patterns stored for one address.
   *
   * @param emailAddress Key value to look up in the blocked table.
   * @returns The item's `blocked_patterns`, or `[]` when there is no item, the
   *          handler is unavailable, or the lookup failed.
   */
  async getBlockedPatterns(emailAddress: string): Promise<string[]> {
    if (!this.available) return [];
    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.blockedTable,
          Key: { email_address: emailAddress },
        }),
      );
      return (resp.Item?.blocked_patterns as string[]) ?? [];
    } catch (err: unknown) {
      log(
        `⚠ Error getting block list for ${emailAddress}: ${DynamoDBHandler.errorText(err)}`,
        'ERROR',
      );
      return [];
    }
  }

  /**
   * Fetch the blocked-sender patterns for several addresses at once.
   *
   * Addresses are de-duplicated and sent in chunks of at most 100 keys,
   * because BatchGetItem rejects requests that contain duplicate keys or more
   * than 100 keys. Keys that DynamoDB reports back as unprocessed are retried
   * a bounded number of times with exponential backoff.
   *
   * @param emailAddresses Addresses to look up; duplicates are allowed.
   * @returns A map with one entry per requested address. Addresses without a
   *          stored item map to `[]`. If the handler is unavailable or any
   *          request fails, every address maps to `[]`.
   */
  async batchGetBlockedPatterns(
    emailAddresses: string[],
  ): Promise<Record<string, string[]>> {
    const empty: Record<string, string[]> = {};
    for (const a of emailAddresses) empty[a] = [];
    if (!this.available || emailAddresses.length === 0) return empty;

    try {
      const result: Record<string, string[]> = { ...empty };
      const uniqueAddresses = [...new Set(emailAddresses)];
      const chunkSize = DynamoDBHandler.BATCH_GET_MAX_KEYS;

      for (let start = 0; start < uniqueAddresses.length; start += chunkSize) {
        let pending: Record<string, unknown>[] = uniqueAddresses
          .slice(start, start + chunkSize)
          .map((a) => ({ email_address: a }));

        for (let round = 0; pending.length > 0; round++) {
          if (round > 0) {
            // Back off before re-sending keys DynamoDB could not process.
            await sleep(DynamoDBHandler.BATCH_GET_BACKOFF_MS * 2 ** (round - 1));
          }
          const resp = await this.docClient.send(
            new BatchGetCommand({
              RequestItems: {
                [config.blockedTable]: { Keys: pending },
              },
            }),
          );
          for (const item of resp.Responses?.[config.blockedTable] ?? []) {
            const addr = item.email_address as string;
            result[addr] = (item.blocked_patterns as string[]) ?? [];
          }
          pending = resp.UnprocessedKeys?.[config.blockedTable]?.Keys ?? [];

          if (pending.length > 0 && round >= DynamoDBHandler.BATCH_GET_MAX_ROUNDS - 1) {
            // Give up on the leftovers; those addresses keep their empty list.
            log(
              `⚠ Batch blocklist check: ${pending.length} key(s) still unprocessed ` +
                `after ${DynamoDBHandler.BATCH_GET_MAX_ROUNDS} attempts`,
              'WARNING',
            );
            break;
          }
        }
      }
      return result;
    } catch (err: unknown) {
      log(`⚠ Batch blocklist check error: ${DynamoDBHandler.errorText(err)}`, 'ERROR');
      return empty;
    }
  }

  /**
   * Render a caught value for a log line: its `message` when it has one,
   * otherwise the value itself converted to a string.
   */
  private static errorText(err: unknown): string {
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const { message } = err as { message?: unknown };
      if (message != null) return String(message);
    }
    return String(err);
  }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Resolve after roughly `ms` milliseconds without blocking the event loop.
 *
 * @param ms Delay handed to `setTimeout`, in milliseconds.
 * @returns A promise that resolves with no value once the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}