#!/usr/bin/env python3
"""
unified_worker.py - Multi-Domain Email Worker (Full Featured)
Features:
- Multi-Domain parallel processing via Thread Pool
- Bounce Detection & Rewriting (SES MAILER-DAEMON handling)
- Auto-Reply / Out-of-Office (from DynamoDB email-rules)
- Email Forwarding (from DynamoDB email-rules)
- SMTP Connection Pooling
- Prometheus Metrics
- Graceful Shutdown
DynamoDB Tables:
- ses-outbound-messages: Tracking für Bounce-Korrelation
- email-rules: Forwards und Auto-Reply Regeln
Schema email-rules (wie in domain-worker):
{
"email_address": "user@domain.com", # Partition Key
"ooo_active": true/false,
"ooo_message": "I am currently...",
"ooo_content_type": "text" | "html",
"forwards": ["other@example.com", "another@example.com"]
}
Schema ses-outbound-messages:
{
"MessageId": "abc123...", # Partition Key (SES Message-ID)
"original_source": "sender@domain.com",
"recipients": ["recipient@other.com"],
"timestamp": "2025-01-01T12:00:00Z",
"bounceType": "Permanent", # Nach Bounce-Notification
"bounceSubType": "General",
"bouncedRecipients": ["recipient@other.com"]
}
"""
import os
import sys
import json
import time
import signal
import threading
import smtplib
import traceback
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.message import MIMEMessage
from email.utils import formataddr, parseaddr, formatdate, make_msgid
import boto3
from botocore.exceptions import ClientError
# Optional: Prometheus Metrics
try:
from prometheus_client import start_http_server, Counter, Gauge, Histogram
PROMETHEUS_ENABLED = True
except ImportError:
PROMETHEUS_ENABLED = False
# ============================================
# CONFIGURATION
# ============================================
@dataclass
class Config:
    """Worker configuration.

    All fields are read from environment variables once, at class-definition
    time (module import) — later changes to os.environ have no effect.
    """
    # AWS
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')
    # Domains to process (comma-separated env var and/or one-per-line file)
    domains_list: str = os.environ.get('DOMAINS', '')
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')
    # Worker Settings
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
    # SMTP for delivery (should use LMTP port 24 to bypass transport_maps)
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))
    # LMTP for local delivery (bypasses Postfix transport_maps completely)
    # Set LMTP_ENABLED=true and LMTP_PORT=24 to use Dovecot LMTP
    lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true'
    lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost')
    lmtp_port: int = int(os.environ.get('LMTP_PORT', '24'))
    # DynamoDB Tables
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')
    # Bounce Handling: how often / how long to retry DynamoDB bounce lookups
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))
    # Monitoring
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))
# Module-level singleton used throughout the file.
config = Config()
# ============================================
# LOGGING (wie in domain-worker)
# ============================================
def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'):
    """Emit one structured log line (timestamp, level, worker, thread) to stdout.

    Matches the domain-worker log format; flush=True keeps output ordered
    when many threads log concurrently.
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    current_thread = threading.current_thread().name
    line = f"[{now}] [{level}] [{worker_name}] [{current_thread}] {message}"
    print(line, flush=True)
# ============================================
# METRICS
# ============================================
if PROMETHEUS_ENABLED:
    # Metric objects exist only when prometheus_client imported successfully;
    # every call site guards on PROMETHEUS_ENABLED before touching them.
    emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status'])
    emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed')
    processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain'])
    queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain'])
    bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'])
    autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain'])
    forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain'])
# ============================================
# AWS CLIENTS
# ============================================
# Shared boto3 clients (boto3 clients are thread-safe for concurrent calls).
sqs = boto3.client('sqs', region_name=config.aws_region)
s3 = boto3.client('s3', region_name=config.aws_region)
ses = boto3.client('ses', region_name=config.aws_region)
# DynamoDB
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
DYNAMODB_AVAILABLE = False
rules_table = None
messages_table = None
try:
    rules_table = dynamodb.Table(config.rules_table)
    messages_table = dynamodb.Table(config.messages_table)
    # Test connection: reading table_status forces a DescribeTable call, so a
    # missing table or bad credentials are detected here at import time.
    rules_table.table_status
    messages_table.table_status
    DYNAMODB_AVAILABLE = True
except Exception as e:
    # Deliberate best-effort: worker still runs without rules/bounce features.
    pass  # Will be logged at startup
# ============================================
# SMTP CONNECTION POOL
# ============================================
class SMTPPool:
    """Thread-safe SMTP connection pool with robust connection handling.

    Connections are validated with NOOP before reuse; stale or broken ones
    are discarded and replaced transparently. The pool is bounded at
    ``pool_size``; when empty, a fresh connection is created on demand.
    """

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Create, optionally TLS-upgrade and authenticate a new SMTP connection.

        Returns None (after logging) on any failure instead of raising.
        """
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()  # re-identify after TLS negotiation
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            log(f" 📡 New SMTP connection created to {self.host}:{self.port}")
            return conn
        except Exception as e:
            log(f"Failed to create SMTP connection: {e}", 'ERROR')
            return None

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Return True iff the connection answers NOOP with 250 (still alive)."""
        try:
            return conn.noop()[0] == 250
        except Exception:
            # Any failure (disconnect, attribute error, timeout) means "dead".
            return False

    def initialize(self):
        """Pre-create a couple of connections; remaining ones are made on demand."""
        if self._initialized:
            return
        # Only 1-2 connections up front; the rest on-demand.
        for _ in range(min(2, self.pool_size)):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True
        log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Get a live connection from the pool, or create a new one.

        Note: ``timeout`` is kept for interface compatibility but unused —
        the pool is polled non-blocking and falls through to creating a
        fresh connection when empty.
        """
        try:
            conn = self._pool.get(block=False)
        except Empty:
            # Pool empty — create a new connection.
            return self._create_connection()
        if self._test_connection(conn):
            return conn
        # Connection is dead: close it and hand back a fresh one.
        try:
            conn.quit()
        except Exception:
            pass
        log(f" ♻ Recycled stale SMTP connection")
        return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return a connection to the pool if it is still valid; else discard it."""
        if conn is None:
            return
        if not self._test_connection(conn):
            try:
                conn.quit()
            except Exception:
                pass
            log(f" 🗑 Discarded broken SMTP connection")
            return
        try:
            self._pool.put_nowait(conn)
        except Exception:
            # Pool full — close the surplus connection.
            try:
                conn.quit()
            except Exception:
                pass

    def close_all(self):
        """Drain the pool, QUITting every pooled connection.

        Uses get_nowait()/Empty instead of the racy empty()-then-get pattern,
        so concurrent consumers cannot make it block or double-close.
        """
        while True:
            try:
                conn = self._pool.get_nowait()
            except Empty:
                break
            try:
                conn.quit()
            except Exception:
                pass
# Single pool shared by all worker threads for outbound SMTP delivery.
smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
# Global set of domains we manage (populated at startup by load_domains();
# consulted by is_internal_address() to route internal vs SES delivery).
MANAGED_DOMAINS: set = set()
# ============================================
# HELPER FUNCTIONS
# ============================================
def domain_to_queue_name(domain: str) -> str:
    """Map a domain to its SQS queue name: dots become dashes, '-queue' suffix."""
    dashed = domain.replace('.', '-')
    return f"{dashed}-queue"
def domain_to_bucket_name(domain: str) -> str:
    """Map a domain to its S3 bucket name: dots become dashes, '-emails' suffix."""
    dashed = domain.replace('.', '-')
    return f"{dashed}-emails"
def is_internal_address(email_address: str) -> bool:
    """Return True when the address's domain is one of our managed domains.

    Addresses without an '@' are never internal.
    """
    if '@' not in email_address:
        return False
    domain_part = email_address.split('@')[1]
    return domain_part.lower() in MANAGED_DOMAINS
def send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool:
    """
    Deliver a message over local SMTP on port 2525, bypassing transport_maps.
    Used for internal forwards so mail never loops back through SES.

    Returns: True on success, False on failure
    """
    try:
        # Direct SMTP connection on port 2525 (no transport_maps involved);
        # the context manager QUITs the session on exit.
        with smtplib.SMTP(config.smtp_host, 2525, timeout=30) as session:
            session.ehlo()
            session.sendmail(from_addr, [to_addr], raw_message)
            log(f" ✓ Internal delivery to {to_addr} (Port 2525)", 'SUCCESS', worker_name)
        return True
    except Exception as e:
        log(f" ✗ Internal delivery failed to {to_addr}: {e}", 'ERROR', worker_name)
        return False
def get_queue_url(domain: str) -> Optional[str]:
    """Resolve the SQS queue URL for a domain.

    Returns None when the queue does not exist or the lookup fails; both
    cases are logged rather than raised.
    """
    name = domain_to_queue_name(domain)
    try:
        return sqs.get_queue_url(QueueName=name)['QueueUrl']
    except ClientError as e:
        code = e.response['Error']['Code']
        if code == 'AWS.SimpleQueueService.NonExistentQueue':
            log(f"Queue not found for domain: {domain}", 'WARNING')
        else:
            log(f"Error getting queue URL for {domain}: {e}", 'ERROR')
        return None
def load_domains() -> List[str]:
    """Load managed domains from the env list and/or domains file.

    De-duplicates, refreshes the MANAGED_DOMAINS global (lower-cased, used by
    is_internal_address()), logs the result and returns the domain list.
    """
    global MANAGED_DOMAINS
    collected: List[str] = []
    if config.domains_list:
        collected += [part.strip() for part in config.domains_list.split(',') if part.strip()]
    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                # Skip blank lines and comment lines.
                if entry and not entry.startswith('#'):
                    collected.append(entry)
    unique = list(set(collected))
    # Populate global set for is_internal_address() checks
    MANAGED_DOMAINS = {d.lower() for d in unique}
    log(f"Loaded {len(unique)} domains: {', '.join(unique)}")
    return unique
# ============================================
# BOUNCE HANDLING (wie in domain-worker)
# ============================================
def is_ses_bounce_notification(parsed_email) -> bool:
    """Detect an Amazon SES MAILER-DAEMON bounce by inspecting the From header."""
    sender = (parsed_email.get('From') or '').lower()
    if 'mailer-daemon@' not in sender:
        return False
    return 'amazonses.com' in sender
def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
    """
    Look up bounce details in DynamoDB by SES Message-ID.

    Retries (config.bounce_lookup_retries times, config.bounce_lookup_delay
    seconds apart) because the bounce-notification writer may not have
    persisted the record yet when the MAILER-DAEMON mail arrives.

    Returns:
        dict with original_source / bounceType / bounceSubType /
        bouncedRecipients / timestamp, or None when DynamoDB is unavailable,
        the record never appears, or every attempt errors. Never raises.
    """
    if not DYNAMODB_AVAILABLE or not messages_table:
        return None
    for attempt in range(config.bounce_lookup_retries):
        try:
            response = messages_table.get_item(Key={'MessageId': message_id})
            item = response.get('Item')
            if item:
                # Normalize missing fields to safe defaults for downstream code.
                return {
                    'original_source': item.get('original_source', ''),
                    'bounceType': item.get('bounceType', 'Unknown'),
                    'bounceSubType': item.get('bounceSubType', 'Unknown'),
                    'bouncedRecipients': item.get('bouncedRecipients', []),
                    'timestamp': item.get('timestamp', '')
                }
            if attempt < config.bounce_lookup_retries - 1:
                log(f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s (attempt {attempt + 1}/{config.bounce_lookup_retries})...", 'INFO', worker_name)
                time.sleep(config.bounce_lookup_delay)
            else:
                log(f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts for Message-ID: {message_id}", 'WARNING', worker_name)
                return None
        except Exception as e:
            # Transient DynamoDB errors are retried with the same delay.
            log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", 'ERROR', worker_name)
            if attempt < config.bounce_lookup_retries - 1:
                time.sleep(config.bounce_lookup_delay)
            else:
                return None
    return None
def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]:
    """
    Check for an SES bounce, look it up in DynamoDB, and rewrite headers so
    the bounce appears to come from the failed recipient instead of
    MAILER-DAEMON. Mutates ``parsed`` in place on success.

    Args:
        parsed: parsed email message object.
        subject: the Subject string, already extracted by the caller.
        worker_name: label used for log lines.

    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_notification(parsed):
        return parsed, False
    log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name)
    # Extract the bare id from the Message-ID header: strip angle brackets,
    # drop everything from the first '@' on.
    message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0]
    if not message_id:
        log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name)
        return parsed, False
    log(f" Looking up Message-ID: {message_id}", 'INFO', worker_name)
    # Lookup in DynamoDB (retries internally; None means no correlation found).
    bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name)
    if not bounce_info:
        return parsed, False
    # Log what we correlated before touching any headers.
    original_source = bounce_info['original_source']
    bounced_recipients = bounce_info['bouncedRecipients']
    bounce_type = bounce_info['bounceType']
    bounce_subtype = bounce_info['bounceSubType']
    log(f"✓ Found bounce info:", 'INFO', worker_name)
    log(f" Original sender: {original_source}", 'INFO', worker_name)
    log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name)
    log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name)
    if bounced_recipients:
        new_from = bounced_recipients[0]
        # Rewrite headers: keep the original SES From for traceability, then
        # replace From with the bounced recipient.
        # NOTE(review): assignment appends a header even if one already
        # exists — presumably these X- headers are never pre-set; confirm.
        parsed['X-Original-SES-From'] = parsed.get('From', '')
        parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
        parsed.replace_header('From', new_from)
        if not parsed.get('Reply-To'):
            parsed['Reply-To'] = new_from
        # Rewrite generic SES subjects into something meaningful.
        # NOTE(review): replace_header raises KeyError if no Subject header
        # exists — assumed bounce mails always carry one; verify.
        if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower():
            parsed.replace_header('Subject', f"Delivery Status: {new_from}")
        log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name)
        return parsed, True
    log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name)
    return parsed, False
