From 06195b9a6078ff90d8347a7d85b9219bf8288957 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Sat, 10 Jan 2026 17:17:00 -0600 Subject: [PATCH] reworked --- unified-worker/unified_worker.py | 1174 +++++++++++++++++------------- 1 file changed, 681 insertions(+), 493 deletions(-) diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py index 24a6091..e9b2784 100644 --- a/unified-worker/unified_worker.py +++ b/unified-worker/unified_worker.py @@ -15,16 +15,13 @@ DynamoDB Tables: - ses-outbound-messages: Tracking für Bounce-Korrelation - email-rules: Forwards und Auto-Reply Regeln -Schema email-rules: +Schema email-rules (wie in domain-worker): { - "email": "user@domain.com", # Partition Key - "rule_type": "forward|autoreply", # Sort Key - "enabled": true, - "target": "other@example.com", # Für forwards - "subject": "Out of Office", # Für autoreply - "body": "I am currently...", # Für autoreply - "start_date": "2025-01-01", # Optional: Zeitraum - "end_date": "2025-01-15" # Optional: Zeitraum + "email_address": "user@domain.com", # Partition Key + "ooo_active": true/false, + "ooo_message": "I am currently...", + "ooo_content_type": "text" | "html", + "forwards": ["other@example.com", "another@example.com"] } Schema ses-outbound-messages: @@ -44,26 +41,23 @@ import sys import json import time import signal -import logging import threading import smtplib -import hashlib -import re +import traceback from queue import Queue, Empty from concurrent.futures import ThreadPoolExecutor, as_completed -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Any -from datetime import datetime, date +from datetime import datetime from email.parser import BytesParser from email.policy import SMTP as SMTPPolicy from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart -from email.utils import formataddr, parseaddr, formatdate -import copy +from email.mime.message import MIMEMessage +from email.utils import formataddr, parseaddr, formatdate, make_msgid import boto3 from botocore.exceptions import ClientError -from boto3.dynamodb.conditions import Key # Optional: Prometheus Metrics try: @@ -71,7 +65,6 @@ try: PROMETHEUS_ENABLED = True except ImportError: PROMETHEUS_ENABLED = False - logging.warning("prometheus_client not installed - metrics disabled") # ============================================ # CONFIGURATION @@ -109,10 +102,6 @@ class Config: bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3')) bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0')) - # Auto-Reply - autoreply_from_name: str = os.environ.get('AUTOREPLY_FROM_NAME', 'Auto-Reply') - autoreply_cooldown_hours: int = int(os.environ.get('AUTOREPLY_COOLDOWN_HOURS', '24')) - # Monitoring metrics_port: int = int(os.environ.get('METRICS_PORT', '8000')) health_port: int = int(os.environ.get('HEALTH_PORT', '8080')) @@ -120,15 +109,14 @@ class Config: config = Config() # ============================================ -# LOGGING +# LOGGING (wie in domain-worker) # ============================================ -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) -logger = logging.getLogger(__name__) +def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'): + """Structured logging with timestamp - matches domain-worker format""" + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + thread_name = threading.current_thread().name + print(f"[{timestamp}] [{level}] [{worker_name}] [{thread_name}] {message}", flush=True) # ============================================ # METRICS @@ -153,6 +141,10 @@ ses = boto3.client('ses', region_name=config.aws_region) # DynamoDB dynamodb = boto3.resource('dynamodb', region_name=config.aws_region) +DYNAMODB_AVAILABLE = False +rules_table = None +messages_table = None + try: rules_table = dynamodb.Table(config.rules_table) messages_table = dynamodb.Table(config.messages_table) @@ -160,16 +152,9 @@ try: rules_table.table_status messages_table.table_status DYNAMODB_AVAILABLE = True - logger.info(f"DynamoDB connected: {config.rules_table}, {config.messages_table}") + log(f"DynamoDB connected: {config.rules_table}, {config.messages_table}") except Exception as e: - DYNAMODB_AVAILABLE = False - rules_table = None - messages_table = None - logger.warning(f"DynamoDB not available: {e}") - -# Auto-Reply Cooldown Cache (in-memory, thread-safe) -autoreply_cache: Dict[str, datetime] = {} -autoreply_cache_lock = threading.Lock() + log(f"Warning: DynamoDB not available: {e}", 'WARNING') # ============================================ # SMTP CONNECTION POOL @@ -198,7 +183,7 @@ class SMTPPool: conn.login(config.smtp_user, config.smtp_pass) return conn except Exception as e: - logger.error(f"Failed to create SMTP connection: {e}") + log(f"Failed to create SMTP connection: {e}", 'ERROR') return None def initialize(self): @@ -210,7 +195,7 @@ class SMTPPool: if conn: self._pool.put(conn) self._initialized = True - logger.info(f"SMTP pool initialized with {self._pool.qsize()} connections") + log(f"SMTP pool initialized with {self._pool.qsize()} connections") def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]: """Get connection from pool""" @@ -265,9 +250,9 @@ def get_queue_url(domain: str) -> Optional[str]: return response['QueueUrl'] except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': - logger.warning(f"Queue not found for domain: {domain}") + log(f"Queue not found for domain: {domain}", 'WARNING') else: - logger.error(f"Error getting queue URL for {domain}: {e}") + log(f"Error getting queue URL for {domain}: {e}", 'ERROR') return None def load_domains() -> List[str]: @@ -285,18 +270,11 @@ def load_domains() -> List[str]: domains.append(domain) domains = list(set(domains)) - logger.info(f"Loaded {len(domains)} domains") + log(f"Loaded {len(domains)} domains: {', '.join(domains)}") return domains -def extract_email_address(header_value: str) -> str: - """Extract pure email address from header like 'Name '""" - if not header_value: - return '' - name, addr = parseaddr(header_value) - return addr.lower() if addr else header_value.lower() - # ============================================ -# BOUNCE HANDLING +# BOUNCE HANDLING (wie in domain-worker) # ============================================ def is_ses_bounce_notification(parsed_email) -> bool: @@ -304,29 +282,22 @@ def is_ses_bounce_notification(parsed_email) -> bool: from_header = (parsed_email.get('From') or '').lower() return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header -def get_bounce_info_from_dynamodb(message_id: str) -> Optional[Dict]: +def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]: """ - Look up original sender info from DynamoDB for bounce correlation. - The message_id here is from the bounce notification, we need to find - the original outbound message. + Sucht Bounce-Info in DynamoDB anhand der Message-ID + Mit Retry-Logik für Timing-Issues (wie in domain-worker) """ if not DYNAMODB_AVAILABLE or not messages_table: return None for attempt in range(config.bounce_lookup_retries): try: - # Message-ID könnte verschiedene Formate haben - # Versuche mit und ohne < > Klammern - clean_id = message_id.strip('<>').split('@')[0] - - response = messages_table.get_item(Key={'MessageId': clean_id}) + response = messages_table.get_item(Key={'MessageId': message_id}) item = response.get('Item') if item: - logger.info(f"Found bounce info for {clean_id}") return { 'original_source': item.get('original_source', ''), - 'recipients': item.get('recipients', []), 'bounceType': item.get('bounceType', 'Unknown'), 'bounceSubType': item.get('bounceSubType', 'Unknown'), 'bouncedRecipients': item.get('bouncedRecipients', []), @@ -334,438 +305,617 @@ def get_bounce_info_from_dynamodb(message_id: str) -> Optional[Dict]: } if attempt < config.bounce_lookup_retries - 1: - logger.debug(f"Bounce record not found, retry {attempt + 1}...") + log(f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s (attempt {attempt + 1}/{config.bounce_lookup_retries})...", 'INFO', worker_name) time.sleep(config.bounce_lookup_delay) - + else: + log(f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts for Message-ID: {message_id}", 'WARNING', worker_name) + return None + except Exception as e: - logger.error(f"DynamoDB error looking up bounce: {e}") + log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", 'ERROR', worker_name) if attempt < config.bounce_lookup_retries - 1: time.sleep(config.bounce_lookup_delay) + else: + return None return None -def rewrite_bounce_headers(parsed_email, bounce_info: Dict) -> Tuple[Any, bool]: +def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]: """ - Rewrite bounce email headers so the recipient sees it correctly. - - Problem: SES sends bounces FROM mailer-daemon@amazonses.com - but we want the user to see FROM the bounced address - - Returns: (modified_email, was_modified) + Prüft auf SES Bounce, sucht in DynamoDB und schreibt Header um. + Returns: (parsed_email_object, was_modified_bool) """ + if not is_ses_bounce_notification(parsed): + return parsed, False + + log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name) + + # Message-ID aus Header extrahieren + message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0] + + if not message_id: + log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name) + return parsed, False + + log(f" Looking up Message-ID: {message_id}", 'INFO', worker_name) + + # Lookup in DynamoDB + bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name) + if not bounce_info: - return parsed_email, False + return parsed, False + + # Bounce Info ausgeben + original_source = bounce_info['original_source'] + bounced_recipients = bounce_info['bouncedRecipients'] + bounce_type = bounce_info['bounceType'] + bounce_subtype = bounce_info['bounceSubType'] - original_source = bounce_info.get('original_source', '') - bounced_recipients = bounce_info.get('bouncedRecipients', []) - bounce_type = bounce_info.get('bounceType', 'Unknown') - bounce_subtype = bounce_info.get('bounceSubType', 'Unknown') + log(f"✓ Found bounce info:", 'INFO', worker_name) + log(f" Original sender: {original_source}", 'INFO', worker_name) + log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name) + log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name) - if not bounced_recipients: - return parsed_email, False + if bounced_recipients: + new_from = bounced_recipients[0] + + # Rewrite Headers + parsed['X-Original-SES-From'] = parsed.get('From', '') + parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}" + parsed.replace_header('From', new_from) + + if not parsed.get('Reply-To'): + parsed['Reply-To'] = new_from + + # Subject anpassen + if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower(): + parsed.replace_header('Subject', f"Delivery Status: {new_from}") + + log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name) + return parsed, True - # Use first bounced recipient as the new "From" - new_from = bounced_recipients[0] if isinstance(bounced_recipients[0], str) else bounced_recipients[0].get('emailAddress', '') - - if not new_from: - return parsed_email, False - - logger.info(f"Rewriting bounce: FROM {parsed_email.get('From')} -> {new_from}") - - # Add diagnostic headers - parsed_email['X-Original-SES-From'] = parsed_email.get('From', '') - parsed_email['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}" - parsed_email['X-Original-Recipient'] = original_source - - # Rewrite From header - if 'From' in parsed_email: - del parsed_email['From'] - parsed_email['From'] = new_from - - # Add Reply-To if not present - if not parsed_email.get('Reply-To'): - parsed_email['Reply-To'] = new_from - - # Improve subject for clarity - subject = parsed_email.get('Subject', '') - if 'delivery status' in subject.lower() or not subject: - if 'Subject' in parsed_email: - del parsed_email['Subject'] - parsed_email['Subject'] = f"Undeliverable: Message to {new_from}" - - return parsed_email, True + log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name) + return parsed, False # ============================================ -# EMAIL RULES (FORWARDS & AUTO-REPLY) +# EMAIL BODY EXTRACTION (wie in domain-worker) # ============================================ -@dataclass -class EmailRule: - """Represents a forward or auto-reply rule""" - email: str - rule_type: str # 'forward' or 'autoreply' - enabled: bool = True - target: str = '' # Forward destination - subject: str = '' # Auto-reply subject - body: str = '' # Auto-reply body - start_date: Optional[date] = None - end_date: Optional[date] = None +def extract_body_parts(parsed) -> Tuple[str, Optional[str]]: + """ + Extrahiert sowohl text/plain als auch text/html Body-Parts. + Returns: (text_body: str, html_body: str or None) + """ + text_body = '' + html_body = None - def is_active(self) -> bool: - """Check if rule is currently active (considering date range)""" - if not self.enabled: - return False - - today = date.today() - - if self.start_date and today < self.start_date: - return False - if self.end_date and today > self.end_date: - return False - - return True - -def get_rules_for_email(email_address: str) -> List[EmailRule]: - """Fetch all rules for an email address from DynamoDB""" - if not DYNAMODB_AVAILABLE or not rules_table: - return [] - - email_lower = email_address.lower() - rules = [] - - try: - response = rules_table.query( - KeyConditionExpression=Key('email').eq(email_lower) - ) - - for item in response.get('Items', []): - rule = EmailRule( - email=item.get('email', ''), - rule_type=item.get('rule_type', ''), - enabled=item.get('enabled', True), - target=item.get('target', ''), - subject=item.get('subject', ''), - body=item.get('body', ''), - ) + if parsed.is_multipart(): + for part in parsed.walk(): + content_type = part.get_content_type() - # Parse dates if present - if item.get('start_date'): + if content_type == 'text/plain': try: - rule.start_date = datetime.strptime(item['start_date'], '%Y-%m-%d').date() - except: - pass - if item.get('end_date'): - try: - rule.end_date = datetime.strptime(item['end_date'], '%Y-%m-%d').date() - except: + text_body += part.get_payload(decode=True).decode('utf-8', errors='ignore') + except Exception: pass - rules.append(rule) - - logger.debug(f"Found {len(rules)} rules for {email_address}") - return rules - - except Exception as e: - logger.error(f"Error fetching rules for {email_address}: {e}") - return [] - -def should_send_autoreply(recipient: str, sender: str) -> bool: - """ - Check if we should send an auto-reply. - Implements cooldown to avoid reply loops. - """ - # Don't reply to automated messages - sender_lower = sender.lower() - if any(x in sender_lower for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster', 'bounce', 'notification']): - return False - - # Check cooldown cache - cache_key = f"{recipient}:{sender}" - - with autoreply_cache_lock: - last_reply = autoreply_cache.get(cache_key) - if last_reply: - hours_since = (datetime.now() - last_reply).total_seconds() / 3600 - if hours_since < config.autoreply_cooldown_hours: - logger.debug(f"Auto-reply cooldown active for {cache_key}") - return False - - # Update cache - autoreply_cache[cache_key] = datetime.now() - - return True - -def send_autoreply(rule: EmailRule, original_from: str, original_subject: str, domain: str) -> bool: - """Send an auto-reply email via SES""" - if not should_send_autoreply(rule.email, original_from): - return False - - try: - # Build reply message - reply_subject = rule.subject or f"Re: {original_subject}" - reply_body = rule.body or "This is an automated reply. I am currently unavailable." - - # Add original subject reference if not in body - if original_subject and original_subject not in reply_body: - reply_body += f"\n\n---\nRegarding: {original_subject}" - - msg = MIMEText(reply_body, 'plain', 'utf-8') - msg['Subject'] = reply_subject - msg['From'] = formataddr((config.autoreply_from_name, rule.email)) - msg['To'] = original_from - msg['Date'] = formatdate(localtime=True) - msg['Auto-Submitted'] = 'auto-replied' - msg['X-Auto-Response-Suppress'] = 'All' - msg['Precedence'] = 'auto_reply' - - # Send via SES - ses.send_raw_email( - Source=rule.email, - Destinations=[original_from], - RawMessage={'Data': msg.as_string()} - ) - - logger.info(f"✉ Auto-reply sent from {rule.email} to {original_from}") - - if PROMETHEUS_ENABLED: - autoreplies_sent.labels(domain=domain).inc() - - return True - - except Exception as e: - logger.error(f"Failed to send auto-reply: {e}") - return False - -def send_forward(rule: EmailRule, raw_email: bytes, original_from: str, original_subject: str, domain: str) -> bool: - """Forward email to target address""" - if not rule.target: - return False - - try: - # Parse original email - parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email) - - # Create forwarded message - fwd_msg = MIMEMultipart('mixed') - fwd_msg['Subject'] = f"Fwd: {original_subject or parsed.get('Subject', '(no subject)')}" - fwd_msg['From'] = rule.email - fwd_msg['To'] = rule.target - fwd_msg['Date'] = formatdate(localtime=True) - fwd_msg['X-Forwarded-For'] = rule.email - fwd_msg['X-Original-From'] = original_from - - # Add forward header text - forward_header = MIMEText( - f"---------- Forwarded message ----------\n" - f"From: {parsed.get('From', 'Unknown')}\n" - f"Date: {parsed.get('Date', 'Unknown')}\n" - f"Subject: {parsed.get('Subject', '(no subject)')}\n" - f"To: {parsed.get('To', rule.email)}\n\n", - 'plain', 'utf-8' - ) - fwd_msg.attach(forward_header) - - # Attach original message - from email.mime.message import MIMEMessage - fwd_msg.attach(MIMEMessage(parsed)) - - # Send via SES - ses.send_raw_email( - Source=rule.email, - Destinations=[rule.target], - RawMessage={'Data': fwd_msg.as_string()} - ) - - logger.info(f"➡ Forward sent from {rule.email} to {rule.target}") - - if PROMETHEUS_ENABLED: - forwards_sent.labels(domain=domain).inc() - - return True - - except Exception as e: - logger.error(f"Failed to forward email: {e}") - return False - -# ============================================ -# MAIN EMAIL PROCESSING -# ============================================ - -def process_message(domain: str, message: dict) -> bool: - """Process single SQS message with full feature set""" - receipt_handle = message['ReceiptHandle'] - - try: - # Parse SQS message body - body = json.loads(message['Body']) - - # Handle SNS wrapper (Lambda shim format) - if body.get('Type') == 'Notification': - ses_data = json.loads(body.get('Message', '{}')) - else: - ses_data = body.get('ses', body) - - mail = ses_data.get('mail', {}) - receipt = ses_data.get('receipt', {}) - - message_id = mail.get('messageId', 'unknown') - from_addr = mail.get('source', '') - recipients = receipt.get('recipients', []) or mail.get('destination', []) - - logger.info(f"Processing: {message_id} for {domain}") - logger.info(f" From: {from_addr}") - logger.info(f" To: {recipients}") - - # ---------------------------------------- - # 1. Download email from S3 - # ---------------------------------------- - bucket = domain_to_bucket_name(domain) - object_key = message_id - - # Check receipt action for actual S3 location - actions = receipt.get('action', {}) - if isinstance(actions, dict): - actions = [actions] - for action in actions: - if isinstance(action, dict) and action.get('type') == 'S3': - bucket = action.get('bucketName', bucket) - object_key = action.get('objectKey', object_key) - break - + elif content_type == 'text/html': + try: + html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + except Exception: + pass + else: try: - s3_response = s3.get_object(Bucket=bucket, Key=object_key) - raw_email = s3_response['Body'].read() - except ClientError as e: - logger.error(f"Failed to download from S3: {bucket}/{object_key} - {e}") - return False + payload = parsed.get_payload(decode=True) + if payload: + decoded = payload.decode('utf-8', errors='ignore') + if parsed.get_content_type() == 'text/html': + html_body = decoded + else: + text_body = decoded + except Exception: + text_body = str(parsed.get_payload()) + + return text_body.strip() if text_body else '(No body content)', html_body + +# ============================================ +# AUTO-REPLY / OOO (wie in domain-worker) +# ============================================ + +def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'): + """ + Erstellt eine Out-of-Office Reply als komplette MIME-Message. + Behält Original-Body (text + html) bei. + """ + text_body, html_body = extract_body_parts(original_parsed) + original_subject = original_parsed.get('Subject', '(no subject)') + original_from = original_parsed.get('From', 'unknown') + + # Neue Message erstellen + msg = MIMEMultipart('mixed') + msg['From'] = recipient + msg['To'] = original_from + msg['Subject'] = f"Out of Office: {original_subject}" + msg['Date'] = formatdate(localtime=True) + msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) + msg['In-Reply-To'] = original_parsed.get('Message-ID', '') + msg['References'] = original_parsed.get('Message-ID', '') + msg['Auto-Submitted'] = 'auto-replied' # Verhindert Loops + + # Body-Teil erstellen + body_part = MIMEMultipart('alternative') + + # Text-Version + text_content = f"{ooo_msg}\n\n" + text_content += "--- Original Message ---\n" + text_content += f"From: {original_from}\n" + text_content += f"Subject: {original_subject}\n\n" + text_content += text_body + body_part.attach(MIMEText(text_content, 'plain', 'utf-8')) + + # HTML-Version (wenn gewünscht und Original vorhanden) + if content_type == 'html' or html_body: + html_content = f"
{ooo_msg}



" + html_content += "
" + html_content += f"Original Message
" + html_content += f"From: {original_from}
" + html_content += f"Subject: {original_subject}

" + html_content += (html_body if html_body else text_body.replace('\n', '
')) + html_content += "
" + body_part.attach(MIMEText(html_content, 'html', 'utf-8')) + + msg.attach(body_part) + return msg + +# ============================================ +# FORWARDING (wie in domain-worker) +# ============================================ + +def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str): + """ + Erstellt eine Forward-Message als komplette MIME-Message. + Behält ALLE Original-Parts inkl. Attachments bei. + """ + original_subject = original_parsed.get('Subject', '(no subject)') + original_date = original_parsed.get('Date', 'unknown') + + # Neue Message erstellen + msg = MIMEMultipart('mixed') + msg['From'] = recipient + msg['To'] = forward_to + msg['Subject'] = f"FWD: {original_subject}" + msg['Date'] = formatdate(localtime=True) + msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) + msg['Reply-To'] = original_from + + # Forward-Header als Text + text_body, html_body = extract_body_parts(original_parsed) + + # Body-Teil + body_part = MIMEMultipart('alternative') + + # Text-Version + fwd_text = "---------- Forwarded message ---------\n" + fwd_text += f"From: {original_from}\n" + fwd_text += f"Date: {original_date}\n" + fwd_text += f"Subject: {original_subject}\n" + fwd_text += f"To: {recipient}\n\n" + fwd_text += text_body + body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8')) + + # HTML-Version + if html_body: + fwd_html = "
" + fwd_html += "---------- Forwarded message ---------
" + fwd_html += f"From: {original_from}
" + fwd_html += f"Date: {original_date}
" + fwd_html += f"Subject: {original_subject}
" + fwd_html += f"To: {recipient}

" + fwd_html += html_body + fwd_html += "
" + body_part.attach(MIMEText(fwd_html, 'html', 'utf-8')) + + msg.attach(body_part) + + # WICHTIG: Attachments kopieren + if original_parsed.is_multipart(): + for part in original_parsed.walk(): + # Nur non-body parts (Attachments) + if part.get_content_maintype() == 'multipart': + continue + if part.get_content_type() in ['text/plain', 'text/html']: + continue # Body bereits oben behandelt + + # Attachment hinzufügen + msg.attach(part) + + return msg + +# ============================================ +# RULES PROCESSING (KORRIGIERT - wie domain-worker Schema) +# ============================================ + +def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None: + """ + Verarbeitet OOO und Forward-Regeln für einen Empfänger. + Schema wie in domain-worker: email_address als Key, ooo_active, forwards als Felder. + """ + if not DYNAMODB_AVAILABLE or not rules_table: + return + + try: + # Regel aus DynamoDB laden (Schema: email_address als Partition Key) + response = rules_table.get_item(Key={'email_address': recipient}) + rule = response.get('Item', {}) - # Parse email - parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email) - subject = parsed.get('Subject', '') + if not rule: + return - # ---------------------------------------- - # 2. Bounce Detection & Rewriting - # ---------------------------------------- - is_bounce = is_ses_bounce_notification(parsed) - bounce_rewritten = False + original_from = parsed.get('From', '') + original_subject = parsed.get('Subject', '(no subject)') - if is_bounce: - logger.info("🔄 Detected SES bounce notification") + # ============================================ + # OOO / Auto-Reply handling + # ============================================ + if rule.get('ooo_active', False): + ooo_msg = rule.get('ooo_message', 'I am currently out of office.') + content_type = rule.get('ooo_content_type', 'text') - # Extract Message-ID from bounce to correlate with original - bounce_msg_id = parsed.get('Message-ID', '') + # Sender für Reply extrahieren (nur Email-Adresse) + sender_name, sender_addr = parseaddr(original_from) + if not sender_addr: + sender_addr = original_from - # Also check X-Original-Message-Id or parse from body - # SES bounces often contain the original Message-ID in headers or body - original_msg_id = parsed.get('X-Original-Message-Id', '') - if not original_msg_id: - # Try to extract from References header - refs = parsed.get('References', '') - if refs: - original_msg_id = refs.split()[0] if refs else '' + # Nicht auf automatische Mails antworten + auto_submitted = parsed.get('Auto-Submitted', '') + precedence = (parsed.get('Precedence') or '').lower() - lookup_id = original_msg_id or bounce_msg_id - - if lookup_id: - bounce_info = get_bounce_info_from_dynamodb(lookup_id) - if bounce_info: - parsed, bounce_rewritten = rewrite_bounce_headers(parsed, bounce_info) + if auto_submitted and auto_submitted != 'no': + log(f" ⏭ Skipping OOO for auto-submitted message", 'INFO', worker_name) + elif precedence in ['bulk', 'junk', 'list']: + log(f" ⏭ Skipping OOO for {precedence} message", 'INFO', worker_name) + elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster']): + log(f" ⏭ Skipping OOO for noreply address", 'INFO', worker_name) + else: + try: + ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type) + + ses.send_raw_email( + Source=recipient, + Destinations=[sender_addr], + RawMessage={'Data': ooo_reply.as_bytes()} + ) + log(f"✓ Sent OOO reply to {sender_addr} from {recipient}", 'SUCCESS', worker_name) if PROMETHEUS_ENABLED: - bounces_processed.labels( - domain=domain, - type=bounce_info.get('bounceType', 'Unknown') - ).inc() + autoreplies_sent.labels(domain=domain).inc() + + except ClientError as e: + error_code = e.response['Error']['Code'] + log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR', worker_name) - # ---------------------------------------- - # 3. Deliver to local mailboxes - # ---------------------------------------- - success_count = 0 - failed_recipients = [] - delivered_recipients = [] + # ============================================ + # Forward handling + # ============================================ + forwards = rule.get('forwards', []) + if forwards: + for forward_to in forwards: + try: + fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from) + + ses.send_raw_email( + Source=recipient, + Destinations=[forward_to], + RawMessage={'Data': fwd_msg.as_bytes()} + ) + log(f"✓ Forwarded to {forward_to} from {recipient}", 'SUCCESS', worker_name) + + if PROMETHEUS_ENABLED: + forwards_sent.labels(domain=domain).inc() + + except ClientError as e: + error_code = e.response['Error']['Code'] + log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR', worker_name) + + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == 'ResourceNotFoundException': + pass # Keine Regel vorhanden - normal + else: + log(f"⚠ DynamoDB error for {recipient}: {e}", 'ERROR', worker_name) + except Exception as e: + log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name) + traceback.print_exc() + +# ============================================ +# SMTP SENDING (wie in domain-worker) +# ============================================ + +def is_permanent_recipient_error(error_msg: str) -> bool: + """ + Prüft ob Fehler permanent für diesen Recipient ist (Inbox existiert nicht) + """ + permanent_indicators = [ + '550', # Mailbox unavailable / not found + '551', # User not local + '553', # Mailbox name not allowed / invalid + 'mailbox not found', + 'user unknown', + 'no such user', + 'recipient rejected', + 'does not exist', + 'invalid recipient', + 'unknown user' + ] + + error_lower = error_msg.lower() + return any(indicator in error_lower for indicator in permanent_indicators) + +def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, smtp_conn: smtplib.SMTP, worker_name: str) -> Tuple[bool, Optional[str], bool]: + """ + Sendet E-Mail via SMTP an EINEN Empfänger + Returns: (success: bool, error: str or None, is_permanent: bool) + """ + try: + result = smtp_conn.sendmail(from_addr, [recipient], raw_message) - # Get modified raw email if bounce was rewritten - if bounce_rewritten: - raw_email = parsed.as_bytes() + if isinstance(result, dict) and result: + error = str(result.get(recipient, 'Unknown refusal')) + is_permanent = is_permanent_recipient_error(error) + log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name) + return False, error, is_permanent + else: + log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name) + return True, None, False + + except smtplib.SMTPRecipientsRefused as e: + error_msg = str(e) + is_permanent = is_permanent_recipient_error(error_msg) + log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name) + return False, error_msg, is_permanent + + except smtplib.SMTPException as e: + error_msg = str(e) + is_permanent = is_permanent_recipient_error(error_msg) + log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name) + return False, error_msg, is_permanent + + except Exception as e: + log(f" ✗ {recipient}: Connection error - {e}", 'ERROR', worker_name) + return False, str(e), False + +# ============================================ +# S3 METADATA UPDATES (wie in domain-worker) +# ============================================ + +def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: List[str] = None): + """Markiert E-Mail als erfolgreich zugestellt""" + try: + head = s3.head_object(Bucket=bucket, Key=key) + metadata = head.get('Metadata', {}) or {} + metadata['processed'] = 'true' + metadata['processed_at'] = str(int(time.time())) + metadata['processed_by'] = worker_name + metadata['status'] = 'delivered' + metadata.pop('processing_started', None) + metadata.pop('queued_at', None) + + if invalid_inboxes: + metadata['invalid_inboxes'] = ','.join(invalid_inboxes) + log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name) + + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=metadata, + MetadataDirective='REPLACE' + ) + + log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS', worker_name) + + except Exception as e: + log(f"Failed to mark as processed: {e}", 'WARNING', worker_name) + +def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str): + """Markiert E-Mail als fehlgeschlagen weil alle Recipients ungültig sind""" + try: + head = s3.head_object(Bucket=bucket, Key=key) + metadata = head.get('Metadata', {}) or {} + + metadata['processed'] = 'true' + metadata['processed_at'] = str(int(time.time())) + metadata['processed_by'] = worker_name + metadata['status'] = 'failed' + metadata['error'] = 'All recipients are invalid (mailboxes do not exist)' + metadata['invalid_inboxes'] = ','.join(invalid_inboxes) + metadata.pop('processing_started', None) + metadata.pop('queued_at', None) + + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=metadata, + MetadataDirective='REPLACE' + ) + + log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS', worker_name) + + except Exception as e: + log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name) + +# ============================================ +# MAIN MESSAGE PROCESSING +# ============================================ + +def process_message(domain: str, message: dict, receive_count: int) -> bool: + """ + Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification) + Returns: True (Erfolg/Löschen), False (Retry/Behalten) + """ + worker_name = f"worker-{domain}" + + try: + # 1. UNPACKING (SNS -> SES) + message_body = json.loads(message['Body']) + + if 'Message' in message_body and 'Type' in message_body: + # Es ist eine SNS Notification + sns_content = message_body['Message'] + if isinstance(sns_content, str): + ses_msg = json.loads(sns_content) + else: + ses_msg = sns_content + else: + ses_msg = message_body + + # 2. DATEN EXTRAHIEREN + mail = ses_msg.get('mail', {}) + receipt = ses_msg.get('receipt', {}) + + message_id = mail.get('messageId') + + # FIX: Amazon SES Setup Notification ignorieren + if message_id == "AMAZON_SES_SETUP_NOTIFICATION": + log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name) + return True + + from_addr = mail.get('source') + recipients = receipt.get('recipients', []) + + if not message_id: + log("❌ Error: No messageId in event payload", 'ERROR', worker_name) + return True + + # Domain Validation + if recipients: + first_recipient = recipients[0] + recipient_domain = first_recipient.split('@')[1] + + if recipient_domain.lower() != domain.lower(): + log(f"⚠ Security: Ignored message for {recipient_domain} (I am worker for {domain})", 'WARNING', worker_name) + return True + else: + log("⚠ Warning: No recipients in event", 'WARNING', worker_name) + return True + + bucket = domain_to_bucket_name(domain) + key = message_id + + log(f"\n{'='*70}", 'INFO', worker_name) + log(f"Processing Email (SNS/SES):", 'INFO', worker_name) + log(f" ID: {key}", 'INFO', worker_name) + log(f" Recipients: {len(recipients)} -> {recipients}", 'INFO', worker_name) + log(f" Bucket: {bucket}", 'INFO', worker_name) + + # 3. LADEN AUS S3 + try: + response = s3.get_object(Bucket=bucket, Key=key) + raw_bytes = response['Body'].read() + log(f"✓ Loaded {len(raw_bytes)} bytes from S3", 'INFO', worker_name) + except s3.exceptions.NoSuchKey: + if receive_count < 5: + log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) + return False + else: + log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) + return True + except ClientError as e: + if e.response['Error']['Code'] == 'NoSuchKey': + if receive_count < 5: + log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) + return False + else: + log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) + return True + log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) + return False + except Exception as e: + log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) + return False + + # 4. PARSING & BOUNCE LOGIC + try: + parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) + subject = parsed.get('Subject', '(no subject)') + + # Bounce Header umschreiben + parsed, modified = apply_bounce_logic(parsed, subject, worker_name) + is_bounce = is_ses_bounce_notification(BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)) + + if modified: + log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name) + raw_bytes = parsed.as_bytes() + from_addr_final = parsed.get('From') + + if PROMETHEUS_ENABLED: + bounces_processed.labels(domain=domain, type='rewritten').inc() + else: + from_addr_final = from_addr + + except Exception as e: + log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name) + from_addr_final = from_addr + is_bounce = False + + # 5. OOO & FORWARD LOGIC (vor SMTP-Versand, nicht für Bounces) + if not is_bounce: + for recipient in recipients: + process_rules_for_recipient(recipient, parsed, domain, worker_name) + + # 6. SMTP VERSAND + log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name) + + successful = [] + failed_permanent = [] + failed_temporary = [] + smtp_conn = smtp_pool.get_connection() if not smtp_conn: - logger.error("Could not get SMTP connection") + log("❌ Could not get SMTP connection", 'ERROR', worker_name) return False - + try: for recipient in recipients: - try: - smtp_conn.sendmail(from_addr, [recipient], raw_email) - success_count += 1 - delivered_recipients.append(recipient) - logger.info(f" ✓ Delivered to {recipient}") - + success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, smtp_conn, worker_name) + + if success: + successful.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='success').inc() - - except smtplib.SMTPRecipientsRefused as e: - logger.warning(f" ✗ Recipient refused: {recipient}") - failed_recipients.append(recipient) + elif is_perm: + failed_permanent.append(recipient) if PROMETHEUS_ENABLED: - emails_processed.labels(domain=domain, status='recipient_refused').inc() - - except smtplib.SMTPException as e: - logger.error(f" ✗ SMTP error for {recipient}: {e}") - failed_recipients.append(recipient) - + emails_processed.labels(domain=domain, status='permanent_failure').inc() + else: + failed_temporary.append(recipient) + if PROMETHEUS_ENABLED: + emails_processed.labels(domain=domain, status='temporary_failure').inc() finally: smtp_pool.return_connection(smtp_conn) + + # 7. RESULTAT & CLEANUP + log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name) + + if len(successful) > 0: + mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None) + log(f"✅ Success. Deleted from queue.", 'INFO', worker_name) + return True - # ---------------------------------------- - # 4. Process Rules (Forwards & Auto-Reply) - # ---------------------------------------- - if not is_bounce: # Don't process rules for bounces - for recipient in delivered_recipients: - rules = get_rules_for_email(recipient) - - for rule in rules: - if not rule.is_active(): - continue - - if rule.rule_type == 'autoreply': - send_autoreply(rule, from_addr, subject, domain) - - elif rule.rule_type == 'forward': - send_forward(rule, raw_email, from_addr, subject, domain) - - # ---------------------------------------- - # 5. Update S3 metadata - # ---------------------------------------- - if success_count > 0: - try: - metadata = { - 'processed': 'true', - 'processed_at': str(int(time.time())), - 'delivered_to': ','.join(delivered_recipients), - 'status': 'delivered' - } - if failed_recipients: - metadata['failed_recipients'] = ','.join(failed_recipients) - if bounce_rewritten: - metadata['bounce_rewritten'] = 'true' - - s3.copy_object( - Bucket=bucket, - Key=object_key, - CopySource={'Bucket': bucket, 'Key': object_key}, - Metadata=metadata, - MetadataDirective='REPLACE' - ) - except Exception as e: - logger.warning(f"Failed to update S3 metadata: {e}") - - return success_count > 0 - + elif len(failed_permanent) == len(recipients): + mark_as_all_invalid(bucket, key, failed_permanent, worker_name) + log(f"🛑 All recipients invalid. Deleted from queue.", 'INFO', worker_name) + return True + + else: + log(f"🔄 Temporary failures. Keeping in queue.", 'INFO', worker_name) + return False + except Exception as e: - logger.error(f"Error processing message: {e}", exc_info=True) - if PROMETHEUS_ENABLED: - emails_processed.labels(domain=domain, status='error').inc() + log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name) + traceback.print_exc() return False # ============================================ @@ -774,7 +924,11 @@ def process_message(domain: str, message: dict) -> bool: def poll_domain(domain: str, queue_url: str, stop_event: threading.Event): """Poll single domain's queue continuously""" - logger.info(f"Starting poller for {domain}") + worker_name = f"worker-{domain}" + log(f"🚀 Starting poller for {domain}", 'INFO', worker_name) + + messages_processed = 0 + last_activity = time.time() while not stop_event.is_set(): try: @@ -782,7 +936,8 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event): QueueUrl=queue_url, MaxNumberOfMessages=config.max_messages, WaitTimeSeconds=config.poll_interval, - VisibilityTimeout=config.visibility_timeout + VisibilityTimeout=config.visibility_timeout, + AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'] ) messages = response.get('Messages', []) @@ -800,34 +955,66 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event): except: pass + if not messages: + if time.time() - last_activity > 60: + log(f"Waiting for messages... (processed: {messages_processed})", 'INFO', worker_name) + last_activity = time.time() + continue + + log(f"\n✉ Received {len(messages)} message(s) from queue", 'INFO', worker_name) + last_activity = time.time() + for message in messages: if stop_event.is_set(): break + receipt_handle = message['ReceiptHandle'] + receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1)) + + sent_timestamp = int(message.get('Attributes', {}).get('SentTimestamp', 0)) / 1000 + queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0 + + if queue_time > 0: + log(f"Message was in queue for {queue_time}s", 'INFO', worker_name) + if PROMETHEUS_ENABLED: emails_in_flight.inc() start_time = time.time() try: - success = process_message(domain, message) + success = process_message(domain, message, receive_count) if success: sqs.delete_message( QueueUrl=queue_url, - ReceiptHandle=message['ReceiptHandle'] + ReceiptHandle=receipt_handle ) - # If not successful, message becomes visible again after timeout - + log("✓ Message deleted from queue", 'INFO', worker_name) + messages_processed += 1 + else: + log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)", 'WARNING', worker_name) + + except json.JSONDecodeError as e: + log(f"✗ Invalid message format: {e}", 'ERROR', worker_name) + sqs.delete_message( + QueueUrl=queue_url, + ReceiptHandle=receipt_handle + ) + + except Exception as e: + log(f"✗ Error processing message: {e}", 'ERROR', worker_name) + traceback.print_exc() + finally: if PROMETHEUS_ENABLED: emails_in_flight.dec() processing_time.labels(domain=domain).observe(time.time() - start_time) except Exception as e: - logger.error(f"Error polling {domain}: {e}") + log(f"✗ Error polling {domain}: {e}", 'ERROR', worker_name) time.sleep(5) - logger.info(f"Stopped poller for {domain}") + log(f"👋 Stopped poller for {domain} (processed: {messages_processed})", 'INFO', worker_name) # ============================================ # UNIFIED WORKER @@ -847,7 +1034,7 @@ class UnifiedWorker: self.domains = load_domains() if not self.domains: - logger.error("No domains configured!") + log("❌ No domains configured!", 'ERROR') sys.exit(1) # Get queue URLs @@ -855,20 +1042,23 @@ class UnifiedWorker: url = get_queue_url(domain) if url: self.queue_urls[domain] = url + log(f" ✓ {domain} -> {domain_to_queue_name(domain)}") + else: + log(f" ✗ {domain} -> Queue not found!", 'WARNING') if not self.queue_urls: - logger.error("No valid queues found!") + log("❌ No valid queues found!", 'ERROR') sys.exit(1) # Initialize SMTP pool smtp_pool.initialize() - logger.info(f"Initialized with {len(self.queue_urls)} domains") + log(f"Initialized with {len(self.queue_urls)} domains") def start(self): """Start all domain pollers""" self.executor = ThreadPoolExecutor( - max_workers=config.worker_threads, + max_workers=len(self.queue_urls), thread_name_prefix='poller' ) @@ -877,27 +1067,27 @@ class UnifiedWorker: future = self.executor.submit(poll_domain, domain, queue_url, self.stop_event) futures.append(future) - logger.info(f"Started {len(futures)} domain pollers") + log(f"Started {len(futures)} domain pollers") try: for future in as_completed(futures): try: future.result() except Exception as e: - logger.error(f"Poller error: {e}") + log(f"Poller error: {e}", 'ERROR') except KeyboardInterrupt: pass def stop(self): """Stop gracefully""" - logger.info("Stopping worker...") + log("⚠ Stopping worker...") self.stop_event.set() if self.executor: self.executor.shutdown(wait=True, cancel_futures=False) smtp_pool.close_all() - logger.info("Worker stopped") + log("👋 Worker stopped") # ============================================ # HEALTH CHECK SERVER @@ -917,6 +1107,7 @@ def start_health_server(worker: UnifiedWorker): status = { 'status': 'healthy', 'domains': len(worker.queue_urls), + 'domain_list': list(worker.queue_urls.keys()), 'dynamodb': DYNAMODB_AVAILABLE, 'features': { 'bounce_rewriting': True, @@ -925,7 +1116,7 @@ def start_health_server(worker: UnifiedWorker): }, 'timestamp': datetime.utcnow().isoformat() } - self.wfile.write(json.dumps(status).encode()) + self.wfile.write(json.dumps(status, indent=2).encode()) elif self.path == '/domains': self.send_response(200) @@ -933,26 +1124,17 @@ def start_health_server(worker: UnifiedWorker): self.end_headers() self.wfile.write(json.dumps(list(worker.queue_urls.keys())).encode()) - elif self.path == '/metrics-status': - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write(json.dumps({ - 'prometheus_enabled': PROMETHEUS_ENABLED, - 'metrics_port': config.metrics_port if PROMETHEUS_ENABLED else None - }).encode()) - else: self.send_response(404) self.end_headers() def log_message(self, format, *args): - pass + pass # Suppress HTTP logs server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler) - thread = threading.Thread(target=server.serve_forever, daemon=True) + thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') thread.start() - logger.info(f"Health server on port {config.health_port}") + log(f"Health server on port {config.health_port}") # ============================================ # ENTRY POINT @@ -962,7 +1144,7 @@ def main(): worker = UnifiedWorker() def signal_handler(signum, frame): - logger.info(f"Received signal {signum}") + log(f"Received signal {signum}") worker.stop() sys.exit(0) @@ -973,24 +1155,30 @@ def main(): if PROMETHEUS_ENABLED: start_http_server(config.metrics_port) - logger.info(f"Prometheus metrics on port {config.metrics_port}") + log(f"Prometheus metrics on port {config.metrics_port}") start_health_server(worker) - logger.info("=" * 60) - logger.info(" UNIFIED EMAIL WORKER (Full Featured)") - logger.info("=" * 60) - logger.info(f" Domains: {len(worker.queue_urls)}") - logger.info(f" Threads: {config.worker_threads}") - logger.info(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}") - logger.info(f" SMTP Pool: {config.smtp_pool_size} connections") - logger.info("") - logger.info(" Features:") - logger.info(" ✓ Bounce Detection & Header Rewriting") - logger.info(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office") - logger.info(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding") - logger.info(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics") - logger.info("=" * 60) + log(f"\n{'='*70}") + log(f"🚀 UNIFIED EMAIL WORKER (Full Featured)") + log(f"{'='*70}") + log(f" Domains: {len(worker.queue_urls)}") + log(f" Threads: {config.worker_threads}") + log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}") + log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}") + log(f" Poll Interval: {config.poll_interval}s") + log(f" Visibility: {config.visibility_timeout}s") + log(f"") + log(f" Features:") + log(f" ✓ Bounce Detection & Header Rewriting") + log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office") + log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding") + log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics") + log(f"") + log(f" Active Domains:") + for domain in sorted(worker.queue_urls.keys()): + log(f" • {domain}") + log(f"{'='*70}\n") worker.start()