"""
Domain-specific email worker.

Polls an SQS queue for SNS-wrapped SES "email received" notifications,
loads the raw message from S3 and delivers it to a local SMTP server.
One worker process serves exactly one domain (WORKER_DOMAIN).
"""
import os
import sys
import boto3
import smtplib
import json
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime

# AWS configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)

# Worker configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))      # SQS long-poll wait, seconds
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))        # messages per receive call
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

# SMTP configuration (simple, since each worker serves exactly one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# Graceful-shutdown flag, set by the signal handler, checked by the main loop.
shutdown_requested = False
# DynamoDB resource (new in this worker): lookup table for outbound SES
# messages, used to rewrite bounce notifications back to the original sender.
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')


def get_bucket_name(domain):
    """Convention: domain.tld -> domain-tld-emails"""
    return domain.replace('.', '-') + '-emails'


def is_ses_bounce_or_autoreply(parsed):
    """Detect SES bounces (MAILER-DAEMON@...amazonses.com) and auto-generated replies."""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
    return is_mailer_daemon or is_auto_replied


def extract_original_message_id(parsed):
    """
    Extract the original Message-ID from In-Reply-To (preferred) or the
    first entry of References. Angle brackets and the @domain suffix are
    stripped, leaving the bare id used as the DynamoDB key.
    Returns None if neither header yields an id.
    """
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        msg_id = in_reply_to
        if msg_id.startswith('<') and '>' in msg_id:
            msg_id = msg_id[1:msg_id.find('>')]
        if '@' in msg_id:
            msg_id = msg_id.split('@')[0]
        return msg_id

    # Fallback: first entry in References
    refs = (parsed.get('References') or '').strip()
    if refs:
        first_ref = refs.split()[0]
        if first_ref.startswith('<') and '>' in first_ref:
            first_ref = first_ref[1:first_ref.find('>')]
        if '@' in first_ref:
            first_ref = first_ref.split('@')[0]
        return first_ref
    return None


def _set_header(parsed, name, value):
    """
    Set a header, replacing any existing occurrence.
    replace_header() raises KeyError when the header is missing, and plain
    assignment would add a duplicate — this helper handles both cases.
    """
    if name in parsed:
        parsed.replace_header(name, value)
    else:
        parsed[name] = value


def apply_bounce_logic(parsed, subject):
    """
    Check for a bounce/auto-reply, look the original message up in DynamoDB
    and rewrite headers so the notification points at the intended recipient.
    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False

    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")
    original_msg_id = extract_original_message_id(parsed)

    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False

    try:
        # Lookup in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')

        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False

        # Hit!
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''

        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")

            # Rewrite headers. _set_header avoids the KeyError that
            # replace_header() raises when From/Subject is absent (possible
            # for auto-replies detected only via Auto-Submitted).
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            _set_header(parsed, 'From', original_recipient)

            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient

            if 'delivery status notification' in subject.lower():
                _set_header(parsed, 'Subject', f"Delivery Status: {original_recipient}")

            return parsed, True

    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")

    return parsed, False


def signal_handler(signum, frame):
    """Request a graceful shutdown; the main loop checks the flag between polls."""
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


def log(message: str, level: str = 'INFO'):
    """Structured logging with timestamp"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)


def domain_to_queue_name(domain: str) -> str:
    """Convert a domain to its SQS queue name (dots become dashes)."""
    return domain.replace('.', '-') + '-queue'


def get_queue_url() -> str:
    """Resolve the SQS queue URL for the configured domain."""
    queue_name = domain_to_queue_name(WORKER_DOMAIN)

    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")


def _rewrite_metadata(bucket: str, key: str, update):
    """
    Fetch the object's user metadata, apply update(metadata) in place and
    copy the object onto itself with MetadataDirective=REPLACE (S3 metadata
    cannot be changed any other way). Raises on S3 errors; callers handle.
    """
    head = s3.head_object(Bucket=bucket, Key=key)
    metadata = head.get('Metadata', {}) or {}
    update(metadata)
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={'Bucket': bucket, 'Key': key},
        Metadata=metadata,
        MetadataDirective='REPLACE'
    )


