From 990218ba95d1d79c4e17fd12ae00db88358b68b7 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Sat, 10 Jan 2026 16:48:05 -0600 Subject: [PATCH] cleanup + unified worker --- Deployment Commands | 20 - Lambda IAM Policy | 35 -- basic_setup/setup_email_domain.sh | 38 -- unified-worker/Dockerfile | 33 + unified-worker/docker-compose.yml | 68 ++ unified-worker/domains.txt | 14 + unified-worker/requirements.txt | 2 + unified-worker/unified_worker.py | 998 ++++++++++++++++++++++++++++++ 8 files changed, 1115 insertions(+), 93 deletions(-) delete mode 100644 Deployment Commands delete mode 100644 Lambda IAM Policy delete mode 100755 basic_setup/setup_email_domain.sh create mode 100644 unified-worker/Dockerfile create mode 100644 unified-worker/docker-compose.yml create mode 100644 unified-worker/domains.txt create mode 100644 unified-worker/requirements.txt create mode 100644 unified-worker/unified_worker.py diff --git a/Deployment Commands b/Deployment Commands deleted file mode 100644 index 52e6858..0000000 --- a/Deployment Commands +++ /dev/null @@ -1,20 +0,0 @@ -# Worker bauen -docker-compose build - -# Worker starten -docker-compose up -d - -# Logs ansehen -docker-compose logs -f - -# Logs nur für eine Domain -docker-compose logs -f worker-andreasknuth - -# Status prüfen -docker-compose ps - -# Worker neu starten -docker-compose restart - -# Worker stoppen -docker-compose down \ No newline at end of file diff --git a/Lambda IAM Policy b/Lambda IAM Policy deleted file mode 100644 index 5ec15a3..0000000 --- a/Lambda IAM Policy +++ /dev/null @@ -1,35 +0,0 @@ -{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "s3:GetObject", - "s3:HeadObject", - "s3:ListBucket", - "s3:CopyObject" - ], - "Resource": [ - "arn:aws:s3:::*-emails", - "arn:aws:s3:::*-emails/*" - ] - }, - { - "Effect": "Allow", - "Action": [ - "sqs:SendMessage", - "sqs:GetQueueUrl" - ], - "Resource": "arn:aws:sqs:eu-central-1:123456789:*-queue" - }, - { - "Effect": "Allow", 
- "Action": [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents" - ], - "Resource": "arn:aws:logs:::*" - } - ] -} diff --git a/basic_setup/setup_email_domain.sh b/basic_setup/setup_email_domain.sh deleted file mode 100755 index b10905f..0000000 --- a/basic_setup/setup_email_domain.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash -# setup_email_domain.sh - Ein Wrapper-Script, das alle drei Skripte in der richtigen Reihenfolge ausführt - -# Überprüfen, ob die Domain-Variable gesetzt ist -if [ -z "$1" ]; then - echo "Fehler: Keine Domain angegeben." - echo "Verwendung: ./setup_email_domain.sh domain.de [region]" - exit 1 -fi - -DOMAIN_NAME=$1 -AWS_REGION=${2:-"us-east-2"} - -# Variablen exportieren -export DOMAIN_NAME -export AWS_REGION - -echo "=== AWS E-Mail-Infrastruktur für $DOMAIN_NAME einrichten ===" -echo "AWS-Region: $AWS_REGION" -echo - -# Skripte nacheinander ausführen -echo "1. S3-Bucket erstellen..." -./awss3.sh -echo - -echo "2. SES-Konfiguration einrichten..." -export S3_BUCKET_NAME=$(echo "$DOMAIN_NAME" | tr '.' '-' | awk '{print $0 "-emails"}') -./awsses.sh -echo - -echo "3. IAM-Benutzer und SMTP-Zugangsdaten erstellen..." -./awsiam.sh -echo - -echo "=== Setup abgeschlossen ===" -echo "Alle Schritte wurden abgeschlossen. Bitte überprüfen Sie die Ausgaben der einzelnen Skripte." -echo "Vergessen Sie nicht, die benötigten DNS-Einträge für Ihre Domain zu setzen, um die SES-Verifizierung abzuschließen." 
\ No newline at end of file diff --git a/unified-worker/Dockerfile b/unified-worker/Dockerfile new file mode 100644 index 0000000..60d0b4f --- /dev/null +++ b/unified-worker/Dockerfile @@ -0,0 +1,33 @@ +FROM python:3.11-slim + +LABEL maintainer="andreas@knuth.dev" +LABEL description="Unified multi-domain email worker" + +# System packages +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Non-root user +RUN useradd -m -u 1000 worker && \ + mkdir -p /app /var/log/email-worker /etc/email-worker && \ + chown -R worker:worker /app /var/log/email-worker /etc/email-worker + +# Python dependencies +COPY requirements.txt /app/ +RUN pip install --no-cache-dir -r /app/requirements.txt + +# Worker code +COPY --chown=worker:worker unified_worker.py /app/ + +WORKDIR /app +USER worker + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 + +# Unbuffered output +ENV PYTHONUNBUFFERED=1 + +CMD ["python", "unified_worker.py"] diff --git a/unified-worker/docker-compose.yml b/unified-worker/docker-compose.yml new file mode 100644 index 0000000..7150059 --- /dev/null +++ b/unified-worker/docker-compose.yml @@ -0,0 +1,68 @@ +version: "3.8" + +# Unified Email Worker - verarbeitet alle Domains mit einem Container + +services: + unified-worker: + build: + context: . 
+ dockerfile: Dockerfile + container_name: unified-email-worker + restart: unless-stopped + network_mode: host # Für lokalen SMTP-Zugriff + + volumes: + # Domain-Liste (eine Domain pro Zeile) + - ./domains.txt:/etc/email-worker/domains.txt:ro + # Logs + - ./logs:/var/log/email-worker + + environment: + # AWS Credentials + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=us-east-2 + + # Alternative: Domains direkt als Liste + # - DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net + + # Worker Settings + - WORKER_THREADS=${WORKER_THREADS:-10} + - POLL_INTERVAL=${POLL_INTERVAL:-20} + - MAX_MESSAGES=${MAX_MESSAGES:-10} + - VISIBILITY_TIMEOUT=${VISIBILITY_TIMEOUT:-300} + + # SMTP (lokal zum DMS) + - SMTP_HOST=${SMTP_HOST:-localhost} + - SMTP_PORT=${SMTP_PORT:-25} + - SMTP_POOL_SIZE=${SMTP_POOL_SIZE:-5} + + # Monitoring + - METRICS_PORT=8000 + - HEALTH_PORT=8080 + + ports: + # Prometheus Metrics + - "8000:8000" + # Health Check + - "8080:8080" + + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + + logging: + driver: "json-file" + options: + max-size: "50m" + max-file: "10" + + deploy: + resources: + limits: + memory: 512M + reservations: + memory: 256M diff --git a/unified-worker/domains.txt b/unified-worker/domains.txt new file mode 100644 index 0000000..b804095 --- /dev/null +++ b/unified-worker/domains.txt @@ -0,0 +1,14 @@ +# domains.txt - Liste aller zu verarbeitenden Domains +# Eine Domain pro Zeile +# Zeilen mit # werden ignoriert + +# Test Domain +andreasknuth.de + +# Produktiv Domains (später hinzufügen) +# annavillesda.org +# bayarea-cc.com +# bizmatch.net +# hotshpotshgallery.com +# qrmaster.net +# ruehrgedoens.de diff --git a/unified-worker/requirements.txt b/unified-worker/requirements.txt new file mode 100644 index 0000000..b366aaa --- /dev/null +++ b/unified-worker/requirements.txt @@ -0,0 +1,2 @@ 
+boto3>=1.34.0 +prometheus-client>=0.19.0 diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py new file mode 100644 index 0000000..24a6091 --- /dev/null +++ b/unified-worker/unified_worker.py @@ -0,0 +1,998 @@ +#!/usr/bin/env python3 +""" +unified_worker.py - Multi-Domain Email Worker (Full Featured) + +Features: +- Multi-Domain parallel processing via Thread Pool +- Bounce Detection & Rewriting (SES MAILER-DAEMON handling) +- Auto-Reply / Out-of-Office (from DynamoDB email-rules) +- Email Forwarding (from DynamoDB email-rules) +- SMTP Connection Pooling +- Prometheus Metrics +- Graceful Shutdown + +DynamoDB Tables: +- ses-outbound-messages: Tracking für Bounce-Korrelation +- email-rules: Forwards und Auto-Reply Regeln + +Schema email-rules: +{ + "email": "user@domain.com", # Partition Key + "rule_type": "forward|autoreply", # Sort Key + "enabled": true, + "target": "other@example.com", # Für forwards + "subject": "Out of Office", # Für autoreply + "body": "I am currently...", # Für autoreply + "start_date": "2025-01-01", # Optional: Zeitraum + "end_date": "2025-01-15" # Optional: Zeitraum +} + +Schema ses-outbound-messages: +{ + "MessageId": "abc123...", # Partition Key (SES Message-ID) + "original_source": "sender@domain.com", + "recipients": ["recipient@other.com"], + "timestamp": "2025-01-01T12:00:00Z", + "bounceType": "Permanent", # Nach Bounce-Notification + "bounceSubType": "General", + "bouncedRecipients": ["recipient@other.com"] +} +""" + +import os +import sys +import json +import time +import signal +import logging +import threading +import smtplib +import hashlib +import re +from queue import Queue, Empty +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Tuple, Any +from datetime import datetime, date +from email.parser import BytesParser +from email.policy import SMTP as SMTPPolicy +from email.mime.text import MIMEText +from 
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr, parseaddr, formatdate
import copy

import boto3
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

# Optional Prometheus metrics: degrade gracefully when the client lib is absent.
try:
    from prometheus_client import start_http_server, Counter, Gauge, Histogram
    PROMETHEUS_ENABLED = True
except ImportError:
    PROMETHEUS_ENABLED = False
    logging.warning("prometheus_client not installed - metrics disabled")

# ============================================
# CONFIGURATION
# ============================================

@dataclass
class Config:
    """Worker configuration.

    Every value is resolved from an environment variable at class-definition
    time (i.e. module import); the second argument of each get() is the
    documented fallback.
    """

    # AWS
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')

    # Domains to process: comma-separated list and/or file with one domain per line
    domains_list: str = os.environ.get('DOMAINS', '')
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')

    # Worker settings
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

    # SMTP (local delivery target)
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))

    # DynamoDB tables
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')

    # Bounce handling
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))

    # Auto-reply
    autoreply_from_name: str = os.environ.get('AUTOREPLY_FROM_NAME', 'Auto-Reply')
    autoreply_cooldown_hours: int = int(os.environ.get('AUTOREPLY_COOLDOWN_HOURS', '24'))

    # Monitoring
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))


