email-amazon/email-worker-nodejs/s3.ts

203 lines
5.7 KiB
TypeScript

/**
* S3 operations handler
*
* Responsibilities:
* - Download raw email from domain-specific bucket
* - Mark email metadata (processed / all-invalid / blocked)
* - Delete blocked emails
*/
import {
S3Client,
GetObjectCommand,
HeadObjectCommand,
CopyObjectCommand,
DeleteObjectCommand,
type S3ClientConfig,
} from '@aws-sdk/client-s3';
import { config, domainToBucketName } from '../config.js';
import { log } from '../logger.js';
/**
 * Thin wrapper around an S3 client for the email worker.
 *
 * Every method resolves the bucket from the recipient domain via
 * `domainToBucketName` and uses the message id as the object key.
 */
export class S3Handler {
  /**
   * `getEmail` treats a missing object as transient while the caller's
   * receive count is below this value; at or above it the miss is rethrown.
   */
  private static readonly MAX_MISSING_OBJECT_ATTEMPTS = 5;

  /** Metadata keys dropped whenever an object is marked with a final status. */
  private static readonly TRANSIENT_METADATA_KEYS: readonly string[] = [
    'processing_started',
    'queued_at',
  ];

  private readonly client: S3Client;

  constructor() {
    const opts: S3ClientConfig = { region: config.awsRegion };
    this.client = new S3Client(opts);
  }

  // -------------------------------------------------------------------------
  // Internal helpers
  // -------------------------------------------------------------------------

  /**
   * Render a caught value for logging: its `message` when it has one,
   * otherwise the value itself converted to a string.
   */
  private static describeError(err: unknown): string {
    if (typeof err === 'object' && err !== null) {
      const message = (err as { message?: unknown }).message;
      if (message !== undefined && message !== null) {
        return String(message);
      }
    }
    return String(err);
  }

  /** True when the caught value carries `NoSuchKey` in its `name` or `Code` field. */
  private static isNoSuchKey(err: unknown): boolean {
    if (typeof err !== 'object' || err === null) {
      return false;
    }
    const candidate = err as { name?: unknown; Code?: unknown };
    return candidate.name === 'NoSuchKey' || candidate.Code === 'NoSuchKey';
  }

  /**
   * Build the `CopySource` value for a CopyObject request.
   *
   * S3 requires this value to be URL-encoded. Each path segment of the key is
   * encoded on its own so that `/` separators are kept as-is.
   */
  private static encodeCopySource(bucket: string, key: string): string {
    const encodedKey = key.split('/').map(encodeURIComponent).join('/');
    return `${bucket}/${encodedKey}`;
  }

  /** Metadata fields shared by every "mark as ..." operation. */
  private static completionPatch(
    workerName: string,
    status: string,
  ): Record<string, string> {
    return {
      processed: 'true',
      // Unix timestamp in whole seconds.
      processed_at: String(Math.floor(Date.now() / 1000)),
      processed_by: workerName,
      status,
    };
  }

  // -------------------------------------------------------------------------
  // Download
  // -------------------------------------------------------------------------

