email-amazon/unified-worker/unified_worker.py

1081 lines
43 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
unified_worker.py - Multi-Domain Email Worker (Full Featured + Blocklist + Fixes)
Features:
- Multi-Domain parallel processing
- Bounce Detection & Rewriting (SES MAILER-DAEMON)
- Auto-Reply / Out-of-Office
- Email Forwarding
- Blocked Sender Logic (Wildcard Support)
- SMTP/LMTP Delivery
- Prometheus Metrics
- Graceful Shutdown
- Full Logging & Metadata
"""
import os, sys, json, time, signal, threading, smtplib, traceback, fnmatch
from queue import Queue, Empty
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import parseaddr, formatdate, make_msgid
import boto3
from botocore.exceptions import ClientError
try:
from prometheus_client import start_http_server, Counter, Gauge, Histogram
PROMETHEUS_ENABLED = True
except ImportError:
PROMETHEUS_ENABLED = False
# ============================================
# CONFIGURATION
# ============================================
@dataclass
class Config:
    """Runtime configuration, read once from environment variables when the
    module is imported (dataclass defaults are evaluated at class creation)."""
    # --- AWS / domain discovery ---
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')
    domains_list: str = os.environ.get('DOMAINS', '')  # comma-separated domains
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')
    # --- SQS polling ---
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))  # long-poll wait seconds
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))  # per receive_message call
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
    # --- SMTP delivery ---
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))
    # --- LMTP delivery (bypasses SMTP when enabled) ---
    lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true'
    lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost')
    lmtp_port: int = int(os.environ.get('LMTP_PORT', '24'))
    # --- DynamoDB tables (rules / bounce metadata / blocklist) ---
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')
    blocked_table: str = os.environ.get('DYNAMODB_BLOCKED_TABLE', 'email-blocked-senders')
    # --- bounce lookup retry policy ---
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))
    # --- observability ports ---
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))
# Single shared configuration instance used throughout the module.
config = Config()
# ============================================
# LOGGING
# ============================================
def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'):
    """Print one flushed log line tagged with UTC time, level, worker and thread.

    Fix: replaced the deprecated ``datetime.utcnow()`` (deprecated since
    Python 3.12) with ``time.strftime`` over ``time.gmtime()`` — the rendered
    ``%Y-%m-%d %H:%M:%S`` timestamp is byte-identical.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
    thread_name = threading.current_thread().name
    # flush=True so container log collectors see lines immediately
    print(f"[{timestamp}] [{level}] [{worker_name}] [{thread_name}] {message}", flush=True)
# ============================================
# METRICS
# ============================================
# Prometheus collectors are created only when prometheus_client imported
# successfully; all call sites guard on PROMETHEUS_ENABLED before use.
if PROMETHEUS_ENABLED:
    emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status'])
    emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed')
    processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain'])
    queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain'])
    bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'])
    autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain'])
    forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain'])
    blocked_senders = Counter('blocked_senders_total', 'Emails blocked by blacklist', ['domain'])
# ============================================
# AWS CLIENTS
# ============================================
# Module-level AWS clients shared by all poller threads.
sqs = boto3.client('sqs', region_name=config.aws_region)
s3 = boto3.client('s3', region_name=config.aws_region)
ses = boto3.client('ses', region_name=config.aws_region)
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
# DynamoDB-backed features (rules, bounce lookup, blocklist) degrade
# gracefully: if the tables are unreachable, DYNAMODB_AVAILABLE stays False
# and callers skip those features instead of failing.
DYNAMODB_AVAILABLE = False
rules_table = None
messages_table = None
blocked_table = None
try:
    rules_table = dynamodb.Table(config.rules_table)
    messages_table = dynamodb.Table(config.messages_table)
    blocked_table = dynamodb.Table(config.blocked_table)
    # Accessing table_status forces a DescribeTable round-trip, verifying the
    # tables exist and credentials work before declaring availability.
    rules_table.table_status
    messages_table.table_status
    blocked_table.table_status
    DYNAMODB_AVAILABLE = True
except Exception as e:
    log(f"⚠ DynamoDB not fully available: {e}", 'WARNING')