config = Config()

# ============================================
# LOGGING
# ============================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# ============================================
# METRICS
# ============================================

if PROMETHEUS_ENABLED:
    emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status'])
    emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed')
    processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain'])
    queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain'])
    bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'])
    autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain'])
    forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain'])

# ============================================
# AWS CLIENTS
# ============================================

sqs = boto3.client('sqs', region_name=config.aws_region)
s3 = boto3.client('s3', region_name=config.aws_region)
ses = boto3.client('ses', region_name=config.aws_region)

# DynamoDB is optional: without it the worker still delivers mail but skips
# bounce correlation and rule (forward / auto-reply) processing.
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
try:
    rules_table = dynamodb.Table(config.rules_table)
    messages_table = dynamodb.Table(config.messages_table)
    # Touch table_status so missing tables / bad credentials fail fast here.
    rules_table.table_status
    messages_table.table_status
    DYNAMODB_AVAILABLE = True
    logger.info(f"DynamoDB connected: {config.rules_table}, {config.messages_table}")
except Exception as e:
    DYNAMODB_AVAILABLE = False
    rules_table = None
    messages_table = None
    logger.warning(f"DynamoDB not available: {e}")
# Auto-Reply cooldown cache (in-memory, guarded by autoreply_cache_lock).
# Maps "recipient:sender" -> time of last auto-reply sent.
autoreply_cache: Dict[str, datetime] = {}
autoreply_cache_lock = threading.Lock()

