hot reload & better pooling
parent 06195b9a60
commit f95461ad75
@@ -161,7 +161,7 @@ except Exception as e:
# ============================================

class SMTPPool:
    """Thread-safe SMTP Connection Pool"""
    """Thread-safe SMTP Connection Pool with robust connection handling"""

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
@@ -181,42 +181,71 @@ class SMTPPool:
            conn.ehlo()
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            log(f" 📡 New SMTP connection created to {self.host}:{self.port}")
            return conn
        except Exception as e:
            log(f"Failed to create SMTP connection: {e}", 'ERROR')
            return None

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Test if connection is still alive"""
        try:
            status = conn.noop()[0]
            return status == 250
        except Exception:
            return False

    def initialize(self):
        """Pre-create connections"""
        if self._initialized:
            return
        for _ in range(self.pool_size):
        # Only 1-2 connections initially, the rest on demand
        for _ in range(min(2, self.pool_size)):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True
        log(f"SMTP pool initialized with {self._pool.qsize()} connections")
        log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Get connection from pool"""
        """Get a valid connection from pool or create new one"""
        # Try to take a connection from the pool
        try:
            conn = self._pool.get(timeout=timeout)
            try:
                conn.noop()
                return conn
            except:
                conn = self._create_connection()
            conn = self._pool.get(block=False)
            # Test whether the connection is still alive
            if self._test_connection(conn):
                return conn
            else:
                # Connection is dead, close it and create a new one
                try:
                    conn.quit()
                except:
                    pass
                log(f" ♻ Recycled stale SMTP connection")
                return self._create_connection()
        except Empty:
            # Pool is empty, create a new connection
            return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return connection to pool"""
        """Return connection to pool if still valid"""
        if conn is None:
            return

        # Check whether the connection is still good
        if not self._test_connection(conn):
            try:
                conn.quit()
            except:
                pass
            log(f" 🗑 Discarded broken SMTP connection")
            return

        # Try to put it back into the pool
        try:
            self._pool.put_nowait(conn)
        except:
            # Pool is full, close the connection
            try:
                conn.quit()
            except:
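The pool change above is easiest to read as a checkout/return cycle. The snippet below is an illustrative sketch only, not part of the commit; it assumes the SMTPPool shown above and the surrounding config object, and sender/recipient/raw_bytes are placeholder names:

    pool = SMTPPool(config.smtp_host, config.smtp_port, pool_size=5)
    pool.initialize()                      # opens at most 2 connections up front, the rest on demand

    conn = pool.get_connection()           # pooled connections are NOOP-tested before reuse
    if conn:
        try:
            conn.sendmail(sender, [recipient], raw_bytes)
        finally:
            pool.return_connection(conn)   # re-tested on return; broken connections are discarded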
@@ -651,38 +680,87 @@ def is_permanent_recipient_error(error_msg: str) -> bool:
    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)

def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, smtp_conn: smtplib.SMTP, worker_name: str) -> Tuple[bool, Optional[str], bool]:
def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
    """
    Sends an email via SMTP to ONE recipient
    With retry logic on connection errors
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        result = smtp_conn.sendmail(from_addr, [recipient], raw_message)
    last_error = None

        if isinstance(result, dict) and result:
            error = str(result.get(recipient, 'Unknown refusal'))
            is_permanent = is_permanent_recipient_error(error)
            log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
            return False, error, is_permanent
        else:
            log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
            return True, None, False
    for attempt in range(max_retries + 1):
        smtp_conn = smtp_pool.get_connection()

    except smtplib.SMTPRecipientsRefused as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
        return False, error_msg, is_permanent
        if not smtp_conn:
            last_error = "Could not get SMTP connection"
            log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            time.sleep(0.5)
            continue

    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
        return False, error_msg, is_permanent
        try:
            result = smtp_conn.sendmail(from_addr, [recipient], raw_message)

    except Exception as e:
        log(f" ✗ {recipient}: Connection error - {e}", 'ERROR', worker_name)
        return False, str(e), False
            # Connection worked, return it to the pool
            smtp_pool.return_connection(smtp_conn)

            if isinstance(result, dict) and result:
                error = str(result.get(recipient, 'Unknown refusal'))
                is_permanent = is_permanent_recipient_error(error)
                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
                return False, error, is_permanent
            else:
                log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
                return True, None, False

        except smtplib.SMTPServerDisconnected as e:
            # Connection was closed - retry with a new connection
            log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            last_error = str(e)
            # Do not return the connection (it is broken)
            try:
                smtp_conn.quit()
            except:
                pass
            time.sleep(0.3)
            continue

        except smtplib.SMTPRecipientsRefused as e:
            smtp_pool.return_connection(smtp_conn)
            error_msg = str(e)
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent

        except smtplib.SMTPException as e:
            error_msg = str(e)
            # On connection errors: retry
            if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
                log(f" ⚠ {recipient}: SMTP connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
                last_error = error_msg
                try:
                    smtp_conn.quit()
                except:
                    pass
                time.sleep(0.3)
                continue

            smtp_pool.return_connection(smtp_conn)
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent

        except Exception as e:
            # Unknown error - discard the connection, but not permanent
            try:
                smtp_conn.quit()
            except:
                pass
            log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name)
            return False, str(e), False

    # All retries failed
    log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name)
    return False, last_error or "Connection failed after retries", False

# ============================================
# S3 METADATA UPDATES (same as in domain-worker)
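The core technique the reworked send_email_to_recipient adds is a bounded retry loop that checks a fresh connection out of the pool per attempt and treats SMTPServerDisconnected as transient. A minimal standalone sketch of that pattern (illustrative only, not part of the commit; deliver_with_retry and attempt_send are invented names):

    import smtplib
    import time
    from typing import Callable, Optional, Tuple

    def deliver_with_retry(attempt_send: Callable[[], None], max_retries: int = 2) -> Tuple[bool, Optional[str]]:
        """Retry a send callable when the SMTP server drops the connection."""
        last_error: Optional[str] = None
        for _ in range(max_retries + 1):
            try:
                attempt_send()                  # one delivery attempt
                return True, None
            except smtplib.SMTPServerDisconnected as e:
                last_error = str(e)             # transient failure: back off briefly, then retry
                time.sleep(0.3)
        return False, last_error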
@@ -872,29 +950,21 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
    failed_permanent = []
    failed_temporary = []

    smtp_conn = smtp_pool.get_connection()
    if not smtp_conn:
        log("❌ Could not get SMTP connection", 'ERROR', worker_name)
        return False
    for recipient in recipients:
        success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name)

    try:
        for recipient in recipients:
            success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, smtp_conn, worker_name)

            if success:
                successful.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='success').inc()
            elif is_perm:
                failed_permanent.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='permanent_failure').inc()
            else:
                failed_temporary.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='temporary_failure').inc()
    finally:
        smtp_pool.return_connection(smtp_conn)
        if success:
            successful.append(recipient)
            if PROMETHEUS_ENABLED:
                emails_processed.labels(domain=domain, status='success').inc()
        elif is_perm:
            failed_permanent.append(recipient)
            if PROMETHEUS_ENABLED:
                emails_processed.labels(domain=domain, status='permanent_failure').inc()
        else:
            failed_temporary.append(recipient)
            if PROMETHEUS_ENABLED:
                emails_processed.labels(domain=domain, status='temporary_failure').inc()

    # 7. RESULT & CLEANUP
    log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name)
@@ -1028,6 +1098,9 @@ class UnifiedWorker:
        self.executor: Optional[ThreadPoolExecutor] = None
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}
        self.active_pollers: Dict[str, threading.Thread] = {}
        self.poller_lock = threading.Lock()
        self.reload_interval = int(os.environ.get('DOMAIN_RELOAD_INTERVAL', '60'))  # seconds

    def setup(self):
        """Initialize worker"""
@@ -1056,35 +1129,90 @@ class UnifiedWorker:
        log(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Start all domain pollers"""
        self.executor = ThreadPoolExecutor(
            max_workers=len(self.queue_urls),
            thread_name_prefix='poller'
        )

        futures = []
        """Start all domain pollers with hot-reload support"""
        # Start the initial pollers
        for domain, queue_url in self.queue_urls.items():
            future = self.executor.submit(poll_domain, domain, queue_url, self.stop_event)
            futures.append(future)
            self._start_poller(domain, queue_url)

        log(f"Started {len(futures)} domain pollers")
        log(f"Started {len(self.active_pollers)} domain pollers")

        # Start the domain reload thread
        reload_thread = threading.Thread(
            target=self._domain_reload_loop,
            name='domain-reloader',
            daemon=True
        )
        reload_thread.start()
        log(f"Domain hot-reload enabled (checking every {self.reload_interval}s)")

        # Wait until shutdown
        try:
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    log(f"Poller error: {e}", 'ERROR')
            while not self.stop_event.is_set():
                self.stop_event.wait(timeout=1)
        except KeyboardInterrupt:
            pass

    def _start_poller(self, domain: str, queue_url: str):
        """Start a single domain poller thread"""
        with self.poller_lock:
            if domain in self.active_pollers:
                return  # Already running

            thread = threading.Thread(
                target=poll_domain,
                args=(domain, queue_url, self.stop_event),
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.active_pollers[domain] = thread
            log(f"🚀 Started poller for {domain}", 'INFO', f"worker-{domain}")

    def _domain_reload_loop(self):
        """Periodically check for new domains and start pollers"""
        while not self.stop_event.is_set():
            self.stop_event.wait(timeout=self.reload_interval)

            if self.stop_event.is_set():
                break

            try:
                # Load the currently configured domains
                current_domains = load_domains()

                # Find new domains
                new_domains = set(current_domains) - set(self.queue_urls.keys())

                if new_domains:
                    log(f"🔄 Hot-reload: Found {len(new_domains)} new domain(s): {', '.join(new_domains)}")

                    for domain in new_domains:
                        queue_url = get_queue_url(domain)
                        if queue_url:
                            self.queue_urls[domain] = queue_url
                            self._start_poller(domain, queue_url)
                        else:
                            log(f"⚠ Could not find queue for new domain: {domain}", 'WARNING')

                # Cleanup: removed domains (optional - their threads keep running until restart)
                removed_domains = set(self.queue_urls.keys()) - set(current_domains)
                if removed_domains:
                    log(f"ℹ Domains removed from config (will stop on restart): {', '.join(removed_domains)}")

            except Exception as e:
                log(f"Error in domain reload: {e}", 'ERROR')

    def stop(self):
        """Stop gracefully"""
        log("⚠ Stopping worker...")
        self.stop_event.set()

        if self.executor:
            self.executor.shutdown(wait=True, cancel_futures=False)
        # Wait for poller threads (max 10 seconds)
        with self.poller_lock:
            for domain, thread in self.active_pollers.items():
                thread.join(timeout=10)
                if thread.is_alive():
                    log(f"Warning: Poller for {domain} did not stop gracefully", 'WARNING')

        smtp_pool.close_all()
        log("👋 Worker stopped")
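The hot-reload mechanism boils down to: wait on the stop event, re-read the domain list, and start pollers only for domains not seen before. A compact sketch of that loop (illustrative only, not part of the commit; list_domains, start_poller and running are stand-ins for load_domains, _start_poller and active_pollers):

    import threading
    from typing import Callable, Dict, Iterable

    def reload_loop(stop: threading.Event, interval: float,
                    list_domains: Callable[[], Iterable[str]],
                    start_poller: Callable[[str], None],
                    running: Dict[str, threading.Thread]) -> None:
        """Start pollers for domains that appear in the config after startup."""
        while not stop.is_set():
            stop.wait(timeout=interval)      # sleeps, but wakes early on shutdown
            if stop.is_set():
                break
            for domain in set(list_domains()) - set(running):
                start_poller(domain)         # only domains without a running poller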
@@ -1163,7 +1291,7 @@ def main():
    log(f"🚀 UNIFIED EMAIL WORKER (Full Featured)")
    log(f"{'='*70}")
    log(f" Domains: {len(worker.queue_urls)}")
    log(f" Threads: {config.worker_threads}")
    log(f" Hot-Reload: Every {worker.reload_interval}s")
    log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
    log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}")
    log(f" Poll Interval: {config.poll_interval}s")