#!/usr/bin/env python3 """ Test script für Message-ID Extraktion - VERBESSERTE VERSION Kann lokal ausgeführt werden ohne AWS-Verbindung """ import re from email.parser import BytesParser from email.policy import SMTP as SMTPPolicy def log(message: str, level: str = 'INFO'): """Dummy log für Tests""" print(f"[{level}] {message}") def extract_original_message_id(parsed): """ Extrahiert Original SES Message-ID aus Email SES Format: 010f[hex32]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-000000 """ import re # SES Message-ID Pattern (endet immer mit -000000) ses_pattern = re.compile(r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000') # Die Message-ID der aktuellen Email (Bounce selbst) - diese wollen wir NICHT current_msg_id = (parsed.get('Message-ID') or '').strip() current_match = ses_pattern.search(current_msg_id) current_id = current_match.group(0) if current_match else None log(f"Current Message-ID: {current_id}", 'DEBUG') # 1. Versuche Standard-Header (In-Reply-To, References) for header in ['In-Reply-To', 'References']: value = (parsed.get(header) or '').strip() if value: match = ses_pattern.search(value) if match: found_id = match.group(0) # Nur nehmen wenn es NICHT die aktuelle Bounce-ID ist if found_id != current_id: log(f" Found Message-ID in {header}: {found_id}") return found_id # 2. Durchsuche den kompletten Email-Body (inkl. ALLE Attachments/Parts) try: body_text = '' # Hole den kompletten Body als String if parsed.is_multipart(): for part in parsed.walk(): content_type = part.get_content_type() # SPEZIALFALL: message/rfc822 (eingebettete Messages) if content_type == 'message/rfc822': log(f" Processing embedded message/rfc822", 'DEBUG') try: # get_payload() gibt eine Liste mit einem EmailMessage-Objekt zurück! payload = part.get_payload() if isinstance(payload, list) and len(payload) > 0: embedded_msg = payload[0] # Hole Message-ID aus dem eingebetteten Message embedded_id = (embedded_msg.get('Message-ID') or '').strip() match = ses_pattern.search(embedded_id) if match: found_id = match.group(0) log(f" Found ID in embedded msg: {found_id}", 'DEBUG') # Nur nehmen wenn es NICHT die aktuelle Bounce-ID ist if found_id != current_id: log(f" ✓ Found Message-ID in embedded message: {found_id}") return found_id # Fallback: Konvertiere eingebettete Message zu String body_text += embedded_msg.as_string() except Exception as e: log(f" Warning: Could not process embedded message: {e}", 'WARNING') # Durchsuche ALLE anderen Parts (außer Binärdaten wie images) elif content_type.startswith('text/') or content_type.startswith('application/'): try: payload = part.get_payload(decode=True) if payload: # Versuche als UTF-8, fallback auf Latin-1 try: body_text += payload.decode('utf-8', errors='ignore') except: try: body_text += payload.decode('latin-1', errors='ignore') except: # Letzter Versuch: als ASCII mit ignore body_text += str(payload, errors='ignore') except: # Falls decode fehlschlägt, String-Payload holen payload = part.get_payload() if isinstance(payload, str): body_text += payload else: # Nicht-Multipart Message payload = parsed.get_payload(decode=True) if payload: try: body_text = payload.decode('utf-8', errors='ignore') except: body_text = payload.decode('latin-1', errors='ignore') # Suche alle SES Message-IDs im Body matches = ses_pattern.findall(body_text) if matches: log(f" Found {len(matches)} total IDs in body: {matches}", 'DEBUG') # Filtere die aktuelle Bounce-ID raus candidates = [m for m in matches if m != current_id] if candidates: # Nehme die ERSTE der verbleibenden (meist die Original-ID) log(f" Found {len(matches)} SES Message-ID(s) in body, using first (not bounce): {candidates[0]}") return candidates[0] else: log(f" Found {len(matches)} SES Message-ID(s) but all match the bounce ID") except Exception as e: log(f" Warning: Could not search body for Message-ID: {e}", 'WARNING') return None def test_with_file(filepath: str): """Test mit einer echten Email-Datei""" print(f"\n{'='*70}") print(f"Testing: {filepath}") print('='*70) with open(filepath, 'rb') as f: raw_bytes = f.read() parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) print(f"\nEmail Headers:") print(f" From: {parsed.get('From')}") print(f" To: {parsed.get('To')}") print(f" Subject: {parsed.get('Subject')}") print(f" Message-ID: {parsed.get('Message-ID')}") print(f" In-Reply-To: {parsed.get('In-Reply-To')}") print(f" References: {parsed.get('References')}") print(f"\n--- EXTRACTION ---") result = extract_original_message_id(parsed) print(f"\n{'='*70}") print(f"RESULT: {result}") print('='*70) return result if __name__ == '__main__': import sys if len(sys.argv) > 1: # Email-Datei als Argument result = test_with_file(sys.argv[1]) # Exit code: 0 = success (ID found), 1 = failure (no ID) sys.exit(0 if result else 1) else: print("Usage: python3 test_extract_v2.py ") sys.exit(1)