290 lines
11 KiB
Python
290 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Email message processing worker
|
||
"""
|
||
|
||
import json
|
||
import time
|
||
import traceback
|
||
from typing import List, Tuple
|
||
|
||
from logger import log
|
||
from config import config, domain_to_bucket_name
|
||
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
|
||
from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker
|
||
from smtp.delivery import EmailDelivery
|
||
from metrics.prometheus import MetricsCollector
|
||
|
||
|
||
class MessageProcessor:
    """Processes a single inbound email message end to end.

    Holds the AWS service handles and the sub-processors used by the
    processing pipeline (parsing, bounce rewriting, rules, blocklist).
    """

    def __init__(
        self,
        s3: S3Handler,
        sqs: SQSHandler,
        ses: SESHandler,
        dynamodb: DynamoDBHandler,
        delivery: EmailDelivery,
        metrics: MetricsCollector
    ):
        # AWS service handles
        self.s3 = s3
        self.ses = ses
        self.sqs = sqs
        self.dynamodb = dynamodb

        # Outbound SMTP delivery backend and metrics sink
        self.metrics = metrics
        self.delivery = delivery

        # Pipeline stages, each backed by the handles above
        self.parser = EmailParser()
        self.blocklist = BlocklistChecker(dynamodb)
        self.bounce_handler = BounceHandler(dynamodb)
        self.rules_processor = RulesProcessor(dynamodb, ses)
|
||
|
||
def process_message(self, domain: str, message: dict, receive_count: int) -> bool:
|
||
"""
|
||
Process one email message from queue
|
||
|
||
Args:
|
||
domain: Email domain
|
||
message: SQS message dict
|
||
receive_count: Number of times received
|
||
|
||
Returns:
|
||
True to delete from queue, False to retry
|
||
"""
|
||
worker_name = f"worker-{domain}"
|
||
|
||
try:
|
||
# 1. UNPACKING (SNS -> SES)
|
||
message_body = json.loads(message['Body'])
|
||
|
||
if 'Message' in message_body and 'Type' in message_body:
|
||
# It's an SNS Notification
|
||
sns_content = message_body['Message']
|
||
if isinstance(sns_content, str):
|
||
ses_msg = json.loads(sns_content)
|
||
else:
|
||
ses_msg = sns_content
|
||
else:
|
||
ses_msg = message_body
|
||
|
||
# 2. EXTRACT DATA
|
||
mail = ses_msg.get('mail', {})
|
||
receipt = ses_msg.get('receipt', {})
|
||
|
||
message_id = mail.get('messageId')
|
||
|
||
# FIX: Ignore Amazon SES Setup Notification
|
||
if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
|
||
log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
|
||
return True
|
||
|
||
from_addr = mail.get('source')
|
||
recipients = receipt.get('recipients', [])
|
||
|
||
if not message_id:
|
||
log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
|
||
return True
|
||
|
||
# Domain Validation
|
||
if recipients:
|
||
first_recipient = recipients[0]
|
||
recipient_domain = first_recipient.split('@')[1]
|
||
|
||
if recipient_domain.lower() != domain.lower():
|
||
log(
|
||
f"⚠ Security: Ignored message for {recipient_domain} "
|
||
f"(I am worker for {domain})",
|
||
'WARNING',
|
||
worker_name
|
||
)
|
||
return True
|
||
else:
|
||
log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
|
||
return True
|
||
|
||
bucket = domain_to_bucket_name(domain)
|
||
key = message_id
|
||
|
||
# Compact single-line log for email processing
|
||
recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
|
||
log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)
|
||
|
||
# 3. DOWNLOAD FROM S3
|
||
raw_bytes = self.s3.get_email(domain, message_id, receive_count)
|
||
if raw_bytes is None:
|
||
# S3 object not found yet, retry
|
||
return False
|
||
|
||
# 4. LOOP DETECTION
|
||
temp_parsed = self.parser.parse_bytes(raw_bytes)
|
||
skip_rules = self.parser.is_processed_by_worker(temp_parsed)
|
||
|
||
if skip_rules:
|
||
log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)
|
||
|
||
# 5. PARSING & BOUNCE LOGIC
|
||
try:
|
||
parsed = self.parser.parse_bytes(raw_bytes)
|
||
subject = parsed.get('Subject', '(no subject)')
|
||
|
||
# Bounce header rewriting
|
||
is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed)
|
||
parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name)
|
||
|
||
if modified:
|
||
log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
|
||
raw_bytes = parsed.as_bytes()
|
||
from_addr_final = parsed.get('From')
|
||
|
||
if self.metrics:
|
||
self.metrics.increment_bounce(domain, 'rewritten')
|
||
else:
|
||
from_addr_final = from_addr
|
||
|
||
# ⭐ HIER NEU: Marker für alle Emails von extern setzen
|
||
if not skip_rules: # Nur wenn nicht bereits processed
|
||
parsed['X-SES-Worker-Processed'] = 'delivered'
|
||
raw_bytes = parsed.as_bytes()
|
||
|
||
except Exception as e:
|
||
log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
|
||
traceback.print_exc()
|
||
from_addr_final = from_addr
|
||
is_bounce = False
|
||
skip_rules = False
|
||
|
||
# 6. BLOCKLIST CHECK (Batch for efficiency)
|
||
blocked_by_recipient = self.blocklist.batch_check_blocked_senders(
|
||
recipients,
|
||
from_addr_final,
|
||
worker_name
|
||
)
|
||
|
||
# 7. PROCESS RECIPIENTS
|
||
log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)
|
||
|
||
successful = []
|
||
failed_permanent = []
|
||
failed_temporary = []
|
||
blocked_recipients = []
|
||
|
||
for recipient in recipients:
|
||
# Check if blocked
|
||
if blocked_by_recipient.get(recipient, False):
|
||
log(
|
||
f"🗑 Silently dropping message for {recipient} (Sender blocked)",
|
||
'INFO',
|
||
worker_name
|
||
)
|
||
blocked_recipients.append(recipient)
|
||
if self.metrics:
|
||
self.metrics.increment_blocked(domain)
|
||
continue
|
||
|
||
# Process rules (OOO, Forwarding) - not for bounces or already forwarded
|
||
if not is_bounce and not skip_rules:
|
||
def metrics_callback(action_type: str, dom: str):
|
||
"""Callback for metrics from rules processor"""
|
||
if self.metrics:
|
||
if action_type == 'autoreply':
|
||
self.metrics.increment_autoreply(dom)
|
||
elif action_type == 'forward':
|
||
self.metrics.increment_forward(dom)
|
||
|
||
self.rules_processor.process_rules_for_recipient(
|
||
recipient,
|
||
parsed,
|
||
domain,
|
||
worker_name,
|
||
metrics_callback
|
||
)
|
||
|
||
# SMTP Delivery
|
||
success, error, is_perm = self.delivery.send_to_recipient(
|
||
from_addr_final,
|
||
recipient,
|
||
raw_bytes,
|
||
worker_name
|
||
)
|
||
|
||
if success:
|
||
successful.append(recipient)
|
||
if self.metrics:
|
||
self.metrics.increment_processed(domain, 'success')
|
||
elif is_perm:
|
||
failed_permanent.append(recipient)
|
||
if self.metrics:
|
||
self.metrics.increment_processed(domain, 'permanent_failure')
|
||
else:
|
||
failed_temporary.append(recipient)
|
||
if self.metrics:
|
||
self.metrics.increment_processed(domain, 'temporary_failure')
|
||
|
||
# 8. RESULT & CLEANUP
|
||
total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)
|
||
|
||
if total_handled == len(recipients):
|
||
# All recipients handled (success, permanent fail, or blocked)
|
||
|
||
if len(blocked_recipients) == len(recipients):
|
||
# All recipients blocked - mark and delete S3 object
|
||
try:
|
||
self.s3.mark_as_blocked(
|
||
domain,
|
||
message_id,
|
||
blocked_recipients,
|
||
from_addr_final,
|
||
worker_name
|
||
)
|
||
self.s3.delete_blocked_email(domain, message_id, worker_name)
|
||
except Exception as e:
|
||
log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name)
|
||
# Don't delete from queue if S3 operations failed
|
||
return False
|
||
|
||
elif len(successful) > 0:
|
||
# At least one success
|
||
self.s3.mark_as_processed(
|
||
domain,
|
||
message_id,
|
||
worker_name,
|
||
failed_permanent if failed_permanent else None
|
||
)
|
||
|
||
elif len(failed_permanent) > 0:
|
||
# All failed permanently
|
||
self.s3.mark_as_all_invalid(
|
||
domain,
|
||
message_id,
|
||
failed_permanent,
|
||
worker_name
|
||
)
|
||
|
||
# Build result summary
|
||
result_parts = []
|
||
if successful:
|
||
result_parts.append(f"{len(successful)} OK")
|
||
if failed_permanent:
|
||
result_parts.append(f"{len(failed_permanent)} invalid")
|
||
if blocked_recipients:
|
||
result_parts.append(f"{len(blocked_recipients)} blocked")
|
||
|
||
log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
|
||
return True
|
||
|
||
else:
|
||
# Some recipients had temporary failures
|
||
log(
|
||
f"🔄 Temp failure ({len(failed_temporary)} failed), will retry",
|
||
'WARNING',
|
||
worker_name
|
||
)
|
||
return False
|
||
|
||
except Exception as e:
|
||
log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
|
||
traceback.print_exc()
|
||
return False
|