"""Domain-specific email delivery worker.

Polls an SQS queue for SES receipt notifications (SNS-wrapped), loads the raw
message from S3, optionally rewrites bounce headers via a DynamoDB lookup, and
delivers the mail to a local SMTP server. One worker process handles exactly
one domain (WORKER_DOMAIN).
"""

import os
import sys
import boto3
import smtplib
import json
import re
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime

# AWS Configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)

# ✨ Worker Configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker Settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

# SMTP Configuration (simple, since each worker serves only one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# Graceful shutdown flag, flipped by the signal handler below.
shutdown_requested = False


def log(message: str, level: str = 'INFO'):
    """Structured logging with timestamp.

    NOTE: defined before the DynamoDB init block below on purpose — the
    original code called log() from that block's except clause before log
    existed, which would have raised NameError at import time.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)


def signal_handler(signum, frame):
    """Request a graceful shutdown; the main loop checks the flag."""
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

# DynamoDB resource for bounce lookups. Best-effort: if unavailable, the
# worker still delivers mail but skips bounce-header rewriting.
try:
    dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
    msg_table = dynamo.Table('ses-outbound-messages')
except Exception as e:
    log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
    msg_table = None


def get_bucket_name(domain):
    """Convention: domain.tld -> domain-tld-emails"""
    return domain.replace('.', '-') + '-emails'


def is_ses_bounce_or_autoreply(parsed):
    """Detects SES bounces and auto-generated replies from headers."""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
    return is_mailer_daemon or is_auto_replied


def _strip_message_id(raw: str):
    """Normalize '<id@host>' to bare 'id' (angle brackets and @-suffix removed)."""
    msg_id = raw
    if msg_id.startswith('<') and '>' in msg_id:
        msg_id = msg_id[1:msg_id.find('>')]
    if '@' in msg_id:
        msg_id = msg_id.split('@')[0]
    return msg_id


def extract_original_message_id(parsed):
    """Extracts the original Message-ID from In-Reply-To, falling back to
    the first entry of References. Returns None if neither is present."""
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        return _strip_message_id(in_reply_to)
    refs = (parsed.get('References') or '').strip()
    if refs:
        return _strip_message_id(refs.split()[0])
    return None


def _force_set_header(msg, name, value):
    """Set a header, replacing any existing occurrence.

    replace_header() raises KeyError when the header is absent (e.g. a
    bounce without a From/Subject header), which would abort the rewrite
    half-way; delete-then-set is safe in both cases.
    """
    if name in msg:
        del msg[name]
    msg[name] = value


def apply_bounce_logic(parsed, subject):
    """
    Checks for a bounce, looks up the original message in DynamoDB and
    rewrites the headers so the bounce reaches the intended local user.
    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False

    # Without the DynamoDB table we cannot resolve the original recipient.
    if msg_table is None:
        log("⚠ DynamoDB unavailable, skipping bounce lookup")
        return parsed, False

    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")
    original_msg_id = extract_original_message_id(parsed)
    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False

    try:
        # Lookup in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')
        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False

        # Hit! Resolve the original sender/recipient pair.
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''

        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")
            # Rewrite headers: preserve the SES sender, then point From at
            # the original recipient so the bounce lands in their mailbox.
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            _force_set_header(parsed, 'From', original_recipient)
            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient
            if 'delivery status notification' in subject.lower():
                _force_set_header(parsed, 'Subject', f"Delivery Status: {original_recipient}")
            return parsed, True
    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")

    return parsed, False


def domain_to_queue_name(domain: str) -> str:
    """Converts a domain to its SQS queue name (dots -> dashes, '-queue' suffix)."""
    return domain.replace('.', '-') + '-queue'


def get_queue_url() -> str:
    """Resolves the queue URL for the configured domain.

    Raises:
        Exception: if the queue cannot be resolved (fatal for the worker).
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}") from e


def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
    """
    Marks an email as successfully delivered (S3 metadata update).
    Called only when at least one recipient succeeded.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        # Record invalid inboxes, if any, for later inspection
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')

        # Self-copy with REPLACE is the S3 idiom for rewriting metadata in place.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')


def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
    """
    Marks an email as failed because all recipients are invalid.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'failed'
        metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
        metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING')


def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """
    Marks an email as completely failed.
    Called only when ALL recipients failed.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = error[:500]  # S3 metadata value size limit
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')


# Matches a standalone 4xx SMTP reply code (421, 450, 451, 452, ...).
_SMTP_4XX_RE = re.compile(r'\b4\d\d\b')


def is_temporary_smtp_error(error_msg: str) -> bool:
    """
    Checks whether an SMTP error is temporary (worth retrying).
    4xx codes = temporary, 5xx = permanent.

    Fix: the original list contained the bare indicator '4', which matched
    ANY message containing the digit 4 (including 5xx enhanced codes like
    '550 5.4.1'); a real 4xx reply code is now detected with a word-bounded
    regex instead.
    """
    temporary_indicators = [
        'timeout',
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again'
    ]
    error_lower = error_msg.lower()
    if _SMTP_4XX_RE.search(error_lower):
        return True
    return any(indicator in error_lower for indicator in temporary_indicators)