# ============================================
# SMTP CONNECTION POOL
# ============================================

class SMTPPool:
    """Thread-safe pool of reusable SMTP connections.

    Connections are validated with NOOP on checkout; a dead connection is
    closed and transparently replaced. If the pool is exhausted or empty a
    fresh connection is created on demand.
    """

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()   # guards one-time initialization
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Create a new SMTP connection (STARTTLS/login per config); None on failure."""
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            return conn
        except Exception as e:
            logger.error(f"Failed to create SMTP connection: {e}")
            return None

    def initialize(self):
        """Pre-create connections. Idempotent and safe to call concurrently."""
        # FIX: guard with the lock so two threads cannot both pre-fill the pool.
        with self._lock:
            if self._initialized:
                return
            for _ in range(self.pool_size):
                conn = self._create_connection()
                if conn:
                    self._pool.put(conn)
            self._initialized = True
        logger.info(f"SMTP pool initialized with {self._pool.qsize()} connections")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Get a live connection from the pool, or create a new one."""
        try:
            conn = self._pool.get(timeout=timeout)
        except Empty:
            return self._create_connection()
        try:
            conn.noop()  # liveness probe
            return conn
        except (smtplib.SMTPException, OSError):
            # FIX: close the dead socket before replacing it (was leaked).
            try:
                conn.close()
            except Exception:
                pass
            return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return a connection to the pool; quit it if the pool is full."""
        if conn is None:
            return
        try:
            self._pool.put_nowait(conn)
        except Exception:  # queue.Full -> pool already at capacity
            try:
                conn.quit()
            except Exception:
                pass

    def close_all(self):
        """Quit every pooled connection (shutdown path, best effort)."""
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                conn.quit()
            except Exception:
                pass


smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)

# ============================================
# HELPER FUNCTIONS
# ============================================

def domain_to_queue_name(domain: str) -> str:
    """Map a domain to its SQS queue name, e.g. 'a.de' -> 'a-de-queue'."""
    return domain.replace('.', '-') + '-queue'

def domain_to_bucket_name(domain: str) -> str:
    """Map a domain to its S3 bucket name, e.g. 'a.de' -> 'a-de-emails'."""
    return domain.replace('.', '-') + '-emails'

def get_queue_url(domain: str) -> Optional[str]:
    """Resolve the SQS queue URL for *domain*; None if missing or on error."""
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except ClientError as e:
        if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
            logger.warning(f"Queue not found for domain: {domain}")
        else:
            logger.error(f"Error getting queue URL for {domain}: {e}")
    return None

def load_domains() -> List[str]:
    """Load the deduplicated domain list from env (DOMAINS) and/or domains file.

    Lines starting with '#' and blank lines in the file are ignored.
    """
    domains = []

    if config.domains_list:
        domains.extend([d.strip() for d in config.domains_list.split(',') if d.strip()])

    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as f:
            for line in f:
                domain = line.strip()
                if domain and not domain.startswith('#'):
                    domains.append(domain)

    # FIX: sort for a deterministic order (list(set(...)) was arbitrary).
    domains = sorted(set(domains))
    logger.info(f"Loaded {len(domains)} domains")
    return domains

def extract_email_address(header_value: str) -> str:
    """Extract the bare address from a header like 'Name <a@b>'; lowercased."""
    if not header_value:
        return ''
    name, addr = parseaddr(header_value)
    return addr.lower() if addr else header_value.lower()

# ============================================
# BOUNCE HANDLING
# ============================================

def is_ses_bounce_notification(parsed_email) -> bool:
    """True when the message's From header is an SES MAILER-DAEMON address."""
    from_header = (parsed_email.get('From') or '').lower()
    return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header
