rename
This commit is contained in:
parent
ad8f032285
commit
19b4bb1471
|
|
@ -34,6 +34,156 @@ SMTP_PASS = os.environ.get('SMTP_PASS')
|
||||||
# Graceful shutdown
|
# Graceful shutdown
|
||||||
shutdown_requested = False
|
shutdown_requested = False
|
||||||
|
|
||||||
|
# DynamoDB resource used for the bounce lookup.
# Best effort: if creating the resource/table handle raises, a warning is
# logged and msg_table is set to None so the worker can still start; code
# that uses msg_table must therefore cope with None.
# NOTE(review): presumably these two calls only build client-side handles,
# so connectivity problems may first show up at get_item() time - confirm.
try:
    dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
    msg_table = dynamo.Table('ses-outbound-messages')
except Exception as e:
    log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
    msg_table = None
|
||||||
|
|
||||||
|
def get_bucket_name(domain):
    """Derive the S3 bucket name that stores the mails of a domain.

    Convention: ``domain.tld`` -> ``domain-tld-emails``.

    The domain is normalised (surrounding whitespace removed, lower-cased)
    before the conversion: domains are compared case-insensitively elsewhere
    in this worker, while S3 bucket names have to be lower-case, so a
    mixed-case domain must still map to the same valid bucket name.

    Args:
        domain: Mail domain, e.g. ``"example.com"``.

    Returns:
        The bucket name as a string, e.g. ``"example-com-emails"``.
    """
    return domain.strip().lower().replace('.', '-') + '-emails'
|
||||||
|
|
||||||
|
def is_ses_bounce_or_autoreply(parsed):
    """Tell whether a parsed message is an SES bounce or an automatic reply.

    A message qualifies when either
      * its ``From`` header contains both ``mailer-daemon@`` and
        ``amazonses.com``, or
      * its ``Auto-Submitted`` header contains ``auto-replied`` or
        ``auto-generated``.

    Both checks are case-insensitive; missing headers count as empty.

    Args:
        parsed: Parsed e-mail message offering ``get(header_name)``.

    Returns:
        True for a bounce/auto-reply, False otherwise.
    """
    sender = (parsed.get('From') or '').lower()
    if 'mailer-daemon@' in sender and 'amazonses.com' in sender:
        return True

    auto_submitted = (parsed.get('Auto-Submitted') or '').lower()
    return any(
        marker in auto_submitted
        for marker in ('auto-replied', 'auto-generated')
    )
|
||||||
|
|
||||||
|
def extract_original_message_id(parsed):
    """Extract the SES Message-ID of the original mail from a bounce/auto-reply.

    Matched format (16 + 8 + 4 + 4 + 4 + 12 hex characters, fixed suffix):
    ``010f[hex12]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-000000``

    Search order:
      1. The ``In-Reply-To`` and ``References`` headers.
      2. The decoded payload of every ``text/*``, ``message/rfc822`` and
         ``application/*`` part (or the single body of a non-multipart
         message). The first match wins.

    The message's own ``Message-ID`` header is intentionally not consulted:
    it identifies the bounce itself, not the mail that bounced.

    Args:
        parsed: Parsed e-mail (``email.message.Message`` compatible).

    Returns:
        The matching Message-ID string, or ``None`` when nothing was found
        or the body could not be searched.
    """
    import re

    # SES Message-ID pattern (always ends with -000000)
    ses_pattern = re.compile(
        r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000'
    )

    # 1. Standard threading headers
    for header in ('In-Reply-To', 'References'):
        match = ses_pattern.search(str(parsed.get(header) or ''))
        if match:
            log(f"  Found Message-ID in {header}: {match.group(0)}")
            return match.group(0)

    # 2. Complete body, including attachments (binary parts such as images
    #    are skipped). Best effort: any failure is logged and yields None.
    try:
        if parsed.is_multipart():
            parts = []
            for part in parsed.walk():
                content_type = part.get_content_type()
                if (content_type.startswith(('text/', 'application/'))
                        or content_type == 'message/rfc822'):
                    parts.append(part)
        else:
            parts = [parsed]

        chunks = []
        for part in parts:
            try:
                payload = part.get_payload(decode=True)
            except Exception:
                # Transfer decoding failed: fall back to the raw payload
                payload = part.get_payload()

            if isinstance(payload, bytes):
                # errors='ignore' cannot raise, so no further charset
                # fallback is needed; the IDs searched for are plain ASCII.
                chunks.append(payload.decode('utf-8', errors='ignore'))
            elif isinstance(payload, str):
                chunks.append(payload)
            # Container parts (payload is None or a list) carry no text of
            # their own; walk() already visits their sub-parts.
            # NOTE(review): headers of attached messages are not searched,
            # only their bodies - confirm whether that is intended.

        matches = ses_pattern.findall(''.join(chunks))
        if matches:
            # Use the FIRST ID found (usually the original one); the last
            # one is often the bounce message itself.
            log(f"  Found {len(matches)} SES Message-ID(s) in body, using first: {matches[0]}")
            return matches[0]

    except Exception as e:
        log(f"  Warning: Could not search body for Message-ID: {e}", 'WARNING')

    return None