# ============================================
# EMAIL BODY EXTRACTION (wie in domain-worker)
# ============================================
def extract_body_parts(parsed) -> Tuple[str, Optional[str]]:
    """
    Extract both the text/plain and text/html body parts of a parsed email.

    Fixes over the naive walk: parts marked Content-Disposition: attachment
    are skipped (so an attached .txt/.html file is not concatenated into the
    visible body), and a None result from get_payload(decode=True) no longer
    raises. Multiple inline text/plain parts are concatenated; for HTML the
    last inline part wins.

    Returns: (text_body, html_body) — text_body falls back to
    '(No body content)', html_body is None when absent.
    """
    text_body = ''
    html_body = None
    if parsed.is_multipart():
        for part in parsed.walk():
            # Attachments carry content but are not the message body.
            if part.get_content_disposition() == 'attachment':
                continue
            content_type = part.get_content_type()
            if content_type not in ('text/plain', 'text/html'):
                continue
            try:
                payload = part.get_payload(decode=True)
            except Exception:
                payload = None
            if payload is None:
                continue
            decoded = payload.decode('utf-8', errors='ignore')
            if content_type == 'text/plain':
                text_body += decoded
            else:
                html_body = decoded
    else:
        try:
            payload = parsed.get_payload(decode=True)
            if payload:
                decoded = payload.decode('utf-8', errors='ignore')
                if parsed.get_content_type() == 'text/html':
                    html_body = decoded
                else:
                    text_body = decoded
        except Exception:
            # Defensive fallback: use the raw (possibly str) payload.
            text_body = str(parsed.get_payload())
    return text_body.strip() if text_body else '(No body content)', html_body
