#!/usr/bin/env python3 """ Optimierte S3 E-Mail Processor REST API Mit Kompression, verbesserter Fehlerbehandlung und Request-Tracking """ import os import time import logging import json import re import hashlib import base64 import gzip import boto3 from pathlib import Path from email.parser import BytesParser from email import policy from dotenv import load_dotenv from flask import Flask, request, jsonify, abort from functools import wraps # .env-Datei laden load_dotenv() # Flask-App initialisieren app = Flask(__name__) # Logging konfigurieren mit Request-ID-Support class RequestIDFilter(logging.Filter): def filter(self, record): from flask import g record.request_id = getattr(g, 'request_id', 'no-request') return True logging.basicConfig( level=logging.INFO, format='%(asctime)s - [%(request_id)s] - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('s3_email_processor_api.log'), logging.StreamHandler() ] ) logger = logging.getLogger("s3-email-processor-api") logger.addFilter(RequestIDFilter()) # Konfiguration MAIL_DIR = os.environ.get('MAIL_DIR', './mail') API_TOKEN = os.environ.get('API_TOKEN') # Request-Tracking für Duplikat-Erkennung processed_requests = {} REQUEST_CACHE_SIZE = 1000 REQUEST_CACHE_TTL = 3600 # 1 Stunde def require_token(f): @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): logger.warning("Fehlender Authorization-Header") abort(401, description="Fehlender Authorization-Header") token = auth_header[7:] if not API_TOKEN or token != API_TOKEN: logger.warning("Ungültiger API-Token") abort(403, description="Ungültiger API-Token") return f(*args, **kwargs) return decorated_function def setup_request_context(): """Setzt Request-Kontext für Logging auf""" from flask import g g.request_id = request.headers.get('X-Request-ID', f'req-{int(time.time())}-{id(request)}') @app.before_request def before_request(): setup_request_context() def is_duplicate_request(request_id, etag): """Prüft, ob Request bereits verarbeitet wurde""" if not request_id or not etag: return False key = f"{request_id}:{etag}" current_time = time.time() # Cache cleanup if len(processed_requests) > REQUEST_CACHE_SIZE: cutoff_time = current_time - REQUEST_CACHE_TTL processed_requests.clear() # Einfache Lösung: kompletten Cache leeren # Duplikat-Check if key in processed_requests: last_processed = processed_requests[key] if current_time - last_processed < REQUEST_CACHE_TTL: return True # Request als verarbeitet markieren processed_requests[key] = current_time return False def load_domains_config(): """Lädt die Domain-Konfiguration aus Umgebungsvariablen""" domains = {} domain_index = 1 while True: domain_key = f"DOMAIN_{domain_index}" domain_name = os.environ.get(domain_key) if not domain_name: break bucket = os.environ.get(f"{domain_key}_BUCKET", "") prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/") usernames = os.environ.get(f"{domain_key}_USERNAMES", "") region = os.environ.get(f"{domain_key}_REGION", "us-east-2") if bucket and usernames: domains[domain_name.lower()] = { "bucket": bucket, "prefix": prefix, "usernames": usernames.split(','), "region": region } logger.info(f"Domain {domain_name} konfiguriert") else: logger.warning(f"Unvollständige Konfiguration für {domain_name}") domain_index += 1 return domains def extract_email_address(address): """Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat""" if not address: return None if '@' in address and '<' not in address: return address.strip() match = re.search(r'<([^>]+)>', address) if match: return match.group(1) return address.strip() def is_valid_recipient(to_address, domains_config): """Prüft, ob die Empfängeradresse gültig ist""" email = extract_email_address(to_address) if not email or '@' not in email: return False, None username, domain = email.split('@', 1) if domain.lower() in domains_config: domain_config = domains_config[domain.lower()] if username.lower() in [u.lower() for u in domain_config["usernames"]]: return True, domain.lower() return False, None def get_maildir_path(to_address, mail_dir): """Ermittelt den Pfad im Maildir-Format""" email = extract_email_address(to_address) if '@' in email: user, domain = email.split('@', 1) else: return None mail_dir_path = Path(mail_dir) domain_dir = mail_dir_path / domain user_dir = domain_dir / user # Maildir-Struktur erstellen for directory in [mail_dir_path, domain_dir, user_dir]: directory.mkdir(parents=True, exist_ok=True) os.chmod(directory, 0o775) # Maildir-Unterverzeichnisse for subdir in ['cur', 'new', 'tmp']: subdir_path = user_dir / subdir subdir_path.mkdir(exist_ok=True) return user_dir def store_email(email_content, to_address, message_id, s3_key, mail_dir): """Speichert eine E-Mail im Maildir-Format""" try: maildir = get_maildir_path(to_address, mail_dir) if not maildir: logger.error(f"Konnte Maildir für {to_address} nicht ermitteln") return False # Eindeutigen Dateinamen generieren timestamp = int(time.time()) hostname = 'mail' unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest() filename = f"{timestamp}.{unique_id}.{hostname}:2," email_path = maildir / 'new' / filename with open(email_path, 'wb') as f: f.write(email_content) os.chmod(email_path, 0o664) logger.info(f"E-Mail gespeichert: {email_path} ({len(email_content)} Bytes)") return True except Exception as e: logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}") return False def process_single_email(email_content, bucket, key, domain_name): """Verarbeitet eine einzelne E-Mail mit verbesserter Fehlerbehandlung""" try: all_domains_config = load_domains_config() domain_name = domain_name.lower() if domain_name not in all_domains_config: logger.error(f"Domain {domain_name} nicht konfiguriert") return { "action": "error", "error": f"Domain {domain_name} nicht konfiguriert", "status": "error" } # Header parsen mit besserer Fehlerbehandlung try: headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) to_address = headers.get('To', '') from_address = headers.get('From', '') date = headers.get('Date', '') message_id = headers.get('Message-ID', '') subject = headers.get('Subject', '') # Header-Validierung if not to_address: logger.warning(f"E-Mail ohne To-Header: {key}") return { "action": "invalid", "error": "Fehlender To-Header", "status": "rejected" } logger.info(f"Verarbeite: '{subject}' von {from_address} an {to_address}") except Exception as e: logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}") return { "action": "invalid", "error": f"Header-Parsing-Fehler: {str(e)}", "status": "error" } # Empfänger validieren is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config) if not is_valid: logger.info(f"Ungültige Empfängeradresse: {to_address}") return { "action": "invalid", "message": f"Ungültige Empfängeradresse: {to_address}", "status": "rejected", "recipient": to_address, "reason": "invalid_recipient" } # Domain-Zugehörigkeit prüfen if recipient_domain != domain_name: logger.info(f"E-Mail gehört zu Domain {recipient_domain}, nicht zu {domain_name}") return { "action": "wrong_domain", "message": f"E-Mail gehört zu Domain {recipient_domain}", "status": "skipped", "expected_domain": domain_name, "actual_domain": recipient_domain } # E-Mail speichern if store_email(email_content, to_address, message_id, key, MAIL_DIR): logger.info(f"E-Mail erfolgreich gespeichert für {to_address}") return { "action": "stored", "message": f"E-Mail erfolgreich gespeichert", "status": "success", "recipient": to_address, "sender": from_address, "subject": subject, "size": len(email_content) } else: return { "action": "error", "error": "Fehler beim Speichern der E-Mail", "status": "error" } except Exception as e: logger.error(f"Unerwarteter Fehler bei E-Mail {key}: {str(e)}", exc_info=True) return { "action": "error", "error": str(e), "status": "error" } # API-Endpunkte @app.route('/health', methods=['GET']) def health_check(): """Erweiterte Gesundheitsprüfung""" from flask import g # Basis-Gesundheitscheck health_status = { "status": "OK", "message": "S3 E-Mail Processor API ist aktiv", "timestamp": int(time.time()), "version": "2.0", "request_id": getattr(g, 'request_id', 'health-check') } # Erweiterte Checks try: # Maildir-Zugriff prüfen mail_path = Path(MAIL_DIR) if mail_path.exists() and mail_path.is_dir(): health_status["mail_dir"] = "accessible" else: health_status["mail_dir"] = "not_accessible" health_status["status"] = "WARNING" # Domain-Konfiguration prüfen domains = load_domains_config() health_status["configured_domains"] = len(domains) health_status["domains"] = list(domains.keys()) # Cache-Status health_status["request_cache_size"] = len(processed_requests) except Exception as e: health_status["status"] = "ERROR" health_status["error"] = str(e) return jsonify(health_status) @app.route('/process/', methods=['POST']) @require_token def process_email(domain): """Verarbeitet eine einzelne E-Mail mit verbesserter Fehlerbehandlung""" from flask import g start_time = time.time() try: # JSON-Payload validieren if not request.is_json: return jsonify({"error": "Content-Type muss application/json sein"}), 400 data = request.get_json() # Erforderliche Felder prüfen required_fields = ['bucket', 'key', 'email_content', 'domain'] missing_fields = [field for field in required_fields if field not in data] if missing_fields: return jsonify({ "error": f"Fehlende Felder: {', '.join(missing_fields)}" }), 400 # Duplikat-Check request_id = data.get('request_id', g.request_id) etag = data.get('etag') if is_duplicate_request(request_id, etag): logger.info(f"Duplikat-Request erkannt: {request_id}:{etag}") return jsonify({ "action": "duplicate", "message": "Request bereits verarbeitet", "status": "skipped", "request_id": request_id }) # E-Mail-Content dekodieren try: email_base64 = data['email_content'] # Kompression-Support if data.get('compressed', False): compressed_data = base64.b64decode(email_base64) email_content = gzip.decompress(compressed_data) logger.info(f"E-Mail dekomprimiert: {data.get('compressed_size', 0)} -> {len(email_content)} Bytes") else: email_content = base64.b64decode(email_base64) except Exception as e: logger.error(f"Fehler beim Dekodieren des E-Mail-Contents: {str(e)}") return jsonify({ "error": f"Content-Dekodierung fehlgeschlagen: {str(e)}" }), 400 # Größen-Validierung email_size = len(email_content) max_size = 25 * 1024 * 1024 # 25MB if email_size > max_size: logger.warning(f"E-Mail zu groß: {email_size} Bytes") return jsonify({ "action": "too_large", "error": f"E-Mail zu groß: {email_size} Bytes (max: {max_size})", "status": "rejected" }), 413 # E-Mail verarbeiten logger.info(f"Verarbeite E-Mail: {data['key']} ({email_size} Bytes)") result = process_single_email( email_content=email_content, bucket=data['bucket'], key=data['key'], domain_name=domain ) # Performance-Metriken hinzufügen processing_time = time.time() - start_time result.update({ "processing_time_ms": round(processing_time * 1000, 2), "request_id": request_id, "email_size": email_size }) # Log-Level basierend auf Ergebnis if result.get("status") == "success": logger.info(f"E-Mail erfolgreich verarbeitet in {processing_time:.2f}s") elif result.get("status") in ["rejected", "skipped"]: logger.info(f"E-Mail {result.get('action')}: {result.get('message')}") else: logger.error(f"E-Mail-Verarbeitung fehlgeschlagen: {result.get('error')}") # HTTP-Status basierend auf Ergebnis if result.get("status") == "error": return jsonify(result), 500 elif result.get("action") == "too_large": return jsonify(result), 413 else: return jsonify(result), 200 except Exception as e: processing_time = time.time() - start_time logger.error(f"Unerwarteter Fehler nach {processing_time:.2f}s: {str(e)}", exc_info=True) return jsonify({ "error": "Interner Server-Fehler", "details": str(e), "processing_time_ms": round(processing_time * 1000, 2), "request_id": getattr(g, 'request_id', 'unknown') }), 500 @app.route('/retry//', methods=['GET']) @require_token def retry_single_email(email_id, domain): """ Retry-Endpunkt für einzelne E-Mail basierend auf ID Sucht die E-Mail in S3 anhand der ID und verarbeitet sie erneut """ from flask import g start_time = time.time() logger.info(f"Retry-Request für E-Mail-ID: {email_id} in Domain: {domain}") try: # Domain-Konfiguration laden all_domains_config = load_domains_config() domain_name = domain.lower() if domain_name not in all_domains_config: logger.error(f"Domain {domain_name} nicht konfiguriert") return jsonify({ "error": f"Domain {domain_name} nicht konfiguriert", "status": "error", "email_id": email_id }), 400 domain_config = all_domains_config[domain_name] bucket = domain_config["bucket"] prefix = domain_config["prefix"] region = domain_config["region"] # S3-Client initialisieren s3_client = boto3.client('s3', region_name=region) # E-Mail anhand der ID suchen logger.info(f"Suche E-Mail mit ID {email_id} in Bucket {bucket}") try: # Paginator für alle Objekte im Bucket paginator = s3_client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=bucket, Prefix=prefix) found_key = None for page in pages: if 'Contents' not in page: continue for obj in page['Contents']: key = obj['Key'] # Verzeichnisse überspringen if key.endswith('/'): continue # E-Mail-ID aus dem Key extrahieren (oft Teil des Dateinamens) # Verschiedene Möglichkeiten prüfen: # 1. ID ist Teil des Dateinamens if email_id in key: found_key = key break # 2. ID könnte in Message-ID der E-Mail sein - Header laden und prüfen try: # Nur Header laden für Performance response = s3_client.get_object( Bucket=bucket, Key=key, Range='bytes=0-2048' # Nur erste 2KB für Header ) header_content = response['Body'].read() # Nach Message-ID suchen if email_id.encode() in header_content: found_key = key break except Exception as header_e: # Header-Check fehlgeschlagen, weiter mit nächster E-Mail logger.debug(f"Header-Check für {key} fehlgeschlagen: {str(header_e)}") continue if found_key: break if not found_key: logger.warning(f"E-Mail mit ID {email_id} nicht gefunden") return jsonify({ "error": f"E-Mail mit ID {email_id} nicht gefunden", "status": "not_found", "email_id": email_id, "searched_bucket": bucket, "searched_prefix": prefix }), 404 logger.info(f"E-Mail gefunden: {found_key}") except Exception as search_e: logger.error(f"Fehler beim Suchen der E-Mail: {str(search_e)}") return jsonify({ "error": f"Fehler beim Suchen: {str(search_e)}", "status": "error", "email_id": email_id }), 500 # E-Mail laden und verarbeiten try: response = s3_client.get_object(Bucket=bucket, Key=found_key) email_content = response['Body'].read() logger.info(f"E-Mail geladen: {len(email_content)} Bytes") # E-Mail verarbeiten (gleiche Logik wie bei process_email) result = process_single_email( email_content=email_content, bucket=bucket, key=found_key, domain_name=domain_name ) # Performance-Metriken hinzufügen processing_time = time.time() - start_time result.update({ "processing_time_ms": round(processing_time * 1000, 2), "request_id": getattr(g, 'request_id', 'retry-request'), "email_size": len(email_content), "email_id": email_id, "s3_key": found_key, "retry": True }) # Bei erfolgreichem Processing E-Mail aus S3 löschen if result.get('action') == 'stored': try: s3_client.delete_object(Bucket=bucket, Key=found_key) logger.info(f"E-Mail nach erfolgreichem Retry aus S3 gelöscht: {found_key}") result["s3_deleted"] = True except Exception as delete_e: logger.warning(f"Konnte E-Mail nach Retry nicht löschen: {str(delete_e)}") result["s3_deleted"] = False # Logging basierend auf Ergebnis if result.get("status") == "success": logger.info(f"Retry erfolgreich für E-Mail-ID {email_id} in {processing_time:.2f}s") else: logger.warning(f"Retry für E-Mail-ID {email_id} nicht erfolgreich: {result.get('message')}") # HTTP-Status basierend auf Ergebnis if result.get("status") == "error": return jsonify(result), 500 else: return jsonify(result), 200 except Exception as process_e: logger.error(f"Fehler beim Laden/Verarbeiten der E-Mail {found_key}: {str(process_e)}") return jsonify({ "error": f"Verarbeitungsfehler: {str(process_e)}", "status": "error", "email_id": email_id, "s3_key": found_key }), 500 except Exception as e: processing_time = time.time() - start_time logger.error(f"Unerwarteter Fehler bei Retry nach {processing_time:.2f}s: {str(e)}", exc_info=True) return jsonify({ "error": "Interner Server-Fehler beim Retry", "details": str(e), "processing_time_ms": round(processing_time * 1000, 2), "request_id": getattr(g, 'request_id', 'unknown'), "email_id": email_id }), 500 @app.route('/stats', methods=['GET']) @require_token def get_stats(): """API-Statistiken und Metriken""" try: stats = { "api_version": "2.0", "uptime_seconds": int(time.time() - app.start_time) if hasattr(app, 'start_time') else 0, "request_cache": { "size": len(processed_requests), "max_size": REQUEST_CACHE_SIZE, "ttl_seconds": REQUEST_CACHE_TTL }, "configuration": { "mail_dir": MAIL_DIR, "domains": len(load_domains_config()) } } # Maildir-Statistiken try: mail_path = Path(MAIL_DIR) if mail_path.exists(): domain_stats = {} for domain_dir in mail_path.iterdir(): if domain_dir.is_dir(): user_count = len([d for d in domain_dir.iterdir() if d.is_dir()]) domain_stats[domain_dir.name] = {"users": user_count} stats["maildir_stats"] = domain_stats except Exception as e: stats["maildir_error"] = str(e) return jsonify(stats) except Exception as e: return jsonify({"error": str(e)}), 500 # Cache-Cleanup-Task (läuft alle 10 Minuten) import threading def cleanup_cache(): """Bereinigt den Request-Cache periodisch""" while True: try: current_time = time.time() cutoff_time = current_time - REQUEST_CACHE_TTL # Alte Einträge entfernen keys_to_remove = [ key for key, timestamp in processed_requests.items() if timestamp < cutoff_time ] for key in keys_to_remove: processed_requests.pop(key, None) if keys_to_remove: logger.info(f"Cache bereinigt: {len(keys_to_remove)} alte Einträge entfernt") except Exception as e: logger.error(f"Fehler bei Cache-Bereinigung: {str(e)}") # 10 Minuten warten time.sleep(600) # Hintergrund-Thread für Cache-Bereinigung starten cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True) cleanup_thread.start() if __name__ == "__main__": # Start-Zeit für Uptime-Tracking app.start_time = time.time() # Überprüfungen beim Start if not API_TOKEN: logger.warning("WARNUNG: Kein API_TOKEN definiert!") domains = load_domains_config() logger.info(f"API startet mit {len(domains)} konfigurierten Domains") # Server starten app.run(host='0.0.0.0', port=5000, debug=False)