/**
 * Unified multi-domain worker coordinator
 *
 * Manages the lifecycle of all DomainPoller instances:
 * - Resolves SQS queue URLs for each domain
 * - Creates pollers for valid domains
 * - Provides aggregate stats
 * - Graceful shutdown
 */

import { DynamoDBHandler } from '../aws/dynamodb';
import { S3Handler } from '../aws/s3.js';
import { SQSHandler } from '../aws/sqs.js';
import { SESHandler } from '../aws/ses';
import { EmailDelivery } from '../smtp/delivery.js';
import { MessageProcessor } from './message-processor.js';
import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';

/**
 * Owns one {@link DomainPoller} per configured domain and a single shared
 * {@link MessageProcessor} that every poller hands its messages to.
 */
export class UnifiedWorker {
  /** Delay between two periodic status log lines (5 minutes). */
  private static readonly STATUS_REPORT_INTERVAL_MS = 5 * 60 * 1000;

  /** Maximum number of characters of the first domain label shown in the status line. */
  private static readonly STATUS_LABEL_MAX_LENGTH = 12;

  private pollers: DomainPoller[] = [];
  private readonly processor: MessageProcessor;
  private readonly sqs: SQSHandler;
  private statusInterval: NodeJS.Timeout | null = null;

  /**
   * Builds the AWS/SMTP handlers and the shared message processor.
   *
   * @param domains - Domains for which a poller should be created on `start()`.
   * @param metrics - Optional metrics collector; assigned to the processor and
   *   passed to every poller.
   */
  constructor(
    private readonly domains: string[],
    private readonly metrics: MetricsCollector | null,
  ) {
    const s3 = new S3Handler();
    this.sqs = new SQSHandler();
    const ses = new SESHandler();
    const dynamodb = new DynamoDBHandler();
    const delivery = new EmailDelivery();

    this.processor = new MessageProcessor(s3, this.sqs, ses, dynamodb, delivery);
    this.processor.metrics = metrics;

    // Fire-and-forget: the check is not awaited and a failure must not prevent
    // construction, but it is logged instead of being silently discarded.
    void dynamodb.verifyTables().catch((err: unknown) => {
      const reason = err instanceof Error ? err.message : String(err);
      log(`⚠ DynamoDB table verification failed: ${reason}`, 'WARNING');
    });
  }

  /**
   * Resolves the SQS queue URL of every configured domain, starts one poller
   * per domain that has a queue, and schedules the periodic status report.
   *
   * Domains without a queue URL are skipped with a warning. If no domain at
   * all yields a queue URL, the process is terminated with exit code 1.
   */
  async start(): Promise<void> {
    log(`🚀 Starting unified worker for ${this.domains.length} domain(s)...`);

    const resolvedPollers: DomainPoller[] = [];

    for (const domain of this.domains) {
      const queueUrl = await this.sqs.getQueueUrl(domain);
      if (!queueUrl) {
        log(`⚠ Skipping ${domain}: No SQS queue found`, 'WARNING');
        continue;
      }

      resolvedPollers.push(
        new DomainPoller(domain, queueUrl, this.sqs, this.processor, this.metrics),
      );
    }

    if (resolvedPollers.length === 0) {
      log('❌ No valid domains with SQS queues found. Exiting.', 'ERROR');
      process.exit(1);
    }

    this.pollers = resolvedPollers;

    for (const poller of this.pollers) {
      poller.start();
    }

    log(
      `✅ All ${this.pollers.length} domain poller(s) running: ` +
        this.pollers.map((p) => p.stats.domain).join(', '),
      'SUCCESS',
    );

    // Start the 5-minute status report.
    this.statusInterval = setInterval(() => {
      this.printStatus();
    }, UnifiedWorker.STATUS_REPORT_INTERVAL_MS);
  }

  /**
   * Cancels the periodic status report and waits until every poller's
   * `stop()` promise has settled successfully.
   */
  async stop(): Promise<void> {
    log('🛑 Stopping all domain pollers...');

    if (this.statusInterval !== null) {
      clearInterval(this.statusInterval);
      // Drop the handle so a stale timer reference is never cleared again.
      this.statusInterval = null;
    }

    await Promise.all(this.pollers.map((p) => p.stop()));
    log('✅ All pollers stopped.');
  }

  /**
   * Aggregates the counters of all current pollers.
   *
   * @returns The summed `processed` and `errors` counters plus a shallow copy
   *   of each poller's stats object.
   */
  getStats(): {
    totalProcessed: number;
    totalErrors: number;
    domains: DomainPollerStats[];
  } {
    let totalProcessed = 0;
    let totalErrors = 0;
    const domains: DomainPollerStats[] = [];

    for (const p of this.pollers) {
      totalProcessed += p.stats.processed;
      totalErrors += p.stats.errors;
      // Copy so callers cannot mutate the poller's own stats object.
      domains.push({ ...p.stats });
    }

    return { totalProcessed, totalErrors, domains };
  }

  /** Logs a one-line summary: active poller count, total and per-domain processed counters. */
  private printStatus(): void {
    const stats = this.getStats();

    // Count active pollers.
    const activePollers = this.pollers.filter((p) => p.stats.running).length;
    const totalPollers = this.pollers.length;

    // Format the per-domain statistics (e.g. hotshpotshga:1).
    const domainStats = stats.domains
      .map((d) => {
        // split() always yields at least one element; the fallback only
        // satisfies strict index checking.
        const firstLabel = d.domain.split('.')[0] ?? d.domain;
        const shortName = firstLabel.substring(0, UnifiedWorker.STATUS_LABEL_MAX_LENGTH);
        return `${shortName}:${d.processed}`;
      })
      .join(' | ');

    log(
      `📊 Status: ${activePollers}/${totalPollers} active, total:${stats.totalProcessed} | ${domainStats}`,
      'INFO',
      'unified-worker',
    );
  }
}