#!/usr/bin/env python3 """ unified_worker.py - Multi-Domain Email Worker (Full Featured) Features: - Multi-Domain parallel processing via Thread Pool - Bounce Detection & Rewriting (SES MAILER-DAEMON handling) - Auto-Reply / Out-of-Office (from DynamoDB email-rules) - Email Forwarding (from DynamoDB email-rules) - SMTP Connection Pooling - Prometheus Metrics - Graceful Shutdown DynamoDB Tables: - ses-outbound-messages: Tracking für Bounce-Korrelation - email-rules: Forwards und Auto-Reply Regeln Schema email-rules (wie in domain-worker): { "email_address": "user@domain.com", # Partition Key "ooo_active": true/false, "ooo_message": "I am currently...", "ooo_content_type": "text" | "html", "forwards": ["other@example.com", "another@example.com"] } Schema ses-outbound-messages: { "MessageId": "abc123...", # Partition Key (SES Message-ID) "original_source": "sender@domain.com", "recipients": ["recipient@other.com"], "timestamp": "2025-01-01T12:00:00Z", "bounceType": "Permanent", # Nach Bounce-Notification "bounceSubType": "General", "bouncedRecipients": ["recipient@other.com"] } """ import os import sys import json import time import signal import threading import smtplib import traceback from queue import Queue, Empty from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Any from datetime import datetime from email.parser import BytesParser from email.policy import SMTP as SMTPPolicy from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.message import MIMEMessage from email.utils import formataddr, parseaddr, formatdate, make_msgid import boto3 from botocore.exceptions import ClientError # Optional: Prometheus Metrics try: from prometheus_client import start_http_server, Counter, Gauge, Histogram PROMETHEUS_ENABLED = True except ImportError: PROMETHEUS_ENABLED = False # ============================================ # CONFIGURATION # ============================================ @dataclass class Config: """Worker Configuration""" # AWS aws_region: str = os.environ.get('AWS_REGION', 'us-east-2') # Domains to process domains_list: str = os.environ.get('DOMAINS', '') domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt') # Worker Settings worker_threads: int = int(os.environ.get('WORKER_THREADS', '10')) poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20')) max_messages: int = int(os.environ.get('MAX_MESSAGES', '10')) visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300')) # SMTP smtp_host: str = os.environ.get('SMTP_HOST', 'localhost') smtp_port: int = int(os.environ.get('SMTP_PORT', '25')) smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true' smtp_user: str = os.environ.get('SMTP_USER', '') smtp_pass: str = os.environ.get('SMTP_PASS', '') smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5')) # DynamoDB Tables rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules') messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages') # Bounce Handling bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3')) bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0')) # Monitoring metrics_port: int = int(os.environ.get('METRICS_PORT', '8000')) health_port: int = int(os.environ.get('HEALTH_PORT', '8080')) config = Config() # ============================================ # LOGGING (wie in domain-worker) # ============================================ def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'): """Structured logging with timestamp - matches domain-worker format""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') thread_name = threading.current_thread().name print(f"[{timestamp}] [{level}] [{worker_name}] [{thread_name}] {message}", flush=True) # ============================================ # METRICS # ============================================ if PROMETHEUS_ENABLED: emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status']) emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed') processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain']) queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain']) bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type']) autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain']) forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain']) # ============================================ # AWS CLIENTS # ============================================ sqs = boto3.client('sqs', region_name=config.aws_region) s3 = boto3.client('s3', region_name=config.aws_region) ses = boto3.client('ses', region_name=config.aws_region) # DynamoDB dynamodb = boto3.resource('dynamodb', region_name=config.aws_region) DYNAMODB_AVAILABLE = False rules_table = None messages_table = None try: rules_table = dynamodb.Table(config.rules_table) messages_table = dynamodb.Table(config.messages_table) # Test connection rules_table.table_status messages_table.table_status DYNAMODB_AVAILABLE = True except Exception as e: pass # Will be logged at startup # ============================================ # SMTP CONNECTION POOL # ============================================ class SMTPPool: """Thread-safe SMTP Connection Pool with robust connection handling""" def __init__(self, host: str, port: int, pool_size: int = 5): self.host = host self.port = port self.pool_size = pool_size self._pool: Queue = Queue(maxsize=pool_size) self._lock = threading.Lock() self._initialized = False def _create_connection(self) -> Optional[smtplib.SMTP]: """Create new SMTP connection""" try: conn = smtplib.SMTP(self.host, self.port, timeout=30) conn.ehlo() if config.smtp_use_tls: conn.starttls() conn.ehlo() if config.smtp_user and config.smtp_pass: conn.login(config.smtp_user, config.smtp_pass) log(f" 📡 New SMTP connection created to {self.host}:{self.port}") return conn except Exception as e: log(f"Failed to create SMTP connection: {e}", 'ERROR') return None def _test_connection(self, conn: smtplib.SMTP) -> bool: """Test if connection is still alive""" try: status = conn.noop()[0] return status == 250 except Exception: return False def initialize(self): """Pre-create connections""" if self._initialized: return # Nur 1-2 Connections initial, Rest on-demand for _ in range(min(2, self.pool_size)): conn = self._create_connection() if conn: self._pool.put(conn) self._initialized = True log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})") def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]: """Get a valid connection from pool or create new one""" # Versuche aus Pool zu holen try: conn = self._pool.get(block=False) # Teste ob Connection noch lebt if self._test_connection(conn): return conn else: # Connection ist tot, schließen und neue erstellen try: conn.quit() except: pass log(f" ♻ Recycled stale SMTP connection") return self._create_connection() except Empty: # Pool leer, neue Connection erstellen return self._create_connection() def return_connection(self, conn: smtplib.SMTP): """Return connection to pool if still valid""" if conn is None: return # Prüfen ob Connection noch gut ist if not self._test_connection(conn): try: conn.quit() except: pass log(f" 🗑 Discarded broken SMTP connection") return # Versuche zurück in Pool zu legen try: self._pool.put_nowait(conn) except: # Pool voll, Connection schließen try: conn.quit() except: pass def close_all(self): """Close all connections""" while not self._pool.empty(): try: conn = self._pool.get_nowait() conn.quit() except: pass smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size) # ============================================ # HELPER FUNCTIONS # ============================================ def domain_to_queue_name(domain: str) -> str: return domain.replace('.', '-') + '-queue' def domain_to_bucket_name(domain: str) -> str: return domain.replace('.', '-') + '-emails' def get_queue_url(domain: str) -> Optional[str]: queue_name = domain_to_queue_name(domain) try: response = sqs.get_queue_url(QueueName=queue_name) return response['QueueUrl'] except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': log(f"Queue not found for domain: {domain}", 'WARNING') else: log(f"Error getting queue URL for {domain}: {e}", 'ERROR') return None def load_domains() -> List[str]: """Load domains from config""" domains = [] if config.domains_list: domains.extend([d.strip() for d in config.domains_list.split(',') if d.strip()]) if os.path.exists(config.domains_file): with open(config.domains_file, 'r') as f: for line in f: domain = line.strip() if domain and not domain.startswith('#'): domains.append(domain) domains = list(set(domains)) log(f"Loaded {len(domains)} domains: {', '.join(domains)}") return domains # ============================================ # BOUNCE HANDLING (wie in domain-worker) # ============================================ def is_ses_bounce_notification(parsed_email) -> bool: """Check if email is from SES MAILER-DAEMON""" from_header = (parsed_email.get('From') or '').lower() return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]: """ Sucht Bounce-Info in DynamoDB anhand der Message-ID Mit Retry-Logik für Timing-Issues (wie in domain-worker) """ if not DYNAMODB_AVAILABLE or not messages_table: return None for attempt in range(config.bounce_lookup_retries): try: response = messages_table.get_item(Key={'MessageId': message_id}) item = response.get('Item') if item: return { 'original_source': item.get('original_source', ''), 'bounceType': item.get('bounceType', 'Unknown'), 'bounceSubType': item.get('bounceSubType', 'Unknown'), 'bouncedRecipients': item.get('bouncedRecipients', []), 'timestamp': item.get('timestamp', '') } if attempt < config.bounce_lookup_retries - 1: log(f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s (attempt {attempt + 1}/{config.bounce_lookup_retries})...", 'INFO', worker_name) time.sleep(config.bounce_lookup_delay) else: log(f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts for Message-ID: {message_id}", 'WARNING', worker_name) return None except Exception as e: log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", 'ERROR', worker_name) if attempt < config.bounce_lookup_retries - 1: time.sleep(config.bounce_lookup_delay) else: return None return None def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]: """ Prüft auf SES Bounce, sucht in DynamoDB und schreibt Header um. Returns: (parsed_email_object, was_modified_bool) """ if not is_ses_bounce_notification(parsed): return parsed, False log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name) # Message-ID aus Header extrahieren message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0] if not message_id: log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name) return parsed, False log(f" Looking up Message-ID: {message_id}", 'INFO', worker_name) # Lookup in DynamoDB bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name) if not bounce_info: return parsed, False # Bounce Info ausgeben original_source = bounce_info['original_source'] bounced_recipients = bounce_info['bouncedRecipients'] bounce_type = bounce_info['bounceType'] bounce_subtype = bounce_info['bounceSubType'] log(f"✓ Found bounce info:", 'INFO', worker_name) log(f" Original sender: {original_source}", 'INFO', worker_name) log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name) log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name) if bounced_recipients: new_from = bounced_recipients[0] # Rewrite Headers parsed['X-Original-SES-From'] = parsed.get('From', '') parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}" parsed.replace_header('From', new_from) if not parsed.get('Reply-To'): parsed['Reply-To'] = new_from # Subject anpassen if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower(): parsed.replace_header('Subject', f"Delivery Status: {new_from}") log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name) return parsed, True log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name) return parsed, False # ============================================ # EMAIL BODY EXTRACTION (wie in domain-worker) # ============================================ def extract_body_parts(parsed) -> Tuple[str, Optional[str]]: """ Extrahiert sowohl text/plain als auch text/html Body-Parts. Returns: (text_body: str, html_body: str or None) """ text_body = '' html_body = None if parsed.is_multipart(): for part in parsed.walk(): content_type = part.get_content_type() if content_type == 'text/plain': try: text_body += part.get_payload(decode=True).decode('utf-8', errors='ignore') except Exception: pass elif content_type == 'text/html': try: html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore') except Exception: pass else: try: payload = parsed.get_payload(decode=True) if payload: decoded = payload.decode('utf-8', errors='ignore') if parsed.get_content_type() == 'text/html': html_body = decoded else: text_body = decoded except Exception: text_body = str(parsed.get_payload()) return text_body.strip() if text_body else '(No body content)', html_body # ============================================ # AUTO-REPLY / OOO (wie in domain-worker) # ============================================ def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'): """ Erstellt eine Out-of-Office Reply als komplette MIME-Message. Behält Original-Body (text + html) bei. """ text_body, html_body = extract_body_parts(original_parsed) original_subject = original_parsed.get('Subject', '(no subject)') original_from = original_parsed.get('From', 'unknown') # Neue Message erstellen msg = MIMEMultipart('mixed') msg['From'] = recipient msg['To'] = original_from msg['Subject'] = f"Out of Office: {original_subject}" msg['Date'] = formatdate(localtime=True) msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['In-Reply-To'] = original_parsed.get('Message-ID', '') msg['References'] = original_parsed.get('Message-ID', '') msg['Auto-Submitted'] = 'auto-replied' # Verhindert Loops # Body-Teil erstellen body_part = MIMEMultipart('alternative') # Text-Version text_content = f"{ooo_msg}\n\n" text_content += "--- Original Message ---\n" text_content += f"From: {original_from}\n" text_content += f"Subject: {original_subject}\n\n" text_content += text_body body_part.attach(MIMEText(text_content, 'plain', 'utf-8')) # HTML-Version (wenn gewünscht und Original vorhanden) if content_type == 'html' or html_body: html_content = f"
{ooo_msg}



