#!/usr/bin/env python3
"""
Unified Worker - coordinates all domain pollers
"""
import sys
import time
import threading
from typing import Dict, List, Optional

from logger import log
from config import config, load_domains
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from smtp import SMTPPool
from smtp.delivery import EmailDelivery
from worker import MessageProcessor
from domain_poller import DomainPoller
from metrics.prometheus import MetricsCollector


class UnifiedWorker:
    """Main worker coordinating all domain pollers.

    Owns the shared AWS handlers, the SMTP connection pool, and one
    ``DomainPoller`` thread per configured domain. All pollers share a
    single stop event plus a stats dict guarded by ``stats_lock``.

    Typical lifecycle: ``setup()`` -> ``set_metrics()`` (optional) ->
    ``start()`` (blocks) -> ``stop()``.
    """

    def __init__(self):
        self.stop_event = threading.Event()
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}
        self.poller_threads: List[threading.Thread] = []

        # Shared stats across all pollers: domain -> processed count.
        # Written by poller threads and read by the status logger, so every
        # access must hold stats_lock.
        self.domain_stats: Dict[str, int] = {}
        self.stats_lock = threading.Lock()

        # AWS handlers (shared by all pollers through the message processor)
        self.s3 = S3Handler()
        self.sqs = SQSHandler()
        self.ses = SESHandler()
        self.dynamodb = DynamoDBHandler()

        # SMTP pool
        self.smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)

        # Email delivery
        self.delivery = EmailDelivery(self.smtp_pool)

        # Metrics collector is attached later via set_metrics(); None until then.
        # (Annotation fixed: the original declared a plain MetricsCollector
        # while assigning None.)
        self.metrics: Optional[MetricsCollector] = None

        # Message processor
        self.processor = MessageProcessor(
            self.s3,
            self.sqs,
            self.ses,
            self.dynamodb,
            self.delivery,
            None  # Metrics will be set later
        )

    def setup(self):
        """Load domains, resolve their SQS queue URLs, and warm the SMTP pool.

        Exits the process (``sys.exit(1)``) when no domains are configured
        or when no queue could be resolved for any configured domain.
        """
        self.domains = load_domains()
        if not self.domains:
            log("❌ No domains configured!", 'ERROR')
            sys.exit(1)

        # Get queue URLs; domains without a queue are skipped with a warning.
        for domain in self.domains:
            url = self.sqs.get_queue_url(domain)
            if url:
                self.queue_urls[domain] = url
                log(f" ✓ {domain} -> queue found")
            else:
                log(f" ✗ {domain} -> Queue not found!", 'WARNING')

        if not self.queue_urls:
            log("❌ No valid queues found!", 'ERROR')
            sys.exit(1)

        # Initialize SMTP pool
        self.smtp_pool.initialize()

        log(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Start one daemon poller thread per domain, then block until stopped.

        While running, wakes every 10 seconds to check the stop event and
        logs a compact status summary every 5 minutes. Returns after the
        stop event is set or on KeyboardInterrupt.

        NOTE(review): pollers capture ``self.metrics`` at construction time,
        so ``set_metrics()`` must be called before ``start()`` for metrics
        to reach the pollers — confirm against the caller.
        """
        # Initialize stats for all domains (no lock needed: poller threads
        # have not been started yet).
        for domain in self.queue_urls.keys():
            self.domain_stats[domain] = 0

        # Create and start a poller thread for each domain.
        for domain, queue_url in self.queue_urls.items():
            poller = DomainPoller(
                domain=domain,
                queue_url=queue_url,
                message_processor=self.processor,
                sqs=self.sqs,
                metrics=self.metrics,
                stop_event=self.stop_event,
                stats_dict=self.domain_stats,
                stats_lock=self.stats_lock
            )

            thread = threading.Thread(
                target=poller.poll,
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.poller_threads.append(thread)

        log(f"Started {len(self.poller_threads)} domain pollers")

        # Periodic status log (every 5 minutes)
        last_status_log = time.time()
        status_interval = 300  # 5 minutes

        try:
            while not self.stop_event.is_set():
                # Event.wait doubles as an interruptible sleep: returns early
                # the moment stop() sets the event.
                self.stop_event.wait(timeout=10)

                # Log status summary every 5 minutes
                if time.time() - last_status_log > status_interval:
                    self._log_status_table()
                    last_status_log = time.time()
        except KeyboardInterrupt:
            pass

    def _log_status_table(self):
        """Log a compact one-line status summary of all pollers.

        Shows alive-thread count, total processed messages, and per-domain
        counts (only domains with activity, domain names shortened for
        display).
        """
        active_threads = sum(1 for t in self.poller_threads if t.is_alive())

        # Copy the shared dict under the lock so the total and the
        # per-domain counts come from one consistent snapshot. (Previously
        # only the total was computed under the lock while per-domain counts
        # were read unlocked, racing with the poller threads.)
        with self.stats_lock:
            stats_snapshot = dict(self.domain_stats)

        total_processed = sum(stats_snapshot.values())

        # Build compact stats: only show domains with activity
        stats_parts = []
        for domain in sorted(self.queue_urls.keys()):
            count = stats_snapshot.get(domain, 0)
            if count > 0:  # Only show active domains
                # Shorten domain for display (label before first dot, max 12 chars)
                short_domain = domain.split('.')[0][:12]
                stats_parts.append(f"{short_domain}:{count}")

        stats_line = " | ".join(stats_parts) if stats_parts else "no activity"

        log(
            f"📊 Status: {active_threads}/{len(self.poller_threads)} active, "
            f"total:{total_processed} | {stats_line}"
        )

    def stop(self):
        """Stop gracefully: signal pollers, join threads, close SMTP pool."""
        log("⚠ Stopping worker...")
        self.stop_event.set()

        # Wait for poller threads (max 10 seconds each); threads are daemons,
        # so stragglers will not block process exit.
        for thread in self.poller_threads:
            thread.join(timeout=10)
            if thread.is_alive():
                log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')

        self.smtp_pool.close_all()
        log("👋 Worker stopped")

    def set_metrics(self, metrics: MetricsCollector):
        """Attach the metrics collector to the worker and shared processor.

        Must be called before start() for pollers to receive metrics (they
        capture self.metrics when constructed).
        """
        self.metrics = metrics
        self.processor.metrics = metrics

    def print_startup_banner(self):
        """Print startup information: config, feature availability, domains."""
        log(f"\n{'='*70}")
        log(f"🚀 UNIFIED EMAIL WORKER")
        log(f"{'='*70}")
        log(f" Domains: {len(self.queue_urls)}")
        log(f" DynamoDB: {'Connected' if self.dynamodb.available else 'Not Available'}")
        if config.lmtp_enabled:
            log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
        else:
            log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
        log(f" Poll Interval: {config.poll_interval}s")
        log(f" Visibility: {config.visibility_timeout}s")
        log(f"")
        log(f" Features:")
        log(f" ✓ Bounce Detection & Header Rewriting")
        # DynamoDB-backed features are only available when the table is reachable.
        log(f" {'✓' if self.dynamodb.available else '✗'} Auto-Reply / Out-of-Office")
        log(f" {'✓' if self.dynamodb.available else '✗'} Email Forwarding")
        log(f" {'✓' if self.dynamodb.available else '✗'} Blocked Senders (Wildcard)")
        log(f" {'✓' if self.metrics else '✗'} Prometheus Metrics")
        log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery")
        log(f"")
        log(f" Active Domains:")
        for domain in sorted(self.queue_urls.keys()):
            log(f" • {domain}")
        log(f"{'='*70}\n")