diff --git a/worker.py b/worker.py old mode 100644 new mode 100755 index ee57a74..3988171 --- a/worker.py +++ b/worker.py @@ -34,6 +34,156 @@ SMTP_PASS = os.environ.get('SMTP_PASS') # Graceful shutdown shutdown_requested = False +# DynamoDB Ressource für Bounce-Lookup +try: + dynamo = boto3.resource('dynamodb', region_name=AWS_REGION) + msg_table = dynamo.Table('ses-outbound-messages') +except Exception as e: + log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING') + msg_table = None + +def get_bucket_name(domain): + """Konvention: domain.tld -> domain-tld-emails""" + return domain.replace('.', '-') + '-emails' + +def is_ses_bounce_or_autoreply(parsed): + """Erkennt SES Bounces""" + from_h = (parsed.get('From') or '').lower() + auto_sub = (parsed.get('Auto-Submitted') or '').lower() + is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h + is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub + return is_mailer_daemon or is_auto_replied + +def extract_original_message_id(parsed): + """ + Extrahiert Original SES Message-ID aus Email + SES Format: 010f[hex32]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-[hex6] + """ + import re + + # SES Message-ID Pattern (endet immer mit -000000) + ses_pattern = re.compile(r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000') + + # 1. Versuche Standard-Header (In-Reply-To, References) + for header in ['In-Reply-To', 'References']: + value = (parsed.get(header) or '').strip() + if value: + match = ses_pattern.search(value) + if match: + log(f" Found Message-ID in {header}: {match.group(0)}") + return match.group(0) + + # 2. Durchsuche Message-ID Header (manchmal steht dort die Original-ID) + msg_id_header = (parsed.get('Message-ID') or '').strip() + if msg_id_header: + match = ses_pattern.search(msg_id_header) + if match: + # Aber nur wenn es nicht die ID der aktuellen Bounce-Message ist + # (die beginnt oft auch mit 010f...) + pass # Wir überspringen das erstmal + + # 3. Durchsuche den kompletten Email-Body (inkl. ALLE Attachments/Parts) + # Das fängt auch attached messages, text attachments, etc. ab + try: + body_text = '' + + # Hole den kompletten Body als String + if parsed.is_multipart(): + for part in parsed.walk(): + content_type = part.get_content_type() + + # Durchsuche ALLE Parts (außer Binärdaten wie images) + # Text-Parts, HTML, attached messages, und auch application/* Parts + if content_type.startswith('text/') or \ + content_type == 'message/rfc822' or \ + content_type.startswith('application/'): + try: + payload = part.get_payload(decode=True) + if payload: + # Versuche als UTF-8, fallback auf Latin-1 + try: + body_text += payload.decode('utf-8', errors='ignore') + except: + try: + body_text += payload.decode('latin-1', errors='ignore') + except: + # Letzter Versuch: als ASCII mit ignore + body_text += str(payload, errors='ignore') + except: + # Falls decode fehlschlägt, String-Payload holen + payload = part.get_payload() + if isinstance(payload, str): + body_text += payload + else: + # Nicht-Multipart Message + payload = parsed.get_payload(decode=True) + if payload: + try: + body_text = payload.decode('utf-8', errors='ignore') + except: + body_text = payload.decode('latin-1', errors='ignore') + + # Suche alle SES Message-IDs im Body + matches = ses_pattern.findall(body_text) + if matches: + # Nehme die ERSTE gefundene ID (meist die Original-ID) + # Die letzte ist oft die Bounce-Message selbst + log(f" Found {len(matches)} SES Message-ID(s) in body, using first: {matches[0]}") + return matches[0] + + except Exception as e: + log(f" Warning: Could not search body for Message-ID: {e}", 'WARNING') + + return None + +def apply_bounce_logic(parsed, subject): + """ + Prüft auf Bounce, sucht in DynamoDB und schreibt Header um. + Returns: (parsed_email_object, was_modified_bool) + """ + if not is_ses_bounce_or_autoreply(parsed): + return parsed, False + + log("🔍 Detected auto-response/bounce. Checking DynamoDB...") + original_msg_id = extract_original_message_id(parsed) + + if not original_msg_id: + log("⚠ Could not extract original Message-ID") + return parsed, False + + try: + # Lookup in DynamoDB + result = msg_table.get_item(Key={'MessageId': original_msg_id}) + item = result.get('Item') + + if not item: + log(f"⚠ No DynamoDB record found for {original_msg_id}") + return parsed, False + + # Treffer! + orig_source = item.get('source', '') + orig_destinations = item.get('destinations', []) + original_recipient = orig_destinations[0] if orig_destinations else '' + + if original_recipient: + log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}") + + # Rewrite Headers + parsed['X-Original-SES-From'] = parsed.get('From', '') + parsed.replace_header('From', original_recipient) + + if not parsed.get('Reply-To'): + parsed['Reply-To'] = original_recipient + + if 'delivery status notification' in subject.lower(): + parsed.replace_header('Subject', f"Delivery Status: {original_recipient}") + + return parsed, True + + except Exception as e: + log(f"⚠ DynamoDB Error: {e}") + + return parsed, False def signal_handler(signum, frame): global shutdown_requested @@ -260,125 +410,151 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple: return False, str(e), False +# ========================================== +# HAUPTFUNKTION: PROCESS MESSAGE +# ========================================== + def process_message(message_body: dict, receive_count: int) -> bool: """ - Verarbeitet eine E-Mail aus der Queue - Kann mehrere Recipients haben - sendet an alle - Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry) + Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification) + Returns: True (Erfolg/Löschen), False (Retry/Behalten) """ - - bucket = message_body['bucket'] - key = message_body['key'] - from_addr = message_body['from'] - recipients = message_body['recipients'] # Liste von Empfängern - domain = message_body['domain'] - subject = message_body.get('subject', '(unknown)') - message_id = message_body.get('message_id', '(unknown)') - - log(f"\n{'='*70}") - log(f"Processing email (Attempt #{receive_count}):") - log(f" MessageId: {message_id}") - log(f" S3 Key: {key}") - log(f" Domain: {domain}") - log(f" From: {from_addr}") - log(f" Recipients: {len(recipients)}") - for recipient in recipients: - log(f" - {recipient}") - log(f" Subject: {subject}") - log(f" S3: s3://{bucket}/{key}") - log(f"{'='*70}") - - # ✨ VALIDATION: Domain muss mit Worker-Domain übereinstimmen - if domain.lower() != WORKER_DOMAIN.lower(): - log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR') - log("This message should not be in this queue! Deleting...", 'ERROR') - return True # Message löschen (gehört nicht hierher) - - # E-Mail aus S3 laden try: - response = s3.get_object(Bucket=bucket, Key=key) - raw_bytes = response['Body'].read() - log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)") - except s3.exceptions.NoSuchKey: - log(f"✗ S3 object not found (may have been deleted)", 'ERROR') - return True # Nicht retryable - Message löschen + # 1. UNPACKING (SNS -> SES) + # SQS Body ist JSON. Darin ist meist 'Type': 'Notification' und 'Message': '...JSONString...' + if 'Message' in message_body and 'Type' in message_body: + # Es ist eine SNS Notification + sns_content = message_body['Message'] + if isinstance(sns_content, str): + ses_msg = json.loads(sns_content) + else: + ses_msg = sns_content + else: + # Fallback: Vielleicht doch direkt SES (Legacy support) + ses_msg = message_body + + # 2. DATEN EXTRAHIEREN + mail = ses_msg.get('mail', {}) + receipt = ses_msg.get('receipt', {}) + + message_id = mail.get('messageId') # Das ist der S3 Key! + # FIX: Amazon SES Setup Notification ignorieren + if message_id == "AMAZON_SES_SETUP_NOTIFICATION": + log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO') + return True # Erfolgreich (löschen), da kein Fehler + from_addr = mail.get('source') + recipients = receipt.get('recipients', []) + + # S3 Key Validation + if not message_id: + log("❌ Error: No messageId in event payload", 'ERROR') + return True # Löschen, da unbrauchbar + + # Domain Validation + # Wir nehmen den ersten Empfänger um die Domain zu prüfen + if recipients: + first_recipient = recipients[0] + domain = first_recipient.split('@')[1] + + if domain.lower() != WORKER_DOMAIN.lower(): + log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING') + return True # Löschen, gehört nicht hierher + else: + log("⚠ Warning: No recipients in event", 'WARNING') + return True + + # Bucket Name ableiten + bucket = get_bucket_name(WORKER_DOMAIN) + key = message_id + + log(f"\n{'='*70}") + log(f"Processing Email (SNS/SES):") + log(f" ID: {key}") + log(f" Recipients: {len(recipients)} -> {recipients}") + log(f" Bucket: {bucket}") + + # 3. LADEN AUS S3 + try: + response = s3.get_object(Bucket=bucket, Key=key) + raw_bytes = response['Body'].read() + log(f"✓ Loaded {len(raw_bytes)} bytes from S3") + except s3.exceptions.NoSuchKey: + # Race Condition: SNS war schneller als S3. + # Wir geben False zurück, damit SQS es in 30s nochmal versucht. + if receive_count < 5: + log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING') + return False + else: + log(f"❌ S3 Object missing permanently after retries.", 'ERROR') + return True # Löschen + except Exception as e: + log(f"❌ S3 Download Error: {e}", 'ERROR') + return False # Retry + + # 4. PARSING & BOUNCE LOGIC + try: + parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) + subject = parsed.get('Subject', '(no subject)') + + # Hier passiert die Magie: Bounce Header umschreiben + parsed, modified = apply_bounce_logic(parsed, subject) + + if modified: + log(" ✨ Bounce detected & headers rewritten via DynamoDB") + # Wir arbeiten mit den modifizierten Bytes weiter + raw_bytes = parsed.as_bytes() + from_addr_final = parsed.get('From') # Neuer Absender für SMTP Envelope + else: + from_addr_final = from_addr # Original Envelope Sender + + except Exception as e: + log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING') + from_addr_final = from_addr + + # 5. SMTP VERSAND (Loop über Recipients) + log(f"📤 Sending to {len(recipients)} recipient(s)...") + + successful = [] + failed_permanent = [] + failed_temporary = [] + + for recipient in recipients: + # Wir nutzen raw_bytes (ggf. modifiziert) + # WICHTIG: Als Envelope Sender nutzen wir 'from_addr_final' + # (bei Bounces ist das der Original-Empfänger, sonst der SES Sender) + success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes) + + if success: + successful.append(recipient) + elif is_perm: + failed_permanent.append(recipient) + else: + failed_temporary.append(recipient) + + # 6. RESULTAT & CLEANUP + log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail") + + if len(successful) > 0: + # Mindestens einer durchgegangen -> Erfolg + mark_as_processed(bucket, key, failed_permanent if failed_permanent else None) + log(f"✅ Success. Deleted from queue.") + return True + + elif len(failed_permanent) == len(recipients): + # Alle permanent fehlgeschlagen (User unknown) -> Löschen + mark_as_all_invalid(bucket, key, failed_permanent) + log(f"🛑 All recipients invalid. Deleted from queue.") + return True + + else: + # Temporäre Fehler -> Retry + log(f"🔄 Temporary failures. Keeping in queue.") + return False + except Exception as e: - log(f"✗ Failed to load from S3: {e}", 'ERROR') - return False # Könnte temporär sein - retry - - # An alle Recipients senden - log(f"\n📤 Sending to {len(recipients)} recipient(s)...") - log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})") - - successful = [] - failed_temporary = [] - failed_permanent = [] - - for recipient in recipients: - success, error, is_permanent = send_email(from_addr, recipient, raw_bytes) - - if success: - successful.append(recipient) - elif is_permanent: - failed_permanent.append(recipient) - else: - failed_temporary.append(recipient) - - # Ergebnis-Zusammenfassung - log(f"\n📊 Delivery Results:") - log(f" ✓ Successful: {len(successful)}/{len(recipients)}") - log(f" ✗ Failed (temporary): {len(failed_temporary)}") - log(f" ✗ Failed (permanent): {len(failed_permanent)}") - - # Entscheidungslogik - if len(successful) > 0: - # ✅ Fall 1: Mindestens 1 Recipient erfolgreich - # → status=delivered, invalid_inboxes tracken - - invalid_inboxes = failed_permanent if failed_permanent else None - mark_as_processed(bucket, key, invalid_inboxes) - - log(f"{'='*70}") - log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS') - if failed_permanent: - log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING') - if failed_temporary: - log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING') - log(f"{'='*70}\n") - - return True # Message löschen - - elif len(failed_permanent) == len(recipients): - # ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig) - # → status=failed, invalid_inboxes = ALLE - - mark_as_all_invalid(bucket, key, failed_permanent) - - log(f"{'='*70}") - log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR') - log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR') - log(f"{'='*70}\n") - - return True # Message löschen (nicht retryable) - - else: - # ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries - # → Retry wenn noch Versuche übrig - - if receive_count < 3: - log(f"⚠ All failures are temporary, will retry", 'WARNING') - log(f"{'='*70}\n") - return False # Message NICHT löschen → Retry - else: - # Max retries erreicht → als failed markieren - error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients." - mark_as_failed(bucket, key, error_summary, receive_count) - - log(f"{'='*70}") - log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR') - log(f"{'='*70}\n") - - return False # Nach 3 Versuchen → automatisch DLQ + log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR') + traceback.print_exc() + return False # Retry (außer es crasht immer wieder) def main_loop(): diff --git a/worker_sns.py b/worker.py.old old mode 100755 new mode 100644 similarity index 54% rename from worker_sns.py rename to worker.py.old index 3988171..ee57a74 --- a/worker_sns.py +++ b/worker.py.old @@ -34,156 +34,6 @@ SMTP_PASS = os.environ.get('SMTP_PASS') # Graceful shutdown shutdown_requested = False -# DynamoDB Ressource für Bounce-Lookup -try: - dynamo = boto3.resource('dynamodb', region_name=AWS_REGION) - msg_table = dynamo.Table('ses-outbound-messages') -except Exception as e: - log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING') - msg_table = None - -def get_bucket_name(domain): - """Konvention: domain.tld -> domain-tld-emails""" - return domain.replace('.', '-') + '-emails' - -def is_ses_bounce_or_autoreply(parsed): - """Erkennt SES Bounces""" - from_h = (parsed.get('From') or '').lower() - auto_sub = (parsed.get('Auto-Submitted') or '').lower() - is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h - is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub - return is_mailer_daemon or is_auto_replied - -def extract_original_message_id(parsed): - """ - Extrahiert Original SES Message-ID aus Email - SES Format: 010f[hex32]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-[hex6] - """ - import re - - # SES Message-ID Pattern (endet immer mit -000000) - ses_pattern = re.compile(r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000') - - # 1. Versuche Standard-Header (In-Reply-To, References) - for header in ['In-Reply-To', 'References']: - value = (parsed.get(header) or '').strip() - if value: - match = ses_pattern.search(value) - if match: - log(f" Found Message-ID in {header}: {match.group(0)}") - return match.group(0) - - # 2. Durchsuche Message-ID Header (manchmal steht dort die Original-ID) - msg_id_header = (parsed.get('Message-ID') or '').strip() - if msg_id_header: - match = ses_pattern.search(msg_id_header) - if match: - # Aber nur wenn es nicht die ID der aktuellen Bounce-Message ist - # (die beginnt oft auch mit 010f...) - pass # Wir überspringen das erstmal - - # 3. Durchsuche den kompletten Email-Body (inkl. ALLE Attachments/Parts) - # Das fängt auch attached messages, text attachments, etc. ab - try: - body_text = '' - - # Hole den kompletten Body als String - if parsed.is_multipart(): - for part in parsed.walk(): - content_type = part.get_content_type() - - # Durchsuche ALLE Parts (außer Binärdaten wie images) - # Text-Parts, HTML, attached messages, und auch application/* Parts - if content_type.startswith('text/') or \ - content_type == 'message/rfc822' or \ - content_type.startswith('application/'): - try: - payload = part.get_payload(decode=True) - if payload: - # Versuche als UTF-8, fallback auf Latin-1 - try: - body_text += payload.decode('utf-8', errors='ignore') - except: - try: - body_text += payload.decode('latin-1', errors='ignore') - except: - # Letzter Versuch: als ASCII mit ignore - body_text += str(payload, errors='ignore') - except: - # Falls decode fehlschlägt, String-Payload holen - payload = part.get_payload() - if isinstance(payload, str): - body_text += payload - else: - # Nicht-Multipart Message - payload = parsed.get_payload(decode=True) - if payload: - try: - body_text = payload.decode('utf-8', errors='ignore') - except: - body_text = payload.decode('latin-1', errors='ignore') - - # Suche alle SES Message-IDs im Body - matches = ses_pattern.findall(body_text) - if matches: - # Nehme die ERSTE gefundene ID (meist die Original-ID) - # Die letzte ist oft die Bounce-Message selbst - log(f" Found {len(matches)} SES Message-ID(s) in body, using first: {matches[0]}") - return matches[0] - - except Exception as e: - log(f" Warning: Could not search body for Message-ID: {e}", 'WARNING') - - return None - -def apply_bounce_logic(parsed, subject): - """ - Prüft auf Bounce, sucht in DynamoDB und schreibt Header um. - Returns: (parsed_email_object, was_modified_bool) - """ - if not is_ses_bounce_or_autoreply(parsed): - return parsed, False - - log("🔍 Detected auto-response/bounce. Checking DynamoDB...") - original_msg_id = extract_original_message_id(parsed) - - if not original_msg_id: - log("⚠ Could not extract original Message-ID") - return parsed, False - - try: - # Lookup in DynamoDB - result = msg_table.get_item(Key={'MessageId': original_msg_id}) - item = result.get('Item') - - if not item: - log(f"⚠ No DynamoDB record found for {original_msg_id}") - return parsed, False - - # Treffer! - orig_source = item.get('source', '') - orig_destinations = item.get('destinations', []) - original_recipient = orig_destinations[0] if orig_destinations else '' - - if original_recipient: - log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}") - - # Rewrite Headers - parsed['X-Original-SES-From'] = parsed.get('From', '') - parsed.replace_header('From', original_recipient) - - if not parsed.get('Reply-To'): - parsed['Reply-To'] = original_recipient - - if 'delivery status notification' in subject.lower(): - parsed.replace_header('Subject', f"Delivery Status: {original_recipient}") - - return parsed, True - - except Exception as e: - log(f"⚠ DynamoDB Error: {e}") - - return parsed, False def signal_handler(signum, frame): global shutdown_requested @@ -410,151 +260,125 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple: return False, str(e), False -# ========================================== -# HAUPTFUNKTION: PROCESS MESSAGE -# ========================================== - def process_message(message_body: dict, receive_count: int) -> bool: """ - Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification) - Returns: True (Erfolg/Löschen), False (Retry/Behalten) + Verarbeitet eine E-Mail aus der Queue + Kann mehrere Recipients haben - sendet an alle + Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry) """ + + bucket = message_body['bucket'] + key = message_body['key'] + from_addr = message_body['from'] + recipients = message_body['recipients'] # Liste von Empfängern + domain = message_body['domain'] + subject = message_body.get('subject', '(unknown)') + message_id = message_body.get('message_id', '(unknown)') + + log(f"\n{'='*70}") + log(f"Processing email (Attempt #{receive_count}):") + log(f" MessageId: {message_id}") + log(f" S3 Key: {key}") + log(f" Domain: {domain}") + log(f" From: {from_addr}") + log(f" Recipients: {len(recipients)}") + for recipient in recipients: + log(f" - {recipient}") + log(f" Subject: {subject}") + log(f" S3: s3://{bucket}/{key}") + log(f"{'='*70}") + + # ✨ VALIDATION: Domain muss mit Worker-Domain übereinstimmen + if domain.lower() != WORKER_DOMAIN.lower(): + log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR') + log("This message should not be in this queue! Deleting...", 'ERROR') + return True # Message löschen (gehört nicht hierher) + + # E-Mail aus S3 laden try: - # 1. UNPACKING (SNS -> SES) - # SQS Body ist JSON. Darin ist meist 'Type': 'Notification' und 'Message': '...JSONString...' - if 'Message' in message_body and 'Type' in message_body: - # Es ist eine SNS Notification - sns_content = message_body['Message'] - if isinstance(sns_content, str): - ses_msg = json.loads(sns_content) - else: - ses_msg = sns_content - else: - # Fallback: Vielleicht doch direkt SES (Legacy support) - ses_msg = message_body - - # 2. DATEN EXTRAHIEREN - mail = ses_msg.get('mail', {}) - receipt = ses_msg.get('receipt', {}) - - message_id = mail.get('messageId') # Das ist der S3 Key! - # FIX: Amazon SES Setup Notification ignorieren - if message_id == "AMAZON_SES_SETUP_NOTIFICATION": - log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO') - return True # Erfolgreich (löschen), da kein Fehler - from_addr = mail.get('source') - recipients = receipt.get('recipients', []) - - # S3 Key Validation - if not message_id: - log("❌ Error: No messageId in event payload", 'ERROR') - return True # Löschen, da unbrauchbar - - # Domain Validation - # Wir nehmen den ersten Empfänger um die Domain zu prüfen - if recipients: - first_recipient = recipients[0] - domain = first_recipient.split('@')[1] - - if domain.lower() != WORKER_DOMAIN.lower(): - log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING') - return True # Löschen, gehört nicht hierher - else: - log("⚠ Warning: No recipients in event", 'WARNING') - return True - - # Bucket Name ableiten - bucket = get_bucket_name(WORKER_DOMAIN) - key = message_id - - log(f"\n{'='*70}") - log(f"Processing Email (SNS/SES):") - log(f" ID: {key}") - log(f" Recipients: {len(recipients)} -> {recipients}") - log(f" Bucket: {bucket}") - - # 3. LADEN AUS S3 - try: - response = s3.get_object(Bucket=bucket, Key=key) - raw_bytes = response['Body'].read() - log(f"✓ Loaded {len(raw_bytes)} bytes from S3") - except s3.exceptions.NoSuchKey: - # Race Condition: SNS war schneller als S3. - # Wir geben False zurück, damit SQS es in 30s nochmal versucht. - if receive_count < 5: - log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING') - return False - else: - log(f"❌ S3 Object missing permanently after retries.", 'ERROR') - return True # Löschen - except Exception as e: - log(f"❌ S3 Download Error: {e}", 'ERROR') - return False # Retry - - # 4. PARSING & BOUNCE LOGIC - try: - parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) - subject = parsed.get('Subject', '(no subject)') - - # Hier passiert die Magie: Bounce Header umschreiben - parsed, modified = apply_bounce_logic(parsed, subject) - - if modified: - log(" ✨ Bounce detected & headers rewritten via DynamoDB") - # Wir arbeiten mit den modifizierten Bytes weiter - raw_bytes = parsed.as_bytes() - from_addr_final = parsed.get('From') # Neuer Absender für SMTP Envelope - else: - from_addr_final = from_addr # Original Envelope Sender - - except Exception as e: - log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING') - from_addr_final = from_addr - - # 5. SMTP VERSAND (Loop über Recipients) - log(f"📤 Sending to {len(recipients)} recipient(s)...") - - successful = [] - failed_permanent = [] - failed_temporary = [] - - for recipient in recipients: - # Wir nutzen raw_bytes (ggf. modifiziert) - # WICHTIG: Als Envelope Sender nutzen wir 'from_addr_final' - # (bei Bounces ist das der Original-Empfänger, sonst der SES Sender) - success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes) - - if success: - successful.append(recipient) - elif is_perm: - failed_permanent.append(recipient) - else: - failed_temporary.append(recipient) - - # 6. RESULTAT & CLEANUP - log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail") - - if len(successful) > 0: - # Mindestens einer durchgegangen -> Erfolg - mark_as_processed(bucket, key, failed_permanent if failed_permanent else None) - log(f"✅ Success. Deleted from queue.") - return True - - elif len(failed_permanent) == len(recipients): - # Alle permanent fehlgeschlagen (User unknown) -> Löschen - mark_as_all_invalid(bucket, key, failed_permanent) - log(f"🛑 All recipients invalid. Deleted from queue.") - return True - - else: - # Temporäre Fehler -> Retry - log(f"🔄 Temporary failures. Keeping in queue.") - return False - + response = s3.get_object(Bucket=bucket, Key=key) + raw_bytes = response['Body'].read() + log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)") + except s3.exceptions.NoSuchKey: + log(f"✗ S3 object not found (may have been deleted)", 'ERROR') + return True # Nicht retryable - Message löschen except Exception as e: - log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR') - traceback.print_exc() - return False # Retry (außer es crasht immer wieder) + log(f"✗ Failed to load from S3: {e}", 'ERROR') + return False # Könnte temporär sein - retry + + # An alle Recipients senden + log(f"\n📤 Sending to {len(recipients)} recipient(s)...") + log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})") + + successful = [] + failed_temporary = [] + failed_permanent = [] + + for recipient in recipients: + success, error, is_permanent = send_email(from_addr, recipient, raw_bytes) + + if success: + successful.append(recipient) + elif is_permanent: + failed_permanent.append(recipient) + else: + failed_temporary.append(recipient) + + # Ergebnis-Zusammenfassung + log(f"\n📊 Delivery Results:") + log(f" ✓ Successful: {len(successful)}/{len(recipients)}") + log(f" ✗ Failed (temporary): {len(failed_temporary)}") + log(f" ✗ Failed (permanent): {len(failed_permanent)}") + + # Entscheidungslogik + if len(successful) > 0: + # ✅ Fall 1: Mindestens 1 Recipient erfolgreich + # → status=delivered, invalid_inboxes tracken + + invalid_inboxes = failed_permanent if failed_permanent else None + mark_as_processed(bucket, key, invalid_inboxes) + + log(f"{'='*70}") + log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS') + if failed_permanent: + log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING') + if failed_temporary: + log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING') + log(f"{'='*70}\n") + + return True # Message löschen + + elif len(failed_permanent) == len(recipients): + # ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig) + # → status=failed, invalid_inboxes = ALLE + + mark_as_all_invalid(bucket, key, failed_permanent) + + log(f"{'='*70}") + log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR') + log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR') + log(f"{'='*70}\n") + + return True # Message löschen (nicht retryable) + + else: + # ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries + # → Retry wenn noch Versuche übrig + + if receive_count < 3: + log(f"⚠ All failures are temporary, will retry", 'WARNING') + log(f"{'='*70}\n") + return False # Message NICHT löschen → Retry + else: + # Max retries erreicht → als failed markieren + error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients." + mark_as_failed(bucket, key, error_summary, receive_count) + + log(f"{'='*70}") + log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR') + log(f"{'='*70}\n") + + return False # Nach 3 Versuchen → automatisch DLQ def main_loop():