# ============================================
# AUTO-REPLY / OOO (wie in domain-worker)
# ============================================
def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'):
    """
    Build an Out-of-Office auto-reply as a complete MIME message.

    The reply quotes the original body (text and, when available, HTML) and
    sets Auto-Submitted (RFC 3834) plus our X-SES-Worker-Processed header for
    loop prevention.

    Fix: the HTML branch contained unterminated string literals (the HTML
    markup had been mangled out of the source, leaving a syntax error); the
    HTML body is reconstructed here as single-line literals.

    Args:
        original_parsed: parsed original email.
        recipient: our mailbox that received the mail; becomes the reply From.
        ooo_msg: configured out-of-office text.
        content_type: 'text' or 'html' — 'html' forces an HTML alternative.

    Returns: MIMEMultipart message ready for as_bytes()/sending.
    """
    text_body, html_body = extract_body_parts(original_parsed)
    original_subject = original_parsed.get('Subject', '(no subject)')
    original_from = original_parsed.get('From', 'unknown')
    # Build the reply envelope/headers.
    msg = MIMEMultipart('mixed')
    msg['From'] = recipient
    msg['To'] = original_from
    msg['Subject'] = f"Out of Office: {original_subject}"
    msg['Date'] = formatdate(localtime=True)
    msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    msg['In-Reply-To'] = original_parsed.get('Message-ID', '')
    msg['References'] = original_parsed.get('Message-ID', '')
    msg['Auto-Submitted'] = 'auto-replied'  # standard header for auto-replies
    msg['X-SES-Worker-Processed'] = 'ooo-reply'  # our loop-prevention header
    # Body container with plain-text and (optionally) HTML alternatives.
    body_part = MIMEMultipart('alternative')
    # Text version
    text_content = f"{ooo_msg}\n\n"
    text_content += "--- Original Message ---\n"
    text_content += f"From: {original_from}\n"
    text_content += f"Subject: {original_subject}\n\n"
    text_content += text_body
    body_part.attach(MIMEText(text_content, 'plain', 'utf-8'))
    # HTML version (when requested, or when the original had an HTML body)
    if content_type == 'html' or html_body:
        html_content = f"<html><body><p>{ooo_msg}</p>"
        html_content += "<hr>"
        html_content += "<p><b>--- Original Message ---</b><br>"
        html_content += f"From: {original_from}<br>"
        html_content += f"Subject: {original_subject}</p>"
        html_content += (html_body if html_body else text_body.replace('\n', '<br>'))
        html_content += "</body></html>"
        body_part.attach(MIMEText(html_content, 'html', 'utf-8'))
    msg.attach(body_part)
    return msg