def get_bounce_info_from_dynamodb(message_id: str) -> Optional[Dict]:
    """Correlate a bounce notification with its original outbound message.

    Looks up *message_id* (from the bounce mail) in the ses-outbound-messages
    table, retrying a few times because the bounce may arrive before the
    tracking record is written. Returns a dict with the original sender,
    recipients and bounce classification, or None when unavailable.
    """
    if not DYNAMODB_AVAILABLE or not messages_table:
        return None

    # Message-IDs come in several shapes; normalize '<id@host>' down to 'id'.
    clean_id = message_id.strip('<>').split('@')[0]

    for attempt in range(config.bounce_lookup_retries):
        try:
            item = messages_table.get_item(Key={'MessageId': clean_id}).get('Item')
            if item:
                logger.info(f"Found bounce info for {clean_id}")
                return {
                    'original_source': item.get('original_source', ''),
                    'recipients': item.get('recipients', []),
                    'bounceType': item.get('bounceType', 'Unknown'),
                    'bounceSubType': item.get('bounceSubType', 'Unknown'),
                    'bouncedRecipients': item.get('bouncedRecipients', []),
                    'timestamp': item.get('timestamp', ''),
                }
            if attempt < config.bounce_lookup_retries - 1:
                logger.debug(f"Bounce record not found, retry {attempt + 1}...")
                time.sleep(config.bounce_lookup_delay)
        except Exception as e:
            logger.error(f"DynamoDB error looking up bounce: {e}")
            if attempt < config.bounce_lookup_retries - 1:
                time.sleep(config.bounce_lookup_delay)

    return None