# ============================================
# SMTP POOL
# ============================================
class SMTPPool:
    """Small pool of persistent SMTP connections backed by a thread-safe Queue.

    Connections are validated with NOOP before reuse; dead or surplus
    connections are quit and discarded. Fixes over the original:
    - bare ``except:`` clauses narrowed to ``except Exception`` so
      SystemExit/KeyboardInterrupt are no longer swallowed;
    - ``return_connection(None)`` no longer relies on an AttributeError from
      ``None.quit()`` being caught — it is handled explicitly;
    - best-effort teardown is factored into ``_discard``.
    """

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Open, optionally TLS-upgrade and authenticate a connection; None on failure."""
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()  # EHLO must be re-issued after STARTTLS
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            return conn
        except Exception as e:
            log(f"Failed to create SMTP connection: {e}", 'ERROR')
            return None

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Return True if the connection still answers NOOP with 250."""
        try:
            return conn.noop()[0] == 250
        except Exception:
            return False

    def _discard(self, conn: smtplib.SMTP) -> None:
        """Best-effort QUIT; errors from an already-dead connection are ignored."""
        try:
            conn.quit()
        except Exception:
            pass

    def initialize(self):
        """Pre-warm the pool with up to two connections (idempotent)."""
        if self._initialized:
            return
        for _ in range(min(2, self.pool_size)):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True

    def get_connection(self) -> Optional[smtplib.SMTP]:
        """Return a healthy pooled connection, or create a fresh one on demand."""
        try:
            conn = self._pool.get(block=False)
        except Empty:
            return self._create_connection()
        if self._test_connection(conn):
            return conn
        self._discard(conn)
        return self._create_connection()

    def return_connection(self, conn: Optional[smtplib.SMTP]):
        """Put a healthy connection back; quit it if dead, None, or the pool is full."""
        if conn and self._test_connection(conn):
            try:
                self._pool.put_nowait(conn)
            except Exception:  # pool full — drop the surplus connection
                self._discard(conn)
        elif conn:
            self._discard(conn)

    def close_all(self):
        """Drain the pool, quitting every remaining connection."""
        while not self._pool.empty():
            try:
                self._pool.get_nowait().quit()
            except Exception:
                pass
smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
# ============================================
# HELPER
# ============================================
def domain_to_queue_name(domain: str) -> str:
    """Map a mail domain to its SQS queue name (dots become dashes, '-queue' suffix)."""
    slug = domain.replace('.', '-')
    return f"{slug}-queue"
def domain_to_bucket_name(domain: str) -> str:
    """Map a mail domain to its S3 inbound-mail bucket name ('-emails' suffix)."""
    slug = domain.replace('.', '-')
    return f"{slug}-emails"
def is_internal_address(email_address: str) -> bool:
    """Return True when the address's domain is one this worker manages.

    Uses the module-level MANAGED_DOMAINS set populated by load_domains().
    """
    if '@' not in email_address:
        return False
    parts = email_address.split('@')
    return parts[1].lower() in MANAGED_DOMAINS
def send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool:
    """Deliver raw_message to a locally hosted mailbox over the internal SMTP
    listener on port 2525, bypassing the public delivery path.

    Returns True on success; failures are logged and reported as False.
    """
    try:
        with smtplib.SMTP(config.smtp_host, 2525, timeout=30) as session:
            session.ehlo()
            session.sendmail(from_addr, [to_addr], raw_message)
            log(f" ✓ Internal delivery to {to_addr} (Port 2525)", 'SUCCESS', worker_name)
            return True
    except Exception as exc:
        log(f" ✗ Internal delivery failed to {to_addr}: {exc}", 'ERROR', worker_name)
        return False
def get_queue_url(domain: str) -> Optional[str]:
    """Resolve the SQS queue URL for a domain; None when missing or on error."""
    name = domain_to_queue_name(domain)
    try:
        lookup = sqs.get_queue_url(QueueName=name)
        return lookup['QueueUrl']
    except ClientError as e:
        code = e.response['Error']['Code']
        if code == 'AWS.SimpleQueueService.NonExistentQueue':
            log(f"Queue not found for domain: {domain}", 'WARNING')
        else:
            log(f"Error getting queue URL for {domain}: {e}", 'ERROR')
        return None
def load_domains() -> List[str]:
    """Collect managed domains from the DOMAINS env var and the domains file.

    Side effect: refreshes the module-level MANAGED_DOMAINS set (lowercased)
    used by is_internal_address(). Duplicates are removed; order is not
    significant.
    """
    global MANAGED_DOMAINS
    collected: List[str] = []
    if config.domains_list:
        collected.extend(part.strip() for part in config.domains_list.split(',') if part.strip())
    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as handle:
            for raw_line in handle:
                candidate = raw_line.strip()
                # Skip blanks and '#' comment lines
                if candidate and not candidate.startswith('#'):
                    collected.append(candidate)
    collected = list(set(collected))
    MANAGED_DOMAINS = {d.lower() for d in collected}
    log(f"Loaded {len(collected)} domains: {', '.join(collected)}")
    return collected
