#!/usr/bin/env python3
"""
unified_worker.py - Multi-Domain Email Worker (Full Featured)

Features:
- Multi-domain parallel processing via thread pool
- Bounce detection & rewriting (SES MAILER-DAEMON handling)
- Auto-reply / out-of-office (from DynamoDB email-rules)
- Email forwarding (from DynamoDB email-rules)
- SMTP connection pooling
- Prometheus metrics
- Graceful shutdown

DynamoDB tables:
- ses-outbound-messages: tracking for bounce correlation
- email-rules: forward and auto-reply rules

Schema email-rules (same as in domain-worker):
{
    "email_address": "user@domain.com",   # Partition key
    "ooo_active": true/false,
    "ooo_message": "I am currently...",
    "ooo_content_type": "text" | "html",
    "forwards": ["other@example.com", "another@example.com"]
}

Schema ses-outbound-messages:
{
    "MessageId": "abc123...",             # Partition key (SES message ID)
    "original_source": "sender@domain.com",
    "recipients": ["recipient@other.com"],
    "timestamp": "2025-01-01T12:00:00Z",
    "bounceType": "Permanent",            # Set after the bounce notification
    "bounceSubType": "General",
    "bouncedRecipients": ["recipient@other.com"]
}
"""

import os
import sys
import json
import time
import signal
import threading
import smtplib
import traceback
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import parseaddr, formatdate, make_msgid

import boto3
from botocore.exceptions import ClientError

# Optional: Prometheus metrics
try:
    from prometheus_client import start_http_server, Counter, Gauge, Histogram
    PROMETHEUS_ENABLED = True
except ImportError:
    PROMETHEUS_ENABLED = False

# ============================================
# CONFIGURATION
# ============================================

@dataclass
class Config:
    """Worker configuration"""
    # AWS
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')

    # Domains to process
    domains_list: str = os.environ.get('DOMAINS', '')
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')

    # Worker settings
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

    # SMTP
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))

    # DynamoDB tables
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')

    # Bounce handling
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))

    # Monitoring
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))

config = Config()
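
# Example invocation (illustrative; host and domain values are assumptions):
#   DOMAINS="example.com,example.org" SMTP_HOST=mail.internal SMTP_PORT=25 \
#   METRICS_PORT=8000 python3 unified_worker.py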

# ============================================
# LOGGING (same as in domain-worker)
# ============================================

def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'):
    """Structured logging with timestamp - matches the domain-worker format"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    thread_name = threading.current_thread().name
    print(f"[{timestamp}] [{level}] [{worker_name}] [{thread_name}] {message}", flush=True)

# ============================================
# METRICS
# ============================================

if PROMETHEUS_ENABLED:
    emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status'])
    emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed')
    processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain'])
    queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain'])
    bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'])
    autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain'])
    forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain'])

# ============================================
# AWS CLIENTS
# ============================================

sqs = boto3.client('sqs', region_name=config.aws_region)
s3 = boto3.client('s3', region_name=config.aws_region)
ses = boto3.client('ses', region_name=config.aws_region)

# DynamoDB
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
DYNAMODB_AVAILABLE = False
rules_table = None
messages_table = None

try:
    rules_table = dynamodb.Table(config.rules_table)
    messages_table = dynamodb.Table(config.messages_table)
    # Test the connection (accessing table_status triggers a lazy DescribeTable call)
    rules_table.table_status
    messages_table.table_status
    DYNAMODB_AVAILABLE = True
    log(f"DynamoDB connected: {config.rules_table}, {config.messages_table}")
except Exception as e:
    log(f"Warning: DynamoDB not available: {e}", 'WARNING')

# ============================================
# SMTP CONNECTION POOL
# ============================================

class SMTPPool:
    """Thread-safe SMTP connection pool with robust connection handling"""

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Create a new SMTP connection"""
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            log(f" 📡 New SMTP connection created to {self.host}:{self.port}")
            return conn
        except Exception as e:
            log(f"Failed to create SMTP connection: {e}", 'ERROR')
            return None

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Test whether the connection is still alive"""
        try:
            status = conn.noop()[0]
            return status == 250
        except Exception:
            return False

    def initialize(self):
        """Pre-create connections"""
        if self._initialized:
            return
        # Only one or two connections up front; the rest are created on demand
        for _ in range(min(2, self.pool_size)):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True
        log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Get a valid connection from the pool or create a new one"""
        # Try to take a connection from the pool
        try:
            conn = self._pool.get(block=False)
            # Check whether the connection is still alive
            if self._test_connection(conn):
                return conn
            else:
                # Connection is dead: close it and create a new one
                try:
                    conn.quit()
                except Exception:
                    pass
                log(" ♻ Recycled stale SMTP connection")
                return self._create_connection()
        except Empty:
            # Pool is empty: create a new connection
            return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return the connection to the pool if it is still valid"""
        if conn is None:
            return

        # Check whether the connection is still good
        if not self._test_connection(conn):
            try:
                conn.quit()
            except Exception:
                pass
            log(" 🗑 Discarded broken SMTP connection")
            return

        # Try to put it back into the pool
        try:
            self._pool.put_nowait(conn)
        except Exception:
            # Pool is full: close the connection
            try:
                conn.quit()
            except Exception:
                pass

    def close_all(self):
        """Close all connections"""
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                conn.quit()
            except Exception:
                pass

smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
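
# Usage sketch (illustrative; the senders below handle broken connections
# explicitly instead of a plain try/finally):
#   conn = smtp_pool.get_connection()
#   if conn:
#       conn.sendmail(from_addr, [rcpt], raw_bytes)
#       smtp_pool.return_connection(conn)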

# ============================================
# HELPER FUNCTIONS
# ============================================

def domain_to_queue_name(domain: str) -> str:
    return domain.replace('.', '-') + '-queue'

def domain_to_bucket_name(domain: str) -> str:
    return domain.replace('.', '-') + '-emails'
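
# Naming convention, e.g. "example.com" -> queue "example-com-queue",
# bucket "example-com-emails".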

def get_queue_url(domain: str) -> Optional[str]:
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except ClientError as e:
        if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
            log(f"Queue not found for domain: {domain}", 'WARNING')
        else:
            log(f"Error getting queue URL for {domain}: {e}", 'ERROR')
        return None

def load_domains() -> List[str]:
    """Load domains from config"""
    domains = []

    if config.domains_list:
        domains.extend([d.strip() for d in config.domains_list.split(',') if d.strip()])

    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as f:
            for line in f:
                domain = line.strip()
                if domain and not domain.startswith('#'):
                    domains.append(domain)

    domains = list(set(domains))
    log(f"Loaded {len(domains)} domains: {', '.join(domains)}")
    return domains

# ============================================
# BOUNCE HANDLING (same as in domain-worker)
# ============================================

def is_ses_bounce_notification(parsed_email) -> bool:
    """Check whether the email is from the SES MAILER-DAEMON"""
    from_header = (parsed_email.get('From') or '').lower()
    return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header
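
# Matches a From header that contains both 'mailer-daemon@' and 'amazonses.com',
# e.g. "MAILER-DAEMON@amazonses.com" (compared case-insensitively).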

def get_bounce_info_from_dynamodb(message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
    """
    Looks up bounce info in DynamoDB by message ID,
    with retry logic for timing issues (same as in domain-worker).
    """
    if not DYNAMODB_AVAILABLE or not messages_table:
        return None

    for attempt in range(config.bounce_lookup_retries):
        try:
            response = messages_table.get_item(Key={'MessageId': message_id})
            item = response.get('Item')

            if item:
                return {
                    'original_source': item.get('original_source', ''),
                    'bounceType': item.get('bounceType', 'Unknown'),
                    'bounceSubType': item.get('bounceSubType', 'Unknown'),
                    'bouncedRecipients': item.get('bouncedRecipients', []),
                    'timestamp': item.get('timestamp', '')
                }

            if attempt < config.bounce_lookup_retries - 1:
                log(f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s (attempt {attempt + 1}/{config.bounce_lookup_retries})...", 'INFO', worker_name)
                time.sleep(config.bounce_lookup_delay)
            else:
                log(f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts for Message-ID: {message_id}", 'WARNING', worker_name)
                return None

        except Exception as e:
            log(f"⚠ DynamoDB error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", 'ERROR', worker_name)
            if attempt < config.bounce_lookup_retries - 1:
                time.sleep(config.bounce_lookup_delay)
            else:
                return None

    return None

def apply_bounce_logic(parsed, subject: str, worker_name: str = 'unified') -> Tuple[Any, bool]:
    """
    Checks for an SES bounce, looks it up in DynamoDB, and rewrites the headers.
    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_notification(parsed):
        return parsed, False

    log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name)

    # Extract the local part of the Message-ID header
    # (e.g. "<abc123@amazonses.com>" -> "abc123")
    message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0]

    if not message_id:
        log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name)
        return parsed, False

    log(f" Looking up Message-ID: {message_id}", 'INFO', worker_name)

    # Look it up in DynamoDB
    bounce_info = get_bounce_info_from_dynamodb(message_id, worker_name)

    if not bounce_info:
        return parsed, False

    # Log the bounce info
    original_source = bounce_info['original_source']
    bounced_recipients = bounce_info['bouncedRecipients']
    bounce_type = bounce_info['bounceType']
    bounce_subtype = bounce_info['bounceSubType']

    log("✓ Found bounce info:", 'INFO', worker_name)
    log(f" Original sender: {original_source}", 'INFO', worker_name)
    log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name)
    log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name)

    if bounced_recipients:
        new_from = bounced_recipients[0]

        # Rewrite headers
        parsed['X-Original-SES-From'] = parsed.get('From', '')
        parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
        parsed.replace_header('From', new_from)

        if not parsed.get('Reply-To'):
            parsed['Reply-To'] = new_from

        # Adjust the subject
        if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower():
            parsed.replace_header('Subject', f"Delivery Status: {new_from}")

        log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name)
        return parsed, True

    log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name)
    return parsed, False

# ============================================
# EMAIL BODY EXTRACTION (same as in domain-worker)
# ============================================

def extract_body_parts(parsed) -> Tuple[str, Optional[str]]:
    """
    Extracts both the text/plain and text/html body parts.
    Returns: (text_body: str, html_body: str or None)
    """
    text_body = ''
    html_body = None

    if parsed.is_multipart():
        for part in parsed.walk():
            content_type = part.get_content_type()

            if content_type == 'text/plain':
                try:
                    text_body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
                except Exception:
                    pass

            elif content_type == 'text/html':
                try:
                    html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
                except Exception:
                    pass
    else:
        try:
            payload = parsed.get_payload(decode=True)
            if payload:
                decoded = payload.decode('utf-8', errors='ignore')
                if parsed.get_content_type() == 'text/html':
                    html_body = decoded
                else:
                    text_body = decoded
        except Exception:
            text_body = str(parsed.get_payload())

    return text_body.strip() if text_body else '(No body content)', html_body

# ============================================
# AUTO-REPLY / OOO (same as in domain-worker)
# ============================================

def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'):
    """
    Builds an out-of-office reply as a complete MIME message,
    preserving the original body (text + HTML).
    """
    text_body, html_body = extract_body_parts(original_parsed)
    original_subject = original_parsed.get('Subject', '(no subject)')
    original_from = original_parsed.get('From', 'unknown')

    # Create the new message
    msg = MIMEMultipart('mixed')
    msg['From'] = recipient
    msg['To'] = original_from
    msg['Subject'] = f"Out of Office: {original_subject}"
    msg['Date'] = formatdate(localtime=True)
    msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    msg['In-Reply-To'] = original_parsed.get('Message-ID', '')
    msg['References'] = original_parsed.get('Message-ID', '')
    msg['Auto-Submitted'] = 'auto-replied'  # Prevents reply loops

    # Build the body part
    body_part = MIMEMultipart('alternative')

    # Text version
    text_content = f"{ooo_msg}\n\n"
    text_content += "--- Original Message ---\n"
    text_content += f"From: {original_from}\n"
    text_content += f"Subject: {original_subject}\n\n"
    text_content += text_body
    body_part.attach(MIMEText(text_content, 'plain', 'utf-8'))

    # HTML version (if requested or the original has one)
    if content_type == 'html' or html_body:
        html_content = f"<div>{ooo_msg}</div><br><hr><br>"
        html_content += "<blockquote style='margin:10px 0;padding:10px;border-left:3px solid #ccc;'>"
        html_content += "<strong>Original Message</strong><br>"
        html_content += f"<strong>From:</strong> {original_from}<br>"
        html_content += f"<strong>Subject:</strong> {original_subject}<br><br>"
        html_content += (html_body if html_body else text_body.replace('\n', '<br>'))
        html_content += "</blockquote>"
        body_part.attach(MIMEText(html_content, 'html', 'utf-8'))

    msg.attach(body_part)
    return msg
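
# Resulting MIME structure:
#   multipart/mixed
#   └── multipart/alternative
#       ├── text/plain  (OOO message + quoted original)
#       └── text/html   (only when requested or the original had HTML)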

# ============================================
# FORWARDING (same as in domain-worker)
# ============================================

def create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str):
    """
    Builds a forward as a complete MIME message,
    preserving ALL original parts including attachments.
    """
    original_subject = original_parsed.get('Subject', '(no subject)')
    original_date = original_parsed.get('Date', 'unknown')

    # Create the new message
    msg = MIMEMultipart('mixed')
    msg['From'] = recipient
    msg['To'] = forward_to
    msg['Subject'] = f"FWD: {original_subject}"
    msg['Date'] = formatdate(localtime=True)
    msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
    msg['Reply-To'] = original_from

    # Original body parts for the forward header text
    text_body, html_body = extract_body_parts(original_parsed)

    # Body part
    body_part = MIMEMultipart('alternative')

    # Text version
    fwd_text = "---------- Forwarded message ---------\n"
    fwd_text += f"From: {original_from}\n"
    fwd_text += f"Date: {original_date}\n"
    fwd_text += f"Subject: {original_subject}\n"
    fwd_text += f"To: {recipient}\n\n"
    fwd_text += text_body
    body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8'))

    # HTML version
    if html_body:
        fwd_html = "<div style='border-left:3px solid #ccc;padding-left:10px;margin:10px 0;'>"
        fwd_html += "<strong>---------- Forwarded message ---------</strong><br>"
        fwd_html += f"<strong>From:</strong> {original_from}<br>"
        fwd_html += f"<strong>Date:</strong> {original_date}<br>"
        fwd_html += f"<strong>Subject:</strong> {original_subject}<br>"
        fwd_html += f"<strong>To:</strong> {recipient}<br><br>"
        fwd_html += html_body
        fwd_html += "</div>"
        body_part.attach(MIMEText(fwd_html, 'html', 'utf-8'))

    msg.attach(body_part)

    # IMPORTANT: copy attachments
    if original_parsed.is_multipart():
        for part in original_parsed.walk():
            # Only non-body parts (attachments)
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_type() in ['text/plain', 'text/html']:
                continue  # Body already handled above

            # Add the attachment
            msg.attach(part)

    return msg
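
# Resulting MIME structure:
#   multipart/mixed
#   ├── multipart/alternative  (forward header + original body)
#   └── any non-text parts of the original, reattached as-is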

# ============================================
# RULES PROCESSING (corrected - matches the domain-worker schema)
# ============================================

def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name: str) -> None:
    """
    Processes OOO and forward rules for one recipient.
    Schema as in domain-worker: email_address as key; ooo_active and forwards as fields.
    """
    if not DYNAMODB_AVAILABLE or not rules_table:
        return

    try:
        # Load the rule from DynamoDB (schema: email_address as partition key)
        response = rules_table.get_item(Key={'email_address': recipient})
        rule = response.get('Item', {})

        if not rule:
            return

        original_from = parsed.get('From', '')
        original_subject = parsed.get('Subject', '(no subject)')

        # ============================================
        # OOO / auto-reply handling
        # ============================================
        if rule.get('ooo_active', False):
            ooo_msg = rule.get('ooo_message', 'I am currently out of office.')
            content_type = rule.get('ooo_content_type', 'text')

            # Extract the sender for the reply (email address only)
            sender_name, sender_addr = parseaddr(original_from)
            if not sender_addr:
                sender_addr = original_from

            # Do not reply to automated mail
            auto_submitted = parsed.get('Auto-Submitted', '')
            precedence = (parsed.get('Precedence') or '').lower()

            if auto_submitted and auto_submitted != 'no':
                log(" ⏭ Skipping OOO for auto-submitted message", 'INFO', worker_name)
            elif precedence in ['bulk', 'junk', 'list']:
                log(f" ⏭ Skipping OOO for {precedence} message", 'INFO', worker_name)
            elif any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster']):
                log(" ⏭ Skipping OOO for noreply address", 'INFO', worker_name)
            else:
                try:
                    ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type)

                    ses.send_raw_email(
                        Source=recipient,
                        Destinations=[sender_addr],
                        RawMessage={'Data': ooo_reply.as_bytes()}
                    )
                    log(f"✓ Sent OOO reply to {sender_addr} from {recipient}", 'SUCCESS', worker_name)

                    if PROMETHEUS_ENABLED:
                        autoreplies_sent.labels(domain=domain).inc()

                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR', worker_name)

        # ============================================
        # Forward handling
        # ============================================
        forwards = rule.get('forwards', [])
        if forwards:
            for forward_to in forwards:
                try:
                    fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from)

                    ses.send_raw_email(
                        Source=recipient,
                        Destinations=[forward_to],
                        RawMessage={'Data': fwd_msg.as_bytes()}
                    )
                    log(f"✓ Forwarded to {forward_to} from {recipient}", 'SUCCESS', worker_name)

                    if PROMETHEUS_ENABLED:
                        forwards_sent.labels(domain=domain).inc()

                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR', worker_name)

    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            pass  # No rule present - expected
        else:
            log(f"⚠ DynamoDB error for {recipient}: {e}", 'ERROR', worker_name)
    except Exception as e:
        log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING', worker_name)
        traceback.print_exc()

# ============================================
# SMTP SENDING (same as in domain-worker)
# ============================================

def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Checks whether the error is permanent for this recipient (mailbox does not exist)
    """
    permanent_indicators = [
        '550',  # Mailbox unavailable / not found
        '551',  # User not local
        '553',  # Mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user'
    ]

    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)
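
# Examples: "550 5.1.1 User unknown" -> True (permanent);
# "421 Service not available" -> False (temporary, will be retried).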

def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
    """
    Sends the email via SMTP to ONE recipient,
    with retry logic on connection errors.
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    last_error = None

    for attempt in range(max_retries + 1):
        smtp_conn = smtp_pool.get_connection()

        if not smtp_conn:
            last_error = "Could not get SMTP connection"
            log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            time.sleep(0.5)
            continue

        try:
            result = smtp_conn.sendmail(from_addr, [recipient], raw_message)

            # Send succeeded: return the connection to the pool
            smtp_pool.return_connection(smtp_conn)

            if isinstance(result, dict) and result:
                error = str(result.get(recipient, 'Unknown refusal'))
                is_permanent = is_permanent_recipient_error(error)
                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
                return False, error, is_permanent
            else:
                log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
                return True, None, False

        except smtplib.SMTPServerDisconnected as e:
            # Connection was closed - retry with a new connection
            log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
            last_error = str(e)
            # Do not return the connection (it is broken)
            try:
                smtp_conn.quit()
            except Exception:
                pass
            time.sleep(0.3)
            continue

        except smtplib.SMTPRecipientsRefused as e:
            smtp_pool.return_connection(smtp_conn)
            error_msg = str(e)
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent

        except smtplib.SMTPException as e:
            error_msg = str(e)
            # On connection errors: retry
            if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
                log(f" ⚠ {recipient}: SMTP connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
                last_error = error_msg
                try:
                    smtp_conn.quit()
                except Exception:
                    pass
                time.sleep(0.3)
                continue

            smtp_pool.return_connection(smtp_conn)
            is_permanent = is_permanent_recipient_error(error_msg)
            log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
            return False, error_msg, is_permanent

        except Exception as e:
            # Unknown error - discard the connection, but do not treat the failure as permanent
            try:
                smtp_conn.quit()
            except Exception:
                pass
            log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name)
            return False, str(e), False

    # All retries failed
    log(f" ✗ {recipient}: All retries failed - {last_error}", 'ERROR', worker_name)
    return False, last_error or "Connection failed after retries", False
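
# Return triple at a glance:
#   (True,  None,  False)  delivered
#   (False, error, True)   permanent failure (invalid mailbox, do not retry)
#   (False, error, False)  temporary failure (caller keeps the message queued)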

# ============================================
# S3 METADATA UPDATES (same as in domain-worker)
# ============================================

def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes: List[str] = None):
    """Marks the email as successfully delivered"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = worker_name
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )

        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS', worker_name)

    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)

def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worker_name: str):
    """Marks the email as failed because all recipients are invalid"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = worker_name
        metadata['status'] = 'failed'
        metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
        metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )

        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS', worker_name)

    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)

# ============================================
# MAIN MESSAGE PROCESSING
# ============================================

def process_message(domain: str, message: dict, receive_count: int) -> bool:
    """
    Processes one email from the queue (SNS-wrapped SES notification).
    Returns: True (success, delete from queue), False (keep for retry)
    """
    worker_name = f"worker-{domain}"

    try:
        # 1. UNPACKING (SNS -> SES)
        message_body = json.loads(message['Body'])

        if 'Message' in message_body and 'Type' in message_body:
            # It is an SNS notification
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            ses_msg = message_body

        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})

        message_id = mail.get('messageId')

        # FIX: ignore the Amazon SES setup notification
        if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
            log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
            return True

        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])

        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
            return True

        # Domain validation
        if recipients:
            first_recipient = recipients[0]
            recipient_domain = first_recipient.split('@')[1]

            if recipient_domain.lower() != domain.lower():
                log(f"⚠ Security: Ignored message for {recipient_domain} (I am the worker for {domain})", 'WARNING', worker_name)
                return True
        else:
            log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
            return True

        bucket = domain_to_bucket_name(domain)
        key = message_id

        log(f"\n{'='*70}", 'INFO', worker_name)
        log("Processing Email (SNS/SES):", 'INFO', worker_name)
        log(f" ID: {key}", 'INFO', worker_name)
        log(f" Recipients: {len(recipients)} -> {recipients}", 'INFO', worker_name)
        log(f" Bucket: {bucket}", 'INFO', worker_name)

        # 3. LOAD FROM S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            log(f"✓ Loaded {len(raw_bytes)} bytes from S3", 'INFO', worker_name)
        except s3.exceptions.NoSuchKey:
            if receive_count < 5:
                log(f"⏳ S3 object not found yet (attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                return False
            else:
                log("❌ S3 object missing permanently after retries.", 'ERROR', worker_name)
                return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                if receive_count < 5:
                    log(f"⏳ S3 object not found yet (attempt {receive_count}). Retrying...", 'WARNING', worker_name)
                    return False
                else:
                    log("❌ S3 object missing permanently after retries.", 'ERROR', worker_name)
                    return True
            log(f"❌ S3 download error: {e}", 'ERROR', worker_name)
            return False
        except Exception as e:
            log(f"❌ S3 download error: {e}", 'ERROR', worker_name)
            return False

        # 4. PARSING & BOUNCE LOGIC
        parsed = None
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')

            # Detect the bounce first; apply_bounce_logic then rewrites headers via DynamoDB
            is_bounce = is_ses_bounce_notification(parsed)
            parsed, modified = apply_bounce_logic(parsed, subject, worker_name)

            if modified:
                log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')

                if PROMETHEUS_ENABLED:
                    bounces_processed.labels(domain=domain, type='rewritten').inc()
            else:
                from_addr_final = from_addr

        except Exception as e:
            log(f"⚠ Parsing/logic error: {e}. Sending original.", 'WARNING', worker_name)
            from_addr_final = from_addr
            is_bounce = False

        # 5. OOO & FORWARD LOGIC (before SMTP delivery; not for bounces)
        if not is_bounce and parsed is not None:
            for recipient in recipients:
                process_rules_for_recipient(recipient, parsed, domain, worker_name)

        # 6. SMTP DELIVERY
        log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)

        successful = []
        failed_permanent = []
        failed_temporary = []

        for recipient in recipients:
            success, error, is_perm = send_email_to_recipient(from_addr_final, recipient, raw_bytes, worker_name)

            if success:
                successful.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='success').inc()
            elif is_perm:
                failed_permanent.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='permanent_failure').inc()
            else:
                failed_temporary.append(recipient)
                if PROMETHEUS_ENABLED:
                    emails_processed.labels(domain=domain, status='temporary_failure').inc()

        # 7. RESULT & CLEANUP
        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name)

        if len(successful) > 0:
            mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None)
            log("✅ Success. Deleting from queue.", 'INFO', worker_name)
            return True

        elif len(failed_permanent) == len(recipients):
            mark_as_all_invalid(bucket, key, failed_permanent, worker_name)
            log("🛑 All recipients invalid. Deleting from queue.", 'INFO', worker_name)
            return True

        else:
            log("🔄 Temporary failures. Keeping in queue.", 'INFO', worker_name)
            return False

    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
        traceback.print_exc()
        return False

# ============================================
# DOMAIN POLLER
# ============================================

def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
    """Poll a single domain's queue continuously"""
    worker_name = f"worker-{domain}"
    log(f"🚀 Starting poller for {domain}", 'INFO', worker_name)

    messages_processed = 0
    last_activity = time.time()

    while not stop_event.is_set():
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=config.max_messages,
                WaitTimeSeconds=config.poll_interval,
                VisibilityTimeout=config.visibility_timeout,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp']
            )

            messages = response.get('Messages', [])

            # Update the queue size metric
            if PROMETHEUS_ENABLED:
                try:
                    attrs = sqs.get_queue_attributes(
                        QueueUrl=queue_url,
                        AttributeNames=['ApproximateNumberOfMessages']
                    )
                    queue_size.labels(domain=domain).set(
                        int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
                    )
                except Exception:
                    pass

            if not messages:
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})", 'INFO', worker_name)
                    last_activity = time.time()
                continue

            log(f"\n✉ Received {len(messages)} message(s) from queue", 'INFO', worker_name)
            last_activity = time.time()

            for message in messages:
                if stop_event.is_set():
                    break

                receipt_handle = message['ReceiptHandle']
                receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))

                sent_timestamp = int(message.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0

                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s", 'INFO', worker_name)

                if PROMETHEUS_ENABLED:
                    emails_in_flight.inc()
                start_time = time.time()

                try:
                    success = process_message(domain, message, receive_count)

                    if success:
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue", 'INFO', worker_name)
                        messages_processed += 1
                    else:
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)", 'WARNING', worker_name)

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR', worker_name)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )

                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR', worker_name)
                    traceback.print_exc()

                finally:
                    if PROMETHEUS_ENABLED:
                        emails_in_flight.dec()
                        processing_time.labels(domain=domain).observe(time.time() - start_time)

        except Exception as e:
            log(f"✗ Error polling {domain}: {e}", 'ERROR', worker_name)
            time.sleep(5)

    log(f"👋 Stopped poller for {domain} (processed: {messages_processed})", 'INFO', worker_name)

# ============================================
# UNIFIED WORKER
# ============================================

class UnifiedWorker:
    """Main worker coordinating all domain pollers"""

    def __init__(self):
        self.stop_event = threading.Event()
        self.executor: Optional[ThreadPoolExecutor] = None
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}
        self.active_pollers: Dict[str, threading.Thread] = {}
        self.poller_lock = threading.Lock()
        self.reload_interval = int(os.environ.get('DOMAIN_RELOAD_INTERVAL', '60'))  # seconds

    def setup(self):
        """Initialize the worker"""
        self.domains = load_domains()

        if not self.domains:
            log("❌ No domains configured!", 'ERROR')
            sys.exit(1)

        # Get queue URLs
        for domain in self.domains:
            url = get_queue_url(domain)
            if url:
                self.queue_urls[domain] = url
                log(f" ✓ {domain} -> {domain_to_queue_name(domain)}")
            else:
                log(f" ✗ {domain} -> Queue not found!", 'WARNING')

        if not self.queue_urls:
            log("❌ No valid queues found!", 'ERROR')
            sys.exit(1)

        # Initialize the SMTP pool
        smtp_pool.initialize()

        log(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Start all domain pollers with hot-reload support"""
        # Start the initial pollers
        for domain, queue_url in self.queue_urls.items():
            self._start_poller(domain, queue_url)

        log(f"Started {len(self.active_pollers)} domain pollers")

        # Start the domain reload thread
        reload_thread = threading.Thread(
            target=self._domain_reload_loop,
            name='domain-reloader',
            daemon=True
        )
        reload_thread.start()
        log(f"Domain hot-reload enabled (checking every {self.reload_interval}s)")

        # Wait until shutdown
        try:
            while not self.stop_event.is_set():
                self.stop_event.wait(timeout=1)
        except KeyboardInterrupt:
            pass

    def _start_poller(self, domain: str, queue_url: str):
        """Start a single domain poller thread"""
        with self.poller_lock:
            if domain in self.active_pollers:
                return  # Already running

            thread = threading.Thread(
                target=poll_domain,
                args=(domain, queue_url, self.stop_event),
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.active_pollers[domain] = thread
            log(f"🚀 Started poller for {domain}", 'INFO', f"worker-{domain}")

    def _domain_reload_loop(self):
        """Periodically check for new domains and start pollers"""
        while not self.stop_event.is_set():
            self.stop_event.wait(timeout=self.reload_interval)

            if self.stop_event.is_set():
                break

            try:
                # Load the current domains
                current_domains = load_domains()

                # Find new domains
                new_domains = set(current_domains) - set(self.queue_urls.keys())

                if new_domains:
                    log(f"🔄 Hot-reload: Found {len(new_domains)} new domain(s): {', '.join(new_domains)}")

                    for domain in new_domains:
                        queue_url = get_queue_url(domain)
                        if queue_url:
                            self.queue_urls[domain] = queue_url
                            self._start_poller(domain, queue_url)
                        else:
                            log(f"⚠ Could not find queue for new domain: {domain}", 'WARNING')

                # Cleanup: removed domains (optional - their threads keep running until restart)
                removed_domains = set(self.queue_urls.keys()) - set(current_domains)
                if removed_domains:
                    log(f"ℹ Domains removed from config (will stop on restart): {', '.join(removed_domains)}")

            except Exception as e:
                log(f"Error in domain reload: {e}", 'ERROR')

    def stop(self):
        """Stop gracefully"""
        log("⚠ Stopping worker...")
        self.stop_event.set()

        # Wait for the poller threads (at most 10 seconds each)
        with self.poller_lock:
            for domain, thread in self.active_pollers.items():
                thread.join(timeout=10)
                if thread.is_alive():
                    log(f"Warning: Poller for {domain} did not stop gracefully", 'WARNING')

        smtp_pool.close_all()
        log("👋 Worker stopped")

# ============================================
# HEALTH CHECK SERVER
# ============================================

def start_health_server(worker: UnifiedWorker):
    """HTTP health endpoint"""
    from http.server import HTTPServer, BaseHTTPRequestHandler

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == '/health' or self.path == '/':
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()

                status = {
                    'status': 'healthy',
                    'domains': len(worker.queue_urls),
                    'domain_list': list(worker.queue_urls.keys()),
                    'dynamodb': DYNAMODB_AVAILABLE,
                    'features': {
                        'bounce_rewriting': True,
                        'auto_reply': DYNAMODB_AVAILABLE,
                        'forwarding': DYNAMODB_AVAILABLE
                    },
                    'timestamp': datetime.utcnow().isoformat()
                }
                self.wfile.write(json.dumps(status, indent=2).encode())

            elif self.path == '/domains':
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(list(worker.queue_urls.keys())).encode())

            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            pass  # Suppress HTTP request logs

    server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server')
    thread.start()
    log(f"Health server on port {config.health_port}")

# ============================================
# ENTRY POINT
# ============================================

# Global flag to prevent double execution
_worker_started = False

def main():
    global _worker_started
    if _worker_started:
        log("⚠ Worker already started, ignoring duplicate call", 'WARNING')
        return
    _worker_started = True

    worker = UnifiedWorker()

    def signal_handler(signum, frame):
        log(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    worker.setup()

    if PROMETHEUS_ENABLED:
        start_http_server(config.metrics_port)
        log(f"Prometheus metrics on port {config.metrics_port}")

    start_health_server(worker)

    log(f"\n{'='*70}")
    log("🚀 UNIFIED EMAIL WORKER (Full Featured)")
    log(f"{'='*70}")
    log(f" Domains: {len(worker.queue_urls)}")
    log(f" Hot-Reload: Every {worker.reload_interval}s")
    log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
    log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}")
    log(f" Poll Interval: {config.poll_interval}s")
    log(f" Visibility: {config.visibility_timeout}s")
    log("")
    log(" Features:")
    log(" ✓ Bounce Detection & Header Rewriting")
    log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office")
    log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding")
    log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics")
    log("")
    log(" Active Domains:")
    for domain in sorted(worker.queue_urls.keys()):
        log(f" • {domain}")
    log(f"{'='*70}\n")

    worker.start()

if __name__ == '__main__':
    main()