def rewrite_bounce_headers(parsed_email, bounce_info: Dict) -> Tuple[Any, bool]:
    """Rewrite a bounce mail's headers in place for readable local delivery.

    SES bounces arrive FROM mailer-daemon@amazonses.com; the user should see
    the address that bounced instead. The original From is preserved in
    X-Original-SES-From and diagnostic headers are added.

    Returns the (possibly modified) message and a flag telling whether any
    rewrite happened.
    """
    if not bounce_info:
        return parsed_email, False

    original_source = bounce_info.get('original_source', '')
    bounce_type = bounce_info.get('bounceType', 'Unknown')
    bounce_subtype = bounce_info.get('bounceSubType', 'Unknown')
    bounced = bounce_info.get('bouncedRecipients', [])
    if not bounced:
        return parsed_email, False

    # Entries are either plain strings or SES dicts with 'emailAddress'.
    first = bounced[0]
    new_from = first if isinstance(first, str) else first.get('emailAddress', '')
    if not new_from:
        return parsed_email, False

    logger.info(f"Rewriting bounce: FROM {parsed_email.get('From')} -> {new_from}")

    # Diagnostic headers first, so the original values are kept.
    parsed_email['X-Original-SES-From'] = parsed_email.get('From', '')
    parsed_email['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
    parsed_email['X-Original-Recipient'] = original_source

    # Replace From with the bounced address.
    if 'From' in parsed_email:
        del parsed_email['From']
    parsed_email['From'] = new_from

    # Make replies go to the bounced address as well, unless already set.
    if not parsed_email.get('Reply-To'):
        parsed_email['Reply-To'] = new_from

    # Generic DSN subjects get replaced by something actionable.
    subject = parsed_email.get('Subject', '')
    if 'delivery status' in subject.lower() or not subject:
        if 'Subject' in parsed_email:
            del parsed_email['Subject']
        parsed_email['Subject'] = f"Undeliverable: Message to {new_from}"

    return parsed_email, True

# ============================================
# EMAIL RULES (FORWARDS & AUTO-REPLY)
# ============================================

@dataclass
class EmailRule:
    """A single forward or auto-reply rule loaded from DynamoDB."""
    email: str
    rule_type: str              # 'forward' or 'autoreply'
    enabled: bool = True
    target: str = ''            # forward destination
    subject: str = ''           # auto-reply subject
    body: str = ''              # auto-reply body
    start_date: Optional[date] = None
    end_date: Optional[date] = None

    def is_active(self) -> bool:
        """True when enabled and today falls inside the optional date range."""
        if not self.enabled:
            return False
        today = date.today()
        if self.start_date and today < self.start_date:
            return False
        if self.end_date and today > self.end_date:
            return False
        return True
def get_rules_for_email(email_address: str) -> "List[EmailRule]":
    """Fetch all forward/auto-reply rules for *email_address* from DynamoDB.

    Returns an empty list when DynamoDB is unavailable or the query fails;
    malformed date fields are ignored (rule stays open-ended).
    """
    if not DYNAMODB_AVAILABLE or not rules_table:
        return []

    email_lower = email_address.lower()
    rules = []

    try:
        response = rules_table.query(
            KeyConditionExpression=Key('email').eq(email_lower)
        )

        for item in response.get('Items', []):
            rule = EmailRule(
                email=item.get('email', ''),
                rule_type=item.get('rule_type', ''),
                enabled=item.get('enabled', True),
                target=item.get('target', ''),
                subject=item.get('subject', ''),
                body=item.get('body', ''),
            )

            # FIX: only swallow bad date strings, not every exception.
            if item.get('start_date'):
                try:
                    rule.start_date = datetime.strptime(item['start_date'], '%Y-%m-%d').date()
                except ValueError:
                    pass
            if item.get('end_date'):
                try:
                    rule.end_date = datetime.strptime(item['end_date'], '%Y-%m-%d').date()
                except ValueError:
                    pass

            rules.append(rule)

        logger.debug(f"Found {len(rules)} rules for {email_address}")
        return rules

    except Exception as e:
        logger.error(f"Error fetching rules for {email_address}: {e}")
        return []


def should_send_autoreply(recipient: str, sender: str) -> bool:
    """Decide whether an auto-reply may be sent from *recipient* to *sender*.

    Suppresses replies to automated senders (noreply/daemon/bounce addresses)
    and enforces a per-(recipient, sender) cooldown to prevent reply loops.
    A positive decision records the current time in the cooldown cache.
    """
    # Never reply to automated messages (loop prevention).
    sender_lower = sender.lower()
    if any(x in sender_lower for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster', 'bounce', 'notification']):
        return False

    cache_key = f"{recipient}:{sender}"
    now = datetime.now()
    cooldown = config.autoreply_cooldown_hours

    with autoreply_cache_lock:
        # FIX: prune expired entries so the in-memory cache stays bounded.
        expired = [k for k, t in autoreply_cache.items()
                   if (now - t).total_seconds() / 3600 >= cooldown]
        for k in expired:
            del autoreply_cache[k]

        last_reply = autoreply_cache.get(cache_key)
        if last_reply and (now - last_reply).total_seconds() / 3600 < cooldown:
            logger.debug(f"Auto-reply cooldown active for {cache_key}")
            return False

        autoreply_cache[cache_key] = now

    return True


def send_autoreply(rule: "EmailRule", original_from: str, original_subject: str, domain: str) -> bool:
    """Send an out-of-office auto-reply via SES per *rule*.

    Returns True when a reply was sent, False when suppressed (cooldown,
    automated sender) or on send failure. Sets Auto-Submitted and related
    headers so other systems do not auto-respond back.
    """
    if not should_send_autoreply(rule.email, original_from):
        return False

    try:
        reply_subject = rule.subject or f"Re: {original_subject}"
        reply_body = rule.body or "This is an automated reply. I am currently unavailable."

        # Reference the original subject when the body does not mention it.
        if original_subject and original_subject not in reply_body:
            reply_body += f"\n\n---\nRegarding: {original_subject}"

        msg = MIMEText(reply_body, 'plain', 'utf-8')
        msg['Subject'] = reply_subject
        msg['From'] = formataddr((config.autoreply_from_name, rule.email))
        msg['To'] = original_from
        msg['Date'] = formatdate(localtime=True)
        msg['Auto-Submitted'] = 'auto-replied'
        msg['X-Auto-Response-Suppress'] = 'All'
        msg['Precedence'] = 'auto_reply'

        ses.send_raw_email(
            Source=rule.email,
            Destinations=[original_from],
            RawMessage={'Data': msg.as_string()}
        )

        logger.info(f"✉ Auto-reply sent from {rule.email} to {original_from}")

        if PROMETHEUS_ENABLED:
            autoreplies_sent.labels(domain=domain).inc()

        return True

    except Exception as e:
        logger.error(f"Failed to send auto-reply: {e}")
        return False
def send_forward(rule: EmailRule, raw_email: bytes, original_from: str, original_subject: str, domain: str) -> bool:
    """Forward *raw_email* to rule.target via SES as an attached message.

    The forwarded mail carries a human-readable header block plus the full
    original message as a message/rfc822 part. Returns True on success,
    False when the rule has no target or the send fails.
    """
    if not rule.target:
        return False

    try:
        # Re-parse the raw bytes so the attachment reflects the stored original.
        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email)

        fwd_msg = MIMEMultipart('mixed')
        fwd_msg['Subject'] = f"Fwd: {original_subject or parsed.get('Subject', '(no subject)')}"
        fwd_msg['From'] = rule.email
        fwd_msg['To'] = rule.target
        fwd_msg['Date'] = formatdate(localtime=True)
        fwd_msg['X-Forwarded-For'] = rule.email
        fwd_msg['X-Original-From'] = original_from

        # Human-readable summary of what is being forwarded.
        summary = (
            f"---------- Forwarded message ----------\n"
            f"From: {parsed.get('From', 'Unknown')}\n"
            f"Date: {parsed.get('Date', 'Unknown')}\n"
            f"Subject: {parsed.get('Subject', '(no subject)')}\n"
            f"To: {parsed.get('To', rule.email)}\n\n"
        )
        fwd_msg.attach(MIMEText(summary, 'plain', 'utf-8'))

        # Attach the complete original as message/rfc822.
        from email.mime.message import MIMEMessage
        fwd_msg.attach(MIMEMessage(parsed))

        ses.send_raw_email(
            Source=rule.email,
            Destinations=[rule.target],
            RawMessage={'Data': fwd_msg.as_string()}
        )

        logger.info(f"➡ Forward sent from {rule.email} to {rule.target}")

        if PROMETHEUS_ENABLED:
            forwards_sent.labels(domain=domain).inc()

        return True

    except Exception as e:
        logger.error(f"Failed to forward email: {e}")
        return False
# ============================================
# MAIN EMAIL PROCESSING
# ============================================

def process_message(domain: str, message: dict) -> bool:
    """Process one SQS message for *domain* end to end.

    Pipeline: parse the SES/SNS envelope -> fetch the raw mail from S3 ->
    detect and rewrite SES bounce notifications -> deliver to each recipient
    over the pooled SMTP connection -> apply forward/auto-reply rules for
    delivered recipients -> mark the S3 object as processed.

    Returns True when at least one recipient was delivered (the caller then
    deletes the SQS message); False lets the message become visible again.
    """
    receipt_handle = message['ReceiptHandle']

    try:
        payload = json.loads(message['Body'])

        # The body is either an SNS notification wrapper (Lambda shim) or
        # the SES event itself.
        if payload.get('Type') == 'Notification':
            ses_data = json.loads(payload.get('Message', '{}'))
        else:
            ses_data = payload.get('ses', payload)

        mail = ses_data.get('mail', {})
        receipt = ses_data.get('receipt', {})

        message_id = mail.get('messageId', 'unknown')
        from_addr = mail.get('source', '')
        recipients = receipt.get('recipients', []) or mail.get('destination', [])

        logger.info(f"Processing: {message_id} for {domain}")
        logger.info(f" From: {from_addr}")
        logger.info(f" To: {recipients}")

        # --- 1. Download the raw mail from S3 -------------------------------
        # Default location is <domain>-emails/<messageId>; the receipt's S3
        # action overrides it when present.
        bucket = domain_to_bucket_name(domain)
        object_key = message_id

        actions = receipt.get('action', {})
        if isinstance(actions, dict):
            actions = [actions]
        for action in actions:
            if isinstance(action, dict) and action.get('type') == 'S3':
                bucket = action.get('bucketName', bucket)
                object_key = action.get('objectKey', object_key)
                break

        try:
            raw_email = s3.get_object(Bucket=bucket, Key=object_key)['Body'].read()
        except ClientError as e:
            logger.error(f"Failed to download from S3: {bucket}/{object_key} - {e}")
            return False

        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email)
        subject = parsed.get('Subject', '')

        # --- 2. Bounce detection & header rewriting -------------------------
        is_bounce = is_ses_bounce_notification(parsed)
        bounce_rewritten = False

        if is_bounce:
            logger.info("🔄 Detected SES bounce notification")

            bounce_msg_id = parsed.get('Message-ID', '')

            # Prefer the original outbound Message-ID when the bounce exposes
            # it (custom header or first References entry).
            original_msg_id = parsed.get('X-Original-Message-Id', '')
            if not original_msg_id:
                refs = parsed.get('References', '')
                if refs:
                    original_msg_id = refs.split()[0] if refs else ''

            lookup_id = original_msg_id or bounce_msg_id
            if lookup_id:
                bounce_info = get_bounce_info_from_dynamodb(lookup_id)
                if bounce_info:
                    parsed, bounce_rewritten = rewrite_bounce_headers(parsed, bounce_info)
                    if PROMETHEUS_ENABLED:
                        bounces_processed.labels(
                            domain=domain,
                            type=bounce_info.get('bounceType', 'Unknown')
                        ).inc()

        # --- 3. Deliver to local mailboxes ----------------------------------
        success_count = 0
        failed_recipients = []
        delivered_recipients = []

        # Deliver the rewritten bytes when the bounce headers were changed.
        if bounce_rewritten:
            raw_email = parsed.as_bytes()

        smtp_conn = smtp_pool.get_connection()
        if not smtp_conn:
            logger.error("Could not get SMTP connection")
            return False

        try:
            for recipient in recipients:
                try:
                    smtp_conn.sendmail(from_addr, [recipient], raw_email)
                    success_count += 1
                    delivered_recipients.append(recipient)
                    logger.info(f" ✓ Delivered to {recipient}")
                    if PROMETHEUS_ENABLED:
                        emails_processed.labels(domain=domain, status='success').inc()
                except smtplib.SMTPRecipientsRefused:
                    logger.warning(f" ✗ Recipient refused: {recipient}")
                    failed_recipients.append(recipient)
                    if PROMETHEUS_ENABLED:
                        emails_processed.labels(domain=domain, status='recipient_refused').inc()
                except smtplib.SMTPException as e:
                    logger.error(f" ✗ SMTP error for {recipient}: {e}")
                    failed_recipients.append(recipient)
        finally:
            smtp_pool.return_connection(smtp_conn)

        # --- 4. Forward / auto-reply rules (never for bounces) --------------
        if not is_bounce:
            for recipient in delivered_recipients:
                for rule in get_rules_for_email(recipient):
                    if not rule.is_active():
                        continue
                    if rule.rule_type == 'autoreply':
                        send_autoreply(rule, from_addr, subject, domain)
                    elif rule.rule_type == 'forward':
                        send_forward(rule, raw_email, from_addr, subject, domain)

        # --- 5. Mark the S3 object as processed (best effort) ---------------
        if success_count > 0:
            try:
                metadata = {
                    'processed': 'true',
                    'processed_at': str(int(time.time())),
                    'delivered_to': ','.join(delivered_recipients),
                    'status': 'delivered'
                }
                if failed_recipients:
                    metadata['failed_recipients'] = ','.join(failed_recipients)
                if bounce_rewritten:
                    metadata['bounce_rewritten'] = 'true'

                s3.copy_object(
                    Bucket=bucket,
                    Key=object_key,
                    CopySource={'Bucket': bucket, 'Key': object_key},
                    Metadata=metadata,
                    MetadataDirective='REPLACE'
                )
            except Exception as e:
                logger.warning(f"Failed to update S3 metadata: {e}")

        return success_count > 0

    except Exception as e:
        logger.error(f"Error processing message: {e}", exc_info=True)
        if PROMETHEUS_ENABLED:
            emails_processed.labels(domain=domain, status='error').inc()
        return False