MANAGED_DOMAINS: set = set()
# ============================================
# BLOCKLIST
# ============================================
def is_sender_blocked(recipient: str, sender: str, worker_name: str) -> bool:
    """Check the per-inbox DynamoDB blocklist for a sender match.

    Patterns support fnmatch wildcards (e.g. '*@spam.example'). Returns False
    when DynamoDB is unavailable or any lookup error occurs (fail-open).
    """
    if not DYNAMODB_AVAILABLE or not blocked_table:
        return False
    try:
        record = blocked_table.get_item(Key={'email_address': recipient}).get('Item', {})
        if not record:
            return False
        # Compare against the bare address, case-insensitively.
        sender_clean = parseaddr(sender)[1].lower()
        for pattern in record.get('blocked_patterns', []):
            if fnmatch.fnmatch(sender_clean, pattern.lower()):
                log(f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' for inbox {recipient}", 'WARNING', worker_name)
                return True
    except Exception as exc:
        log(f"⚠ Error checking block list for {recipient}: {exc}", 'ERROR', worker_name)
    return False
# ============================================
# BOUNCE HANDLING
# ============================================
def is_ses_bounce_notification(parsed_email) -> bool:
    """Return True when the message's From header identifies the SES mailer daemon."""
    sender = (parsed_email.get('From') or '').lower()
    if 'mailer-daemon@' not in sender:
        return False
    return 'amazonses.com' in sender
def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
    """Fetch stored bounce metadata for an outbound SES MessageId.

    Retries a few times (the bounce writer may lag) before giving up. Returns
    a normalized dict or None when unavailable/not found.
    """
    if not DYNAMODB_AVAILABLE or not messages_table:
        return None
    retries = config.bounce_lookup_retries
    delay = config.bounce_lookup_delay
    for attempt in range(retries):
        on_last_attempt = attempt >= retries - 1
        try:
            record = messages_table.get_item(Key={'MessageId': message_id}).get('Item')
            if record:
                return {
                    'original_source': record.get('original_source', ''),
                    'bounceType': record.get('bounceType', 'Unknown'),
                    'bounceSubType': record.get('bounceSubType', 'Unknown'),
                    'bouncedRecipients': record.get('bouncedRecipients', []),
                    'timestamp': record.get('timestamp', ''),
                }
            if not on_last_attempt:
                log(f" Bounce record not found yet, retrying in {delay}s (attempt {attempt + 1}/{retries})...", 'INFO', worker_name)
                time.sleep(delay)
        except Exception as exc:
            log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{retries}): {exc}", 'ERROR', worker_name)
            if not on_last_attempt:
                time.sleep(delay)
    return None
def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]:
    """If `parsed` is an SES MAILER-DAEMON bounce, rewrite it so it appears to
    come from the bounced recipient.

    Returns (message, modified). `modified` is True only when bounce metadata
    was found in DynamoDB and the From header was rewritten; the original SES
    sender is preserved in X-Original-SES-From and the classification in
    X-Bounce-Type.
    """
    if not is_ses_bounce_notification(parsed):
        return parsed, False
    log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name)
    # Local part of the bounce's Message-ID is used as the DynamoDB key
    # (assumes it equals the stored outbound MessageId — TODO confirm against
    # the component that writes the ses-outbound-messages table).
    message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0]
    if not message_id:
        return parsed, False
    bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name)
    if not bounce_info:
        return parsed, False
    original_source = bounce_info['original_source']
    bounced_recipients = bounce_info['bouncedRecipients']
    bounce_type = bounce_info['bounceType']
    bounce_subtype = bounce_info['bounceSubType']
    log(f"✓ Found bounce info:", 'INFO', worker_name)
    log(f" Original sender: {original_source}", 'INFO', worker_name)
    log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name)
    log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name)
    if bounced_recipients:
        # Rewrite From to the first bounced address so the notification shows
        # which mailbox failed; keep the SES daemon address for auditing.
        new_from = bounced_recipients[0]
        parsed['X-Original-SES-From'] = parsed.get('From', '')
        parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
        parsed.replace_header('From', new_from)
        if not parsed.get('Reply-To'):
            parsed['Reply-To'] = new_from
        # Replace generic SES subjects with one naming the failed address.
        if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower():
            parsed.replace_header('Subject', f"Delivery Status: {new_from}")
        log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name)
        return parsed, True
    return parsed, False
# ============================================
# EMAIL BODY EXTRACTION
# ============================================
def extract_body_parts(parsed) -> Tuple[str, Optional[str]]:
    """Extract (text_body, html_body) from a parsed email message.

    For multipart messages, concatenates all text/plain parts and keeps the
    last text/html part; otherwise uses the single payload. Returns
    '(No body content)' as text_body when nothing decodable is found.

    Fixes: bare ``except:`` clauses narrowed to ``except Exception``, and a
    ``None`` return from ``get_payload(decode=True)`` (a part with no
    decodable payload) is handled explicitly instead of via a swallowed
    AttributeError.
    """
    text_body = ''
    html_body = None
    if parsed.is_multipart():
        for part in parsed.walk():
            content_type = part.get_content_type()
            if content_type not in ('text/plain', 'text/html'):
                continue  # skip containers and attachments
            try:
                payload = part.get_payload(decode=True)
            except Exception:
                continue
            if payload is None:
                continue
            decoded = payload.decode('utf-8', errors='ignore')
            if content_type == 'text/plain':
                text_body += decoded
            else:
                html_body = decoded
    else:
        try:
            payload = parsed.get_payload(decode=True)
            if payload:
                decoded = payload.decode('utf-8', errors='ignore')
                if parsed.get_content_type() == 'text/html':
                    html_body = decoded
                else:
                    text_body = decoded
        except Exception:
            # Last resort: fall back to the undecoded payload representation.
            text_body = str(parsed.get_payload())
    return text_body.strip() if text_body else '(No body content)', html_body
