#!/usr/bin/env python3 """ unified_worker.py - Multi-Domain Email Worker (Full Featured + Blocklist + Fixes) Features: - Multi-Domain parallel processing - Bounce Detection & Rewriting (SES MAILER-DAEMON) - Auto-Reply / Out-of-Office - Email Forwarding - Blocked Sender Logic (Wildcard Support) - SMTP/LMTP Delivery - Prometheus Metrics - Graceful Shutdown - Full Logging & Metadata """ import os, sys, json, time, signal, threading, smtplib, traceback, fnmatch from queue import Queue, Empty from dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Any from datetime import datetime from email.parser import BytesParser from email.policy import SMTP as SMTPPolicy from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import parseaddr, formatdate, make_msgid import boto3 from botocore.exceptions import ClientError try: from prometheus_client import start_http_server, Counter, Gauge, Histogram PROMETHEUS_ENABLED = True except ImportError: PROMETHEUS_ENABLED = False # ============================================ # CONFIGURATION # ============================================ @dataclass class Config: aws_region: str = os.environ.get('AWS_REGION', 'us-east-2') domains_list: str = os.environ.get('DOMAINS', '') domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt') worker_threads: int = int(os.environ.get('WORKER_THREADS', '10')) poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20')) max_messages: int = int(os.environ.get('MAX_MESSAGES', '10')) visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300')) smtp_host: str = os.environ.get('SMTP_HOST', 'localhost') smtp_port: int = int(os.environ.get('SMTP_PORT', '25')) smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true' smtp_user: str = os.environ.get('SMTP_USER', '') smtp_pass: str = os.environ.get('SMTP_PASS', '') smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5')) lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true' lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost') lmtp_port: int = int(os.environ.get('LMTP_PORT', '24')) rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules') messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages') blocked_table: str = os.environ.get('DYNAMODB_BLOCKED_TABLE', 'email-blocked-senders') bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3')) bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0')) metrics_port: int = int(os.environ.get('METRICS_PORT', '8000')) health_port: int = int(os.environ.get('HEALTH_PORT', '8080')) config = Config() # ============================================ # LOGGING # ============================================ def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'): timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') thread_name = threading.current_thread().name print(f"[{timestamp}] [{level}] [{worker_name}] [{thread_name}] {message}", flush=True) # ============================================ # METRICS # ============================================ if PROMETHEUS_ENABLED: emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status']) emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed') processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain']) queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain']) bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type']) autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain']) forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain']) blocked_senders = Counter('blocked_senders_total', 'Emails blocked by blacklist', ['domain']) # ============================================ # AWS CLIENTS # ============================================ sqs = boto3.client('sqs', region_name=config.aws_region) s3 = boto3.client('s3', region_name=config.aws_region) ses = boto3.client('ses', region_name=config.aws_region) dynamodb = boto3.resource('dynamodb', region_name=config.aws_region) DYNAMODB_AVAILABLE = False rules_table = None messages_table = None blocked_table = None try: rules_table = dynamodb.Table(config.rules_table) messages_table = dynamodb.Table(config.messages_table) blocked_table = dynamodb.Table(config.blocked_table) rules_table.table_status messages_table.table_status blocked_table.table_status DYNAMODB_AVAILABLE = True except Exception as e: log(f"⚠ DynamoDB not fully available: {e}", 'WARNING') # ============================================ # SMTP POOL # ============================================ class SMTPPool: def __init__(self, host: str, port: int, pool_size: int = 5): self.host = host self.port = port self.pool_size = pool_size self._pool: Queue = Queue(maxsize=pool_size) self._initialized = False def _create_connection(self) -> Optional[smtplib.SMTP]: try: conn = smtplib.SMTP(self.host, self.port, timeout=30) conn.ehlo() if config.smtp_use_tls: conn.starttls() conn.ehlo() if config.smtp_user and config.smtp_pass: conn.login(config.smtp_user, config.smtp_pass) return conn except Exception as e: log(f"Failed to create SMTP connection: {e}", 'ERROR') return None def _test_connection(self, conn: smtplib.SMTP) -> bool: try: return conn.noop()[0] == 250 except: return False def initialize(self): if self._initialized: return for _ in range(min(2, self.pool_size)): conn = self._create_connection() if conn: self._pool.put(conn) self._initialized = True def get_connection(self) -> Optional[smtplib.SMTP]: try: conn = self._pool.get(block=False) if self._test_connection(conn): return conn else: try: conn.quit() except: pass return self._create_connection() except Empty: return self._create_connection() def return_connection(self, conn: smtplib.SMTP): if conn and self._test_connection(conn): try: self._pool.put_nowait(conn) except: try: conn.quit() except: pass else: try: conn.quit() except: pass def close_all(self): while not self._pool.empty(): try: self._pool.get_nowait().quit() except: pass smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size) # ============================================ # HELPER # ============================================ def domain_to_queue_name(domain: str) -> str: return domain.replace('.', '-') + '-queue' def domain_to_bucket_name(domain: str) -> str: return domain.replace('.', '-') + '-emails' def is_internal_address(email_address: str) -> bool: if '@' not in email_address: return False domain = email_address.split('@')[1].lower() return domain in MANAGED_DOMAINS def send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool: try: with smtplib.SMTP(config.smtp_host, 2525, timeout=30) as conn: conn.ehlo() conn.sendmail(from_addr, [to_addr], raw_message) log(f" ✓ Internal delivery to {to_addr} (Port 2525)", 'SUCCESS', worker_name) return True except Exception as e: log(f" ✗ Internal delivery failed to {to_addr}: {e}", 'ERROR', worker_name) return False def get_queue_url(domain: str) -> Optional[str]: queue_name = domain_to_queue_name(domain) try: return sqs.get_queue_url(QueueName=queue_name)['QueueUrl'] except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': log(f"Queue not found for domain: {domain}", 'WARNING') else: log(f"Error getting queue URL for {domain}: {e}", 'ERROR') return None def load_domains() -> List[str]: global MANAGED_DOMAINS domains = [] if config.domains_list: domains.extend([d.strip() for d in config.domains_list.split(',') if d.strip()]) if os.path.exists(config.domains_file): with open(config.domains_file, 'r') as f: for line in f: domain = line.strip() if domain and not domain.startswith('#'): domains.append(domain) domains = list(set(domains)) MANAGED_DOMAINS = set(d.lower() for d in domains) log(f"Loaded {len(domains)} domains: {', '.join(domains)}") return domains MANAGED_DOMAINS: set = set() # ============================================ # BLOCKLIST # ============================================ def is_sender_blocked(recipient: str, sender: str, worker_name: str) -> bool: if not DYNAMODB_AVAILABLE or not blocked_table: return False try: response = blocked_table.get_item(Key={'email_address': recipient}) item = response.get('Item', {}) if not item: return False patterns = item.get('blocked_patterns', []) sender_clean = parseaddr(sender)[1].lower() for pattern in patterns: if fnmatch.fnmatch(sender_clean, pattern.lower()): log(f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' for inbox {recipient}", 'WARNING', worker_name) return True return False except Exception as e: log(f"⚠ Error checking block list for {recipient}: {e}", 'ERROR', worker_name) return False # ============================================ # BOUNCE HANDLING # ============================================ def is_ses_bounce_notification(parsed_email) -> bool: from_header = (parsed_email.get('From') or '').lower() return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]: if not DYNAMODB_AVAILABLE or not messages_table: return None for attempt in range(config.bounce_lookup_retries): try: response = messages_table.get_item(Key={'MessageId': message_id}) item = response.get('Item') if item: return { 'original_source': item.get('original_source', ''), 'bounceType': item.get('bounceType', 'Unknown'), 'bounceSubType': item.get('bounceSubType', 'Unknown'), 'bouncedRecipients': item.get('bouncedRecipients', []), 'timestamp': item.get('timestamp', '') } if attempt < config.bounce_lookup_retries - 1: log(f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s (attempt {attempt + 1}/{config.bounce_lookup_retries})...", 'INFO', worker_name) time.sleep(config.bounce_lookup_delay) except Exception as e: log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", 'ERROR', worker_name) if attempt < config.bounce_lookup_retries - 1: time.sleep(config.bounce_lookup_delay) return None def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]: if not is_ses_bounce_notification(parsed): return parsed, False log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name) message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0] if not message_id: return parsed, False bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name) if not bounce_info: return parsed, False original_source = bounce_info['original_source'] bounced_recipients = bounce_info['bouncedRecipients'] bounce_type = bounce_info['bounceType'] bounce_subtype = bounce_info['bounceSubType'] log(f"✓ Found bounce info:", 'INFO', worker_name) log(f" Original sender: {original_source}", 'INFO', worker_name) log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name) log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name) if bounced_recipients: new_from = bounced_recipients[0] parsed['X-Original-SES-From'] = parsed.get('From', '') parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}" parsed.replace_header('From', new_from) if not parsed.get('Reply-To'): parsed['Reply-To'] = new_from if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower(): parsed.replace_header('Subject', f"Delivery Status: {new_from}") log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name) return parsed, True return parsed, False # ============================================ # EMAIL BODY EXTRACTION # ============================================ def extract_body_parts(parsed) -> Tuple[str, Optional[str]]: text_body = '' html_body = None if parsed.is_multipart(): for part in parsed.walk(): content_type = part.get_content_type() if content_type == 'text/plain': try: text_body += part.get_payload(decode=True).decode('utf-8', errors='ignore') except: pass elif content_type == 'text/html': try: html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore') except: pass else: try: payload = parsed.get_payload(decode=True) if payload: decoded = payload.decode('utf-8', errors='ignore') if parsed.get_content_type() == 'text/html': html_body = decoded else: text_body = decoded except: text_body = str(parsed.get_payload()) return text_body.strip() if text_body else '(No body content)', html_body # ============================================ # AUTO-REPLY & FORWARDING # ============================================ def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'): text_body, html_body = extract_body_parts(original_parsed) original_subject = original_parsed.get('Subject', '(no subject)') original_from = original_parsed.get('From', 'unknown') msg = MIMEMultipart('mixed') msg['From'] = recipient msg['To'] = original_from msg['Subject'] = f"Out of Office: {original_subject}" msg['Date'] = formatdate(localtime=True) msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['In-Reply-To'] = original_parsed.get('Message-ID', '') msg['References'] = original_parsed.get('Message-ID', '') msg['Auto-Submitted'] = 'auto-replied' msg['X-SES-Worker-Processed'] = 'ooo-reply' body_part = MIMEMultipart('alternative') text_content = f"{ooo_msg}\n\n--- Original Message ---\nFrom: {original_from}\nSubject: {original_subject}\n\n{text_body}" body_part.attach(MIMEText(text_content, 'plain', 'utf-8')) if content_type == 'html' or html_body: html_content = f"
{ooo_msg}



