// email-amazon/email-worker-nodejs/src/worker/unified-worker.ts
// (107 lines, 2.8 KiB, TypeScript)

/**
* Unified multi-domain worker coordinator
*
* Manages the lifecycle of all DomainPoller instances:
* - Resolves the SQS queue URL for each domain
* - Creates and starts a poller for every domain that has a queue
* - Provides aggregate stats across all pollers
* - Stops all pollers on graceful shutdown
*/
import { DynamoDBHandler } from '../aws/dynamodb';
import { S3Handler} from '../aws/s3.js';
import { SQSHandler} from '../aws/sqs.js'
import { SESHandler } from '../aws/ses';
import { EmailDelivery } from '../smtp/delivery.js';
import { MessageProcessor } from './message-processor.js';
import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
/**
 * Coordinates one DomainPoller per configured domain.
 *
 * The worker wires up the shared handlers and a single MessageProcessor in its
 * constructor, resolves an SQS queue URL per domain in {@link start}, and
 * exposes aggregate counters through {@link getStats}.
 */
export class UnifiedWorker {
  /** Pollers created by the most recent `start()` call; empty until then. */
  private pollers: DomainPoller[] = [];
  /** Single processor instance shared by every poller. */
  private readonly processor: MessageProcessor;
  /** SQS handler used both for queue lookup and by the pollers/processor. */
  private readonly sqs: SQSHandler;

  /**
   * Builds the handlers and the shared MessageProcessor.
   *
   * @param domains Domains whose SQS queues are looked up by `start()`.
   * @param metrics Optional metrics collector; passed to the processor and to
   *   every poller created by `start()`.
   */
  constructor(
    private readonly domains: string[],
    private readonly metrics: MetricsCollector | null,
  ) {
    const s3 = new S3Handler();
    this.sqs = new SQSHandler();
    const ses = new SESHandler();
    const dynamodb = new DynamoDBHandler();
    const delivery = new EmailDelivery();

    this.processor = new MessageProcessor(s3, this.sqs, ses, dynamodb, delivery);
    this.processor.metrics = metrics;

    // Fire-and-forget: a constructor cannot await. Verification stays
    // best-effort (a failure never aborts construction), but the failure is
    // logged instead of being silently discarded.
    dynamodb.verifyTables().catch((err: unknown) => {
      log(
        `⚠ DynamoDB table verification failed: ${UnifiedWorker.describeError(err)}`,
        'WARNING',
      );
    });
  }

  /**
   * Resolves the queue URL for every configured domain, creates a poller for
   * each domain that has one, and starts all pollers.
   *
   * Domains for which `getQueueUrl` yields a falsy value are skipped with a
   * warning. If no domain ends up with a poller, the process is terminated
   * with exit code 1.
   *
   * @throws Whatever `SQSHandler.getQueueUrl` rejects with; the lookup is not
   *   wrapped in a try/catch, so a rejection aborts startup.
   */
  async start(): Promise<void> {
    log(`🚀 Starting unified worker for ${this.domains.length} domain(s)...`);

    const resolvedPollers: DomainPoller[] = [];
    for (const domain of this.domains) {
      const queueUrl = await this.sqs.getQueueUrl(domain);
      if (!queueUrl) {
        log(`⚠ Skipping ${domain}: No SQS queue found`, 'WARNING');
        continue;
      }

      resolvedPollers.push(
        new DomainPoller(domain, queueUrl, this.sqs, this.processor, this.metrics),
      );
    }

    if (resolvedPollers.length === 0) {
      log('❌ No valid domains with SQS queues found. Exiting.', 'ERROR');
      process.exit(1);
    }

    // Only publish the pollers once every lookup has completed, so getStats()
    // and stop() never observe a partially built list.
    this.pollers = resolvedPollers;
    for (const poller of this.pollers) {
      // NOTE(review): the return value of start() is discarded. If it returns
      // a promise, its rejections are not observed here — confirm against
      // DomainPoller.
      poller.start();
    }

    log(
      `✅ All ${this.pollers.length} domain poller(s) running: ` +
        this.pollers.map((p) => p.stats.domain).join(', '),
      'SUCCESS',
    );
  }

  /**
   * Stops every poller and waits until all of them have settled.
   *
   * Each failure is logged together with the poller's domain. The success
   * message is only logged when every poller stopped without error.
   *
   * @throws The rejection reason of the first (by poller order) failing
   *   `stop()`, rethrown only after all pollers have settled.
   */
  async stop(): Promise<void> {
    log('🛑 Stopping all domain pollers...');

    const pollers = this.pollers;
    // allSettled rather than all: one failing poller must not make us return
    // while the remaining pollers are still shutting down.
    const outcomes = await Promise.allSettled(pollers.map((p) => p.stop()));

    let firstFailure: PromiseRejectedResult | undefined = undefined;
    for (const [index, outcome] of outcomes.entries()) {
      if (outcome.status !== 'rejected') {
        continue;
      }
      if (firstFailure === undefined) {
        firstFailure = outcome;
      }
      const domain = pollers[index]?.stats.domain ?? 'unknown';
      log(
        `❌ Failed to stop poller for ${domain}: ${UnifiedWorker.describeError(outcome.reason)}`,
        'ERROR',
      );
    }

    if (firstFailure !== undefined) {
      // Keep the original contract: stop() rejects when a poller fails to stop.
      throw firstFailure.reason;
    }

    log('✅ All pollers stopped.');
  }

  /**
   * Aggregates the counters of all current pollers.
   *
   * @returns The summed `processed` and `errors` counters plus a shallow copy
   *   of each poller's stats object, in poller order. All totals are 0 and
   *   `domains` is empty when no pollers exist.
   */
  getStats(): {
    totalProcessed: number;
    totalErrors: number;
    domains: DomainPollerStats[];
  } {
    let totalProcessed = 0;
    let totalErrors = 0;
    const domains: DomainPollerStats[] = [];

    for (const poller of this.pollers) {
      totalProcessed += poller.stats.processed;
      totalErrors += poller.stats.errors;
      // Copy so callers cannot mutate the poller's live stats object.
      domains.push({ ...poller.stats });
    }

    return { totalProcessed, totalErrors, domains };
  }

  /** Turns an unknown thrown value into a loggable message. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }
}