" html_content += "
" html_content += f"Original Message
" html_content += f"From: {original_from}
" html_content += f"Subject: {original_subject}

" html_content += (html_body if html_body else text_body.replace('\n', '
')) html_content += "
" body_part.attach(MIMEText(html_content, 'html', 'utf-8')) msg.attach(body_part) return msg # ============================================ # FORWARDING (wie in domain-worker) # ============================================ def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str): """ Erstellt eine Forward-Message als komplette MIME-Message. Behält ALLE Original-Parts inkl. Attachments bei. """ original_subject = original_parsed.get('Subject', '(no subject)') original_date = original_parsed.get('Date', 'unknown') # Neue Message erstellen msg = MIMEMultipart('mixed') msg['From'] = recipient msg['To'] = forward_to msg['Subject'] = f"FWD: {original_subject}" msg['Date'] = formatdate(localtime=True) msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['Reply-To'] = original_from # Forward-Header als Text text_body, html_body = extract_body_parts(original_parsed) # Body-Teil body_part = MIMEMultipart('alternative') # Text-Version fwd_text = "---------- Forwarded message ---------\n" fwd_text += f"From: {original_from}\n" fwd_text += f"Date: {original_date}\n" fwd_text += f"Subject: {original_subject}\n" fwd_text += f"To: {recipient}\n\n" fwd_text += text_body body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8')) # HTML-Version if html_body: fwd_html = "
" fwd_html += "---------- Forwarded message ---------
" fwd_html += f"From: {original_from}
" fwd_html += f"Date: {original_date}
" fwd_html += f"Subject: {original_subject}
" fwd_html += f"To: {recipient}

" fwd_html += html_body fwd_html += "
" body_part.attach(MIMEText(fwd_html, 'html', 'utf-8')) msg.attach(body_part) # WICHTIG: Attachments kopieren if original_parsed.is_multipart(): for part in original_parsed.walk(): # Nur non-body parts (Attachments) if part.get_content_maintype() == 'multipart': continue if part.get_content_type() in ['text/plain', 'text/html']: continue # Body bereits oben behandelt # Attachment hinzufügen msg.attach(part) return msg # ============================================ # RULES PROCESSING (KORRIGIERT - wie domain-worker Schema) # ============================================ def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None: """ Verarbeitet OOO und Forward-Regeln für einen Empfänger. Schema wie in domain-worker: email_address als Key, ooo_active, forwards als Felder. """ if not DYNAMODB_AVAILABLE or not rules_table: return try: # Regel aus DynamoDB laden (Schema: email_address als Partition Key) response = rules_table.get_item(Key={'email_address': recipient}) rule = response.get('Item', {}) if not rule: return original_from = parsed.get('From', '') original_subject = parsed.get('Subject', '(no subject)') # ============================================ # OOO / Auto-Reply handling # ============================================ if rule.get('ooo_active', False): ooo_msg = rule.get('ooo_message', 'I am currently out of office.') content_type = rule.get('ooo_content_type', 'text') # Sender für Reply extrahieren (nur Email-Adresse) sender_name, sender_addr = parseaddr(original_from) if not sender_addr: sender_addr = original_from # Nicht auf automatische Mails antworten auto_submitted = parsed.get('Auto-Submitted', '') precedence = (parsed.get('Precedence') or '').lower() if auto_submitted and auto_submitted != 'no': log(f" ⏭ Skipping OOO for auto-submitted message", 'INFO', worker_name) elif precedence in ['bulk', 'junk', 'list']: log(f" ⏭ Skipping OOO for {precedence} message", 'INFO', worker_name) elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster']): log(f" ⏭ Skipping OOO for noreply address", 'INFO', worker_name) else: try: ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type) ses.send_raw_email( Source=recipient, Destinations=[sender_addr], RawMessage={'Data': ooo_reply.as_bytes()} ) log(f"✓ Sent OOO reply to {sender_addr} from {recipient}", 'SUCCESS', worker_name) if PROMETHEUS_ENABLED: autoreplies_sent.labels(domain=domain).inc() except ClientError as e: error_code = e.response['Error']['Code'] log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR', worker_name) # ============================================ # Forward handling # ============================================ forwards = rule.get('forwards', []) if forwards: for forward_to in forwards: try: fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from) ses.send_raw_email( Source=recipient, Destinations=[forward_to], RawMessage={'Data': fwd_msg.as_bytes()} ) log(f"✓ Forwarded to {forward_to} from {recipient}", 'SUCCESS', worker_name) if PROMETHEUS_ENABLED: forwards_sent.labels(domain=domain).inc() except ClientError as e: error_code = e.response['Error']['Code'] log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR', worker_name) except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'ResourceNotFoundException': pass # Keine Regel vorhanden - normal else: log(f"⚠ DynamoDB error for {recipient}: {e}", 'ERROR', worker_name) except Exception as e: log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name) traceback.print_exc() # ============================================ # SMTP SENDING (wie in domain-worker) # ============================================ def is_permanent_recipient_error(error_msg: str) -> bool: """ Prüft ob Fehler permanent für diesen Recipient ist (Inbox existiert nicht) """ permanent_indicators = [ '550', # Mailbox unavailable / not found '551', # User not local '553', # Mailbox name not allowed / invalid 'mailbox not found', 'user unknown', 'no such user', 'recipient rejected', 'does not exist', 'invalid recipient', 'unknown user' ] error_lower = error_msg.lower() return any(indicator in error_lower for indicator in permanent_indicators) def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]: """ Sendet E-Mail via SMTP an EINEN Empfänger Mit Retry-Logik bei Connection-Fehlern Returns: (success: bool, error: str or None, is_permanent: bool) """ last_error = None for attempt in range(max_retries + 1): smtp_conn = smtp_pool.get_connection() if not smtp_conn: last_error = "Could not get SMTP connection" log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name) time.sleep(0.5) continue try: result = smtp_conn.sendmail(from_addr, [recipient], raw_message) # Connection war erfolgreich, zurück in Pool smtp_pool.return_connection(smtp_conn) if isinstance(result, dict) and result: error = str(result.get(recipient, 'Unknown refusal')) is_permanent = is_permanent_recipient_error(error) log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name) return False, error, is_permanent else: log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name) return True, None, False except smtplib.SMTPServerDisconnected as e: # Connection wurde geschlossen - Retry mit neuer Connection log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name) last_error = str(e) # Connection nicht zurückgeben (ist kaputt) try: smtp_conn.quit() except: pass time.sleep(0.3) continue except smtplib.SMTPRecipientsRefused as e: smtp_pool.return_connection(smtp_conn) error_msg = str(e) is_permanent = is_permanent_recipient_error(error_msg) log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name) return False, error_msg, is_permanent except smtplib.SMTPException as e: error_msg = str(e) # Bei Connection-Fehlern: Retry if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower(): log(f" ⚠ {recipient}: SMTP connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name) last_error = error_msg try: smtp_conn.quit() except: pass time.sleep(0.3) continue smtp_pool.return_connection(smtp_conn) is_permanent = is_permanent_recipient_error(error_msg) log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name) return False, error_msg, is_permanent except Exception as e: # Unbekannter Fehler - Connection verwerfen, aber nicht permanent try: smtp_conn.quit() except: pass log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name) return False, str(e), False # Alle Retries fehlgeschlagen log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name) return False, last_error or "Connection failed after retries", False # ============================================ # S3 METADATA UPDATES (wie in domain-worker) # ============================================ def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: List[str] = None): """Markiert E-Mail als erfolgreich zugestellt""" try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} metadata['processed'] = 'true' metadata['processed_at'] = str(int(time.time())) metadata['processed_by'] = worker_name metadata['status'] = 'delivered' metadata.pop('processing_started', None) metadata.pop('queued_at', None) if invalid_inboxes: metadata['invalid_inboxes'] = ','.join(invalid_inboxes) log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=metadata, MetadataDirective='REPLACE' ) except Exception as e: log(f"Failed to mark as processed: {e}", 'WARNING', worker_name) def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str): """Markiert E-Mail als fehlgeschlagen weil alle Recipients ungültig sind""" try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} metadata['processed'] = 'true' metadata['processed_at'] = str(int(time.time())) metadata['processed_by'] = worker_name metadata['status'] = 'failed' metadata['error'] = 'All recipients are invalid (mailboxes do not exist)' metadata['invalid_inboxes'] = ','.join(invalid_inboxes) metadata.pop('processing_started', None) metadata.pop('queued_at', None) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=metadata, MetadataDirective='REPLACE' ) except Exception as e: log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name) # ============================================ # MAIN MESSAGE PROCESSING # ============================================ def process_message(domain: str, message: dict, receive_count: int) -> bool: """ Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification) Returns: True (Erfolg/Löschen), False (Retry/Behalten) """ worker_name = f"worker-{domain}" try: # 1. UNPACKING (SNS -> SES) message_body = json.loads(message['Body']) if 'Message' in message_body and 'Type' in message_body: # Es ist eine SNS Notification sns_content = message_body['Message'] if isinstance(sns_content, str): ses_msg = json.loads(sns_content) else: ses_msg = sns_content else: ses_msg = message_body # 2. DATEN EXTRAHIEREN mail = ses_msg.get('mail', {}) receipt = ses_msg.get('receipt', {}) message_id = mail.get('messageId') # FIX: Amazon SES Setup Notification ignorieren if message_id == "AMAZON_SES_SETUP_NOTIFICATION": log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name) return True from_addr = mail.get('source') recipients = receipt.get('recipients', []) if not message_id: log("❌ Error: No messageId in event payload", 'ERROR', worker_name) return True # Domain Validation if recipients: first_recipient = recipients[0] recipient_domain = first_recipient.split('@')[1] if recipient_domain.lower() != domain.lower(): log(f"⚠ Security: Ignored message for {recipient_domain} (I am worker for {domain})", 'WARNING', worker_name) return True else: log("⚠ Warning: No recipients in event", 'WARNING', worker_name) return True bucket = domain_to_bucket_name(domain) key = message_id # Compact single-line log for email processing recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients" log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name) # 3. LADEN AUS S3 try: response = s3.get_object(Bucket=bucket, Key=key) raw_bytes = response['Body'].read() except s3.exceptions.NoSuchKey: if receive_count < 5: log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) return False else: log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) return True except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': if receive_count < 5: log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) return False else: log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) return True log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) return False except Exception as e: log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) return False # 4. PARSING & BOUNCE LOGIC try: parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) subject = parsed.get('Subject', '(no subject)') # Bounce Header umschreiben (setzt auch is_bounce intern) is_bounce = is_ses_bounce_notification(parsed) parsed, modified = apply_bounce_logic(parsed, subject, worker_name) if modified: log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name) raw_bytes = parsed.as_bytes() from_addr_final = parsed.get('From') if PROMETHEUS_ENABLED: bounces_processed.labels(domain=domain, type='rewritten').inc() else: from_addr_final = from_addr except Exception as e: log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name) from_addr_final = from_addr is_bounce = False # 5. OOO & FORWARD LOGIC (vor SMTP-Versand, nicht für Bounces) if not is_bounce: for recipient in recipients: process_rules_for_recipient(recipient, parsed, domain, worker_name) # 6. SMTP VERSAND log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name) successful = [] failed_permanent = [] failed_temporary = [] for recipient in recipients: success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name) if success: successful.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='success').inc() elif is_perm: failed_permanent.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='permanent_failure').inc() else: failed_temporary.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='temporary_failure').inc() # 7. RESULTAT & CLEANUP if len(successful) > 0: mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None) result_info = f"{len(successful)} OK" if failed_permanent: result_info += f", {len(failed_permanent)} invalid" log(f"✅ Delivered ({result_info})", 'SUCCESS', worker_name) return True elif len(failed_permanent) == len(recipients): mark_as_all_invalid(bucket, key, failed_permanent, worker_name) log(f"🛑 All {len(recipients)} recipients invalid", 'ERROR', worker_name) return True else: log(f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name) return False except Exception as e: log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name) traceback.print_exc() return False # ============================================ # DOMAIN POLLER # ============================================ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event, domain_stats: Dict[str, int] = None, stats_lock: threading.Lock = None): """Poll single domain's queue continuously""" worker_name = f"worker-{domain}" log(f"🚀 Starting poller for {domain}", 'INFO', worker_name) messages_processed = 0 while not stop_event.is_set(): try: response = sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=config.max_messages, WaitTimeSeconds=config.poll_interval, VisibilityTimeout=config.visibility_timeout, AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'] ) messages = response.get('Messages', []) # Update queue size metric if PROMETHEUS_ENABLED: try: attrs = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages'] ) queue_size.labels(domain=domain).set( int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0)) ) except: pass if not messages: continue log(f"✉ Received {len(messages)} message(s)", 'INFO', worker_name) for message in messages: if stop_event.is_set(): break receipt_handle = message['ReceiptHandle'] receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1)) if PROMETHEUS_ENABLED: emails_in_flight.inc() start_time = time.time() try: success = process_message(domain, message, receive_count) if success: sqs.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) messages_processed += 1 # Update shared stats if domain_stats is not None and stats_lock is not None: with stats_lock: domain_stats[domain] = messages_processed else: log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name) except json.JSONDecodeError as e: log(f"✗ Invalid message format: {e}", 'ERROR', worker_name) sqs.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) except Exception as e: log(f"✗ Error processing message: {e}", 'ERROR', worker_name) traceback.print_exc() finally: if PROMETHEUS_ENABLED: emails_in_flight.dec() processing_time.labels(domain=domain).observe(time.time() - start_time) except Exception as e: log(f"✗ Error polling: {e}", 'ERROR', worker_name) time.sleep(5) log(f"👋 Stopped (processed: {messages_processed})", 'INFO', worker_name) # ============================================ # UNIFIED WORKER # ============================================ class UnifiedWorker: """Main worker coordinating all domain pollers""" def __init__(self): self.stop_event = threading.Event() self.domains: List[str] = [] self.queue_urls: Dict[str, str] = {} self.poller_threads: List[threading.Thread] = [] # Shared stats across all pollers self.domain_stats: Dict[str, int] = {} # domain -> processed count self.stats_lock = threading.Lock() def setup(self): """Initialize worker""" self.domains = load_domains() if not self.domains: log("❌ No domains configured!", 'ERROR') sys.exit(1) # Get queue URLs for domain in self.domains: url = get_queue_url(domain) if url: self.queue_urls[domain] = url log(f" ✓ {domain} -> {domain_to_queue_name(domain)}") else: log(f" ✗ {domain} -> Queue not found!", 'WARNING') if not self.queue_urls: log("❌ No valid queues found!", 'ERROR') sys.exit(1) # Initialize SMTP pool smtp_pool.initialize() log(f"Initialized with {len(self.queue_urls)} domains") def start(self): """Start all domain pollers""" # Initialize stats for all domains for domain in self.queue_urls.keys(): self.domain_stats[domain] = 0 for domain, queue_url in self.queue_urls.items(): thread = threading.Thread( target=poll_domain, args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock), name=f"poller-{domain}", daemon=True ) thread.start() self.poller_threads.append(thread) log(f"Started {len(self.poller_threads)} domain pollers") # Periodic status log (every 5 minutes) last_status_log = time.time() status_interval = 300 # 5 minutes try: while not self.stop_event.is_set(): self.stop_event.wait(timeout=10) # Log status summary every 5 minutes if time.time() - last_status_log > status_interval: self._log_status_table() last_status_log = time.time() except KeyboardInterrupt: pass def _log_status_table(self): """Log a compact status table""" active_threads = sum(1 for t in self.poller_threads if t.is_alive()) with self.stats_lock: total_processed = sum(self.domain_stats.values()) # Build compact stats: only show domains with activity or top domains stats_parts = [] for domain in sorted(self.queue_urls.keys()): count = self.domain_stats.get(domain, 0) # Shorten domain for display short_domain = domain.split('.')[0][:12] stats_parts.append(f"{short_domain}:{count}") stats_line = " | ".join(stats_parts) log(f"📊 Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {stats_line}") def stop(self): """Stop gracefully""" log("⚠ Stopping worker...") self.stop_event.set() # Warten auf Poller-Threads (max 10 Sekunden) for thread in self.poller_threads: thread.join(timeout=10) if thread.is_alive(): log(f"Warning: {thread.name} did not stop gracefully", 'WARNING') smtp_pool.close_all() log("👋 Worker stopped") # ============================================ # HEALTH CHECK SERVER # ============================================ def start_health_server(worker: UnifiedWorker): """HTTP health endpoint""" from http.server import HTTPServer, BaseHTTPRequestHandler class HealthHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/health' or self.path == '/': self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() status = { 'status': 'healthy', 'domains': len(worker.queue_urls), 'domain_list': list(worker.queue_urls.keys()), 'dynamodb': DYNAMODB_AVAILABLE, 'features': { 'bounce_rewriting': True, 'auto_reply': DYNAMODB_AVAILABLE, 'forwarding': DYNAMODB_AVAILABLE }, 'timestamp': datetime.utcnow().isoformat() } self.wfile.write(json.dumps(status, indent=2).encode()) elif self.path == '/domains': self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(json.dumps(list(worker.queue_urls.keys())).encode()) else: self.send_response(404) self.end_headers() def log_message(self, format, *args): pass # Suppress HTTP access logs class SilentHTTPServer(HTTPServer): """HTTP Server that ignores connection reset errors from scanners""" def handle_error(self, request, client_address): exc_type = sys.exc_info()[0] if exc_type in (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): pass # Silently ignore - these are just scanners/health checks disconnecting else: log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING') server = SilentHTTPServer(('0.0.0.0', config.health_port), HealthHandler) thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') thread.start() log(f"Health server on port {config.health_port}") # ============================================ # ENTRY POINT # ============================================ def main(): worker = UnifiedWorker() def signal_handler(signum, frame): log(f"Received signal {signum}") worker.stop() sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) worker.setup() if PROMETHEUS_ENABLED: start_http_server(config.metrics_port) log(f"Prometheus metrics on port {config.metrics_port}") start_health_server(worker) log(f"\n{'='*70}") log(f"🚀 UNIFIED EMAIL WORKER") log(f"{'='*70}") log(f" Domains: {len(worker.queue_urls)}") log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}") log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}") log(f" Poll Interval: {config.poll_interval}s") log(f" Visibility: {config.visibility_timeout}s") log(f"") log(f" Features:") log(f" ✓ Bounce Detection & Header Rewriting") log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office") log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding") log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics") log(f"") log(f" Active Domains:") for domain in sorted(worker.queue_urls.keys()): log(f" • {domain}") log(f"{'='*70}\n") worker.start() if __name__ == '__main__': main()