# ============================================
# FORWARDING (wie in domain-worker)
# ============================================
def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str):
    """
    Build a forward of the original mail as a complete MIME message,
    preserving ALL original non-body parts including attachments.

    Fix: the HTML branch contained unterminated string literals (the HTML
    markup had been mangled out of the source, leaving a syntax error); the
    HTML body is reconstructed here as single-line literals.

    Args:
        original_parsed: parsed original email.
        recipient: our mailbox that received the mail; becomes the From.
        forward_to: destination address.
        original_from: the original sender; becomes Reply-To.

    Returns: MIMEMultipart message ready for as_bytes()/sending.
    """
    original_subject = original_parsed.get('Subject', '(no subject)')
    original_date = original_parsed.get('Date', 'unknown')
    # Build the forward envelope/headers.
    msg = MIMEMultipart('mixed')
    msg['From'] = recipient
    msg['To'] = forward_to
    msg['Subject'] = f"FWD: {original_subject}"
    msg['Date'] = formatdate(localtime=True)
    msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    msg['Reply-To'] = original_from  # replies go back to the original sender
    msg['X-SES-Worker-Processed'] = 'forwarded'  # our loop-prevention header
    # Extract the original body parts to re-wrap with a forward header.
    text_body, html_body = extract_body_parts(original_parsed)
    # Body container with plain-text and (optionally) HTML alternatives.
    body_part = MIMEMultipart('alternative')
    # Text version
    fwd_text = "---------- Forwarded message ---------\n"
    fwd_text += f"From: {original_from}\n"
    fwd_text += f"Date: {original_date}\n"
    fwd_text += f"Subject: {original_subject}\n"
    fwd_text += f"To: {recipient}\n\n"
    fwd_text += text_body
    body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8'))
    # HTML version (only when the original actually had an HTML body)
    if html_body:
        fwd_html = "<html><body>"
        fwd_html += "---------- Forwarded message ---------<br>"
        fwd_html += f"From: {original_from}<br>"
        fwd_html += f"Date: {original_date}<br>"
        fwd_html += f"Subject: {original_subject}<br>"
        fwd_html += f"To: {recipient}<br><br>"
        fwd_html += html_body
        fwd_html += "</body></html>"
        body_part.attach(MIMEText(fwd_html, 'html', 'utf-8'))
    msg.attach(body_part)
    # IMPORTANT: copy attachments from the original message.
    if original_parsed.is_multipart():
        for part in original_parsed.walk():
            # Skip container parts.
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_type() in ['text/plain', 'text/html']:
                continue  # body already handled above
            # Attach the original part object directly.
            msg.attach(part)
    return msg