# ============================================
# AUTO-REPLY & FORWARDING
# ============================================
def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'):
    """Build an out-of-office auto-reply quoting the original message.

    The reply carries Auto-Submitted: auto-replied (RFC 3834 marker) and the
    X-SES-Worker-Processed header so this worker never replies to its own
    replies.
    """
    quoted_text, quoted_html = extract_body_parts(original_parsed)
    orig_subject = original_parsed.get('Subject', '(no subject)')
    orig_sender = original_parsed.get('From', 'unknown')
    reply = MIMEMultipart('mixed')
    reply['From'] = recipient
    reply['To'] = orig_sender
    reply['Subject'] = f"Out of Office: {orig_subject}"
    reply['Date'] = formatdate(localtime=True)
    reply['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    reply['In-Reply-To'] = original_parsed.get('Message-ID', '')
    reply['References'] = original_parsed.get('Message-ID', '')
    reply['Auto-Submitted'] = 'auto-replied'
    reply['X-SES-Worker-Processed'] = 'ooo-reply'
    alternatives = MIMEMultipart('alternative')
    plain = f"{ooo_msg}\n\n--- Original Message ---\nFrom: {orig_sender}\nSubject: {orig_subject}\n\n{quoted_text}"
    alternatives.attach(MIMEText(plain, 'plain', 'utf-8'))
    if content_type == 'html' or quoted_html:
        html = f"<div>{ooo_msg}</div><br><hr><br><strong>Original Message</strong><br>From: {orig_sender}<br>Subject: {orig_subject}<br><br>"
        html += quoted_html if quoted_html else quoted_text.replace('\n', '<br>')
        alternatives.attach(MIMEText(html, 'html', 'utf-8'))
    reply.attach(alternatives)
    return reply
def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str):
    """Wrap the original message as a forward, carrying over non-text attachments.

    Reply-To points at the original sender so replies skip the forwarding
    mailbox; X-SES-Worker-Processed marks the message for loop prevention.
    """
    subj = original_parsed.get('Subject', '(no subject)')
    sent_date = original_parsed.get('Date', 'unknown')
    forwarded = MIMEMultipart('mixed')
    forwarded['From'] = recipient
    forwarded['To'] = forward_to
    forwarded['Subject'] = f"FWD: {subj}"
    forwarded['Date'] = formatdate(localtime=True)
    forwarded['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    forwarded['Reply-To'] = original_from
    forwarded['X-SES-Worker-Processed'] = 'forwarded'
    plain_text, html_text = extract_body_parts(original_parsed)
    alternatives = MIMEMultipart('alternative')
    fwd_text = f"---------- Forwarded message ---------\nFrom: {original_from}\nDate: {sent_date}\nSubject: {subj}\nTo: {recipient}\n\n{plain_text}"
    alternatives.attach(MIMEText(fwd_text, 'plain', 'utf-8'))
    if html_text:
        fwd_html = f"<div style='border-left:3px solid #ccc;padding-left:10px;'><strong>---------- Forwarded message ---------</strong><br>From: {original_from}<br>Subject: {subj}<br><br>{html_text}</div>"
        alternatives.attach(MIMEText(fwd_html, 'html', 'utf-8'))
    forwarded.attach(alternatives)
    if original_parsed.is_multipart():
        # Attachments: every leaf part that is not plain/html body text.
        for part in original_parsed.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_type() in ['text/plain', 'text/html']:
                continue
            forwarded.attach(part)
    return forwarded
def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None:
    """Apply per-mailbox DynamoDB rules: out-of-office auto-reply and forwarding.

    Best-effort by design: every failure is logged and swallowed so that rule
    processing can never block delivery of the actual message.
    """
    if not DYNAMODB_AVAILABLE or not rules_table:
        return
    try:
        response = rules_table.get_item(Key={'email_address': recipient})
        rule = response.get('Item', {})
        if not rule:
            return
        original_from = parsed.get('From', '')
        sender_name, sender_addr = parseaddr(original_from)
        if not sender_addr:
            # parseaddr could not extract an address; use the raw header value
            sender_addr = original_from
        # OOO
        if rule.get('ooo_active', False):
            auto_submitted = parsed.get('Auto-Submitted', '')
            precedence = (parsed.get('Precedence') or '').lower()
            # Loop/noise guards: never auto-reply to automated mail
            # (Auto-Submitted per RFC 3834), bulk/junk/list traffic, or
            # obvious no-reply / mailer-daemon senders.
            if auto_submitted and auto_submitted != 'no':
                pass
            elif precedence in ['bulk', 'junk', 'list']:
                pass
            elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon']):
                pass
            else:
                try:
                    ooo_msg = rule.get('ooo_message', 'I am out of office.')
                    content_type = rule.get('ooo_content_type', 'text')
                    ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type)
                    ooo_bytes = ooo_reply.as_bytes()
                    # Internal recipients get local delivery (port 2525);
                    # external ones go out via SES.
                    if is_internal_address(sender_addr):
                        success = send_internal_email(recipient, sender_addr, ooo_bytes, worker_name)
                        if success:
                            log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name)
                    else:
                        ses.send_raw_email(Source=recipient, Destinations=[sender_addr], RawMessage={'Data': ooo_bytes})
                        log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name)
                    if PROMETHEUS_ENABLED:
                        autoreplies_sent.labels(domain=domain).inc()
                except Exception as e:
                    log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name)
        # Forwarding: one forward per configured target; failures are isolated
        # so one bad target does not stop the rest.
        for forward_to in rule.get('forwards', []):
            try:
                fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from)
                fwd_bytes = fwd_msg.as_bytes()
                if is_internal_address(forward_to):
                    success = send_internal_email(recipient, forward_to, fwd_bytes, worker_name)
                    if success:
                        log(f"✓ Forwarded internally to {forward_to}", 'SUCCESS', worker_name)
                else:
                    ses.send_raw_email(Source=recipient, Destinations=[forward_to], RawMessage={'Data': fwd_bytes})
                    log(f"✓ Forwarded externally to {forward_to} via SES", 'SUCCESS', worker_name)
                if PROMETHEUS_ENABLED:
                    forwards_sent.labels(domain=domain).inc()
            except Exception as e:
                log(f"⚠ Forward failed to {forward_to}: {e}", 'ERROR', worker_name)
    except Exception as e:
        log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name)