|
||||||
|
|
||||||
|
def _replace_or_add_header(msg, name, value):
    """Set header *name* to *value*, keeping its position if it already exists."""
    if name in msg:
        msg.replace_header(name, value)
    else:
        msg[name] = value


def apply_bounce_logic(parsed, subject):
    """Rewrite the headers of an SES bounce/auto-reply using the DynamoDB record.

    For a detected bounce/auto-reply the original SES Message-ID is extracted
    and looked up in ``msg_table``. If the record lists a destination, then:
      * the current ``From`` is preserved in ``X-Original-SES-From``,
      * ``From`` becomes the first original destination,
      * ``Reply-To`` is set to that address unless one is already present,
      * a "delivery status notification" subject becomes
        ``Delivery Status: <recipient>``.

    The function is best effort: every failure is logged and reported as
    "not modified" instead of being raised.

    Args:
        parsed: Parsed e-mail message; modified in place on success.
        subject: Subject of the message (may be ``None``).

    Returns:
        Tuple ``(parsed, was_modified)``.
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False

    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")
    original_msg_id = extract_original_message_id(parsed)

    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False

    # msg_table is None when the module-level DynamoDB setup failed; say so
    # explicitly instead of surfacing an AttributeError on None.
    if msg_table is None:
        log("⚠ DynamoDB table unavailable; skipping bounce lookup", 'WARNING')
        return parsed, False

    try:
        # Lookup in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')

        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False

        # Hit
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''

        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")

            # Rewrite headers. replace_header() raises KeyError for a missing
            # header, which would leave the message half-rewritten, so a
            # helper that falls back to adding the header is used.
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            _replace_or_add_header(parsed, 'From', original_recipient)

            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient

            if 'delivery status notification' in (subject or '').lower():
                _replace_or_add_header(
                    parsed, 'Subject', f"Delivery Status: {original_recipient}"
                )

            return parsed, True

    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")

    return parsed, False
|
||||||
|
|
||||||
def signal_handler(signum, frame):
|
def signal_handler(signum, frame):
|
||||||
global shutdown_requested
|
global shutdown_requested
|
||||||
|
|
@ -260,125 +410,151 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
|
||||||
return False, str(e), False
|
return False, str(e), False
|
||||||
|
|
||||||
|
|
||||||
|
# ==========================================
|
||||||
|
# HAUPTFUNKTION: PROCESS MESSAGE
|
||||||
|
# ==========================================
|
||||||
|
|
||||||
def process_message(message_body: dict, receive_count: int) -> bool:
|
def process_message(message_body: dict, receive_count: int) -> bool:
|
||||||
"""
|
"""
|
||||||
Verarbeitet eine E-Mail aus der Queue
|
Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification)
|
||||||
Kann mehrere Recipients haben - sendet an alle
|
Returns: True (Erfolg/Löschen), False (Retry/Behalten)
|
||||||
Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
bucket = message_body['bucket']
|
|
||||||
key = message_body['key']
|
|
||||||
from_addr = message_body['from']
|
|
||||||
recipients = message_body['recipients'] # Liste von Empfängern
|
|
||||||
domain = message_body['domain']
|
|
||||||
subject = message_body.get('subject', '(unknown)')
|
|
||||||
message_id = message_body.get('message_id', '(unknown)')
|
|
||||||
|
|
||||||
log(f"\n{'='*70}")
|
|
||||||
log(f"Processing email (Attempt #{receive_count}):")
|
|
||||||
log(f" MessageId: {message_id}")
|
|
||||||
log(f" S3 Key: {key}")
|
|
||||||
log(f" Domain: {domain}")
|
|
||||||
log(f" From: {from_addr}")
|
|
||||||
log(f" Recipients: {len(recipients)}")
|
|
||||||
for recipient in recipients:
|
|
||||||
log(f" - {recipient}")
|
|
||||||
log(f" Subject: {subject}")
|
|
||||||
log(f" S3: s3://{bucket}/{key}")
|
|
||||||
log(f"{'='*70}")
|
|
||||||
|
|
||||||
# ✨ VALIDATION: Domain muss mit Worker-Domain übereinstimmen
|
|
||||||
if domain.lower() != WORKER_DOMAIN.lower():
|
|
||||||
log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
|
|
||||||
log("This message should not be in this queue! Deleting...", 'ERROR')
|
|
||||||
return True # Message löschen (gehört nicht hierher)
|
|
||||||
|
|
||||||
# E-Mail aus S3 laden
|
|
||||||
try:
|
try:
|
||||||
response = s3.get_object(Bucket=bucket, Key=key)
|
# 1. UNPACKING (SNS -> SES)
|
||||||
raw_bytes = response['Body'].read()
|
# SQS Body ist JSON. Darin ist meist 'Type': 'Notification' und 'Message': '...JSONString...'
|
||||||
log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
|
if 'Message' in message_body and 'Type' in message_body:
|
||||||
except s3.exceptions.NoSuchKey:
|
# Es ist eine SNS Notification
|
||||||
log(f"✗ S3 object not found (may have been deleted)", 'ERROR')
|
sns_content = message_body['Message']
|
||||||
return True # Nicht retryable - Message löschen
|
if isinstance(sns_content, str):
|
||||||
|
ses_msg = json.loads(sns_content)
|
||||||
|
else:
|
||||||
|
ses_msg = sns_content
|
||||||
|
else:
|
||||||
|
# Fallback: Vielleicht doch direkt SES (Legacy support)
|
||||||
|
ses_msg = message_body
|
||||||
|
|
||||||
|
# 2. DATEN EXTRAHIEREN
|
||||||
|
mail = ses_msg.get('mail', {})
|
||||||
|
receipt = ses_msg.get('receipt', {})
|
||||||
|
|
||||||
|
message_id = mail.get('messageId') # Das ist der S3 Key!
|
||||||
|
# FIX: Amazon SES Setup Notification ignorieren
|
||||||
|
if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
|
||||||
|
log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO')
|
||||||
|
return True # Erfolgreich (löschen), da kein Fehler
|
||||||
|
from_addr = mail.get('source')
|
||||||
|
recipients = receipt.get('recipients', [])
|
||||||
|
|
||||||
|
# S3 Key Validation
|
||||||
|
if not message_id:
|
||||||
|
log("❌ Error: No messageId in event payload", 'ERROR')
|
||||||
|
return True # Löschen, da unbrauchbar
|
||||||
|
|
||||||
|
# Domain Validation
|
||||||
|
# Wir nehmen den ersten Empfänger um die Domain zu prüfen
|
||||||
|
if recipients:
|
||||||
|
first_recipient = recipients[0]
|
||||||
|
domain = first_recipient.split('@')[1]
|
||||||
|
|
||||||
|
if domain.lower() != WORKER_DOMAIN.lower():
|
||||||
|
log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
|
||||||
|
return True # Löschen, gehört nicht hierher
|
||||||
|
else:
|
||||||
|
log("⚠ Warning: No recipients in event", 'WARNING')
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Bucket Name ableiten
|
||||||
|
bucket = get_bucket_name(WORKER_DOMAIN)
|
||||||
|
key = message_id
|
||||||
|
|
||||||
|
log(f"\n{'='*70}")
|
||||||
|
log(f"Processing Email (SNS/SES):")
|
||||||
|
log(f" ID: {key}")
|
||||||
|
log(f" Recipients: {len(recipients)} -> {recipients}")
|
||||||
|
log(f" Bucket: {bucket}")
|
||||||
|
|
||||||
|
# 3. LADEN AUS S3
|
||||||
|
try:
|
||||||
|
response = s3.get_object(Bucket=bucket, Key=key)
|
||||||
|
raw_bytes = response['Body'].read()
|
||||||
|
log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
|
||||||
|
except s3.exceptions.NoSuchKey:
|
||||||
|
# Race Condition: SNS war schneller als S3.
|
||||||
|
# Wir geben False zurück, damit SQS es in 30s nochmal versucht.
|
||||||
|
if receive_count < 5:
|
||||||
|
log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
|
||||||
|
return True # Löschen
|
||||||
|
except Exception as e:
|
||||||
|
log(f"❌ S3 Download Error: {e}", 'ERROR')
|
||||||
|
return False # Retry
|
||||||
|
|
||||||
|
# 4. PARSING & BOUNCE LOGIC
|
||||||
|
try:
|
||||||
|
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
|
||||||
|
subject = parsed.get('Subject', '(no subject)')
|
||||||
|
|
||||||
|
# Hier passiert die Magie: Bounce Header umschreiben
|
||||||
|
parsed, modified = apply_bounce_logic(parsed, subject)
|
||||||
|
|
||||||
|
if modified:
|
||||||
|
log(" ✨ Bounce detected & headers rewritten via DynamoDB")
|
||||||
|
# Wir arbeiten mit den modifizierten Bytes weiter
|
||||||
|
raw_bytes = parsed.as_bytes()
|
||||||
|
from_addr_final = parsed.get('From') # Neuer Absender für SMTP Envelope
|
||||||
|
else:
|
||||||
|
from_addr_final = from_addr # Original Envelope Sender
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
|
||||||
|
from_addr_final = from_addr
|
||||||
|
|
||||||
|
# 5. SMTP VERSAND (Loop über Recipients)
|
||||||
|
log(f"📤 Sending to {len(recipients)} recipient(s)...")
|
||||||
|
|
||||||
|
successful = []
|
||||||
|
failed_permanent = []
|
||||||
|
failed_temporary = []
|
||||||
|
|
||||||
|
for recipient in recipients:
|
||||||
|
# Wir nutzen raw_bytes (ggf. modifiziert)
|
||||||
|
# WICHTIG: Als Envelope Sender nutzen wir 'from_addr_final'
|
||||||
|
# (bei Bounces ist das der Original-Empfänger, sonst der SES Sender)
|
||||||
|
success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
successful.append(recipient)
|
||||||
|
elif is_perm:
|
||||||
|
failed_permanent.append(recipient)
|
||||||
|
else:
|
||||||
|
failed_temporary.append(recipient)
|
||||||
|
|
||||||
|
# 6. RESULTAT & CLEANUP
|
||||||
|
log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")
|
||||||
|
|
||||||
|
if len(successful) > 0:
|
||||||
|
# Mindestens einer durchgegangen -> Erfolg
|
||||||
|
mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
|
||||||
|
log(f"✅ Success. Deleted from queue.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
elif len(failed_permanent) == len(recipients):
|
||||||
|
# Alle permanent fehlgeschlagen (User unknown) -> Löschen
|
||||||
|
mark_as_all_invalid(bucket, key, failed_permanent)
|
||||||
|
log(f"🛑 All recipients invalid. Deleted from queue.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Temporäre Fehler -> Retry
|
||||||
|
log(f"🔄 Temporary failures. Keeping in queue.")
|
||||||
|
return False
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"✗ Failed to load from S3: {e}", 'ERROR')
|
log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
|
||||||
return False # Könnte temporär sein - retry
|
traceback.print_exc()
|
||||||
|
return False # Retry (außer es crasht immer wieder)
|
||||||
# An alle Recipients senden
|
|
||||||
log(f"\n📤 Sending to {len(recipients)} recipient(s)...")
|
|
||||||
log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
|
|
||||||
|
|
||||||
successful = []
|
|
||||||
failed_temporary = []
|
|
||||||
failed_permanent = []
|
|
||||||
|
|
||||||
for recipient in recipients:
|
|
||||||
success, error, is_permanent = send_email(from_addr, recipient, raw_bytes)
|
|
||||||
|
|
||||||
if success:
|
|
||||||
successful.append(recipient)
|
|
||||||
elif is_permanent:
|
|
||||||
failed_permanent.append(recipient)
|
|
||||||
else:
|
|
||||||
failed_temporary.append(recipient)
|
|
||||||
|
|
||||||
# Ergebnis-Zusammenfassung
|
|
||||||
log(f"\n📊 Delivery Results:")
|
|
||||||
log(f" ✓ Successful: {len(successful)}/{len(recipients)}")
|
|
||||||
log(f" ✗ Failed (temporary): {len(failed_temporary)}")
|
|
||||||
log(f" ✗ Failed (permanent): {len(failed_permanent)}")
|
|
||||||
|
|
||||||
# Entscheidungslogik
|
|
||||||
if len(successful) > 0:
|
|
||||||
# ✅ Fall 1: Mindestens 1 Recipient erfolgreich
|
|
||||||
# → status=delivered, invalid_inboxes tracken
|
|
||||||
|
|
||||||
invalid_inboxes = failed_permanent if failed_permanent else None
|
|
||||||
mark_as_processed(bucket, key, invalid_inboxes)
|
|
||||||
|
|
||||||
log(f"{'='*70}")
|
|
||||||
log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS')
|
|
||||||
if failed_permanent:
|
|
||||||
log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING')
|
|
||||||
if failed_temporary:
|
|
||||||
log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING')
|
|
||||||
log(f"{'='*70}\n")
|
|
||||||
|
|
||||||
return True # Message löschen
|
|
||||||
|
|
||||||
elif len(failed_permanent) == len(recipients):
|
|
||||||
# ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig)
|
|
||||||
# → status=failed, invalid_inboxes = ALLE
|
|
||||||
|
|
||||||
mark_as_all_invalid(bucket, key, failed_permanent)
|
|
||||||
|
|
||||||
log(f"{'='*70}")
|
|
||||||
log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR')
|
|
||||||
log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR')
|
|
||||||
log(f"{'='*70}\n")
|
|
||||||
|
|
||||||
return True # Message löschen (nicht retryable)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries
|
|
||||||
# → Retry wenn noch Versuche übrig
|
|
||||||
|
|
||||||
if receive_count < 3:
|
|
||||||
log(f"⚠ All failures are temporary, will retry", 'WARNING')
|
|
||||||
log(f"{'='*70}\n")
|
|
||||||
return False # Message NICHT löschen → Retry
|
|
||||||
else:
|
|
||||||
# Max retries erreicht → als failed markieren
|
|
||||||
error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients."
|
|
||||||
mark_as_failed(bucket, key, error_summary, receive_count)
|
|
||||||
|
|
||||||
log(f"{'='*70}")
|
|
||||||
log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR')
|
|
||||||
log(f"{'='*70}\n")
|
|
||||||
|
|
||||||
return False # Nach 3 Versuchen → automatisch DLQ
|
|
||||||
|
|
||||||
|
|
||||||
def main_loop():
|
def main_loop():
|
||||||
|
|
|
||||||
|
|
@ -34,156 +34,6 @@ SMTP_PASS = os.environ.get('SMTP_PASS')
|
||||||
# Graceful shutdown
|
# Graceful shutdown
|
||||||
shutdown_requested = False
|
shutdown_requested = False
|
||||||
|
|
||||||
# DynamoDB resource used for the bounce lookup.
# Best effort: if creating the resource/table handle raises, a warning is
# logged and msg_table is set to None so the worker can still start; code
# that uses msg_table must therefore cope with None.
# NOTE(review): presumably these two calls only build client-side handles,
# so connectivity problems may first show up at get_item() time - confirm.
try:
    dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
    msg_table = dynamo.Table('ses-outbound-messages')
except Exception as e:
    log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
    msg_table = None
|
|
||||||
|
|
||||||
def get_bucket_name(domain):
    """Derive the S3 bucket name that stores the mails of a domain.

    Convention: ``domain.tld`` -> ``domain-tld-emails``.

    The domain is normalised (surrounding whitespace removed, lower-cased)
    before the conversion: domains are compared case-insensitively elsewhere
    in this worker, while S3 bucket names have to be lower-case, so a
    mixed-case domain must still map to the same valid bucket name.

    Args:
        domain: Mail domain, e.g. ``"example.com"``.

    Returns:
        The bucket name as a string, e.g. ``"example-com-emails"``.
    """
    return domain.strip().lower().replace('.', '-') + '-emails'
|
|
||||||
|
|
||||||
def is_ses_bounce_or_autoreply(parsed):
    """Tell whether a parsed message is an SES bounce or an automatic reply.

    A message qualifies when either
      * its ``From`` header contains both ``mailer-daemon@`` and
        ``amazonses.com``, or
      * its ``Auto-Submitted`` header contains ``auto-replied`` or
        ``auto-generated``.

    Both checks are case-insensitive; missing headers count as empty.

    Args:
        parsed: Parsed e-mail message offering ``get(header_name)``.

    Returns:
        True for a bounce/auto-reply, False otherwise.
    """
    sender = (parsed.get('From') or '').lower()
    if 'mailer-daemon@' in sender and 'amazonses.com' in sender:
        return True

    auto_submitted = (parsed.get('Auto-Submitted') or '').lower()
    return any(
        marker in auto_submitted
        for marker in ('auto-replied', 'auto-generated')
    )
|
|
||||||
|
|
||||||
def extract_original_message_id(parsed):
    """Extract the SES Message-ID of the original mail from a bounce/auto-reply.

    Matched format (16 + 8 + 4 + 4 + 4 + 12 hex characters, fixed suffix):
    ``010f[hex12]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-000000``

    Search order:
      1. The ``In-Reply-To`` and ``References`` headers.
      2. The decoded payload of every ``text/*``, ``message/rfc822`` and
         ``application/*`` part (or the single body of a non-multipart
         message). The first match wins.

    The message's own ``Message-ID`` header is intentionally not consulted:
    it identifies the bounce itself, not the mail that bounced.

    Args:
        parsed: Parsed e-mail (``email.message.Message`` compatible).

    Returns:
        The matching Message-ID string, or ``None`` when nothing was found
        or the body could not be searched.
    """
    import re

    # SES Message-ID pattern (always ends with -000000)
    ses_pattern = re.compile(
        r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000'
    )

    # 1. Standard threading headers
    for header in ('In-Reply-To', 'References'):
        match = ses_pattern.search(str(parsed.get(header) or ''))
        if match:
            log(f"  Found Message-ID in {header}: {match.group(0)}")
            return match.group(0)

    # 2. Complete body, including attachments (binary parts such as images
    #    are skipped). Best effort: any failure is logged and yields None.
    try:
        if parsed.is_multipart():
            parts = []
            for part in parsed.walk():
                content_type = part.get_content_type()
                if (content_type.startswith(('text/', 'application/'))
                        or content_type == 'message/rfc822'):
                    parts.append(part)
        else:
            parts = [parsed]

        chunks = []
        for part in parts:
            try:
                payload = part.get_payload(decode=True)
            except Exception:
                # Transfer decoding failed: fall back to the raw payload
                payload = part.get_payload()

            if isinstance(payload, bytes):
                # errors='ignore' cannot raise, so no further charset
                # fallback is needed; the IDs searched for are plain ASCII.
                chunks.append(payload.decode('utf-8', errors='ignore'))
            elif isinstance(payload, str):
                chunks.append(payload)
            # Container parts (payload is None or a list) carry no text of
            # their own; walk() already visits their sub-parts.
            # NOTE(review): headers of attached messages are not searched,
            # only their bodies - confirm whether that is intended.

        matches = ses_pattern.findall(''.join(chunks))
        if matches:
            # Use the FIRST ID found (usually the original one); the last
            # one is often the bounce message itself.
            log(f"  Found {len(matches)} SES Message-ID(s) in body, using first: {matches[0]}")
            return matches[0]

    except Exception as e:
        log(f"  Warning: Could not search body for Message-ID: {e}", 'WARNING')

    return None
|
|
||||||
|
|
||||||
def _replace_or_add_header(msg, name, value):
    """Set header *name* to *value*, keeping its position if it already exists."""
    if name in msg:
        msg.replace_header(name, value)
    else:
        msg[name] = value


def apply_bounce_logic(parsed, subject):
    """Rewrite the headers of an SES bounce/auto-reply using the DynamoDB record.

    For a detected bounce/auto-reply the original SES Message-ID is extracted
    and looked up in ``msg_table``. If the record lists a destination, then:
      * the current ``From`` is preserved in ``X-Original-SES-From``,
      * ``From`` becomes the first original destination,
      * ``Reply-To`` is set to that address unless one is already present,
      * a "delivery status notification" subject becomes
        ``Delivery Status: <recipient>``.

    The function is best effort: every failure is logged and reported as
    "not modified" instead of being raised.

    Args:
        parsed: Parsed e-mail message; modified in place on success.
        subject: Subject of the message (may be ``None``).

    Returns:
        Tuple ``(parsed, was_modified)``.
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False

    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")
    original_msg_id = extract_original_message_id(parsed)

    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False

    # msg_table is None when the module-level DynamoDB setup failed; say so
    # explicitly instead of surfacing an AttributeError on None.
    if msg_table is None:
        log("⚠ DynamoDB table unavailable; skipping bounce lookup", 'WARNING')
        return parsed, False

    try:
        # Lookup in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')

        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False

        # Hit
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''

        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")

            # Rewrite headers. replace_header() raises KeyError for a missing
            # header, which would leave the message half-rewritten, so a
            # helper that falls back to adding the header is used.
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            _replace_or_add_header(parsed, 'From', original_recipient)

            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient

            if 'delivery status notification' in (subject or '').lower():
                _replace_or_add_header(
                    parsed, 'Subject', f"Delivery Status: {original_recipient}"
                )

            return parsed, True

    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")

    return parsed, False
|
|
||||||
|
|
||||||
def signal_handler(signum, frame):
|
def signal_handler(signum, frame):
|
||||||
global shutdown_requested
|
global shutdown_requested
|
||||||
|
|
@ -410,151 +260,125 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
|
||||||
return False, str(e), False
|
return False, str(e), False
|
||||||
|
|
||||||
|
|
||||||
# ==========================================
|
|
||||||
# HAUPTFUNKTION: PROCESS MESSAGE
|
|
||||||
# ==========================================
|
|
||||||
|
|
||||||
def process_message(message_body: dict, receive_count: int) -> bool:
|
def process_message(message_body: dict, receive_count: int) -> bool:
|
||||||
"""
|
"""
|
||||||
Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification)
|
Verarbeitet eine E-Mail aus der Queue
|
||||||
Returns: True (Erfolg/Löschen), False (Retry/Behalten)
|
Kann mehrere Recipients haben - sendet an alle
|
||||||
|
Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
bucket = message_body['bucket']
|
||||||
|
key = message_body['key']
|
||||||
|
from_addr = message_body['from']
|
||||||
|
recipients = message_body['recipients'] # Liste von Empfängern
|
||||||
|
domain = message_body['domain']
|
||||||
|
subject = message_body.get('subject', '(unknown)')
|
||||||
|
message_id = message_body.get('message_id', '(unknown)')
|
||||||
|
|
||||||
|
log(f"\n{'='*70}")
|
||||||
|
log(f"Processing email (Attempt #{receive_count}):")
|
||||||
|
log(f" MessageId: {message_id}")
|
||||||
|
log(f" S3 Key: {key}")
|
||||||
|
log(f" Domain: {domain}")
|
||||||
|
log(f" From: {from_addr}")
|
||||||
|
log(f" Recipients: {len(recipients)}")
|
||||||
|
for recipient in recipients:
|
||||||
|
log(f" - {recipient}")
|
||||||
|
log(f" Subject: {subject}")
|
||||||
|
log(f" S3: s3://{bucket}/{key}")
|
||||||
|
log(f"{'='*70}")
|
||||||
|
|
||||||
|
# ✨ VALIDATION: Domain muss mit Worker-Domain übereinstimmen
|
||||||
|
if domain.lower() != WORKER_DOMAIN.lower():
|
||||||
|
log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
|
||||||
|
log("This message should not be in this queue! Deleting...", 'ERROR')
|
||||||
|
return True # Message löschen (gehört nicht hierher)
|
||||||
|
|
||||||
|
# E-Mail aus S3 laden
|
||||||
try:
|
try:
|
||||||
# 1. UNPACKING (SNS -> SES)
|
response = s3.get_object(Bucket=bucket, Key=key)
|
||||||
# SQS Body ist JSON. Darin ist meist 'Type': 'Notification' und 'Message': '...JSONString...'
|
raw_bytes = response['Body'].read()
|
||||||
if 'Message' in message_body and 'Type' in message_body:
|
log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
|
||||||
# Es ist eine SNS Notification
|
except s3.exceptions.NoSuchKey:
|
||||||
sns_content = message_body['Message']
|
log(f"✗ S3 object not found (may have been deleted)", 'ERROR')
|
||||||
if isinstance(sns_content, str):
|
return True # Nicht retryable - Message löschen
|
||||||
ses_msg = json.loads(sns_content)
|
|
||||||
else:
|
|
||||||
ses_msg = sns_content
|
|
||||||
else:
|
|
||||||
# Fallback: Vielleicht doch direkt SES (Legacy support)
|
|
||||||
ses_msg = message_body
|
|
||||||
|
|
||||||
# 2. DATEN EXTRAHIEREN
|
|
||||||
mail = ses_msg.get('mail', {})
|
|
||||||
receipt = ses_msg.get('receipt', {})
|
|
||||||
|
|
||||||
message_id = mail.get('messageId') # Das ist der S3 Key!
|
|
||||||
# FIX: Amazon SES Setup Notification ignorieren
|
|
||||||
if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
|
|
||||||
log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO')
|
|
||||||
return True # Erfolgreich (löschen), da kein Fehler
|
|
||||||
from_addr = mail.get('source')
|
|
||||||
recipients = receipt.get('recipients', [])
|
|
||||||
|
|
||||||
# S3 Key Validation
|
|
||||||
if not message_id:
|
|
||||||
log("❌ Error: No messageId in event payload", 'ERROR')
|
|
||||||
return True # Löschen, da unbrauchbar
|
|
||||||
|
|
||||||
# Domain Validation
|
|
||||||
# Wir nehmen den ersten Empfänger um die Domain zu prüfen
|
|
||||||
if recipients:
|
|
||||||
first_recipient = recipients[0]
|
|
||||||
domain = first_recipient.split('@')[1]
|
|
||||||
|
|
||||||
if domain.lower() != WORKER_DOMAIN.lower():
|
|
||||||
log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
|
|
||||||
return True # Löschen, gehört nicht hierher
|
|
||||||
else:
|
|
||||||
log("⚠ Warning: No recipients in event", 'WARNING')
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Bucket Name ableiten
|
|
||||||
bucket = get_bucket_name(WORKER_DOMAIN)
|
|
||||||
key = message_id
|
|
||||||
|
|
||||||
log(f"\n{'='*70}")
|
|
||||||
log(f"Processing Email (SNS/SES):")
|
|
||||||
log(f" ID: {key}")
|
|
||||||
log(f" Recipients: {len(recipients)} -> {recipients}")
|
|
||||||
log(f" Bucket: {bucket}")
|
|
||||||
|
|
||||||
# 3. LADEN AUS S3
|
|
||||||
try:
|
|
||||||
response = s3.get_object(Bucket=bucket, Key=key)
|
|
||||||
raw_bytes = response['Body'].read()
|
|
||||||
log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
|
|
||||||
except s3.exceptions.NoSuchKey:
|
|
||||||
# Race Condition: SNS war schneller als S3.
|
|
||||||
# Wir geben False zurück, damit SQS es in 30s nochmal versucht.
|
|
||||||
if receive_count < 5:
|
|
||||||
log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
|
|
||||||
return True # Löschen
|
|
||||||
except Exception as e:
|
|
||||||
log(f"❌ S3 Download Error: {e}", 'ERROR')
|
|
||||||
return False # Retry
|
|
||||||
|
|
||||||
# 4. PARSING & BOUNCE LOGIC
|
|
||||||
try:
|
|
||||||
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
|
|
||||||
subject = parsed.get('Subject', '(no subject)')
|
|
||||||
|
|
||||||
# Hier passiert die Magie: Bounce Header umschreiben
|
|
||||||
parsed, modified = apply_bounce_logic(parsed, subject)
|
|
||||||
|
|
||||||
if modified:
|
|
||||||
log(" ✨ Bounce detected & headers rewritten via DynamoDB")
|
|
||||||
# Wir arbeiten mit den modifizierten Bytes weiter
|
|
||||||
raw_bytes = parsed.as_bytes()
|
|
||||||
from_addr_final = parsed.get('From') # Neuer Absender für SMTP Envelope
|
|
||||||
else:
|
|
||||||
from_addr_final = from_addr # Original Envelope Sender
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
|
|
||||||
from_addr_final = from_addr
|
|
||||||
|
|
||||||
# 5. SMTP VERSAND (Loop über Recipients)
|
|
||||||
log(f"📤 Sending to {len(recipients)} recipient(s)...")
|
|
||||||
|
|
||||||
successful = []
|
|
||||||
failed_permanent = []
|
|
||||||
failed_temporary = []
|
|
||||||
|
|
||||||
for recipient in recipients:
|
|
||||||
# Wir nutzen raw_bytes (ggf. modifiziert)
|
|
||||||
# WICHTIG: Als Envelope Sender nutzen wir 'from_addr_final'
|
|
||||||
# (bei Bounces ist das der Original-Empfänger, sonst der SES Sender)
|
|
||||||
success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
|
|
||||||
|
|
||||||
if success:
|
|
||||||
successful.append(recipient)
|
|
||||||
elif is_perm:
|
|
||||||
failed_permanent.append(recipient)
|
|
||||||
else:
|
|
||||||
failed_temporary.append(recipient)
|
|
||||||
|
|
||||||
# 6. RESULTAT & CLEANUP
|
|
||||||
log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")
|
|
||||||
|
|
||||||
if len(successful) > 0:
|
|
||||||
# Mindestens einer durchgegangen -> Erfolg
|
|
||||||
mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
|
|
||||||
log(f"✅ Success. Deleted from queue.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
elif len(failed_permanent) == len(recipients):
|
|
||||||
# Alle permanent fehlgeschlagen (User unknown) -> Löschen
|
|
||||||
mark_as_all_invalid(bucket, key, failed_permanent)
|
|
||||||
log(f"🛑 All recipients invalid. Deleted from queue.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Temporäre Fehler -> Retry
|
|
||||||
log(f"🔄 Temporary failures. Keeping in queue.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
|
log(f"✗ Failed to load from S3: {e}", 'ERROR')
|
||||||
traceback.print_exc()
|
return False # Könnte temporär sein - retry
|
||||||
return False # Retry (außer es crasht immer wieder)
|
|
||||||
|
# An alle Recipients senden
|
||||||
|
log(f"\n📤 Sending to {len(recipients)} recipient(s)...")
|
||||||
|
log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
|
||||||
|
|
||||||
|
successful = []
|
||||||
|
failed_temporary = []
|
||||||
|
failed_permanent = []
|
||||||
|
|
||||||
|
for recipient in recipients:
|
||||||
|
success, error, is_permanent = send_email(from_addr, recipient, raw_bytes)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
successful.append(recipient)
|
||||||
|
elif is_permanent:
|
||||||
|
failed_permanent.append(recipient)
|
||||||
|
else:
|
||||||
|
failed_temporary.append(recipient)
|
||||||
|
|
||||||
|
# Ergebnis-Zusammenfassung
|
||||||
|
log(f"\n📊 Delivery Results:")
|
||||||
|
log(f" ✓ Successful: {len(successful)}/{len(recipients)}")
|
||||||
|
log(f" ✗ Failed (temporary): {len(failed_temporary)}")
|
||||||
|
log(f" ✗ Failed (permanent): {len(failed_permanent)}")
|
||||||
|
|
||||||
|
# Entscheidungslogik
|
||||||
|
if len(successful) > 0:
|
||||||
|
# ✅ Fall 1: Mindestens 1 Recipient erfolgreich
|
||||||
|
# → status=delivered, invalid_inboxes tracken
|
||||||
|
|
||||||
|
invalid_inboxes = failed_permanent if failed_permanent else None
|
||||||
|
mark_as_processed(bucket, key, invalid_inboxes)
|
||||||
|
|
||||||
|
log(f"{'='*70}")
|
||||||
|
log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS')
|
||||||
|
if failed_permanent:
|
||||||
|
log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING')
|
||||||
|
if failed_temporary:
|
||||||
|
log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING')
|
||||||
|
log(f"{'='*70}\n")
|
||||||
|
|
||||||
|
return True # Message löschen
|
||||||
|
|
||||||
|
elif len(failed_permanent) == len(recipients):
|
||||||
|
# ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig)
|
||||||
|
# → status=failed, invalid_inboxes = ALLE
|
||||||
|
|
||||||
|
mark_as_all_invalid(bucket, key, failed_permanent)
|
||||||
|
|
||||||
|
log(f"{'='*70}")
|
||||||
|
log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR')
|
||||||
|
log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR')
|
||||||
|
log(f"{'='*70}\n")
|
||||||
|
|
||||||
|
return True # Message löschen (nicht retryable)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries
|
||||||
|
# → Retry wenn noch Versuche übrig
|
||||||
|
|
||||||
|
if receive_count < 3:
|
||||||
|
log(f"⚠ All failures are temporary, will retry", 'WARNING')
|
||||||
|
log(f"{'='*70}\n")
|
||||||
|
return False # Message NICHT löschen → Retry
|
||||||
|
else:
|
||||||
|
# Max retries erreicht → als failed markieren
|
||||||
|
error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients."
|
||||||
|
mark_as_failed(bucket, key, error_summary, receive_count)
|
||||||
|
|
||||||
|
log(f"{'='*70}")
|
||||||
|
log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR')
|
||||||
|
log(f"{'='*70}\n")
|
||||||
|
|
||||||
|
return False # Nach 3 Versuchen → automatisch DLQ
|
||||||
|
|
||||||
|
|
||||||
def main_loop():
|
def main_loop():
|
||||||
Loading…
Reference in New Issue