# ============================================
# RULES PROCESSING (KORRIGIERT - wie domain-worker Schema)
# ============================================
def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None:
    """
    Apply OOO (auto-reply) and forwarding rules for a single recipient.

    Rules come from the DynamoDB email-rules table keyed by email_address
    (same schema as domain-worker: ooo_active / ooo_message /
    ooo_content_type / forwards). Deliveries to managed domains go over
    local port 2525; external addresses go via SES. All errors are logged,
    never raised — this function must not break main delivery.
    """
    if not DYNAMODB_AVAILABLE or not rules_table:
        return
    try:
        # Load the rule from DynamoDB (schema: email_address is the partition key)
        response = rules_table.get_item(Key={'email_address': recipient})
        rule = response.get('Item', {})
        if not rule:
            return
        original_from = parsed.get('From', '')
        original_subject = parsed.get('Subject', '(no subject)')
        # ============================================
        # OOO / Auto-Reply handling
        # ============================================
        if rule.get('ooo_active', False):
            ooo_msg = rule.get('ooo_message', 'I am currently out of office.')
            content_type = rule.get('ooo_content_type', 'text')
            # Extract the bare sender address for the reply (drop display name)
            sender_name, sender_addr = parseaddr(original_from)
            if not sender_addr:
                sender_addr = original_from
            # Never auto-reply to automatic mail — avoids reply loops.
            auto_submitted = parsed.get('Auto-Submitted', '')
            precedence = (parsed.get('Precedence') or '').lower()
            if auto_submitted and auto_submitted != 'no':
                log(f" ⏭ Skipping OOO for auto-submitted message", 'INFO', worker_name)
            elif precedence in ['bulk', 'junk', 'list']:
                log(f" ⏭ Skipping OOO for {precedence} message", 'INFO', worker_name)
            elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster']):
                log(f" ⏭ Skipping OOO for noreply address", 'INFO', worker_name)
            else:
                try:
                    ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type)
                    ooo_bytes = ooo_reply.as_bytes()
                    # Route: internal (port 2525) vs external (SES)
                    if is_internal_address(sender_addr):
                        # Internal address → deliver directly via port 2525
                        success = send_internal_email(recipient, sender_addr, ooo_bytes, worker_name)
                        if success:
                            log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name)
                        else:
                            log(f"⚠ Internal OOO reply failed to {sender_addr}", 'WARNING', worker_name)
                    else:
                        # External address → send via SES
                        ses.send_raw_email(
                            Source=recipient,
                            Destinations=[sender_addr],
                            RawMessage={'Data': ooo_bytes}
                        )
                        log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name)
                    if PROMETHEUS_ENABLED:
                        autoreplies_sent.labels(domain=domain).inc()
                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR', worker_name)
                except Exception as e:
                    log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name)
        # ============================================
        # Forward handling
        # ============================================
        forwards = rule.get('forwards', [])
        if forwards:
            for forward_to in forwards:
                # Each forward target is handled independently; one failure
                # does not stop the remaining forwards.
                try:
                    fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from)
                    fwd_bytes = fwd_msg.as_bytes()
                    # Route: internal (port 2525) vs external (SES)
                    if is_internal_address(forward_to):
                        # Internal address → direct via port 2525 (no SES loop!)
                        success = send_internal_email(recipient, forward_to, fwd_bytes, worker_name)
                        if success:
                            log(f"✓ Forwarded internally to {forward_to}", 'SUCCESS', worker_name)
                        else:
                            log(f"⚠ Internal forward failed to {forward_to}", 'WARNING', worker_name)
                    else:
                        # External address → send via SES
                        ses.send_raw_email(
                            Source=recipient,
                            Destinations=[forward_to],
                            RawMessage={'Data': fwd_bytes}
                        )
                        log(f"✓ Forwarded externally to {forward_to} via SES", 'SUCCESS', worker_name)
                    if PROMETHEUS_ENABLED:
                        forwards_sent.labels(domain=domain).inc()
                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR', worker_name)
                except Exception as e:
                    log(f"⚠ Forward failed to {forward_to}: {e}", 'ERROR', worker_name)
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            pass  # no rule present — the normal case
        else:
            log(f"⚠ DynamoDB error for {recipient}: {e}", 'ERROR', worker_name)
    except Exception as e:
        log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name)
        traceback.print_exc()
# ============================================
# SMTP SENDING (wie in domain-worker)
# ============================================
def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Decide whether a delivery error is permanent for this recipient
    (i.e. the inbox does not exist), based on SMTP status codes and
    common server phrases.
    """
    needle = error_msg.lower()
    permanent_indicators = (
        '550',  # mailbox unavailable / not found
        '551',  # user not local
        '553',  # mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user',
    )
    for indicator in permanent_indicators:
        if indicator in needle:
            return True
    return False
def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
    """
    Deliver an email via SMTP/LMTP to ONE recipient.

    When LMTP is enabled (config.lmtp_enabled) the message is delivered
    directly to Dovecot, bypassing Postfix transport_maps; otherwise a pooled
    SMTP connection is used. Connection-level failures are retried with a
    fresh connection (up to max_retries extra attempts).

    Returns: (success, error or None, is_permanent) — is_permanent is True
    when the failure indicates the mailbox does not exist
    (see is_permanent_recipient_error).
    """
    last_error = None
    # Decide LMTP vs SMTP once; the choice is fixed for all retries.
    use_lmtp = config.lmtp_enabled
    for attempt in range(max_retries + 1):
        conn = None
        try:
            if use_lmtp:
                # LMTP connection directly to Dovecot (bypasses Postfix/transport_maps)
                conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30)
                # Original note: LMTP doesn't strictly need EHLO, but it does no harm
                conn.ehlo()
            else:
                # Regular SMTP connection from the shared pool
                conn = smtp_pool.get_connection()
                if not conn:
                    last_error = "Could not get SMTP connection"
                    log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
                    time.sleep(0.5)
                    continue
            result = conn.sendmail(from_addr, [recipient], raw_message)
            # No exception raised — release the connection appropriately.
            if use_lmtp:
                conn.quit()
            else:
                smtp_pool.return_connection(conn)
            # sendmail returns a dict of refused recipients; non-empty = failure.
            if isinstance(result, dict) and result:
                error = str(result.get(recipient, 'Unknown refusal'))
                is_permanent = is_permanent_recipient_error(error)
                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
                return False, error, is_permanent
            else:
                delivery_method = "LMTP" if use_lmtp else "SMTP"
                log(f" ✓ {recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name)
                return True, None, False
        except smtplib.SMTPServerDisconnected as e:
            # Connection was closed mid-session — retry with a new connection.
            log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            last_error = str(e)
            if conn:
                try:
                    conn.quit()
                except:
                    pass
            time.sleep(0.3)
            continue
        except smtplib.SMTPRecipientsRefused as e:
            # Server rejected the recipient outright — no retry.
            if conn and not use_lmtp:
                smtp_pool.return_connection(conn)
            elif conn:
                try:
                    conn.quit()
                except:
                    pass
            error_msg = str(e)
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent
        except smtplib.SMTPException as e:
            error_msg = str(e)
            # Heuristic: connection-sounding errors are transient — retry.
            if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
                log(f" ⚠ {recipient}: Connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
                last_error = error_msg
                if conn:
                    try:
                        conn.quit()
                    except:
                        pass
                time.sleep(0.3)
                continue
            # Non-transient SMTP error — release connection and give up.
            if conn and not use_lmtp:
                smtp_pool.return_connection(conn)
            elif conn:
                try:
                    conn.quit()
                except:
                    pass
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: Error - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent
        except Exception as e:
            # Unknown error — close the connection and fail (not permanent).
            if conn:
                try:
                    conn.quit()
                except:
                    pass
            log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name)
            return False, str(e), False
    # All retries exhausted.
    log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name)
    return False, last_error or "Connection failed after retries", False
# ============================================
# S3 METADATA UPDATES (wie in domain-worker)
# ============================================
def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: Optional[List[str]] = None):
    """
    Mark the S3 email object as successfully delivered.

    S3 object metadata cannot be edited in place, so the object is copied
    onto itself with MetadataDirective='REPLACE'. Any failure is logged,
    never raised.

    Args:
        bucket/key: location of the raw email object in S3.
        worker_name: logging label, also recorded as processed_by.
        invalid_inboxes: optional recipients whose mailboxes turned out not
            to exist; recorded in metadata for later inspection.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = worker_name
        metadata['status'] = 'delivered'
        # Clear transient in-flight markers.
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)
def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str):
    """Flag the S3 email object as failed because every recipient is invalid.

    The object is copied onto itself with MetadataDirective='REPLACE', since
    S3 metadata cannot be modified in place. Failures are logged only.
    """
    try:
        existing = s3.head_object(Bucket=bucket, Key=key)
        metadata = existing.get('Metadata', {}) or {}
        metadata.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': worker_name,
            'status': 'failed',
            'error': 'All recipients are invalid (mailboxes do not exist)',
            'invalid_inboxes': ','.join(invalid_inboxes),
        })
        # Clear transient in-flight markers.
        for stale in ('processing_started', 'queued_at'):
            metadata.pop(stale, None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)
