email-amazon/email-worker/worker.py

285 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Email message processing worker
"""
import json
import time
import traceback
from typing import List, Tuple
from logger import log
from config import config, domain_to_bucket_name
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker
from smtp.delivery import EmailDelivery
from metrics.prometheus import MetricsCollector
class MessageProcessor:
"""Processes individual email messages"""
def __init__(
    self,
    s3: S3Handler,
    sqs: SQSHandler,
    ses: SESHandler,
    dynamodb: DynamoDBHandler,
    delivery: EmailDelivery,
    metrics: MetricsCollector
):
    """Wire up AWS handlers, the delivery backend, metrics, and sub-processors."""
    # Outbound delivery and metrics (metrics is checked for truthiness
    # before every use, so a None/disabled collector is tolerated)
    self.delivery = delivery
    self.metrics = metrics
    # AWS service handles
    self.s3 = s3
    self.sqs = sqs
    self.ses = ses
    self.dynamodb = dynamodb
    # Pipeline-stage helpers built on top of the AWS handles
    self.parser = EmailParser()
    self.bounce_handler = BounceHandler(dynamodb)
    self.rules_processor = RulesProcessor(dynamodb, ses)
    self.blocklist = BlocklistChecker(dynamodb)
def process_message(self, domain: str, message: dict, receive_count: int) -> bool:
    """
    Process one email message from the queue.

    Pipeline: unwrap the SNS/SES envelope -> validate message id, recipients
    and domain -> download the raw email from S3 -> loop detection -> bounce
    header rewriting -> batch blocklist check -> per-recipient rules
    (OOO/forwarding) + SMTP delivery -> S3 tagging/cleanup.

    Args:
        domain: Email domain this worker instance is responsible for
        message: Raw SQS message dict (JSON payload in message['Body'])
        receive_count: Number of times this message has been received

    Returns:
        True to delete the message from the queue (handled or unrecoverable),
        False to leave it on the queue for a retry (transient failure).
    """
    worker_name = f"worker-{domain}"
    try:
        # 1. UNPACKING (SNS -> SES)
        ses_msg = self._unpack_ses_message(message)

        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')

        # Ignore the synthetic notification SES emits when a receipt rule
        # set is configured — it carries no real email.
        if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
            log(" Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
            return True

        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])

        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
            return True

        # Domain validation: only handle mail addressed to our own domain
        if not recipients:
            log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
            return True
        first_recipient = recipients[0]
        if '@' not in first_recipient:
            # FIX: a malformed address used to raise IndexError on the
            # split below, causing an endless retry loop (the message never
            # changes). Drop it instead.
            log(f"❌ Error: Malformed recipient {first_recipient!r}", 'ERROR', worker_name)
            return True
        recipient_domain = first_recipient.split('@')[1]
        if recipient_domain.lower() != domain.lower():
            log(
                f"⚠ Security: Ignored message for {recipient_domain} "
                f"(I am worker for {domain})",
                'WARNING',
                worker_name
            )
            return True

        # Compact single-line log for email processing
        recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
        log(f"📧 Processing: {message_id[:20]}... -> {recipients_str}", 'INFO', worker_name)

        # 3. DOWNLOAD FROM S3 (object key is the SES messageId)
        raw_bytes = self.s3.get_email(domain, message_id, receive_count)
        if raw_bytes is None:
            # S3 object not visible yet; leave on queue and retry
            return False

        # 4. PARSE + LOOP DETECTION
        # FIX: the original parsed raw_bytes twice (once for loop detection,
        # once for bounce logic); parse once and reuse.
        parsed = self.parser.parse_bytes(raw_bytes)
        skip_rules = self.parser.is_processed_by_worker(parsed)
        if skip_rules:
            log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)

        # 5. BOUNCE LOGIC (best effort — on failure deliver the original bytes)
        is_bounce = False
        from_addr_final = from_addr
        try:
            subject = parsed.get('Subject', '(no subject)')
            is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed)
            parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name)
            if modified:
                log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')
                if self.metrics:
                    self.metrics.increment_bounce(domain, 'rewritten')
        except Exception as e:
            # FIX: the original reset skip_rules to False here, discarding
            # the loop-prevention verdict, and could leave `parsed`
            # undefined for the rules step below (NameError -> endless
            # retries). `parsed` keeps its pre-call value on failure, and
            # skip_rules is preserved.
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
            traceback.print_exc()
            from_addr_final = from_addr
            is_bounce = False

        # 6. BLOCKLIST CHECK (one batched call instead of per-recipient lookups)
        blocked_by_recipient = self.blocklist.batch_check_blocked_senders(
            recipients,
            from_addr_final,
            worker_name
        )

        # 7. PROCESS RECIPIENTS
        log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)
        successful = []
        failed_permanent = []
        failed_temporary = []
        blocked_recipients = []

        # FIX: hoisted out of the recipient loop — the closure is
        # loop-invariant (it only reads self.metrics).
        def metrics_callback(action_type: str, dom: str):
            """Count autoreply/forward actions taken by the rules processor."""
            if self.metrics:
                if action_type == 'autoreply':
                    self.metrics.increment_autoreply(dom)
                elif action_type == 'forward':
                    self.metrics.increment_forward(dom)

        for recipient in recipients:
            # Silently drop mail whose sender this recipient has blocked
            if blocked_by_recipient.get(recipient, False):
                log(
                    f"🗑 Silently dropping message for {recipient} (Sender blocked)",
                    'INFO',
                    worker_name
                )
                blocked_recipients.append(recipient)
                if self.metrics:
                    self.metrics.increment_blocked(domain)
                continue

            # Rules (OOO, forwarding) — skipped for bounce notifications and
            # for mail already processed by a worker (loop prevention)
            if not is_bounce and not skip_rules:
                self.rules_processor.process_rules_for_recipient(
                    recipient,
                    parsed,
                    domain,
                    worker_name,
                    metrics_callback
                )

            # SMTP delivery; is_perm distinguishes permanent from transient failure
            success, error, is_perm = self.delivery.send_to_recipient(
                from_addr_final,
                recipient,
                raw_bytes,
                worker_name
            )
            if success:
                successful.append(recipient)
                if self.metrics:
                    self.metrics.increment_processed(domain, 'success')
            elif is_perm:
                failed_permanent.append(recipient)
                if self.metrics:
                    self.metrics.increment_processed(domain, 'permanent_failure')
            else:
                failed_temporary.append(recipient)
                if self.metrics:
                    self.metrics.increment_processed(domain, 'temporary_failure')

        # 8. RESULT & CLEANUP
        total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)
        if total_handled != len(recipients):
            # Some recipients failed temporarily -> keep on queue for retry
            log(
                f"🔄 Temp failure ({len(failed_temporary)} failed), will retry",
                'WARNING',
                worker_name
            )
            return False

        # All recipients handled (success, permanent failure, or blocked)
        if len(blocked_recipients) == len(recipients):
            # Every recipient blocked -> mark and delete the S3 object
            try:
                self.s3.mark_as_blocked(
                    domain,
                    message_id,
                    blocked_recipients,
                    from_addr_final,
                    worker_name
                )
                self.s3.delete_blocked_email(domain, message_id, worker_name)
            except Exception as e:
                log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name)
                # Keep on queue so the S3 cleanup can be retried
                return False
        elif successful:
            # At least one success: tag the object, noting invalid recipients
            self.s3.mark_as_processed(
                domain,
                message_id,
                worker_name,
                failed_permanent if failed_permanent else None
            )
        elif failed_permanent:
            # Every recipient failed permanently
            self.s3.mark_as_all_invalid(
                domain,
                message_id,
                failed_permanent,
                worker_name
            )

        # Compact result summary
        result_parts = []
        if successful:
            result_parts.append(f"{len(successful)} OK")
        if failed_permanent:
            result_parts.append(f"{len(failed_permanent)} invalid")
        if blocked_recipients:
            result_parts.append(f"{len(blocked_recipients)} blocked")
        log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
        return True

    except Exception as e:
        # Last-resort guard: log and leave the message on the queue
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
        traceback.print_exc()
        return False

def _unpack_ses_message(self, message: dict) -> dict:
    """Return the inner SES event dict from a raw SQS message.

    Handles both direct SES events and SES events wrapped in an SNS
    notification envelope (whose 'Message' field may be a JSON string
    or an already-decoded dict).
    """
    body = json.loads(message['Body'])
    if 'Message' in body and 'Type' in body:
        inner = body['Message']
        return json.loads(inner) if isinstance(inner, str) else inner
    return body