"""Domain-specific email worker.

Polls the SQS queue that belongs to ``WORKER_DOMAIN``, loads each referenced
raw email from S3, delivers it to every listed recipient over SMTP and records
the outcome in the S3 object's user metadata.
"""

import os
import sys
import smtplib
import json
import re
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime
from typing import Optional

import boto3

# AWS configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)

# Worker configuration (domain specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
# Number of receives after which a message is marked as failed.
# NOTE(review): presumably this must match the queue's redrive policy
# (maxReceiveCount) so the message actually moves to the DLQ -- confirm.
MAX_RECEIVE_COUNT = int(os.environ.get('MAX_RECEIVE_COUNT', '3'))

# SMTP configuration (simple, because each worker serves exactly one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# SMTP reply codes, matched only when they stand alone: not embedded in a
# longer number, an e-mail address or an enhanced status code such as "4.5.0".
_TEMPORARY_CODE_RE = re.compile(r'(?<![\w.@+-])4[0-5]\d(?![\w@]|\.\w)')
_PERMANENT_RECIPIENT_CODE_RE = re.compile(r'(?<![\w.@+-])55[013](?![\w@]|\.\w)')

# Graceful shutdown
shutdown_requested = False


def signal_handler(signum, frame):
    """Record that a shutdown was requested; the main loop checks the flag."""
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


def log(message: str, level: str = 'INFO'):
    """Print a log line prefixed with timestamp, level and worker name."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)


def domain_to_queue_name(domain: str) -> str:
    """Convert a domain into its SQS queue name (dots become dashes)."""
    return domain.replace('.', '-') + '-queue'


def get_queue_url() -> str:
    """Look up the queue URL for the configured worker domain.

    Raises:
        RuntimeError: if the lookup fails; the original error is chained.
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise RuntimeError(
            f"Failed to get queue URL for {WORKER_DOMAIN}: {e}"
        ) from e


def mark_as_processed(bucket: str, key: str,
                      invalid_inboxes: Optional[list] = None):
    """Mark the S3 object as delivered by rewriting its user metadata.

    ``processed`` is set to ``true`` even if some recipients failed; those
    are recorded under ``invalid_inboxes`` when given. Failures to update
    the metadata are logged and swallowed (best effort).
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        # Record invalid inboxes, if any
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')

        # Copying the object onto itself with REPLACE rewrites its metadata
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE',
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')


def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """Mark the S3 object as failed by rewriting its user metadata.

    Called by ``process_message`` once the retry budget is exhausted without
    a single successful delivery. Failures to update the metadata are logged
    and swallowed (best effort).
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = error[:500]  # keep well below the S3 metadata limit
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE',
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')


def is_temporary_smtp_error(error_msg: str) -> bool:
    """Return True if the SMTP error looks temporary (a retry makes sense).

    A standalone 4xx reply code or one of a few well-known phrases counts as
    temporary. A bare digit "4" somewhere in the text does not.
    """
    temporary_phrases = (
        'timeout',
        'timed out',
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again',
    )
    if _TEMPORARY_CODE_RE.search(error_msg):
        return True
    error_lower = error_msg.lower()
    return any(phrase in error_lower for phrase in temporary_phrases)


def is_permanent_recipient_error(error_msg: str) -> bool:
    """Return True if the error is permanent for this recipient.

    550 = mailbox not found, 551 = user not local, 553 = mailbox name invalid.
    The codes must stand alone, so digits inside an address do not match.
    """
    permanent_phrases = (
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user',
    )
    if _PERMANENT_RECIPIENT_CODE_RE.search(error_msg):
        return True
    error_lower = error_msg.lower()
    return any(phrase in error_lower for phrase in permanent_phrases)


