203 lines
5.7 KiB
TypeScript
203 lines
5.7 KiB
TypeScript
/**
 * S3 operations handler
 *
 * Responsibilities:
 * - Download raw email from domain-specific bucket
 * - Mark email metadata (processed / all-invalid / blocked)
 * - Delete blocked emails
 */
|
|
|
|
import {
|
|
S3Client,
|
|
GetObjectCommand,
|
|
HeadObjectCommand,
|
|
CopyObjectCommand,
|
|
DeleteObjectCommand,
|
|
type S3ClientConfig,
|
|
} from '@aws-sdk/client-s3';
|
|
import { config, domainToBucketName } from '../config.js';
|
|
import { log } from '../logger.js';
|
|
|
|
export class S3Handler {
  /**
   * A missing object is reported as "not there yet" (`null`) only while the
   * caller-supplied receive count is below this value; from this count on the
   * `NoSuchKey` error is treated as permanent and rethrown.
   */
  private static readonly MISSING_OBJECT_RETRY_LIMIT = 5;

  /** Metadata keys removed from the object whenever a final status is written. */
  private static readonly TRANSIENT_METADATA_KEYS: readonly string[] = [
    'processing_started',
    'queued_at',
  ];

  private readonly client: S3Client;

  constructor() {
    const opts: S3ClientConfig = { region: config.awsRegion };
    this.client = new S3Client(opts);
  }

  // -------------------------------------------------------------------------
  // Internal helpers
  // -------------------------------------------------------------------------