def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
    """
    Mark an email as successfully delivered.
    Called only when at least one recipient succeeded; permanently invalid
    recipients (if any) are recorded alongside.
    """
    try:
        def update(metadata):
            metadata['processed'] = 'true'
            metadata['processed_at'] = str(int(time.time()))
            metadata['processed_by'] = WORKER_NAME
            metadata['status'] = 'delivered'
            metadata.pop('processing_started', None)
            metadata.pop('queued_at', None)

            # Record invalid inboxes if any were reported
            if invalid_inboxes:
                metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
                log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')

        _rewrite_metadata(bucket, key, update)
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')

    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')


def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
    """Mark an email as failed because every recipient's mailbox is invalid."""
    try:
        def update(metadata):
            metadata['processed'] = 'true'
            metadata['processed_at'] = str(int(time.time()))
            metadata['processed_by'] = WORKER_NAME
            metadata['status'] = 'failed'
            metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            metadata.pop('processing_started', None)
            metadata.pop('queued_at', None)

        _rewrite_metadata(bucket, key, update)
        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')

    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING')


def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """
    Mark an email as completely failed (all recipients failed).
    NOTE(review): not currently called from process_message — confirm whether
    a caller exists elsewhere or temporary failures should eventually use it.
    """
    try:
        def update(metadata):
            metadata['status'] = 'failed'
            metadata['failed_at'] = str(int(time.time()))
            metadata['failed_by'] = WORKER_NAME
            metadata['error'] = error[:500]  # S3 metadata size limit
            metadata['retry_count'] = str(receive_count)
            metadata.pop('processing_started', None)

        _rewrite_metadata(bucket, key, update)
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')

    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')


def is_temporary_smtp_error(error_msg: str) -> bool:
    """
    Check whether an SMTP error is temporary (retry makes sense).
    4xx codes = temporary, 5xx = permanent.
    """
    temporary_indicators = [
        '421',  # Service not available
        '450',  # Mailbox unavailable
        '451',  # Local error
        '452',  # Insufficient storage
        '4',    # Generic 4xx
        'timeout',
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again'
    ]

    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in temporary_indicators)


