Compare commits

..

2 Commits

Author SHA1 Message Date
Andreas Knuth 688d49e218 remove python worker 2026-03-13 20:12:52 -05:00
Andreas Knuth 6016fbe13d remove version 2026-03-13 20:11:35 -05:00
38 changed files with 0 additions and 4916 deletions

View File

@ -1,5 +1,3 @@
-version: "3.8"
 services:
   email-worker:
     build: .

View File

@ -1,38 +0,0 @@
# Documentation
*.md
!README.md
# Git
.git
.gitignore
# Python
__pycache__
*.pyc
*.pyo
*.pyd
.Python
*.so
# Logs
logs/
*.log
# Environment
.env
.env.example
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Build
*.tar.gz
dist/
build/

View File

@ -1,42 +0,0 @@
# AWS Credentials
AWS_REGION=us-east-2
AWS_ACCESS_KEY_ID=your_access_key_here
AWS_SECRET_ACCESS_KEY=your_secret_key_here
# Domains Configuration
DOMAINS=example.com,another.com
# Alternative: Use domains file
# DOMAINS_FILE=/etc/email-worker/domains.txt
# Worker Settings
WORKER_THREADS=10
POLL_INTERVAL=20
MAX_MESSAGES=10
VISIBILITY_TIMEOUT=300
# SMTP Configuration
SMTP_HOST=localhost
SMTP_PORT=25
SMTP_USE_TLS=false
SMTP_USER=
SMTP_PASS=
SMTP_POOL_SIZE=5
INTERNAL_SMTP_PORT=2525
# LMTP Configuration (Optional)
LMTP_ENABLED=false
LMTP_HOST=localhost
LMTP_PORT=24
# DynamoDB Tables
DYNAMODB_RULES_TABLE=email-rules
DYNAMODB_MESSAGES_TABLE=ses-outbound-messages
DYNAMODB_BLOCKED_TABLE=email-blocked-senders
# Bounce Handling
BOUNCE_LOOKUP_RETRIES=3
BOUNCE_LOOKUP_DELAY=1.0
# Monitoring Ports
METRICS_PORT=8000
HEALTH_PORT=8080

View File

@ -1,35 +0,0 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
.venv
# Logs
*.log
# Environment
.env
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Build
dist/
build/
*.egg-info/
# Archives
*.tar.gz
*.zip

View File

@ -1,37 +0,0 @@
FROM python:3.11-slim
LABEL maintainer="andreas@knuth.dev"
LABEL description="Unified multi-domain email worker (modular version)"
# System packages
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
# Non-root user
RUN useradd -m -u 1000 worker && \
mkdir -p /app /var/log/email-worker /etc/email-worker && \
chown -R worker:worker /app /var/log/email-worker /etc/email-worker
# Python dependencies
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
# Worker code (all modules)
COPY --chown=worker:worker aws/ /app/aws/
COPY --chown=worker:worker email_processing/ /app/email_processing/
COPY --chown=worker:worker smtp/ /app/smtp/
COPY --chown=worker:worker metrics/ /app/metrics/
COPY --chown=worker:worker *.py /app/
WORKDIR /app
USER worker
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Unbuffered output
ENV PYTHONUNBUFFERED=1
CMD ["python3", "main.py"]

View File