Original Message
From: {original_from}
Subject: {original_subject}

" html_content += (html_body if html_body else text_body.replace('\n', '
')) body_part.attach(MIMEText(html_content, 'html', 'utf-8')) msg.attach(body_part) return msg def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str): original_subject = original_parsed.get('Subject', '(no subject)') original_date = original_parsed.get('Date', 'unknown') msg = MIMEMultipart('mixed') msg['From'] = recipient msg['To'] = forward_to msg['Subject'] = f"FWD: {original_subject}" msg['Date'] = formatdate(localtime=True) msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['Reply-To'] = original_from msg['X-SES-Worker-Processed'] = 'forwarded' text_body, html_body = extract_body_parts(original_parsed) body_part = MIMEMultipart('alternative') fwd_text = f"---------- Forwarded message ---------\nFrom: {original_from}\nDate: {original_date}\nSubject: {original_subject}\nTo: {recipient}\n\n{text_body}" body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8')) if html_body: fwd_html = f"
---------- Forwarded message ---------
From: {original_from}
Subject: {original_subject}

{html_body}
" body_part.attach(MIMEText(fwd_html, 'html', 'utf-8')) msg.attach(body_part) if original_parsed.is_multipart(): for part in original_parsed.walk(): if part.get_content_maintype() == 'multipart': continue if part.get_content_type() in ['text/plain', 'text/html']: continue msg.attach(part) return msg def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None: if not DYNAMODB_AVAILABLE or not rules_table: return try: response = rules_table.get_item(Key={'email_address': recipient}) rule = response.get('Item', {}) if not rule: return original_from = parsed.get('From', '') sender_name, sender_addr = parseaddr(original_from) if not sender_addr: sender_addr = original_from # OOO if rule.get('ooo_active', False): auto_submitted = parsed.get('Auto-Submitted', '') precedence = (parsed.get('Precedence') or '').lower() if auto_submitted and auto_submitted != 'no': pass elif precedence in ['bulk', 'junk', 'list']: pass elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon']): pass else: try: ooo_msg = rule.get('ooo_message', 'I am out of office.') content_type = rule.get('ooo_content_type', 'text') ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type) ooo_bytes = ooo_reply.as_bytes() if is_internal_address(sender_addr): success = send_internal_email(recipient, sender_addr, ooo_bytes, worker_name) if success: log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name) else: ses.send_raw_email(Source=recipient, Destinations=[sender_addr], RawMessage={'Data': ooo_bytes}) log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name) if PROMETHEUS_ENABLED: autoreplies_sent.labels(domain=domain).inc() except Exception as e: log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name) # Forwarding for forward_to in rule.get('forwards', []): try: fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from) fwd_bytes = fwd_msg.as_bytes() if is_internal_address(forward_to): success = send_internal_email(recipient, forward_to, fwd_bytes, worker_name) if success: log(f"✓ Forwarded internally to {forward_to}", 'SUCCESS', worker_name) else: ses.send_raw_email(Source=recipient, Destinations=[forward_to], RawMessage={'Data': fwd_bytes}) log(f"✓ Forwarded externally to {forward_to} via SES", 'SUCCESS', worker_name) if PROMETHEUS_ENABLED: forwards_sent.labels(domain=domain).inc() except Exception as e: log(f"⚠ Forward failed to {forward_to}: {e}", 'ERROR', worker_name) except Exception as e: log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name) # ============================================ # SMTP SENDING # ============================================ def is_permanent_recipient_error(error_msg: str) -> bool: permanent_indicators = [ '550', '551', '553', 'mailbox not found', 'user unknown', 'no such user', 'recipient rejected', 'does not exist', 'invalid recipient', 'unknown user' ] return any(indicator in error_msg.lower() for indicator in permanent_indicators) def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]: use_lmtp = config.lmtp_enabled last_error = None for attempt in range(max_retries + 1): conn = None try: if use_lmtp: conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30) conn.ehlo() else: conn = smtp_pool.get_connection() if not conn: last_error = "No SMTP connection available" time.sleep(0.5) continue result = conn.sendmail(from_addr, [recipient], raw_message) if use_lmtp: conn.quit() else: smtp_pool.return_connection(conn) if isinstance(result, dict) and result: error = str(result.get(recipient, 'Unknown refusal')) is_perm = is_permanent_recipient_error(error) log(f" ✗ {recipient}: {error} ({'permanent' if is_perm else 'temporary'})", 'ERROR', worker_name) return False, error, is_perm else: delivery_method = "LMTP" if use_lmtp else "SMTP" log(f" ✓ {recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name) return True, None, False except smtplib.SMTPServerDisconnected as e: last_error = str(e) log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name) if conn: try: conn.quit() except: pass time.sleep(0.3) continue except smtplib.SMTPRecipientsRefused as e: if conn and not use_lmtp: smtp_pool.return_connection(conn) elif conn: try: conn.quit() except: pass error_msg = str(e) is_perm = is_permanent_recipient_error(error_msg) log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name) return False, error_msg, is_perm except smtplib.SMTPException as e: error_msg = str(e) if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower(): log(f" ⚠ {recipient}: Connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name) last_error = error_msg if conn: try: conn.quit() except: pass time.sleep(0.3) continue if conn and not use_lmtp: smtp_pool.return_connection(conn) elif conn: try: conn.quit() except: pass is_perm = is_permanent_recipient_error(error_msg) log(f" ✗ {recipient}: Error - {error_msg}", 'ERROR', worker_name) return False, error_msg, is_perm except Exception as e: if conn: try: conn.quit() except: pass log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name) return False, str(e), False log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name) return False, last_error or "Connection failed after retries", False # ============================================ # S3 METADATA # ============================================ def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: List[str] = None): try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} metadata['processed'] = 'true' metadata['processed_at'] = str(int(time.time())) metadata['processed_by'] = worker_name metadata['status'] = 'delivered' metadata.pop('processing_started', None) metadata.pop('queued_at', None) if invalid_inboxes: metadata['invalid_inboxes'] = ','.join(invalid_inboxes) log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=metadata, MetadataDirective='REPLACE' ) except Exception as e: log(f"Failed to mark as processed: {e}", 'WARNING', worker_name) def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str): try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} metadata['processed'] = 'true' metadata['processed_at'] = str(int(time.time())) metadata['processed_by'] = worker_name metadata['status'] = 'failed' metadata['error'] = 'All recipients are invalid (mailboxes do not exist)' metadata['invalid_inboxes'] = ','.join(invalid_inboxes) metadata.pop('processing_started', None) metadata.pop('queued_at', None) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=metadata, MetadataDirective='REPLACE' ) except Exception as e: log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name) # ============================================ # MAIN MESSAGE PROCESSING # ============================================ def process_message(domain: str, message: dict, receive_count: int) -> bool: worker_name = f"worker-{domain}" try: message_body = json.loads(message['Body']) ses_msg = json.loads(message_body['Message']) if 'Message' in message_body else message_body mail = ses_msg.get('mail', {}) receipt = ses_msg.get('receipt', {}) message_id = mail.get('messageId') if message_id == "AMAZON_SES_SETUP_NOTIFICATION": log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name) return True from_addr = mail.get('source') recipients = receipt.get('recipients', []) if not message_id: log("❌ Error: No messageId in event payload", 'ERROR', worker_name) return True if recipients: first_recipient = recipients[0] recipient_domain = first_recipient.split('@')[1] if recipient_domain.lower() != domain.lower(): log(f"⚠ Security: Ignored message for {recipient_domain} (I am worker for {domain})", 'WARNING', worker_name) return True else: log("⚠ Warning: No recipients in event", 'WARNING', worker_name) return True bucket = domain_to_bucket_name(domain) key = message_id recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients" log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name) # Download S3 try: response = s3.get_object(Bucket=bucket, Key=key) raw_bytes = response['Body'].read() temp_parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) x_worker_processed = temp_parsed.get('X-SES-Worker-Processed', '') auto_submitted = temp_parsed.get('Auto-Submitted', '') is_processed_by_us = bool(x_worker_processed) is_our_auto_reply = auto_submitted == 'auto-replied' and x_worker_processed if is_processed_by_us: log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name) skip_rules = True else: skip_rules = False except s3.exceptions.NoSuchKey: if receive_count < 5: log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) return False else: log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) return True except ClientError as e: if e.response['Error']['Code'] == 'NoSuchKey': if receive_count < 5: log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) return False else: log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name) return True log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) return False except Exception as e: log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name) return False # Parse & Bounce Logic try: parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) subject = parsed.get('Subject', '(no subject)') is_bounce = is_ses_bounce_notification(parsed) parsed, modified = apply_bounce_logic(parsed, subject, worker_name) if modified: raw_bytes = parsed.as_bytes() from_addr_final = parsed.get('From') if PROMETHEUS_ENABLED: bounces_processed.labels(domain=domain, type='rewritten').inc() else: from_addr_final = from_addr except Exception as e: log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name) from_addr_final = from_addr is_bounce = False skip_rules = False # Process recipients successful = [] failed_permanent = [] failed_temporary = [] blocked_recipients = [] for recipient in recipients: if is_sender_blocked(recipient, from_addr_final, worker_name): log(f"🗑 Silently dropping message for {recipient} (Sender blocked)", 'INFO', worker_name) blocked_recipients.append(recipient) if PROMETHEUS_ENABLED: blocked_senders.labels(domain=domain).inc() continue if not is_bounce and not skip_rules: process_rules_for_recipient(recipient, parsed, domain, worker_name) success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name) if success: successful.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='success').inc() elif is_perm: failed_permanent.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='permanent_failure').inc() else: failed_temporary.append(recipient) if PROMETHEUS_ENABLED: emails_processed.labels(domain=domain, status='temporary_failure').inc() # Final result total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients) if total_handled == len(recipients): if len(blocked_recipients) == len(recipients): log(f"⛔ All recipients blocked sender. Deleting S3 object {key}...", 'WARNING', worker_name) try: s3.delete_object(Bucket=bucket, Key=key) log(f"🗑 Deleted S3 object {key}", 'SUCCESS', worker_name) except Exception as e: log(f"⚠ Failed to delete S3 object: {e}", 'ERROR', worker_name) elif len(successful) > 0: mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None) elif len(failed_permanent) > 0: mark_as_all_invalid(bucket, key, failed_permanent, worker_name) result_parts = [] if successful: result_parts.append(f"{len(successful)} OK") if failed_permanent: result_parts.append(f"{len(failed_permanent)} invalid") if blocked_recipients: result_parts.append(f"{len(blocked_recipients)} blocked") log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name) return True else: log(f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name) return False except Exception as e: log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name) traceback.print_exc() return False # ============================================ # DOMAIN POLLER # ============================================ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event, domain_stats: Dict[str, int], stats_lock: threading.Lock): worker_name = f"worker-{domain}" log(f"🚀 Starting poller for {domain}", 'INFO', worker_name) messages_processed = 0 while not stop_event.is_set(): try: response = sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=config.max_messages, WaitTimeSeconds=config.poll_interval, VisibilityTimeout=config.visibility_timeout, AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'] ) messages = response.get('Messages', []) if PROMETHEUS_ENABLED: try: attrs = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages']) queue_size.labels(domain=domain).set(int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))) except: pass if not messages: continue log(f"✉ Received {len(messages)} message(s)", 'INFO', worker_name) for message in messages: if stop_event.is_set(): break receipt_handle = message['ReceiptHandle'] receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1)) if PROMETHEUS_ENABLED: emails_in_flight.inc() start_time = time.time() try: success = process_message(domain, message, receive_count) if success: sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) messages_processed += 1 with stats_lock: domain_stats[domain] = messages_processed else: log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name) except json.JSONDecodeError as e: log(f"✗ Invalid message format: {e}", 'ERROR', worker_name) sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) except Exception as e: log(f"✗ Error processing message: {e}", 'ERROR', worker_name) traceback.print_exc() finally: if PROMETHEUS_ENABLED: emails_in_flight.dec() processing_time.labels(domain=domain).observe(time.time() - start_time) except Exception as e: log(f"✗ Error polling: {e}", 'ERROR', worker_name) time.sleep(5) log(f"👋 Stopped (processed: {messages_processed})", 'INFO', worker_name) # ============================================ # UNIFIED WORKER # ============================================ class UnifiedWorker: def __init__(self): self.stop_event = threading.Event() self.domains: List[str] = [] self.queue_urls: Dict[str, str] = {} self.poller_threads: List[threading.Thread] = [] self.domain_stats: Dict[str, int] = {} self.stats_lock = threading.Lock() def setup(self): self.domains = load_domains() if not self.domains: log("❌ No domains configured!", 'ERROR') sys.exit(1) for domain in self.domains: url = get_queue_url(domain) if url: self.queue_urls[domain] = url log(f" ✓ {domain} -> {domain_to_queue_name(domain)}") else: log(f" ✗ {domain} -> Queue not found!", 'WARNING') if not self.queue_urls: log("❌ No valid queues found!", 'ERROR') sys.exit(1) smtp_pool.initialize() log(f"Initialized with {len(self.queue_urls)} domains") def start(self): for domain in self.queue_urls.keys(): self.domain_stats[domain] = 0 for domain, queue_url in self.queue_urls.items(): thread = threading.Thread( target=poll_domain, args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock), name=f"poller-{domain}", daemon=True ) thread.start() self.poller_threads.append(thread) log(f"Started {len(self.poller_threads)} domain pollers") last_status_log = time.time() status_interval = 300 # 5 minutes try: while not self.stop_event.is_set(): self.stop_event.wait(timeout=10) if time.time() - last_status_log > status_interval: self._log_status_table() last_status_log = time.time() except KeyboardInterrupt: pass def _log_status_table(self): active_threads = sum(1 for t in self.poller_threads if t.is_alive()) with self.stats_lock: total_processed = sum(self.domain_stats.values()) summary = " | ".join([f"{d.split('.')[0]}:{c}" for d, c in self.domain_stats.items() if c > 0]) log(f"📊 Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {summary}") def stop(self): log("⚠ Stopping worker...") self.stop_event.set() for thread in self.poller_threads: thread.join(timeout=10) if thread.is_alive(): log(f"Warning: {thread.name} did not stop gracefully", 'WARNING') smtp_pool.close_all() log("👋 Worker stopped") # ============================================ # HEALTH SERVER # ============================================ def start_health_server(worker: UnifiedWorker): from http.server import HTTPServer, BaseHTTPRequestHandler class HealthHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/health' or self.path == '/': self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() status = { 'status': 'healthy', 'domains': len(worker.queue_urls), 'domain_list': list(worker.queue_urls.keys()), 'dynamodb': DYNAMODB_AVAILABLE, 'features': { 'bounce_rewriting': True, 'auto_reply': DYNAMODB_AVAILABLE, 'forwarding': DYNAMODB_AVAILABLE, 'blocklist': DYNAMODB_AVAILABLE, 'lmtp': config.lmtp_enabled }, 'timestamp': datetime.utcnow().isoformat() } self.wfile.write(json.dumps(status, indent=2).encode()) elif self.path == '/domains': self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(json.dumps(list(worker.queue_urls.keys())).encode()) else: self.send_response(404) self.end_headers() def log_message(self, format, *args): pass # Suppress HTTP logs server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler) thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') thread.start() log(f"Health server on port {config.health_port}") # ============================================ # ENTRY POINT # ============================================ def main(): worker = UnifiedWorker() def signal_handler(signum, frame): log(f"Received signal {signum}") worker.stop() sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signalIGINT, signal_handler) worker.setup() if PROMETHEUS_ENABLED: start_http_server(config.metrics_port) log(f"Prometheus metrics on port {config.metrics_port}") start_health_server(worker) log(f"\n{'='*70}") log(f"🚀 UNIFIED EMAIL WORKER (CLEANED)") log(f"{'='*70}") log(f" Domains: {len(worker.queue_urls)}") log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}") if config.lmtp_enabled: log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)") else: log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}") log(f" Poll Interval: {config.poll_interval}s") log(f" Visibility: {config.visibility_timeout}s") log(f"") log(f" Features:") log(f" ✓ Bounce Detection & Header Rewriting") log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office") log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding") log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Blocked Senders (Wildcard)") log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics") log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery") log(f"") log(f" Active Domains:") for domain in sorted(worker.queue_urls.keys()): log(f" • {domain}") log(f"{'='*70}\n") worker.start() if __name__ == '__main__': main()