# ============================================
# SMTP SENDING
# ============================================
def is_permanent_recipient_error(error_msg: str) -> bool:
    """Heuristic: does this SMTP error text indicate a permanent (hard) failure?

    Matches well-known 5xx codes and "no such mailbox" phrasings,
    case-insensitively.
    """
    lowered = error_msg.lower()
    indicators = (
        '550', '551', '553', 'mailbox not found', 'user unknown', 'no such user',
        'recipient rejected', 'does not exist', 'invalid recipient', 'unknown user',
    )
    for token in indicators:
        if token in lowered:
            return True
    return False
def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
    """Deliver one raw message to one recipient over LMTP or pooled SMTP.

    Returns (success, error_message, is_permanent). Connection-level failures
    are retried up to `max_retries` extra attempts with short back-off;
    refusals are classified via is_permanent_recipient_error() so the caller
    can decide between retry and give-up.
    """
    use_lmtp = config.lmtp_enabled
    last_error = None
    for attempt in range(max_retries + 1):
        conn = None
        try:
            if use_lmtp:
                # LMTP connections are opened per message; no pooling.
                conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30)
                conn.ehlo()
            else:
                conn = smtp_pool.get_connection()
                if not conn:
                    last_error = "No SMTP connection available"
                    time.sleep(0.5)
                    continue
            result = conn.sendmail(from_addr, [recipient], raw_message)
            if use_lmtp:
                conn.quit()
            else:
                smtp_pool.return_connection(conn)
            # sendmail returns a non-empty dict of refused recipients when the
            # server accepted the transaction but rejected this recipient.
            if isinstance(result, dict) and result:
                error = str(result.get(recipient, 'Unknown refusal'))
                is_perm = is_permanent_recipient_error(error)
                log(f"{recipient}: {error} ({'permanent' if is_perm else 'temporary'})", 'ERROR', worker_name)
                return False, error, is_perm
            else:
                delivery_method = "LMTP" if use_lmtp else "SMTP"
                log(f"{recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name)
                return True, None, False
        except smtplib.SMTPServerDisconnected as e:
            # Stale (likely pooled) connection; drop it and retry with a new one.
            last_error = str(e)
            log(f"{recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            if conn:
                try:
                    conn.quit()
                except:
                    pass
            time.sleep(0.3)
            continue
        except smtplib.SMTPRecipientsRefused as e:
            # The connection itself is healthy — return it to the pool.
            if conn and not use_lmtp:
                smtp_pool.return_connection(conn)
            elif conn:
                try:
                    conn.quit()
                except:
                    pass
            error_msg = str(e)
            is_perm = is_permanent_recipient_error(error_msg)
            log(f"{recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_perm
        except smtplib.SMTPException as e:
            error_msg = str(e)
            # Heuristic: connection-flavoured SMTP errors are treated as
            # transient and retried; everything else is returned to the caller.
            if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
                log(f"{recipient}: Connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
                last_error = error_msg
                if conn:
                    try:
                        conn.quit()
                    except:
                        pass
                time.sleep(0.3)
                continue
            if conn and not use_lmtp:
                smtp_pool.return_connection(conn)
            elif conn:
                try:
                    conn.quit()
                except:
                    pass
            is_perm = is_permanent_recipient_error(error_msg)
            log(f"{recipient}: Error - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_perm
        except Exception as e:
            # Unknown failure: close the connection and report as temporary.
            if conn:
                try:
                    conn.quit()
                except:
                    pass
            log(f"{recipient}: Unexpected error - {e}", 'ERROR', worker_name)
            return False, str(e), False
    log(f"{recipient}: All retries failed - {last_error}", 'ERROR', worker_name)
    return False, last_error or "Connection failed after retries", False
# ============================================
# S3 METADATA
# ============================================
def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: Optional[List[str]] = None):
    """Stamp the stored S3 email object as delivered via a metadata-only self-copy.

    Optionally records any permanently invalid inboxes. Errors are logged and
    ignored (metadata is informational only).
    """
    try:
        meta = s3.head_object(Bucket=bucket, Key=key).get('Metadata', {}) or {}
        meta.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': worker_name,
            'status': 'delivered',
        })
        # Drop transient bookkeeping keys
        meta.pop('processing_started', None)
        meta.pop('queued_at', None)
        if invalid_inboxes:
            meta['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)
        # Self-copy with REPLACE is the S3 idiom for rewriting object metadata.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=meta,
            MetadataDirective='REPLACE',
        )
    except Exception as exc:
        log(f"Failed to mark as processed: {exc}", 'WARNING', worker_name)
def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str):
    """Stamp the stored S3 email object as failed because every recipient was invalid."""
    try:
        meta = s3.head_object(Bucket=bucket, Key=key).get('Metadata', {}) or {}
        meta.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': worker_name,
            'status': 'failed',
            'error': 'All recipients are invalid (mailboxes do not exist)',
            'invalid_inboxes': ','.join(invalid_inboxes),
        })
        # Drop transient bookkeeping keys
        meta.pop('processing_started', None)
        meta.pop('queued_at', None)
        # Metadata-only rewrite via self-copy with REPLACE.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=meta,
            MetadataDirective='REPLACE',
        )
    except Exception as exc:
        log(f"Failed to mark as all invalid: {exc}", 'WARNING', worker_name)