def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Check whether the error is permanent for this recipient (inbox missing).
    550 = mailbox not found, 551 = user not local, 553 = mailbox name invalid.
    """
    permanent_indicators = [
        '550',  # Mailbox unavailable / not found
        '551',  # User not local
        '553',  # Mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user'
    ]

    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)


def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """
    Send an email via SMTP to ONE recipient.
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured (failure is logged, delivery continues in clear)
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    log(f"  STARTTLS failed: {e}", 'WARNING')

            # Authentication if configured (failure is logged, delivery still attempted)
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    log(f"  SMTP auth failed: {e}", 'WARNING')

            # Send the email
            result = smtp.sendmail(from_addr, [recipient], raw_message)

            # sendmail returns a non-empty dict of refused recipients
            if isinstance(result, dict) and result:
                # Recipient was refused
                error = result.get(recipient, 'Unknown refusal')
                is_permanent = is_permanent_recipient_error(str(error))
                log(f"  ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
                return False, str(error), is_permanent
            else:
                # Delivered
                log(f"  ✓ {recipient}: Delivered", 'SUCCESS')
                return True, None, False

    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f"  ✗ {recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent

    except Exception as e:
        # Connection errors are always treated as temporary
        log(f"  ✗ {recipient}: Connection error - {e}", 'ERROR')
        return False, str(e), False


# ==========================================
# MAIN FUNCTION: PROCESS MESSAGE
# ==========================================

def process_message(message_body: dict, receive_count: int) -> bool:
    """
    Process one email from the queue (SNS-wrapped SES notification).
    Returns: True (success / delete from queue), False (retry / keep).
    """
    try:
        # 1. UNPACKING (SNS -> SES)
        # The SQS body is JSON; usually 'Type': 'Notification' with the SES
        # event JSON-encoded in 'Message'.
        if 'Message' in message_body and 'Type' in message_body:
            # It is an SNS notification
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            # Fallback: possibly a direct SES event (legacy support)
            ses_msg = message_body

        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})

        message_id = mail.get('messageId')  # This is the S3 key!
        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])

        # S3 key validation
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR')
            return True  # Delete: unusable

        # Domain validation — check the first recipient's domain.
        if recipients:
            first_recipient = recipients[0]
            if '@' not in first_recipient:
                # Malformed address would raise IndexError below; drop it.
                log(f"⚠ Malformed recipient address: {first_recipient}", 'WARNING')
                return True
            domain = first_recipient.split('@')[1]

            if domain.lower() != WORKER_DOMAIN.lower():
                log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
                return True  # Delete: not ours
        else:
            log("⚠ Warning: No recipients in event", 'WARNING')
            return True

        # Derive bucket name
        bucket = get_bucket_name(WORKER_DOMAIN)
        key = message_id

        log(f"\n{'='*70}")
        log(f"Processing Email (SNS/SES):")
        log(f"  ID: {key}")
        log(f"  Recipients: {len(recipients)} -> {recipients}")
        log(f"  Bucket: {bucket}")

        # 3. LOAD FROM S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
        except s3.exceptions.NoSuchKey:
            # Race condition: SNS beat the S3 write. Return False so SQS
            # redelivers after the visibility timeout.
            if receive_count < 5:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
                return False
            else:
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
                return True  # Delete
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            return False  # Retry

        # 4. PARSING & BOUNCE LOGIC
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')

            # Rewrite bounce headers via the DynamoDB lookup
            parsed, modified = apply_bounce_logic(parsed, subject)

            if modified:
                log("  ✨ Bounce detected & headers rewritten via DynamoDB")
                # Continue with the modified bytes
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')  # New envelope sender for SMTP
            else:
                from_addr_final = from_addr  # Original envelope sender

        except Exception as e:
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
            from_addr_final = from_addr

        # 5. SMTP DELIVERY (loop over recipients)
        log(f"📤 Sending to {len(recipients)} recipient(s)...")

        successful = []
        failed_permanent = []
        failed_temporary = []

        for recipient in recipients:
            # Use raw_bytes (possibly modified). Envelope sender is
            # from_addr_final (for bounces the original recipient, otherwise
            # the SES sender).
            success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)

            if success:
                successful.append(recipient)
            elif is_perm:
                failed_permanent.append(recipient)
            else:
                failed_temporary.append(recipient)

        # 6. RESULT & CLEANUP
        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")

        if len(successful) > 0:
            # At least one delivery succeeded -> success
            mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
            log(f"✅ Success. Deleted from queue.")
            return True

        elif len(failed_permanent) == len(recipients):
            # All failed permanently (user unknown) -> delete
            mark_as_all_invalid(bucket, key, failed_permanent)
            log(f"🛑 All recipients invalid. Deleted from queue.")
            return True

        else:
            # Temporary failures -> retry
            log(f"🔄 Temporary failures. Keeping in queue.")
            return False

    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
        traceback.print_exc()
        return False  # Retry (unless it keeps crashing)


def main_loop():
    """Main loop: polls the SQS queue and processes messages until shutdown."""

    # Resolve queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f"  Worker Name: {WORKER_NAME}")
    log(f"  Domain: {WORKER_DOMAIN}")
    log(f"  Queue: {queue_url}")
    log(f"  Region: {AWS_REGION}")
    log(f"  SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f"  Poll interval: {POLL_INTERVAL}s")
    log(f"  Max messages per poll: {MAX_MESSAGES}")
    log(f"  Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Receive messages (long polling). SQS caps WaitTimeSeconds at 20
            # and MaxNumberOfMessages at 10 — clamp so a misconfigured env
            # cannot make every poll fail with a validation error.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=min(MAX_MESSAGES, 10),
                WaitTimeSeconds=min(POLL_INTERVAL, 20),
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )

            # Reset error counter on a successful poll
            consecutive_errors = 0

            if 'Messages' not in response:
                # No messages
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            # Process messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']

                # Read receive count
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))

                # Sent timestamp (to compute time spent in the queue)
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0

                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    # Process the email
                    success = process_message(message_body, receive_count)

                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete malformed messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )

                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # Message stays in the queue for retry

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break

        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()

            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break

            # Short pause after an error
            time.sleep(5)

    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f"  Messages processed: {messages_processed}")
    log(f"{'='*70}\n")


if __name__ == '__main__':
    # Validate required configuration
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)