# email-amazon/email-worker/worker.py
#!/usr/bin/env python3
"""
Email message processing worker
"""
import json
import time
import traceback
from typing import List, Tuple
from logger import log
from config import config, domain_to_bucket_name
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker
from smtp.delivery import EmailDelivery
from metrics.prometheus import MetricsCollector
from email.parser import BytesParser  # lenient parsing of malformed headers (probably already present; verify)
from email.policy import compat32  # NEW: legacy policy that tolerates broken headers
class MessageProcessor:
    """Processes individual email messages pulled from a per-domain SQS queue.

    Wires together the AWS handlers (S3/SQS/SES/DynamoDB), SMTP delivery and
    metrics with the email sub-processors (parsing, bounce rewriting,
    per-recipient rules, sender blocklist).
    """

    def __init__(
        self,
        s3: "S3Handler",
        sqs: "SQSHandler",
        ses: "SESHandler",
        dynamodb: "DynamoDBHandler",
        delivery: "EmailDelivery",
        metrics: "MetricsCollector"
    ):
        self.s3 = s3
        self.sqs = sqs
        self.ses = ses
        self.dynamodb = dynamodb
        self.delivery = delivery
        self.metrics = metrics
        # Initialize sub-processors
        self.parser = EmailParser()
        self.bounce_handler = BounceHandler(dynamodb)
        self.rules_processor = RulesProcessor(dynamodb, ses)
        self.blocklist = BlocklistChecker(dynamodb)

    @staticmethod
    def _unpack_ses_event(message: dict) -> dict:
        """Unwrap an SQS message body into the SES event dict.

        Handles both a raw SES event and one wrapped in an SNS notification
        envelope (whose 'Message' field may itself be a JSON string).

        Args:
            message: SQS message dict with a JSON 'Body'.

        Returns:
            The SES event dict (with 'mail'/'receipt' keys).
        """
        message_body = json.loads(message['Body'])
        if 'Message' in message_body and 'Type' in message_body:
            # SNS notification envelope
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                return json.loads(sns_content)
            return sns_content
        return message_body

    @staticmethod
    def _strip_message_id_brackets(msg):
        """Remove square brackets from a malformed Message-ID header, in place.

        Fixes Microsoft-style '<[uuid]@domain>' Message-IDs that crash
        Python's strict header parser. Angle brackets are preserved.

        Args:
            msg: A parsed email.message object supporting get/replace_header.

        Returns:
            The cleaned header value if a repair was made, else None.
        """
        msg_id = msg.get('Message-ID', '')
        if msg_id and ('[' in msg_id or ']' in msg_id):
            # Drop square brackets, keep the angle brackets.
            clean_id = msg_id.replace('[', '').replace(']', '')
            msg.replace_header('Message-ID', clean_id)
            return clean_id
        return None

    def process_message(self, domain: str, message: dict, receive_count: int) -> bool:
        """
        Process one email message from queue

        Args:
            domain: Email domain
            message: SQS message dict
            receive_count: Number of times received

        Returns:
            True to delete from queue, False to retry
        """
        worker_name = f"worker-{domain}"
        try:
            # 1. UNPACKING (SNS -> SES)
            ses_msg = self._unpack_ses_event(message)

            # 2. EXTRACT DATA
            mail = ses_msg.get('mail', {})
            receipt = ses_msg.get('receipt', {})
            message_id = mail.get('messageId')

            # Ignore the Amazon SES setup test notification.
            if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
                log(" Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
                return True

            from_addr = mail.get('source')
            recipients = receipt.get('recipients', [])
            if not message_id:
                log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
                return True

            # Domain validation: this worker only handles its own domain.
            if recipients:
                recipient_domain = recipients[0].split('@')[1]
                if recipient_domain.lower() != domain.lower():
                    log(
                        f"⚠ Security: Ignored message for {recipient_domain} "
                        f"(I am worker for {domain})",
                        'WARNING',
                        worker_name
                    )
                    return True
            else:
                log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
                return True

            # Compact single-line log for email processing
            recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
            log(f"📧 Processing: {message_id[:20]}... -> {recipients_str}", 'INFO', worker_name)

            # 3. DOWNLOAD FROM S3
            raw_bytes = self.s3.get_email(domain, message_id, receive_count)
            if raw_bytes is None:
                # S3 object not found yet, retry
                return False

            # 4. LOOP DETECTION
            temp_parsed = self.parser.parse_bytes(raw_bytes)
            skip_rules = self.parser.is_processed_by_worker(temp_parsed)
            if skip_rules:
                log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)

            # 5. PARSING & BOUNCE LOGIC
            # Fallback so the rules/delivery path below never sees an
            # unbound 'parsed' if strict re-parsing fails.
            parsed = temp_parsed
            try:
                # Pre-sanitize via legacy mode: the strict parser raises the
                # moment a broken header is accessed, so we first parse
                # leniently (compat32 ignores syntax errors), repair, and
                # regenerate the bytes before strict parsing.
                try:
                    temp_msg = BytesParser(policy=compat32).parsebytes(raw_bytes)
                    clean_id = self._strip_message_id_brackets(temp_msg)
                    if clean_id is not None:
                        # Rewrite the bytes with the repaired header.
                        raw_bytes = temp_msg.as_bytes()
                        log(f" 🔧 Sanitized malformed Message-ID via Legacy Mode: {clean_id}", 'INFO', worker_name)
                        if self.metrics:
                            self.metrics.increment_bounce(domain, 'sanitized_header')
                except Exception as e_sanitize:
                    # Best effort only -- never abort processing here.
                    log(f" ⚠ Sanitization warning: {e_sanitize}", 'WARNING', worker_name)

                parsed = self.parser.parse_bytes(raw_bytes)

                # Second pass on the strictly parsed message, in case the
                # lenient repair above did not catch it.
                if self._strip_message_id_brackets(parsed) is not None:
                    log(f" 🔧 Sanitized malformed Message-ID", 'INFO', worker_name)

                subject = parsed.get('Subject', '(no subject)')

                # Bounce header rewriting
                is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed)
                parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name)
                if modified:
                    log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                    raw_bytes = parsed.as_bytes()
                    from_addr_final = parsed.get('From')
                    if self.metrics:
                        self.metrics.increment_bounce(domain, 'rewritten')
                else:
                    from_addr_final = from_addr

                # Mark every externally received mail so later runs can
                # detect it (loop prevention) -- only if not already marked.
                if not skip_rules:
                    parsed['X-SES-Worker-Processed'] = 'delivered'
                    raw_bytes = parsed.as_bytes()
            except Exception as e:
                log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
                # Write the full traceback to the log (marked as ERROR).
                log(f"Full Traceback:\n{traceback.format_exc()}", 'ERROR', worker_name)
                # Fallback: still attempt to deliver the original mail.
                from_addr_final = from_addr
                is_bounce = False
                skip_rules = False

            # 6. BLOCKLIST CHECK (Batch for efficiency)
            blocked_by_recipient = self.blocklist.batch_check_blocked_senders(
                recipients,
                from_addr_final,
                worker_name
            )

            # 7. PROCESS RECIPIENTS
            log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)
            successful = []
            failed_permanent = []
            failed_temporary = []
            blocked_recipients = []

            def metrics_callback(action_type: str, dom: str):
                """Callback for metrics from rules processor"""
                if self.metrics:
                    if action_type == 'autoreply':
                        self.metrics.increment_autoreply(dom)
                    elif action_type == 'forward':
                        self.metrics.increment_forward(dom)

            for recipient in recipients:
                # Check if blocked
                if blocked_by_recipient.get(recipient, False):
                    log(
                        f"🗑 Silently dropping message for {recipient} (Sender blocked)",
                        'INFO',
                        worker_name
                    )
                    blocked_recipients.append(recipient)
                    if self.metrics:
                        self.metrics.increment_blocked(domain)
                    continue

                # Process rules (OOO, Forwarding) - not for bounces or already forwarded
                skip_local_delivery = False
                if not is_bounce and not skip_rules:
                    skip_local_delivery = self.rules_processor.process_rules_for_recipient(
                        recipient,
                        parsed,
                        domain,
                        worker_name,
                        metrics_callback
                    )

                # SMTP Delivery
                if skip_local_delivery:
                    log(f" ⏭ Skipping local delivery for {recipient} (legacy forward active)",
                        'INFO', worker_name)
                    successful.append(recipient)  # counts as "handled"
                else:
                    success, _error, is_perm = self.delivery.send_to_recipient(
                        from_addr_final, recipient, raw_bytes, worker_name
                    )
                    if success:
                        successful.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'success')
                    elif is_perm:
                        failed_permanent.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'permanent_failure')
                    else:
                        failed_temporary.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'temporary_failure')

            # 8. RESULT & CLEANUP
            total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)
            if total_handled == len(recipients):
                # All recipients handled (success, permanent fail, or blocked)
                if len(blocked_recipients) == len(recipients):
                    # All recipients blocked - mark and delete S3 object
                    try:
                        self.s3.mark_as_blocked(
                            domain,
                            message_id,
                            blocked_recipients,
                            from_addr_final,
                            worker_name
                        )
                        self.s3.delete_blocked_email(domain, message_id, worker_name)
                    except Exception as e:
                        log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name)
                        # Don't delete from queue if S3 operations failed
                        return False
                elif len(successful) > 0:
                    # At least one success
                    self.s3.mark_as_processed(
                        domain,
                        message_id,
                        worker_name,
                        failed_permanent if failed_permanent else None
                    )
                elif len(failed_permanent) > 0:
                    # All failed permanently
                    self.s3.mark_as_all_invalid(
                        domain,
                        message_id,
                        failed_permanent,
                        worker_name
                    )

                # Build result summary
                result_parts = []
                if successful:
                    result_parts.append(f"{len(successful)} OK")
                if failed_permanent:
                    result_parts.append(f"{len(failed_permanent)} invalid")
                if blocked_recipients:
                    result_parts.append(f"{len(blocked_recipients)} blocked")
                log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
                return True
            else:
                # Some recipients had temporary failures
                log(
                    f"🔄 Temp failure ({len(failed_temporary)} failed), will retry",
                    'WARNING',
                    worker_name
                )
                return False
        except Exception as e:
            log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
            traceback.print_exc()
            return False