# ============================================
# MAIN MESSAGE PROCESSING
# ============================================
def process_message(domain: str, message: dict, receive_count: int) -> bool:
    """
    Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification)
    Returns: True (Erfolg/Löschen), False (Retry/Behalten)

    Pipeline:
      1. Unwrap the SNS envelope to get the SES receipt notification.
      2. Extract message id / sender / recipients; validate our domain.
      3. Load the raw mail from S3, with loop detection via our own header.
      4. Rewrite bounce headers if this is an SES bounce notification.
      5. Apply OOO / forwarding rules (skipped for bounces and looped mail).
      6. Deliver via SMTP per recipient; record the outcome in S3 metadata.
    """
    worker_name = f"worker-{domain}"
    try:
        # 1. UNPACKING (SNS -> SES)
        message_body = json.loads(message['Body'])
        if 'Message' in message_body and 'Type' in message_body:
            # It is an SNS notification; the SES payload sits (possibly as a
            # JSON string) under the 'Message' key.
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            ses_msg = message_body

        # 2. DATEN EXTRAHIEREN
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')
        # Ignore the synthetic notification SES publishes on topic setup
        if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
            log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
            return True
        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
            return True

        # Domain Validation: only handle mail addressed to our own domain
        if recipients:
            first_recipient = recipients[0]
            recipient_domain = first_recipient.split('@')[1]
            if recipient_domain.lower() != domain.lower():
                log(f"⚠ Security: Ignored message for {recipient_domain} (I am worker for {domain})", 'WARNING', worker_name)
                return True
        else:
            log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
            return True

        bucket = domain_to_bucket_name(domain)
        key = message_id
        # Compact single-line log for email processing
        recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
        log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)

        # 3. LADEN AUS S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            # LOOP DETECTION: check whether this message was already processed
            # by our worker. We only trust OUR unique header, not generic
            # headers (e.g. Auto-Submitted) that other systems might set.
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            if parsed.get('X-SES-Worker-Processed', ''):
                log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)
                skip_rules = True
            else:
                skip_rules = False
        except s3.exceptions.NoSuchKey:
            # SES event may arrive before the object is visible; retry a few times
            if receive_count < 5:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                return False
            else:
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name)
                return True
        except ClientError as e:
            # Some botocore paths surface NoSuchKey only as a generic ClientError
            if e.response['Error']['Code'] == 'NoSuchKey':
                if receive_count < 5:
                    log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                    return False
                else:
                    log(f"❌ S3 Object missing permanently after retries.", 'ERROR', worker_name)
                    return True
            log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name)
            return False
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR', worker_name)
            return False

        # 4. PARSING & BOUNCE LOGIC
        # BUGFIX: reuse the parse done for loop detection instead of parsing
        # the same bytes a second time. This also guarantees 'parsed' is
        # defined for step 5 even if the bounce logic below raises (the old
        # code could hit a NameError there).
        try:
            subject = parsed.get('Subject', '(no subject)')
            # Bounce Header umschreiben (setzt auch is_bounce intern)
            is_bounce = is_ses_bounce_notification(parsed)
            parsed, modified = apply_bounce_logic(parsed, subject, worker_name)
            if modified:
                log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')
                if PROMETHEUS_ENABLED:
                    bounces_processed.labels(domain=domain, type='rewritten').inc()
            else:
                from_addr_final = from_addr
        except Exception as e:
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name)
            from_addr_final = from_addr
            is_bounce = False
            # BUGFIX: do NOT reset skip_rules here. It was already determined
            # during loop detection; clearing it would re-run OOO/forward rules
            # on looped mail whenever the bounce logic errors out.

        # 5. OOO & FORWARD LOGIC (vor SMTP-Versand, nicht für Bounces oder bereits weitergeleitete)
        if not is_bounce and not skip_rules:
            for recipient in recipients:
                process_rules_for_recipient(recipient, parsed, domain, worker_name)

        # 6. SMTP VERSAND
        log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)
        successful = []
        failed_permanent = []
        failed_temporary = []
        for recipient in recipients:
            success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name)
            if success:
                successful.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='success').inc()
            elif is_perm:
                failed_permanent.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='permanent_failure').inc()
            else:
                failed_temporary.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='temporary_failure').inc()

        # 7. RESULTAT & CLEANUP
        if len(successful) > 0:
            # Partial success counts as delivered; permanently-invalid inboxes
            # are recorded in the object's metadata for inspection.
            mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None)
            result_info = f"{len(successful)} OK"
            if failed_permanent:
                result_info += f", {len(failed_permanent)} invalid"
            log(f"✅ Delivered ({result_info})", 'SUCCESS', worker_name)
            return True
        elif len(failed_permanent) == len(recipients):
            mark_as_all_invalid(bucket, key, failed_permanent, worker_name)
            log(f"🛑 All {len(recipients)} recipients invalid", 'ERROR', worker_name)
            return True
        else:
            # No success and at least one temporary failure: keep in queue
            log(f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name)
            return False
    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
        traceback.print_exc()
        return False
