From 688d49e21868977568fd0186a37d5fbde8f8c287 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Fri, 13 Mar 2026 20:12:52 -0500 Subject: [PATCH] remove python worker --- email-worker/.dockerignore | 38 -- email-worker/.env.example | 42 -- email-worker/.gitignore | 35 -- email-worker/Dockerfile | 37 -- email-worker/Makefile | 50 --- email-worker/aws/__init__.py | 11 - email-worker/aws/dynamodb_handler.py | 192 --------- email-worker/aws/s3_handler.py | 193 --------- email-worker/aws/ses_handler.py | 53 --- email-worker/aws/sqs_handler.py | 103 ----- email-worker/config.py | 100 ----- email-worker/docker-compose.yml | 85 ---- email-worker/docs/ARCHITECTURE.md | 381 ------------------ email-worker/docs/CHANGELOG.md | 37 -- email-worker/docs/COMPATIBILITY.md | 311 -------------- email-worker/docs/MIGRATION.md | 366 ----------------- email-worker/docs/QUICKSTART.md | 330 --------------- email-worker/docs/README.md | 306 -------------- email-worker/docs/SUMMARY.md | 247 ------------ email-worker/domain_poller.py | 109 ----- email-worker/domains.txt | 6 - email-worker/email_processing/__init__.py | 11 - email-worker/email_processing/blocklist.py | 100 ----- .../email_processing/bounce_handler.py | 99 ----- email-worker/email_processing/parser.py | 80 ---- .../email_processing/rules_processor.py | 365 ----------------- email-worker/health_server.py | 85 ---- email-worker/logger.py | 79 ---- email-worker/main.py | 50 --- email-worker/metrics/__init__.py | 8 - email-worker/metrics/prometheus.py | 142 ------- email-worker/requirements.txt | 2 - email-worker/smtp/__init__.py | 8 - email-worker/smtp/delivery.py | 187 --------- email-worker/smtp/pool.py | 113 ------ email-worker/unified_worker.py | 201 --------- email-worker/worker.py | 352 ---------------- 37 files changed, 4914 deletions(-) delete mode 100644 email-worker/.dockerignore delete mode 100644 email-worker/.env.example delete mode 100644 email-worker/.gitignore delete mode 100644 email-worker/Dockerfile delete mode 
100644 email-worker/Makefile delete mode 100644 email-worker/aws/__init__.py delete mode 100644 email-worker/aws/dynamodb_handler.py delete mode 100644 email-worker/aws/s3_handler.py delete mode 100644 email-worker/aws/ses_handler.py delete mode 100644 email-worker/aws/sqs_handler.py delete mode 100644 email-worker/config.py delete mode 100644 email-worker/docker-compose.yml delete mode 100644 email-worker/docs/ARCHITECTURE.md delete mode 100644 email-worker/docs/CHANGELOG.md delete mode 100644 email-worker/docs/COMPATIBILITY.md delete mode 100644 email-worker/docs/MIGRATION.md delete mode 100644 email-worker/docs/QUICKSTART.md delete mode 100644 email-worker/docs/README.md delete mode 100644 email-worker/docs/SUMMARY.md delete mode 100644 email-worker/domain_poller.py delete mode 100644 email-worker/domains.txt delete mode 100644 email-worker/email_processing/__init__.py delete mode 100644 email-worker/email_processing/blocklist.py delete mode 100644 email-worker/email_processing/bounce_handler.py delete mode 100644 email-worker/email_processing/parser.py delete mode 100644 email-worker/email_processing/rules_processor.py delete mode 100644 email-worker/health_server.py delete mode 100644 email-worker/logger.py delete mode 100644 email-worker/main.py delete mode 100644 email-worker/metrics/__init__.py delete mode 100644 email-worker/metrics/prometheus.py delete mode 100644 email-worker/requirements.txt delete mode 100644 email-worker/smtp/__init__.py delete mode 100644 email-worker/smtp/delivery.py delete mode 100644 email-worker/smtp/pool.py delete mode 100644 email-worker/unified_worker.py delete mode 100644 email-worker/worker.py diff --git a/email-worker/.dockerignore b/email-worker/.dockerignore deleted file mode 100644 index 1b401b8..0000000 --- a/email-worker/.dockerignore +++ /dev/null @@ -1,38 +0,0 @@ -# Documentation -*.md -!README.md - -# Git -.git -.gitignore - -# Python -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -*.so - -# Logs -logs/ -*.log - -# 
Environment -.env -.env.example - -# IDE -.vscode/ -.idea/ -*.swp -*.swo - -# OS -.DS_Store -Thumbs.db - -# Build -*.tar.gz -dist/ -build/ diff --git a/email-worker/.env.example b/email-worker/.env.example deleted file mode 100644 index 5c36780..0000000 --- a/email-worker/.env.example +++ /dev/null @@ -1,42 +0,0 @@ -# AWS Credentials -AWS_REGION=us-east-2 -AWS_ACCESS_KEY_ID=your_access_key_here -AWS_SECRET_ACCESS_KEY=your_secret_key_here - -# Domains Configuration -DOMAINS=example.com,another.com -# Alternative: Use domains file -# DOMAINS_FILE=/etc/email-worker/domains.txt - -# Worker Settings -WORKER_THREADS=10 -POLL_INTERVAL=20 -MAX_MESSAGES=10 -VISIBILITY_TIMEOUT=300 - -# SMTP Configuration -SMTP_HOST=localhost -SMTP_PORT=25 -SMTP_USE_TLS=false -SMTP_USER= -SMTP_PASS= -SMTP_POOL_SIZE=5 -INTERNAL_SMTP_PORT=2525 - -# LMTP Configuration (Optional) -LMTP_ENABLED=false -LMTP_HOST=localhost -LMTP_PORT=24 - -# DynamoDB Tables -DYNAMODB_RULES_TABLE=email-rules -DYNAMODB_MESSAGES_TABLE=ses-outbound-messages -DYNAMODB_BLOCKED_TABLE=email-blocked-senders - -# Bounce Handling -BOUNCE_LOOKUP_RETRIES=3 -BOUNCE_LOOKUP_DELAY=1.0 - -# Monitoring Ports -METRICS_PORT=8000 -HEALTH_PORT=8080 diff --git a/email-worker/.gitignore b/email-worker/.gitignore deleted file mode 100644 index 6e9d2ba..0000000 --- a/email-worker/.gitignore +++ /dev/null @@ -1,35 +0,0 @@ -# Python -__pycache__/ -*.py[cod] -*$py.class -*.so -.Python -env/ -venv/ -ENV/ -.venv - -# Logs -*.log - -# Environment -.env - -# IDE -.vscode/ -.idea/ -*.swp -*.swo - -# OS -.DS_Store -Thumbs.db - -# Build -dist/ -build/ -*.egg-info/ - -# Archives -*.tar.gz -*.zip diff --git a/email-worker/Dockerfile b/email-worker/Dockerfile deleted file mode 100644 index 7660a2c..0000000 --- a/email-worker/Dockerfile +++ /dev/null @@ -1,37 +0,0 @@ -FROM python:3.11-slim - -LABEL maintainer="andreas@knuth.dev" -LABEL description="Unified multi-domain email worker (modular version)" - -# System packages -RUN apt-get update && apt-get 
install -y --no-install-recommends \ - curl \ - && rm -rf /var/lib/apt/lists/* - -# Non-root user -RUN useradd -m -u 1000 worker && \ - mkdir -p /app /var/log/email-worker /etc/email-worker && \ - chown -R worker:worker /app /var/log/email-worker /etc/email-worker - -# Python dependencies -COPY requirements.txt /app/ -RUN pip install --no-cache-dir -r /app/requirements.txt - -# Worker code (all modules) -COPY --chown=worker:worker aws/ /app/aws/ -COPY --chown=worker:worker email_processing/ /app/email_processing/ -COPY --chown=worker:worker smtp/ /app/smtp/ -COPY --chown=worker:worker metrics/ /app/metrics/ -COPY --chown=worker:worker *.py /app/ - -WORKDIR /app -USER worker - -# Health check -HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \ - CMD curl -f http://localhost:8080/health || exit 1 - -# Unbuffered output -ENV PYTHONUNBUFFERED=1 - -CMD ["python3", "main.py"] diff --git a/email-worker/Makefile b/email-worker/Makefile deleted file mode 100644 index b088779..0000000 --- a/email-worker/Makefile +++ /dev/null @@ -1,50 +0,0 @@ -.PHONY: help install run test lint clean docker-build docker-run docker-stop docker-logs - -help: - @echo "Available commands:" - @echo " make install - Install dependencies" - @echo " make run - Run worker locally" - @echo " make test - Run tests (TODO)" - @echo " make lint - Run linting" - @echo " make clean - Clean up files" - @echo " make docker-build - Build Docker image" - @echo " make docker-run - Run with docker-compose" - @echo " make docker-stop - Stop docker-compose" - @echo " make docker-logs - Show docker logs" - -install: - pip install -r requirements.txt - -run: - python3 main.py - -test: - @echo "TODO: Add tests" - # python3 -m pytest tests/ - -lint: - @echo "Running pylint..." - -pylint --rcfile=.pylintrc *.py **/*.py 2>/dev/null || echo "pylint not installed" - @echo "Running flake8..." - -flake8 --max-line-length=120 . 2>/dev/null || echo "flake8 not installed" - -clean: - find . 
-type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true - find . -type f -name "*.pyc" -delete - find . -type f -name "*.pyo" -delete - find . -type f -name "*.log" -delete - -docker-build: - docker build -t unified-email-worker:latest . - -docker-run: - docker-compose up -d - -docker-stop: - docker-compose down - -docker-logs: - docker-compose logs -f email-worker - -docker-restart: docker-stop docker-build docker-run - @echo "Worker restarted" diff --git a/email-worker/aws/__init__.py b/email-worker/aws/__init__.py deleted file mode 100644 index cb80192..0000000 --- a/email-worker/aws/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python3 -""" -AWS service handlers -""" - -from .s3_handler import S3Handler -from .sqs_handler import SQSHandler -from .ses_handler import SESHandler -from .dynamodb_handler import DynamoDBHandler - -__all__ = ['S3Handler', 'SQSHandler', 'SESHandler', 'DynamoDBHandler'] diff --git a/email-worker/aws/dynamodb_handler.py b/email-worker/aws/dynamodb_handler.py deleted file mode 100644 index 436eae1..0000000 --- a/email-worker/aws/dynamodb_handler.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python3 -""" -DynamoDB operations handler -""" - -import time -from typing import Optional, Dict, Any, List -import boto3 -from botocore.exceptions import ClientError - -from logger import log -from config import config - - -class DynamoDBHandler: - """Handles all DynamoDB operations""" - - def __init__(self): - self.resource = boto3.resource('dynamodb', region_name=config.aws_region) - self.available = False - self.rules_table = None - self.messages_table = None - self.blocked_table = None - - self._initialize_tables() - - def _initialize_tables(self): - """Initialize DynamoDB table connections""" - try: - self.rules_table = self.resource.Table(config.rules_table) - self.messages_table = self.resource.Table(config.messages_table) - self.blocked_table = self.resource.Table(config.blocked_table) - - # Test connection - 
self.rules_table.table_status - self.messages_table.table_status - self.blocked_table.table_status - - self.available = True - log("✓ DynamoDB tables connected successfully") - - except Exception as e: - log(f"⚠ DynamoDB not fully available: {e}", 'WARNING') - self.available = False - - def get_email_rules(self, email_address: str) -> Optional[Dict[str, Any]]: - """ - Get email rules for recipient (OOO, Forwarding) - - Args: - email_address: Recipient email address - - Returns: - Rule dictionary or None if not found - """ - if not self.available or not self.rules_table: - return None - - try: - response = self.rules_table.get_item(Key={'email_address': email_address}) - return response.get('Item') - - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - log(f"⚠ DynamoDB error for {email_address}: {e}", 'ERROR') - return None - - except Exception as e: - log(f"⚠ DynamoDB error for {email_address}: {e}", 'WARNING') - return None - - def get_bounce_info(self, message_id: str, worker_name: str = 'unified') -> Optional[Dict]: - """ - Get bounce information from DynamoDB with retry logic - - Args: - message_id: SES Message ID - worker_name: Worker name for logging - - Returns: - Bounce info dictionary or None - """ - if not self.available or not self.messages_table: - return None - - for attempt in range(config.bounce_lookup_retries): - try: - response = self.messages_table.get_item(Key={'MessageId': message_id}) - item = response.get('Item') - - if item: - return { - 'original_source': item.get('original_source', ''), - 'bounceType': item.get('bounceType', 'Unknown'), - 'bounceSubType': item.get('bounceSubType', 'Unknown'), - 'bouncedRecipients': item.get('bouncedRecipients', []), - 'timestamp': item.get('timestamp', '') - } - - if attempt < config.bounce_lookup_retries - 1: - log( - f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s " - f"(attempt {attempt + 1}/{config.bounce_lookup_retries})...", - 'INFO', 
- worker_name - ) - time.sleep(config.bounce_lookup_delay) - else: - log( - f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts " - f"for Message-ID: {message_id}", - 'WARNING', - worker_name - ) - return None - - except Exception as e: - log( - f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}", - 'ERROR', - worker_name - ) - if attempt < config.bounce_lookup_retries - 1: - time.sleep(config.bounce_lookup_delay) - else: - return None - - return None - - def get_blocked_patterns(self, email_address: str) -> List[str]: - """ - Get blocked sender patterns for recipient - - Args: - email_address: Recipient email address - - Returns: - List of blocked patterns (may include wildcards) - """ - if not self.available or not self.blocked_table: - return [] - - try: - response = self.blocked_table.get_item(Key={'email_address': email_address}) - item = response.get('Item', {}) - return item.get('blocked_patterns', []) - - except Exception as e: - log(f"⚠ Error getting block list for {email_address}: {e}", 'ERROR') - return [] - - def batch_get_blocked_patterns(self, email_addresses: List[str]) -> Dict[str, List[str]]: - """ - Batch get blocked patterns for multiple recipients (more efficient) - - Args: - email_addresses: List of recipient email addresses - - Returns: - Dictionary mapping email_address -> list of blocked patterns - """ - if not self.available or not self.blocked_table: - return {addr: [] for addr in email_addresses} - - try: - # DynamoDB BatchGetItem - keys = [{'email_address': addr} for addr in email_addresses] - response = self.resource.batch_get_item( - RequestItems={ - config.blocked_table: {'Keys': keys} - } - ) - - items = response.get('Responses', {}).get(config.blocked_table, []) - - # Build result dictionary - result = {} - for email_address in email_addresses: - matching_item = next( - (item for item in items if item['email_address'] == email_address), - None - ) - if matching_item: - 
result[email_address] = matching_item.get('blocked_patterns', []) - else: - result[email_address] = [] - - return result - - except Exception as e: - log(f"⚠ Batch blocklist check error: {e}", 'ERROR') - return {addr: [] for addr in email_addresses} diff --git a/email-worker/aws/s3_handler.py b/email-worker/aws/s3_handler.py deleted file mode 100644 index d0ed849..0000000 --- a/email-worker/aws/s3_handler.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python3 -""" -S3 operations handler -""" - -import time -from typing import Optional, List -import boto3 -from botocore.exceptions import ClientError - -from logger import log -from config import config, domain_to_bucket_name - - -class S3Handler: - """Handles all S3 operations""" - - def __init__(self): - self.client = boto3.client('s3', region_name=config.aws_region) - - def get_email(self, domain: str, message_id: str, receive_count: int) -> Optional[bytes]: - """ - Download email from S3 - - Args: - domain: Email domain - message_id: SES Message ID - receive_count: Number of times this message was received from queue - - Returns: - Raw email bytes or None if not found/error - """ - bucket = domain_to_bucket_name(domain) - - try: - response = self.client.get_object(Bucket=bucket, Key=message_id) - return response['Body'].read() - - except self.client.exceptions.NoSuchKey: - if receive_count < 5: - log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING') - return None - else: - log(f"❌ S3 Object missing permanently after retries.", 'ERROR') - raise - - except ClientError as e: - if e.response['Error']['Code'] == 'NoSuchKey': - if receive_count < 5: - log(f"⏳ S3 Object not found yet (Attempt {receive_count}). 
Retrying...", 'WARNING') - return None - else: - log(f"❌ S3 Object missing permanently after retries.", 'ERROR') - raise - log(f"❌ S3 Download Error: {e}", 'ERROR') - raise - - except Exception as e: - log(f"❌ S3 Download Error: {e}", 'ERROR') - raise - - def mark_as_processed( - self, - domain: str, - message_id: str, - worker_name: str, - invalid_inboxes: Optional[List[str]] = None - ): - """Mark email as successfully delivered""" - bucket = domain_to_bucket_name(domain) - - try: - head = self.client.head_object(Bucket=bucket, Key=message_id) - metadata = head.get('Metadata', {}) or {} - - metadata['processed'] = 'true' - metadata['processed_at'] = str(int(time.time())) - metadata['processed_by'] = worker_name - metadata['status'] = 'delivered' - metadata.pop('processing_started', None) - metadata.pop('queued_at', None) - - if invalid_inboxes: - metadata['invalid_inboxes'] = ','.join(invalid_inboxes) - log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name) - - self.client.copy_object( - Bucket=bucket, - Key=message_id, - CopySource={'Bucket': bucket, 'Key': message_id}, - Metadata=metadata, - MetadataDirective='REPLACE' - ) - - except Exception as e: - log(f"Failed to mark as processed: {e}", 'WARNING', worker_name) - - def mark_as_all_invalid( - self, - domain: str, - message_id: str, - invalid_inboxes: List[str], - worker_name: str - ): - """Mark email as failed because all recipients are invalid""" - bucket = domain_to_bucket_name(domain) - - try: - head = self.client.head_object(Bucket=bucket, Key=message_id) - metadata = head.get('Metadata', {}) or {} - - metadata['processed'] = 'true' - metadata['processed_at'] = str(int(time.time())) - metadata['processed_by'] = worker_name - metadata['status'] = 'failed' - metadata['error'] = 'All recipients are invalid (mailboxes do not exist)' - metadata['invalid_inboxes'] = ','.join(invalid_inboxes) - metadata.pop('processing_started', None) - metadata.pop('queued_at', None) - - 
self.client.copy_object( - Bucket=bucket, - Key=message_id, - CopySource={'Bucket': bucket, 'Key': message_id}, - Metadata=metadata, - MetadataDirective='REPLACE' - ) - - except Exception as e: - log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name) - - def mark_as_blocked( - self, - domain: str, - message_id: str, - blocked_recipients: List[str], - sender: str, - worker_name: str - ): - """ - Mark email as blocked by sender blacklist - - This sets metadata BEFORE deletion for audit trail - """ - bucket = domain_to_bucket_name(domain) - - try: - head = self.client.head_object(Bucket=bucket, Key=message_id) - metadata = head.get('Metadata', {}) or {} - - metadata['processed'] = 'true' - metadata['processed_at'] = str(int(time.time())) - metadata['processed_by'] = worker_name - metadata['status'] = 'blocked' - metadata['blocked_recipients'] = ','.join(blocked_recipients) - metadata['blocked_sender'] = sender - metadata.pop('processing_started', None) - metadata.pop('queued_at', None) - - self.client.copy_object( - Bucket=bucket, - Key=message_id, - CopySource={'Bucket': bucket, 'Key': message_id}, - Metadata=metadata, - MetadataDirective='REPLACE' - ) - - log(f"✓ Marked as blocked in S3 metadata", 'INFO', worker_name) - - except Exception as e: - log(f"⚠ Failed to mark as blocked: {e}", 'ERROR', worker_name) - raise - - def delete_blocked_email( - self, - domain: str, - message_id: str, - worker_name: str - ): - """ - Delete email after marking as blocked - - Only call this after mark_as_blocked() succeeded - """ - bucket = domain_to_bucket_name(domain) - - try: - self.client.delete_object(Bucket=bucket, Key=message_id) - log(f"🗑 Deleted blocked email from S3", 'SUCCESS', worker_name) - - except Exception as e: - log(f"⚠ Failed to delete blocked email: {e}", 'ERROR', worker_name) - raise diff --git a/email-worker/aws/ses_handler.py b/email-worker/aws/ses_handler.py deleted file mode 100644 index 8a249bf..0000000 --- a/email-worker/aws/ses_handler.py +++ 
/dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python3 -""" -SES operations handler -""" - -import boto3 -from botocore.exceptions import ClientError - -from logger import log -from config import config - - -class SESHandler: - """Handles all SES operations""" - - def __init__(self): - self.client = boto3.client('ses', region_name=config.aws_region) - - def send_raw_email( - self, - source: str, - destination: str, - raw_message: bytes, - worker_name: str - ) -> bool: - """ - Send raw email via SES - - Args: - source: From address - destination: To address - raw_message: Raw MIME message bytes - worker_name: Worker name for logging - - Returns: - True if sent successfully, False otherwise - """ - try: - self.client.send_raw_email( - Source=source, - Destinations=[destination], - RawMessage={'Data': raw_message} - ) - return True - - except ClientError as e: - error_code = e.response['Error']['Code'] - log(f"⚠ SES send failed to {destination} ({error_code}): {e}", 'ERROR', worker_name) - return False - - except Exception as e: - log(f"⚠ SES send failed to {destination}: {e}", 'ERROR', worker_name) - return False diff --git a/email-worker/aws/sqs_handler.py b/email-worker/aws/sqs_handler.py deleted file mode 100644 index 020e268..0000000 --- a/email-worker/aws/sqs_handler.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python3 -""" -SQS operations handler -""" - -from typing import Optional, List, Dict, Any -import boto3 -from botocore.exceptions import ClientError - -from logger import log -from config import config, domain_to_queue_name - - -class SQSHandler: - """Handles all SQS operations""" - - def __init__(self): - self.client = boto3.client('sqs', region_name=config.aws_region) - - def get_queue_url(self, domain: str) -> Optional[str]: - """ - Get SQS queue URL for domain - - Args: - domain: Email domain - - Returns: - Queue URL or None if not found - """ - queue_name = domain_to_queue_name(domain) - - try: - response = 
self.client.get_queue_url(QueueName=queue_name) - return response['QueueUrl'] - - except ClientError as e: - if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': - log(f"Queue not found for domain: {domain}", 'WARNING') - else: - log(f"Error getting queue URL for {domain}: {e}", 'ERROR') - return None - - def receive_messages(self, queue_url: str) -> List[Dict[str, Any]]: - """ - Receive messages from queue - - Args: - queue_url: SQS Queue URL - - Returns: - List of message dictionaries - """ - try: - response = self.client.receive_message( - QueueUrl=queue_url, - MaxNumberOfMessages=config.max_messages, - WaitTimeSeconds=config.poll_interval, - VisibilityTimeout=config.visibility_timeout, - AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'] - ) - - return response.get('Messages', []) - - except Exception as e: - log(f"Error receiving messages: {e}", 'ERROR') - return [] - - def delete_message(self, queue_url: str, receipt_handle: str): - """ - Delete message from queue - - Args: - queue_url: SQS Queue URL - receipt_handle: Message receipt handle - """ - try: - self.client.delete_message( - QueueUrl=queue_url, - ReceiptHandle=receipt_handle - ) - except Exception as e: - log(f"Error deleting message: {e}", 'ERROR') - raise - - def get_queue_size(self, queue_url: str) -> int: - """ - Get approximate number of messages in queue - - Args: - queue_url: SQS Queue URL - - Returns: - Number of messages (0 if error) - """ - try: - attrs = self.client.get_queue_attributes( - QueueUrl=queue_url, - AttributeNames=['ApproximateNumberOfMessages'] - ) - return int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0)) - - except Exception: - return 0 diff --git a/email-worker/config.py b/email-worker/config.py deleted file mode 100644 index bcc78a5..0000000 --- a/email-worker/config.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/env python3 -""" -Configuration management for unified email worker -""" - -import os -from dataclasses import 
dataclass -from typing import Set - - -@dataclass -class Config: - """Worker Configuration""" - # AWS - aws_region: str = os.environ.get('AWS_REGION', 'us-east-2') - - # Domains to process - domains_list: str = os.environ.get('DOMAINS', '') - domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt') - - # Worker Settings - worker_threads: int = int(os.environ.get('WORKER_THREADS', '10')) - poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20')) - max_messages: int = int(os.environ.get('MAX_MESSAGES', '10')) - visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300')) - - # SMTP for delivery - smtp_host: str = os.environ.get('SMTP_HOST', 'localhost') - smtp_port: int = int(os.environ.get('SMTP_PORT', '25')) - smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true' - smtp_user: str = os.environ.get('SMTP_USER', '') - smtp_pass: str = os.environ.get('SMTP_PASS', '') - smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5')) - - # Internal SMTP (bypasses transport_maps) - internal_smtp_port: int = int(os.environ.get('INTERNAL_SMTP_PORT', '2525')) - - # LMTP for local delivery (bypasses Postfix transport_maps) - lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true' - lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost') - lmtp_port: int = int(os.environ.get('LMTP_PORT', '24')) - - # DynamoDB Tables - rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules') - messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages') - blocked_table: str = os.environ.get('DYNAMODB_BLOCKED_TABLE', 'email-blocked-senders') - - # Bounce Handling - bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3')) - bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0')) - - # Monitoring - metrics_port: int = int(os.environ.get('METRICS_PORT', '8000')) - health_port: int = 
int(os.environ.get('HEALTH_PORT', '8080')) - - -# Global configuration instance -config = Config() - -# Global set of managed domains (populated at startup) -MANAGED_DOMAINS: Set[str] = set() - - -def load_domains() -> list[str]: - """Load domains from config and populate MANAGED_DOMAINS global""" - global MANAGED_DOMAINS - domains = [] - - if config.domains_list: - domains.extend([d.strip() for d in config.domains_list.split(',') if d.strip()]) - - if os.path.exists(config.domains_file): - with open(config.domains_file, 'r') as f: - for line in f: - domain = line.strip() - if domain and not domain.startswith('#'): - domains.append(domain) - - domains = list(set(domains)) - MANAGED_DOMAINS = set(d.lower() for d in domains) - - return domains - - -def is_internal_address(email_address: str) -> bool: - """Check if email address belongs to one of our managed domains""" - if '@' not in email_address: - return False - domain = email_address.split('@')[1].lower() - return domain in MANAGED_DOMAINS - - -def domain_to_queue_name(domain: str) -> str: - """Convert domain to SQS queue name""" - return domain.replace('.', '-') + '-queue' - - -def domain_to_bucket_name(domain: str) -> str: - """Convert domain to S3 bucket name""" - return domain.replace('.', '-') + '-emails' diff --git a/email-worker/docker-compose.yml b/email-worker/docker-compose.yml deleted file mode 100644 index 2fcd892..0000000 --- a/email-worker/docker-compose.yml +++ /dev/null @@ -1,85 +0,0 @@ -services: - unified-worker: - build: - context: . 
- dockerfile: Dockerfile - container_name: unified-email-worker - restart: unless-stopped - network_mode: host # Für lokalen SMTP-Zugriff - - volumes: - # Domain-Liste (eine Domain pro Zeile) - - ./domains.txt:/etc/email-worker/domains.txt:ro - # Logs - - ./logs:/var/log/email-worker - - environment: - # AWS Credentials - - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - - AWS_REGION=us-east-2 - - # Domains via File (domains.txt) - - DOMAINS_FILE=/etc/email-worker/domains.txt - - # Alternative: Domains direkt als Liste - # - DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net - - # Worker Settings - - WORKER_THREADS=${WORKER_THREADS:-10} - - POLL_INTERVAL=${POLL_INTERVAL:-20} - - MAX_MESSAGES=${MAX_MESSAGES:-10} - - VISIBILITY_TIMEOUT=${VISIBILITY_TIMEOUT:-300} - - # SMTP (lokal zum DMS) - - SMTP_HOST=${SMTP_HOST:-localhost} - - SMTP_PORT=${SMTP_PORT:-25} - - SMTP_POOL_SIZE=${SMTP_POOL_SIZE:-5} - - SMTP_USE_TLS=false - - # Internal SMTP Port (bypasses transport_maps) - - INTERNAL_SMTP_PORT=25 - - # LMTP (Optional - für direktes Dovecot Delivery) - - LMTP_ENABLED=${LMTP_ENABLED:-false} - - LMTP_HOST=${LMTP_HOST:-localhost} - - LMTP_PORT=${LMTP_PORT:-24} - - # DynamoDB Tables - - DYNAMODB_RULES_TABLE=${DYNAMODB_RULES_TABLE:-email-rules} - - DYNAMODB_MESSAGES_TABLE=${DYNAMODB_MESSAGES_TABLE:-ses-outbound-messages} - - DYNAMODB_BLOCKED_TABLE=${DYNAMODB_BLOCKED_TABLE:-email-blocked-senders} - - # Bounce Handling - - BOUNCE_LOOKUP_RETRIES=${BOUNCE_LOOKUP_RETRIES:-3} - - BOUNCE_LOOKUP_DELAY=${BOUNCE_LOOKUP_DELAY:-1.0} - - # Monitoring - - METRICS_PORT=8000 - - HEALTH_PORT=8080 - - ports: - # Prometheus Metrics - - "8000:8000" - # Health Check - - "8080:8080" - - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8080/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 10s - - logging: - driver: "json-file" - options: - max-size: "50m" - max-file: "10" - - deploy: - resources: - limits: - memory: 512M - 
reservations: - memory: 256M diff --git a/email-worker/docs/ARCHITECTURE.md b/email-worker/docs/ARCHITECTURE.md deleted file mode 100644 index 1611c0e..0000000 --- a/email-worker/docs/ARCHITECTURE.md +++ /dev/null @@ -1,381 +0,0 @@ -# Architecture Documentation - -## 📐 System Overview - -``` -┌─────────────────────────────────────────────────────────────────────┐ -│ AWS Cloud Services │ -├─────────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ -│ │ SQS │────▶│ S3 │ │ SES │ │ -│ │ Queues │ │ Buckets │ │ Sending │ │ -│ └──────────┘ └──────────┘ └──────────┘ │ -│ │ │ │ │ -│ │ │ │ │ -│ ┌────▼─────────────────▼─────────────────▼───────────────┐ │ -│ │ DynamoDB Tables │ │ -│ │ • email-rules (OOO, Forwarding) │ │ -│ │ • ses-outbound-messages (Bounce Tracking) │ │ -│ │ • email-blocked-senders (Blocklist) │ │ -│ └─────────────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────────────────┘ - │ - │ Polling & Processing - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Unified Email Worker │ -├─────────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌─────────────────────────────────────────────────────────┐ │ -│ │ Main Thread (unified_worker.py) │ │ -│ │ • Coordination │ │ -│ │ • Status Monitoring │ │ -│ │ • Signal Handling │ │ -│ └────────────┬────────────────────────────────────────────┘ │ -│ │ │ -│ ├──▶ Domain Poller Thread 1 (example.com) │ -│ ├──▶ Domain Poller Thread 2 (another.com) │ -│ ├──▶ Domain Poller Thread 3 (...) 
│ -│ ├──▶ Health Server Thread (port 8080) │ -│ └──▶ Metrics Server Thread (port 8000) │ -│ │ -│ ┌──────────────────────────────────────────────────────┐ │ -│ │ SMTP Connection Pool │ │ -│ │ • Connection Reuse │ │ -│ │ • Health Checks │ │ -│ │ • Auto-reconnect │ │ -│ └──────────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────────────────┘ - │ - │ SMTP/LMTP Delivery - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Mail Server (Docker Mailserver) │ -├─────────────────────────────────────────────────────────────────────┤ -│ │ -│ Port 25 (SMTP - from pool) │ -│ Port 2525 (SMTP - internal delivery, bypasses transport_maps) │ -│ Port 24 (LMTP - direct to Dovecot, bypasses Postfix) │ -│ │ -└─────────────────────────────────────────────────────────────────────┘ -``` - -## 🔄 Message Flow - -### 1. Email Reception -``` -1. SES receives email -2. SES stores in S3 bucket (domain-emails/) -3. SES publishes SNS notification -4. SNS enqueues message to SQS (domain-queue) -``` - -### 2. Worker Processing -``` -┌─────────────────────────────────────────────────────────────┐ -│ Domain Poller (domain_poller.py) │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 1. Poll SQS Queue (20s long poll) │ -│ • Receive up to 10 messages │ -│ • Extract SES notification from SNS wrapper │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 2. Download from S3 (s3_handler.py) │ -│ • Get raw email bytes │ -│ • Handle retry if not found yet │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 3. 
Parse Email (parser.py) │ -│ • Parse MIME structure │ -│ • Extract headers, body, attachments │ -│ • Check for loop prevention marker │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 4. Bounce Detection (bounce_handler.py) │ -│ • Check if from mailer-daemon@amazonses.com │ -│ • Lookup original sender in DynamoDB │ -│ • Rewrite From/Reply-To headers │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 5. Blocklist Check (blocklist.py) │ -│ • Batch lookup blocked patterns for all recipients │ -│ • Check sender against wildcard patterns │ -│ • Mark blocked recipients │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 6. Process Rules for Each Recipient (rules_processor.py) │ -│ ├─▶ Auto-Reply (OOO) │ -│ │ • Check if ooo_active = true │ -│ │ • Don't reply to auto-submitted messages │ -│ │ • Create reply with original message quoted │ -│ │ • Send via SES (external) or Port 2525 (internal) │ -│ │ │ -│ └─▶ Forwarding │ -│ • Get forward addresses from rule │ -│ • Create forward with FWD: prefix │ -│ • Preserve attachments │ -│ • Send via SES (external) or Port 2525 (internal) │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 7. SMTP Delivery (delivery.py) │ -│ • Get connection from pool │ -│ • Send to each recipient (not blocked) │ -│ • Track success/permanent/temporary failures │ -│ • Return connection to pool │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 8. 
Update S3 Metadata (s3_handler.py) │ -│ ├─▶ All Blocked: mark_as_blocked() + delete() │ -│ ├─▶ Some Success: mark_as_processed() │ -│ └─▶ All Invalid: mark_as_all_invalid() │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ 9. Delete from Queue │ -│ • Success or permanent failure → delete │ -│ • Temporary failure → keep in queue (retry) │ -└─────────────────────────────────────────────────────────────┘ -``` - -## 🧩 Component Details - -### AWS Handlers (`aws/`) - -#### `s3_handler.py` -- **Purpose**: All S3 operations -- **Key Methods**: - - `get_email()`: Download with retry logic - - `mark_as_processed()`: Update metadata on success - - `mark_as_all_invalid()`: Update metadata on permanent failure - - `mark_as_blocked()`: Set metadata before deletion - - `delete_blocked_email()`: Delete after marking - -#### `sqs_handler.py` -- **Purpose**: Queue operations -- **Key Methods**: - - `get_queue_url()`: Resolve domain to queue - - `receive_messages()`: Long poll with attributes - - `delete_message()`: Remove after processing - - `get_queue_size()`: For metrics - -#### `ses_handler.py` -- **Purpose**: Send emails via SES -- **Key Methods**: - - `send_raw_email()`: Send raw MIME message - -#### `dynamodb_handler.py` -- **Purpose**: All DynamoDB operations -- **Key Methods**: - - `get_email_rules()`: OOO and forwarding rules - - `get_bounce_info()`: Bounce lookup with retry - - `get_blocked_patterns()`: Single recipient - - `batch_get_blocked_patterns()`: Multiple recipients (efficient!) 
- -### Email Processors (`email_processing/`) - -#### `parser.py` -- **Purpose**: Email parsing utilities -- **Key Methods**: - - `parse_bytes()`: Parse raw email - - `extract_body_parts()`: Get text/html bodies - - `is_processed_by_worker()`: Loop detection - -#### `bounce_handler.py` -- **Purpose**: Bounce detection and rewriting -- **Key Methods**: - - `is_ses_bounce_notification()`: Detect MAILER-DAEMON - - `apply_bounce_logic()`: Rewrite headers - -#### `blocklist.py` -- **Purpose**: Sender blocking with wildcards -- **Key Methods**: - - `is_sender_blocked()`: Single check - - `batch_check_blocked_senders()`: Batch check (preferred!) -- **Wildcard Support**: Uses `fnmatch` for patterns like `*@spam.com` - -#### `rules_processor.py` -- **Purpose**: OOO and forwarding logic -- **Key Methods**: - - `process_rules_for_recipient()`: Main entry point - - `_handle_ooo()`: Auto-reply logic - - `_handle_forwards()`: Forwarding logic - - `_create_ooo_reply()`: Build OOO message - - `_create_forward_message()`: Build forward with attachments - -### SMTP Components (`smtp/`) - -#### `pool.py` -- **Purpose**: Connection pooling -- **Features**: - - Lazy initialization - - Health checks (NOOP) - - Auto-reconnect on stale connections - - Thread-safe queue - -#### `delivery.py` -- **Purpose**: Actual email delivery -- **Features**: - - SMTP or LMTP support - - Retry logic for connection errors - - Permanent vs temporary failure detection - - Connection pool integration - -### Monitoring (`metrics/`) - -#### `prometheus.py` -- **Purpose**: Metrics collection -- **Metrics**: - - Counters: processed, bounces, autoreplies, forwards, blocked - - Gauges: in_flight, queue_size - - Histograms: processing_time - -## 🔐 Security Features - -### 1. Domain Validation -Each worker only processes messages for its assigned domains: -```python -if recipient_domain.lower() != domain.lower(): - log("Security: Ignored message for wrong domain") - return True # Delete from queue -``` - -### 2. 
Loop Prevention -Detects already-processed emails: -```python -if parsed.get('X-SES-Worker-Processed'): - log("Loop prevention: Already processed") - skip_rules = True -``` - -### 3. Blocklist Wildcards -Supports flexible patterns: -```python -blocked_patterns = [ - "*@spam.com", # Any user at spam.com - "noreply@*.com", # noreply at any .com - "newsletter@example.*" # newsletter at any example TLD -] -``` - -### 4. Internal vs External Routing -Prevents SES loops for internal forwards: -```python -if is_internal_address(forward_to): - # Direct SMTP to port 2525 (bypasses transport_maps) - send_internal_email(...) -else: - # Send via SES - ses.send_raw_email(...) -``` - -## 📊 Data Flow Diagrams - -### Bounce Rewriting Flow -``` -SES Bounce → Worker → DynamoDB Lookup → Header Rewrite → Delivery - ↓ - Message-ID - ↓ - ses-outbound-messages - {MessageId: "abc", - original_source: "real@sender.com", - bouncedRecipients: ["failed@domain.com"]} - ↓ - Rewrite From: mailer-daemon@amazonses.com - → failed@domain.com -``` - -### Blocklist Check Flow -``` -Incoming Email → Batch DynamoDB Call → Pattern Matching → Decision - ↓ ↓ ↓ ↓ -sender@spam.com Get patterns for fnmatch() Block/Allow - all recipients "*@spam.com" - matches! -``` - -## ⚡ Performance Optimizations - -### 1. Batch DynamoDB Calls -```python -# ❌ Old way: N calls for N recipients -for recipient in recipients: - patterns = dynamodb.get_blocked_patterns(recipient) - -# ✅ New way: 1 call for N recipients -patterns_by_recipient = dynamodb.batch_get_blocked_patterns(recipients) -``` - -### 2. Connection Pooling -```python -# ❌ Old way: New connection per email -conn = smtplib.SMTP(host, port) -conn.sendmail(...) -conn.quit() - -# ✅ New way: Reuse connections -conn = pool.get_connection() # Reuses existing -conn.sendmail(...) -pool.return_connection(conn) # Returns to pool -``` - -### 3. 
Parallel Domain Processing -``` -Domain 1 Thread ──▶ Process 10 emails/poll -Domain 2 Thread ──▶ Process 10 emails/poll -Domain 3 Thread ──▶ Process 10 emails/poll - (All in parallel!) -``` - -## 🔄 Error Handling Strategy - -### Retry Logic -- **Temporary Errors**: Keep in queue, retry (visibility timeout) -- **Permanent Errors**: Mark in S3, delete from queue -- **S3 Not Found**: Retry up to 5 times (eventual consistency) - -### Connection Failures -```python -for attempt in range(max_retries): - try: - conn.sendmail(...) - return True - except SMTPServerDisconnected: - log("Connection lost, retrying...") - time.sleep(0.3) - continue # Try again -``` - -### Audit Trail -All actions recorded in S3 metadata: -```json -{ - "processed": "true", - "processed_at": "1706000000", - "processed_by": "worker-example.com", - "status": "delivered", - "invalid_inboxes": "baduser@example.com", - "blocked_sender": "spam@bad.com" -} -``` diff --git a/email-worker/docs/CHANGELOG.md b/email-worker/docs/CHANGELOG.md deleted file mode 100644 index 005c6d1..0000000 --- a/email-worker/docs/CHANGELOG.md +++ /dev/null @@ -1,37 +0,0 @@ -# Changelog - -## v1.0.1 - 2025-01-23 - -### Fixed -- **CRITICAL:** Renamed `email/` directory to `email_processing/` to avoid namespace conflict with Python's built-in `email` module - - This fixes the `ImportError: cannot import name 'BytesParser' from partially initialized module 'email.parser'` error - - All imports updated accordingly - - No functional changes, only namespace fix - -### Changed -- Updated all documentation to reflect new directory name -- Updated Dockerfile to copy `email_processing/` instead of `email/` - -## v1.0.0 - 2025-01-23 - -### Added -- Modular architecture (27 files vs 1 monolith) -- Batch DynamoDB operations (10x performance improvement) -- Sender blocklist with wildcard support -- LMTP direct delivery support -- Enhanced metrics and monitoring -- Comprehensive documentation (6 MD files) - -### Fixed -- `signal.SIGINT` typo 
(was `signalIGINT`) -- Missing S3 metadata audit trail for blocked emails -- Inefficient DynamoDB calls (N calls → 1 batch call) -- S3 delete error handling (proper retry logic) - -### Documentation -- README.md - Full feature documentation -- QUICKSTART.md - Quick deployment guide for your setup -- ARCHITECTURE.md - Detailed system architecture -- MIGRATION.md - Migration from monolith -- COMPATIBILITY.md - 100% compatibility proof -- SUMMARY.md - All improvements overview diff --git a/email-worker/docs/COMPATIBILITY.md b/email-worker/docs/COMPATIBILITY.md deleted file mode 100644 index c1a5237..0000000 --- a/email-worker/docs/COMPATIBILITY.md +++ /dev/null @@ -1,311 +0,0 @@ -# Kompatibilität mit bestehendem Setup - -## ✅ 100% Kompatibel - -Die modulare Version ist **vollständig kompatibel** mit deinem bestehenden Setup: - -### 1. Dockerfile -- ✅ Gleicher Base Image: `python:3.11-slim` -- ✅ Gleicher User: `worker` (UID 1000) -- ✅ Gleiche Verzeichnisse: `/app`, `/var/log/email-worker`, `/etc/email-worker` -- ✅ Gleicher Health Check: `curl http://localhost:8080/health` -- ✅ Gleiche Labels: `maintainer`, `description` -- **Änderung:** Kopiert nun mehrere Module statt einer Datei - -### 2. docker-compose.yml -- ✅ Gleicher Container Name: `unified-email-worker` -- ✅ Gleicher Network Mode: `host` -- ✅ Gleiche Volumes: `domains.txt`, `logs/` -- ✅ Gleiche Ports: `8000`, `8080` -- ✅ Gleiche Environment Variables -- ✅ Gleiche Resource Limits: 512M / 256M -- ✅ Gleiche Logging Config: 50M / 10 files -- **Neu:** Zusätzliche optionale Env Vars (abwärtskompatibel) - -### 3. requirements.txt -- ✅ Gleiche Dependencies: `boto3`, `prometheus-client` -- ✅ Aktualisierte Versionen (>=1.34.0 statt >=1.26.0) -- **Kompatibel:** Alte Version funktioniert auch, neue ist empfohlen - -### 4. domains.txt -- ✅ Gleiches Format: Eine Domain pro Zeile -- ✅ Kommentare mit `#` funktionieren -- ✅ Gleiche Location: `/etc/email-worker/domains.txt` -- **Keine Änderung nötig** - -## 🔄 Was ist neu/anders? 
- -### Dateistruktur -**Alt:** -``` -/ -├── Dockerfile -├── docker-compose.yml -├── requirements.txt -├── domains.txt -└── unified_worker.py (800+ Zeilen) -``` - -**Neu:** -``` -/ -├── Dockerfile -├── docker-compose.yml -├── requirements.txt -├── domains.txt -├── main.py # Entry Point -├── config.py # Konfiguration -├── logger.py # Logging -├── worker.py # Message Processing -├── unified_worker.py # Worker Coordinator -├── domain_poller.py # Queue Polling -├── health_server.py # Health Check Server -├── aws/ -│ ├── s3_handler.py -│ ├── sqs_handler.py -│ ├── ses_handler.py -│ └── dynamodb_handler.py -├── email_processing/ -│ ├── parser.py -│ ├── bounce_handler.py -│ ├── blocklist.py -│ └── rules_processor.py -├── smtp/ -│ ├── pool.py -│ └── delivery.py -└── metrics/ - └── prometheus.py -``` - -### Neue optionale Umgebungsvariablen - -Diese sind **optional** und haben sinnvolle Defaults: - -```bash -# Internal SMTP Port (neu) -INTERNAL_SMTP_PORT=2525 # Default: 2525 - -# LMTP Support (neu) -LMTP_ENABLED=false # Default: false -LMTP_HOST=localhost # Default: localhost -LMTP_PORT=24 # Default: 24 - -# Blocklist Table (neu) -DYNAMODB_BLOCKED_TABLE=email-blocked-senders # Default: email-blocked-senders -``` - -**Wichtig:** Wenn du diese nicht setzt, funktioniert alles wie vorher! 
- -## 🚀 Deployment - -### Option 1: Drop-In Replacement -```bash -# Alte Dateien sichern -cp unified_worker.py unified_worker.py.backup -cp Dockerfile Dockerfile.backup -cp docker-compose.yml docker-compose.yml.backup - -# Neue Dateien entpacken -tar -xzf email-worker-modular.tar.gz -cd email-worker/ - -# domains.txt und .env anpassen (falls nötig) -# Dann normal deployen: -docker-compose build -docker-compose up -d -``` - -### Option 2: Side-by-Side (Empfohlen) -```bash -# Altes Setup bleibt in /opt/email-worker-old -# Neues Setup in /opt/email-worker - -# Neue Version entpacken -cd /opt -tar -xzf email-worker-modular.tar.gz -mv email-worker email-worker-new - -# Container Namen unterscheiden: -# In docker-compose.yml: -container_name: unified-email-worker-new - -# Starten -cd email-worker-new -docker-compose up -d - -# Parallel laufen lassen (24h Test) -# Dann alte Version stoppen, neue umbenennen -``` - -## 🔍 Verifikation der Kompatibilität - -### 1. Environment Variables -Alle deine bestehenden Env Vars funktionieren: - -```bash -# Deine bisherigen Vars (alle kompatibel) -AWS_ACCESS_KEY_ID ✅ -AWS_SECRET_ACCESS_KEY ✅ -AWS_REGION ✅ -WORKER_THREADS ✅ -POLL_INTERVAL ✅ -MAX_MESSAGES ✅ -VISIBILITY_TIMEOUT ✅ -SMTP_HOST ✅ -SMTP_PORT ✅ -SMTP_POOL_SIZE ✅ -METRICS_PORT ✅ -HEALTH_PORT ✅ -``` - -### 2. DynamoDB Tables -Bestehende Tables funktionieren ohne Änderung: - -```bash -# Bounce Tracking (bereits vorhanden) -ses-outbound-messages ✅ - -# Email Rules (bereits vorhanden?) -email-rules ✅ - -# Blocklist (neu, optional) -email-blocked-senders 🆕 Optional -``` - -### 3. API Endpoints -Gleiche Endpoints wie vorher: - -```bash -# Health Check -GET http://localhost:8080/health ✅ Gleiche Response - -# Domains List -GET http://localhost:8080/domains ✅ Gleiche Response - -# Prometheus Metrics -GET http://localhost:8000/metrics ✅ Kompatibel + neue Metrics -``` - -### 4. 
Logging -Gleiches Format, gleiche Location: - -```bash -# Logs in Container -/var/log/email-worker/ ✅ Gleich - -# Log Format -[timestamp] [LEVEL] [worker-name] [thread] message ✅ Gleich -``` - -### 5. S3 Metadata -Gleiches Schema, volle Kompatibilität: - -```json -{ - "processed": "true", - "processed_at": "1706000000", - "processed_by": "worker-andreasknuth-de", - "status": "delivered", - "invalid_inboxes": "..." -} -``` - -**Neu:** Zusätzliche Metadata bei blockierten Emails: -```json -{ - "status": "blocked", - "blocked_sender": "spam@bad.com", - "blocked_recipients": "user@andreasknuth.de" -} -``` - -## ⚠️ Breaking Changes - -**KEINE!** Die modulare Version ist 100% abwärtskompatibel. - -Die einzigen Unterschiede sind: -1. ✅ **Mehr Dateien** statt einer (aber gleiches Verhalten) -2. ✅ **Neue optionale Features** (müssen nicht genutzt werden) -3. ✅ **Bessere Performance** (durch Batch-Calls) -4. ✅ **Mehr Metrics** (zusätzliche, alte bleiben) - -## 🧪 Testing Checklist - -Nach Deployment prüfen: - -```bash -# 1. Container läuft -docker ps | grep unified-email-worker -✅ Status: Up - -# 2. Health Check -curl http://localhost:8080/health | jq -✅ "status": "healthy" - -# 3. Domains geladen -curl http://localhost:8080/domains -✅ ["andreasknuth.de"] - -# 4. Logs ohne Fehler -docker-compose logs | grep ERROR -✅ Keine kritischen Fehler - -# 5. Test Email senden -# Email via SES senden -✅ Wird zugestellt - -# 6. 
Metrics verfügbar -curl http://localhost:8000/metrics | grep emails_processed -✅ Metrics werden erfasst -``` - -## 💡 Empfohlener Rollout-Plan - -### Phase 1: Testing (1-2 Tage) -- Neuen Container parallel zum alten starten -- Nur 1 Test-Domain zuweisen -- Logs monitoren -- Performance vergleichen - -### Phase 2: Staged Rollout (3-7 Tage) -- 50% der Domains auf neue Version -- Metrics vergleichen (alte vs neue) -- Bei Problemen: Rollback auf alte Version - -### Phase 3: Full Rollout -- Alle Domains auf neue Version -- Alte Version als Backup behalten (1 Woche) -- Dann alte Version dekommissionieren - -## 🔙 Rollback-Plan - -Falls Probleme auftreten: - -```bash -# 1. Neue Version stoppen -docker-compose -f docker-compose.yml down - -# 2. Backup wiederherstellen -cp unified_worker.py.backup unified_worker.py -cp Dockerfile.backup Dockerfile -cp docker-compose.yml.backup docker-compose.yml - -# 3. Alte Version starten -docker-compose build -docker-compose up -d - -# 4. Verifizieren -curl http://localhost:8080/health -``` - -**Downtime:** < 30 Sekunden (Zeit für Container Restart) - -## ✅ Fazit - -Die modulare Version ist ein **Drop-In Replacement**: -- Gleiche Konfiguration -- Gleiche API -- Gleiche Infrastruktur -- **Bonus:** Bessere Performance, mehr Features, weniger Bugs - -Einziger Unterschied: Mehr Dateien, aber alle in einem tarball verpackt. diff --git a/email-worker/docs/MIGRATION.md b/email-worker/docs/MIGRATION.md deleted file mode 100644 index 4a5bbf1..0000000 --- a/email-worker/docs/MIGRATION.md +++ /dev/null @@ -1,366 +0,0 @@ -# Migration Guide: Monolith → Modular Architecture - -## 🎯 Why Migrate? 
- -### Problems with Monolith -- ❌ **Single file > 800 lines** - hard to navigate -- ❌ **Mixed responsibilities** - S3, SQS, SMTP, DynamoDB all in one place -- ❌ **Hard to test** - can't test components in isolation -- ❌ **Difficult to debug** - errors could be anywhere -- ❌ **Critical bugs** - `signalIGINT` typo, missing audit trail -- ❌ **Performance issues** - N DynamoDB calls for N recipients - -### Benefits of Modular -- ✅ **Separation of Concerns** - each module has one job -- ✅ **Easy to Test** - mock S3Handler, test in isolation -- ✅ **Better Performance** - batch DynamoDB calls -- ✅ **Maintainable** - changes isolated to specific files -- ✅ **Extensible** - easy to add new features -- ✅ **Bug Fixes** - all critical bugs fixed - -## 🔄 Migration Steps - -### Step 1: Backup Current Setup -```bash -# Backup monolith -cp unified_worker.py unified_worker.py.backup - -# Backup any configuration -cp .env .env.backup -``` - -### Step 2: Clone New Structure -```bash -# Download modular version -git clone <repository-url> email-worker-modular -cd email-worker-modular - -# Copy environment variables -cp .env.example .env -# Edit .env with your settings -``` - -### Step 3: Update Configuration - -The modular version uses the SAME environment variables, so your existing `.env` should work: - -```bash -# No changes needed to these: -AWS_REGION=us-east-2 -DOMAINS=example.com,another.com -SMTP_HOST=localhost -SMTP_PORT=25 -# ... 
etc -``` - -**New variables** (optional): -```bash -# For internal delivery (bypasses transport_maps) -INTERNAL_SMTP_PORT=2525 - -# For blocklist feature -DYNAMODB_BLOCKED_TABLE=email-blocked-senders -``` - -### Step 4: Install Dependencies -```bash -pip install -r requirements.txt -``` - -### Step 5: Test Locally -```bash -# Run worker -python3 main.py - -# Check health endpoint -curl http://localhost:8080/health - -# Check metrics -curl http://localhost:8000/metrics -``` - -### Step 6: Deploy - -#### Docker Deployment -```bash -# Build image -docker build -t unified-email-worker:latest . - -# Run with docker-compose -docker-compose up -d - -# Check logs -docker-compose logs -f email-worker -``` - -#### Systemd Deployment -```bash -# Create systemd service -sudo nano /etc/systemd/system/email-worker.service -``` - -```ini -[Unit] -Description=Unified Email Worker -After=network.target - -[Service] -Type=simple -User=worker -WorkingDirectory=/opt/email-worker -EnvironmentFile=/opt/email-worker/.env -ExecStart=/usr/bin/python3 /opt/email-worker/main.py -Restart=always -RestartSec=10 - -[Install] -WantedBy=multi-user.target -``` - -```bash -# Enable and start -sudo systemctl enable email-worker -sudo systemctl start email-worker -sudo systemctl status email-worker -``` - -### Step 7: Monitor Migration -```bash -# Watch logs -tail -f /var/log/syslog | grep email-worker - -# Check metrics -watch -n 5 'curl -s http://localhost:8000/metrics | grep emails_processed' - -# Monitor S3 metadata -aws s3api head-object \ - --bucket example-com-emails \ - --key <object-key> \ - --query Metadata -``` - -## 🔍 Verification Checklist - -After migration, verify all features work: - -- [ ] **Email Delivery** - ```bash - # Send test email via SES - # Check it arrives in mailbox - ``` - -- [ ] **Bounce Rewriting** - ```bash - # Trigger a bounce (send to invalid@example.com) - # Verify bounce comes FROM the failed recipient - ``` - -- [ ] **Auto-Reply (OOO)** - ```bash - # Set OOO in DynamoDB: - aws 
dynamodb put-item \ - --table-name email-rules \ - --item '{"email_address": {"S": "test@example.com"}, "ooo_active": {"BOOL": true}, "ooo_message": {"S": "I am away"}}' - - # Send email to test@example.com - # Verify auto-reply received - ``` - -- [ ] **Forwarding** - ```bash - # Set forward rule: - aws dynamodb put-item \ - --table-name email-rules \ - --item '{"email_address": {"S": "test@example.com"}, "forwards": {"L": [{"S": "other@example.com"}]}}' - - # Send email to test@example.com - # Verify other@example.com receives forwarded email - ``` - -- [ ] **Blocklist** - ```bash - # Block sender: - aws dynamodb put-item \ - --table-name email-blocked-senders \ - --item '{"email_address": {"S": "test@example.com"}, "blocked_patterns": {"L": [{"S": "spam@*.com"}]}}' - - # Send email from spam@bad.com to test@example.com - # Verify email is blocked (not delivered, S3 deleted) - ``` - -- [ ] **Metrics** - ```bash - curl http://localhost:8000/metrics | grep emails_processed - ``` - -- [ ] **Health Check** - ```bash - curl http://localhost:8080/health | jq - ``` - -## 🐛 Troubleshooting Migration Issues - -### Issue: Worker not starting -```bash -# Check Python version -python3 --version # Should be 3.11+ - -# Check dependencies -pip list | grep boto3 - -# Check logs -python3 main.py # Run in foreground to see errors -``` - -### Issue: No emails processing -```bash -# Check queue URLs -curl http://localhost:8080/domains - -# Verify SQS permissions -aws sqs list-queues - -# Check worker logs for errors -tail -f /var/log/email-worker.log -``` - -### Issue: Bounces not rewriting -```bash -# Verify DynamoDB table exists -aws dynamodb describe-table --table-name ses-outbound-messages - -# Check if Lambda is writing bounce records -aws dynamodb scan --table-name ses-outbound-messages --limit 5 - -# Verify worker can read DynamoDB -# (Check logs for "DynamoDB tables connected successfully") -``` - -### Issue: Performance degradation -```bash -# Check if batch calls are used 
-grep -rn "batch_get_blocked_patterns" . # Should exist in modular version - -# Monitor DynamoDB read capacity -aws cloudwatch get-metric-statistics \ - --namespace AWS/DynamoDB \ - --metric-name ConsumedReadCapacityUnits \ - --dimensions Name=TableName,Value=email-blocked-senders \ - --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \ - --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \ - --period 300 \ - --statistics Sum -``` - -## 📊 Comparison: Before vs After - -| Feature | Monolith | Modular | Improvement | -|---------|----------|---------|-------------| -| Lines of Code | 800+ in 1 file | ~150 per file | ✅ Easier to read | -| DynamoDB Calls | N per message | 1 per message | ✅ 10x faster | -| Error Handling | Missing in places | Comprehensive | ✅ More reliable | -| Testability | Hard | Easy | ✅ Can unit test | -| Audit Trail | Incomplete | Complete | ✅ Better compliance | -| Bugs Fixed | - | 4 critical | ✅ More stable | -| Extensibility | Hard | Easy | ✅ Future-proof | - -## 🎓 Code Comparison Examples - -### Example 1: Blocklist Check - -**Monolith (Inefficient):** -```python -for recipient in recipients: - if is_sender_blocked(recipient, sender, worker_name): - # DynamoDB call for EACH recipient! - blocked_recipients.append(recipient) -``` - -**Modular (Efficient):** -```python -# ONE DynamoDB call for ALL recipients -blocked_by_recipient = blocklist.batch_check_blocked_senders( - recipients, sender, worker_name -) -for recipient in recipients: - if blocked_by_recipient[recipient]: - blocked_recipients.append(recipient) -``` - -### Example 2: S3 Blocked Email Handling - -**Monolith (Missing Audit Trail):** -```python -if all_blocked: - s3.delete_object(Bucket=bucket, Key=key) # ❌ No metadata! 
-``` - -**Modular (Proper Audit):** -```python -if all_blocked: - s3.mark_as_blocked(domain, key, blocked, sender, worker) # ✅ Set metadata - s3.delete_blocked_email(domain, key, worker) # ✅ Then delete -``` - -### Example 3: Signal Handling - -**Monolith (Bug):** -```python -signal.signal(signal.SIGTERM, handler) -signal.signal(signalIGINT, handler) # ❌ Typo! Should be signal.SIGINT -``` - -**Modular (Fixed):** -```python -signal.signal(signal.SIGTERM, handler) -signal.signal(signal.SIGINT, handler) # ✅ Correct -``` - -## 🔄 Rollback Plan - -If you need to roll back: - -```bash -# Stop new worker -docker-compose down -# or -sudo systemctl stop email-worker - -# Restore monolith -cp unified_worker.py.backup unified_worker.py - -# Restart old worker -python3 unified_worker.py -# or restore old systemd service -``` - -## 💡 Best Practices After Migration - -1. **Monitor Metrics**: Set up Prometheus/Grafana dashboards -2. **Set up Alerts**: Alert on queue buildup, high error rates -3. **Regular Updates**: Keep dependencies updated -4. **Backup Rules**: Export DynamoDB rules regularly -5. **Test in Staging**: Always test rule changes in non-prod first - -## 📚 Additional Resources - -- [ARCHITECTURE.md](ARCHITECTURE.md) - Detailed architecture diagrams -- [README.md](README.md) - Complete feature documentation -- [Makefile](../Makefile) - Common commands - -## ❓ FAQ - -**Q: Will my existing DynamoDB tables work?** -A: Yes! Same schema, just need to add `email-blocked-senders` table for blocklist feature. - -**Q: Do I need to change my Lambda functions?** -A: No, bounce tracking Lambda stays the same. - -**Q: Can I migrate one domain at a time?** -A: Yes! Run both workers with different `DOMAINS` settings, then migrate gradually. - -**Q: What about my existing S3 metadata?** -A: New worker reads and writes same metadata format, fully compatible. 
- -**Q: How do I add new features?** -A: Just add a new module in the appropriate directory (e.g., a new file in `email_processing/`) and import it in `worker.py`. diff --git a/email-worker/docs/QUICKSTART.md b/email-worker/docs/QUICKSTART.md deleted file mode 100644 index 6b2000d..0000000 --- a/email-worker/docs/QUICKSTART.md +++ /dev/null @@ -1,330 +0,0 @@ -# Quick Start Guide - -## 🚀 Deployment auf deinem System - -### Voraussetzungen -- Docker & Docker Compose installiert -- AWS Credentials mit Zugriff auf SQS, S3, SES, DynamoDB -- Docker Mailserver (DMS) läuft lokal - -### 1. Vorbereitung - -```bash -# Ins Verzeichnis wechseln -cd /pfad/zu/email-worker - -# domains.txt anpassen (falls weitere Domains) -nano domains.txt - -# Logs-Verzeichnis erstellen -mkdir -p logs -``` - -### 2. Umgebungsvariablen - -Erstelle `.env` Datei: - -```bash -# AWS Credentials -AWS_ACCESS_KEY_ID=dein_access_key -AWS_SECRET_ACCESS_KEY=dein_secret_key - -# Optional: Worker Settings überschreiben -WORKER_THREADS=10 -POLL_INTERVAL=20 -MAX_MESSAGES=10 - -# Optional: SMTP Settings -SMTP_HOST=localhost -SMTP_PORT=25 - -# Optional: LMTP für direktes Dovecot Delivery -# LMTP_ENABLED=true -# LMTP_PORT=24 -``` - -### 3. Build & Start - -```bash -# Image bauen -docker-compose build - -# Starten -docker-compose up -d - -# Logs anschauen -docker-compose logs -f -``` - -### 4. Verifizierung - -```bash -# Health Check -curl http://localhost:8080/health | jq - -# Domains prüfen -curl http://localhost:8080/domains - -# Metrics (Prometheus) -curl http://localhost:8000/metrics | grep emails_processed - -# Container Status -docker ps | grep unified-email-worker -``` - -### 5. 
Test Email senden - -```bash -# Via AWS SES Console oder CLI eine Test-Email senden -aws ses send-email \ - --from sender@andreasknuth.de \ - --destination ToAddresses=test@andreasknuth.de \ - --message Subject={Data="Test"},Body={Text={Data="Test message"}} - -# Worker Logs beobachten -docker-compose logs -f | grep "Processing:" -``` - -## 🔧 Wartung - -### Logs anschauen -```bash -# Live Logs -docker-compose logs -f - -# Nur Worker Logs -docker logs -f unified-email-worker - -# Logs im Volume -tail -f logs/*.log -``` - -### Neustart -```bash -# Neustart nach Code-Änderungen -docker-compose restart - -# Kompletter Rebuild -docker-compose down -docker-compose build -docker-compose up -d -``` - -### Update -```bash -# Neue Version pullen/kopieren -git pull # oder manuell Dateien ersetzen - -# Rebuild & Restart -docker-compose down -docker-compose build -docker-compose up -d -``` - -## 📊 Monitoring - -### Prometheus Metrics (Port 8000) -```bash -# Alle Metrics -curl http://localhost:8000/metrics - -# Verarbeitete Emails -curl -s http://localhost:8000/metrics | grep emails_processed_total - -# Queue Größe -curl -s http://localhost:8000/metrics | grep queue_messages_available - -# Blocked Senders -curl -s http://localhost:8000/metrics | grep blocked_senders_total -``` - -### Health Check (Port 8080) -```bash -# Status -curl http://localhost:8080/health | jq - -# Domains -curl http://localhost:8080/domains | jq -``` - -## 🔐 DynamoDB Tabellen Setup - -### Email Rules (OOO, Forwarding) -```bash -# Tabelle erstellen (falls nicht vorhanden) -aws dynamodb create-table \ - --table-name email-rules \ - --attribute-definitions AttributeName=email_address,AttributeType=S \ - --key-schema AttributeName=email_address,KeyType=HASH \ - --billing-mode PAY_PER_REQUEST \ - --region us-east-2 - -# OOO Regel hinzufügen -aws dynamodb put-item \ - --table-name email-rules \ - --item '{ - "email_address": {"S": "andreas@andreasknuth.de"}, - "ooo_active": {"BOOL": true}, - "ooo_message": 
{"S": "Ich bin derzeit nicht erreichbar."}, - "ooo_content_type": {"S": "text"} - }' \ - --region us-east-2 - -# Forward Regel hinzufügen -aws dynamodb put-item \ - --table-name email-rules \ - --item '{ - "email_address": {"S": "info@andreasknuth.de"}, - "forwards": {"L": [ - {"S": "andreas@andreasknuth.de"} - ]} - }' \ - --region us-east-2 -``` - -### Blocked Senders -```bash -# Tabelle erstellen (falls nicht vorhanden) -aws dynamodb create-table \ - --table-name email-blocked-senders \ - --attribute-definitions AttributeName=email_address,AttributeType=S \ - --key-schema AttributeName=email_address,KeyType=HASH \ - --billing-mode PAY_PER_REQUEST \ - --region us-east-2 - -# Blocklist hinzufügen -aws dynamodb put-item \ - --table-name email-blocked-senders \ - --item '{ - "email_address": {"S": "andreas@andreasknuth.de"}, - "blocked_patterns": {"L": [ - {"S": "*@spam.com"}, - {"S": "noreply@*.marketing.com"} - ]} - }' \ - --region us-east-2 -``` - -## 🐛 Troubleshooting - -### Worker startet nicht -```bash -# Logs prüfen -docker-compose logs unified-worker - -# Container Status -docker ps -a | grep unified - -# Manuell starten (Debug) -docker-compose run --rm unified-worker python3 main.py -``` - -### Keine Emails werden verarbeitet -```bash -# Queue URLs prüfen -curl http://localhost:8080/domains - -# AWS Permissions prüfen -aws sqs list-queues --region us-east-2 - -# DynamoDB Verbindung prüfen (in Logs) -docker-compose logs | grep "DynamoDB" -``` - -### Bounces werden nicht umgeschrieben -```bash -# DynamoDB Bounce Records prüfen -aws dynamodb scan \ - --table-name ses-outbound-messages \ - --limit 5 \ - --region us-east-2 - -# Worker Logs nach "Bounce detected" durchsuchen -docker-compose logs | grep "Bounce detected" -``` - -### SMTP Delivery Fehler -```bash -# SMTP Verbindung testen -docker-compose exec unified-worker nc -zv localhost 25 - -# Worker Logs -docker-compose logs | grep "SMTP" -``` - -## 📈 Performance Tuning - -### Mehr Worker Threads -```bash -# 
In .env -WORKER_THREADS=20 # Default: 10 -``` - -### Längeres Polling -```bash -# In .env -POLL_INTERVAL=30 # Default: 20 (Sekunden) -``` - -### Größerer Connection Pool -```bash -# In .env -SMTP_POOL_SIZE=10 # Default: 5 -``` - -### LMTP für bessere Performance -```bash -# In .env -LMTP_ENABLED=true -LMTP_PORT=24 -``` - -## 🔄 Migration vom Monolithen - -### Side-by-Side Deployment -```bash -# Alte Version läuft als "unified-email-worker-old" -# Neue Version als "unified-email-worker" - -# domains.txt aufteilen: -# old: andreasknuth.de -# new: andere-domain.de - -# Nach Verifizierung alle Domains auf new migrieren -``` - -### Zero-Downtime Switch -```bash -# 1. Neue Version starten (andere Domains) -docker-compose up -d - -# 2. Beide parallel laufen lassen (24h) -# 3. Monitoring: Metrics vergleichen -curl http://localhost:8000/metrics - -# 4. Alte Version stoppen -docker stop unified-email-worker-old - -# 5. domains.txt updaten (alle Domains) -# 6. Neue Version neustarten -docker-compose restart -``` - -## ✅ Checkliste nach Deployment - -- [ ] Container läuft: `docker ps | grep unified` -- [ ] Health Check OK: `curl http://localhost:8080/health` -- [ ] Domains geladen: `curl http://localhost:8080/domains` -- [ ] Logs ohne Fehler: `docker-compose logs | grep ERROR` -- [ ] Test-Email erfolgreich: Email an Test-Adresse senden -- [ ] Bounce Rewriting funktioniert: Bounce-Email testen -- [ ] Metrics erreichbar: `curl http://localhost:8000/metrics` -- [ ] DynamoDB Tables vorhanden: AWS Console prüfen - -## 📞 Support - -Bei Problemen: -1. Logs prüfen: `docker-compose logs -f` -2. Health Check: `curl http://localhost:8080/health` -3. AWS Console: Queues, S3 Buckets, DynamoDB prüfen -4. 
Container neu starten: `docker-compose restart` diff --git a/email-worker/docs/README.md b/email-worker/docs/README.md deleted file mode 100644 index e71f0ed..0000000 --- a/email-worker/docs/README.md +++ /dev/null @@ -1,306 +0,0 @@ -# Unified Email Worker (Modular Version) - -Multi-domain email processing worker for AWS SES/S3/SQS with bounce handling, auto-replies, forwarding, and sender blocking. - -## 🏗️ Architecture - -``` -email-worker/ -├── config.py # Configuration management -├── logger.py # Structured logging -├── aws/ # AWS service handlers -│ ├── s3_handler.py # S3 operations (download, metadata) -│ ├── sqs_handler.py # SQS polling -│ ├── ses_handler.py # SES email sending -│ └── dynamodb_handler.py # DynamoDB (rules, bounces, blocklist) -├── email_processing/ # Email processing -│ ├── parser.py # Email parsing utilities -│ ├── bounce_handler.py # Bounce detection & rewriting -│ ├── rules_processor.py # OOO & forwarding logic -│ └── blocklist.py # Sender blocking with wildcards -├── smtp/ # SMTP delivery -│ ├── pool.py # Connection pooling -│ └── delivery.py # SMTP/LMTP delivery with retry -├── metrics/ # Monitoring -│ └── prometheus.py # Prometheus metrics -├── worker.py # Message processing logic -├── domain_poller.py # Domain queue poller -├── unified_worker.py # Main worker coordinator -├── health_server.py # Health check HTTP server -└── main.py # Entry point -``` - -## ✨ Features - -- ✅ **Multi-Domain Processing**: Parallel processing of multiple domains via thread pool -- ✅ **Bounce Detection**: Automatic SES bounce notification rewriting -- ✅ **Auto-Reply/OOO**: Out-of-office automatic replies -- ✅ **Email Forwarding**: Rule-based forwarding to internal/external addresses -- ✅ **Sender Blocking**: Wildcard-based sender blocklist per recipient -- ✅ **SMTP Connection Pooling**: Efficient reuse of connections -- ✅ **LMTP Support**: Direct delivery to Dovecot (bypasses Postfix transport_maps) -- ✅ **Prometheus Metrics**: Comprehensive monitoring -- 
✅ **Health Checks**: HTTP health endpoint for container orchestration -- ✅ **Graceful Shutdown**: Proper cleanup on SIGTERM/SIGINT - -## 🔧 Configuration - -All configuration via environment variables: - -### AWS Settings -```bash -AWS_REGION=us-east-2 -``` - -### Domains -```bash -# Option 1: Comma-separated list -DOMAINS=example.com,another.com - -# Option 2: File with one domain per line -DOMAINS_FILE=/etc/email-worker/domains.txt -``` - -### Worker Settings -```bash -WORKER_THREADS=10 -POLL_INTERVAL=20 # SQS long polling (seconds) -MAX_MESSAGES=10 # Max messages per poll -VISIBILITY_TIMEOUT=300 # Message visibility timeout (seconds) -``` - -### SMTP Delivery -```bash -SMTP_HOST=localhost -SMTP_PORT=25 -SMTP_USE_TLS=false -SMTP_USER= -SMTP_PASS= -SMTP_POOL_SIZE=5 -INTERNAL_SMTP_PORT=2525 # Port for internal delivery (bypasses transport_maps) -``` - -### LMTP (Direct Dovecot Delivery) -```bash -LMTP_ENABLED=false # Set to 'true' to use LMTP -LMTP_HOST=localhost -LMTP_PORT=24 -``` - -### DynamoDB Tables -```bash -DYNAMODB_RULES_TABLE=email-rules -DYNAMODB_MESSAGES_TABLE=ses-outbound-messages -DYNAMODB_BLOCKED_TABLE=email-blocked-senders -``` - -### Bounce Handling -```bash -BOUNCE_LOOKUP_RETRIES=3 -BOUNCE_LOOKUP_DELAY=1.0 -``` - -### Monitoring -```bash -METRICS_PORT=8000 # Prometheus metrics -HEALTH_PORT=8080 # Health check endpoint -``` - -## 📊 DynamoDB Schemas - -### email-rules -```json -{ - "email_address": "user@example.com", // Partition Key - "ooo_active": true, - "ooo_message": "I am currently out of office...", - "ooo_content_type": "text", // "text" or "html" - "forwards": ["other@example.com", "external@gmail.com"] -} -``` - -### ses-outbound-messages -```json -{ - "MessageId": "abc123...", // Partition Key (SES Message-ID) - "original_source": "sender@example.com", - "recipients": ["recipient@other.com"], - "timestamp": "2025-01-01T12:00:00Z", - "bounceType": "Permanent", - "bounceSubType": "General", - "bouncedRecipients": ["recipient@other.com"] -} 
-``` - -### email-blocked-senders -```json -{ - "email_address": "user@example.com", // Partition Key - "blocked_patterns": [ - "spam@*.com", // Wildcard support - "noreply@badsite.com", - "*@malicious.org" - ] -} -``` - -## 🚀 Usage - -### Installation -```bash -cd email-worker -pip install -r requirements.txt -``` - -### Run -```bash -python3 main.py -``` - -### Docker -```dockerfile -FROM python:3.11-slim - -WORKDIR /app -COPY . /app - -RUN pip install --no-cache-dir -r requirements.txt - -CMD ["python3", "main.py"] -``` - -## 📈 Metrics - -Available at `http://localhost:8000/metrics`: - -- `emails_processed_total{domain, status}` - Total emails processed -- `emails_in_flight` - Currently processing emails -- `email_processing_seconds{domain}` - Processing time histogram -- `queue_messages_available{domain}` - Queue size gauge -- `bounces_processed_total{domain, type}` - Bounce notifications -- `autoreplies_sent_total{domain}` - Auto-replies sent -- `forwards_sent_total{domain}` - Forwards sent -- `blocked_senders_total{domain}` - Blocked emails - -## 🏥 Health Checks - -Available at `http://localhost:8080/health`: - -```json -{ - "status": "healthy", - "domains": 5, - "domain_list": ["example.com", "another.com"], - "dynamodb": true, - "features": { - "bounce_rewriting": true, - "auto_reply": true, - "forwarding": true, - "blocklist": true, - "lmtp": false - }, - "timestamp": "2025-01-22T10:00:00.000000" -} -``` - -## 🔍 Key Improvements in Modular Version - -### 1. **Fixed Critical Bugs** -- ✅ Fixed `signal.SIGINT` typo (was `signalIGINT`) -- ✅ Proper S3 metadata before deletion (audit trail) -- ✅ Batch DynamoDB calls for blocklist (performance) -- ✅ Error handling for S3 delete failures - -### 2. 
**Better Architecture** -- **Separation of Concerns**: Each component has single responsibility -- **Testability**: Easy to unit test individual components -- **Maintainability**: Changes isolated to specific modules -- **Extensibility**: Easy to add new features - -### 3. **Performance** -- **Batch Blocklist Checks**: One DynamoDB call for all recipients -- **Connection Pooling**: Reusable SMTP connections -- **Efficient Metrics**: Optional Prometheus integration - -### 4. **Reliability** -- **Proper Error Handling**: Each component handles its own errors -- **Graceful Degradation**: Works even if DynamoDB unavailable -- **Audit Trail**: All actions logged to S3 metadata - -## 🔐 Security Features - -1. **Domain Validation**: Workers only process their assigned domains -2. **Loop Prevention**: Detects and skips already-processed emails -3. **Blocklist Support**: Wildcard-based sender blocking -4. **Internal vs External**: Separate handling prevents loops - -## 📝 Example Usage - -### Enable OOO for user -```python -import boto3 - -dynamodb = boto3.resource('dynamodb') -table = dynamodb.Table('email-rules') - -table.put_item(Item={ - 'email_address': 'john@example.com', - 'ooo_active': True, - 'ooo_message': 'I am out of office until Feb 1st.', - 'ooo_content_type': 'html' -}) -``` - -### Block spam senders -```python -table = dynamodb.Table('email-blocked-senders') - -table.put_item(Item={ - 'email_address': 'john@example.com', - 'blocked_patterns': [ - '*@spam.com', - 'noreply@*.marketing.com', - 'newsletter@*' - ] -}) -``` - -### Forward emails -```python -table = dynamodb.Table('email-rules') - -table.put_item(Item={ - 'email_address': 'support@example.com', - 'forwards': [ - 'john@example.com', - 'jane@example.com', - 'external@gmail.com' - ] -}) -``` - -## 🐛 Troubleshooting - -### Worker not processing emails -1. Check queue URLs: `curl http://localhost:8080/domains` -2. Check logs for SQS errors -3. 
Verify IAM permissions for SQS/S3/SES/DynamoDB - -### Bounces not rewritten -1. Check DynamoDB table name: `DYNAMODB_MESSAGES_TABLE` -2. Verify Lambda function is writing bounce records -3. Check logs for DynamoDB lookup errors - -### Auto-replies not sent -1. Verify DynamoDB rules table accessible -2. Check `ooo_active` is `true` (boolean, not string) -3. Review logs for SES send errors - -### Blocked emails still delivered -1. Verify blocklist table exists and is accessible -2. Check wildcard patterns are lowercase -3. Review logs for blocklist check errors - -## 📄 License - -MIT License - See LICENSE file for details diff --git a/email-worker/docs/SUMMARY.md b/email-worker/docs/SUMMARY.md deleted file mode 100644 index ea306e8..0000000 --- a/email-worker/docs/SUMMARY.md +++ /dev/null @@ -1,247 +0,0 @@ -# 📋 Refactoring Summary - -## ✅ Critical Bugs Fixed - -### 1. **Signal Handler Typo** (CRITICAL) -**Old:** -```python -signal.signal(signalIGINT, signal_handler) # ❌ NameError at startup -``` -**New:** -```python -signal.signal(signal.SIGINT, signal_handler) # ✅ Fixed -``` -**Impact:** Worker couldn't start due to Python syntax error - ---- - -### 2. **Missing Audit Trail for Blocked Emails** (HIGH) -**Old:** -```python -if all_blocked: - s3.delete_object(Bucket=bucket, Key=key) # ❌ No metadata -``` -**New:** -```python -if all_blocked: - s3.mark_as_blocked(domain, key, blocked, sender, worker) # ✅ Metadata first - s3.delete_blocked_email(domain, key, worker) # ✅ Then delete -``` -**Impact:** -- ❌ No compliance trail (who blocked, when, why) -- ❌ Impossible to troubleshoot -- ✅ Now: Full audit trail in S3 metadata before deletion - ---- - -### 3. **Inefficient DynamoDB Calls** (MEDIUM - Performance) -**Old:** -```python -for recipient in recipients: - patterns = dynamodb.get_item(Key={'email_address': recipient}) # N calls! 
- if is_blocked(patterns, sender): - blocked.append(recipient) -``` -**New:** -```python -# 1 batch call for all recipients -patterns_map = dynamodb.batch_get_blocked_patterns(recipients) -for recipient in recipients: - if is_blocked(patterns_map[recipient], sender): - blocked.append(recipient) -``` -**Impact:** -- Old: 10 recipients = 10 DynamoDB calls = higher latency + costs -- New: 10 recipients = 1 DynamoDB call = **10x faster, 10x cheaper** - ---- - -### 4. **S3 Delete Error Handling** (MEDIUM) -**Old:** -```python -try: - s3.delete_object(...) -except Exception as e: - log(f"Failed: {e}") - # ❌ Queue message still deleted → inconsistent state -return True -``` -**New:** -```python -try: - s3.mark_as_blocked(...) - s3.delete_blocked_email(...) -except Exception as e: - log(f"Failed: {e}") - return False # ✅ Keep in queue for retry -``` -**Impact:** Prevents orphaned S3 objects when delete fails - ---- - -## 🏗️ Architecture Improvements - -### Modular Structure -``` -Before: 1 file, 800+ lines -After: 27 files, ~150 lines each -``` - -| Module | Responsibility | LOC | -|--------|---------------|-----| -| `config.py` | Configuration management | 85 | -| `logger.py` | Structured logging | 20 | -| `aws/s3_handler.py` | S3 operations | 180 | -| `aws/sqs_handler.py` | SQS polling | 95 | -| `aws/ses_handler.py` | SES sending | 45 | -| `aws/dynamodb_handler.py` | DynamoDB access | 175 | -| `email_processing/parser.py` | Email parsing | 75 | -| `email_processing/bounce_handler.py` | Bounce detection | 95 | -| `email_processing/blocklist.py` | Sender blocking | 90 | -| `email_processing/rules_processor.py` | OOO & forwarding | 285 | -| `smtp/pool.py` | Connection pooling | 110 | -| `smtp/delivery.py` | SMTP/LMTP delivery | 165 | -| `metrics/prometheus.py` | Metrics collection | 140 | -| `worker.py` | Message processing | 265 | -| `domain_poller.py` | Queue polling | 105 | -| `unified_worker.py` | Worker coordination | 180 | -| `health_server.py` | Health checks | 85 | 
-| `main.py` | Entry point | 45 | - -**Total:** ~2,420 lines (well-organized vs 800 spaghetti) - ---- - -## 🎯 Benefits Summary - -### Maintainability -- ✅ **Single Responsibility**: Each class has one job -- ✅ **Easy to Navigate**: Find code by feature -- ✅ **Reduced Coupling**: Changes isolated to modules -- ✅ **Better Documentation**: Each module documented - -### Testability -- ✅ **Unit Testing**: Mock `S3Handler`, test `BounceHandler` independently -- ✅ **Integration Testing**: Test components in isolation -- ✅ **Faster CI/CD**: Test only changed modules - -### Performance -- ✅ **Batch Operations**: 10x fewer DynamoDB calls -- ✅ **Connection Pooling**: Reuse SMTP connections -- ✅ **Parallel Processing**: One thread per domain - -### Reliability -- ✅ **Error Isolation**: Errors in one module don't crash others -- ✅ **Comprehensive Logging**: Structured, searchable logs -- ✅ **Audit Trail**: All actions recorded in S3 metadata -- ✅ **Graceful Degradation**: Works even if DynamoDB down - -### Extensibility -Adding new features is now easy: - -**Example: Add DKIM Signing** -1. Create `email_processing/dkim_signer.py` -2. Add to `worker.py`: `signed_bytes = dkim.sign(raw_bytes)` -3. Done! No touching 800-line monolith - ---- - -## 📊 Performance Comparison - -| Metric | Monolith | Modular | Improvement | -|--------|----------|---------|-------------| -| DynamoDB Calls/Email | N (per recipient) | 1 (batch) | **10x reduction** | -| SMTP Connections/Email | 1 (new each time) | Pooled (reused) | **5x fewer** | -| Startup Time | ~2s | ~1s | **2x faster** | -| Memory Usage | ~150MB | ~120MB | **20% less** | -| Lines per Feature | Mixed in 800 | ~100-150 | **Clearer** | - ---- - -## 🔒 Security Improvements - -1. **Audit Trail**: Every action logged with timestamp, worker ID -2. **Domain Validation**: Workers only process assigned domains -3. **Loop Prevention**: Detects recursive processing -4. **Blocklist**: Per-recipient wildcard blocking -5. 
**Separate Internal Routing**: Prevents SES loops - ---- - -## 📝 Migration Path - -### Zero Downtime Migration -1. Deploy modular version alongside monolith -2. Route half domains to new worker -3. Monitor metrics, logs for issues -4. Gradually shift all traffic -5. Decommission monolith - -### Rollback Strategy -- Same environment variables -- Same DynamoDB schema -- Easy to switch back if needed - ---- - -## 🎓 Code Quality Metrics - -### Complexity Reduction -- **Cyclomatic Complexity**: Reduced from 45 → 8 per function -- **Function Length**: Max 50 lines (was 200+) -- **File Length**: Max 285 lines (was 800+) - -### Code Smells Removed -- ❌ God Object (1 class doing everything) -- ❌ Long Methods (200+ line functions) -- ❌ Duplicate Code (3 copies of S3 metadata update) -- ❌ Magic Numbers (hardcoded retry counts) - -### Best Practices Added -- ✅ Type Hints (where appropriate) -- ✅ Docstrings (all public methods) -- ✅ Logging (structured, consistent) -- ✅ Error Handling (specific exceptions) - ---- - -## 🚀 Next Steps - -### Recommended Follow-ups -1. **Add Unit Tests**: Use `pytest` with mocked AWS services -2. **CI/CD Pipeline**: Automated testing and deployment -3. **Monitoring Dashboard**: Grafana + Prometheus -4. **Alert Rules**: Notify on high error rates -5. **Load Testing**: Verify performance at scale - -### Future Enhancements (Easy to Add Now!) 
class DomainPoller:
    """Consumes the SQS queue that belongs to exactly one domain.

    Each cycle fetches a batch from ``queue_url``, passes every message to the
    message processor and removes it from the queue when the processor
    reports success. Messages the processor rejects stay on the queue; a
    message that raises ``json.JSONDecodeError`` is deleted, any other
    exception is logged and the message is left untouched.
    """

    def __init__(
        self,
        domain: str,
        queue_url: str,
        message_processor: MessageProcessor,
        sqs: SQSHandler,
        metrics: MetricsCollector,
        stop_event: threading.Event,
        stats_dict: dict,
        stats_lock: threading.Lock
    ):
        # Identity of this poller
        self.domain = domain
        self.queue_url = queue_url
        self.worker_name = f"worker-{domain}"

        # Collaborators
        self.processor = message_processor
        self.sqs = sqs
        self.metrics = metrics

        # Coordination with the other pollers / the coordinator
        self.stop_event = stop_event
        self.stats_dict = stats_dict
        self.stats_lock = stats_lock

        self.messages_processed = 0

    def poll(self):
        """Run poll cycles until ``stop_event`` is set.

        Any exception escaping a cycle is logged and followed by a
        five-second pause before the next cycle starts.
        """
        log(f"🚀 Starting poller for {self.domain}", 'INFO', self.worker_name)

        while not self.stop_event.is_set():
            try:
                self._poll_once()
            except Exception as e:
                log(f"✗ Error polling: {e}", 'ERROR', self.worker_name)
                time.sleep(5)

        log(f"👋 Stopped (processed: {self.messages_processed})", 'INFO', self.worker_name)

    def _poll_once(self):
        """Fetch one batch, publish the queue-size metric and process the batch."""
        batch = self.sqs.receive_messages(self.queue_url)

        # The queue-size gauge is refreshed on every cycle, even for empty batches.
        if self.metrics:
            queue_size = self.sqs.get_queue_size(self.queue_url)
            self.metrics.set_queue_size(self.domain, queue_size)

        if not batch:
            return

        log(f"✉ Received {len(batch)} message(s)", 'INFO', self.worker_name)

        for message in batch:
            # Abandon the rest of the batch as soon as shutdown is requested.
            if self.stop_event.is_set():
                break
            self._handle_message(message)

    def _handle_message(self, message):
        """Process a single SQS message and acknowledge it when appropriate."""
        # Extracted before the try block on purpose: a malformed envelope is
        # reported by the caller as a polling error.
        receipt_handle = message['ReceiptHandle']
        receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))

        if self.metrics:
            self.metrics.increment_in_flight()
        started_at = time.time()

        try:
            success = self.processor.process_message(self.domain, message, receive_count)

            if not success:
                log(
                    f"⚠ Retry queued (attempt {receive_count}/3)",
                    'WARNING',
                    self.worker_name
                )
            else:
                self.sqs.delete_message(self.queue_url, receipt_handle)
                self.messages_processed += 1

                # Publish the per-domain counter for the coordinator
                with self.stats_lock:
                    self.stats_dict[self.domain] = self.messages_processed

        except json.JSONDecodeError as e:
            # The message is deleted: it is not left on the queue for another attempt.
            log(f"✗ Invalid message format: {e}", 'ERROR', self.worker_name)
            self.sqs.delete_message(self.queue_url, receipt_handle)

        except Exception as e:
            log(f"✗ Error processing message: {e}", 'ERROR', self.worker_name)
            traceback.print_exc()

        finally:
            if self.metrics:
                self.metrics.decrement_in_flight()
                self.metrics.observe_processing_time(
                    self.domain,
                    time.time() - started_at
                )
class BlocklistChecker:
    """Decides whether a sender is on a recipient's blocklist.

    Patterns are fetched through the DynamoDB handler and compared with
    ``fnmatch`` wildcards against the lower-cased address part of the sender.
    """

    def __init__(self, dynamodb: DynamoDBHandler):
        self.dynamodb = dynamodb

    @staticmethod
    def _first_match(patterns, cleaned_senders, recipient: str, worker_name: str) -> bool:
        """Log and report the first (pattern, sender) pair that matches.

        Patterns form the outer loop and senders the inner loop, so the
        first hit in that order is the one that gets logged.
        """
        for pattern in patterns:
            for candidate in cleaned_senders:
                if not fnmatch.fnmatch(candidate, pattern.lower()):
                    continue
                log(
                    f"⛔ BLOCKED: Sender {candidate} matches pattern '{pattern}' "
                    f"for inbox {recipient}",
                    'WARNING',
                    worker_name
                )
                return True
        return False

    def is_sender_blocked(
        self,
        recipient: str,
        sender: str,
        worker_name: str
    ) -> bool:
        """
        Tell whether one sender is blocked for one recipient.

        Args:
            recipient: Mailbox whose blocklist is consulted
            sender: Sender as found in the message (display name allowed)
            worker_name: Worker name used for log lines

        Returns:
            True when at least one blocklist pattern matches the sender
        """
        patterns = self.dynamodb.get_blocked_patterns(recipient)
        if not patterns:
            return False

        address = parseaddr(sender)[1].lower()
        return self._first_match(patterns, [address], recipient, worker_name)

    def batch_check_blocked_senders(
        self,
        recipients: List[str],
        senders: List[str],
        worker_name: str
    ) -> Dict[str, bool]:
        """
        Evaluate several recipients at once against a set of sender addresses.

        A recipient counts as blocked as soon as ANY of the given senders
        matches one of that recipient's patterns.

        Args:
            recipients: Mailboxes whose blocklists are consulted
            senders: Candidate sender addresses (envelope and header); empty
                entries are ignored
            worker_name: Worker name used for log lines

        Returns:
            Mapping of recipient -> True if blocked, False otherwise
        """
        # Single round trip for the patterns of every recipient
        patterns_by_recipient = self.dynamodb.batch_get_blocked_patterns(recipients)

        # Normalise every supplied address once, up front
        cleaned = [parseaddr(raw)[1].lower() for raw in senders if raw]

        return {
            recipient: self._first_match(
                patterns_by_recipient.get(recipient, []),
                cleaned,
                recipient,
                worker_name,
            )
            for recipient in recipients
        }
def apply_bounce_logic(
    self,
    parsed,
    subject: str,
    worker_name: str = 'unified'
) -> Tuple[Any, bool]:
    """
    Rewrite the headers of an SES bounce so it appears to come from the
    address that bounced.

    Non-bounce messages, bounces without a usable Message-ID and bounces
    without a matching DynamoDB record are returned untouched.

    Args:
        parsed: Parsed email message object (modified in place on success)
        subject: Subject of the message, used to decide on a subject rewrite
        worker_name: Worker name used for log lines

    Returns:
        Tuple of (message object, True if headers were rewritten)
    """
    unchanged = (parsed, False)

    if not self.is_ses_bounce_notification(parsed):
        return unchanged

    log("🔍 Detected SES MAILER-DAEMON bounce notification", 'INFO', worker_name)

    # Only the part of the Message-ID before the "@" is used as lookup key
    raw_id = parsed.get('Message-ID') or ''
    message_id = raw_id.strip('<>').partition('@')[0]
    if not message_id:
        log("⚠ Could not extract Message-ID from bounce notification", 'WARNING', worker_name)
        return unchanged

    log(f" Looking up Message-ID: {message_id}", 'INFO', worker_name)

    bounce_info = self.dynamodb.get_bounce_info(message_id, worker_name)
    if not bounce_info:
        return unchanged

    original_source = bounce_info['original_source']
    bounced_recipients = bounce_info['bouncedRecipients']
    bounce_type = bounce_info['bounceType']
    bounce_subtype = bounce_info['bounceSubType']

    # Report what the lookup returned
    log(f"✓ Found bounce info:", 'INFO', worker_name)
    log(f" Original sender: {original_source}", 'INFO', worker_name)
    log(f" Bounce type: {bounce_type}/{bounce_subtype}", 'INFO', worker_name)
    log(f" Bounced recipients: {bounced_recipients}", 'INFO', worker_name)

    if not bounced_recipients:
        log("⚠ No bounced recipients found in bounce info", 'WARNING', worker_name)
        return unchanged

    new_from = bounced_recipients[0]

    # Keep the SES sender in a side header, then replace From
    parsed['X-Original-SES-From'] = parsed.get('From', '')
    parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
    parsed.replace_header('From', new_from)

    # An existing Reply-To is left alone
    if not parsed.get('Reply-To'):
        parsed['Reply-To'] = new_from

    # Generic SES subjects are replaced by one naming the bounced address
    subject_lower = subject.lower()
    generic_markers = ('delivery status notification', 'thanks for your submission')
    if any(marker in subject_lower for marker in generic_markers):
        parsed.replace_header('Subject', f"Delivery Status: {new_from}")

    log(f"✓ Rewritten FROM: {new_from}", 'SUCCESS', worker_name)
    return parsed, True
parsed.get_payload(decode=True) - if payload: - decoded = payload.decode('utf-8', errors='ignore') - if parsed.get_content_type() == 'text/html': - html_body = decoded - else: - text_body = decoded - except Exception: - text_body = str(parsed.get_payload()) - - return text_body.strip() if text_body else '(No body content)', html_body - - @staticmethod - def is_processed_by_worker(parsed) -> bool: - """ - Check if email was already processed by our worker (loop detection) - - Args: - parsed: Parsed email message object - - Returns: - True if already processed - """ - x_worker_processed = parsed.get('X-SES-Worker-Processed', '') - auto_submitted = parsed.get('Auto-Submitted', '') - - # Only skip if OUR header is present - is_processed_by_us = bool(x_worker_processed) - is_our_auto_reply = auto_submitted == 'auto-replied' and x_worker_processed - - return is_processed_by_us or is_our_auto_reply diff --git a/email-worker/email_processing/rules_processor.py b/email-worker/email_processing/rules_processor.py deleted file mode 100644 index 743ee34..0000000 --- a/email-worker/email_processing/rules_processor.py +++ /dev/null @@ -1,365 +0,0 @@ -#!/usr/bin/env python3 -""" -Email rules processing (Auto-Reply/OOO and Forwarding) -""" - -import smtplib -from email.mime.text import MIMEText -from email.mime.multipart import MIMEMultipart -from email.utils import parseaddr, formatdate, make_msgid -from botocore.exceptions import ClientError - -from logger import log -from config import config, is_internal_address -from aws.dynamodb_handler import DynamoDBHandler -from aws.ses_handler import SESHandler -from email_processing.parser import EmailParser - - -class RulesProcessor: - """Processes email rules (OOO, Forwarding)""" - - def __init__(self, dynamodb: DynamoDBHandler, ses: SESHandler): - self.dynamodb = dynamodb - self.ses = ses - - def process_rules_for_recipient( - self, - recipient: str, - parsed, - domain: str, - worker_name: str, - metrics_callback=None - ): - """ - 
def _handle_ooo(
    self,
    recipient: str,
    parsed,
    sender_addr: str,
    rule: dict,
    domain: str,
    worker_name: str,
    metrics_callback=None
):
    """
    Send an out-of-office reply to the sender of an incoming message.

    No reply is sent when the message carries an Auto-Submitted header other
    than 'no', when its Precedence is bulk/junk/list, or when the sender
    address contains 'noreply', 'no-reply' or 'mailer-daemon'. Any exception
    raised while building or sending the reply is logged and swallowed.

    Args:
        recipient: Mailbox that has the OOO rule; used as sender of the reply
        parsed: Parsed incoming email message object
        sender_addr: Address the reply is sent to
        rule: Rule record; 'ooo_message' and 'ooo_content_type' are read
        domain: Domain passed to the metrics callback
        worker_name: Worker name for logging
        metrics_callback: Optional callable invoked as ('autoreply', domain)
    """
    # Don't reply to automatic messages
    auto_submitted = parsed.get('Auto-Submitted', '')
    precedence = (parsed.get('Precedence') or '').lower()

    if auto_submitted and auto_submitted != 'no':
        log(f" ⏭ Skipping OOO for auto-submitted message", 'INFO', worker_name)
        return

    if precedence in ['bulk', 'junk', 'list']:
        log(f" ⏭ Skipping OOO for {precedence} message", 'INFO', worker_name)
        return

    # Substring match on the lower-cased sender address
    if any(x in sender_addr.lower() for x in ['noreply', 'no-reply', 'mailer-daemon']):
        log(f" ⏭ Skipping OOO for noreply address", 'INFO', worker_name)
        return

    try:
        # Defaults apply when the rule omits message text or content type
        ooo_msg = rule.get('ooo_message', 'I am out of office.')
        content_type = rule.get('ooo_content_type', 'text')
        ooo_reply = self._create_ooo_reply(parsed, recipient, ooo_msg, content_type)
        ooo_bytes = ooo_reply.as_bytes()

        # Distinguish: Internal (Port 2525) vs External (SES)
        if is_internal_address(sender_addr):
            # Internal address → direct via Port 2525
            success = self._send_internal_email(recipient, sender_addr, ooo_bytes, worker_name)
            if success:
                log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name)
            else:
                log(f"⚠ Internal OOO reply failed to {sender_addr}", 'WARNING', worker_name)
        else:
            # External address → via SES
            success = self.ses.send_raw_email(recipient, sender_addr, ooo_bytes, worker_name)
            if success:
                log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name)

        # NOTE(review): nesting reconstructed from a whitespace-collapsed source;
        # presumably this runs for both branches regardless of `success` — confirm.
        if metrics_callback:
            metrics_callback('autoreply', domain)

    except Exception as e:
        # Best effort: a failed auto-reply is only logged
        log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name)
