#!/usr/bin/env python3
"""
Email message processing worker
"""
import json
import time
import traceback
from typing import List, Tuple

from logger import log
from config import config, domain_to_bucket_name
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker
from smtp.delivery import EmailDelivery
from metrics.prometheus import MetricsCollector


class MessageProcessor:
    """Processes individual email messages"""

    def __init__(
        self,
        s3: S3Handler,
        sqs: SQSHandler,
        ses: SESHandler,
        dynamodb: DynamoDBHandler,
        delivery: EmailDelivery,
        metrics: MetricsCollector
    ):
        self.s3 = s3
        self.sqs = sqs
        self.ses = ses
        self.dynamodb = dynamodb
        self.delivery = delivery
        self.metrics = metrics

        # Initialize sub-processors
        self.parser = EmailParser()
        self.bounce_handler = BounceHandler(dynamodb)
        self.rules_processor = RulesProcessor(dynamodb, ses)
        self.blocklist = BlocklistChecker(dynamodb)

    def process_message(self, domain: str, message: dict, receive_count: int) -> bool:
        """
        Process one email message from queue.

        Args:
            domain: Email domain this worker is responsible for
            message: SQS message dict; 'Body' holds an SNS envelope or a raw SES event
            receive_count: Number of times this message has been received

        Returns:
            True to delete from queue, False to retry
        """
        worker_name = f"worker-{domain}"

        try:
            # 1. UNPACKING (SNS -> SES)
            message_body = json.loads(message['Body'])

            if 'Message' in message_body and 'Type' in message_body:
                # It's an SNS Notification; the SES event may be a JSON string
                # or an already-decoded dict (raw message delivery).
                sns_content = message_body['Message']
                if isinstance(sns_content, str):
                    ses_msg = json.loads(sns_content)
                else:
                    ses_msg = sns_content
            else:
                ses_msg = message_body

            # 2. EXTRACT DATA
            mail = ses_msg.get('mail', {})
            receipt = ses_msg.get('receipt', {})
            message_id = mail.get('messageId')

            # Ignore the synthetic notification SES sends on rule-set setup.
            if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
                log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
                return True

            from_addr = mail.get('source')
            recipients = receipt.get('recipients', [])

            if not message_id:
                log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
                return True

            # Domain Validation — only handle mail addressed to our own domain.
            if recipients:
                first_recipient = recipients[0]
                # FIX: split('@')[1] raised IndexError on addresses without '@',
                # which fell through to the outer handler and caused an endless
                # redelivery loop. rpartition also handles multiple '@' safely.
                _, at_sign, recipient_domain = first_recipient.rpartition('@')
                if not at_sign:
                    log(
                        f"⚠ Warning: Malformed recipient address, dropping message",
                        'WARNING', worker_name
                    )
                    return True
                if recipient_domain.lower() != domain.lower():
                    log(
                        f"⚠ Security: Ignored message for {recipient_domain} "
                        f"(I am worker for {domain})",
                        'WARNING', worker_name
                    )
                    return True
            else:
                log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
                return True

            bucket = domain_to_bucket_name(domain)
            key = message_id

            # Compact single-line log for email processing
            recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
            log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)

            # 3. DOWNLOAD FROM S3
            raw_bytes = self.s3.get_email(domain, message_id, receive_count)
            if raw_bytes is None:
                # S3 object not found yet, retry
                return False

            # 4. LOOP DETECTION — don't re-run rules on mail we already touched.
            temp_parsed = self.parser.parse_bytes(raw_bytes)
            skip_rules = self.parser.is_processed_by_worker(temp_parsed)
            if skip_rules:
                log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)

            # 5. PARSING & BOUNCE LOGIC
            try:
                parsed = self.parser.parse_bytes(raw_bytes)
                subject = parsed.get('Subject', '(no subject)')

                # Bounce header rewriting
                is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed)
                parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name)

                if modified:
                    log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                    raw_bytes = parsed.as_bytes()
                    from_addr_final = parsed.get('From')
                    if self.metrics:
                        self.metrics.increment_bounce(domain, 'rewritten')
                else:
                    from_addr_final = from_addr

                # Mark every externally-received email so a later pass
                # recognizes it (see loop detection above).
                if not skip_rules:  # only if not already processed
                    parsed['X-SES-Worker-Processed'] = 'delivered'
                    raw_bytes = parsed.as_bytes()

            except Exception as e:
                log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
                traceback.print_exc()
                from_addr_final = from_addr
                is_bounce = False
                skip_rules = False
                # FIX: 'parsed' could be unbound here (parse_bytes raised) yet is
                # still passed to the rules processor below -> NameError. Fall
                # back to the copy parsed during loop detection.
                parsed = temp_parsed

            # 6. BLOCKLIST CHECK (Batch for efficiency)
            blocked_by_recipient = self.blocklist.batch_check_blocked_senders(
                recipients, from_addr_final, worker_name
            )

            # 7. PROCESS RECIPIENTS
            log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)

            successful = []
            failed_permanent = []
            failed_temporary = []
            blocked_recipients = []

            # FIX: hoisted out of the recipient loop — it was redefined on every
            # iteration although it captures nothing iteration-specific.
            def metrics_callback(action_type: str, dom: str):
                """Callback for metrics from rules processor"""
                if self.metrics:
                    if action_type == 'autoreply':
                        self.metrics.increment_autoreply(dom)
                    elif action_type == 'forward':
                        self.metrics.increment_forward(dom)

            for recipient in recipients:
                # Check if blocked — drop silently, count for metrics.
                if blocked_by_recipient.get(recipient, False):
                    log(
                        f"🗑 Silently dropping message for {recipient} (Sender blocked)",
                        'INFO', worker_name
                    )
                    blocked_recipients.append(recipient)
                    if self.metrics:
                        self.metrics.increment_blocked(domain)
                    continue

                # Process rules (OOO, Forwarding) - not for bounces or already forwarded
                if not is_bounce and not skip_rules:
                    self.rules_processor.process_rules_for_recipient(
                        recipient, parsed, domain, worker_name, metrics_callback
                    )

                # SMTP Delivery
                success, error, is_perm = self.delivery.send_to_recipient(
                    from_addr_final, recipient, raw_bytes, worker_name
                )

                if success:
                    successful.append(recipient)
                    if self.metrics:
                        self.metrics.increment_processed(domain, 'success')
                elif is_perm:
                    failed_permanent.append(recipient)
                    if self.metrics:
                        self.metrics.increment_processed(domain, 'permanent_failure')
                else:
                    failed_temporary.append(recipient)
                    if self.metrics:
                        self.metrics.increment_processed(domain, 'temporary_failure')

            # 8. RESULT & CLEANUP
            total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)

            if total_handled == len(recipients):
                # All recipients handled (success, permanent fail, or blocked)
                if len(blocked_recipients) == len(recipients):
                    # All recipients blocked - mark and delete S3 object
                    try:
                        self.s3.mark_as_blocked(
                            domain, message_id, blocked_recipients,
                            from_addr_final, worker_name
                        )
                        self.s3.delete_blocked_email(domain, message_id, worker_name)
                    except Exception as e:
                        log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name)
                        # Don't delete from queue if S3 operations failed
                        return False
                elif len(successful) > 0:
                    # At least one success
                    self.s3.mark_as_processed(
                        domain, message_id, worker_name,
                        failed_permanent if failed_permanent else None
                    )
                elif len(failed_permanent) > 0:
                    # All failed permanently
                    self.s3.mark_as_all_invalid(
                        domain, message_id, failed_permanent, worker_name
                    )

                # Build result summary
                result_parts = []
                if successful:
                    result_parts.append(f"{len(successful)} OK")
                if failed_permanent:
                    result_parts.append(f"{len(failed_permanent)} invalid")
                if blocked_recipients:
                    result_parts.append(f"{len(blocked_recipients)} blocked")

                log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
                return True
            else:
                # Some recipients had temporary failures
                log(
                    f"🔄 Temp failure ({len(failed_temporary)} failed), will retry",
                    'WARNING', worker_name
                )
                return False

        except Exception as e:
            log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
            traceback.print_exc()
            return False