231 lines
7.4 KiB
TypeScript
231 lines
7.4 KiB
TypeScript
/**
 * DynamoDB operations handler
 *
 * Tables:
 *   - email-rules            → OOO / Forward rules per address
 *   - ses-outbound-messages  → Bounce info (MessageId → original sender)
 *   - email-blocked-senders  → Blocked patterns per address
 */
|
||
|
||
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
|
||
import {
|
||
DynamoDBDocumentClient,
|
||
GetCommand,
|
||
BatchGetCommand,
|
||
} from '@aws-sdk/lib-dynamodb';
|
||
import { config } from '../config.js';
|
||
import { log } from '../logger.js';
|
||
|
||
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
|
||
/**
 * One item of the rules table, keyed by `email_address`.
 *
 * Only `email_address` is required; every other attribute may be absent on a
 * stored item. The index signature lets items carry additional attributes
 * that this module does not interpret.
 */
export interface EmailRule {
  /** Key attribute used for the rules-table lookup. */
  email_address: string;
  /** NOTE(review): presumably toggles the out-of-office auto-reply — confirm with the consumer. */
  ooo_active?: boolean;
  /** NOTE(review): presumably the out-of-office reply body — confirm with the consumer. */
  ooo_message?: string;
  /** NOTE(review): presumably the format of `ooo_message` (e.g. text vs. HTML) — confirm. */
  ooo_content_type?: string;
  /** NOTE(review): presumably the addresses that incoming mail is forwarded to — confirm. */
  forwards?: string[];
  /** Any further attributes stored on the item. */
  [key: string]: unknown;
}
|
||
|
||
/**
 * Bounce details read from the outbound-messages table for one message.
 *
 * All fields are always present on a returned value: the lookup substitutes
 * defaults (`''`, `'Unknown'`, `[]`) for attributes missing on the stored item.
 */
export interface BounceInfo {
  /** Value of the item's `original_source` attribute, or `''` when absent. */
  original_source: string;
  /** Value of the item's `bounceType` attribute, or `'Unknown'` when absent. */
  bounceType: string;
  /** Value of the item's `bounceSubType` attribute, or `'Unknown'` when absent. */
  bounceSubType: string;
  /** Value of the item's `bouncedRecipients` attribute, or `[]` when absent. */
  bouncedRecipients: string[];
  /** Value of the item's `timestamp` attribute, or `''` when absent. */
  timestamp: string;
}
|
||
|
||
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
|
||
/**
 * Read-only access to the DynamoDB tables named in `config`
 * (`rulesTable`, `messagesTable`, `blockedTable`).
 *
 * Every lookup is best-effort: failures are logged and a neutral value
 * (`null`, `[]`, `false`, or a map of empty lists) is returned instead of
 * an exception being propagated to the caller.
 */
export class DynamoDBHandler {
  /** Maximum number of keys DynamoDB accepts in a single BatchGetItem request. */
  private static readonly BATCH_GET_MAX_KEYS = 100;

  /** Upper bound on BatchGetItem calls per chunk while draining `UnprocessedKeys`. */
  private static readonly BATCH_GET_MAX_ATTEMPTS = 5;

  /** Base delay in milliseconds for the exponential backoff between those calls. */
  private static readonly BATCH_GET_BACKOFF_MS = 50;

  private readonly docClient: DynamoDBDocumentClient;

  /**
   * When `false`, every lookup short-circuits to its neutral value without
   * contacting DynamoDB. Set to `true` on construction and updated by
   * {@link verifyTables}.
   */
  public available = false;

  constructor() {
    const raw = new DynamoDBClient({ region: config.awsRegion });
    this.docClient = DynamoDBDocumentClient.from(raw, {
      marshallOptions: { removeUndefinedValues: true },
    });
    this.initialize();
  }

  // -----------------------------------------------------------------------
  // Init
  // -----------------------------------------------------------------------

  /**
   * Marks the handler as available without any network call.
   *
   * Connectivity is only exercised by the first real request (or by
   * {@link verifyTables}); the Python version used a DescribeTable call here,
   * which is heavier than needed.
   */
  private initialize(): void {
    this.available = true;
    log('✓ DynamoDB client initialized');
  }

