diff --git a/worker.py b/worker.py index a43768f..0b2d1c5 100644 --- a/worker.py +++ b/worker.py @@ -67,8 +67,11 @@ def get_queue_url() -> str: raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}") -def mark_as_processed(bucket: str, key: str): - """Markiert E-Mail als erfolgreich zugestellt""" +def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None): + """ + Markiert E-Mail als erfolgreich zugestellt + Setzt processed=true auch wenn manche Recipients fehlgeschlagen sind + """ try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} @@ -80,6 +83,11 @@ def mark_as_processed(bucket: str, key: str): metadata.pop('processing_started', None) metadata.pop('queued_at', None) + # Invalid inboxes speichern falls vorhanden + if invalid_inboxes: + metadata['invalid_inboxes'] = ','.join(invalid_inboxes) + log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING') + s3.copy_object( Bucket=bucket, Key=key, @@ -95,7 +103,10 @@ def mark_as_processed(bucket: str, key: str): def mark_as_failed(bucket: str, key: str, error: str, receive_count: int): - """Markiert E-Mail als fehlgeschlagen""" + """ + Markiert E-Mail als komplett fehlgeschlagen + Wird nur aufgerufen wenn ALLE Recipients fehlschlagen + """ try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} @@ -144,14 +155,34 @@ def is_temporary_smtp_error(error_msg: str) -> bool: return any(indicator in error_lower for indicator in temporary_indicators) +def is_permanent_recipient_error(error_msg: str) -> bool: + """ + Prüft ob Fehler permanent für diesen Recipient ist (Inbox existiert nicht) + 550 = Mailbox not found, 551 = User not local, 553 = Mailbox name invalid + """ + permanent_indicators = [ + '550', # Mailbox unavailable / not found + '551', # User not local + '553', # Mailbox name not allowed / invalid + 'mailbox not found', + 'user unknown', + 'no such user', + 'recipient rejected', + 'does not exist', + 'invalid recipient', + 'unknown user' + ] + + error_lower = error_msg.lower() + return any(indicator in error_lower for indicator in permanent_indicators) + + def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple: """ - Sendet E-Mail via SMTP - Returns: (success: bool, error: str or None) + Sendet E-Mail via SMTP an EINEN Empfänger + Returns: (success: bool, error: str or None, is_permanent: bool) """ - log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})") - try: with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp: smtp.ehlo() @@ -161,17 +192,15 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple: try: smtp.starttls() smtp.ehlo() - log("✓ STARTTLS enabled") except Exception as e: - log(f"STARTTLS failed: {e}", 'WARNING') + log(f" STARTTLS failed: {e}", 'WARNING') # Authentication falls konfiguriert if SMTP_USER and SMTP_PASS: try: smtp.login(SMTP_USER, SMTP_PASS) - log("✓ SMTP authenticated") except Exception as e: - log(f"SMTP auth failed: {e}", 'WARNING') + log(f" SMTP auth failed: {e}", 'WARNING') # E-Mail senden result = smtp.sendmail(from_addr, [recipient], raw_message) @@ -180,32 +209,37 @@ def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple: if isinstance(result, dict) and result: # Empfänger wurde abgelehnt error = result.get(recipient, 'Unknown refusal') - log(f"✗ Recipient refused: {error}", 'ERROR') - return False, str(error) + is_permanent = is_permanent_recipient_error(str(error)) + log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR') + return False, str(error), is_permanent else: # Erfolgreich - log(f"✓ Email delivered to {recipient}", 'SUCCESS') - return True, None + log(f" ✓ {recipient}: Delivered", 'SUCCESS') + return True, None, False except smtplib.SMTPException as e: - log(f"✗ SMTP error: {e}", 'ERROR') - return False, str(e) + error_msg = str(e) + is_permanent = is_permanent_recipient_error(error_msg) + log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR') + return False, error_msg, is_permanent except Exception as e: - log(f"✗ Connection error: {e}", 'ERROR') - return False, str(e) + # Connection errors sind immer temporär + log(f" ✗ {recipient}: Connection error - {e}", 'ERROR') + return False, str(e), False def process_message(message_body: dict, receive_count: int) -> bool: """ Verarbeitet eine E-Mail aus der Queue + Kann mehrere Recipients haben - sendet an alle Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry) """ bucket = message_body['bucket'] key = message_body['key'] from_addr = message_body['from'] - recipient = message_body['recipient'] # Nur 1 Empfänger + recipients = message_body['recipients'] # Liste von Empfängern domain = message_body['domain'] subject = message_body.get('subject', '(unknown)') message_id = message_body.get('message_id', '(unknown)') @@ -213,9 +247,12 @@ def process_message(message_body: dict, receive_count: int) -> bool: log(f"\n{'='*70}") log(f"Processing email (Attempt #{receive_count}):") log(f" MessageId: {message_id}") + log(f" S3 Key: {key}") log(f" Domain: {domain}") log(f" From: {from_addr}") - log(f" To: {recipient}") + log(f" Recipients: {len(recipients)}") + for recipient in recipients: + log(f" - {recipient}") log(f" Subject: {subject}") log(f" S3: s3://{bucket}/{key}") log(f"{'='*70}") @@ -238,35 +275,75 @@ def process_message(message_body: dict, receive_count: int) -> bool: log(f"✗ Failed to load from S3: {e}", 'ERROR') return False # Könnte temporär sein - retry - # E-Mail senden - success, error = send_email(from_addr, recipient, raw_bytes) + # An alle Recipients senden + log(f"\n📤 Sending to {len(recipients)} recipient(s)...") + log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})") - if success: - # Erfolgreich zugestellt - mark_as_processed(bucket, key) + successful = [] + failed_temporary = [] + failed_permanent = [] + + for recipient in recipients: + success, error, is_permanent = send_email(from_addr, recipient, raw_bytes) + + if success: + successful.append(recipient) + elif is_permanent: + failed_permanent.append(recipient) + else: + failed_temporary.append(recipient) + + # Ergebnis-Zusammenfassung + log(f"\n📊 Delivery Results:") + log(f" ✓ Successful: {len(successful)}/{len(recipients)}") + log(f" ✗ Failed (temporary): {len(failed_temporary)}") + log(f" ✗ Failed (permanent): {len(failed_permanent)}") + + # Entscheidungslogik + if len(successful) > 0: + # Mindestens 1 Recipient erfolgreich + # → processed=true setzen, invalid_inboxes tracken + + invalid_inboxes = failed_permanent if failed_permanent else None + mark_as_processed(bucket, key, invalid_inboxes) log(f"{'='*70}") - log(f"✅ Email delivered successfully", 'SUCCESS') + log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS') + if failed_permanent: + log(f"⚠ {len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING') + if failed_temporary: + log(f"⚠ {len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING') log(f"{'='*70}\n") return True # Message löschen - else: - # Fehler aufgetreten - error_msg = error or "Unknown SMTP error" + elif len(failed_permanent) == len(recipients): + # ALLE Recipients sind permanent fehlgeschlagen (alle Inboxen ungültig) + # → processed=true setzen mit allen als invalid_inboxes - # Prüfe ob temporärer Fehler (Retry sinnvoll) - if receive_count < 3 and is_temporary_smtp_error(error_msg): - log(f"⚠ Temporary error detected, will retry", 'WARNING') + mark_as_processed(bucket, key, failed_permanent) + + log(f"{'='*70}") + log(f"✗ All recipients permanently failed (invalid inboxes)", 'ERROR') + log(f"{'='*70}\n") + + return True # Message löschen (nicht retryable) + + else: + # Nur temporäre Fehler, keine erfolgreichen Deliveries + # → Retry wenn noch Versuche übrig + + if receive_count < 3: + log(f"⚠ All failures are temporary, will retry", 'WARNING') log(f"{'='*70}\n") return False # Message NICHT löschen → Retry else: - # Permanenter Fehler oder max retries erreicht - mark_as_failed(bucket, key, error_msg, receive_count) + # Max retries erreicht → als failed markieren + error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients." + mark_as_failed(bucket, key, error_summary, receive_count) log(f"{'='*70}") - log(f"✗ Email delivery failed permanently", 'ERROR') - log(f" Error: {error_msg}") + log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR') log(f"{'='*70}\n") return False # Nach 3 Versuchen → automatisch DLQ