  /** Best-effort human-readable text for anything that can land in a `catch`. */
  private static errorText(err: unknown): string {
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const message = (err as { message?: unknown }).message;
      if (message != null) {
        return String(message);
      }
    }
    return String(err);
  }

  /** True when the error's `name` or `Code` property is `'NoSuchKey'`. */
  private static isMissingObject(err: unknown): boolean {
    if (typeof err !== 'object' || err === null) {
      return false;
    }
    const { name, Code } = err as { name?: unknown; Code?: unknown };
    return name === 'NoSuchKey' || Code === 'NoSuchKey';
  }

  /**
   * Build the `CopySource` value for a same-bucket copy.
   * S3 requires the value to be URL-encoded; every key segment is encoded
   * separately so the `/` separators inside the key stay literal.
   */
  private static copySource(bucket: string, key: string): string {
    const encodedKey = key
      .split('/')
      .map((segment) => encodeURIComponent(segment))
      .join('/');
    return `${bucket}/${encodedKey}`;
  }

  /** Metadata fields shared by every final status written by the `markAs*` methods. */
  private static completionPatch(
    workerName: string,
    status: string,
  ): Record<string, string> {
    return {
      processed: 'true',
      // Unix timestamp in whole seconds.
      processed_at: String(Math.floor(Date.now() / 1000)),
      processed_by: workerName,
      status,
    };
  }

  // -------------------------------------------------------------------------
  // Download
  // -------------------------------------------------------------------------

  /**
   * Download raw email bytes from S3.
   *
   * @param domain - Domain used to derive the bucket name via `domainToBucketName`.
   * @param messageId - Object key of the email inside that bucket.
   * @param receiveCount - Delivery attempt number supplied by the caller; it
   *   decides whether a missing object is still considered retryable.
   * @returns The object body as a `Buffer`, or `null` when the object does not
   *   exist yet and `receiveCount` is below the retry limit (caller should
   *   retry). `null` is also returned when the response carries no body.
   * @throws The original S3 error when the object is still missing once the
   *   retry limit is reached, or on any other download failure.
   */
  async getEmail(
    domain: string,
    messageId: string,
    receiveCount: number,
  ): Promise<Buffer | null> {
    const bucket = domainToBucketName(domain);

    try {
      const resp = await this.client.send(
        new GetObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      const bytes = await resp.Body?.transformToByteArray();
      return bytes ? Buffer.from(bytes) : null;
    } catch (err: unknown) {
      if (S3Handler.isMissingObject(err)) {
        if (receiveCount < S3Handler.MISSING_OBJECT_RETRY_LIMIT) {
          log(`⏳ S3 Object not found yet (Attempt ${receiveCount}). Retrying...`, 'WARNING');
          return null;
        }
        log('❌ S3 Object missing permanently after retries.', 'ERROR');
        throw err;
      }
      log(`❌ S3 Download Error: ${S3Handler.errorText(err)}`, 'ERROR');
      throw err;
    }
  }

  // -------------------------------------------------------------------------
  // Metadata helpers (copy-in-place with updated metadata)
  // -------------------------------------------------------------------------

  /**
   * Rewrite an object's user metadata by copying the object onto itself.
   *
   * The current metadata is read with `HeadObject`, `patch` is merged over it,
   * and `removeKeys` are deleted afterwards (so a key present in both ends up
   * removed).
   *
   * @param bucket - Bucket holding the object.
   * @param key - Object key.
   * @param patch - Metadata entries to add or overwrite.
   * @param removeKeys - Metadata keys to drop.
   * @throws Any error raised by the `HeadObject` or `CopyObject` call.
   */
  private async updateMetadata(
    bucket: string,
    key: string,
    patch: Record<string, string>,
    removeKeys: readonly string[] = [],
  ): Promise<void> {
    const head = await this.client.send(
      new HeadObjectCommand({ Bucket: bucket, Key: key }),
    );

    const metadata: Record<string, string> = { ...(head.Metadata ?? {}), ...patch };
    for (const staleKey of removeKeys) {
      delete metadata[staleKey];
    }

    await this.client.send(
      new CopyObjectCommand({
        Bucket: bucket,
        Key: key,
        CopySource: S3Handler.copySource(bucket, key),
        Metadata: metadata,
        MetadataDirective: 'REPLACE',
        // REPLACE discards the source object's system metadata as well, so
        // re-send what HeadObject reported to keep e.g. the Content-Type.
        ContentType: head.ContentType,
        CacheControl: head.CacheControl,
        ContentDisposition: head.ContentDisposition,
        ContentEncoding: head.ContentEncoding,
        ContentLanguage: head.ContentLanguage,
      }),
    );
  }

  // -------------------------------------------------------------------------
  // Mark helpers
  // -------------------------------------------------------------------------

  /**
   * Record status `delivered` in the email's metadata.
   *
   * Best effort: a failure is logged at WARNING level and NOT rethrown.
   *
   * @param domain - Domain used to derive the bucket name.
   * @param messageId - Object key of the email.
   * @param workerName - Stored as `processed_by` and passed to the logger.
   * @param invalidInboxes - Optional recipients to record (comma-joined) under
   *   `invalid_inboxes`; ignored when absent or empty.
   */
  async markAsProcessed(
    domain: string,
    messageId: string,
    workerName: string,
    invalidInboxes?: string[],
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      const patch = S3Handler.completionPatch(workerName, 'delivered');
      if (invalidInboxes?.length) {
        patch['invalid_inboxes'] = invalidInboxes.join(',');
        log(`⚠ Invalid inboxes recorded: ${invalidInboxes.join(', ')}`, 'WARNING', workerName);
      }
      await this.updateMetadata(
        bucket,
        messageId,
        patch,
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
    } catch (err: unknown) {
      log(`Failed to mark as processed: ${S3Handler.errorText(err)}`, 'WARNING', workerName);
    }
  }

  /**
   * Record status `failed` because every recipient was invalid.
   *
   * Best effort: a failure is logged at WARNING level and NOT rethrown.
   *
   * @param domain - Domain used to derive the bucket name.
   * @param messageId - Object key of the email.
   * @param invalidInboxes - Recipients recorded (comma-joined) under `invalid_inboxes`.
   * @param workerName - Stored as `processed_by` and passed to the logger.
   */
  async markAsAllInvalid(
    domain: string,
    messageId: string,
    invalidInboxes: string[],
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.updateMetadata(
        bucket,
        messageId,
        {
          ...S3Handler.completionPatch(workerName, 'failed'),
          error: 'All recipients are invalid (mailboxes do not exist)',
          invalid_inboxes: invalidInboxes.join(','),
        },
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
    } catch (err: unknown) {
      log(`Failed to mark as all invalid: ${S3Handler.errorText(err)}`, 'WARNING', workerName);
    }
  }

  /**
   * Record status `blocked` together with the blocked recipients and sender.
   *
   * Unlike the other mark helpers this one is strict: a failure is logged at
   * ERROR level and rethrown.
   *
   * @param domain - Domain used to derive the bucket name.
   * @param messageId - Object key of the email.
   * @param blockedRecipients - Recipients recorded (comma-joined) under `blocked_recipients`.
   * @param sender - Stored as `blocked_sender`.
   * @param workerName - Stored as `processed_by` and passed to the logger.
   * @throws Any error raised while updating the metadata.
   */
  async markAsBlocked(
    domain: string,
    messageId: string,
    blockedRecipients: string[],
    sender: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.updateMetadata(
        bucket,
        messageId,
        {
          ...S3Handler.completionPatch(workerName, 'blocked'),
          blocked_recipients: blockedRecipients.join(','),
          blocked_sender: sender,
        },
        S3Handler.TRANSIENT_METADATA_KEYS,
      );
      log('✓ Marked as blocked in S3 metadata', 'INFO', workerName);
    } catch (err: unknown) {
      log(`⚠ Failed to mark as blocked: ${S3Handler.errorText(err)}`, 'ERROR', workerName);
      throw err;
    }
  }

  /**
   * Delete an email object from its domain bucket.
   *
   * @param domain - Domain used to derive the bucket name.
   * @param messageId - Object key of the email.
   * @param workerName - Passed to the logger.
   * @throws Any error raised by the `DeleteObject` call (after logging it).
   */
  async deleteBlockedEmail(
    domain: string,
    messageId: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.client.send(
        new DeleteObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      log('🗑 Deleted blocked email from S3', 'SUCCESS', workerName);
    } catch (err: unknown) {
      log(`⚠ Failed to delete blocked email: ${S3Handler.errorText(err)}`, 'ERROR', workerName);
      throw err;
    }
  }
}
|