"""Domain-specific email delivery worker.

Polls the SQS queue that belongs to ``WORKER_DOMAIN``, loads each referenced
raw email from S3 and hands it to the configured SMTP server. One worker
process serves exactly one domain.
"""

import json
import os
import re
import signal
import smtplib
import sys
import time
import traceback
from datetime import datetime
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy

import boto3

# AWS configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)

# Worker configuration (domain specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

# SMTP configuration (simple, because each worker serves a single domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# First stand-alone 4xx/5xx number in an error text. The look-behind rejects
# digits, '.' and ':' so decimals and "host:port" fragments are not mistaken
# for a reply code; the look-ahead rejects longer digit runs.
_SMTP_REPLY_CODE_RE = re.compile(r'(?<![\d.:])([45]\d{2})(?!\d)')

# Text fragments that indicate a transient failure when no reply code is
# present in the error text (connection-level problems).
_TEMPORARY_ERROR_INDICATORS = (
    'timeout',
    'timed out',
    'connection refused',
    'connection reset',
    'network unreachable',
    'temporarily',
    'temporary',
    'try again',
)

# Graceful shutdown flag, set by the signal handler and polled by main_loop().
shutdown_requested = False


def signal_handler(signum, frame):
    """Record that a shutdown was requested so the main loop can exit."""
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


def log(message: str, level: str = 'INFO'):
    """Print *message* prefixed with a timestamp, the level and the worker name."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)


def domain_to_queue_name(domain: str) -> str:
    """Convert a domain into its SQS queue name (dots become dashes, plus '-queue')."""
    return domain.replace('.', '-') + '-queue'


def get_queue_url() -> str:
    """Look up the SQS queue URL for the configured worker domain.

    Raises:
        Exception: if the lookup fails; the original error is chained as the
            cause so the traceback keeps the underlying AWS failure.
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}") from e


def mark_as_processed(bucket: str, key: str):
    """Mark the S3 object as successfully delivered via its user metadata.

    Best effort: any failure is logged as a warning and not raised.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        # Metadata is rewritten by copying the object onto itself.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')


def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """Mark the S3 object as failed and store the error in its user metadata.

    Best effort: any failure is logged as a warning and not raised.

    Args:
        bucket: S3 bucket name.
        key: S3 object key.
        error: Human-readable error text; stored truncated to 500 characters.
        receive_count: Delivery attempt number, stored as ``retry_count``.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # Non-ASCII characters are replaced first: botocore rejects non-ASCII
        # metadata values, which would otherwise drop the failure marker.
        safe_error = error.encode('ascii', 'replace').decode('ascii')

        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = safe_error[:500]  # keep well below the S3 metadata limit
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')


def is_temporary_smtp_error(error_msg: str) -> bool:
    """Return True if the SMTP error looks temporary, i.e. a retry makes sense.

    If the text contains an SMTP reply code, the code decides: 4xx is
    temporary, 5xx is permanent. Only when no reply code is present does the
    function fall back to matching well-known transient phrases
    (timeouts, refused/reset connections, ...).

    Matching the bare digit '4' is deliberately avoided: it would classify
    permanent replies such as 554 as temporary.
    """
    code_match = _SMTP_REPLY_CODE_RE.search(error_msg)
    if code_match:
        return code_match.group(1).startswith('4')

    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in _TEMPORARY_ERROR_INDICATORS)


def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """Send one raw email to one recipient via the configured SMTP server.

    Returns:
        (success: bool, error: str or None)
    """
    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")

    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured.
            # NOTE(review): a STARTTLS failure is only logged and delivery
            # continues on the unencrypted connection — confirm this
            # best-effort fallback is intended.
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                    log("✓ STARTTLS enabled")
                except Exception as e:
                    log(f"STARTTLS failed: {e}", 'WARNING')

            # Authentication if configured (failure is logged, not fatal).
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                    log("✓ SMTP authenticated")
                except Exception as e:
                    log(f"SMTP auth failed: {e}", 'WARNING')

            # Send the email
            result = smtp.sendmail(from_addr, [recipient], raw_message)

            # A non-empty dict lists refused recipients.
            if isinstance(result, dict) and result:
                error = result.get(recipient, 'Unknown refusal')
                log(f"✗ Recipient refused: {error}", 'ERROR')
                return False, str(error)

            log(f"✓ Email delivered to {recipient}", 'SUCCESS')
            return True, None

    except smtplib.SMTPException as e:
        log(f"✗ SMTP error: {e}", 'ERROR')
        return False, str(e)
    except Exception as e:
        log(f"✗ Connection error: {e}", 'ERROR')
        return False, str(e)