# ============================================
# MAIN MESSAGE PROCESSING
# ============================================
def process_message(domain: str, message: dict, receive_count: int) -> bool:
    """Handle one SQS message end-to-end for `domain`.

    Returns True when the SQS message should be deleted (fully handled,
    intentionally dropped, or permanently unprocessable) and False when it
    should remain queued for redelivery/retry.
    """
    worker_name = f"worker-{domain}"
    try:
        # The SES event may arrive raw or wrapped in an SNS envelope whose
        # payload sits under 'Message' — unwrap accordingly.
        message_body = json.loads(message['Body'])
        ses_msg = json.loads(message_body['Message']) if 'Message' in message_body else message_body
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')
        if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
            log(" Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
            return True
        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
            return True
        if recipients:
            # Defensive cross-check: only process mail addressed to our own
            # domain (checked against the first recipient).
            first_recipient = recipients[0]
            recipient_domain = first_recipient.split('@')[1]
            if recipient_domain.lower() != domain.lower():
                log(f"⚠ Security: Ignored message for {recipient_domain} (I am worker for {domain})", 'WARNING', worker_name)
                return True
        else:
            log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
            return True
        bucket = domain_to_bucket_name(domain)
        key = message_id  # SES stores the raw message under its messageId
        recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
        log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)
        # Download S3 — the S3 write can lag the SQS notification, so a
        # missing object is retried (via SQS redelivery) up to 5 receives.
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            temp_parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            x_worker_processed = temp_parsed.get('X-SES-Worker-Processed', '')
            auto_submitted = temp_parsed.get('Auto-Submitted', '')
            is_processed_by_us = bool(x_worker_processed)
            # NOTE(review): is_our_auto_reply is computed but never used below.
            is_our_auto_reply = auto_submitted == 'auto-replied' and x_worker_processed
            if is_processed_by_us:
                # A message this worker generated (forward / auto-reply):
                # deliver it but never run rules again — prevents mail loops.
                log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)
                skip_rules = True
            else:
                skip_rules = False
        except s3.exceptions.NoSuchKey:
            if receive_count < 5:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                return False
            else:
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name)
                return True
        except ClientError as e:
            # NoSuchKey can also surface wrapped in a generic ClientError.
            if e.response['Error']['Code'] == 'NoSuchKey':
                if receive_count < 5:
                    log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                    return False
                else:
                    log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name)
                    return True
            log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name)
            return False
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name)
            return False
        # Parse & Bounce Logic
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')
            is_bounce = is_ses_bounce_notification(parsed)
            parsed, modified = apply_bounce_logic(parsed, subject, worker_name)
            if modified:
                # Re-serialize so the rewritten headers are what gets delivered.
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')
                if PROMETHEUS_ENABLED:
                    bounces_processed.labels(domain=domain, type='rewritten').inc()
            else:
                from_addr_final = from_addr
        except Exception as e:
            # NOTE(review): resetting skip_rules here discards the loop-
            # prevention decision above, and if parsing itself failed `parsed`
            # may be unbound when process_rules_for_recipient() runs below —
            # verify this path.
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
            from_addr_final = from_addr
            is_bounce = False
            skip_rules = False
        # Process recipients
        successful = []
        failed_permanent = []
        failed_temporary = []
        blocked_recipients = []
        for recipient in recipients:
            if is_sender_blocked(recipient, from_addr_final, worker_name):
                # Blocked mail is dropped silently — no bounce to the sender.
                log(f"🗑 Silently dropping message for {recipient} (Sender blocked)", 'INFO', worker_name)
                blocked_recipients.append(recipient)
                if PROMETHEUS_ENABLED:
                    blocked_senders.labels(domain=domain).inc()
                continue
            # Rules (OOO / forwarding) are skipped for bounces and for
            # messages this worker already generated.
            if not is_bounce and not skip_rules:
                process_rules_for_recipient(recipient, parsed, domain, worker_name)
            success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name)
            if success:
                successful.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='success').inc()
            elif is_perm:
                failed_permanent.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='permanent_failure').inc()
            else:
                failed_temporary.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='temporary_failure').inc()
        # Final result: the message counts as done only when no recipient
        # needs a temporary-failure retry.
        total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)
        if total_handled == len(recipients):
            if len(blocked_recipients) == len(recipients):
                # Nothing was delivered anywhere; remove the stored copy.
                log(f"⛔ All recipients blocked sender. Deleting S3 object {key}...", 'WARNING', worker_name)
                try:
                    s3.delete_object(Bucket=bucket, Key=key)
                    log(f"🗑 Deleted S3 object {key}", 'SUCCESS', worker_name)
                except Exception as e:
                    log(f"⚠ Failed to delete S3 object: {e}", 'ERROR', worker_name)
            elif len(successful) > 0:
                mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None)
            elif len(failed_permanent) > 0:
                mark_as_all_invalid(bucket, key, failed_permanent, worker_name)
            result_parts = []
            if successful:
                result_parts.append(f"{len(successful)} OK")
            if failed_permanent:
                result_parts.append(f"{len(failed_permanent)} invalid")
            if blocked_recipients:
                result_parts.append(f"{len(blocked_recipients)} blocked")
            log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
            return True
        else:
            log(f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name)
            return False
    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
        traceback.print_exc()
        return False