# ============================================
# DOMAIN POLLER
# ============================================
def poll_domain(domain: str, queue_url: str, stop_event: threading.Event,
                domain_stats: Optional[Dict[str, int]] = None,
                stats_lock: Optional[threading.Lock] = None):
    """Poll single domain's queue continuously.

    Long-polls the SQS queue until stop_event is set. Messages processed
    successfully are deleted; otherwise they are left in the queue so SQS
    redelivers them after the visibility timeout. The per-domain processed
    count is published into the shared domain_stats dict (under stats_lock)
    when both are provided.

    Args:
        domain: Domain this poller is responsible for.
        queue_url: SQS queue URL for that domain.
        stop_event: Cooperative shutdown signal.
        domain_stats: Optional shared mapping domain -> processed count.
        stats_lock: Lock guarding domain_stats.
    """
    worker_name = f"worker-{domain}"
    log(f"🚀 Starting poller for {domain}", 'INFO', worker_name)
    messages_processed = 0
    while not stop_event.is_set():
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=config.max_messages,
                WaitTimeSeconds=config.poll_interval,
                VisibilityTimeout=config.visibility_timeout,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp']
            )
            messages = response.get('Messages', [])
            # Update queue size metric (best effort)
            if PROMETHEUS_ENABLED:
                try:
                    attrs = sqs.get_queue_attributes(
                        QueueUrl=queue_url,
                        AttributeNames=['ApproximateNumberOfMessages']
                    )
                    queue_size.labels(domain=domain).set(
                        int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
                    )
                except Exception:
                    # BUGFIX: was a bare 'except:', which also swallows
                    # SystemExit/KeyboardInterrupt. Metrics stay best-effort.
                    pass
            if not messages:
                continue
            log(f"✉ Received {len(messages)} message(s)", 'INFO', worker_name)
            for message in messages:
                if stop_event.is_set():
                    break
                receipt_handle = message['ReceiptHandle']
                receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))
                if PROMETHEUS_ENABLED:
                    emails_in_flight.inc()
                start_time = time.time()
                try:
                    success = process_message(domain, message, receive_count)
                    if success:
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        messages_processed += 1
                        # Update shared stats
                        if domain_stats is not None and stats_lock is not None:
                            with stats_lock:
                                domain_stats[domain] = messages_processed
                    else:
                        # Not deleted -> SQS redelivers after visibility timeout
                        log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name)
                except json.JSONDecodeError as e:
                    # A malformed payload can never succeed; drop it for good
                    log(f"✗ Invalid message format: {e}", 'ERROR', worker_name)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR', worker_name)
                    traceback.print_exc()
                finally:
                    if PROMETHEUS_ENABLED:
                        emails_in_flight.dec()
                        processing_time.labels(domain=domain).observe(time.time() - start_time)
        except Exception as e:
            log(f"✗ Error polling: {e}", 'ERROR', worker_name)
            time.sleep(5)  # back off briefly so a broken queue doesn't spin
    log(f"👋 Stopped (processed: {messages_processed})", 'INFO', worker_name)