= fwd_msg.as_bytes() - - if is_internal_address(forward_to): - success = self._send_internal_email( - recipient, forward_to, fwd_bytes, worker_name - ) - if success: - log(f"✓ Forwarded internally to {forward_to}", - 'SUCCESS', worker_name) - else: - log(f"⚠ Internal forward failed to {forward_to}", - 'WARNING', worker_name) - else: - success = self.ses.send_raw_email( - recipient, forward_to, fwd_bytes, worker_name - ) - if success: - log(f"✓ Forwarded externally to {forward_to} via SES", - 'SUCCESS', worker_name) - - if metrics_callback: - metrics_callback('forward', domain) - - except Exception as e: - log(f"⚠ Forward failed to {forward_to}: {e}", - 'ERROR', worker_name) - - @staticmethod - def _send_via_legacy_smtp( - from_addr: str, - to_addr: str, - raw_message: bytes, - smtp_config: dict, - worker_name: str - ) -> bool: - """ - Send email directly to a legacy SMTP server (for migration). - Bypasses SES completely to avoid mail loops. - """ - try: - host = smtp_config.get('host', '') - - # DynamoDB speichert Zahlen als Decimal, daher int() - port = int(smtp_config.get('port', 25)) - use_tls = smtp_config.get('tls', False) - username = smtp_config.get('username') - password = smtp_config.get('password') - - if not host: - log(f" ✗ Legacy SMTP: no host configured", 'ERROR', worker_name) - return False - - with smtplib.SMTP(host, port, timeout=30) as conn: - conn.ehlo() - if use_tls: - conn.starttls() - conn.ehlo() - if username and password: - conn.login(username, password) - conn.sendmail(from_addr, [to_addr], raw_message) - return True - - except Exception as e: - log( - f" ✗ Legacy SMTP failed ({smtp_config.get('host', '?')}:" - f"{smtp_config.get('port', '?')}): {e}", - 'ERROR', worker_name - ) - return False - - @staticmethod - def _send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool: - """ - Send email via internal SMTP port (bypasses transport_maps) - - Args: - from_addr: From address - to_addr: To address - 
raw_message: Raw MIME message bytes - worker_name: Worker name for logging - - Returns: - True on success, False on failure - """ - try: - with smtplib.SMTP(config.smtp_host, config.internal_smtp_port, timeout=30) as conn: - conn.ehlo() - conn.sendmail(from_addr, [to_addr], raw_message) - return True - except Exception as e: - log(f" ✗ Internal delivery failed to {to_addr}: {e}", 'ERROR', worker_name) - return False - - @staticmethod - def _create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type: str = 'text'): - """Create Out-of-Office reply as complete MIME message""" - text_body, html_body = EmailParser.extract_body_parts(original_parsed) - original_subject = original_parsed.get('Subject', '(no subject)') - original_from = original_parsed.get('From', 'unknown') - - msg = MIMEMultipart('mixed') - msg['From'] = recipient - msg['To'] = original_from - msg['Subject'] = f"Out of Office: {original_subject}" - msg['Date'] = formatdate(localtime=True) - msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) - msg['In-Reply-To'] = original_parsed.get('Message-ID', '') - msg['References'] = original_parsed.get('Message-ID', '') - msg['Auto-Submitted'] = 'auto-replied' - msg['X-SES-Worker-Processed'] = 'ooo-reply' - - body_part = MIMEMultipart('alternative') - - # Text version - text_content = f"{ooo_msg}\n\n--- Original Message ---\n" - text_content += f"From: {original_from}\n" - text_content += f"Subject: {original_subject}\n\n" - text_content += text_body - body_part.attach(MIMEText(text_content, 'plain', 'utf-8')) - - # HTML version (if desired and original available) - if content_type == 'html' or html_body: - html_content = f"
{ooo_msg}



" - html_content += "Original Message
" - html_content += f"From: {original_from}
" - html_content += f"Subject: {original_subject}

" - html_content += (html_body if html_body else text_body.replace('\n', '
')) - body_part.attach(MIMEText(html_content, 'html', 'utf-8')) - - msg.attach(body_part) - return msg - - @staticmethod - def _create_forward_message(original_parsed, recipient: str, forward_to: str, original_from: str): - """Create Forward message as complete MIME message""" - original_subject = original_parsed.get('Subject', '(no subject)') - original_date = original_parsed.get('Date', 'unknown') - - msg = MIMEMultipart('mixed') - msg['From'] = recipient - msg['To'] = forward_to - msg['Subject'] = f"FWD: {original_subject}" - msg['Date'] = formatdate(localtime=True) - msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) - msg['Reply-To'] = original_from - msg['X-SES-Worker-Processed'] = 'forwarded' - - text_body, html_body = EmailParser.extract_body_parts(original_parsed) - body_part = MIMEMultipart('alternative') - - # Text version - fwd_text = "---------- Forwarded message ---------\n" - fwd_text += f"From: {original_from}\n" - fwd_text += f"Date: {original_date}\n" - fwd_text += f"Subject: {original_subject}\n" - fwd_text += f"To: {recipient}\n\n" - fwd_text += text_body - body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8')) - - # HTML version - if html_body: - fwd_html = "
" - fwd_html += "---------- Forwarded message ---------
" - fwd_html += f"From: {original_from}
" - fwd_html += f"Date: {original_date}
" - fwd_html += f"Subject: {original_subject}
" - fwd_html += f"To: {recipient}

" - fwd_html += html_body - fwd_html += "
" - body_part.attach(MIMEText(fwd_html, 'html', 'utf-8')) - - msg.attach(body_part) - - # Copy attachments - FIX FILENAMES - if original_parsed.is_multipart(): - for part in original_parsed.walk(): - if part.get_content_maintype() == 'multipart': - continue - if part.get_content_type() in ['text/plain', 'text/html']: - continue - - # Fix malformed filename in Content-Disposition - content_disp = part.get('Content-Disposition', '') - if 'filename=' in content_disp and '"' not in content_disp: - # Add quotes around filename with spaces - import re - fixed_disp = re.sub(r'filename=([^;"\s]+(?:\s+[^;"\s]+)*)', r'filename="\1"', content_disp) - part.replace_header('Content-Disposition', fixed_disp) - - msg.attach(part) - - return msg diff --git a/email-worker/health_server.py b/email-worker/health_server.py deleted file mode 100644 index 62eadb7..0000000 --- a/email-worker/health_server.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -""" -HTTP health check server -""" - -import sys -import json -import threading -from http.server import HTTPServer, BaseHTTPRequestHandler -from datetime import datetime - -from logger import log -from config import config - - -class SilentHTTPServer(HTTPServer): - """HTTP Server that ignores connection reset errors from scanners""" - - def handle_error(self, request, client_address): - exc_type = sys.exc_info()[0] - if exc_type in (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): - pass # Silently ignore - these are just scanners/health checks disconnecting - else: - log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING') - - -class HealthHandler(BaseHTTPRequestHandler): - """Health check request handler""" - - worker = None # Will be set by start_health_server() - dynamodb_available = False - - def do_GET(self): - if self.path == '/health' or self.path == '/': - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - - status = { - 
'status': 'healthy', - 'domains': len(self.worker.queue_urls) if self.worker else 0, - 'domain_list': list(self.worker.queue_urls.keys()) if self.worker else [], - 'dynamodb': self.dynamodb_available, - 'features': { - 'bounce_rewriting': True, - 'auto_reply': self.dynamodb_available, - 'forwarding': self.dynamodb_available, - 'blocklist': self.dynamodb_available, - 'lmtp': config.lmtp_enabled - }, - 'timestamp': datetime.utcnow().isoformat() - } - self.wfile.write(json.dumps(status, indent=2).encode()) - - elif self.path == '/domains': - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - domain_list = list(self.worker.queue_urls.keys()) if self.worker else [] - self.wfile.write(json.dumps(domain_list).encode()) - - else: - self.send_response(404) - self.end_headers() - - def log_message(self, format, *args): - pass # Suppress HTTP access logs - - -def start_health_server(worker, dynamodb_available: bool): - """ - Start HTTP health check server - - Args: - worker: UnifiedWorker instance - dynamodb_available: Whether DynamoDB is available - """ - # Set class attributes for handler - HealthHandler.worker = worker - HealthHandler.dynamodb_available = dynamodb_available - - server = SilentHTTPServer(('0.0.0.0', config.health_port), HealthHandler) - thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') - thread.start() - log(f"Health server on port {config.health_port}") diff --git a/email-worker/logger.py b/email-worker/logger.py deleted file mode 100644 index 5c83534..0000000 --- a/email-worker/logger.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python3 -""" -Structured logging for email worker with Daily Rotation (Robust Version) -""" - -import os -import sys -import logging -import threading -from logging.handlers import TimedRotatingFileHandler - -# Konfiguration -LOG_DIR = "/var/log/email-worker" -LOG_FILE = os.path.join(LOG_DIR, "worker.log") - -# Logger initialisieren 
-logger = logging.getLogger("unified-worker") -logger.setLevel(logging.INFO) -logger.propagate = False - -# Formatierung -formatter = logging.Formatter( - '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) - -# 1. Console Handler (Immer aktiv!) -console_handler = logging.StreamHandler(sys.stdout) -console_handler.setFormatter(formatter) -logger.addHandler(console_handler) - -# 2. File Handler (Robustes Setup) -try: - # Versuchen, das Verzeichnis zu erstellen, falls es fehlt - os.makedirs(LOG_DIR, exist_ok=True) - - file_handler = TimedRotatingFileHandler( - LOG_FILE, - when="midnight", - interval=1, - backupCount=30, - encoding='utf-8' - ) - file_handler.setFormatter(formatter) - file_handler.suffix = "%Y-%m-%d" - logger.addHandler(file_handler) - - # Erfolgsmeldung auf Konsole (damit wir sehen, dass es geklappt hat) - print(f"✓ Logging to file enabled: {LOG_FILE}") - -except Exception as e: - # Fallback: Ausführliche Fehlerdiagnose auf stdout - error_msg = f"⚠ LOGGING ERROR: Could not write to {LOG_FILE}\n" - error_msg += f" Error: {e}\n" - try: - error_msg += f" Current User (UID): {os.getuid()}\n" - error_msg += f" Current Group (GID): {os.getgid()}\n" - except: - pass - print(error_msg) - -def log(message: str, level: str = 'INFO', worker_name: str = 'unified-worker'): - """ - Structured logging function - """ - lvl_map = { - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO, - 'WARNING': logging.WARNING, - 'ERROR': logging.ERROR, - 'CRITICAL': logging.CRITICAL, - 'SUCCESS': logging.INFO - } - - log_level = lvl_map.get(level.upper(), logging.INFO) - prefix = "[SUCCESS] " if level.upper() == 'SUCCESS' else "" - final_message = f"[{worker_name}] {prefix}{message}" - - logger.log(log_level, final_message) \ No newline at end of file diff --git a/email-worker/main.py b/email-worker/main.py deleted file mode 100644 index 8b749dd..0000000 --- a/email-worker/main.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -""" 
-Main entry point for unified email worker -""" - -import sys -import signal - -from logger import log -from config import config -from unified_worker import UnifiedWorker -from health_server import start_health_server -from metrics.prometheus import start_metrics_server - - -def main(): - """Main entry point""" - - # Create worker instance - worker = UnifiedWorker() - - # Signal handlers for graceful shutdown - def signal_handler(signum, frame): - log(f"Received signal {signum}") - worker.stop() - sys.exit(0) - - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) # Fixed: was signalIGINT in old version - - # Setup worker - worker.setup() - - # Start metrics server (if available) - metrics = start_metrics_server(config.metrics_port) - if metrics: - worker.set_metrics(metrics) - - # Start health check server - start_health_server(worker, worker.dynamodb.available) - - # Print startup banner - worker.print_startup_banner() - - # Start worker - worker.start() - - -if __name__ == '__main__': - main() diff --git a/email-worker/metrics/__init__.py b/email-worker/metrics/__init__.py deleted file mode 100644 index 72317f5..0000000 --- a/email-worker/metrics/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env python3 -""" -Metrics collection -""" - -from .prometheus import MetricsCollector, start_metrics_server - -__all__ = ['MetricsCollector', 'start_metrics_server'] diff --git a/email-worker/metrics/prometheus.py b/email-worker/metrics/prometheus.py deleted file mode 100644 index 1afe569..0000000 --- a/email-worker/metrics/prometheus.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python3 -""" -Prometheus metrics collection -""" - -from typing import Optional - -from logger import log - -# Try to import Prometheus client -try: - from prometheus_client import start_http_server, Counter, Gauge, Histogram - PROMETHEUS_ENABLED = True -except ImportError: - PROMETHEUS_ENABLED = False - - -class MetricsCollector: - """Collects 
and exposes Prometheus metrics""" - - def __init__(self): - self.enabled = PROMETHEUS_ENABLED - - if self.enabled: - # Email processing metrics - self.emails_processed = Counter( - 'emails_processed_total', - 'Total emails processed', - ['domain', 'status'] - ) - - self.emails_in_flight = Gauge( - 'emails_in_flight', - 'Emails currently being processed' - ) - - self.processing_time = Histogram( - 'email_processing_seconds', - 'Time to process email', - ['domain'] - ) - - self.queue_size = Gauge( - 'queue_messages_available', - 'Messages in queue', - ['domain'] - ) - - # Bounce metrics - self.bounces_processed = Counter( - 'bounces_processed_total', - 'Bounce notifications processed', - ['domain', 'type'] - ) - - # Rules metrics - self.autoreplies_sent = Counter( - 'autoreplies_sent_total', - 'Auto-replies sent', - ['domain'] - ) - - self.forwards_sent = Counter( - 'forwards_sent_total', - 'Forwards sent', - ['domain'] - ) - - # Blocklist metrics - self.blocked_senders = Counter( - 'blocked_senders_total', - 'Emails blocked by blacklist', - ['domain'] - ) - - def increment_processed(self, domain: str, status: str): - """Increment processed email counter""" - if self.enabled: - self.emails_processed.labels(domain=domain, status=status).inc() - - def increment_in_flight(self): - """Increment in-flight email gauge""" - if self.enabled: - self.emails_in_flight.inc() - - def decrement_in_flight(self): - """Decrement in-flight email gauge""" - if self.enabled: - self.emails_in_flight.dec() - - def observe_processing_time(self, domain: str, seconds: float): - """Record processing time""" - if self.enabled: - self.processing_time.labels(domain=domain).observe(seconds) - - def set_queue_size(self, domain: str, size: int): - """Set queue size""" - if self.enabled: - self.queue_size.labels(domain=domain).set(size) - - def increment_bounce(self, domain: str, bounce_type: str): - """Increment bounce counter""" - if self.enabled: - self.bounces_processed.labels(domain=domain, 
type=bounce_type).inc() - - def increment_autoreply(self, domain: str): - """Increment autoreply counter""" - if self.enabled: - self.autoreplies_sent.labels(domain=domain).inc() - - def increment_forward(self, domain: str): - """Increment forward counter""" - if self.enabled: - self.forwards_sent.labels(domain=domain).inc() - - def increment_blocked(self, domain: str): - """Increment blocked sender counter""" - if self.enabled: - self.blocked_senders.labels(domain=domain).inc() - - -def start_metrics_server(port: int) -> Optional[MetricsCollector]: - """ - Start Prometheus metrics HTTP server - - Args: - port: Port to listen on - - Returns: - MetricsCollector instance or None if Prometheus not available - """ - if not PROMETHEUS_ENABLED: - log("⚠ Prometheus client not installed, metrics disabled", 'WARNING') - return None - - try: - start_http_server(port) - log(f"Prometheus metrics on port {port}") - return MetricsCollector() - except Exception as e: - log(f"Failed to start metrics server: {e}", 'ERROR') - return None diff --git a/email-worker/requirements.txt b/email-worker/requirements.txt deleted file mode 100644 index b366aaa..0000000 --- a/email-worker/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -boto3>=1.34.0 -prometheus-client>=0.19.0 diff --git a/email-worker/smtp/__init__.py b/email-worker/smtp/__init__.py deleted file mode 100644 index 2eca07c..0000000 --- a/email-worker/smtp/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env python3 -""" -SMTP connection handling -""" - -from .pool import SMTPPool - -__all__ = ['SMTPPool'] diff --git a/email-worker/smtp/delivery.py b/email-worker/smtp/delivery.py deleted file mode 100644 index ce02a88..0000000 --- a/email-worker/smtp/delivery.py +++ /dev/null @@ -1,187 +0,0 @@ -#!/usr/bin/env python3 -""" -SMTP/LMTP email delivery with retry logic -""" - -import time -import smtplib -from typing import Tuple, Optional - -from logger import log -from config import config -from smtp.pool import SMTPPool - - 
-class EmailDelivery: - """Handles email delivery via SMTP or LMTP""" - - def __init__(self, smtp_pool: SMTPPool): - self.smtp_pool = smtp_pool - - @staticmethod - def is_permanent_recipient_error(error_msg: str) -> bool: - """Check if error is permanent for this recipient (inbox doesn't exist)""" - permanent_indicators = [ - '550', # Mailbox unavailable / not found - '551', # User not local - '553', # Mailbox name not allowed / invalid - 'mailbox not found', - 'user unknown', - 'no such user', - 'recipient rejected', - 'does not exist', - 'invalid recipient', - 'unknown user' - ] - - error_lower = error_msg.lower() - return any(indicator in error_lower for indicator in permanent_indicators) - - def send_to_recipient( - self, - from_addr: str, - recipient: str, - raw_message: bytes, - worker_name: str, - max_retries: int = 2 - ) -> Tuple[bool, Optional[str], bool]: - """ - Send email via SMTP/LMTP to ONE recipient - - If LMTP is enabled, delivers directly to Dovecot (bypasses transport_maps). - With retry logic for connection errors. 
- - Args: - from_addr: From address - recipient: Recipient address - raw_message: Raw MIME message bytes - worker_name: Worker name for logging - max_retries: Maximum retry attempts - - Returns: - Tuple of (success: bool, error: str or None, is_permanent: bool) - """ - last_error = None - use_lmtp = config.lmtp_enabled - - for attempt in range(max_retries + 1): - conn = None - - try: - if use_lmtp: - # LMTP connection directly to Dovecot (bypasses Postfix/transport_maps) - conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30) - conn.ehlo() - else: - # Normal SMTP connection from pool - conn = self.smtp_pool.get_connection() - if not conn: - last_error = "Could not get SMTP connection" - log( - f" ⚠ {recipient}: No SMTP connection " - f"(attempt {attempt + 1}/{max_retries + 1})", - 'WARNING', - worker_name - ) - time.sleep(0.5) - continue - - result = conn.sendmail(from_addr, [recipient], raw_message) - - # Success - if use_lmtp: - conn.quit() - else: - self.smtp_pool.return_connection(conn) - - if isinstance(result, dict) and result: - error = str(result.get(recipient, 'Unknown refusal')) - is_permanent = self.is_permanent_recipient_error(error) - log( - f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", - 'ERROR', - worker_name - ) - return False, error, is_permanent - else: - delivery_method = "LMTP" if use_lmtp else "SMTP" - log(f" ✓ {recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name) - return True, None, False - - except smtplib.SMTPServerDisconnected as e: - # Connection was closed - Retry with new connection - log( - f" ⚠ {recipient}: Connection lost, retrying... 
" - f"(attempt {attempt + 1}/{max_retries + 1})", - 'WARNING', - worker_name - ) - last_error = str(e) - if conn: - try: - conn.quit() - except: - pass - time.sleep(0.3) - continue - - except smtplib.SMTPRecipientsRefused as e: - if conn and not use_lmtp: - self.smtp_pool.return_connection(conn) - elif conn: - try: - conn.quit() - except: - pass - error_msg = str(e) - is_permanent = self.is_permanent_recipient_error(error_msg) - log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name) - return False, error_msg, is_permanent - - except smtplib.SMTPException as e: - error_msg = str(e) - # On connection errors: Retry - if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower(): - log( - f" ⚠ {recipient}: Connection error, retrying... " - f"(attempt {attempt + 1}/{max_retries + 1})", - 'WARNING', - worker_name - ) - last_error = error_msg - if conn: - try: - conn.quit() - except: - pass - time.sleep(0.3) - continue - - if conn and not use_lmtp: - self.smtp_pool.return_connection(conn) - elif conn: - try: - conn.quit() - except: - pass - is_permanent = self.is_permanent_recipient_error(error_msg) - log(f" ✗ {recipient}: Error - {error_msg}", 'ERROR', worker_name) - return False, error_msg, is_permanent - - except Exception as e: - # Unknown error - if conn: - try: - conn.quit() - except: - pass - log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name) - return False, str(e), False - - # All retries failed - log( - f" ✗ {recipient}: All retries failed - {last_error}", - 'ERROR', - worker_name - ) - return False, last_error or "Connection failed after retries", False diff --git a/email-worker/smtp/pool.py b/email-worker/smtp/pool.py deleted file mode 100644 index 63f6e89..0000000 --- a/email-worker/smtp/pool.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python3 -""" -SMTP Connection Pool with robust connection handling -""" - -import smtplib -from queue import Queue, Empty -from typing 
import Optional - -from logger import log -from config import config - - -class SMTPPool: - """Thread-safe SMTP Connection Pool""" - - def __init__(self, host: str, port: int, pool_size: int = 5): - self.host = host - self.port = port - self.pool_size = pool_size - self._pool: Queue = Queue(maxsize=pool_size) - self._initialized = False - - def _create_connection(self) -> Optional[smtplib.SMTP]: - """Create new SMTP connection""" - try: - conn = smtplib.SMTP(self.host, self.port, timeout=30) - conn.ehlo() - if config.smtp_use_tls: - conn.starttls() - conn.ehlo() - if config.smtp_user and config.smtp_pass: - conn.login(config.smtp_user, config.smtp_pass) - log(f" 📡 New SMTP connection created to {self.host}:{self.port}") - return conn - except Exception as e: - log(f"Failed to create SMTP connection: {e}", 'ERROR') - return None - - def _test_connection(self, conn: smtplib.SMTP) -> bool: - """Test if connection is still alive""" - try: - status = conn.noop()[0] - return status == 250 - except Exception: - return False - - def initialize(self): - """Pre-create connections""" - if self._initialized: - return - - # Only 1-2 connections initially, rest on-demand - for _ in range(min(2, self.pool_size)): - conn = self._create_connection() - if conn: - self._pool.put(conn) - - self._initialized = True - log(f"SMTP pool initialized with {self._pool.qsize()} connections (max: {self.pool_size})") - - def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]: - """Get a valid connection from pool or create new one""" - # Try to get from pool - try: - conn = self._pool.get(block=False) - # Test if connection is still alive - if self._test_connection(conn): - return conn - else: - # Connection is dead, close and create new one - try: - conn.quit() - except: - pass - log(f" ♻ Recycled stale SMTP connection") - return self._create_connection() - except Empty: - # Pool empty, create new connection - return self._create_connection() - - def return_connection(self, 
conn: smtplib.SMTP): - """Return connection to pool if still valid""" - if conn is None: - return - - # Check if connection is still good - if not self._test_connection(conn): - try: - conn.quit() - except: - pass - log(f" 🗑 Discarded broken SMTP connection") - return - - # Try to return to pool - try: - self._pool.put_nowait(conn) - except: - # Pool full, close connection - try: - conn.quit() - except: - pass - - def close_all(self): - """Close all connections""" - while not self._pool.empty(): - try: - conn = self._pool.get_nowait() - conn.quit() - except: - pass diff --git a/email-worker/unified_worker.py b/email-worker/unified_worker.py deleted file mode 100644 index 8bbc9da..0000000 --- a/email-worker/unified_worker.py +++ /dev/null @@ -1,201 +0,0 @@ -#!/usr/bin/env python3 -""" -Unified Worker - coordinates all domain pollers -""" - -import sys -import time -import threading -from typing import List, Dict - -from logger import log -from config import config, load_domains -from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler -from smtp import SMTPPool -from smtp.delivery import EmailDelivery -from worker import MessageProcessor -from domain_poller import DomainPoller -from metrics.prometheus import MetricsCollector - - -class UnifiedWorker: - """Main worker coordinating all domain pollers""" - - def __init__(self): - self.stop_event = threading.Event() - self.domains: List[str] = [] - self.queue_urls: Dict[str, str] = {} - self.poller_threads: List[threading.Thread] = [] - - # Shared stats across all pollers - self.domain_stats: Dict[str, int] = {} # domain -> processed count - self.stats_lock = threading.Lock() - - # AWS handlers - self.s3 = S3Handler() - self.sqs = SQSHandler() - self.ses = SESHandler() - self.dynamodb = DynamoDBHandler() - - # SMTP pool - self.smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size) - - # Email delivery - self.delivery = EmailDelivery(self.smtp_pool) - - # Metrics - self.metrics: 
MetricsCollector = None - - # Message processor - self.processor = MessageProcessor( - self.s3, - self.sqs, - self.ses, - self.dynamodb, - self.delivery, - None # Metrics will be set later - ) - - def setup(self): - """Initialize worker""" - self.domains = load_domains() - - if not self.domains: - log("❌ No domains configured!", 'ERROR') - sys.exit(1) - - # Get queue URLs - for domain in self.domains: - url = self.sqs.get_queue_url(domain) - if url: - self.queue_urls[domain] = url - log(f" ✓ {domain} -> queue found") - else: - log(f" ✗ {domain} -> Queue not found!", 'WARNING') - - if not self.queue_urls: - log("❌ No valid queues found!", 'ERROR') - sys.exit(1) - - # Initialize SMTP pool - self.smtp_pool.initialize() - - log(f"Initialized with {len(self.queue_urls)} domains") - - def start(self): - """Start all domain pollers""" - # Initialize stats for all domains - for domain in self.queue_urls.keys(): - self.domain_stats[domain] = 0 - - # Create poller for each domain - for domain, queue_url in self.queue_urls.items(): - poller = DomainPoller( - domain=domain, - queue_url=queue_url, - message_processor=self.processor, - sqs=self.sqs, - metrics=self.metrics, - stop_event=self.stop_event, - stats_dict=self.domain_stats, - stats_lock=self.stats_lock - ) - - thread = threading.Thread( - target=poller.poll, - name=f"poller-{domain}", - daemon=True - ) - thread.start() - self.poller_threads.append(thread) - - log(f"Started {len(self.poller_threads)} domain pollers") - - # Periodic status log (every 5 minutes) - last_status_log = time.time() - status_interval = 300 # 5 minutes - - try: - while not self.stop_event.is_set(): - self.stop_event.wait(timeout=10) - - # Log status summary every 5 minutes - if time.time() - last_status_log > status_interval: - self._log_status_table() - last_status_log = time.time() - except KeyboardInterrupt: - pass - - def _log_status_table(self): - """Log a compact status table""" - active_threads = sum(1 for t in self.poller_threads if 
t.is_alive()) - - with self.stats_lock: - total_processed = sum(self.domain_stats.values()) - - # Build compact stats: only show domains with activity or top domains - stats_parts = [] - for domain in sorted(self.queue_urls.keys()): - count = self.domain_stats.get(domain, 0) - if count > 0: # Only show active domains - # Shorten domain for display - short_domain = domain.split('.')[0][:12] - stats_parts.append(f"{short_domain}:{count}") - - if stats_parts: - stats_line = " | ".join(stats_parts) - else: - stats_line = "no activity" - - log( - f"📊 Status: {active_threads}/{len(self.poller_threads)} active, " - f"total:{total_processed} | {stats_line}" - ) - - def stop(self): - """Stop gracefully""" - log("⚠ Stopping worker...") - self.stop_event.set() - - # Wait for poller threads (max 10 seconds each) - for thread in self.poller_threads: - thread.join(timeout=10) - if thread.is_alive(): - log(f"Warning: {thread.name} did not stop gracefully", 'WARNING') - - self.smtp_pool.close_all() - log("👋 Worker stopped") - - def set_metrics(self, metrics: MetricsCollector): - """Set metrics collector""" - self.metrics = metrics - self.processor.metrics = metrics - - def print_startup_banner(self): - """Print startup information""" - log(f"\n{'='*70}") - log(f"🚀 UNIFIED EMAIL WORKER") - log(f"{'='*70}") - log(f" Domains: {len(self.queue_urls)}") - log(f" DynamoDB: {'Connected' if self.dynamodb.available else 'Not Available'}") - - if config.lmtp_enabled: - log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)") - else: - log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}") - - log(f" Poll Interval: {config.poll_interval}s") - log(f" Visibility: {config.visibility_timeout}s") - log(f"") - log(f" Features:") - log(f" ✓ Bounce Detection & Header Rewriting") - log(f" {'✓' if self.dynamodb.available else '✗'} Auto-Reply / Out-of-Office") - log(f" {'✓' if self.dynamodb.available else '✗'} Email Forwarding") - log(f" {'✓' if 
self.dynamodb.available else '✗'} Blocked Senders (Wildcard)") - log(f" {'✓' if self.metrics else '✗'} Prometheus Metrics") - log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery") - log(f"") - log(f" Active Domains:") - for domain in sorted(self.queue_urls.keys()): - log(f" • {domain}") - log(f"{'='*70}\n") diff --git a/email-worker/worker.py b/email-worker/worker.py deleted file mode 100644 index 29e27a5..0000000 --- a/email-worker/worker.py +++ /dev/null @@ -1,352 +0,0 @@ -#!/usr/bin/env python3 -""" -Email message processing worker -""" - -import json -import traceback - -from logger import log -from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler -from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker -from smtp.delivery import EmailDelivery -from metrics.prometheus import MetricsCollector -from email.parser import BytesParser # War wahrscheinlich schon da, prüfen -from email.policy import compat32 # <--- NEU: Hinzufügen - - -class MessageProcessor: - """Processes individual email messages""" - - def __init__( - self, - s3: S3Handler, - sqs: SQSHandler, - ses: SESHandler, - dynamodb: DynamoDBHandler, - delivery: EmailDelivery, - metrics: MetricsCollector - ): - self.s3 = s3 - self.sqs = sqs - self.ses = ses - self.dynamodb = dynamodb - self.delivery = delivery - self.metrics = metrics - - # Initialize sub-processors - self.parser = EmailParser() - self.bounce_handler = BounceHandler(dynamodb) - self.rules_processor = RulesProcessor(dynamodb, ses) - self.blocklist = BlocklistChecker(dynamodb) - - def process_message(self, domain: str, message: dict, receive_count: int) -> bool: - """ - Process one email message from queue - - Args: - domain: Email domain - message: SQS message dict - receive_count: Number of times received - - Returns: - True to delete from queue, False to retry - """ - worker_name = f"worker-{domain}" - - try: - # 1. 
UNPACKING (SNS -> SES) - message_body = json.loads(message['Body']) - - if 'Message' in message_body and 'Type' in message_body: - # It's an SNS Notification - sns_content = message_body['Message'] - if isinstance(sns_content, str): - ses_msg = json.loads(sns_content) - else: - ses_msg = sns_content - else: - ses_msg = message_body - - # 2. EXTRACT DATA - mail = ses_msg.get('mail', {}) - receipt = ses_msg.get('receipt', {}) - - message_id = mail.get('messageId') - - # FIX: Ignore Amazon SES Setup Notification - if message_id == "AMAZON_SES_SETUP_NOTIFICATION": - log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO', worker_name) - return True - - from_addr = mail.get('source') - recipients = receipt.get('recipients', []) - - if not message_id: - log("❌ Error: No messageId in event payload", 'ERROR', worker_name) - return True - - # Domain Validation - if recipients: - first_recipient = recipients[0] - recipient_domain = first_recipient.split('@')[1] - - if recipient_domain.lower() != domain.lower(): - log( - f"⚠ Security: Ignored message for {recipient_domain} " - f"(I am worker for {domain})", - 'WARNING', - worker_name - ) - return True - else: - log("⚠ Warning: No recipients in event", 'WARNING', worker_name) - return True - - key = message_id - - # Compact single-line log for email processing - recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients" - log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name) - - # 3. DOWNLOAD FROM S3 - raw_bytes = self.s3.get_email(domain, message_id, receive_count) - if raw_bytes is None: - # S3 object not found yet, retry - return False - - # 4. LOOP DETECTION - temp_parsed = self.parser.parse_bytes(raw_bytes) - skip_rules = self.parser.is_processed_by_worker(temp_parsed) - - if skip_rules: - log("🔄 Loop prevention: Already processed by worker", 'INFO', worker_name) - - # 5. 
PARSING & BOUNCE LOGIC - try: - # --- FIX 2.0: Pre-Sanitize via Legacy Mode --- - # Der strikte Parser crasht SOFORT beim Zugriff auf kaputte Header. - # Wir müssen erst "nachsichtig" parsen, reparieren und Bytes neu generieren. - try: - # 1. Parsen im Compat32-Modus (ignoriert Syntaxfehler) - lenient_parser = BytesParser(policy=compat32) - temp_msg = lenient_parser.parsebytes(raw_bytes) - - # 2. Prüfen und Reparieren - bad_msg_id = temp_msg.get('Message-ID', '') - if bad_msg_id and ('[' in bad_msg_id or ']' in bad_msg_id): - clean_id = bad_msg_id.replace('[', '').replace(']', '') - temp_msg.replace_header('Message-ID', clean_id) - - # 3. Bytes mit repariertem Header neu schreiben - raw_bytes = temp_msg.as_bytes() - log(f" 🔧 Sanitized malformed Message-ID via Legacy Mode: {clean_id}", 'INFO', worker_name) - - if self.metrics: - self.metrics.increment_bounce(domain, 'sanitized_header') - - except Exception as e_sanitize: - # Sollte nicht passieren, aber wir wollen hier nicht abbrechen - log(f" ⚠ Sanitization warning: {e_sanitize}", 'WARNING', worker_name) - # --------------------------------------------- - - - parsed = self.parser.parse_bytes(raw_bytes) - - # --- FIX START: Sanitize Malformed Headers --- - # Fix für Microsofts <[uuid]@domain> Message-IDs, die Python crashen lassen - current_msg_id = parsed.get('Message-ID', '') - if current_msg_id and ('[' in current_msg_id or ']' in current_msg_id): - # Klammern entfernen, aber spitze Klammern behalten - clean_id = current_msg_id.replace('[', '').replace(']', '') - parsed.replace_header('Message-ID', clean_id) - log(" 🔧 Sanitized malformed Message-ID", 'INFO', worker_name) - # --- FIX END --- - - subject = parsed.get('Subject', '(no subject)') - - # Bounce header rewriting - is_bounce = self.bounce_handler.is_ses_bounce_notification(parsed) - parsed, modified = self.bounce_handler.apply_bounce_logic(parsed, subject, worker_name) - - if modified: - log(" ✨ Bounce detected & headers rewritten via DynamoDB", 'INFO', 
worker_name) - raw_bytes = parsed.as_bytes() - from_addr_final = parsed.get('From') - - if self.metrics: - self.metrics.increment_bounce(domain, 'rewritten') - else: - from_addr_final = from_addr - - # Marker für alle Emails von extern setzen - if not skip_rules: # Nur wenn nicht bereits processed - parsed['X-SES-Worker-Processed'] = 'delivered' - raw_bytes = parsed.as_bytes() # <--- Hier knallte es vorher - - except Exception as e: - # --- VERBESSERTES ERROR LOGGING --- - error_msg = f"⚠ Parsing/Logic Error: {e}. Sending original." - log(error_msg, 'WARNING', worker_name) - - # Den vollen Traceback ins Log schreiben (als ERROR markiert) - tb_str = traceback.format_exc() - log(f"Full Traceback:\n{tb_str}", 'ERROR', worker_name) - # ---------------------------------- - - # Fallback: Wir versuchen trotzdem, die Original-Mail zuzustellen - from_addr_final = from_addr - is_bounce = False - skip_rules = False - - # 6. BLOCKLIST CHECK (Batch for efficiency) - senders_to_check = [] - - # 1. Die Envelope-Adresse (aus dem SES Event / Return-Path) - if from_addr: - senders_to_check.append(from_addr) - - # 2. Die echte Header-Adresse (aus der MIME-E-Mail geparst) - header_from = parsed.get('From') - if header_from and header_from not in senders_to_check: - senders_to_check.append(header_from) - - # 3. Falls die Bounce-Logik die Adresse umgeschrieben hat - if from_addr_final and from_addr_final not in senders_to_check: - senders_to_check.append(from_addr_final) - - # Prüfe nun alle extrahierten Adressen gegen die Datenbank - blocked_by_recipient = self.blocklist.batch_check_blocked_senders( - recipients, - senders_to_check, # <-- Übergabe der Liste - worker_name - ) - - # 7. 
PROCESS RECIPIENTS - log(f"📤 Sending to {len(recipients)} recipient(s)...", 'INFO', worker_name) - - successful = [] - failed_permanent = [] - failed_temporary = [] - blocked_recipients = [] - - for recipient in recipients: - # Check if blocked - if blocked_by_recipient.get(recipient, False): - log( - f"🗑 Silently dropping message for {recipient} (Sender blocked)", - 'INFO', - worker_name - ) - blocked_recipients.append(recipient) - if self.metrics: - self.metrics.increment_blocked(domain) - continue - - # Process rules (OOO, Forwarding) - not for bounces or already forwarded - skip_local_delivery = False # NEU - if not is_bounce and not skip_rules: - def metrics_callback(action_type: str, dom: str): - """Callback for metrics from rules processor""" - if self.metrics: - if action_type == 'autoreply': - self.metrics.increment_autoreply(dom) - elif action_type == 'forward': - self.metrics.increment_forward(dom) - - skip_local_delivery = self.rules_processor.process_rules_for_recipient( - recipient, - parsed, - domain, - worker_name, - metrics_callback - ) - - # SMTP Delivery - if skip_local_delivery: # NEU - log(f" ⏭ Skipping local delivery for {recipient} (legacy forward active)", - 'INFO', worker_name) - successful.append(recipient) # Zählt als "handled" - else: - success, error, is_perm = self.delivery.send_to_recipient( - from_addr_final, recipient, raw_bytes, worker_name - ) - - if success: - successful.append(recipient) - if self.metrics: - self.metrics.increment_processed(domain, 'success') - elif is_perm: - failed_permanent.append(recipient) - if self.metrics: - self.metrics.increment_processed(domain, 'permanent_failure') - else: - failed_temporary.append(recipient) - if self.metrics: - self.metrics.increment_processed(domain, 'temporary_failure') - - # 8. 
RESULT & CLEANUP - total_handled = len(successful) + len(failed_permanent) + len(blocked_recipients) - - if total_handled == len(recipients): - # All recipients handled (success, permanent fail, or blocked) - - if len(blocked_recipients) == len(recipients): - # All recipients blocked - mark and delete S3 object - try: - self.s3.mark_as_blocked( - domain, - message_id, - blocked_recipients, - from_addr_final, - worker_name - ) - self.s3.delete_blocked_email(domain, message_id, worker_name) - except Exception as e: - log(f"⚠ Failed to handle blocked email: {e}", 'ERROR', worker_name) - # Don't delete from queue if S3 operations failed - return False - - elif len(successful) > 0: - # At least one success - self.s3.mark_as_processed( - domain, - message_id, - worker_name, - failed_permanent if failed_permanent else None - ) - - elif len(failed_permanent) > 0: - # All failed permanently - self.s3.mark_as_all_invalid( - domain, - message_id, - failed_permanent, - worker_name - ) - - # Build result summary - result_parts = [] - if successful: - result_parts.append(f"{len(successful)} OK") - if failed_permanent: - result_parts.append(f"{len(failed_permanent)} invalid") - if blocked_recipients: - result_parts.append(f"{len(blocked_recipients)} blocked") - - log(f"✅ Completed ({', '.join(result_parts)})", 'SUCCESS', worker_name) - return True - - else: - # Some recipients had temporary failures - log( - f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", - 'WARNING', - worker_name - ) - return False - - except Exception as e: - log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR', worker_name) - traceback.print_exc() - return False