From f95461ad75c10227bbb3cd6d7eed1b4f79ca6659 Mon Sep 17 00:00:00 2001
From: Andreas Knuth
Date: Sat, 10 Jan 2026 18:03:08 -0600
Subject: [PATCH] hot reload & better pooling

---
 unified-worker/unified_worker.py | 286 ++++++++++++++++++++++---------
 1 file changed, 207 insertions(+), 79 deletions(-)

diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py
index e9b2784..f027be3 100644
--- a/unified-worker/unified_worker.py
+++ b/unified-worker/unified_worker.py
@@ -161,7 +161,7 @@ except Exception as e:
 # ============================================
 
 class SMTPPool:
-    """Thread-safe SMTP Connection Pool"""
+    """Thread-safe SMTP Connection Pool with robust connection handling"""
 
     def __init__(self, host: str, port: int, pool_size: int = 5):
         self.host = host
@@ -181,42 +181,71 @@ class SMTPPool:
             conn.ehlo()
             if config.smtp_user and config.smtp_pass:
                 conn.login(config.smtp_user, config.smtp_pass)
+            log(f" 📡 New SMTP connection created to {self.host}:{self.port}")
             return conn
         except Exception as e:
             log(f"Failed to create SMTP connection: {e}", 'ERROR')
             return None
 
+    def _test_connection(self, conn: smtplib.SMTP) -> bool:
+        """Test if connection is still alive"""
+        try:
+            status = conn.noop()[0]
+            return status == 250
+        except Exception:
+            return False
+
     def initialize(self):
         """Pre-create connections"""
         if self._initialized:
             return
-        for _ in range(self.pool_size):
+        # Only open 1-2 connections up front; the rest are created on demand
+        for _ in range(min(2, self.pool_size)):
             conn = self._create_connection()
             if conn:
                 self._pool.put(conn)
         self._initialized = True
-        log(f"SMTP pool initialized with {self._pool.qsize()} connections")
+        log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})")
 
     def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
-        """Get connection from pool"""
+        """Get a valid connection from pool or create new one"""
+        # Try to take a connection from the pool
         try:
-            conn = self._pool.get(timeout=timeout)
-            try:
-                conn.noop()
-                return conn
-            except:
-                conn = self._create_connection()
+            conn = self._pool.get(block=False)
+            # Check whether the connection is still alive
+            if self._test_connection(conn):
                 return conn
+            else:
+                # Connection is dead: close it and create a fresh one
+                try:
+                    conn.quit()
+                except:
+                    pass
+                log(f" ♻ Recycled stale SMTP connection")
+                return self._create_connection()
         except Empty:
+            # Pool is empty, create a new connection on demand
             return self._create_connection()
 
     def return_connection(self, conn: smtplib.SMTP):
-        """Return connection to pool"""
+        """Return connection to pool if still valid"""
         if conn is None:
             return
+
+        # Check whether the connection is still usable
+        if not self._test_connection(conn):
+            try:
+                conn.quit()
+            except:
+                pass
+            log(f" 🗑 Discarded broken SMTP connection")
+            return
+
+        # Try to put it back into the pool
         try:
             self._pool.put_nowait(conn)
         except:
+            # Pool is full, close the surplus connection
             try:
                 conn.quit()
             except:
                 pass
@@ -651,38 +680,87 @@ def is_permanent_recipient_error(error_msg: str) -> bool:
     error_lower = error_msg.lower()
     return any(indicator in error_lower for indicator in permanent_indicators)
 
-def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, smtp_conn: smtplib.SMTP, worker_name: str) -> Tuple[bool, Optional[str], bool]:
+def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
     """
     Sends an e-mail via SMTP to ONE recipient
+    With retry logic for connection errors
 
     Returns: (success: bool, error: str or None, is_permanent: bool)
     """
-    try:
-        result = smtp_conn.sendmail(from_addr, [recipient], raw_message)
+    last_error = None
+
+    for attempt in range(max_retries + 1):
+        smtp_conn = smtp_pool.get_connection()
-        if isinstance(result, dict) and result:
-            error = str(result.get(recipient, 'Unknown refusal'))
-            is_permanent = is_permanent_recipient_error(error)
-            log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
-            return False, error, is_permanent
-        else:
-            log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
-            return True, None, False
+        if not smtp_conn:
+            last_error = "Could not get SMTP connection"
+            log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
+            time.sleep(0.5)
+            continue
+
+        try:
+            result = smtp_conn.sendmail(from_addr, [recipient], raw_message)
+
+            # Send succeeded, return the connection to the pool
+            smtp_pool.return_connection(smtp_conn)
+
+            if isinstance(result, dict) and result:
+                error = str(result.get(recipient, 'Unknown refusal'))
+                is_permanent = is_permanent_recipient_error(error)
+                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
+                return False, error, is_permanent
+            else:
+                log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
+                return True, None, False
+
+        except smtplib.SMTPServerDisconnected as e:
+            # Connection was closed - retry with a fresh connection
+            log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
+            last_error = str(e)
+            # Do not return the connection (it is broken)
+            try:
+                smtp_conn.quit()
+            except:
+                pass
+            time.sleep(0.3)
+            continue
+
+        except smtplib.SMTPRecipientsRefused as e:
+            smtp_pool.return_connection(smtp_conn)
+            error_msg = str(e)
+            is_permanent = is_permanent_recipient_error(error_msg)
+            log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
+            return False, error_msg, is_permanent
+
+        except smtplib.SMTPException as e:
+            error_msg = str(e)
+            # Retry on connection-related errors
+            if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
+                log(f" ⚠ {recipient}: SMTP connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
+                last_error = error_msg
+                try:
+                    smtp_conn.quit()
+                except:
+                    pass
+                time.sleep(0.3)
+                continue
+
+            smtp_pool.return_connection(smtp_conn)
+            is_permanent = is_permanent_recipient_error(error_msg)
+            log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
+            return False, error_msg, is_permanent
+
+        except Exception as e:
+            # Unknown error - discard the connection, but do not treat it as permanent
+            try:
+                smtp_conn.quit()
+            except:
+                pass
+            log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name)
+            return False, str(e), False
 
-    except smtplib.SMTPRecipientsRefused as e:
-        error_msg = str(e)
-        is_permanent = is_permanent_recipient_error(error_msg)
-        log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
-        return False, error_msg, is_permanent
-
-    except smtplib.SMTPException as e:
-        error_msg = str(e)
-        is_permanent = is_permanent_recipient_error(error_msg)
-        log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
-        return False, error_msg, is_permanent
-
-    except Exception as e:
-        log(f" ✗ {recipient}: Connection error - {e}", 'ERROR', worker_name)
-        return False, str(e), False
+    # All retries failed
+    log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name)
+    return False, last_error or "Connection failed after retries", False
 
 # ============================================
 # S3 METADATA UPDATES (as in domain-worker)
@@ -872,29 +950,21 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
     failed_permanent = []
     failed_temporary = []
 
-    smtp_conn = smtp_pool.get_connection()
-    if not smtp_conn:
-        log("❌ Could not get SMTP connection", 'ERROR', worker_name)
-        return False
-
-    try:
-        for recipient in recipients:
-            success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, smtp_conn, worker_name)
-
-            if success:
-                successful.append(recipient)
-                if PROMETHEUS_ENABLED:
-                    emails_processed.labels(domain=domain, status='success').inc()
-            elif is_perm:
-                failed_permanent.append(recipient)
-                if PROMETHEUS_ENABLED:
-                    emails_processed.labels(domain=domain, status='permanent_failure').inc()
-            else:
-                failed_temporary.append(recipient)
-                if PROMETHEUS_ENABLED:
-                    emails_processed.labels(domain=domain, status='temporary_failure').inc()
-    finally:
-        smtp_pool.return_connection(smtp_conn)
+    for recipient in recipients:
+        success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name)
+
+        if success:
+            successful.append(recipient)
+            if PROMETHEUS_ENABLED:
+                emails_processed.labels(domain=domain, status='success').inc()
+        elif is_perm:
+            failed_permanent.append(recipient)
+            if PROMETHEUS_ENABLED:
+                emails_processed.labels(domain=domain, status='permanent_failure').inc()
+        else:
+            failed_temporary.append(recipient)
+            if PROMETHEUS_ENABLED:
+                emails_processed.labels(domain=domain, status='temporary_failure').inc()
 
     # 7. RESULT & CLEANUP
     log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name)
@@ -1028,6 +1098,9 @@ class UnifiedWorker:
         self.executor: Optional[ThreadPoolExecutor] = None
         self.domains: List[str] = []
         self.queue_urls: Dict[str, str] = {}
+        self.active_pollers: Dict[str, threading.Thread] = {}
+        self.poller_lock = threading.Lock()
+        self.reload_interval = int(os.environ.get('DOMAIN_RELOAD_INTERVAL', '60'))  # seconds
 
     def setup(self):
         """Initialize worker"""
@@ -1056,35 +1129,90 @@ class UnifiedWorker:
         log(f"Initialized with {len(self.queue_urls)} domains")
 
     def start(self):
-        """Start all domain pollers"""
-        self.executor = ThreadPoolExecutor(
-            max_workers=len(self.queue_urls),
-            thread_name_prefix='poller'
-        )
-
-        futures = []
+        """Start all domain pollers with hot-reload support"""
+        # Start the initial pollers
         for domain, queue_url in self.queue_urls.items():
-            future = self.executor.submit(poll_domain, domain, queue_url, self.stop_event)
-            futures.append(future)
+            self._start_poller(domain, queue_url)
 
-        log(f"Started {len(futures)} domain pollers")
+        log(f"Started {len(self.active_pollers)} domain pollers")
 
+        # Start the domain reload thread
+        reload_thread = threading.Thread(
+            target=self._domain_reload_loop,
+            name='domain-reloader',
+            daemon=True
+        )
+        reload_thread.start()
+        log(f"Domain hot-reload enabled (checking every {self.reload_interval}s)")
+
+        # Wait until shutdown
         try:
-            for future in as_completed(futures):
-                try:
-                    future.result()
-                except Exception as e:
-                    log(f"Poller error: {e}", 'ERROR')
+            while not self.stop_event.is_set():
+                self.stop_event.wait(timeout=1)
         except KeyboardInterrupt:
             pass
 
+    def _start_poller(self, domain: str, queue_url: str):
+        """Start a single domain poller thread"""
+        with self.poller_lock:
+            if domain in self.active_pollers:
+                return  # Already running
+
+            thread = threading.Thread(
+                target=poll_domain,
+                args=(domain, queue_url, self.stop_event),
+                name=f"poller-{domain}",
+                daemon=True
+            )
+            thread.start()
+            self.active_pollers[domain] = thread
+            log(f"🚀 Started poller for {domain}", 'INFO', f"worker-{domain}")
+
+    def _domain_reload_loop(self):
+        """Periodically check for new domains and start pollers"""
+        while not self.stop_event.is_set():
+            self.stop_event.wait(timeout=self.reload_interval)
+
+            if self.stop_event.is_set():
+                break
+
+            try:
+                # Load the current domain list
+                current_domains = load_domains()
+
+                # Find newly added domains
+                new_domains = set(current_domains) - set(self.queue_urls.keys())
+
+                if new_domains:
+                    log(f"🔄 Hot-reload: Found {len(new_domains)} new domain(s): {', '.join(new_domains)}")
+
+                    for domain in new_domains:
+                        queue_url = get_queue_url(domain)
+                        if queue_url:
+                            self.queue_urls[domain] = queue_url
+                            self._start_poller(domain, queue_url)
+                        else:
+                            log(f"⚠ Could not find queue for new domain: {domain}", 'WARNING')
+
+                # Cleanup: domains removed from config (their threads keep running until restart)
+                removed_domains = set(self.queue_urls.keys()) - set(current_domains)
+                if removed_domains:
+                    log(f"ℹ Domains removed from config (will stop on restart): {', '.join(removed_domains)}")
+
+            except Exception as e:
+                log(f"Error in domain reload: {e}", 'ERROR')
+
     def stop(self):
         """Stop gracefully"""
         log("⚠ Stopping worker...")
         self.stop_event.set()
 
-        if self.executor:
-            self.executor.shutdown(wait=True, cancel_futures=False)
+        # Wait for the poller threads (max 10 seconds each)
+        with self.poller_lock:
+            for domain, thread in self.active_pollers.items():
+                thread.join(timeout=10)
+                if thread.is_alive():
+                    log(f"Warning: Poller for {domain} did not stop gracefully", 'WARNING')
 
         smtp_pool.close_all()
         log("👋 Worker stopped")
@@ -1163,7 +1291,7 @@ def main():
     log(f"🚀 UNIFIED EMAIL WORKER (Full Featured)")
     log(f"{'='*70}")
     log(f" Domains: {len(worker.queue_urls)}")
-    log(f" Threads: {config.worker_threads}")
+    log(f" Hot-Reload: Every {worker.reload_interval}s")
    log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
     log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}")
     log(f" Poll Interval: {config.poll_interval}s")
@@ -1183,4 +1311,4 @@ def main():
     worker.start()
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
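
The pooling change above follows a validate-on-borrow / validate-on-return pattern: each connection is health-checked with NOOP before it is handed out and before it goes back into the queue, and broken connections are discarded and recreated on demand. A minimal standalone sketch of that pattern, using only the Python standard library; the names (ValidatedPool, factory, is_alive) are illustrative and not part of this repository:

import queue

class ValidatedPool:
    """Generic resource pool that health-checks items on borrow and on return."""

    def __init__(self, factory, is_alive, max_size=5):
        self._factory = factory      # callable that creates a new resource
        self._is_alive = is_alive    # callable returning True while a resource is usable
        self._pool = queue.Queue(maxsize=max_size)

    def get(self):
        try:
            res = self._pool.get(block=False)
        except queue.Empty:
            return self._factory()   # pool empty -> create on demand
        if self._is_alive(res):
            return res
        return self._factory()       # stale resource -> replace it

    def put(self, res):
        if not self._is_alive(res):
            return                   # drop broken resources instead of pooling them
        try:
            self._pool.put_nowait(res)
        except queue.Full:
            pass                     # pool already full -> discard the extra resource

# Toy usage with dicts standing in for SMTP connections:
pool = ValidatedPool(factory=lambda: {"alive": True}, is_alive=lambda r: r["alive"])
conn = pool.get()
pool.put(conn)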