  /**
   * Download raw email bytes from S3.
   *
   * @param domain - Recipient domain; selects the bucket.
   * @param messageId - Object key of the stored email.
   * @param receiveCount - Attempt number supplied by the caller; decides
   *   whether a missing object is still considered transient.
   * @returns The object bytes, or `null` when the object does not exist yet
   *   and `receiveCount` is below the retry limit (caller should retry), or
   *   when the response carries no body.
   * @throws The original error when the object is still missing at or beyond
   *   the retry limit, or on any other download error.
   */
  async getEmail(
    domain: string,
    messageId: string,
    receiveCount: number,
  ): Promise<Buffer | null> {
    const bucket = domainToBucketName(domain);
    try {
      const resp = await this.client.send(
        new GetObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      const bytes = await resp.Body?.transformToByteArray();
      return bytes ? Buffer.from(bytes) : null;
    } catch (err: unknown) {
      if (S3Handler.isNoSuchKey(err)) {
        if (receiveCount < S3Handler.MAX_MISSING_OBJECT_ATTEMPTS) {
          log(`⏳ S3 Object not found yet (Attempt ${receiveCount}). Retrying...`, 'WARNING');
          return null;
        }
        log('❌ S3 Object missing permanently after retries.', 'ERROR');
        throw err;
      }
      log(`❌ S3 Download Error: ${S3Handler.describeError(err)}`, 'ERROR');
      throw err;
    }
  }

  // -------------------------------------------------------------------------
  // Metadata helpers (copy-in-place with updated metadata)
  // -------------------------------------------------------------------------

  /**
   * Rewrite an object's user metadata by copying the object onto itself.
   *
   * Reads the current metadata with a HEAD request, applies `patch`, deletes
   * `removeKeys`, then issues a CopyObject with `MetadataDirective: 'REPLACE'`.
   *
   * @param bucket - Bucket holding the object.
   * @param key - Object key.
   * @param patch - Metadata entries to add or overwrite.
   * @param removeKeys - Metadata keys to delete after the patch is applied.
   */
  private async updateMetadata(
    bucket: string,
    key: string,
    patch: Record<string, string>,
    removeKeys: readonly string[] = [],
  ): Promise<void> {
    const head = await this.client.send(
      new HeadObjectCommand({ Bucket: bucket, Key: key }),
    );

    const metadata: Record<string, string> = { ...(head.Metadata ?? {}), ...patch };
    for (const staleKey of removeKeys) {
      delete metadata[staleKey];
    }

    await this.client.send(
      new CopyObjectCommand({
        Bucket: bucket,
        Key: key,
        CopySource: S3Handler.encodeCopySource(bucket, key),
        Metadata: metadata,
        MetadataDirective: 'REPLACE',
        // REPLACE also discards system-defined metadata unless it is sent
        // again, so carry over what the HEAD response reported.
        ContentType: head.ContentType,
        CacheControl: head.CacheControl,
        ContentDisposition: head.ContentDisposition,
        ContentEncoding: head.ContentEncoding,
        ContentLanguage: head.ContentLanguage,
      }),
    );
  }

  // -------------------------------------------------------------------------
  // Mark helpers
  // -------------------------------------------------------------------------

  /**
   * Mark an email as delivered, optionally recording invalid inboxes.
   *
   * Best-effort: a failure is logged as a WARNING and NOT rethrown.
   *
   * @param domain - Recipient domain; selects the bucket.
   * @param messageId - Object key of the stored email.
   * @param workerName - Written to `processed_by` and passed to the logger.
   * @param invalidInboxes - When non-empty, stored comma-joined under
   *   `invalid_inboxes`.
   */
  async markAsProcessed(
    domain: string,
    messageId: string,
    workerName: string,
    invalidInboxes?: string[],
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      const patch = S3Handler.completionPatch(workerName, 'delivered');
      if (invalidInboxes?.length) {
        patch['invalid_inboxes'] = invalidInboxes.join(',');
        log(`⚠ Invalid inboxes recorded: ${invalidInboxes.join(', ')}`, 'WARNING', workerName);
      }
      await this.updateMetadata(
        bucket,
        messageId,
        patch,
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
    } catch (err: unknown) {
      log(`Failed to mark as processed: ${S3Handler.describeError(err)}`, 'WARNING', workerName);
    }
  }

  /**
   * Mark an email as failed because none of its recipients exist.
   *
   * Best-effort: a failure is logged as a WARNING and NOT rethrown.
   *
   * @param domain - Recipient domain; selects the bucket.
   * @param messageId - Object key of the stored email.
   * @param invalidInboxes - Stored comma-joined under `invalid_inboxes`.
   * @param workerName - Written to `processed_by` and passed to the logger.
   */
  async markAsAllInvalid(
    domain: string,
    messageId: string,
    invalidInboxes: string[],
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      const patch: Record<string, string> = {
        ...S3Handler.completionPatch(workerName, 'failed'),
        error: 'All recipients are invalid (mailboxes do not exist)',
        invalid_inboxes: invalidInboxes.join(','),
      };
      await this.updateMetadata(
        bucket,
        messageId,
        patch,
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
    } catch (err: unknown) {
      log(`Failed to mark as all invalid: ${S3Handler.describeError(err)}`, 'WARNING', workerName);
    }
  }

  /**
   * Mark an email as blocked, recording the blocked recipients and sender.
   *
   * Unlike the other mark helpers, a failure here is logged as an ERROR and
   * rethrown to the caller.
   *
   * @param domain - Recipient domain; selects the bucket.
   * @param messageId - Object key of the stored email.
   * @param blockedRecipients - Stored comma-joined under `blocked_recipients`.
   * @param sender - Stored under `blocked_sender`.
   * @param workerName - Written to `processed_by` and passed to the logger.
   * @throws The underlying error when the metadata update fails.
   */
  async markAsBlocked(
    domain: string,
    messageId: string,
    blockedRecipients: string[],
    sender: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      const patch: Record<string, string> = {
        ...S3Handler.completionPatch(workerName, 'blocked'),
        blocked_recipients: blockedRecipients.join(','),
        blocked_sender: sender,
      };
      await this.updateMetadata(
        bucket,
        messageId,
        patch,
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
      log('✓ Marked as blocked in S3 metadata', 'INFO', workerName);
    } catch (err: unknown) {
      log(`⚠ Failed to mark as blocked: ${S3Handler.describeError(err)}`, 'ERROR', workerName);
      throw err;
    }
  }

  /**
   * Delete an email object from the domain's bucket.
   *
   * @param domain - Recipient domain; selects the bucket.
   * @param messageId - Object key of the stored email.
   * @param workerName - Passed to the logger.
   * @throws The underlying error when the delete request fails.
   */
  async deleteBlockedEmail(
    domain: string,
    messageId: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.client.send(
        new DeleteObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      log('🗑 Deleted blocked email from S3', 'SUCCESS', workerName);
    } catch (err: unknown) {
      log(`⚠ Failed to delete blocked email: ${S3Handler.describeError(err)}`, 'ERROR', workerName);
      throw err;
    }
  }
}