def process_message(message_body: dict, receive_count: int) -> bool:
    """Process one queued email job.

    Args:
        message_body: Decoded queue message; must contain the keys 'bucket',
            'key', 'from', 'recipient' and 'domain' ('subject' and
            'message_id' are optional).
        receive_count: How often this message has been received so far.

    Returns:
        True if the queue message should be deleted, False if it should stay
        in the queue (retry).
    """
    bucket = message_body['bucket']
    key = message_body['key']
    from_addr = message_body['from']
    recipient = message_body['recipient']  # exactly one recipient
    domain = message_body['domain']
    subject = message_body.get('subject', '(unknown)')
    message_id = message_body.get('message_id', '(unknown)')

    log(f"\n{'='*70}")
    log(f"Processing email (Attempt #{receive_count}):")
    log(f" MessageId: {message_id}")
    log(f" Domain: {domain}")
    log(f" From: {from_addr}")
    log(f" To: {recipient}")
    log(f" Subject: {subject}")
    log(f" S3: s3://{bucket}/{key}")
    log(f"{'='*70}")

    # Validation: the message domain must match this worker's domain.
    if domain.lower() != WORKER_DOMAIN.lower():
        log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
        log("This message should not be in this queue! Deleting...", 'ERROR')
        return True  # delete the message, it does not belong here

    # Load the email from S3
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = response['Body'].read()
        log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
    except s3.exceptions.NoSuchKey:
        log("✗ S3 object not found (may have been deleted)", 'ERROR')
        return True  # not retryable - delete the message
    except Exception as e:
        log(f"✗ Failed to load from S3: {e}", 'ERROR')
        return False  # might be temporary - retry

    # Send the email
    success, error = send_email(from_addr, recipient, raw_bytes)

    if success:
        mark_as_processed(bucket, key)

        log(f"{'='*70}")
        log("✅ Email delivered successfully", 'SUCCESS')
        log(f"{'='*70}\n")

        return True  # delete the message

    error_msg = error or "Unknown SMTP error"

    # Temporary error and attempts left: keep the message for a retry.
    if receive_count < 3 and is_temporary_smtp_error(error_msg):
        log("⚠ Temporary error detected, will retry", 'WARNING')
        log(f"{'='*70}\n")
        return False  # do NOT delete the message -> retry

    # Permanent error or maximum number of attempts reached.
    mark_as_failed(bucket, key, error_msg, receive_count)

    log(f"{'='*70}")
    log("✗ Email delivery failed permanently", 'ERROR')
    log(f" Error: {error_msg}")
    log(f"{'='*70}\n")

    # The message is intentionally not deleted here.
    # NOTE(review): presumably the queue's redrive policy moves it to a DLQ
    # after 3 receives — verify against the queue configuration.
    return False


def main_loop():
    """Main loop: long-poll the SQS queue and process incoming messages.

    Exits the process with status 1 if the queue URL cannot be resolved.
    Returns when a shutdown was requested or after too many consecutive
    polling errors.
    """
    # Resolve the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log("🚀 Email Worker started")
    log(f"{'='*70}")
    log(f" Worker Name: {WORKER_NAME}")
    log(f" Domain: {WORKER_DOMAIN}")
    log(f" Queue: {queue_url}")
    log(f" Region: {AWS_REGION}")
    log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f" Poll interval: {POLL_INTERVAL}s")
    log(f" Max messages per poll: {MAX_MESSAGES}")
    log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )

            # A successful poll resets the error counter
            consecutive_errors = 0

            if 'Messages' not in response:
                # No messages; emit a heartbeat at most once per minute
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            # Process the messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']

                # Number of times this message has been received
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))

                # Sent timestamp (milliseconds) for the time-in-queue calculation
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0

                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    # Process the email
                    success = process_message(message_body, receive_count)

                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete malformed messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )

                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # The message stays in the queue for a retry

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break

        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()

            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break

            # Short pause after an error
            time.sleep(5)

    log(f"\n{'='*70}")
    log("👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{'='*70}\n")


if __name__ == '__main__':
    # Validation
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)