def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """Send the raw message via SMTP to ONE recipient.

    Returns:
        (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured
            # NOTE(review): on failure delivery (and login) continues in
            # plaintext -- confirm this best-effort behavior is intended.
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    log(f" STARTTLS failed: {e}", 'WARNING')

            # Authentication if configured
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    log(f" SMTP auth failed: {e}", 'WARNING')

            # Send the email
            result = smtp.sendmail(from_addr, [recipient], raw_message)

            # Evaluate the result
            if isinstance(result, dict) and result:
                # Recipient was refused
                error = result.get(recipient, 'Unknown refusal')
                is_permanent = is_permanent_recipient_error(str(error))
                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
                return False, str(error), is_permanent

            # Delivered
            log(f" ✓ {recipient}: Delivered", 'SUCCESS')
            return True, None, False

    except smtplib.SMTPRecipientsRefused as e:
        # Classify on the server's reply only. str(e) is the repr of a dict
        # keyed by address, so digits or words inside the address itself
        # could otherwise be mistaken for a reply code.
        refusal = e.recipients.get(recipient)
        error_msg = str(refusal) if refusal is not None else str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent

    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent

    except Exception as e:
        # Connection errors are always treated as temporary
        log(f" ✗ {recipient}: Connection error - {e}", 'ERROR')
        return False, str(e), False


def process_message(message_body: dict, receive_count: int) -> bool:
    """Process one queued email; it may have several recipients.

    Returns:
        True if the SQS message should be deleted, False if it should stay
        in the queue for a retry.

    Raises:
        KeyError: if a required key is missing from ``message_body``.
    """
    bucket = message_body['bucket']
    key = message_body['key']
    from_addr = message_body['from']
    recipients = message_body['recipients']  # list of recipients
    domain = message_body['domain']
    subject = message_body.get('subject', '(unknown)')
    message_id = message_body.get('message_id', '(unknown)')

    log(f"\n{'='*70}")
    log(f"Processing email (Attempt #{receive_count}):")
    log(f" MessageId: {message_id}")
    log(f" S3 Key: {key}")
    log(f" Domain: {domain}")
    log(f" From: {from_addr}")
    log(f" Recipients: {len(recipients)}")
    for recipient in recipients:
        log(f" - {recipient}")
    log(f" Subject: {subject}")
    log(f" S3: s3://{bucket}/{key}")
    log(f"{'='*70}")

    # VALIDATION: the domain must match the worker domain
    if domain.lower() != WORKER_DOMAIN.lower():
        log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
        log("This message should not be in this queue! Deleting...", 'ERROR')
        return True  # delete the message (it does not belong here)

    # Load the email from S3
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = response['Body'].read()
        log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
    except s3.exceptions.NoSuchKey:
        log(f"✗ S3 object not found (may have been deleted)", 'ERROR')
        return True  # not retryable - delete the message
    except Exception as e:
        log(f"✗ Failed to load from S3: {e}", 'ERROR')
        return False  # might be temporary - retry

    # Send to all recipients
    log(f"\n📤 Sending to {len(recipients)} recipient(s)...")
    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")

    successful = []
    failed_temporary = []
    failed_permanent = []

    for recipient in recipients:
        success, error, is_permanent = send_email(from_addr, recipient, raw_bytes)
        if success:
            successful.append(recipient)
        elif is_permanent:
            failed_permanent.append(recipient)
        else:
            failed_temporary.append(recipient)

    # Result summary
    log(f"\n📊 Delivery Results:")
    log(f" ✓ Successful: {len(successful)}/{len(recipients)}")
    log(f" ✗ Failed (temporary): {len(failed_temporary)}")
    log(f" ✗ Failed (permanent): {len(failed_permanent)}")

    # Decision logic
    if successful:
        # At least one recipient succeeded:
        # set processed=true and track the invalid inboxes
        invalid_inboxes = failed_permanent if failed_permanent else None
        mark_as_processed(bucket, key, invalid_inboxes)

        log(f"{'='*70}")
        log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS')
        if failed_permanent:
            log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING')
        if failed_temporary:
            log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING')
        log(f"{'='*70}\n")
        return True  # delete the message

    if len(failed_permanent) == len(recipients):
        # ALL recipients failed permanently (all inboxes invalid):
        # set processed=true with all of them as invalid_inboxes
        mark_as_processed(bucket, key, failed_permanent)

        log(f"{'='*70}")
        log(f"✗ All recipients permanently failed (invalid inboxes)", 'ERROR')
        log(f"{'='*70}\n")
        return True  # delete the message (not retryable)

    # No successful delivery and at least one non-permanent failure:
    # retry while attempts remain
    if receive_count < MAX_RECEIVE_COUNT:
        log(f"⚠ All failures are temporary, will retry", 'WARNING')
        log(f"{'='*70}\n")
        return False  # do NOT delete the message -> retry

    # Max retries reached -> mark as failed
    error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients."
    mark_as_failed(bucket, key, error_summary, receive_count)

    log(f"{'='*70}")
    log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR')
    log(f"{'='*70}\n")
    # Message is intentionally not deleted; the queue is expected to move it
    # to the DLQ after the maximum receive count.
    return False


def main_loop():
    """Main loop: poll the SQS queue and process messages until shutdown."""
    # Resolve the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f" Worker Name: {WORKER_NAME}")
    log(f" Domain: {WORKER_DOMAIN}")
    log(f" Queue: {queue_url}")
    log(f" Region: {AWS_REGION}")
    log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f" Poll interval: {POLL_INTERVAL}s")
    log(f" Max messages per poll: {MAX_MESSAGES}")
    log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All'],
            )

            # Reset the error counter after a successful poll
            consecutive_errors = 0

            if 'Messages' not in response:
                # No messages; emit a heartbeat at most once per minute
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            # Process the messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']
                attributes = msg.get('Attributes', {})

                # Read the receive count
                receive_count = int(attributes.get('ApproximateReceiveCount', 1))

                # Sent timestamp in milliseconds (for the time-in-queue figure)
                sent_timestamp = int(attributes.get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    # Process the email
                    success = process_message(message_body, receive_count)

                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle,
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/{MAX_RECEIVE_COUNT})")

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete invalid messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle,
                    )

                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # Message stays in the queue for a retry

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break

        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()

            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break

            # Short pause after an error
            time.sleep(5)

    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{'='*70}\n")


if __name__ == '__main__':
    # Validation
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)