diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py index 3445abc..4392aa2 100644 --- a/unified-worker/unified_worker.py +++ b/unified-worker/unified_worker.py @@ -1104,12 +1104,9 @@ class UnifiedWorker: def __init__(self): self.stop_event = threading.Event() - self.executor: Optional[ThreadPoolExecutor] = None self.domains: List[str] = [] self.queue_urls: Dict[str, str] = {} - self.active_pollers: Dict[str, threading.Thread] = {} - self.poller_lock = threading.Lock() - self.reload_interval = int(os.environ.get('DOMAIN_RELOAD_INTERVAL', '60')) # Sekunden + self.poller_threads: List[threading.Thread] = [] def setup(self): """Initialize worker""" @@ -1138,21 +1135,18 @@ class UnifiedWorker: log(f"Initialized with {len(self.queue_urls)} domains") def start(self): - """Start all domain pollers with hot-reload support""" - # Initial pollers starten + """Start all domain pollers""" for domain, queue_url in self.queue_urls.items(): - self._start_poller(domain, queue_url) + thread = threading.Thread( + target=poll_domain, + args=(domain, queue_url, self.stop_event), + name=f"poller-{domain}", + daemon=True + ) + thread.start() + self.poller_threads.append(thread) - log(f"Started {len(self.active_pollers)} domain pollers") - - # Domain reload thread starten - reload_thread = threading.Thread( - target=self._domain_reload_loop, - name='domain-reloader', - daemon=True - ) - reload_thread.start() - log(f"Domain hot-reload enabled (checking every {self.reload_interval}s)") + log(f"Started {len(self.poller_threads)} domain pollers") # Warten bis Shutdown try: @@ -1161,67 +1155,16 @@ class UnifiedWorker: except KeyboardInterrupt: pass - def _start_poller(self, domain: str, queue_url: str): - """Start a single domain poller thread""" - with self.poller_lock: - if domain in self.active_pollers: - return # Already running - - thread = threading.Thread( - target=poll_domain, - args=(domain, queue_url, self.stop_event), - name=f"poller-{domain}", - daemon=True - ) - thread.start() - self.active_pollers[domain] = thread - log(f"🚀 Started poller for {domain}", 'INFO', f"worker-{domain}") - - def _domain_reload_loop(self): - """Periodically check for new domains and start pollers""" - while not self.stop_event.is_set(): - self.stop_event.wait(timeout=self.reload_interval) - - if self.stop_event.is_set(): - break - - try: - # Aktuelle Domains laden - current_domains = load_domains() - - # Neue Domains finden - new_domains = set(current_domains) - set(self.queue_urls.keys()) - - if new_domains: - log(f"🔄 Hot-reload: Found {len(new_domains)} new domain(s): {', '.join(new_domains)}") - - for domain in new_domains: - queue_url = get_queue_url(domain) - if queue_url: - self.queue_urls[domain] = queue_url - self._start_poller(domain, queue_url) - else: - log(f"⚠ Could not find queue for new domain: {domain}", 'WARNING') - - # Cleanup: Entfernte Domains (optional - Threads laufen weiter bis restart) - removed_domains = set(self.queue_urls.keys()) - set(current_domains) - if removed_domains: - log(f"ℹ Domains removed from config (will stop on restart): {', '.join(removed_domains)}") - - except Exception as e: - log(f"Error in domain reload: {e}", 'ERROR') - def stop(self): """Stop gracefully""" log("⚠ Stopping worker...") self.stop_event.set() # Warten auf Poller-Threads (max 10 Sekunden) - with self.poller_lock: - for domain, thread in self.active_pollers.items(): - thread.join(timeout=10) - if thread.is_alive(): - log(f"Warning: Poller for {domain} did not stop gracefully", 'WARNING') + for thread in self.poller_threads: + thread.join(timeout=10) + if thread.is_alive(): + log(f"Warning: {thread.name} did not stop gracefully", 'WARNING') smtp_pool.close_all() log("👋 Worker stopped") @@ -1311,10 +1254,9 @@ def main(): start_health_server(worker) log(f"\n{'='*70}") - log(f"🚀 UNIFIED EMAIL WORKER (Full Featured)") + log(f"🚀 UNIFIED EMAIL WORKER") log(f"{'='*70}") log(f" Domains: {len(worker.queue_urls)}") - log(f" Hot-Reload: Every {worker.reload_interval}s") log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}") log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}") log(f" Poll Interval: {config.poll_interval}s")