@ -1,50 +0,0 @@
.PHONY: help install run test lint clean docker-build docker-run docker-stop docker-logs
help:
@echo "Available commands:"
@echo " make install - Install dependencies"
@echo " make run - Run worker locally"
@echo " make test - Run tests (TODO)"
@echo " make lint - Run linting"
@echo " make clean - Clean up files"
@echo " make docker-build - Build Docker image"
@echo " make docker-run - Run with docker-compose"
@echo " make docker-stop - Stop docker-compose"
@echo " make docker-logs - Show docker logs"
install:
pip install -r requirements.txt
run:
python3 main.py
test:
@echo "TODO: Add tests"
# python3 -m pytest tests/
lint:
@echo "Running pylint..."
-pylint --rcfile=.pylintrc *.py **/*.py 2>/dev/null || echo "pylint not installed"
@echo "Running flake8..."
-flake8 --max-line-length=120 . 2>/dev/null || echo "flake8 not installed"
clean:
find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
find . -type f -name "*.pyc" -delete
find . -type f -name "*.pyo" -delete
find . -type f -name "*.log" -delete
docker-build:
docker build -t unified-email-worker:latest .
docker-run:
docker-compose up -d
docker-stop:
docker-compose down
docker-logs:
docker-compose logs -f email-worker
docker-restart: docker-stop docker-build docker-run
@echo "Worker restarted"

View File

@ -1,11 +0,0 @@
#!/usr/bin/env python3
"""
AWS service handlers
"""
from .s3_handler import S3Handler
from .sqs_handler import SQSHandler
from .ses_handler import SESHandler
from .dynamodb_handler import DynamoDBHandler
__all__ = ['S3Handler', 'SQSHandler', 'SESHandler', 'DynamoDBHandler']

View File

@ -1,192 +0,0 @@
#!/usr/bin/env python3
"""
DynamoDB operations handler
"""
import time
from typing import Optional, Dict, Any, List
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config
class DynamoDBHandler:
"""Handles all DynamoDB operations"""
def __init__(self):
self.resource = boto3.resource('dynamodb', region_name=config.aws_region)
self.available = False
self.rules_table = None
self.messages_table = None
self.blocked_table = None
self._initialize_tables()
def _initialize_tables(self):
"""Initialize DynamoDB table connections"""
try:
self.rules_table = self.resource.Table(config.rules_table)
self.messages_table = self.resource.Table(config.messages_table)
self.blocked_table = self.resource.Table(config.blocked_table)
# Test connection
self.rules_table.table_status
self.messages_table.table_status
self.blocked_table.table_status
self.available = True
log("✓ DynamoDB tables connected successfully")
except Exception as e:
log(f"⚠ DynamoDB not fully available: {e}", 'WARNING')
self.available = False
def get_email_rules(self, email_address: str) -> Optional[Dict[str, Any]]:
"""
Get email rules for recipient (OOO, Forwarding)
Args:
email_address: Recipient email address
Returns:
Rule dictionary or None if not found
"""
if not self.available or not self.rules_table:
return None
try:
response = self.rules_table.get_item(Key={'email_address': email_address})
return response.get('Item')
except ClientError as e:
if e.response['Error']['Code'] != 'ResourceNotFoundException':
log(f"⚠ DynamoDB error for {email_address}: {e}", 'ERROR')
return None
except Exception as e:
log(f"⚠ DynamoDB error for {email_address}: {e}", 'WARNING')
return None
def get_bounce_info(self, message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
"""
Get bounce information from DynamoDB with retry logic
Args:
message_id: SES Message ID
worker_name: Worker name for logging
Returns:
Bounce info dictionary or None
"""
if not self.available or not self.messages_table:
return None
for attempt in range(config.bounce_lookup_retries):
try:
response = self.messages_table.get_item(Key={'MessageId': message_id})
item = response.get('Item')
if item:
return {
'original_source': item.get('original_source', ''),
'bounceType': item.get('bounceType', 'Unknown'),
'bounceSubType': item.get('bounceSubType', 'Unknown'),
'bouncedRecipients': item.get('bouncedRecipients', []),
'timestamp': item.get('timestamp', '')
}
if attempt < config.bounce_lookup_retries - 1:
log(
f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s "
f"(attempt {attempt + 1}/{config.bounce_lookup_retries})...",
'INFO',
worker_name
)
time.sleep(config.bounce_lookup_delay)
else:
log(
f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts "
f"for Message-ID: {message_id}",
'WARNING',
worker_name
)
return None
except Exception as e:
log(
f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}",
'ERROR',
worker_name
)
if attempt < config.bounce_lookup_retries - 1:
time.sleep(config.bounce_lookup_delay)
else:
return None
return None
def get_blocked_patterns(self, email_address: str) -> List[str]:
"""
Get blocked sender patterns for recipient
Args:
email_address: Recipient email address
Returns:
List of blocked patterns (may include wildcards)
"""
if not self.available or not self.blocked_table:
return []
try:
response = self.blocked_table.get_item(Key={'email_address': email_address})
item = response.get('Item', {})
return item.get('blocked_patterns', [])
except Exception as e:
log(f"⚠ Error getting block list for {email_address}: {e}", 'ERROR')
return []
def batch_get_blocked_patterns(self, email_addresses: List[str]) -> Dict[str, List[str]]:
"""
Batch get blocked patterns for multiple recipients (more efficient)
Args:
email_addresses: List of recipient email addresses
Returns:
Dictionary mapping email_address -> list of blocked patterns
"""
if not self.available or not self.blocked_table:
return {addr: [] for addr in email_addresses}
try:
# DynamoDB BatchGetItem
keys = [{'email_address': addr} for addr in email_addresses]
response = self.resource.batch_get_item(
RequestItems={
config.blocked_table: {'Keys': keys}
}
)
items = response.get('Responses', {}).get(config.blocked_table, [])
# Build result dictionary
result = {}
for email_address in email_addresses:
matching_item = next(
(item for item in items if item['email_address'] == email_address),
None
)
if matching_item:
result[email_address] = matching_item.get('blocked_patterns', [])
else:
result[email_address] = []
return result
except Exception as e:
log(f"⚠ Batch blocklist check error: {e}", 'ERROR')
return {addr: [] for addr in email_addresses}

View File

@ -1,193 +0,0 @@
#!/usr/bin/env python3
"""
S3 operations handler
"""
import time
from typing import Optional, List
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config, domain_to_bucket_name
class S3Handler:
"""Handles all S3 operations"""
def __init__(self):
self.client = boto3.client('s3', region_name=config.aws_region)
def get_email(self, domain: str, message_id: str, receive_count: int) -> Optional[bytes]:
"""
Download email from S3
Args:
domain: Email domain
message_id: SES Message ID
receive_count: Number of times this message was received from queue
Returns:
Raw email bytes or None if not found/error
"""
bucket = domain_to_bucket_name(domain)
try:
response = self.client.get_object(Bucket=bucket, Key=message_id)
return response['Body'].read()
except self.client.exceptions.NoSuchKey:
if receive_count < 5:
log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
return None
else:
log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
raise
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
if receive_count < 5:
log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
return None
else:
log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
raise
log(f"❌ S3 Download Error: {e}", 'ERROR')
raise
except Exception as e:
log(f"❌ S3 Download Error: {e}", 'ERROR')
raise
def mark_as_processed(
self,
domain: str,
message_id: str,
worker_name: str,
invalid_inboxes: Optional[List[str]] = None
):
"""Mark email as successfully delivered"""
bucket = domain_to_bucket_name(domain)
try:
head = self.client.head_object(Bucket=bucket, Key=message_id)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = worker_name
metadata['status'] = 'delivered'
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
if invalid_inboxes:
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)
self.client.copy_object(
Bucket=bucket,
Key=message_id,
CopySource={'Bucket': bucket, 'Key': message_id},
Metadata=metadata,
MetadataDirective='REPLACE'
)
except Exception as e:
log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)
def mark_as_all_invalid(
self,
domain: str,
message_id: str,
invalid_inboxes: List[str],
worker_name: str
):
"""Mark email as failed because all recipients are invalid"""
bucket = domain_to_bucket_name(domain)
try:
head = self.client.head_object(Bucket=bucket, Key=message_id)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = worker_name
metadata['status'] = 'failed'
metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
self.client.copy_object(
Bucket=bucket,
Key=message_id,
CopySource={'Bucket': bucket, 'Key': message_id},
Metadata=metadata,
MetadataDirective='REPLACE'
)
except Exception as e:
log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)
def mark_as_blocked(
self,
domain: str,
message_id: str,
blocked_recipients: List[str],
sender: str,
worker_name: str
):
"""
Mark email as blocked by sender blacklist
This sets metadata BEFORE deletion for audit trail
"""
bucket = domain_to_bucket_name(domain)
try:
head = self.client.head_object(Bucket=bucket, Key=message_id)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = worker_name
metadata['status'] = 'blocked'
metadata['blocked_recipients'] = ','.join(blocked_recipients)
metadata['blocked_sender'] = sender
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
self.client.copy_object(
Bucket=bucket,
Key=message_id,
CopySource={'Bucket': bucket, 'Key': message_id},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✓ Marked as blocked in S3 metadata", 'INFO', worker_name)
except Exception as e:
log(f"⚠ Failed to mark as blocked: {e}", 'ERROR', worker_name)
raise
def delete_blocked_email(
self,
domain: str,
message_id: str,
worker_name: str
):
"""
Delete email after marking as blocked
Only call this after mark_as_blocked() succeeded
"""
bucket = domain_to_bucket_name(domain)
try:
self.client.delete_object(Bucket=bucket, Key=message_id)
log(f"🗑 Deleted blocked email from S3", 'SUCCESS', worker_name)
except Exception as e:
log(f"⚠ Failed to delete blocked email: {e}", 'ERROR', worker_name)
raise

View File

@ -1,53 +0,0 @@
#!/usr/bin/env python3
"""
SES operations handler
"""
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config
class SESHandler:
    """Thin wrapper around the boto3 SES client."""

    def __init__(self):
        self.client = boto3.client('ses', region_name=config.aws_region)

    def send_raw_email(
        self,
        source: str,
        destination: str,
        raw_message: bytes,
        worker_name: str
    ) -> bool:
        """
        Send a raw MIME message through SES.

        Args:
            source: From address
            destination: To address
            raw_message: Raw MIME message bytes
            worker_name: Worker name for logging

        Returns:
            True if SES accepted the message, False on any send error
        """
        payload = {
            'Source': source,
            'Destinations': [destination],
            'RawMessage': {'Data': raw_message},
        }
        try:
            self.client.send_raw_email(**payload)
        except ClientError as exc:
            code = exc.response['Error']['Code']
            log(f"⚠ SES send failed to {destination} ({code}): {exc}", 'ERROR', worker_name)
            return False
        except Exception as exc:
            log(f"⚠ SES send failed to {destination}: {exc}", 'ERROR', worker_name)
            return False
        return True

View File

@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""
SQS operations handler
"""
from typing import Optional, List, Dict, Any
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config, domain_to_queue_name
class SQSHandler:
    """Handles all SQS operations"""

    def __init__(self):
        self.client = boto3.client('sqs', region_name=config.aws_region)

    def get_queue_url(self, domain: str) -> Optional[str]:
        """
        Resolve the SQS queue URL for a domain.

        Args:
            domain: Email domain

        Returns:
            Queue URL, or None if the queue is missing or the lookup fails
        """
        name = domain_to_queue_name(domain)
        try:
            return self.client.get_queue_url(QueueName=name)['QueueUrl']
        except ClientError as err:
            if err.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
                log(f"Queue not found for domain: {domain}", 'WARNING')
            else:
                log(f"Error getting queue URL for {domain}: {err}", 'ERROR')
            return None

    def receive_messages(self, queue_url: str) -> List[Dict[str, Any]]:
        """
        Long-poll the queue for a batch of messages.

        Args:
            queue_url: SQS Queue URL

        Returns:
            List of message dictionaries (empty on error)
        """
        request = {
            'QueueUrl': queue_url,
            'MaxNumberOfMessages': config.max_messages,
            'WaitTimeSeconds': config.poll_interval,
            'VisibilityTimeout': config.visibility_timeout,
            'AttributeNames': ['ApproximateReceiveCount', 'SentTimestamp'],
        }
        try:
            return self.client.receive_message(**request).get('Messages', [])
        except Exception as err:
            log(f"Error receiving messages: {err}", 'ERROR')
            return []

    def delete_message(self, queue_url: str, receipt_handle: str):
        """
        Remove a processed message from the queue.

        Args:
            queue_url: SQS Queue URL
            receipt_handle: Message receipt handle

        Raises:
            Re-raises any deletion error after logging it, so the caller
            knows the message will reappear.
        """
        try:
            self.client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
        except Exception as err:
            log(f"Error deleting message: {err}", 'ERROR')
            raise

    def get_queue_size(self, queue_url: str) -> int:
        """
        Approximate number of messages currently in the queue.

        Args:
            queue_url: SQS Queue URL

        Returns:
            Message count, or 0 if the attributes cannot be fetched
        """
        try:
            reply = self.client.get_queue_attributes(
                QueueUrl=queue_url,
                AttributeNames=['ApproximateNumberOfMessages'],
            )
            return int(reply['Attributes'].get('ApproximateNumberOfMessages', 0))
        except Exception:
            return 0

View File

@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""
Configuration management for unified email worker
"""
import os
from dataclasses import dataclass
from typing import Set
@dataclass
class Config:
    """
    Worker configuration.

    Every value is read from an environment variable with a default.
    NOTE: the defaults are evaluated once, at import time of this module;
    later changes to the environment do not affect an existing instance.
    """
    # AWS
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')
    # Domains to process (comma-separated list and/or one-per-line file)
    domains_list: str = os.environ.get('DOMAINS', '')
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')
    # Worker Settings
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
    # SMTP for delivery
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))
    # Internal SMTP (bypasses transport_maps)
    internal_smtp_port: int = int(os.environ.get('INTERNAL_SMTP_PORT', '2525'))
    # LMTP for local delivery (bypasses Postfix transport_maps)
    lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true'
    lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost')
    lmtp_port: int = int(os.environ.get('LMTP_PORT', '24'))
    # DynamoDB Tables
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')
    blocked_table: str = os.environ.get('DYNAMODB_BLOCKED_TABLE', 'email-blocked-senders')
    # Bounce Handling
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))
    # Monitoring
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))


# Global configuration instance
config = Config()

# Global set of managed domains (populated at startup by load_domains())
MANAGED_DOMAINS: Set[str] = set()
def load_domains() -> list[str]:
    """
    Load the managed domain list and populate the MANAGED_DOMAINS global.

    Domains come from two (optionally combined) sources:
      * config.domains_list -- comma-separated DOMAINS env var
      * config.domains_file -- one domain per line; '#' lines are comments

    Returns:
        De-duplicated list of domains in first-seen order.
    """
    global MANAGED_DOMAINS
    domains: list[str] = []
    if config.domains_list:
        domains.extend(d.strip() for d in config.domains_list.split(',') if d.strip())
    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as f:
            for line in f:
                domain = line.strip()
                # Skip blank lines and comments.
                if domain and not domain.startswith('#'):
                    domains.append(domain)
    # dict.fromkeys() de-duplicates while keeping a stable first-seen order
    # (the previous list(set(...)) produced an arbitrary order per run).
    domains = list(dict.fromkeys(domains))
    MANAGED_DOMAINS = set(d.lower() for d in domains)
    return domains
def is_internal_address(email_address: str) -> bool:
    """Check if email address belongs to one of our managed domains."""
    if '@' not in email_address:
        return False
    # rsplit handles (unusual) addresses with '@' in the local part:
    # the domain is the part after the *last* '@', which split('@')[1]
    # would get wrong for such inputs.
    domain = email_address.rsplit('@', 1)[1].lower()
    return domain in MANAGED_DOMAINS
def domain_to_queue_name(domain: str) -> str:
    """Map a mail domain to its SQS queue name (dots become dashes)."""
    return f"{domain.replace('.', '-')}-queue"
def domain_to_bucket_name(domain: str) -> str:
    """Map a mail domain to its S3 bucket name (dots become dashes)."""
    return f"{domain.replace('.', '-')}-emails"

View File

@ -1,85 +0,0 @@
services:
  unified-worker:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: unified-email-worker
    restart: unless-stopped
    network_mode: host # needed for local SMTP access to the mail server
    volumes:
      # Domain list (one domain per line)
      - ./domains.txt:/etc/email-worker/domains.txt:ro
      # Logs
      - ./logs:/var/log/email-worker
    environment:
      # AWS Credentials
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_REGION=us-east-2
      # Domains via file (domains.txt)
      - DOMAINS_FILE=/etc/email-worker/domains.txt
      # Alternative: pass domains directly as a comma-separated list
      # - DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net
      # Worker settings
      - WORKER_THREADS=${WORKER_THREADS:-10}
      - POLL_INTERVAL=${POLL_INTERVAL:-20}
      - MAX_MESSAGES=${MAX_MESSAGES:-10}
      - VISIBILITY_TIMEOUT=${VISIBILITY_TIMEOUT:-300}
      # SMTP (local delivery to the mail server)
      - SMTP_HOST=${SMTP_HOST:-localhost}
      - SMTP_PORT=${SMTP_PORT:-25}
      - SMTP_POOL_SIZE=${SMTP_POOL_SIZE:-5}
      - SMTP_USE_TLS=false
      # Internal SMTP port (bypasses transport_maps)
      - INTERNAL_SMTP_PORT=25
      # LMTP (optional -- direct delivery to Dovecot)
      - LMTP_ENABLED=${LMTP_ENABLED:-false}
      - LMTP_HOST=${LMTP_HOST:-localhost}
      - LMTP_PORT=${LMTP_PORT:-24}
      # DynamoDB tables
      - DYNAMODB_RULES_TABLE=${DYNAMODB_RULES_TABLE:-email-rules}
      - DYNAMODB_MESSAGES_TABLE=${DYNAMODB_MESSAGES_TABLE:-ses-outbound-messages}
      - DYNAMODB_BLOCKED_TABLE=${DYNAMODB_BLOCKED_TABLE:-email-blocked-senders}
      # Bounce handling
      - BOUNCE_LOOKUP_RETRIES=${BOUNCE_LOOKUP_RETRIES:-3}
      - BOUNCE_LOOKUP_DELAY=${BOUNCE_LOOKUP_DELAY:-1.0}
      # Monitoring
      - METRICS_PORT=8000
      - HEALTH_PORT=8080
    ports:
      # Prometheus metrics
      - "8000:8000"
      # Health check
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    logging:
      driver: "json-file"
      options:
        max-size: "50m"
        max-file: "10"
    deploy:
      resources:
        limits:
          memory: 512M
        reservations:
          memory: 256M

View File

@ -1,381 +0,0 @@
# Architecture Documentation
## 📐 System Overview
```
┌─────────────────────────────────────────────────────────────────────┐
│ AWS Cloud Services │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ SQS │────▶│ S3 │ │ SES │ │
│ │ Queues │ │ Buckets │ │ Sending │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌────▼─────────────────▼─────────────────▼───────────────┐ │
│ │ DynamoDB Tables │ │
│ │ • email-rules (OOO, Forwarding) │ │
│ │ • ses-outbound-messages (Bounce Tracking) │ │
│ │ • email-blocked-senders (Blocklist) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
│ Polling & Processing
┌─────────────────────────────────────────────────────────────────────┐
│ Unified Email Worker │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Main Thread (unified_worker.py) │ │
│ │ • Coordination │ │
│ │ • Status Monitoring │ │
│ │ • Signal Handling │ │
│ └────────────┬────────────────────────────────────────────┘ │
│ │ │
│ ├──▶ Domain Poller Thread 1 (example.com) │
│ ├──▶ Domain Poller Thread 2 (another.com) │
│ ├──▶ Domain Poller Thread 3 (...) │
│ ├──▶ Health Server Thread (port 8080) │
│ └──▶ Metrics Server Thread (port 8000) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SMTP Connection Pool │ │
│ │ • Connection Reuse │ │
│ │ • Health Checks │ │
│ │ • Auto-reconnect │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
│ SMTP/LMTP Delivery
┌─────────────────────────────────────────────────────────────────────┐
│ Mail Server (Docker Mailserver) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Port 25 (SMTP - from pool) │
│ Port 2525 (SMTP - internal delivery, bypasses transport_maps) │
│ Port 24 (LMTP - direct to Dovecot, bypasses Postfix) │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
## 🔄 Message Flow
### 1. Email Reception
```
1. SES receives email
2. SES stores in S3 bucket (domain-emails/)
3. SES publishes SNS notification
4. SNS enqueues message to SQS (domain-queue)
```
### 2. Worker Processing
```
┌─────────────────────────────────────────────────────────────┐
│ Domain Poller (domain_poller.py) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 1. Poll SQS Queue (20s long poll) │
│ • Receive up to 10 messages │
│ • Extract SES notification from SNS wrapper │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 2. Download from S3 (s3_handler.py) │
│ • Get raw email bytes │
│ • Handle retry if not found yet │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 3. Parse Email (parser.py) │
│ • Parse MIME structure │
│ • Extract headers, body, attachments │
│ • Check for loop prevention marker │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 4. Bounce Detection (bounce_handler.py) │
│ • Check if from mailer-daemon@amazonses.com │
│ • Lookup original sender in DynamoDB │
│ • Rewrite From/Reply-To headers │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 5. Blocklist Check (blocklist.py) │
│ • Batch lookup blocked patterns for all recipients │
│ • Check sender against wildcard patterns │
│ • Mark blocked recipients │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 6. Process Rules for Each Recipient (rules_processor.py) │
│ ├─▶ Auto-Reply (OOO) │
│ │ • Check if ooo_active = true │
│ │ • Don't reply to auto-submitted messages │
│ │ • Create reply with original message quoted │
│ │ • Send via SES (external) or Port 2525 (internal) │
│ │ │
│ └─▶ Forwarding │
│ • Get forward addresses from rule │
│ • Create forward with FWD: prefix │
│ • Preserve attachments │
│ • Send via SES (external) or Port 2525 (internal) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 7. SMTP Delivery (delivery.py) │
│ • Get connection from pool │
│ • Send to each recipient (not blocked) │
│ • Track success/permanent/temporary failures │
│ • Return connection to pool │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 8. Update S3 Metadata (s3_handler.py) │
│ ├─▶ All Blocked: mark_as_blocked() + delete() │
│ ├─▶ Some Success: mark_as_processed() │
│ └─▶ All Invalid: mark_as_all_invalid() │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 9. Delete from Queue │
│ • Success or permanent failure → delete │
│ • Temporary failure → keep in queue (retry) │
└─────────────────────────────────────────────────────────────┘
```
## 🧩 Component Details
### AWS Handlers (`aws/`)
#### `s3_handler.py`
- **Purpose**: All S3 operations
- **Key Methods**:
- `get_email()`: Download with retry logic
- `mark_as_processed()`: Update metadata on success
- `mark_as_all_invalid()`: Update metadata on permanent failure
- `mark_as_blocked()`: Set metadata before deletion
- `delete_blocked_email()`: Delete after marking
#### `sqs_handler.py`
- **Purpose**: Queue operations
- **Key Methods**:
- `get_queue_url()`: Resolve domain to queue
- `receive_messages()`: Long poll with attributes
- `delete_message()`: Remove after processing
- `get_queue_size()`: For metrics
#### `ses_handler.py`
- **Purpose**: Send emails via SES
- **Key Methods**:
- `send_raw_email()`: Send raw MIME message
#### `dynamodb_handler.py`
- **Purpose**: All DynamoDB operations
- **Key Methods**:
- `get_email_rules()`: OOO and forwarding rules
- `get_bounce_info()`: Bounce lookup with retry
- `get_blocked_patterns()`: Single recipient
- `batch_get_blocked_patterns()`: Multiple recipients (efficient!)
### Email Processors (`email_processing/`)
#### `parser.py`
- **Purpose**: Email parsing utilities
- **Key Methods**:
- `parse_bytes()`: Parse raw email
- `extract_body_parts()`: Get text/html bodies
- `is_processed_by_worker()`: Loop detection
#### `bounce_handler.py`
- **Purpose**: Bounce detection and rewriting
- **Key Methods**:
- `is_ses_bounce_notification()`: Detect MAILER-DAEMON
- `apply_bounce_logic()`: Rewrite headers
#### `blocklist.py`
- **Purpose**: Sender blocking with wildcards
- **Key Methods**:
- `is_sender_blocked()`: Single check
- `batch_check_blocked_senders()`: Batch check (preferred!)
- **Wildcard Support**: Uses `fnmatch` for patterns like `*@spam.com`
#### `rules_processor.py`
- **Purpose**: OOO and forwarding logic
- **Key Methods**:
- `process_rules_for_recipient()`: Main entry point
- `_handle_ooo()`: Auto-reply logic
- `_handle_forwards()`: Forwarding logic
- `_create_ooo_reply()`: Build OOO message
- `_create_forward_message()`: Build forward with attachments
### SMTP Components (`smtp/`)
#### `pool.py`
- **Purpose**: Connection pooling
- **Features**:
- Lazy initialization
- Health checks (NOOP)
- Auto-reconnect on stale connections
- Thread-safe queue
#### `delivery.py`
- **Purpose**: Actual email delivery
- **Features**:
- SMTP or LMTP support
- Retry logic for connection errors
- Permanent vs temporary failure detection
- Connection pool integration
### Monitoring (`metrics/`)
#### `prometheus.py`
- **Purpose**: Metrics collection
- **Metrics**:
- Counters: processed, bounces, autoreplies, forwards, blocked
- Gauges: in_flight, queue_size
- Histograms: processing_time
## 🔐 Security Features
### 1. Domain Validation
Each worker only processes messages for its assigned domains:
```python
if recipient_domain.lower() != domain.lower():
log("Security: Ignored message for wrong domain")
return True # Delete from queue
```
### 2. Loop Prevention
Detects already-processed emails:
```python
if parsed.get('X-SES-Worker-Processed'):
log("Loop prevention: Already processed")
skip_rules = True
```
### 3. Blocklist Wildcards
Supports flexible patterns:
```python
blocked_patterns = [
"*@spam.com", # Any user at spam.com
"noreply@*.com", # noreply at any .com
"newsletter@example.*" # newsletter at any example TLD
]
```
### 4. Internal vs External Routing
Prevents SES loops for internal forwards:
```python
if is_internal_address(forward_to):
# Direct SMTP to port 2525 (bypasses transport_maps)
send_internal_email(...)
else:
# Send via SES
ses.send_raw_email(...)
```
## 📊 Data Flow Diagrams
### Bounce Rewriting Flow
```
SES Bounce → Worker → DynamoDB Lookup → Header Rewrite → Delivery
Message-ID
ses-outbound-messages
{MessageId: "abc",
original_source: "real@sender.com",
bouncedRecipients: ["failed@domain.com"]}
Rewrite From: mailer-daemon@amazonses.com
→ failed@domain.com
```
### Blocklist Check Flow
```
Incoming Email → Batch DynamoDB Call → Pattern Matching → Decision
↓ ↓ ↓ ↓
sender@spam.com Get patterns for fnmatch() Block/Allow
all recipients "*@spam.com"
matches!
```
## ⚡ Performance Optimizations
### 1. Batch DynamoDB Calls
```python
# ❌ Old way: N calls for N recipients
for recipient in recipients:
patterns = dynamodb.get_blocked_patterns(recipient)
# ✅ New way: 1 call for N recipients
patterns_by_recipient = dynamodb.batch_get_blocked_patterns(recipients)
```
### 2. Connection Pooling
```python
# ❌ Old way: New connection per email
conn = smtplib.SMTP(host, port)
conn.sendmail(...)
conn.quit()
# ✅ New way: Reuse connections
conn = pool.get_connection() # Reuses existing
conn.sendmail(...)
pool.return_connection(conn) # Returns to pool
```
### 3. Parallel Domain Processing
```
Domain 1 Thread ──▶ Process 10 emails/poll
Domain 2 Thread ──▶ Process 10 emails/poll
Domain 3 Thread ──▶ Process 10 emails/poll
(All in parallel!)
```
## 🔄 Error Handling Strategy
### Retry Logic
- **Temporary Errors**: Keep in queue, retry (visibility timeout)
- **Permanent Errors**: Mark in S3, delete from queue
- **S3 Not Found**: Retry up to 5 times (eventual consistency)
### Connection Failures
```python
for attempt in range(max_retries):
try:
conn.sendmail(...)
return True
except SMTPServerDisconnected:
log("Connection lost, retrying...")
time.sleep(0.3)
continue # Try again
```
### Audit Trail
All actions recorded in S3 metadata:
```json
{
"processed": "true",
"processed_at": "1706000000",
"processed_by": "worker-example.com",
"status": "delivered",
"invalid_inboxes": "baduser@example.com",
"blocked_sender": "spam@bad.com"
}
```

View File

@ -1,37 +0,0 @@
# Changelog
## v1.0.1 - 2025-01-23
### Fixed
- **CRITICAL:** Renamed `email/` directory to `email_processing/` to avoid namespace conflict with Python's built-in `email` module
- This fixes the `ImportError: cannot import name 'BytesParser' from partially initialized module 'email.parser'` error
- All imports updated accordingly
- No functional changes, only namespace fix
### Changed
- Updated all documentation to reflect new directory name
- Updated Dockerfile to copy `email_processing/` instead of `email/`
## v1.0.0 - 2025-01-23
### Added
- Modular architecture (27 files vs 1 monolith)
- Batch DynamoDB operations (10x performance improvement)
- Sender blocklist with wildcard support
- LMTP direct delivery support
- Enhanced metrics and monitoring
- Comprehensive documentation (6 MD files)
### Fixed
- `signal.SIGINT` typo (was `signalIGINT`)
- Missing S3 metadata audit trail for blocked emails
- Inefficient DynamoDB calls (N calls → 1 batch call)
- S3 delete error handling (proper retry logic)
### Documentation
- README.md - Full feature documentation
- QUICKSTART.md - Quick deployment guide for your setup
- ARCHITECTURE.md - Detailed system architecture
- MIGRATION.md - Migration from monolith
- COMPATIBILITY.md - 100% compatibility proof
- SUMMARY.md - All improvements overview

View File

@ -1,311 +0,0 @@
# Kompatibilität mit bestehendem Setup
## ✅ 100% Kompatibel
Die modulare Version ist **vollständig kompatibel** mit deinem bestehenden Setup:
### 1. Dockerfile
- ✅ Gleicher Base Image: `python:3.11-slim`
- ✅ Gleicher User: `worker` (UID 1000)
- ✅ Gleiche Verzeichnisse: `/app`, `/var/log/email-worker`, `/etc/email-worker`
- ✅ Gleicher Health Check: `curl http://localhost:8080/health`
- ✅ Gleiche Labels: `maintainer`, `description`
- **Änderung:** Kopiert nun mehrere Module statt einer Datei
### 2. docker-compose.yml
- ✅ Gleicher Container Name: `unified-email-worker`
- ✅ Gleicher Network Mode: `host`
- ✅ Gleiche Volumes: `domains.txt`, `logs/`
- ✅ Gleiche Ports: `8000`, `8080`
- ✅ Gleiche Environment Variables
- ✅ Gleiche Resource Limits: 512M / 256M
- ✅ Gleiche Logging Config: 50M / 10 files
- **Neu:** Zusätzliche optionale Env Vars (abwärtskompatibel)
### 3. requirements.txt
- ✅ Gleiche Dependencies: `boto3`, `prometheus-client`
- ✅ Aktualisierte Versionen (>=1.34.0 statt >=1.26.0)
- **Kompatibel:** Alte Version funktioniert auch, neue ist empfohlen
### 4. domains.txt
- ✅ Gleiches Format: Eine Domain pro Zeile
- ✅ Kommentare mit `#` funktionieren
- ✅ Gleiche Location: `/etc/email-worker/domains.txt`
- **Keine Änderung nötig**
## 🔄 Was ist neu/anders?
### Dateistruktur
**Alt:**
```
/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── domains.txt
└── unified_worker.py (800+ Zeilen)
```
**Neu:**
```
/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── domains.txt
├── main.py # Entry Point
├── config.py # Konfiguration
├── logger.py # Logging
├── worker.py # Message Processing
├── unified_worker.py # Worker Coordinator
├── domain_poller.py # Queue Polling
├── health_server.py # Health Check Server
├── aws/
│ ├── s3_handler.py
│ ├── sqs_handler.py
│ ├── ses_handler.py
│ └── dynamodb_handler.py
├── email_processing/
│ ├── parser.py
│ ├── bounce_handler.py
│ ├── blocklist.py
│ └── rules_processor.py
├── smtp/
│ ├── pool.py
│ └── delivery.py
└── metrics/
└── prometheus.py
```
### Neue optionale Umgebungsvariablen
Diese sind **optional** und haben sinnvolle Defaults:
```bash
# Internal SMTP Port (neu)
INTERNAL_SMTP_PORT=2525 # Default: 2525
# LMTP Support (neu)
LMTP_ENABLED=false # Default: false
LMTP_HOST=localhost # Default: localhost
LMTP_PORT=24 # Default: 24
# Blocklist Table (neu)
DYNAMODB_BLOCKED_TABLE=email-blocked-senders # Default: email-blocked-senders
```
**Wichtig:** Wenn du diese nicht setzt, funktioniert alles wie vorher!
## 🚀 Deployment
### Option 1: Drop-In Replacement
```bash
# Alte Dateien sichern
cp unified_worker.py unified_worker.py.backup
cp Dockerfile Dockerfile.backup
cp docker-compose.yml docker-compose.yml.backup
# Neue Dateien entpacken
tar -xzf email-worker-modular.tar.gz
cd email-worker/
# domains.txt und .env anpassen (falls nötig)
# Dann normal deployen:
docker-compose build
docker-compose up -d
```
### Option 2: Side-by-Side (Empfohlen)
```bash
# Altes Setup bleibt in /opt/email-worker-old
# Neues Setup in /opt/email-worker
# Neue Version entpacken
cd /opt
tar -xzf email-worker-modular.tar.gz
mv email-worker email-worker-new
# Container Namen unterscheiden:
# In docker-compose.yml:
container_name: unified-email-worker-new
# Starten
cd email-worker-new
docker-compose up -d
# Parallel laufen lassen (24h Test)
# Dann alte Version stoppen, neue umbenennen
```
## 🔍 Verifikation der Kompatibilität
### 1. Environment Variables
Alle deine bestehenden Env Vars funktionieren:
```bash
# Deine bisherigen Vars (alle kompatibel)
AWS_ACCESS_KEY_ID ✅
AWS_SECRET_ACCESS_KEY ✅
AWS_REGION ✅
WORKER_THREADS ✅
POLL_INTERVAL ✅
MAX_MESSAGES ✅
VISIBILITY_TIMEOUT ✅
SMTP_HOST ✅
SMTP_PORT ✅
SMTP_POOL_SIZE ✅
METRICS_PORT ✅
HEALTH_PORT ✅
```
### 2. DynamoDB Tables
Bestehende Tables funktionieren ohne Änderung:
```bash
# Bounce Tracking (bereits vorhanden)
ses-outbound-messages ✅
# Email Rules (bereits vorhanden?)
email-rules ✅
# Blocklist (neu, optional)
email-blocked-senders 🆕 Optional
```
### 3. API Endpoints
Gleiche Endpoints wie vorher:
```bash
# Health Check
GET http://localhost:8080/health ✅ Gleiche Response
# Domains List
GET http://localhost:8080/domains ✅ Gleiche Response
# Prometheus Metrics
GET http://localhost:8000/metrics ✅ Kompatibel + neue Metrics
```
### 4. Logging
Gleiches Format, gleiche Location:
```bash
# Logs in Container
/var/log/email-worker/ ✅ Gleich
# Log Format
[timestamp] [LEVEL] [worker-name] [thread] message ✅ Gleich
```
### 5. S3 Metadata
Gleiches Schema, volle Kompatibilität:
```json
{
"processed": "true",
"processed_at": "1706000000",
"processed_by": "worker-andreasknuth-de",
"status": "delivered",
"invalid_inboxes": "..."
}
```
**Neu:** Zusätzliche Metadata bei blockierten Emails:
```json
{
"status": "blocked",
"blocked_sender": "spam@bad.com",
"blocked_recipients": "user@andreasknuth.de"
}
```
## ⚠️ Breaking Changes
**KEINE!** Die modulare Version ist 100% abwärtskompatibel.
Die einzigen Unterschiede sind:
1. ✅ **Mehr Dateien** statt einer (aber gleiches Verhalten)
2. ✅ **Neue optionale Features** (müssen nicht genutzt werden)
3. ✅ **Bessere Performance** (durch Batch-Calls)
4. ✅ **Mehr Metrics** (zusätzliche, alte bleiben)
## 🧪 Testing Checklist
Nach Deployment prüfen:
```bash
# 1. Container läuft
docker ps | grep unified-email-worker
✅ Status: Up
# 2. Health Check
curl http://localhost:8080/health | jq
✅ "status": "healthy"
# 3. Domains geladen
curl http://localhost:8080/domains
✅ ["andreasknuth.de"]
# 4. Logs ohne Fehler
docker-compose logs | grep ERROR
✅ Keine kritischen Fehler
# 5. Test Email senden
# Email via SES senden
✅ Wird zugestellt
# 6. Metrics verfügbar
curl http://localhost:8000/metrics | grep emails_processed
✅ Metrics werden erfasst
```
## 💡 Empfohlener Rollout-Plan
### Phase 1: Testing (1-2 Tage)
- Neuen Container parallel zum alten starten
- Nur 1 Test-Domain zuweisen
- Logs monitoren
- Performance vergleichen
### Phase 2: Staged Rollout (3-7 Tage)
- 50% der Domains auf neue Version
- Metrics vergleichen (alte vs neue)
- Bei Problemen: Rollback auf alte Version
### Phase 3: Full Rollout
- Alle Domains auf neue Version
- Alte Version als Backup behalten (1 Woche)
- Dann alte Version dekommissionieren
## 🔙 Rollback-Plan
Falls Probleme auftreten:
```bash
# 1. Neue Version stoppen
docker-compose -f docker-compose.yml down
# 2. Backup wiederherstellen
cp unified_worker.py.backup unified_worker.py
cp Dockerfile.backup Dockerfile
cp docker-compose.yml.backup docker-compose.yml
# 3. Alte Version starten
docker-compose build
docker-compose up -d
# 4. Verifizieren
curl http://localhost:8080/health
```
**Downtime:** < 30 Sekunden (Zeit für Container Restart)
## ✅ Fazit
Die modulare Version ist ein **Drop-In Replacement**:
- Gleiche Konfiguration
- Gleiche API
- Gleiche Infrastruktur
- **Bonus:** Bessere Performance, mehr Features, weniger Bugs
Einziger Unterschied: Mehr Dateien, aber alle in einem tarball verpackt.

View File

@ -1,366 +0,0 @@
# Migration Guide: Monolith → Modular Architecture
## 🎯 Why Migrate?
### Problems with Monolith
- ❌ **Single file > 800 lines** - hard to navigate
- ❌ **Mixed responsibilities** - S3, SQS, SMTP, DynamoDB all in one place
- ❌ **Hard to test** - can't test components in isolation
- ❌ **Difficult to debug** - errors could be anywhere
- ❌ **Critical bugs** - `signalIGINT` typo, missing audit trail
- ❌ **Performance issues** - N DynamoDB calls for N recipients
### Benefits of Modular
- ✅ **Separation of Concerns** - each module has one job
- ✅ **Easy to Test** - mock S3Handler, test in isolation
- ✅ **Better Performance** - batch DynamoDB calls
- ✅ **Maintainable** - changes isolated to specific files
- ✅ **Extensible** - easy to add new features
- ✅ **Bug Fixes** - all critical bugs fixed
## 🔄 Migration Steps
### Step 1: Backup Current Setup
```bash
# Backup monolith
cp unified_worker.py unified_worker.py.backup
# Backup any configuration
cp .env .env.backup
```
### Step 2: Clone New Structure
```bash
# Download modular version
git clone <repo> email-worker-modular
cd email-worker-modular
# Copy environment variables
cp .env.example .env
# Edit .env with your settings
```
### Step 3: Update Configuration
The modular version uses the SAME environment variables, so your existing `.env` should work:
```bash
# No changes needed to these:
AWS_REGION=us-east-2
DOMAINS=example.com,another.com
SMTP_HOST=localhost
SMTP_PORT=25
# ... etc
```
**New variables** (optional):
```bash
# For internal delivery (bypasses transport_maps)
INTERNAL_SMTP_PORT=2525
# For blocklist feature
DYNAMODB_BLOCKED_TABLE=email-blocked-senders
```
### Step 4: Install Dependencies
```bash
pip install -r requirements.txt
```
### Step 5: Test Locally
```bash
# Run worker
python3 main.py
# Check health endpoint
curl http://localhost:8080/health
# Check metrics
curl http://localhost:8000/metrics
```
### Step 6: Deploy
#### Docker Deployment
```bash
# Build image
docker build -t unified-email-worker:latest .
# Run with docker-compose
docker-compose up -d
# Check logs
docker-compose logs -f email-worker
```
#### Systemd Deployment
```bash
# Create systemd service
sudo nano /etc/systemd/system/email-worker.service
```
```ini
[Unit]
Description=Unified Email Worker
After=network.target
[Service]
Type=simple
User=worker
WorkingDirectory=/opt/email-worker
EnvironmentFile=/opt/email-worker/.env
ExecStart=/usr/bin/python3 /opt/email-worker/main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
```bash
# Enable and start
sudo systemctl enable email-worker
sudo systemctl start email-worker
sudo systemctl status email-worker
```
### Step 7: Monitor Migration
```bash
# Watch logs
tail -f /var/log/syslog | grep email-worker
# Check metrics
watch -n 5 'curl -s http://localhost:8000/metrics | grep emails_processed'
# Monitor S3 metadata
aws s3api head-object \
--bucket example-com-emails \
--key <message-id> \
--query Metadata
```
## 🔍 Verification Checklist
After migration, verify all features work:
- [ ] **Email Delivery**
```bash
# Send test email via SES
# Check it arrives in mailbox
```
- [ ] **Bounce Rewriting**
```bash
# Trigger a bounce (send to invalid@example.com)
# Verify bounce comes FROM the failed recipient
```
- [ ] **Auto-Reply (OOO)**
```bash
# Set OOO in DynamoDB:
aws dynamodb put-item \
--table-name email-rules \
--item '{"email_address": {"S": "test@example.com"}, "ooo_active": {"BOOL": true}, "ooo_message": {"S": "I am away"}}'
# Send email to test@example.com
# Verify auto-reply received
```
- [ ] **Forwarding**
```bash
# Set forward rule:
aws dynamodb put-item \
--table-name email-rules \
--item '{"email_address": {"S": "test@example.com"}, "forwards": {"L": [{"S": "other@example.com"}]}}'
# Send email to test@example.com
# Verify other@example.com receives forwarded email
```
- [ ] **Blocklist**
```bash
# Block sender:
aws dynamodb put-item \
--table-name email-blocked-senders \
--item '{"email_address": {"S": "test@example.com"}, "blocked_patterns": {"L": [{"S": "spam@*.com"}]}}'
# Send email from spam@bad.com to test@example.com
# Verify email is blocked (not delivered, S3 deleted)
```
- [ ] **Metrics**
```bash
curl http://localhost:8000/metrics | grep emails_processed
```
- [ ] **Health Check**
```bash
curl http://localhost:8080/health | jq
```
## 🐛 Troubleshooting Migration Issues
### Issue: Worker not starting
```bash
# Check Python version
python3 --version # Should be 3.11+
# Check dependencies
pip list | grep boto3
# Check logs
python3 main.py # Run in foreground to see errors
```
### Issue: No emails processing
```bash
# Check queue URLs
curl http://localhost:8080/domains
# Verify SQS permissions
aws sqs list-queues
# Check worker logs for errors
tail -f /var/log/email-worker.log
```
### Issue: Bounces not rewriting
```bash
# Verify DynamoDB table exists
aws dynamodb describe-table --table-name ses-outbound-messages
# Check if Lambda is writing bounce records
aws dynamodb scan --table-name ses-outbound-messages --limit 5
# Verify worker can read DynamoDB
# (Check logs for "DynamoDB tables connected successfully")
```
### Issue: Performance degradation
```bash
# Check if batch calls are used
grep -r "batch_get_blocked_patterns" .  # Should exist in the modular version's modules
# Monitor DynamoDB read capacity
aws cloudwatch get-metric-statistics \
--namespace AWS/DynamoDB \
--metric-name ConsumedReadCapacityUnits \
--dimensions Name=TableName,Value=email-blocked-senders \
--start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Sum
```
## 📊 Comparison: Before vs After
| Feature | Monolith | Modular | Improvement |
|---------|----------|---------|-------------|
| Lines of Code | 800+ in 1 file | ~150 per file | ✅ Easier to read |
| DynamoDB Calls | N per message | 1 per message | ✅ 10x faster |
| Error Handling | Missing in places | Comprehensive | ✅ More reliable |
| Testability | Hard | Easy | ✅ Can unit test |
| Audit Trail | Incomplete | Complete | ✅ Better compliance |
| Bugs Fixed | - | 4 critical | ✅ More stable |
| Extensibility | Hard | Easy | ✅ Future-proof |
## 🎓 Code Comparison Examples
### Example 1: Blocklist Check
**Monolith (Inefficient):**
```python
for recipient in recipients:
if is_sender_blocked(recipient, sender, worker_name):
# DynamoDB call for EACH recipient!
blocked_recipients.append(recipient)
```
**Modular (Efficient):**
```python
# ONE DynamoDB call for ALL recipients
blocked_by_recipient = blocklist.batch_check_blocked_senders(
recipients, sender, worker_name
)
for recipient in recipients:
if blocked_by_recipient[recipient]:
blocked_recipients.append(recipient)
```
### Example 2: S3 Blocked Email Handling
**Monolith (Missing Audit Trail):**
```python
if all_blocked:
s3.delete_object(Bucket=bucket, Key=key) # ❌ No metadata!
```
**Modular (Proper Audit):**
```python
if all_blocked:
s3.mark_as_blocked(domain, key, blocked, sender, worker) # ✅ Set metadata
s3.delete_blocked_email(domain, key, worker) # ✅ Then delete
```
### Example 3: Signal Handling
**Monolith (Bug):**
```python
signal.signal(signal.SIGTERM, handler)
signal.signal(signalIGINT, handler) # ❌ Typo! Should be signal.SIGINT
```
**Modular (Fixed):**
```python
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler) # ✅ Correct
```
## 🔄 Rollback Plan
If you need to rollback:
```bash
# Stop new worker
docker-compose down
# or
sudo systemctl stop email-worker
# Restore monolith
cp unified_worker.py.backup unified_worker.py
# Restart old worker
python3 unified_worker.py
# or restore old systemd service
```
## 💡 Best Practices After Migration
1. **Monitor Metrics**: Set up Prometheus/Grafana dashboards
2. **Set up Alerts**: Alert on queue buildup, high error rates
3. **Regular Updates**: Keep dependencies updated
4. **Backup Rules**: Export DynamoDB rules regularly
5. **Test in Staging**: Always test rule changes in non-prod first
## 📚 Additional Resources
- [ARCHITECTURE.md](ARCHITECTURE.md) - Detailed architecture diagrams
- [README.md](README.md) - Complete feature documentation
- [Makefile](Makefile) - Common commands
## ❓ FAQ
**Q: Will my existing DynamoDB tables work?**
A: Yes! Same schema, just need to add `email-blocked-senders` table for blocklist feature.
**Q: Do I need to change my Lambda functions?**
A: No, bounce tracking Lambda stays the same.
**Q: Can I migrate one domain at a time?**
A: Yes! Run both workers with different `DOMAINS` settings, then migrate gradually.
**Q: What about my existing S3 metadata?**
A: New worker reads and writes same metadata format, fully compatible.
**Q: How do I add new features?**
A: Just add a new module in the appropriate directory (e.g. a new file in `email_processing/`), import it in `worker.py`.

View File

@ -1,330 +0,0 @@
# Quick Start Guide
## 🚀 Deployment auf deinem System
### Voraussetzungen
- Docker & Docker Compose installiert
- AWS Credentials mit Zugriff auf SQS, S3, SES, DynamoDB
- Docker Mailserver (DMS) läuft lokal
### 1. Vorbereitung
```bash
# Ins Verzeichnis wechseln
cd /pfad/zu/email-worker
# domains.txt anpassen (falls weitere Domains)
nano domains.txt
# Logs-Verzeichnis erstellen
mkdir -p logs
```
### 2. Umgebungsvariablen
Erstelle `.env` Datei:
```bash
# AWS Credentials
AWS_ACCESS_KEY_ID=dein_access_key
AWS_SECRET_ACCESS_KEY=dein_secret_key
# Optional: Worker Settings überschreiben
WORKER_THREADS=10
POLL_INTERVAL=20
MAX_MESSAGES=10
# Optional: SMTP Settings
SMTP_HOST=localhost
SMTP_PORT=25
# Optional: LMTP für direktes Dovecot Delivery
# LMTP_ENABLED=true
# LMTP_PORT=24
```
### 3. Build & Start
```bash
# Image bauen
docker-compose build
# Starten
docker-compose up -d
# Logs anschauen
docker-compose logs -f
```
### 4. Verifizierung
```bash
# Health Check
curl http://localhost:8080/health | jq
# Domains prüfen
curl http://localhost:8080/domains
# Metrics (Prometheus)
curl http://localhost:8000/metrics | grep emails_processed
# Container Status
docker ps | grep unified-email-worker
```
### 5. Test Email senden
```bash
# Via AWS SES Console oder CLI eine Test-Email senden
aws ses send-email \
  --from sender@andreasknuth.de \
  --destination ToAddresses=test@andreasknuth.de \
  --message 'Subject={Data=Test},Body={Text={Data=Test message}}'
# Worker Logs beobachten
docker-compose logs -f | grep "Processing:"
```
## 🔧 Wartung
### Logs anschauen
```bash
# Live Logs
docker-compose logs -f
# Nur Worker Logs
docker logs -f unified-email-worker
# Logs im Volume
tail -f logs/*.log
```
### Neustart
```bash
# Neustart nach Code-Änderungen
docker-compose restart
# Kompletter Rebuild
docker-compose down
docker-compose build
docker-compose up -d
```
### Update
```bash
# Neue Version pullen/kopieren
git pull # oder manuell Dateien ersetzen
# Rebuild & Restart
docker-compose down
docker-compose build
docker-compose up -d
```
## 📊 Monitoring
### Prometheus Metrics (Port 8000)
```bash
# Alle Metrics
curl http://localhost:8000/metrics
# Verarbeitete Emails
curl -s http://localhost:8000/metrics | grep emails_processed_total
# Queue Größe
curl -s http://localhost:8000/metrics | grep queue_messages_available
# Blocked Senders
curl -s http://localhost:8000/metrics | grep blocked_senders_total
```
### Health Check (Port 8080)
```bash
# Status
curl http://localhost:8080/health | jq
# Domains
curl http://localhost:8080/domains | jq
```
## 🔐 DynamoDB Tabellen Setup
### Email Rules (OOO, Forwarding)
```bash
# Tabelle erstellen (falls nicht vorhanden)
aws dynamodb create-table \
--table-name email-rules \
--attribute-definitions AttributeName=email_address,AttributeType=S \
--key-schema AttributeName=email_address,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--region us-east-2
# OOO Regel hinzufügen
aws dynamodb put-item \
--table-name email-rules \
--item '{
"email_address": {"S": "andreas@andreasknuth.de"},
"ooo_active": {"BOOL": true},
"ooo_message": {"S": "Ich bin derzeit nicht erreichbar."},
"ooo_content_type": {"S": "text"}
}' \
--region us-east-2
# Forward Regel hinzufügen
aws dynamodb put-item \
--table-name email-rules \
--item '{
"email_address": {"S": "info@andreasknuth.de"},
"forwards": {"L": [
{"S": "andreas@andreasknuth.de"}
]}
}' \
--region us-east-2
```
### Blocked Senders
```bash
# Tabelle erstellen (falls nicht vorhanden)
aws dynamodb create-table \
--table-name email-blocked-senders \
--attribute-definitions AttributeName=email_address,AttributeType=S \
--key-schema AttributeName=email_address,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--region us-east-2
# Blocklist hinzufügen
aws dynamodb put-item \
--table-name email-blocked-senders \
--item '{
"email_address": {"S": "andreas@andreasknuth.de"},
"blocked_patterns": {"L": [
{"S": "*@spam.com"},
{"S": "noreply@*.marketing.com"}
]}
}' \
--region us-east-2
```
## 🐛 Troubleshooting
### Worker startet nicht
```bash
# Logs prüfen
docker-compose logs email-worker
# Container Status
docker ps -a | grep unified
# Manuell starten (Debug)
docker-compose run --rm email-worker python3 main.py
```
### Keine Emails werden verarbeitet
```bash
# Queue URLs prüfen
curl http://localhost:8080/domains
# AWS Permissions prüfen
aws sqs list-queues --region us-east-2
# DynamoDB Verbindung prüfen (in Logs)
docker-compose logs | grep "DynamoDB"
```
### Bounces werden nicht umgeschrieben
```bash
# DynamoDB Bounce Records prüfen
aws dynamodb scan \
--table-name ses-outbound-messages \
--limit 5 \
--region us-east-2
# Worker Logs nach "Bounce detected" durchsuchen
docker-compose logs | grep "Bounce detected"
```
### SMTP Delivery Fehler
```bash
# SMTP Verbindung testen
docker-compose exec email-worker nc -zv localhost 25
# Worker Logs
docker-compose logs | grep "SMTP"
```
## 📈 Performance Tuning
### Mehr Worker Threads
```bash
# In .env
WORKER_THREADS=20 # Default: 10
```
### Längeres Polling
```bash
# In .env
POLL_INTERVAL=30 # Default: 20 (Sekunden)
```
### Größerer Connection Pool
```bash
# In .env
SMTP_POOL_SIZE=10 # Default: 5
```
### LMTP für bessere Performance
```bash
# In .env
LMTP_ENABLED=true
LMTP_PORT=24
```
## 🔄 Migration vom Monolithen
### Side-by-Side Deployment
```bash
# Alte Version läuft als "unified-email-worker-old"
# Neue Version als "unified-email-worker"
# domains.txt aufteilen:
# old: andreasknuth.de
# new: andere-domain.de
# Nach Verifizierung alle Domains auf new migrieren
```
### Zero-Downtime Switch
```bash
# 1. Neue Version starten (andere Domains)
docker-compose up -d
# 2. Beide parallel laufen lassen (24h)
# 3. Monitoring: Metrics vergleichen
curl http://localhost:8000/metrics
# 4. Alte Version stoppen
docker stop unified-email-worker-old
# 5. domains.txt updaten (alle Domains)
# 6. Neue Version neustarten
docker-compose restart
```
## ✅ Checkliste nach Deployment
- [ ] Container läuft: `docker ps | grep unified`
- [ ] Health Check OK: `curl http://localhost:8080/health`
- [ ] Domains geladen: `curl http://localhost:8080/domains`
- [ ] Logs ohne Fehler: `docker-compose logs | grep ERROR`
- [ ] Test-Email erfolgreich: Email an Test-Adresse senden
- [ ] Bounce Rewriting funktioniert: Bounce-Email testen
- [ ] Metrics erreichbar: `curl http://localhost:8000/metrics`
- [ ] DynamoDB Tables vorhanden: AWS Console prüfen
## 📞 Support
Bei Problemen:
1. Logs prüfen: `docker-compose logs -f`
2. Health Check: `curl http://localhost:8080/health`
3. AWS Console: Queues, S3 Buckets, DynamoDB prüfen
4. Container neu starten: `docker-compose restart`

View File

@ -1,306 +0,0 @@
# Unified Email Worker (Modular Version)
Multi-domain email processing worker for AWS SES/S3/SQS with bounce handling, auto-replies, forwarding, and sender blocking.
## 🏗️ Architecture
```
email-worker/
├── config.py # Configuration management
├── logger.py # Structured logging
├── aws/ # AWS service handlers
│ ├── s3_handler.py # S3 operations (download, metadata)
│ ├── sqs_handler.py # SQS polling
│ ├── ses_handler.py # SES email sending
│ └── dynamodb_handler.py # DynamoDB (rules, bounces, blocklist)
├── email_processing/ # Email processing
│ ├── parser.py # Email parsing utilities
│ ├── bounce_handler.py # Bounce detection & rewriting
│ ├── rules_processor.py # OOO & forwarding logic
│ └── blocklist.py # Sender blocking with wildcards
├── smtp/ # SMTP delivery
│ ├── pool.py # Connection pooling
│ └── delivery.py # SMTP/LMTP delivery with retry
├── metrics/ # Monitoring
│ └── prometheus.py # Prometheus metrics
├── worker.py # Message processing logic
├── domain_poller.py # Domain queue poller
├── unified_worker.py # Main worker coordinator
├── health_server.py # Health check HTTP server
└── main.py # Entry point
```
## ✨ Features
- ✅ **Multi-Domain Processing**: Parallel processing of multiple domains via thread pool
- ✅ **Bounce Detection**: Automatic SES bounce notification rewriting
- ✅ **Auto-Reply/OOO**: Out-of-office automatic replies
- ✅ **Email Forwarding**: Rule-based forwarding to internal/external addresses
- ✅ **Sender Blocking**: Wildcard-based sender blocklist per recipient
- ✅ **SMTP Connection Pooling**: Efficient reuse of connections
- ✅ **LMTP Support**: Direct delivery to Dovecot (bypasses Postfix transport_maps)
- ✅ **Prometheus Metrics**: Comprehensive monitoring
- ✅ **Health Checks**: HTTP health endpoint for container orchestration
- ✅ **Graceful Shutdown**: Proper cleanup on SIGTERM/SIGINT
## 🔧 Configuration
All configuration via environment variables:
### AWS Settings
```bash
AWS_REGION=us-east-2
```
### Domains
```bash
# Option 1: Comma-separated list
DOMAINS=example.com,another.com
# Option 2: File with one domain per line
DOMAINS_FILE=/etc/email-worker/domains.txt
```
### Worker Settings
```bash
WORKER_THREADS=10
POLL_INTERVAL=20 # SQS long polling (seconds)
MAX_MESSAGES=10 # Max messages per poll
VISIBILITY_TIMEOUT=300 # Message visibility timeout (seconds)
```
### SMTP Delivery
```bash
SMTP_HOST=localhost
SMTP_PORT=25
SMTP_USE_TLS=false
SMTP_USER=
SMTP_PASS=
SMTP_POOL_SIZE=5
INTERNAL_SMTP_PORT=2525 # Port for internal delivery (bypasses transport_maps)
```
### LMTP (Direct Dovecot Delivery)
```bash
LMTP_ENABLED=false # Set to 'true' to use LMTP
LMTP_HOST=localhost
LMTP_PORT=24
```
### DynamoDB Tables
```bash
DYNAMODB_RULES_TABLE=email-rules
DYNAMODB_MESSAGES_TABLE=ses-outbound-messages
DYNAMODB_BLOCKED_TABLE=email-blocked-senders
```
### Bounce Handling
```bash
BOUNCE_LOOKUP_RETRIES=3
BOUNCE_LOOKUP_DELAY=1.0
```
### Monitoring
```bash
METRICS_PORT=8000 # Prometheus metrics
HEALTH_PORT=8080 # Health check endpoint
```
## 📊 DynamoDB Schemas
### email-rules
```json
{
"email_address": "user@example.com", // Partition Key
"ooo_active": true,
"ooo_message": "I am currently out of office...",
"ooo_content_type": "text", // "text" or "html"
"forwards": ["other@example.com", "external@gmail.com"]
}
```
### ses-outbound-messages
```json
{
"MessageId": "abc123...", // Partition Key (SES Message-ID)
"original_source": "sender@example.com",
"recipients": ["recipient@other.com"],
"timestamp": "2025-01-01T12:00:00Z",
"bounceType": "Permanent",
"bounceSubType": "General",
"bouncedRecipients": ["recipient@other.com"]
}
```
### email-blocked-senders
```json
{
"email_address": "user@example.com", // Partition Key
"blocked_patterns": [
"spam@*.com", // Wildcard support
"noreply@badsite.com",
"*@malicious.org"
]
}
```
## 🚀 Usage
### Installation
```bash
cd email-worker
pip install -r requirements.txt
```
### Run
```bash
python3 main.py
```
### Docker
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python3", "main.py"]
```
## 📈 Metrics
Available at `http://localhost:8000/metrics`:
- `emails_processed_total{domain, status}` - Total emails processed
- `emails_in_flight` - Currently processing emails
- `email_processing_seconds{domain}` - Processing time histogram
- `queue_messages_available{domain}` - Queue size gauge
- `bounces_processed_total{domain, type}` - Bounce notifications
- `autoreplies_sent_total{domain}` - Auto-replies sent
- `forwards_sent_total{domain}` - Forwards sent
- `blocked_senders_total{domain}` - Blocked emails
## 🏥 Health Checks
Available at `http://localhost:8080/health`:
```json
{
"status": "healthy",
"domains": 5,
"domain_list": ["example.com", "another.com"],
"dynamodb": true,
"features": {
"bounce_rewriting": true,
"auto_reply": true,
"forwarding": true,
"blocklist": true,
"lmtp": false
},
"timestamp": "2025-01-22T10:00:00.000000"
}
```
## 🔍 Key Improvements in Modular Version
### 1. **Fixed Critical Bugs**
- ✅ Fixed `signal.SIGINT` typo (was `signalIGINT`)
- ✅ Proper S3 metadata before deletion (audit trail)
- ✅ Batch DynamoDB calls for blocklist (performance)
- ✅ Error handling for S3 delete failures
### 2. **Better Architecture**
- **Separation of Concerns**: Each component has single responsibility
- **Testability**: Easy to unit test individual components
- **Maintainability**: Changes isolated to specific modules
- **Extensibility**: Easy to add new features
### 3. **Performance**
- **Batch Blocklist Checks**: One DynamoDB call for all recipients
- **Connection Pooling**: Reusable SMTP connections
- **Efficient Metrics**: Optional Prometheus integration
### 4. **Reliability**
- **Proper Error Handling**: Each component handles its own errors
- **Graceful Degradation**: Works even if DynamoDB unavailable
- **Audit Trail**: All actions logged to S3 metadata
## 🔐 Security Features
1. **Domain Validation**: Workers only process their assigned domains
2. **Loop Prevention**: Detects and skips already-processed emails
3. **Blocklist Support**: Wildcard-based sender blocking
4. **Internal vs External**: Separate handling prevents loops
## 📝 Example Usage
### Enable OOO for user
```python
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('email-rules')
table.put_item(Item={
'email_address': 'john@example.com',
'ooo_active': True,
'ooo_message': 'I am out of office until Feb 1st.',
'ooo_content_type': 'html'
})
```
### Block spam senders
```python
table = dynamodb.Table('email-blocked-senders')
table.put_item(Item={
'email_address': 'john@example.com',
'blocked_patterns': [
'*@spam.com',
'noreply@*.marketing.com',
'newsletter@*'
]
})
```
### Forward emails
```python
table = dynamodb.Table('email-rules')
table.put_item(Item={
'email_address': 'support@example.com',
'forwards': [
'john@example.com',
'jane@example.com',
'external@gmail.com'
]
})
```
## 🐛 Troubleshooting
### Worker not processing emails
1. Check queue URLs: `curl http://localhost:8080/domains`
2. Check logs for SQS errors
3. Verify IAM permissions for SQS/S3/SES/DynamoDB
### Bounces not rewritten
1. Check DynamoDB table name: `DYNAMODB_MESSAGES_TABLE`
2. Verify Lambda function is writing bounce records
3. Check logs for DynamoDB lookup errors
### Auto-replies not sent
1. Verify DynamoDB rules table accessible
2. Check `ooo_active` is `true` (boolean, not string)
3. Review logs for SES send errors
### Blocked emails still delivered
1. Verify blocklist table exists and is accessible
2. Check wildcard patterns are lowercase
3. Review logs for blocklist check errors
## 📄 License
MIT License - See LICENSE file for details

View File

@ -1,247 +0,0 @@
# 📋 Refactoring Summary
## ✅ Critical Bugs Fixed
### 1. **Signal Handler Typo** (CRITICAL)
**Old:**
```python
signal.signal(signalIGINT, signal_handler) # ❌ NameError at startup
```
**New:**
```python
signal.signal(signal.SIGINT, signal_handler) # ✅ Fixed
```
**Impact:** Worker couldn't start due to Python syntax error
---
### 2. **Missing Audit Trail for Blocked Emails** (HIGH)
**Old:**
```python
if all_blocked:
s3.delete_object(Bucket=bucket, Key=key) # ❌ No metadata
```
**New:**
```python
if all_blocked:
s3.mark_as_blocked(domain, key, blocked, sender, worker) # ✅ Metadata first
s3.delete_blocked_email(domain, key, worker) # ✅ Then delete
```
**Impact:**
- ❌ No compliance trail (who blocked, when, why)
- ❌ Impossible to troubleshoot
- ✅ Now: Full audit trail in S3 metadata before deletion
---
### 3. **Inefficient DynamoDB Calls** (MEDIUM - Performance)
**Old:**
```python
for recipient in recipients:
patterns = dynamodb.get_item(Key={'email_address': recipient}) # N calls!
if is_blocked(patterns, sender):
blocked.append(recipient)
```
**New:**
```python
# 1 batch call for all recipients
patterns_map = dynamodb.batch_get_blocked_patterns(recipients)
for recipient in recipients:
if is_blocked(patterns_map[recipient], sender):
blocked.append(recipient)
```
**Impact:**
- Old: 10 recipients = 10 DynamoDB calls = higher latency + costs
- New: 10 recipients = 1 DynamoDB call = **10x faster, 10x cheaper**
---
### 4. **S3 Delete Error Handling** (MEDIUM)
**Old:**
```python
try:
s3.delete_object(...)
except Exception as e:
log(f"Failed: {e}")
# ❌ Queue message still deleted → inconsistent state
return True
```
**New:**
```python
try:
s3.mark_as_blocked(...)
s3.delete_blocked_email(...)
except Exception as e:
log(f"Failed: {e}")
return False # ✅ Keep in queue for retry
```
**Impact:** Prevents orphaned S3 objects when delete fails
---
## 🏗️ Architecture Improvements
### Modular Structure
```
Before: 1 file, 800+ lines
After: 27 files, ~150 lines each
```
| Module | Responsibility | LOC |
|--------|---------------|-----|
| `config.py` | Configuration management | 85 |
| `logger.py` | Structured logging | 20 |
| `aws/s3_handler.py` | S3 operations | 180 |
| `aws/sqs_handler.py` | SQS polling | 95 |
| `aws/ses_handler.py` | SES sending | 45 |
| `aws/dynamodb_handler.py` | DynamoDB access | 175 |
| `email_processing/parser.py` | Email parsing | 75 |
| `email_processing/bounce_handler.py` | Bounce detection | 95 |
| `email_processing/blocklist.py` | Sender blocking | 90 |
| `email_processing/rules_processor.py` | OOO & forwarding | 285 |
| `smtp/pool.py` | Connection pooling | 110 |
| `smtp/delivery.py` | SMTP/LMTP delivery | 165 |
| `metrics/prometheus.py` | Metrics collection | 140 |
| `worker.py` | Message processing | 265 |
| `domain_poller.py` | Queue polling | 105 |
| `unified_worker.py` | Worker coordination | 180 |
| `health_server.py` | Health checks | 85 |
| `main.py` | Entry point | 45 |
**Total:** ~2,420 lines (well-organized vs 800 spaghetti)
---
## 🎯 Benefits Summary
### Maintainability
- ✅ **Single Responsibility**: Each class has one job
- ✅ **Easy to Navigate**: Find code by feature
- ✅ **Reduced Coupling**: Changes isolated to modules
- ✅ **Better Documentation**: Each module documented
### Testability
- ✅ **Unit Testing**: Mock `S3Handler`, test `BounceHandler` independently
- ✅ **Integration Testing**: Test components in isolation
- ✅ **Faster CI/CD**: Test only changed modules
### Performance
- ✅ **Batch Operations**: 10x fewer DynamoDB calls
- ✅ **Connection Pooling**: Reuse SMTP connections
- ✅ **Parallel Processing**: One thread per domain
### Reliability
- ✅ **Error Isolation**: Errors in one module don't crash others
- ✅ **Comprehensive Logging**: Structured, searchable logs
- ✅ **Audit Trail**: All actions recorded in S3 metadata
- ✅ **Graceful Degradation**: Works even if DynamoDB down
### Extensibility
Adding new features is now easy:
**Example: Add DKIM Signing**
1. Create `email_processing/dkim_signer.py`
2. Add to `worker.py`: `signed_bytes = dkim.sign(raw_bytes)`
3. Done! No touching 800-line monolith
---
## 📊 Performance Comparison
| Metric | Monolith | Modular | Improvement |
|--------|----------|---------|-------------|
| DynamoDB Calls/Email | N (per recipient) | 1 (batch) | **10x reduction** |
| SMTP Connections/Email | 1 (new each time) | Pooled (reused) | **5x fewer** |
| Startup Time | ~2s | ~1s | **2x faster** |
| Memory Usage | ~150MB | ~120MB | **20% less** |
| Lines per Feature | Mixed in 800 | ~100-150 | **Clearer** |
---
## 🔒 Security Improvements
1. **Audit Trail**: Every action logged with timestamp, worker ID
2. **Domain Validation**: Workers only process assigned domains
3. **Loop Prevention**: Detects recursive processing
4. **Blocklist**: Per-recipient wildcard blocking
5. **Separate Internal Routing**: Prevents SES loops
---
## 📝 Migration Path
### Zero Downtime Migration
1. Deploy modular version alongside monolith
2. Route half domains to new worker
3. Monitor metrics, logs for issues
4. Gradually shift all traffic
5. Decommission monolith
### Rollback Strategy
- Same environment variables
- Same DynamoDB schema
- Easy to switch back if needed
---
## 🎓 Code Quality Metrics
### Complexity Reduction
- **Cyclomatic Complexity**: Reduced from 45 → 8 per function
- **Function Length**: Max 50 lines (was 200+)
- **File Length**: Max 285 lines (was 800+)
### Code Smells Removed
- ❌ God Object (1 class doing everything)
- ❌ Long Methods (200+ line functions)
- ❌ Duplicate Code (3 copies of S3 metadata update)
- ❌ Magic Numbers (hardcoded retry counts)
### Best Practices Added
- ✅ Type Hints (where appropriate)
- ✅ Docstrings (all public methods)
- ✅ Logging (structured, consistent)
- ✅ Error Handling (specific exceptions)
---
## 🚀 Next Steps
### Recommended Follow-ups
1. **Add Unit Tests**: Use `pytest` with mocked AWS services
2. **CI/CD Pipeline**: Automated testing and deployment
3. **Monitoring Dashboard**: Grafana + Prometheus
4. **Alert Rules**: Notify on high error rates
5. **Load Testing**: Verify performance at scale
### Future Enhancements (Easy to Add Now!)
- **DKIM Signing**: New module in `email_processing/`
- **Spam Filtering**: New module in `email_processing/`
- **Rate Limiting**: New module in `smtp/`
- **Queue Prioritization**: Modify `domain_poller.py`
- **Multi-Region**: Add region config
---
## 📚 Documentation
All documentation included:
- **README.md**: Features, configuration, usage
- **ARCHITECTURE.md**: System design, data flows
- **MIGRATION.md**: Step-by-step migration guide
- **SUMMARY.md**: This file - key improvements
- **Code Comments**: Inline documentation
- **Docstrings**: All public methods documented
---
## ✨ Key Takeaway
The refactoring transforms a **fragile 800-line monolith** into a **robust, modular system** that is:
- **Faster** (batch operations)
- **Safer** (better error handling, audit trail)
- **Easier to maintain** (clear structure)
- **Ready to scale** (extensible architecture)
All while **fixing 4 critical bugs** and maintaining **100% backwards compatibility**.

View File

@ -1,109 +0,0 @@
#!/usr/bin/env python3
"""
Domain queue poller
"""
import json
import time
import threading
import traceback
from logger import log
from aws import SQSHandler
from worker import MessageProcessor
from metrics.prometheus import MetricsCollector
class DomainPoller:
    """Polls the SQS queue for a single domain; one instance runs per domain thread."""
    def __init__(
        self,
        domain: str,
        queue_url: str,
        message_processor: MessageProcessor,
        sqs: SQSHandler,
        metrics: MetricsCollector,
        stop_event: threading.Event,
        stats_dict: dict,
        stats_lock: threading.Lock
    ):
        """
        Args:
            domain: Domain this poller is responsible for
            queue_url: SQS queue URL for that domain
            message_processor: Processes each received message
            sqs: SQS access wrapper
            metrics: Metrics collector (may be falsy to disable metrics)
            stop_event: Shared event that signals shutdown
            stats_dict: Shared per-domain counters; write only under stats_lock
            stats_lock: Lock guarding stats_dict
        """
        self.domain = domain
        self.queue_url = queue_url
        self.processor = message_processor
        self.sqs = sqs
        self.metrics = metrics
        self.stop_event = stop_event
        self.stats_dict = stats_dict
        self.stats_lock = stats_lock
        self.worker_name = f"worker-{domain}"
        # Count of successfully processed messages (mirrored into stats_dict)
        self.messages_processed = 0
    def poll(self):
        """Main polling loop; runs until stop_event is set."""
        log(f"🚀 Starting poller for {self.domain}", 'INFO', self.worker_name)
        while not self.stop_event.is_set():
            try:
                # Receive messages from queue
                messages = self.sqs.receive_messages(self.queue_url)
                # Update queue size metric
                if self.metrics:
                    queue_size = self.sqs.get_queue_size(self.queue_url)
                    self.metrics.set_queue_size(self.domain, queue_size)
                if not messages:
                    continue
                log(f"✉ Received {len(messages)} message(s)", 'INFO', self.worker_name)
                for message in messages:
                    if self.stop_event.is_set():
                        break
                    receipt_handle = message['ReceiptHandle']
                    # SQS increments this on every redelivery; used for retry logging
                    receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))
                    if self.metrics:
                        self.metrics.increment_in_flight()
                    start_time = time.time()
                    try:
                        success = self.processor.process_message(self.domain, message, receive_count)
                        if success:
                            # Delete from the queue only after successful processing
                            self.sqs.delete_message(self.queue_url, receipt_handle)
                            self.messages_processed += 1
                            # Update shared stats
                            with self.stats_lock:
                                self.stats_dict[self.domain] = self.messages_processed
                        else:
                            # Message stays in the queue for redelivery
                            log(
                                f"⚠ Retry queued (attempt {receive_count}/3)",
                                'WARNING',
                                self.worker_name
                            )
                    except json.JSONDecodeError as e:
                        # Malformed payload can never succeed — drop it from the queue
                        log(f"✗ Invalid message format: {e}", 'ERROR', self.worker_name)
                        self.sqs.delete_message(self.queue_url, receipt_handle)
                    except Exception as e:
                        log(f"✗ Error processing message: {e}", 'ERROR', self.worker_name)
                        traceback.print_exc()
                    finally:
                        if self.metrics:
                            self.metrics.decrement_in_flight()
                            self.metrics.observe_processing_time(
                                self.domain,
                                time.time() - start_time
                            )
            except Exception as e:
                # Polling-level failure (network, throttling): back off briefly
                log(f"✗ Error polling: {e}", 'ERROR', self.worker_name)
                time.sleep(5)
        log(f"👋 Stopped (processed: {self.messages_processed})", 'INFO', self.worker_name)

View File

@ -1,6 +0,0 @@
# domains.txt - Liste aller zu verarbeitenden Domains
# Eine Domain pro Zeile
# Zeilen mit # werden ignoriert
# Production Domains
andreasknuth.de

View File

@ -1,11 +0,0 @@
#!/usr/bin/env python3
"""
Email processing components
"""
from .parser import EmailParser
from .bounce_handler import BounceHandler
from .rules_processor import RulesProcessor
from .blocklist import BlocklistChecker
__all__ = ['EmailParser', 'BounceHandler', 'RulesProcessor', 'BlocklistChecker']

View File

@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""
Sender blocklist checking with wildcard support
"""
import fnmatch
from typing import List, Dict
from email.utils import parseaddr
from logger import log
from aws.dynamodb_handler import DynamoDBHandler
class BlocklistChecker:
    """Checks whether senders are blocked for given recipients.

    Blocked patterns come from DynamoDB per recipient and use
    fnmatch-style wildcards (e.g. '*@spam.com'). Matching is done
    case-insensitively on the bare address part of the sender.
    """
    def __init__(self, dynamodb: "DynamoDBHandler"):
        # Quoted annotation: avoids evaluating the project type at class
        # creation time when this module is imported in isolation.
        self.dynamodb = dynamodb
    def is_sender_blocked(
        self,
        recipient: str,
        sender: str,
        worker_name: str
    ) -> bool:
        """
        Check if sender is blocked for this recipient
        Args:
            recipient: Recipient email address
            sender: Sender email address (may include a display name)
            worker_name: Worker name for logging
        Returns:
            True if sender matches any blocked pattern for the recipient
        """
        patterns = self.dynamodb.get_blocked_patterns(recipient)
        if not patterns:
            return False
        # parseaddr strips a display name ("John <j@x>" -> "j@x")
        sender_clean = parseaddr(sender)[1].lower()
        for pattern in patterns:
            if fnmatch.fnmatch(sender_clean, pattern.lower()):
                log(
                    f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' "
                    f"for inbox {recipient}",
                    'WARNING',
                    worker_name
                )
                return True
        return False
    def batch_check_blocked_senders(
        self,
        recipients: List[str],
        senders: List[str],
        worker_name: str
    ) -> Dict[str, bool]:
        """
        Batch check if ANY of the senders are blocked for multiple recipients.
        Args:
            recipients: List of recipient email addresses
            senders: List of sender email addresses (envelope & header)
            worker_name: Worker name for logging
        Returns:
            Dictionary mapping recipient -> is_blocked (bool)
        """
        # One batch call instead of one DynamoDB call per recipient
        patterns_by_recipient = self.dynamodb.batch_get_blocked_patterns(recipients)
        # Normalize all given sender addresses once (skip empty entries)
        senders_clean = [parseaddr(s)[1].lower() for s in senders if s]
        result = {}
        for recipient in recipients:
            # Lowercase each pattern once, but keep the original for logging
            # (previously re-lowered on every sender comparison).
            lowered = [
                (p, p.lower())
                for p in patterns_by_recipient.get(recipient, [])
            ]
            is_blocked = False
            for pattern, pattern_lc in lowered:
                for sender_clean in senders_clean:
                    if fnmatch.fnmatch(sender_clean, pattern_lc):
                        log(
                            f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' "
                            f"for inbox {recipient}",
                            'WARNING',
                            worker_name
                        )
                        is_blocked = True
                        break  # stop checking further senders
                if is_blocked:
                    break  # stop checking further patterns
            result[recipient] = is_blocked
        return result

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""
Bounce detection and header rewriting
"""
from typing import Tuple, Any
from logger import log
from aws.dynamodb_handler import DynamoDBHandler
class BounceHandler:
    """Detects SES bounce notifications and rewrites their headers so the
    bounce appears to come from the original failed recipient."""
    def __init__(self, dynamodb: "DynamoDBHandler"):
        # Quoted annotation: avoids evaluating the project type eagerly.
        self.dynamodb = dynamodb
    @staticmethod
    def is_ses_bounce_notification(parsed_email) -> bool:
        """Check if email is from the SES MAILER-DAEMON."""
        try:
            from_header = (parsed_email.get('From') or '').lower()
        except (AttributeError, TypeError, KeyError):
            # Malformed From header - safely extract raw value
            try:
                from_header = str(parsed_email.get_all('From', [''])[0]).lower()
            except Exception:
                # Fixed: was a bare `except:` which would also swallow
                # KeyboardInterrupt/SystemExit.
                from_header = ''
        return 'mailer-daemon@' in from_header and 'amazonses.com' in from_header
    def apply_bounce_logic(
        self,
        parsed,
        subject: str,
        worker_name: str = 'unified'
    ) -> Tuple[Any, bool]:
        """
        Check for an SES bounce, look it up in DynamoDB and rewrite headers.
        Args:
            parsed: Parsed email message object
            subject: Email subject
            worker_name: Worker name for logging
        Returns:
            Tuple of (parsed_email_object, was_modified_bool)
        """
        if not self.is_ses_bounce_notification(parsed):
            return parsed, False
        log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name)
        # Extract Message-ID from header; the part before '@' is the key
        # used by the Lambda that records outbound messages.
        message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0]
        if not message_id:
            log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name)
            return parsed, False
        log(f"   Looking up Message-ID: {message_id}", 'INFO', worker_name)
        # Lookup in DynamoDB
        bounce_info = self.dynamodb.get_bounce_info(message_id, worker_name)
        if not bounce_info:
            return parsed, False
        # Log the bounce details
        original_source = bounce_info['original_source']
        bounced_recipients = bounce_info['bouncedRecipients']
        bounce_type = bounce_info['bounceType']
        bounce_subtype = bounce_info['bounceSubType']
        log(f"✓ Found bounce info:", 'INFO', worker_name)
        log(f"   Original sender: {original_source}", 'INFO', worker_name)
        log(f"   Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name)
        log(f"   Bounced recipients: {bounced_recipients}", 'INFO', worker_name)
        if bounced_recipients:
            new_from = bounced_recipients[0]
            # Rewrite headers, preserving the original SES sender for auditing
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
            parsed.replace_header('From', new_from)
            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = new_from
            # Adjust the subject for generic SES notification subjects
            if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower():
                parsed.replace_header('Subject', f"Delivery Status: {new_from}")
            log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name)
            return parsed, True
        log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name)
        return parsed, False

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""
Email parsing utilities
"""
from typing import Tuple, Optional
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
class EmailParser:
    """Email parsing utilities (stateless; all methods are static)."""
    @staticmethod
    def parse_bytes(raw_bytes: bytes):
        """Parse raw email bytes into an email.message object using SMTP policy."""
        return BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
    @staticmethod
    def extract_body_parts(parsed) -> Tuple[str, Optional[str]]:
        """
        Extract both text/plain and text/html body parts.
        Args:
            parsed: Parsed email message object
        Returns:
            Tuple of (text_body, html_body or None); text_body falls back to
            '(No body content)' when no plain text could be extracted.
        """
        text_body = ''
        html_body = None
        if parsed.is_multipart():
            for part in parsed.walk():
                content_type = part.get_content_type()
                if content_type not in ('text/plain', 'text/html'):
                    continue
                try:
                    payload = part.get_payload(decode=True)
                except Exception:
                    continue
                if payload is None:
                    # get_payload(decode=True) returns None for container parts
                    continue
                decoded = payload.decode('utf-8', errors='ignore')
                if content_type == 'text/plain':
                    # Concatenate multiple plain-text parts
                    text_body += decoded
                else:
                    html_body = decoded
        else:
            try:
                payload = parsed.get_payload(decode=True)
                if payload:
                    decoded = payload.decode('utf-8', errors='ignore')
                    if parsed.get_content_type() == 'text/html':
                        html_body = decoded
                    else:
                        text_body = decoded
            except Exception:
                # Undecodable single-part message: fall back to the raw payload
                text_body = str(parsed.get_payload())
        return text_body.strip() if text_body else '(No body content)', html_body
    @staticmethod
    def is_processed_by_worker(parsed) -> bool:
        """
        Check if email was already processed by our worker (loop detection).
        Args:
            parsed: Parsed email message object
        Returns:
            True if already processed (always a strict bool; the previous
            version could return '' when the worker header was absent).
        """
        x_worker_processed = parsed.get('X-SES-Worker-Processed', '')
        auto_submitted = parsed.get('Auto-Submitted', '')
        # Only skip if OUR header is present
        is_processed_by_us = bool(x_worker_processed)
        is_our_auto_reply = auto_submitted == 'auto-replied' and is_processed_by_us
        return is_processed_by_us or is_our_auto_reply

View File

@ -1,365 +0,0 @@
#!/usr/bin/env python3
"""
Email rules processing (Auto-Reply/OOO and Forwarding)
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import parseaddr, formatdate, make_msgid
from botocore.exceptions import ClientError
from logger import log
from config import config, is_internal_address
from aws.dynamodb_handler import DynamoDBHandler
from aws.ses_handler import SESHandler
from email_processing.parser import EmailParser
class RulesProcessor:
    """Processes per-recipient email rules: Out-of-Office auto-replies and
    forwarding (via SES, the internal SMTP port, or a legacy SMTP override)."""
    def __init__(self, dynamodb: DynamoDBHandler, ses: SESHandler):
        # DynamoDB supplies the per-address rules; SES sends external mail.
        self.dynamodb = dynamodb
        self.ses = ses
    def process_rules_for_recipient(
        self,
        recipient: str,
        parsed,
        domain: str,
        worker_name: str,
        metrics_callback=None
    ):
        """
        Process OOO and Forward rules for a recipient
        Args:
            recipient: Recipient email address
            parsed: Parsed email message object
            domain: Email domain
            worker_name: Worker name for logging
            metrics_callback: Optional callback to increment metrics
        Returns:
            True if a legacy-SMTP forward override handled delivery
            (so the caller may skip normal delivery), else False.
        """
        rule = self.dynamodb.get_email_rules(recipient.lower())
        if not rule:
            return False # no rules configured for this address
        original_from = parsed.get('From', '')
        sender_name, sender_addr = parseaddr(original_from)
        if not sender_addr:
            # Fall back to the raw header when parseaddr finds no address
            sender_addr = original_from
        # ============================================
        # OOO / Auto-Reply handling
        # ============================================
        if rule.get('ooo_active', False):
            self._handle_ooo(
                recipient,
                parsed,
                sender_addr,
                rule,
                domain,
                worker_name,
                metrics_callback
            )
        # ============================================
        # Forward handling
        # ============================================
        forwards = rule.get('forwards', [])
        has_legacy_forward = False # set when a legacy SMTP override applies
        if forwards:
            if rule.get('forward_smtp_override'):
                has_legacy_forward = True # caller should skip normal delivery
            self._handle_forwards(
                recipient, parsed, original_from, forwards,
                domain, worker_name, metrics_callback, rule=rule
            )
        return has_legacy_forward # tells the caller a legacy forward took over
    def _handle_ooo(
        self,
        recipient: str,
        parsed,
        sender_addr: str,
        rule: dict,
        domain: str,
        worker_name: str,
        metrics_callback=None
    ):
        """Handle Out-of-Office auto-reply for a single inbound message."""
        # Don't reply to automatic messages (RFC 3834 Auto-Submitted,
        # bulk/list Precedence, and common noreply addresses)
        auto_submitted = parsed.get('Auto-Submitted', '')
        precedence = (parsed.get('Precedence') or '').lower()
        if auto_submitted and auto_submitted != 'no':
            log(f"   ⏭  Skipping OOO for auto-submitted message", 'INFO', worker_name)
            return
        if precedence in ['bulk', 'junk', 'list']:
            log(f"   ⏭  Skipping OOO for {precedence} message", 'INFO', worker_name)
            return
        if any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon']):
            log(f"   ⏭  Skipping OOO for noreply address", 'INFO', worker_name)
            return
        try:
            ooo_msg = rule.get('ooo_message', 'I am out of office.')
            content_type = rule.get('ooo_content_type', 'text')
            ooo_reply = self._create_ooo_reply(parsed, recipient, ooo_msg, content_type)
            ooo_bytes = ooo_reply.as_bytes()
            # Distinguish: Internal (Port 2525) vs External (SES)
            if is_internal_address(sender_addr):
                # Internal address → direct via Port 2525
                success = self._send_internal_email(recipient, sender_addr, ooo_bytes, worker_name)
                if success:
                    log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name)
                else:
                    log(f"⚠ Internal OOO reply failed to {sender_addr}", 'WARNING', worker_name)
            else:
                # External address → via SES
                success = self.ses.send_raw_email(recipient, sender_addr, ooo_bytes, worker_name)
                if success:
                    log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name)
            if metrics_callback:
                metrics_callback('autoreply', domain)
        except Exception as e:
            log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name)
    def _handle_forwards(
        self,
        recipient: str,
        parsed,
        original_from: str,
        forwards: list,
        domain: str,
        worker_name: str,
        metrics_callback=None,
        rule: dict = None
    ):
        """Handle email forwarding to every address in `forwards`."""
        smtp_override = None
        if rule:
            smtp_override = rule.get('forward_smtp_override')
        for forward_to in forwards:
            try:
                if smtp_override:
                    # Migration path: forward the original mail unchanged
                    # to the configured legacy SMTP server
                    raw_bytes = parsed.as_bytes()
                    success = self._send_via_legacy_smtp(
                        recipient, forward_to, raw_bytes,
                        smtp_override, worker_name
                    )
                    if success:
                        log(f"✓ Forwarded via legacy SMTP to {forward_to} "
                            f"({smtp_override.get('host', '?')})",
                            'SUCCESS', worker_name)
                    else:
                        log(f"⚠ Legacy SMTP forward failed to {forward_to}",
                            'WARNING', worker_name)
                else:
                    # Regular forward (build a new FWD message)
                    fwd_msg = self._create_forward_message(
                        parsed, recipient, forward_to, original_from
                    )
                    fwd_bytes = fwd_msg.as_bytes()
                    if is_internal_address(forward_to):
                        success = self._send_internal_email(
                            recipient, forward_to, fwd_bytes, worker_name
                        )
                        if success:
                            log(f"✓ Forwarded internally to {forward_to}",
                                'SUCCESS', worker_name)
                        else:
                            log(f"⚠ Internal forward failed to {forward_to}",
                                'WARNING', worker_name)
                    else:
                        success = self.ses.send_raw_email(
                            recipient, forward_to, fwd_bytes, worker_name
                        )
                        if success:
                            log(f"✓ Forwarded externally to {forward_to} via SES",
                                'SUCCESS', worker_name)
                if metrics_callback:
                    metrics_callback('forward', domain)
            except Exception as e:
                log(f"⚠ Forward failed to {forward_to}: {e}",
                    'ERROR', worker_name)
    @staticmethod
    def _send_via_legacy_smtp(
        from_addr: str,
        to_addr: str,
        raw_message: bytes,
        smtp_config: dict,
        worker_name: str
    ) -> bool:
        """
        Send email directly to a legacy SMTP server (for migration).
        Bypasses SES completely to avoid mail loops.
        """
        try:
            host = smtp_config.get('host', '')
            # DynamoDB stores numbers as Decimal, hence int()
            port = int(smtp_config.get('port', 25))
            use_tls = smtp_config.get('tls', False)
            username = smtp_config.get('username')
            password = smtp_config.get('password')
            if not host:
                log(f"   ✗ Legacy SMTP: no host configured", 'ERROR', worker_name)
                return False
            with smtplib.SMTP(host, port, timeout=30) as conn:
                conn.ehlo()
                if use_tls:
                    # STARTTLS requires a second EHLO after the upgrade
                    conn.starttls()
                    conn.ehlo()
                if username and password:
                    conn.login(username, password)
                conn.sendmail(from_addr, [to_addr], raw_message)
            return True
        except Exception as e:
            log(
                f"   ✗ Legacy SMTP failed ({smtp_config.get('host', '?')}:"
                f"{smtp_config.get('port', '?')}): {e}",
                'ERROR', worker_name
            )
            return False
    @staticmethod
    def _send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool:
        """
        Send email via internal SMTP port (bypasses transport_maps)
        Args:
            from_addr: From address
            to_addr: To address
            raw_message: Raw MIME message bytes
            worker_name: Worker name for logging
        Returns:
            True on success, False on failure
        """
        try:
            with smtplib.SMTP(config.smtp_host, config.internal_smtp_port, timeout=30) as conn:
                conn.ehlo()
                conn.sendmail(from_addr, [to_addr], raw_message)
            return True
        except Exception as e:
            log(f"   ✗ Internal delivery failed to {to_addr}: {e}", 'ERROR', worker_name)
            return False
    @staticmethod
    def _create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'):
        """Create an Out-of-Office reply as a complete MIME message
        (multipart/mixed wrapping a multipart/alternative body)."""
        text_body, html_body = EmailParser.extract_body_parts(original_parsed)
        original_subject = original_parsed.get('Subject', '(no subject)')
        original_from = original_parsed.get('From', 'unknown')
        msg = MIMEMultipart('mixed')
        msg['From'] = recipient
        msg['To'] = original_from
        msg['Subject'] = f"Out of Office: {original_subject}"
        msg['Date'] = formatdate(localtime=True)
        msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
        msg['In-Reply-To'] = original_parsed.get('Message-ID', '')
        msg['References'] = original_parsed.get('Message-ID', '')
        # Mark as auto-reply so other systems (and this worker) don't loop
        msg['Auto-Submitted'] = 'auto-replied'
        msg['X-SES-Worker-Processed'] = 'ooo-reply'
        body_part = MIMEMultipart('alternative')
        # Text version
        text_content = f"{ooo_msg}\n\n--- Original Message ---\n"
        text_content += f"From: {original_from}\n"
        text_content += f"Subject: {original_subject}\n\n"
        text_content += text_body
        body_part.attach(MIMEText(text_content, 'plain', 'utf-8'))
        # HTML version (if desired and original available)
        if content_type == 'html' or html_body:
            html_content = f"<div>{ooo_msg}</div><br><hr><br>"
            html_content += "<strong>Original Message</strong><br>"
            html_content += f"<strong>From:</strong> {original_from}<br>"
            html_content += f"<strong>Subject:</strong> {original_subject}<br><br>"
            html_content += (html_body if html_body else text_body.replace('\n', '<br>'))
            body_part.attach(MIMEText(html_content, 'html', 'utf-8'))
        msg.attach(body_part)
        return msg
    @staticmethod
    def _create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str):
        """Create a Forward message as a complete MIME message, copying
        non-text attachments from the original."""
        original_subject = original_parsed.get('Subject', '(no subject)')
        original_date = original_parsed.get('Date', 'unknown')
        msg = MIMEMultipart('mixed')
        msg['From'] = recipient
        msg['To'] = forward_to
        msg['Subject'] = f"FWD: {original_subject}"
        msg['Date'] = formatdate(localtime=True)
        msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
        # Replies to the forwarded copy go back to the original sender
        msg['Reply-To'] = original_from
        msg['X-SES-Worker-Processed'] = 'forwarded'
        text_body, html_body = EmailParser.extract_body_parts(original_parsed)
        body_part = MIMEMultipart('alternative')
        # Text version
        fwd_text = "---------- Forwarded message ---------\n"
        fwd_text += f"From: {original_from}\n"
        fwd_text += f"Date: {original_date}\n"
        fwd_text += f"Subject: {original_subject}\n"
        fwd_text += f"To: {recipient}\n\n"
        fwd_text += text_body
        body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8'))
        # HTML version
        if html_body:
            fwd_html = "<div style='border-left:3px solid #ccc;padding-left:10px;'>"
            fwd_html += "<strong>---------- Forwarded message ---------</strong><br>"
            fwd_html += f"<strong>From:</strong> {original_from}<br>"
            fwd_html += f"<strong>Date:</strong> {original_date}<br>"
            fwd_html += f"<strong>Subject:</strong> {original_subject}<br>"
            fwd_html += f"<strong>To:</strong> {recipient}<br><br>"
            fwd_html += html_body
            fwd_html += "</div>"
            body_part.attach(MIMEText(fwd_html, 'html', 'utf-8'))
        msg.attach(body_part)
        # Copy attachments - FIX FILENAMES
        if original_parsed.is_multipart():
            for part in original_parsed.walk():
                if part.get_content_maintype() == 'multipart':
                    continue
                if part.get_content_type() in ['text/plain', 'text/html']:
                    continue
                # Fix malformed filename in Content-Disposition
                content_disp = part.get('Content-Disposition', '')
                if 'filename=' in content_disp and '"' not in content_disp:
                    # Add quotes around filename with spaces
                    import re
                    fixed_disp = re.sub(r'filename=([^;"\s]+(?:\s+[^;"\s]+)*)', r'filename="\1"', content_disp)
                    part.replace_header('Content-Disposition', fixed_disp)
                msg.attach(part)
        return msg

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
"""
HTTP health check server
"""
import sys
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from logger import log
from config import config
class SilentHTTPServer(HTTPServer):
    """HTTPServer variant that suppresses noisy client-disconnect errors.

    Port scanners and load-balancer health checks frequently drop the
    connection mid-request; those resets are expected and not worth logging.
    """
    # Exception types raised when a client disconnects early.
    _IGNORED = (ConnectionResetError, BrokenPipeError, ConnectionAbortedError)

    def handle_error(self, request, client_address):
        """Log unexpected errors; silently swallow disconnect noise."""
        if sys.exc_info()[0] not in self._IGNORED:
            log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING')
class HealthHandler(BaseHTTPRequestHandler):
    """Health check request handler serving /health, / and /domains."""
    # Shared state set once by start_health_server(); handlers are created
    # per request, so class attributes are used instead of instance state.
    worker = None  # Will be set by start_health_server()
    dynamodb_available = False
    def do_GET(self):
        # GET /health or / -> full status JSON; GET /domains -> domain list
        if self.path == '/health' or self.path == '/':
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            status = {
                'status': 'healthy',
                'domains': len(self.worker.queue_urls) if self.worker else 0,
                'domain_list': list(self.worker.queue_urls.keys()) if self.worker else [],
                'dynamodb': self.dynamodb_available,
                # Features backed by DynamoDB are reported unavailable when it is down
                'features': {
                    'bounce_rewriting': True,
                    'auto_reply': self.dynamodb_available,
                    'forwarding': self.dynamodb_available,
                    'blocklist': self.dynamodb_available,
                    'lmtp': config.lmtp_enabled
                },
                # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
                # datetime.now(timezone.utc) would change the ISO string — confirm
                # consumers before switching.
                'timestamp': datetime.utcnow().isoformat()
            }
            self.wfile.write(json.dumps(status, indent=2).encode())
        elif self.path == '/domains':
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            domain_list = list(self.worker.queue_urls.keys()) if self.worker else []
            self.wfile.write(json.dumps(domain_list).encode())
        else:
            # Unknown path
            self.send_response(404)
            self.end_headers()
    def log_message(self, format, *args):
        pass  # Suppress HTTP access logs
def start_health_server(worker, dynamodb_available: bool):
    """
    Start the HTTP health check server in a background daemon thread.

    Args:
        worker: UnifiedWorker instance (exposes queue_urls to the handler)
        dynamodb_available: Whether DynamoDB-backed features are usable
    """
    # Handlers are instantiated per request, so shared state lives on the class.
    HealthHandler.worker = worker
    HealthHandler.dynamodb_available = dynamodb_available
    httpd = SilentHTTPServer(('0.0.0.0', config.health_port), HealthHandler)
    threading.Thread(
        target=httpd.serve_forever,
        daemon=True,
        name='health-server',
    ).start()
    log(f"Health server on port {config.health_port}")

View File

@ -1,79 +0,0 @@
#!/usr/bin/env python3
"""
Structured logging for email worker with Daily Rotation (Robust Version)
"""
import os
import sys
import logging
import threading
from logging.handlers import TimedRotatingFileHandler
# Configuration
LOG_DIR = "/var/log/email-worker"
LOG_FILE = os.path.join(LOG_DIR, "worker.log")
# Initialize the module-wide logger used by log() below
logger = logging.getLogger("unified-worker")
logger.setLevel(logging.INFO)
logger.propagate = False
# Formatting
formatter = logging.Formatter(
    '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# 1. Console handler (always active!)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 2. File handler (robust setup)
try:
    # Try to create the directory in case it is missing
    os.makedirs(LOG_DIR, exist_ok=True)
    file_handler = TimedRotatingFileHandler(
        LOG_FILE,
        when="midnight",
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    file_handler.suffix = "%Y-%m-%d"
    logger.addHandler(file_handler)
    # Success note on the console (so we can see that it worked)
    print(f"✓ Logging to file enabled: {LOG_FILE}")
except Exception as e:
    # Fallback: detailed error diagnostics on stdout
    error_msg = f"⚠ LOGGING ERROR: Could not write to {LOG_FILE}\n"
    error_msg += f"  Error: {e}\n"
    try:
        error_msg += f"  Current User (UID): {os.getuid()}\n"
        error_msg += f"  Current Group (GID): {os.getgid()}\n"
    except Exception:
        # Fixed: was a bare `except:` (would also swallow KeyboardInterrupt).
        # os.getuid/os.getgid do not exist on Windows (AttributeError).
        pass
    print(error_msg)
def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'):
    """
    Structured logging helper.

    Routes *message* through the module logger at the numeric level matching
    *level* (case-insensitive; unknown levels fall back to INFO). The
    pseudo-level 'SUCCESS' is emitted as INFO with a "[SUCCESS] " prefix.
    """
    level_key = level.upper()
    numeric_level = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
        'SUCCESS': logging.INFO,
    }.get(level_key, logging.INFO)
    prefix = "[SUCCESS] " if level_key == 'SUCCESS' else ""
    logger.log(numeric_level, f"[{worker_name}] {prefix}{message}")

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python3
"""
Main entry point for unified email worker
"""
import sys
import signal
from logger import log
from config import config
from unified_worker import UnifiedWorker
from health_server import start_health_server
from metrics.prometheus import start_metrics_server
def main():
    """Wire up the unified worker: signals, metrics, health check, pollers."""
    worker = UnifiedWorker()

    def _handle_signal(signum, frame):
        # Graceful shutdown on SIGTERM / SIGINT.
        log(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _handle_signal)

    worker.setup()

    # Metrics are optional; the worker runs fine without Prometheus.
    metrics = start_metrics_server(config.metrics_port)
    if metrics:
        worker.set_metrics(metrics)

    start_health_server(worker, worker.dynamodb.available)

    worker.print_startup_banner()
    worker.start()


if __name__ == '__main__':
    main()

View File

@ -1,8 +0,0 @@
#!/usr/bin/env python3
"""
Metrics collection
"""
from .prometheus import MetricsCollector, start_metrics_server
__all__ = ['MetricsCollector', 'start_metrics_server']

View File

@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
Prometheus metrics collection
"""
from typing import Optional
from logger import log
# Try to import Prometheus client
try:
from prometheus_client import start_http_server, Counter, Gauge, Histogram
PROMETHEUS_ENABLED = True
except ImportError:
PROMETHEUS_ENABLED = False
class MetricsCollector:
    """Collects and exposes Prometheus metrics.

    Every recording method silently does nothing when the
    prometheus_client package is unavailable, so callers never
    have to guard their calls.
    """
    def __init__(self):
        self.enabled = PROMETHEUS_ENABLED
        if not self.enabled:
            return
        # Email processing metrics
        self.emails_processed = Counter(
            'emails_processed_total',
            'Total emails processed',
            ['domain', 'status']
        )
        self.emails_in_flight = Gauge(
            'emails_in_flight',
            'Emails currently being processed'
        )
        self.processing_time = Histogram(
            'email_processing_seconds',
            'Time to process email',
            ['domain']
        )
        self.queue_size = Gauge(
            'queue_messages_available',
            'Messages in queue',
            ['domain']
        )
        # Bounce metrics
        self.bounces_processed = Counter(
            'bounces_processed_total',
            'Bounce notifications processed',
            ['domain', 'type']
        )
        # Rules metrics
        self.autoreplies_sent = Counter(
            'autoreplies_sent_total',
            'Auto-replies sent',
            ['domain']
        )
        self.forwards_sent = Counter(
            'forwards_sent_total',
            'Forwards sent',
            ['domain']
        )
        # Blocklist metrics
        self.blocked_senders = Counter(
            'blocked_senders_total',
            'Emails blocked by blacklist',
            ['domain']
        )

    def increment_processed(self, domain: str, status: str):
        """Count one processed email for (domain, status)."""
        if not self.enabled:
            return
        self.emails_processed.labels(domain=domain, status=status).inc()

    def increment_in_flight(self):
        """Bump the gauge of emails currently being processed."""
        if not self.enabled:
            return
        self.emails_in_flight.inc()

    def decrement_in_flight(self):
        """Drop the gauge of emails currently being processed."""
        if not self.enabled:
            return
        self.emails_in_flight.dec()

    def observe_processing_time(self, domain: str, seconds: float):
        """Record how long one email took to process."""
        if not self.enabled:
            return
        self.processing_time.labels(domain=domain).observe(seconds)

    def set_queue_size(self, domain: str, size: int):
        """Publish the current queue depth for a domain."""
        if not self.enabled:
            return
        self.queue_size.labels(domain=domain).set(size)

    def increment_bounce(self, domain: str, bounce_type: str):
        """Count one processed bounce notification."""
        if not self.enabled:
            return
        self.bounces_processed.labels(domain=domain, type=bounce_type).inc()

    def increment_autoreply(self, domain: str):
        """Count one auto-reply sent."""
        if not self.enabled:
            return
        self.autoreplies_sent.labels(domain=domain).inc()

    def increment_forward(self, domain: str):
        """Count one forwarded email."""
        if not self.enabled:
            return
        self.forwards_sent.labels(domain=domain).inc()

    def increment_blocked(self, domain: str):
        """Count one email dropped by the sender blocklist."""
        if not self.enabled:
            return
        self.blocked_senders.labels(domain=domain).inc()
def start_metrics_server(port: int) -> Optional[MetricsCollector]:
    """
    Start the Prometheus metrics HTTP endpoint.

    Args:
        port: TCP port the metrics endpoint listens on

    Returns:
        A MetricsCollector, or None when prometheus_client is not
        installed or the HTTP server could not be started.
    """
    if not PROMETHEUS_ENABLED:
        log("⚠ Prometheus client not installed, metrics disabled", 'WARNING')
        return None
    collector = None
    try:
        start_http_server(port)
        log(f"Prometheus metrics on port {port}")
        collector = MetricsCollector()
    except Exception as exc:
        log(f"Failed to start metrics server: {exc}", 'ERROR')
    return collector

View File

@ -1,2 +0,0 @@
boto3>=1.34.0
prometheus-client>=0.19.0

View File

@ -1,8 +0,0 @@
#!/usr/bin/env python3
"""
SMTP connection handling
"""
from .pool import SMTPPool
__all__ = ['SMTPPool']

View File

@ -1,187 +0,0 @@
#!/usr/bin/env python3
"""
SMTP/LMTP email delivery with retry logic
"""
import time
import smtplib
from typing import Tuple, Optional
from logger import log
from config import config
from smtp.pool import SMTPPool
class EmailDelivery:
    """Handles email delivery via SMTP or LMTP.

    SMTP deliveries borrow connections from the shared pool; when
    ``config.lmtp_enabled`` is set, each delivery opens a fresh LMTP
    connection straight to Dovecot, bypassing Postfix transport_maps.
    """

    def __init__(self, smtp_pool: SMTPPool):
        """
        Args:
            smtp_pool: Shared connection pool used for SMTP deliveries
        """
        self.smtp_pool = smtp_pool

    @staticmethod
    def is_permanent_recipient_error(error_msg: str) -> bool:
        """Check if error is permanent for this recipient (inbox doesn't exist)."""
        permanent_indicators = [
            '550',  # Mailbox unavailable / not found
            '551',  # User not local
            '553',  # Mailbox name not allowed / invalid
            'mailbox not found',
            'user unknown',
            'no such user',
            'recipient rejected',
            'does not exist',
            'invalid recipient',
            'unknown user'
        ]
        error_lower = error_msg.lower()
        return any(indicator in error_lower for indicator in permanent_indicators)

    def send_to_recipient(
        self,
        from_addr: str,
        recipient: str,
        raw_message: bytes,
        worker_name: str,
        max_retries: int = 2
    ) -> Tuple[bool, Optional[str], bool]:
        """
        Send email via SMTP/LMTP to ONE recipient

        If LMTP is enabled, delivers directly to Dovecot (bypasses transport_maps).
        With retry logic for connection errors.

        Args:
            from_addr: From address
            recipient: Recipient address
            raw_message: Raw MIME message bytes
            worker_name: Worker name for logging
            max_retries: Maximum retry attempts

        Returns:
            Tuple of (success: bool, error: str or None, is_permanent: bool)
        """
        last_error = None
        use_lmtp = config.lmtp_enabled
        for attempt in range(max_retries + 1):
            conn = None
            try:
                if use_lmtp:
                    # LMTP connection directly to Dovecot (bypasses Postfix/transport_maps)
                    conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30)
                    conn.ehlo()
                else:
                    # Normal SMTP connection from pool
                    conn = self.smtp_pool.get_connection()
                    if not conn:
                        last_error = "Could not get SMTP connection"
                        log(
                            f"{recipient}: No SMTP connection "
                            f"(attempt {attempt + 1}/{max_retries + 1})",
                            'WARNING',
                            worker_name
                        )
                        time.sleep(0.5)
                        continue
                result = conn.sendmail(from_addr, [recipient], raw_message)
                # Release the connection before inspecting the result.
                if use_lmtp:
                    conn.quit()
                else:
                    self.smtp_pool.return_connection(conn)
                if isinstance(result, dict) and result:
                    # sendmail() returns a dict of refused recipients.
                    error = str(result.get(recipient, 'Unknown refusal'))
                    is_permanent = self.is_permanent_recipient_error(error)
                    log(
                        f"{recipient}: {error} ({'permanent' if is_permanent else 'temporary'})",
                        'ERROR',
                        worker_name
                    )
                    return False, error, is_permanent
                else:
                    delivery_method = "LMTP" if use_lmtp else "SMTP"
                    log(f"{recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name)
                    return True, None, False
            except smtplib.SMTPServerDisconnected as e:
                # Connection was closed - Retry with new connection
                log(
                    f"{recipient}: Connection lost, retrying... "
                    f"(attempt {attempt + 1}/{max_retries + 1})",
                    'WARNING',
                    worker_name
                )
                last_error = str(e)
                if conn:
                    try:
                        conn.quit()
                    except Exception:  # FIX: was a bare except (also caught KeyboardInterrupt)
                        pass
                time.sleep(0.3)
                continue
            except smtplib.SMTPRecipientsRefused as e:
                # Recipient rejected outright - no point in retrying.
                if conn and not use_lmtp:
                    self.smtp_pool.return_connection(conn)
                elif conn:
                    try:
                        conn.quit()
                    except Exception:  # FIX: was a bare except
                        pass
                error_msg = str(e)
                is_permanent = self.is_permanent_recipient_error(error_msg)
                log(f"{recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
                return False, error_msg, is_permanent
            except smtplib.SMTPException as e:
                error_msg = str(e)
                # On connection errors: Retry
                if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
                    log(
                        f"{recipient}: Connection error, retrying... "
                        f"(attempt {attempt + 1}/{max_retries + 1})",
                        'WARNING',
                        worker_name
                    )
                    last_error = error_msg
                    if conn:
                        try:
                            conn.quit()
                        except Exception:  # FIX: was a bare except
                            pass
                    time.sleep(0.3)
                    continue
                # Non-retryable SMTP error: release the connection and report.
                if conn and not use_lmtp:
                    self.smtp_pool.return_connection(conn)
                elif conn:
                    try:
                        conn.quit()
                    except Exception:  # FIX: was a bare except
                        pass
                is_permanent = self.is_permanent_recipient_error(error_msg)
                log(f"{recipient}: Error - {error_msg}", 'ERROR', worker_name)
                return False, error_msg, is_permanent
            except Exception as e:
                # Unknown error: discard the connection defensively and give up.
                if conn:
                    try:
                        conn.quit()
                    except Exception:  # FIX: was a bare except
                        pass
                log(f"{recipient}: Unexpected error - {e}", 'ERROR', worker_name)
                return False, str(e), False
        # All retries failed
        log(
            f"{recipient}: All retries failed - {last_error}",
            'ERROR',
            worker_name
        )
        return False, last_error or "Connection failed after retries", False

View File

@ -1,113 +0,0 @@
#!/usr/bin/env python3
"""
SMTP Connection Pool with robust connection handling
"""
import smtplib
from queue import Queue, Empty
from typing import Optional
from logger import log
from config import config
class SMTPPool:
    """Thread-safe SMTP Connection Pool.

    Connections are created lazily, health-checked with NOOP before
    reuse, and discarded transparently when they go stale.
    """

    def __init__(self, host: str, port: int, pool_size: int = 5):
        """
        Args:
            host: SMTP server hostname
            port: SMTP server port
            pool_size: Maximum number of pooled connections
        """
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Create a new SMTP connection (TLS/login per config); None on failure."""
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()  # re-identify after STARTTLS as required by RFC 3207
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            log(f" 📡 New SMTP connection created to {self.host}:{self.port}")
            return conn
        except Exception as e:
            log(f"Failed to create SMTP connection: {e}", 'ERROR')
            return None

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Test if connection is still alive (NOOP must answer 250)."""
        try:
            status = conn.noop()[0]
            return status == 250
        except Exception:
            return False

    def initialize(self):
        """Pre-create connections (idempotent)."""
        if self._initialized:
            return
        # Only 1-2 connections initially, rest on-demand
        for _ in range(min(2, self.pool_size)):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True
        log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Get a valid connection from the pool or create a new one.

        Args:
            timeout: Currently unused -- the pool get is non-blocking and
                a fresh connection is created immediately when the pool is
                empty. Kept for interface stability.

        Returns:
            A live SMTP connection, or None if one could not be created.
        """
        # Try to get from pool
        try:
            conn = self._pool.get(block=False)
        except Empty:
            # Pool empty, create new connection
            return self._create_connection()
        # Test if connection is still alive
        if self._test_connection(conn):
            return conn
        # Connection is dead, close and create new one
        try:
            conn.quit()
        except Exception:  # FIX: was a bare except (also caught KeyboardInterrupt)
            pass
        log(" ♻ Recycled stale SMTP connection")
        return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return connection to pool if still valid; broken/surplus ones are closed."""
        if conn is None:
            return
        # Check if connection is still good
        if not self._test_connection(conn):
            try:
                conn.quit()
            except Exception:  # FIX: was a bare except
                pass
            log(" 🗑 Discarded broken SMTP connection")
            return
        # Try to return to pool
        try:
            self._pool.put_nowait(conn)
        except Exception:  # queue.Full: pool already holds pool_size connections
            # Pool full, close connection
            try:
                conn.quit()
            except Exception:  # FIX: was a bare except
                pass

    def close_all(self):
        """Close all pooled connections (best effort)."""
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                conn.quit()
            except Exception:  # FIX: was a bare except
                pass

View File

@ -1,201 +0,0 @@
#!/usr/bin/env python3
"""
Unified Worker - coordinates all domain pollers
"""
import sys
import time
import threading
from typing import List, Dict
from logger import log
from config import config, load_domains
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from smtp import SMTPPool
from smtp.delivery import EmailDelivery
from worker import MessageProcessor
from domain_poller import DomainPoller
from metrics.prometheus import MetricsCollector
class UnifiedWorker:
    """Main worker coordinating all domain pollers.

    Owns the shared AWS handlers, the SMTP connection pool, the message
    processor, and one poller thread per configured domain. Lifecycle:
    setup() -> start() (blocks until stop) -> stop().
    """
    def __init__(self):
        # Shared shutdown flag observed by all poller threads.
        self.stop_event = threading.Event()
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}
        self.poller_threads: List[threading.Thread] = []
        # Shared stats across all pollers
        self.domain_stats: Dict[str, int] = {}  # domain -> processed count
        self.stats_lock = threading.Lock()  # guards domain_stats updates
        # AWS handlers
        self.s3 = S3Handler()
        self.sqs = SQSHandler()
        self.ses = SESHandler()
        self.dynamodb = DynamoDBHandler()
        # SMTP pool
        self.smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
        # Email delivery
        self.delivery = EmailDelivery(self.smtp_pool)
        # Metrics (None until set_metrics() is called after the metrics
        # server has started)
        self.metrics: MetricsCollector = None
        # Message processor
        self.processor = MessageProcessor(
            self.s3,
            self.sqs,
            self.ses,
            self.dynamodb,
            self.delivery,
            None  # Metrics will be set later
        )
    def setup(self):
        """Load domains, resolve their SQS queue URLs, warm the SMTP pool.

        Exits the process (sys.exit(1)) when no domains or no queues are
        available, since the worker cannot do anything useful then.
        """
        self.domains = load_domains()
        if not self.domains:
            log("❌ No domains configured!", 'ERROR')
            sys.exit(1)
        # Get queue URLs; domains without a queue are skipped with a warning.
        for domain in self.domains:
            url = self.sqs.get_queue_url(domain)
            if url:
                self.queue_urls[domain] = url
                log(f"{domain} -> queue found")
            else:
                log(f"{domain} -> Queue not found!", 'WARNING')
        if not self.queue_urls:
            log("❌ No valid queues found!", 'ERROR')
            sys.exit(1)
        # Initialize SMTP pool
        self.smtp_pool.initialize()
        log(f"Initialized with {len(self.queue_urls)} domains")
    def start(self):
        """Start one daemon poller thread per domain and block until stopped.

        The main thread only wakes every 10s to check the stop flag and to
        emit a status summary every 5 minutes.
        """
        # Initialize stats for all domains
        for domain in self.queue_urls.keys():
            self.domain_stats[domain] = 0
        # Create poller for each domain
        for domain, queue_url in self.queue_urls.items():
            poller = DomainPoller(
                domain=domain,
                queue_url=queue_url,
                message_processor=self.processor,
                sqs=self.sqs,
                metrics=self.metrics,
                stop_event=self.stop_event,
                stats_dict=self.domain_stats,
                stats_lock=self.stats_lock
            )
            # daemon=True: pollers must not keep the process alive on exit.
            thread = threading.Thread(
                target=poller.poll,
                name=f"poller-{domain}",
                daemon=True
            )
            thread.start()
            self.poller_threads.append(thread)
        log(f"Started {len(self.poller_threads)} domain pollers")
        # Periodic status log (every 5 minutes)
        last_status_log = time.time()
        status_interval = 300  # 5 minutes
        try:
            while not self.stop_event.is_set():
                self.stop_event.wait(timeout=10)
                # Log status summary every 5 minutes
                if time.time() - last_status_log > status_interval:
                    self._log_status_table()
                    last_status_log = time.time()
        except KeyboardInterrupt:
            # Ctrl-C falls through to the caller's shutdown handling.
            pass
    def _log_status_table(self):
        """Log a compact one-line status summary of all pollers."""
        active_threads = sum(1 for t in self.poller_threads if t.is_alive())
        with self.stats_lock:
            total_processed = sum(self.domain_stats.values())
            # Build compact stats: only show domains with activity
            stats_parts = []
            for domain in sorted(self.queue_urls.keys()):
                count = self.domain_stats.get(domain, 0)
                if count > 0:  # Only show active domains
                    # Shorten domain for display (first label, max 12 chars)
                    short_domain = domain.split('.')[0][:12]
                    stats_parts.append(f"{short_domain}:{count}")
        if stats_parts:
            stats_line = " | ".join(stats_parts)
        else:
            stats_line = "no activity"
        log(
            f"📊 Status: {active_threads}/{len(self.poller_threads)} active, "
            f"total:{total_processed} | {stats_line}"
        )
    def stop(self):
        """Stop gracefully: signal pollers, join them, close SMTP connections."""
        log("⚠ Stopping worker...")
        self.stop_event.set()
        # Wait for poller threads (max 10 seconds each)
        for thread in self.poller_threads:
            thread.join(timeout=10)
            if thread.is_alive():
                log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')
        self.smtp_pool.close_all()
        log("👋 Worker stopped")
    def set_metrics(self, metrics: MetricsCollector):
        """Attach the metrics collector (propagated to the processor too)."""
        self.metrics = metrics
        self.processor.metrics = metrics
    def print_startup_banner(self):
        """Print startup information (configuration + feature overview)."""
        log(f"\n{'='*70}")
        log(f"🚀 UNIFIED EMAIL WORKER")
        log(f"{'='*70}")
        log(f" Domains: {len(self.queue_urls)}")
        log(f" DynamoDB: {'Connected' if self.dynamodb.available else 'Not Available'}")
        if config.lmtp_enabled:
            log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
        else:
            log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
        log(f" Poll Interval: {config.poll_interval}s")
        log(f" Visibility: {config.visibility_timeout}s")
        log(f"")
        log(f" Features:")
        log(f" ✓ Bounce Detection & Header Rewriting")
        # NOTE(review): the conditional glyphs below are empty strings in
        # BOTH branches -- the ✓/✗ marks appear to have been lost at some
        # point; confirm whether they should be restored.
        log(f" {'' if self.dynamodb.available else ''} Auto-Reply / Out-of-Office")
        log(f" {'' if self.dynamodb.available else ''} Email Forwarding")
        log(f" {'' if self.dynamodb.available else ''} Blocked Senders (Wildcard)")
        log(f" {'' if self.metrics else ''} Prometheus Metrics")
        log(f" {'' if config.lmtp_enabled else ''} LMTP Direct Delivery")
        log(f"")
        log(f" Active Domains:")
        for domain in sorted(self.queue_urls.keys()):
            log(f"{domain}")
        log(f"{'='*70}\n")

View File

@ -1,352 +0,0 @@
#!/usr/bin/env python3
"""
Email message processing worker
"""
import json
import traceback
from logger import log
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker
from smtp.delivery import EmailDelivery
from metrics.prometheus import MetricsCollector
from email.parser import BytesParser # War wahrscheinlich schon da, prüfen
from email.policy import compat32 # <--- NEU: Hinzufügen
class MessageProcessor:
    """Processes individual email messages.

    Bundles the per-message collaborators: S3 (raw mail storage and status
    markers), SQS, SES (used by the rules processor), DynamoDB (rules,
    blocklist, bounce lookups), SMTP/LMTP delivery and optional metrics.
    """
    def __init__(
        self,
        s3: S3Handler,
        sqs: SQSHandler,
        ses: SESHandler,
        dynamodb: DynamoDBHandler,
        delivery: EmailDelivery,
        metrics: MetricsCollector
    ):
        """
        Args:
            s3: Access to raw email objects and their status markers
            sqs: Queue handler (kept for queue operations)
            ses: Outbound SES client, passed to the rules processor
            dynamodb: Backing store for rules, blocklist and bounce data
            delivery: SMTP/LMTP delivery helper
            metrics: Prometheus collector; may be None and assigned later
        """
        self.s3 = s3
        self.sqs = sqs
        self.ses = ses
        self.dynamodb = dynamodb
        self.delivery = delivery
        self.metrics = metrics
        # Initialize sub-processors
        self.parser = EmailParser()
        self.bounce_handler = BounceHandler(dynamodb)
        self.rules_processor = RulesProcessor(dynamodb, ses)
        self.blocklist = BlocklistChecker(dynamodb)
    def process_message(self, domain: str, message: dict, receive_count: int) -> bool:
        """
        Process one email message from queue.

        Pipeline: unwrap SNS/SES envelope -> validate -> download raw mail
        from S3 -> sanitize/parse -> bounce rewriting -> blocklist check ->
        per-recipient rules + delivery -> S3 status markers.

        Args:
            domain: Email domain this worker is responsible for
            message: SQS message dict (its 'Body' holds the SNS/SES JSON)
            receive_count: Number of times this message has been received

        Returns:
            True to delete from queue, False to retry
        """
        worker_name = f"worker-{domain}"
        try:
            # 1. UNPACKING (SNS -> SES)
            message_body = json.loads(message['Body'])
            if 'Message' in message_body and 'Type' in message_body:
                # It's an SNS Notification
                sns_content = message_body['Message']
                if isinstance(sns_content, str):
                    ses_msg = json.loads(sns_content)
                else:
                    ses_msg = sns_content
            else:
                ses_msg = message_body
            # 2. EXTRACT DATA
            mail = ses_msg.get('mail', {})
            receipt = ses_msg.get('receipt', {})
            message_id = mail.get('messageId')
            # FIX: Ignore Amazon SES Setup Notification
            if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
                log(" Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name)
                return True
            from_addr = mail.get('source')
            recipients = receipt.get('recipients', [])
            if not message_id:
                log("❌ Error: No messageId in event payload", 'ERROR', worker_name)
                return True
            # Domain Validation: drop messages that belong to another
            # domain's worker (defense against misrouted queue messages).
            if recipients:
                first_recipient = recipients[0]
                recipient_domain = first_recipient.split('@')[1]
                if recipient_domain.lower() != domain.lower():
                    log(
                        f"⚠ Security: Ignored message for {recipient_domain} "
                        f"(I am worker for {domain})",
                        'WARNING',
                        worker_name
                    )
                    return True
            else:
                log("⚠ Warning: No recipients in event", 'WARNING', worker_name)
                return True
            key = message_id
            # Compact single-line log for email processing
            recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
            log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)
            # 3. DOWNLOAD FROM S3
            raw_bytes = self.s3.get_email(domain, message_id, receive_count)
            if raw_bytes is None:
                # S3 object not found yet, retry
                return False
            # 4. LOOP DETECTION: skip rule processing for mails this worker
            # already handled (prevents auto-reply/forward loops).
            temp_parsed = self.parser.parse_bytes(raw_bytes)
            skip_rules = self.parser.is_processed_by_worker(temp_parsed)
            if skip_rules:
                log("🔄 Loop prevention: Already processed by worker", 'INFO', worker_name)
            # 5. PARSING & BOUNCE LOGIC
            try:
                # --- FIX 2.0: Pre-sanitize via legacy mode ---
                # The strict parser crashes IMMEDIATELY when touching broken
                # headers. We must first parse leniently, repair the header,
                # and regenerate the bytes.
                try:
                    # 1. Parse in compat32 mode (ignores syntax errors)
                    lenient_parser = BytesParser(policy=compat32)
                    temp_msg = lenient_parser.parsebytes(raw_bytes)
                    # 2. Check and repair
                    bad_msg_id = temp_msg.get('Message-ID', '')
                    if bad_msg_id and ('[' in bad_msg_id or ']' in bad_msg_id):
                        clean_id = bad_msg_id.replace('[', '').replace(']', '')
                        temp_msg.replace_header('Message-ID', clean_id)
                        # 3. Re-serialize the bytes with the repaired header
                        raw_bytes = temp_msg.as_bytes()
                        log(f" 🔧 Sanitized malformed Message-ID via Legacy Mode: {clean_id}", 'INFO', worker_name)
                        if self.metrics:
                            self.metrics.increment_bounce(domain, 'sanitized_header')
                except Exception as e_sanitize:
                    # Should not happen, but we must not abort here
                    log(f" ⚠ Sanitization warning: {e_sanitize}", 'WARNING', worker_name)
                # ---------------------------------------------
                parsed = self.parser.parse_bytes(raw_bytes)
                # --- FIX START: Sanitize Malformed Headers ---
                # Fix for Microsoft's <[uuid]@domain> Message-IDs that crash Python
                current_msg_id = parsed.get('Message-ID', '')
                if current_msg_id and ('[' in current_msg_id or ']' in current_msg_id):
                    # Strip the square brackets but keep the angle brackets
                    clean_id = current_msg_id.replace('[', '').replace(']', '')
                    parsed.replace_header('Message-ID', clean_id)
                    log(" 🔧 Sanitized malformed Message-ID", 'INFO', worker_name)
                # --- FIX END ---
                subject = parsed.get('Subject', '(no subject)')
                # Bounce header rewriting
                is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed)
                parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name)
                if modified:
                    log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', worker_name)
                    raw_bytes = parsed.as_bytes()
                    from_addr_final = parsed.get('From')
                    if self.metrics:
                        self.metrics.increment_bounce(domain, 'rewritten')
                else:
                    from_addr_final = from_addr
                # Set the loop-prevention marker on all externally received mail
                if not skip_rules:  # only if not already processed by a worker
                    parsed['X-SES-Worker-Processed'] = 'delivered'
                    raw_bytes = parsed.as_bytes()  # <--- this is where it used to crash
            except Exception as e:
                # --- improved error logging ---
                error_msg = f"⚠ Parsing/Logic Error: {e}. Sending original."
                log(error_msg, 'WARNING', worker_name)
                # Write the full traceback to the log (flagged as ERROR)
                tb_str = traceback.format_exc()
                log(f"Full Traceback:\n{tb_str}", 'ERROR', worker_name)
                # ----------------------------------
                # Fallback: try to deliver the original mail anyway
                from_addr_final = from_addr
                is_bounce = False
                skip_rules = False
            # 6. BLOCKLIST CHECK (Batch for efficiency)
            # NOTE(review): if parse_bytes() itself raised above, 'parsed'
            # is unbound here and parsed.get() raises NameError (caught by
            # the outer handler -> message is retried) -- confirm intended.
            senders_to_check = []
            # 1. The envelope address (from the SES event / Return-Path)
            if from_addr:
                senders_to_check.append(from_addr)
            # 2. The real header address (parsed from the MIME email)
            header_from = parsed.get('From')
            if header_from and header_from not in senders_to_check:
                senders_to_check.append(header_from)
            # 3. In case the bounce logic rewrote the address
            if from_addr_final and from_addr_final not in senders_to_check:
                senders_to_check.append(from_addr_final)
            # Now check all extracted addresses against the database
            blocked_by_recipient = self.blocklist.batch_check_blocked_senders(
                recipients,
                senders_to_check,  # <-- pass the whole list of candidates
                worker_name
            )
            # 7. PROCESS RECIPIENTS
            log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name)
            successful = []
            failed_permanent = []
            failed_temporary = []
            blocked_recipients = []
            for recipient in recipients:
                # Check if blocked
                if blocked_by_recipient.get(recipient, False):
                    log(
                        f"🗑 Silently dropping message for {recipient} (Sender blocked)",
                        'INFO',
                        worker_name
                    )
                    blocked_recipients.append(recipient)
                    if self.metrics:
                        self.metrics.increment_blocked(domain)
                    continue
                # Process rules (OOO, Forwarding) - not for bounces or already forwarded
                skip_local_delivery = False  # NEW
                if not is_bounce and not skip_rules:
                    def metrics_callback(action_type: str, dom: str):
                        """Callback for metrics from rules processor"""
                        if self.metrics:
                            if action_type == 'autoreply':
                                self.metrics.increment_autoreply(dom)
                            elif action_type == 'forward':
                                self.metrics.increment_forward(dom)
                    skip_local_delivery = self.rules_processor.process_rules_for_recipient(
                        recipient,
                        parsed,
                        domain,
                        worker_name,
                        metrics_callback
                    )
                # SMTP Delivery
                if skip_local_delivery:  # NEW
                    log(f" ⏭ Skipping local delivery for {recipient} (legacy forward active)",
                        'INFO', worker_name)
                    successful.append(recipient)  # counts as "handled"
                else:
                    success, error, is_perm = self.delivery.send_to_recipient(
                        from_addr_final, recipient, raw_bytes, worker_name
                    )
                    if success:
                        successful.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'success')
                    elif is_perm:
                        failed_permanent.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'permanent_failure')
                    else:
                        failed_temporary.append(recipient)
                        if self.metrics:
                            self.metrics.increment_processed(domain, 'temporary_failure')
            # 8. RESULT & CLEANUP
            total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients)
            if total_handled == len(recipients):
                # All recipients handled (success, permanent fail, or blocked)
                if len(blocked_recipients) == len(recipients):
                    # All recipients blocked - mark and delete S3 object
                    try:
                        self.s3.mark_as_blocked(
                            domain,
                            message_id,
                            blocked_recipients,
                            from_addr_final,
                            worker_name
                        )
                        self.s3.delete_blocked_email(domain, message_id, worker_name)
                    except Exception as e:
                        log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name)
                        # Don't delete from queue if S3 operations failed
                        return False
                elif len(successful) > 0:
                    # At least one success
                    self.s3.mark_as_processed(
                        domain,
                        message_id,
                        worker_name,
                        failed_permanent if failed_permanent else None
                    )
                elif len(failed_permanent) > 0:
                    # All failed permanently
                    self.s3.mark_as_all_invalid(
                        domain,
                        message_id,
                        failed_permanent,
                        worker_name
                    )
                # Build result summary
                result_parts = []
                if successful:
                    result_parts.append(f"{len(successful)} OK")
                if failed_permanent:
                    result_parts.append(f"{len(failed_permanent)} invalid")
                if blocked_recipients:
                    result_parts.append(f"{len(blocked_recipients)} blocked")
                log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name)
                return True
            else:
                # Some recipients had temporary failures
                log(
                    f"🔄 Temp failure ({len(failed_temporary)} failed), will retry",
                    'WARNING',
                    worker_name
                )
                return False
        except Exception as e:
            log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name)
            traceback.print_exc()
            return False