# ============================================
# DOMAIN POLLER
# ============================================

def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
    """Long-poll one domain's SQS queue until *stop_event* is set.

    Successfully processed messages are deleted; failures simply time out
    their visibility and are redelivered by SQS.
    """
    logger.info(f"Starting poller for {domain}")

    while not stop_event.is_set():
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=config.max_messages,
                WaitTimeSeconds=config.poll_interval,
                VisibilityTimeout=config.visibility_timeout
            )
            messages = response.get('Messages', [])

            # Export the approximate backlog size (best effort).
            if PROMETHEUS_ENABLED:
                try:
                    attrs = sqs.get_queue_attributes(
                        QueueUrl=queue_url,
                        AttributeNames=['ApproximateNumberOfMessages']
                    )
                    queue_size.labels(domain=domain).set(
                        int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
                    )
                except Exception:
                    pass

            for msg in messages:
                if stop_event.is_set():
                    break

                if PROMETHEUS_ENABLED:
                    emails_in_flight.inc()
                started = time.time()

                try:
                    if process_message(domain, msg):
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=msg['ReceiptHandle']
                        )
                    # Otherwise the message reappears after the visibility timeout.
                finally:
                    if PROMETHEUS_ENABLED:
                        emails_in_flight.dec()
                        processing_time.labels(domain=domain).observe(time.time() - started)

        except Exception as e:
            logger.error(f"Error polling {domain}: {e}")
            time.sleep(5)

    logger.info(f"Stopped poller for {domain}")