# ============================================
# UNIFIED WORKER
# ============================================
class UnifiedWorker:
    """Main worker coordinating all domain pollers.

    Owns the shared stop event, the per-domain SQS queue URLs, the poller
    threads, and the shared per-domain processed counters (guarded by
    stats_lock, since each poller thread writes its own entry).
    """

    def __init__(self):
        self.stop_event = threading.Event()
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}
        self.poller_threads: List[threading.Thread] = []
        # Shared stats across all pollers: domain -> processed count
        self.domain_stats: Dict[str, int] = {}
        self.stats_lock = threading.Lock()

    def setup(self):
        """Initialize worker: resolve domains, queue URLs and the SMTP pool.

        Exits the process if no domains are configured or no queue resolves.
        """
        self.domains = load_domains()
        if not self.domains:
            log("❌ No domains configured!", 'ERROR')
            sys.exit(1)
        # Get queue URLs
        for domain in self.domains:
            url = get_queue_url(domain)
            if url:
                self.queue_urls[domain] = url
                log(f" ✓ {domain} -> {domain_to_queue_name(domain)}")
            else:
                log(f" ✗ {domain} -> Queue not found!", 'WARNING')
        if not self.queue_urls:
            log("❌ No valid queues found!", 'ERROR')
            sys.exit(1)
        # Initialize SMTP pool
        smtp_pool.initialize()
        log(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Start all domain pollers and block until stop_event is set."""
        # Initialize stats for all domains
        for domain in self.queue_urls.keys():
            self.domain_stats[domain] = 0
        for domain, queue_url in self.queue_urls.items():
            thread = threading.Thread(
                target=poll_domain,
                args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock),
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.poller_threads.append(thread)
        log(f"Started {len(self.poller_threads)} domain pollers")
        # Periodic status log (every 5 minutes)
        last_status_log = time.time()
        status_interval = 300  # 5 minutes
        try:
            while not self.stop_event.is_set():
                self.stop_event.wait(timeout=10)
                # Log status summary every 5 minutes
                if time.time() - last_status_log > status_interval:
                    self._log_status_table()
                    last_status_log = time.time()
        except KeyboardInterrupt:
            pass

    def _log_status_table(self):
        """Log a compact status table."""
        active_threads = sum(1 for t in self.poller_threads if t.is_alive())
        # BUGFIX: take a consistent snapshot under the lock; the old code read
        # domain_stats again outside the lock while poller threads mutate it.
        with self.stats_lock:
            stats_snapshot = dict(self.domain_stats)
        total_processed = sum(stats_snapshot.values())
        # Build compact stats: one "short-domain:count" cell per domain
        stats_parts = []
        for domain in sorted(self.queue_urls.keys()):
            count = stats_snapshot.get(domain, 0)
            # Shorten domain for display
            short_domain = domain.split('.')[0][:12]
            stats_parts.append(f"{short_domain}:{count}")
        stats_line = " | ".join(stats_parts)
        log(f"📊 Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {stats_line}")

    def stop(self):
        """Stop gracefully: signal pollers, join them, close SMTP pool."""
        log("⚠ Stopping worker...")
        self.stop_event.set()
        # Wait for poller threads (max 10 seconds each)
        for thread in self.poller_threads:
            thread.join(timeout=10)
            if thread.is_alive():
                log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')
        smtp_pool.close_all()
        log("👋 Worker stopped")
# ============================================
# HEALTH CHECK SERVER
# ============================================
def start_health_server(worker: UnifiedWorker):
    """HTTP health endpoint"""
    from http.server import HTTPServer, BaseHTTPRequestHandler

    class _HealthHandler(BaseHTTPRequestHandler):
        # Serves /health (and /) with a full status document and /domains
        # with the plain domain list; everything else gets a 404.

        def _reply_json(self, body: str):
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(body.encode())

        def do_GET(self):
            if self.path in ('/health', '/'):
                status = {
                    'status': 'healthy',
                    'domains': len(worker.queue_urls),
                    'domain_list': list(worker.queue_urls.keys()),
                    'dynamodb': DYNAMODB_AVAILABLE,
                    'features': {
                        'bounce_rewriting': True,
                        'auto_reply': DYNAMODB_AVAILABLE,
                        'forwarding': DYNAMODB_AVAILABLE
                    },
                    'timestamp': datetime.utcnow().isoformat()
                }
                self._reply_json(json.dumps(status, indent=2))
            elif self.path == '/domains':
                self._reply_json(json.dumps(list(worker.queue_urls.keys())))
            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            # Suppress HTTP access logs
            pass

    class _QuietHTTPServer(HTTPServer):
        """HTTP Server that ignores connection reset errors from scanners"""

        def handle_error(self, request, client_address):
            err_type = sys.exc_info()[0]
            if err_type in (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
                # Silently ignore - these are just scanners/health checks disconnecting
                return
            log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING')

    httpd = _QuietHTTPServer(('0.0.0.0', config.health_port), _HealthHandler)
    threading.Thread(target=httpd.serve_forever, daemon=True, name='health-server').start()
    log(f"Health server on port {config.health_port}")
# ============================================
# ENTRY POINT
# ============================================
def main():
    """Entry point: install signal handlers, run setup, start servers and pollers."""
    worker = UnifiedWorker()

    def _on_signal(signum, frame):
        # Graceful shutdown on SIGTERM / SIGINT
        log(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _on_signal)

    worker.setup()

    if PROMETHEUS_ENABLED:
        start_http_server(config.metrics_port)
        log(f"Prometheus metrics on port {config.metrics_port}")
    start_health_server(worker)

    # Startup banner: collect all lines first, then emit them in order.
    banner = [
        f"\n{'='*70}",
        f"🚀 UNIFIED EMAIL WORKER",
        f"{'='*70}",
        f" Domains: {len(worker.queue_urls)}",
        f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}",
    ]
    if config.lmtp_enabled:
        banner.append(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
    else:
        banner.append(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
    banner.extend([
        f" Poll Interval: {config.poll_interval}s",
        f" Visibility: {config.visibility_timeout}s",
        f"",
        f" Features:",
        f" ✓ Bounce Detection & Header Rewriting",
        f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office",
        f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding",
        f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics",
        f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery",
        f"",
        f" Active Domains:",
    ])
    banner.extend(f" • {domain}" for domain in sorted(worker.queue_urls.keys()))
    banner.append(f"{'='*70}\n")
    for line in banner:
        log(line)

    worker.start()


if __name__ == '__main__':
    main()