# ============================================
# DOMAIN POLLER
# ============================================
def poll_domain(domain: str, queue_url: str, stop_event: threading.Event, domain_stats: Dict[str, int], stats_lock: threading.Lock):
    """Long-poll one domain's SQS queue until stop_event is set.

    Successfully processed messages are deleted; failures are left queued so
    SQS redelivers them after the visibility timeout. Per-domain throughput is
    written into domain_stats under stats_lock.
    """
    worker_name = f"worker-{domain}"
    log(f"🚀 Starting poller for {domain}", 'INFO', worker_name)
    messages_processed = 0
    while not stop_event.is_set():
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=config.max_messages,
                WaitTimeSeconds=config.poll_interval,  # long polling
                VisibilityTimeout=config.visibility_timeout,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp']
            )
            messages = response.get('Messages', [])
            if PROMETHEUS_ENABLED:
                # Best-effort gauge refresh; never let metrics break polling.
                try:
                    attrs = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages'])
                    queue_size.labels(domain=domain).set(int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0)))
                except:
                    pass
            if not messages:
                continue
            log(f"✉ Received {len(messages)} message(s)", 'INFO', worker_name)
            for message in messages:
                if stop_event.is_set():
                    break
                receipt_handle = message['ReceiptHandle']
                receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))
                if PROMETHEUS_ENABLED:
                    emails_in_flight.inc()
                start_time = time.time()
                try:
                    success = process_message(domain, message, receive_count)
                    if success:
                        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                        messages_processed += 1
                        with stats_lock:
                            domain_stats[domain] = messages_processed
                    else:
                        # Not deleted: SQS redelivers once visibility expires.
                        log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name)
                except json.JSONDecodeError as e:
                    # A malformed payload can never succeed — drop it for good.
                    log(f"✗ Invalid message format: {e}", 'ERROR', worker_name)
                    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR', worker_name)
                    traceback.print_exc()
                finally:
                    if PROMETHEUS_ENABLED:
                        emails_in_flight.dec()
                        processing_time.labels(domain=domain).observe(time.time() - start_time)
        except Exception as e:
            log(f"✗ Error polling: {e}", 'ERROR', worker_name)
            time.sleep(5)  # back off before re-polling after an SQS error
    log(f"👋 Stopped (processed: {messages_processed})", 'INFO', worker_name)
