email-amazon/email-worker-nodejs/src/worker/unified-worker.ts

135 lines
3.7 KiB
TypeScript

/**
* Unified multi-domain worker coordinator
*
* Manages the lifecycle of all DomainPoller instances:
* - Resolves SQS queue URLs for each domain
* - Creates pollers for valid domains
* - Provides aggregate stats
* - Graceful shutdown
*/
import { DynamoDBHandler } from '../aws/dynamodb';
import { S3Handler} from '../aws/s3.js';
import { SQSHandler} from '../aws/sqs.js'
import { SESHandler } from '../aws/ses';
import { EmailDelivery } from '../smtp/delivery.js';
import { MessageProcessor } from './message-processor.js';
import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
/**
 * Coordinates one {@link DomainPoller} per configured domain.
 *
 * The constructor wires up the AWS/SMTP handlers and a single shared
 * {@link MessageProcessor}. `start()` resolves the SQS queue URL of every
 * domain and launches a poller for each domain that has one; `stop()` shuts
 * the pollers down again. While running, an aggregate status line is logged
 * periodically.
 */
export class UnifiedWorker {
  /** Delay between two periodic status log lines (5 minutes). */
  private static readonly STATUS_INTERVAL_MS = 5 * 60 * 1000;

  /** Maximum length of the abbreviated domain label used in the status line. */
  private static readonly SHORT_NAME_LENGTH = 12;

  private pollers: DomainPoller[] = [];
  private readonly processor: MessageProcessor;
  private readonly sqs: SQSHandler;
  /** Handle of the status timer; non-null exactly while the worker is started. */
  private statusInterval: NodeJS.Timeout | null = null;

  /**
   * @param domains Domains for which a poller should be created on `start()`.
   * @param metrics Optional metrics collector handed to the processor and to
   *   every poller; `null` disables metrics.
   */
  constructor(
    private readonly domains: string[],
    private readonly metrics: MetricsCollector | null,
  ) {
    const s3 = new S3Handler();
    this.sqs = new SQSHandler();
    const ses = new SESHandler();
    const dynamodb = new DynamoDBHandler();
    const delivery = new EmailDelivery();

    this.processor = new MessageProcessor(s3, this.sqs, ses, dynamodb, delivery);
    this.processor.metrics = metrics;

    // Best-effort check that must not block or fail construction. A failure is
    // still logged so that a broken table setup is visible to operators.
    void dynamodb.verifyTables().catch((err: unknown) => {
      const reason = err instanceof Error ? err.message : String(err);
      log(`⚠ DynamoDB table verification failed: ${reason}`, 'WARNING');
    });
  }

  /**
   * Resolves the queue URL of every configured domain, starts a poller for
   * each domain that has a queue, and schedules the periodic status report.
   *
   * Domains without a queue are skipped with a warning. If no domain has a
   * queue at all, the process is terminated with exit code 1.
   *
   * Calling `start()` again while the worker is already started is a no-op:
   * a second run would otherwise orphan the running pollers and leak the
   * status timer. The guard is evaluated before queue resolution, so it does
   * not cover overlapping calls that are issued before the first one finished.
   */
  async start(): Promise<void> {
    if (this.statusInterval !== null) {
      log('⚠ Unified worker is already running; ignoring start()', 'WARNING');
      return;
    }

    log(`🚀 Starting unified worker for ${this.domains.length} domain(s)...`);

    const resolvedPollers: DomainPoller[] = [];
    for (const domain of this.domains) {
      const queueUrl = await this.sqs.getQueueUrl(domain);
      if (!queueUrl) {
        log(`⚠ Skipping ${domain}: No SQS queue found`, 'WARNING');
        continue;
      }
      resolvedPollers.push(
        new DomainPoller(domain, queueUrl, this.sqs, this.processor, this.metrics),
      );
    }

    if (resolvedPollers.length === 0) {
      log('❌ No valid domains with SQS queues found. Exiting.', 'ERROR');
      process.exit(1);
    }

    this.pollers = resolvedPollers;
    for (const poller of this.pollers) {
      poller.start();
    }

    log(
      `✅ All ${this.pollers.length} domain poller(s) running: ` +
        this.pollers.map((p) => p.stats.domain).join(', '),
      'SUCCESS',
    );

    // Start the periodic (5-minute) status report.
    this.statusInterval = setInterval(() => {
      this.printStatus();
    }, UnifiedWorker.STATUS_INTERVAL_MS);
  }

  /**
   * Cancels the periodic status report and waits for every poller to stop.
   *
   * The returned promise rejects if stopping any poller rejects.
   */
  async stop(): Promise<void> {
    log('🛑 Stopping all domain pollers...');

    if (this.statusInterval !== null) {
      clearInterval(this.statusInterval);
      // Reset the handle so the worker no longer counts as started.
      this.statusInterval = null;
    }

    await Promise.all(this.pollers.map((p) => p.stop()));
    log('✅ All pollers stopped.');
  }

  /**
   * Aggregates the counters of all pollers.
   *
   * @returns The summed `processed` and `errors` counters plus a shallow copy
   *   of each poller's stats object, in poller order.
   */
  getStats(): {
    totalProcessed: number;
    totalErrors: number;
    domains: DomainPollerStats[];
  } {
    let totalProcessed = 0;
    let totalErrors = 0;
    const domains: DomainPollerStats[] = [];

    for (const p of this.pollers) {
      totalProcessed += p.stats.processed;
      totalErrors += p.stats.errors;
      // Copy so callers cannot mutate the poller's own stats object.
      domains.push({ ...p.stats });
    }

    return { totalProcessed, totalErrors, domains };
  }

  /** Logs a one-line summary: active poller count, total and per-domain processed counts. */
  private printStatus(): void {
    const stats = this.getStats();

    // Count the pollers that report themselves as running.
    const activePollers = this.pollers.filter((p) => p.stats.running).length;
    const totalPollers = this.pollers.length;

    // Format the per-domain counters as "<short label>:<processed>",
    // e.g. "hotshpotshga:1".
    const domainStats = stats.domains
      .map((d) => {
        // The label is the part before the first dot, truncated. The default
        // only exists to satisfy `noUncheckedIndexedAccess`; `split` always
        // yields at least one element.
        const [label = d.domain] = d.domain.split('.');
        const shortName = label.slice(0, UnifiedWorker.SHORT_NAME_LENGTH);
        return `${shortName}:${d.processed}`;
      })
      .join(' | ');

    log(
      `📊 Status: ${activePollers}/${totalPollers} active, total:${stats.totalProcessed} | ${domainStats}`,
      'INFO',
      'unified-worker',
    );
  }
}