def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Checks whether the failure is permanent for this recipient (inbox does
    not exist). 550 = mailbox not found, 551 = user not local,
    553 = mailbox name invalid.
    """
    permanent_indicators = [
        '550',  # Mailbox unavailable / not found
        '551',  # User not local
        '553',  # Mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user'
    ]
    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)


def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """
    Sends an email via SMTP to ONE recipient.
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured (best-effort: delivery still attempted
            # over plaintext if the upgrade fails)
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    log(f"  STARTTLS failed: {e}", 'WARNING')

            # Authentication if configured
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    log(f"  SMTP auth failed: {e}", 'WARNING')

            # Send the email; sendmail returns a dict of refused recipients
            result = smtp.sendmail(from_addr, [recipient], raw_message)

            # Evaluate result
            if isinstance(result, dict) and result:
                # The recipient was refused
                error = result.get(recipient, 'Unknown refusal')
                is_permanent = is_permanent_recipient_error(str(error))
                log(f"  ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
                return False, str(error), is_permanent
            else:
                # Delivered successfully
                log(f"  ✓ {recipient}: Delivered", 'SUCCESS')
                return True, None, False

    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f"  ✗ {recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent
    except Exception as e:
        # Connection errors are always temporary
        log(f"  ✗ {recipient}: Connection error - {e}", 'ERROR')
        return False, str(e), False


# ==========================================
# MAIN FUNCTION: PROCESS MESSAGE
# ==========================================

def process_message(message_body: dict, receive_count: int) -> bool:
    """
    Processes one email from the queue (SNS-wrapped SES notification).
    Returns: True (success / delete from queue), False (retry / keep).
    """
    try:
        # 1. UNPACKING (SNS -> SES)
        # The SQS body is JSON; usually 'Type': 'Notification' with the SES
        # payload serialized as a JSON string under 'Message'.
        if 'Message' in message_body and 'Type' in message_body:
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            # Fallback: maybe a raw SES event (legacy support)
            ses_msg = message_body

        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')  # This is the S3 key!
        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])

        # S3 key validation
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR')
            return True  # Delete, unusable

        # Domain validation — check the first recipient's domain.
        if recipients:
            first_recipient = recipients[0]
            # Guard against malformed addresses without '@' (would raise
            # IndexError on split in the original code).
            if '@' not in first_recipient:
                log(f"⚠ Warning: Malformed recipient '{first_recipient}'", 'WARNING')
                return True  # Delete, unusable
            domain = first_recipient.split('@')[1]
            if domain.lower() != WORKER_DOMAIN.lower():
                log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
                return True  # Delete, not ours
        else:
            log("⚠ Warning: No recipients in event", 'WARNING')
            return True

        # Derive bucket name
        bucket = get_bucket_name(WORKER_DOMAIN)
        key = message_id

        log(f"\n{'='*70}")
        log(f"Processing Email (SNS/SES):")
        log(f"  ID: {key}")
        log(f"  Recipients: {len(recipients)} -> {recipients}")
        log(f"  Bucket: {bucket}")

        # 3. LOAD FROM S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
        except s3.exceptions.NoSuchKey:
            # Race condition: SNS was faster than S3.
            # Return False so SQS redelivers after the visibility timeout.
            if receive_count < 5:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
                return False
            else:
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
                return True  # Delete
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            return False  # Retry

        # 4. PARSING & BOUNCE LOGIC
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')

            # Bounce header rewriting happens here
            parsed, modified = apply_bounce_logic(parsed, subject)

            if modified:
                log("  ✨ Bounce detected & headers rewritten via DynamoDB")
                # Continue with the modified bytes
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')  # New envelope sender
            else:
                from_addr_final = from_addr  # Original envelope sender
        except Exception as e:
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
            from_addr_final = from_addr

        # 5. SMTP DELIVERY (loop over recipients)
        log(f"📤 Sending to {len(recipients)} recipient(s)...")
        successful = []
        failed_permanent = []
        failed_temporary = []

        for recipient in recipients:
            # We use raw_bytes (possibly modified). IMPORTANT: the envelope
            # sender is 'from_addr_final' (for bounces that is the original
            # recipient, otherwise the SES sender).
            success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
            if success:
                successful.append(recipient)
            elif is_perm:
                failed_permanent.append(recipient)
            else:
                failed_temporary.append(recipient)

        # 6. RESULT & CLEANUP
        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")

        if len(successful) > 0:
            # At least one delivery succeeded -> success
            mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
            log(f"✅ Success. Deleted from queue.")
            return True
        elif len(failed_permanent) == len(recipients):
            # All failed permanently (user unknown) -> delete
            mark_as_all_invalid(bucket, key, failed_permanent)
            log(f"🛑 All recipients invalid. Deleted from queue.")
            return True
        else:
            # Temporary failures -> retry
            log(f"🔄 Temporary failures. Keeping in queue.")
            return False

    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
        traceback.print_exc()
        return False  # Retry (unless it keeps crashing)


def main_loop():
    """Main loop: polls the SQS queue and processes messages."""
    # Resolve the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f"  Worker Name: {WORKER_NAME}")
    log(f"  Domain: {WORKER_DOMAIN}")
    log(f"  Queue: {queue_url}")
    log(f"  Region: {AWS_REGION}")
    log(f"  SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f"  Poll interval: {POLL_INTERVAL}s")
    log(f"  Max messages per poll: {MAX_MESSAGES}")
    log(f"  Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )

            # Reset the error counter after a successful poll
            consecutive_errors = 0

            if 'Messages' not in response:
                # No messages; emit a heartbeat at most once per minute
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            # Process the batch
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']

                # Read the receive count
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))

                # Sent timestamp (for queue-time calculation)
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    # Process the email
                    success = process_message(message_body, receive_count)

                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete unparseable messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # Message stays in queue for retry

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break
        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()

            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break

            # Short pause on errors
            time.sleep(5)

    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f"  Messages processed: {messages_processed}")
    log(f"{'='*70}\n")


if __name__ == '__main__':
    # Validation
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)