# ============================================
# UNIFIED WORKER
# ============================================
class UnifiedWorker:
    """Owns one poller thread per managed domain and coordinates graceful shutdown."""

    def __init__(self):
        self.stop_event = threading.Event()  # signals all pollers to exit
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}  # domain -> resolved SQS queue URL
        self.poller_threads: List[threading.Thread] = []
        self.domain_stats: Dict[str, int] = {}  # domain -> processed count (guarded by stats_lock)
        self.stats_lock = threading.Lock()

    def setup(self):
        """Load domains, resolve queue URLs, warm the SMTP pool; exit(1) on fatal misconfig."""
        self.domains = load_domains()
        if not self.domains:
            log("❌ No domains configured!", 'ERROR')
            sys.exit(1)
        for domain in self.domains:
            url = get_queue_url(domain)
            if url:
                self.queue_urls[domain] = url
                log(f"{domain} -> {domain_to_queue_name(domain)}")
            else:
                log(f"{domain} -> Queue not found!", 'WARNING')
        if not self.queue_urls:
            log("❌ No valid queues found!", 'ERROR')
            sys.exit(1)
        smtp_pool.initialize()
        log(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Spawn one daemon poller thread per queue, then idle in the main
        thread, emitting a status summary every 5 minutes until stopped."""
        for domain in self.queue_urls.keys():
            self.domain_stats[domain] = 0
        for domain, queue_url in self.queue_urls.items():
            thread = threading.Thread(
                target=poll_domain,
                args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock),
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.poller_threads.append(thread)
        log(f"Started {len(self.poller_threads)} domain pollers")
        last_status_log = time.time()
        status_interval = 300  # 5 minutes
        try:
            while not self.stop_event.is_set():
                # Wait with a timeout so stop() wakes us promptly.
                self.stop_event.wait(timeout=10)
                if time.time() - last_status_log > status_interval:
                    self._log_status_table()
                    last_status_log = time.time()
        except KeyboardInterrupt:
            pass

    def _log_status_table(self):
        """Emit a one-line summary of poller liveness and per-domain throughput."""
        active_threads = sum(1 for t in self.poller_threads if t.is_alive())
        with self.stats_lock:
            total_processed = sum(self.domain_stats.values())
            summary = " | ".join([f"{d.split('.')[0]}:{c}" for d, c in self.domain_stats.items() if c > 0])
        log(f"📊 Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {summary}")

    def stop(self):
        """Signal all pollers to stop, wait up to 10s each, then close SMTP connections."""
        log("⚠ Stopping worker...")
        self.stop_event.set()
        for thread in self.poller_threads:
            thread.join(timeout=10)
            if thread.is_alive():
                log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')
        smtp_pool.close_all()
        log("👋 Worker stopped")
# ============================================
# HEALTH SERVER
# ============================================
def start_health_server(worker: UnifiedWorker):
    """Run a minimal HTTP server on a daemon thread exposing /health and /domains."""
    from http.server import HTTPServer, BaseHTTPRequestHandler

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == '/health' or self.path == '/':
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                # Feature flags mirror what is actually enabled at runtime.
                status = {
                    'status': 'healthy',
                    'domains': len(worker.queue_urls),
                    'domain_list': list(worker.queue_urls.keys()),
                    'dynamodb': DYNAMODB_AVAILABLE,
                    'features': {
                        'bounce_rewriting': True,
                        'auto_reply': DYNAMODB_AVAILABLE,
                        'forwarding': DYNAMODB_AVAILABLE,
                        'blocklist': DYNAMODB_AVAILABLE,
                        'lmtp': config.lmtp_enabled
                    },
                    'timestamp': datetime.utcnow().isoformat()
                }
                self.wfile.write(json.dumps(status, indent=2).encode())
            elif self.path == '/domains':
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(list(worker.queue_urls.keys())).encode())
            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            pass  # Suppress HTTP logs

    server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server')
    thread.start()
    log(f"Health server on port {config.health_port}")
# ============================================
# ENTRY POINT
# ============================================
def main():
    """Entry point: install signal handlers, start metrics/health servers, run pollers.

    Fixes:
    - ``signal.signal(signalIGINT, ...)`` referenced an undefined name and
      crashed startup with NameError; corrected to ``signal.SIGINT``.
    - The feature-checklist conditionals below had identical empty branches
      (glyphs lost to character mangling); restored visible ✓/✗ markers.
    """
    worker = UnifiedWorker()

    def signal_handler(signum, frame):
        # Graceful shutdown on SIGTERM/SIGINT: stop pollers, then exit.
        log(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    worker.setup()
    if PROMETHEUS_ENABLED:
        start_http_server(config.metrics_port)
        log(f"Prometheus metrics on port {config.metrics_port}")
    start_health_server(worker)
    log(f"\n{'='*70}")
    log(f"🚀 UNIFIED EMAIL WORKER (CLEANED)")
    log(f"{'='*70}")
    log(f" Domains: {len(worker.queue_urls)}")
    log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
    if config.lmtp_enabled:
        log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
    else:
        log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
    log(f" Poll Interval: {config.poll_interval}s")
    log(f" Visibility: {config.visibility_timeout}s")
    log(f"")
    log(f" Features:")
    log(f" ✓ Bounce Detection & Header Rewriting")
    log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office")
    log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding")
    log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Blocked Senders (Wildcard)")
    log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics")
    log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery")
    log(f"")
    log(f" Active Domains:")
    for domain in sorted(worker.queue_urls.keys()):
        log(f"{domain}")
    log(f"{'='*70}\n")
    worker.start()


if __name__ == '__main__':
    main()