class UnifiedWorker:
    """Coordinates one polling thread per configured domain.

    Lifecycle: ``setup()`` resolves domains and queues, ``start()`` blocks
    while pollers run, ``stop()`` requests a graceful shutdown.
    """

    def __init__(self):
        # Shared shutdown flag observed by every domain poller.
        self.stop_event = threading.Event()
        self.executor: Optional[ThreadPoolExecutor] = None
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}

    def setup(self):
        """Load domains, resolve their queue URLs and warm the SMTP pool.

        Exits the process when no domains are configured or no queue URL
        can be resolved.
        """
        self.domains = load_domains()
        if not self.domains:
            logger.error("No domains configured!")
            sys.exit(1)

        # Keep only domains whose queue URL could actually be resolved.
        self.queue_urls = {
            domain: url
            for domain in self.domains
            if (url := get_queue_url(domain))
        }
        if not self.queue_urls:
            logger.error("No valid queues found!")
            sys.exit(1)

        smtp_pool.initialize()
        logger.info(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Launch one poller thread per domain and block until they finish."""
        self.executor = ThreadPoolExecutor(
            max_workers=config.worker_threads,
            thread_name_prefix='poller'
        )

        futures = [
            self.executor.submit(poll_domain, domain, url, self.stop_event)
            for domain, url in self.queue_urls.items()
        ]
        logger.info(f"Started {len(futures)} domain pollers")

        try:
            for finished in as_completed(futures):
                try:
                    finished.result()
                except Exception as e:
                    logger.error(f"Poller error: {e}")
        except KeyboardInterrupt:
            pass

    def stop(self):
        """Signal all pollers to stop, wait for them, and release resources."""
        logger.info("Stopping worker...")
        self.stop_event.set()

        if self.executor:
            self.executor.shutdown(wait=True, cancel_futures=False)

        smtp_pool.close_all()
        logger.info("Worker stopped")
def start_health_server(worker: UnifiedWorker):
    """Start a daemonized HTTP health-check endpoint.

    Routes:
        ``/health`` / ``/``  -> overall status, feature flags, timestamp
        ``/domains``         -> JSON list of domains with resolved queues
        ``/metrics-status``  -> Prometheus enablement and port

    Args:
        worker: The running :class:`UnifiedWorker`; queried for domain count.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler
    # Local import keeps the fix self-contained; datetime.utcnow() is
    # deprecated (Python 3.12+) and returns a *naive* datetime — use an
    # aware UTC timestamp instead.
    from datetime import datetime, timezone

    class HealthHandler(BaseHTTPRequestHandler):
        def _send_json(self, payload):
            """Serialize *payload* as a 200 application/json response."""
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode())

        def do_GET(self):
            if self.path == '/health' or self.path == '/':
                self._send_json({
                    'status': 'healthy',
                    'domains': len(worker.queue_urls),
                    'dynamodb': DYNAMODB_AVAILABLE,
                    'features': {
                        'bounce_rewriting': True,
                        'auto_reply': DYNAMODB_AVAILABLE,
                        'forwarding': DYNAMODB_AVAILABLE
                    },
                    'timestamp': datetime.now(timezone.utc).isoformat()
                })

            elif self.path == '/domains':
                self._send_json(list(worker.queue_urls.keys()))

            elif self.path == '/metrics-status':
                self._send_json({
                    'prometheus_enabled': PROMETHEUS_ENABLED,
                    'metrics_port': config.metrics_port if PROMETHEUS_ENABLED else None
                })

            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            # Suppress per-request access logging from BaseHTTPRequestHandler.
            pass

    server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler)
    # Daemon thread: the health server must never block process shutdown.
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logger.info(f"Health server on port {config.health_port}")
def main():
    """Process entry point: wire signal handlers, start servers, run worker."""
    worker = UnifiedWorker()

    def _handle_signal(signum, frame):
        # Graceful shutdown on SIGTERM/SIGINT, then terminate the process.
        logger.info(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _handle_signal)

    worker.setup()

    if PROMETHEUS_ENABLED:
        start_http_server(config.metrics_port)
        logger.info(f"Prometheus metrics on port {config.metrics_port}")

    start_health_server(worker)

    # Startup banner summarizing configuration and enabled features.
    banner = "=" * 60
    logger.info(banner)
    logger.info(" UNIFIED EMAIL WORKER (Full Featured)")
    logger.info(banner)
    logger.info(f" Domains: {len(worker.queue_urls)}")
    logger.info(f" Threads: {config.worker_threads}")
    logger.info(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
    logger.info(f" SMTP Pool: {config.smtp_pool_size} connections")
    logger.info("")
    logger.info(" Features:")
    logger.info(" ✓ Bounce Detection & Header Rewriting")
    logger.info(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office")
    logger.info(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding")
    logger.info(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics")
    logger.info(banner)

    worker.start()

if __name__ == '__main__':
    main()