  /**
   * Verify the three tables are reachable by issuing one GetItem per table
   * for the placeholder key `'__probe__'`. Intended to be called once during
   * startup.
   *
   * @returns `true` (and `available = true`) when all three probes succeed;
   *          `false` (and `available = false`) when any of them rejects.
   */
  async verifyTables(): Promise<boolean> {
    const probes = [
      { TableName: config.rulesTable, Key: { email_address: '__probe__' } },
      { TableName: config.messagesTable, Key: { MessageId: '__probe__' } },
      { TableName: config.blockedTable, Key: { email_address: '__probe__' } },
    ];

    try {
      // The probes are independent, so run them concurrently.
      await Promise.all(probes.map((input) => this.docClient.send(new GetCommand(input))));
      this.available = true;
      log('✓ DynamoDB tables connected successfully');
      return true;
    } catch (err: unknown) {
      log(`⚠ DynamoDB not fully available: ${DynamoDBHandler.errorText(err)}`, 'WARNING');
      this.available = false;
      return false;
    }
  }

  // -----------------------------------------------------------------------
  // Email rules
  // -----------------------------------------------------------------------

  /**
   * Fetch the rules item stored for `emailAddress`.
   *
   * @param emailAddress Value of the `email_address` key to look up.
   * @returns The stored item, or `null` when the handler is unavailable, no
   *          item exists, or the request fails.
   */
  async getEmailRules(emailAddress: string): Promise<EmailRule | null> {
    if (!this.available) return null;

    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.rulesTable,
          Key: { email_address: emailAddress },
        }),
      );
      return (resp.Item as EmailRule) ?? null;
    } catch (err: unknown) {
      // A missing table is treated as "no rules" and deliberately not logged.
      if (DynamoDBHandler.errorName(err) !== 'ResourceNotFoundException') {
        log(
          `⚠ DynamoDB error for ${emailAddress}: ${DynamoDBHandler.errorText(err)}`,
          'ERROR',
        );
      }
      return null;
    }
  }

  // -----------------------------------------------------------------------
  // Bounce info
  // -----------------------------------------------------------------------

  /**
   * Look up the bounce record for `messageId`, retrying while it is missing.
   *
   * Up to `config.bounceLookupRetries` attempts are made, waiting
   * `config.bounceLookupDelay` seconds between attempts. Both "item not found"
   * and a failed request count as an attempt.
   *
   * @param messageId  Value of the `MessageId` key to look up.
   * @param workerName Passed through to `log` as its third argument.
   * @returns The bounce details with defaults filled in for missing
   *          attributes, or `null` when the handler is unavailable or no
   *          record was obtained within the allowed attempts.
   */
  async getBounceInfo(
    messageId: string,
    workerName = 'unified',
  ): Promise<BounceInfo | null> {
    if (!this.available) return null;

    const maxAttempts = config.bounceLookupRetries;

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      const isLastAttempt = attempt >= maxAttempts - 1;

      try {
        const resp = await this.docClient.send(
          new GetCommand({
            TableName: config.messagesTable,
            Key: { MessageId: messageId },
          }),
        );

        const item = resp.Item;
        if (item) {
          return {
            original_source: (item.original_source as string) ?? '',
            bounceType: (item.bounceType as string) ?? 'Unknown',
            bounceSubType: (item.bounceSubType as string) ?? 'Unknown',
            bouncedRecipients: DynamoDBHandler.toStringList(item.bouncedRecipients),
            timestamp: (item.timestamp as string) ?? '',
          };
        }

        if (isLastAttempt) {
          log(
            `⚠ No bounce record found after ${config.bounceLookupRetries} attempts ` +
              `for Message-ID: ${messageId}`,
            'WARNING',
            workerName,
          );
          return null;
        }

        log(
          ` Bounce record not found yet, retrying in ${config.bounceLookupDelay}s ` +
            `(attempt ${attempt + 1}/${config.bounceLookupRetries})...`,
          'INFO',
          workerName,
        );
      } catch (err: unknown) {
        log(
          `⚠ DynamoDB Error (attempt ${attempt + 1}/${config.bounceLookupRetries}): ` +
            `${DynamoDBHandler.errorText(err)}`,
          'ERROR',
          workerName,
        );
        if (isLastAttempt) return null;
      }

      // Reached only when another attempt follows.
      await sleep(config.bounceLookupDelay * 1000);
    }

    return null;
  }

  // -----------------------------------------------------------------------
  // Blocked senders
  // -----------------------------------------------------------------------

  /**
   * Fetch the `blocked_patterns` attribute stored for `emailAddress`.
   *
   * @param emailAddress Value of the `email_address` key to look up.
   * @returns The stored patterns, or `[]` when the handler is unavailable, no
   *          item/attribute exists, or the request fails.
   */
  async getBlockedPatterns(emailAddress: string): Promise<string[]> {
    if (!this.available) return [];

    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.blockedTable,
          Key: { email_address: emailAddress },
        }),
      );
      return DynamoDBHandler.toStringList(resp.Item?.blocked_patterns);
    } catch (err: unknown) {
      log(
        `⚠ Error getting block list for ${emailAddress}: ${DynamoDBHandler.errorText(err)}`,
        'ERROR',
      );
      return [];
    }
  }

  /**
   * Fetch `blocked_patterns` for several addresses using BatchGetItem.
   *
   * Addresses are de-duplicated and sent in chunks of at most
   * `BATCH_GET_MAX_KEYS`, because BatchGetItem rejects requests that contain
   * duplicate keys or more than 100 keys. Keys the service hands back in
   * `UnprocessedKeys` are re-requested with exponential backoff, up to
   * `BATCH_GET_MAX_ATTEMPTS` calls per chunk.
   *
   * @param emailAddresses Addresses to look up; may contain duplicates.
   * @returns A map with one entry per distinct input address. Addresses that
   *          have no item, could not be fetched, or were requested after a
   *          failure map to `[]`; results gathered before a failure are kept.
   */
  async batchGetBlockedPatterns(
    emailAddresses: string[],
  ): Promise<Record<string, string[]>> {
    const result: Record<string, string[]> = {};
    for (const address of emailAddresses) result[address] = [];
    if (!this.available || emailAddresses.length === 0) return result;

    const table = config.blockedTable;
    const uniqueAddresses = [...new Set(emailAddresses)];
    const chunkSize = DynamoDBHandler.BATCH_GET_MAX_KEYS;
    const maxAttempts = DynamoDBHandler.BATCH_GET_MAX_ATTEMPTS;

    try {
      for (let start = 0; start < uniqueAddresses.length; start += chunkSize) {
        let pending: Record<string, unknown>[] = uniqueAddresses
          .slice(start, start + chunkSize)
          .map((address) => ({ email_address: address }));

        for (let attempt = 0; pending.length > 0; attempt++) {
          if (attempt >= maxAttempts) {
            log(
              `⚠ Batch blocklist check: ${pending.length} key(s) still unprocessed ` +
                `after ${maxAttempts} attempts`,
              'WARNING',
            );
            break;
          }
          // Back off before re-requesting keys the service did not process.
          if (attempt > 0) {
            await sleep(DynamoDBHandler.BATCH_GET_BACKOFF_MS * 2 ** (attempt - 1));
          }

          const resp = await this.docClient.send(
            new BatchGetCommand({
              RequestItems: {
                [table]: { Keys: pending },
              },
            }),
          );

          for (const item of resp.Responses?.[table] ?? []) {
            const address: unknown = item.email_address;
            if (typeof address === 'string') {
              result[address] = DynamoDBHandler.toStringList(item.blocked_patterns);
            }
          }

          pending = resp.UnprocessedKeys?.[table]?.Keys ?? [];
        }
      }
    } catch (err: unknown) {
      log(`⚠ Batch blocklist check error: ${DynamoDBHandler.errorText(err)}`, 'ERROR');
    }

    return result;
  }

  // -----------------------------------------------------------------------
  // Internal helpers
  // -----------------------------------------------------------------------

  /** Render a caught value for logging: its `message` when it has one, else `String(err)`. */
  private static errorText(err: unknown): string {
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const { message } = err as { message?: unknown };
      if (message != null) return String(message);
    }
    return String(err);
  }

  /** Return the `name` of a caught value when it is an object with a string `name`. */
  private static errorName(err: unknown): string | undefined {
    if (typeof err === 'object' && err !== null && 'name' in err) {
      const { name } = err as { name?: unknown };
      if (typeof name === 'string') return name;
    }
    return undefined;
  }

  /**
   * Normalise a list-like attribute to a plain array.
   *
   * Arrays are returned as-is and `Set` values are expanded (the document
   * client is documented to unmarshall DynamoDB set attributes into `Set`
   * objects — NOTE(review): confirm which attribute type the tables use).
   * Anything else, including `undefined`, yields `[]`.
   */
  private static toStringList(value: unknown): string[] {
    if (Array.isArray(value)) return value as string[];
    if (value instanceof Set) return Array.from(value as Set<string>);
    return [];
  }
}
|
||
|
||
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
|
||
/**
 * Resolve after roughly `ms` milliseconds without blocking the event loop.
 *
 * @param ms Delay handed to `setTimeout`.
 * @returns A promise that resolves with no value once the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
|