#!/usr/bin/env python3
"""
Optimierte S3 E-Mail Processor REST API
Mit Kompression, verbesserter Fehlerbehandlung und Request-Tracking
"""
import os
import time
import logging
import re
import hashlib
import base64
import gzip
import threading
import boto3
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv
from flask import Flask, request, jsonify, abort, g, has_request_context
from functools import wraps
# Load the .env file
load_dotenv()
# Initialize the Flask app
app = Flask(__name__)
# Configure logging with request-ID support
class RequestIDFilter(logging.Filter):
    def filter(self, record):
        # Outside a request context, Flask's `g` proxy raises RuntimeError,
        # so fall back to a static marker there.
        if has_request_context():
            record.request_id = getattr(g, 'request_id', 'no-request')
        else:
            record.request_id = 'no-request'
        return True
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(request_id)s] - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('s3_email_processor_api.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("s3-email-processor-api")
logger.addFilter(RequestIDFilter())
# Configuration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail')
API_TOKEN = os.environ.get('API_TOKEN')
# Request tracking for duplicate detection
processed_requests = {}
REQUEST_CACHE_SIZE = 1000
REQUEST_CACHE_TTL = 3600  # 1 hour

def require_token(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            logger.warning("Missing Authorization header")
            abort(401, description="Missing Authorization header")
        token = auth_header[7:]
        if not API_TOKEN or token != API_TOKEN:
            logger.warning("Invalid API token")
            abort(403, description="Invalid API token")
        return f(*args, **kwargs)
    return decorated_function
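
# Authentication sketch (illustrative, not part of the API itself): protected
# endpoints expect a static bearer token matching the API_TOKEN environment
# variable, e.g.
#
#   curl -H "Authorization: Bearer $API_TOKEN" http://localhost:5000/stats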

def setup_request_context():
    """Sets up the request context for logging"""
    g.request_id = request.headers.get('X-Request-ID', f'req-{int(time.time())}-{id(request)}')

@app.before_request
def before_request():
    setup_request_context()

def is_duplicate_request(request_id, etag):
    """Checks whether this request has already been processed"""
    if not request_id or not etag:
        return False
    key = f"{request_id}:{etag}"
    current_time = time.time()
    # Cache cleanup: evict expired entries first instead of dropping everything
    if len(processed_requests) > REQUEST_CACHE_SIZE:
        cutoff_time = current_time - REQUEST_CACHE_TTL
        for old_key in [k for k, ts in processed_requests.items() if ts < cutoff_time]:
            processed_requests.pop(old_key, None)
        if len(processed_requests) > REQUEST_CACHE_SIZE:
            processed_requests.clear()  # hard reset if still over the cap
    # Duplicate check
    if key in processed_requests:
        last_processed = processed_requests[key]
        if current_time - last_processed < REQUEST_CACHE_TTL:
            return True
    # Mark the request as processed
    processed_requests[key] = current_time
    return False

def load_domains_config():
    """Loads the domain configuration from environment variables"""
    domains = {}
    domain_index = 1
    while True:
        domain_key = f"DOMAIN_{domain_index}"
        domain_name = os.environ.get(domain_key)
        if not domain_name:
            break
        bucket = os.environ.get(f"{domain_key}_BUCKET", "")
        prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
        usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
        region = os.environ.get(f"{domain_key}_REGION", "us-east-2")
        if bucket and usernames:
            domains[domain_name.lower()] = {
                "bucket": bucket,
                "prefix": prefix,
                "usernames": usernames.split(','),
                "region": region
            }
            logger.info(f"Domain {domain_name} configured")
        else:
            logger.warning(f"Incomplete configuration for {domain_name}")
        domain_index += 1
    return domains
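
# Example .env layout read by load_domains_config() (illustrative values):
#
#   DOMAIN_1=example.com
#   DOMAIN_1_BUCKET=example-mail-bucket
#   DOMAIN_1_PREFIX=emails/
#   DOMAIN_1_USERNAMES=info,support
#   DOMAIN_1_REGION=us-east-2
#   DOMAIN_2=example.org
#   ...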

def extract_email_address(address):
    """Extracts the bare e-mail address from a complex address format"""
    if not address:
        return None
    if '@' in address and '<' not in address:
        return address.strip()
    match = re.search(r'<([^>]+)>', address)
    if match:
        return match.group(1)
    return address.strip()

def is_valid_recipient(to_address, domains_config):
    """Checks whether the recipient address is valid"""
    email = extract_email_address(to_address)
    if not email or '@' not in email:
        return False, None
    username, domain = email.split('@', 1)
    if domain.lower() in domains_config:
        domain_config = domains_config[domain.lower()]
        if username.lower() in [u.lower() for u in domain_config["usernames"]]:
            return True, domain.lower()
    return False, None

def get_maildir_path(to_address, mail_dir):
    """Determines the delivery path in Maildir format"""
    email = extract_email_address(to_address)
    # Guard against None as well, which would otherwise raise a TypeError
    if not email or '@' not in email:
        return None
    user, domain = email.split('@', 1)
    mail_dir_path = Path(mail_dir)
    domain_dir = mail_dir_path / domain
    user_dir = domain_dir / user
    # Create the Maildir structure
    for directory in [mail_dir_path, domain_dir, user_dir]:
        directory.mkdir(parents=True, exist_ok=True)
        os.chmod(directory, 0o775)
    # Maildir subdirectories
    for subdir in ['cur', 'new', 'tmp']:
        subdir_path = user_dir / subdir
        subdir_path.mkdir(exist_ok=True)
    return user_dir
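
# Resulting on-disk layout (illustrative names), relative to MAIL_DIR:
#
#   mail/
#   └── example.com/
#       └── info/
#           ├── cur/
#           ├── new/   <- store_email() delivers here
#           └── tmp/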

def store_email(email_content, to_address, message_id, s3_key, mail_dir):
    """Stores an e-mail in Maildir format"""
    try:
        maildir = get_maildir_path(to_address, mail_dir)
        if not maildir:
            logger.error(f"Could not determine Maildir for {to_address}")
            return False
        # Generate a unique filename
        timestamp = int(time.time())
        hostname = 'mail'
        unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
        filename = f"{timestamp}.{unique_id}.{hostname}:2,"
        email_path = maildir / 'new' / filename
        with open(email_path, 'wb') as f:
            f.write(email_content)
        os.chmod(email_path, 0o664)
        logger.info(f"E-mail stored: {email_path} ({len(email_content)} bytes)")
        return True
    except Exception as e:
        logger.error(f"Error storing e-mail {s3_key}: {str(e)}")
        return False

def process_single_email(email_content, bucket, key, domain_name):
    """Processes a single e-mail with improved error handling"""
    try:
        all_domains_config = load_domains_config()
        domain_name = domain_name.lower()
        if domain_name not in all_domains_config:
            logger.error(f"Domain {domain_name} not configured")
            return {
                "action": "error",
                "error": f"Domain {domain_name} not configured",
                "status": "error"
            }
        # Parse the headers with better error handling
        try:
            headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
            to_address = headers.get('To', '')
            from_address = headers.get('From', '')
            date = headers.get('Date', '')
            message_id = headers.get('Message-ID', '')
            subject = headers.get('Subject', '')
            # Header validation
            if not to_address:
                logger.warning(f"E-mail without a To header: {key}")
                return {
                    "action": "invalid",
                    "error": "Missing To header",
                    "status": "rejected"
                }
            logger.info(f"Processing: '{subject}' from {from_address} to {to_address}")
        except Exception as e:
            logger.error(f"Error parsing e-mail headers {key}: {str(e)}")
            return {
                "action": "invalid",
                "error": f"Header parsing error: {str(e)}",
                "status": "error"
            }
        # Validate the recipient
        is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)
        if not is_valid:
            logger.info(f"Invalid recipient address: {to_address}")
            return {
                "action": "invalid",
                "message": f"Invalid recipient address: {to_address}",
                "status": "rejected",
                "recipient": to_address,
                "reason": "invalid_recipient"
            }
        # Check domain membership
        if recipient_domain != domain_name:
            logger.info(f"E-mail belongs to domain {recipient_domain}, not {domain_name}")
            return {
                "action": "wrong_domain",
                "message": f"E-mail belongs to domain {recipient_domain}",
                "status": "skipped",
                "expected_domain": domain_name,
                "actual_domain": recipient_domain
            }
        # Store the e-mail
        if store_email(email_content, to_address, message_id, key, MAIL_DIR):
            logger.info(f"E-mail stored successfully for {to_address}")
            return {
                "action": "stored",
                "message": "E-mail stored successfully",
                "status": "success",
                "recipient": to_address,
                "sender": from_address,
                "subject": subject,
                "size": len(email_content)
            }
        else:
            return {
                "action": "error",
                "error": "Error storing the e-mail",
                "status": "error"
            }
    except Exception as e:
        logger.error(f"Unexpected error for e-mail {key}: {str(e)}", exc_info=True)
        return {
            "action": "error",
            "error": str(e),
            "status": "error"
        }

# API endpoints
@app.route('/health', methods=['GET'])
def health_check():
    """Extended health check"""
    # Basic health check
    health_status = {
        "status": "OK",
        "message": "S3 e-mail processor API is up",
        "timestamp": int(time.time()),
        "version": "2.0",
        "request_id": getattr(g, 'request_id', 'health-check')
    }
    # Extended checks
    try:
        # Check Maildir access
        mail_path = Path(MAIL_DIR)
        if mail_path.exists() and mail_path.is_dir():
            health_status["mail_dir"] = "accessible"
        else:
            health_status["mail_dir"] = "not_accessible"
            health_status["status"] = "WARNING"
        # Check the domain configuration
        domains = load_domains_config()
        health_status["configured_domains"] = len(domains)
        health_status["domains"] = list(domains.keys())
        # Cache status
        health_status["request_cache_size"] = len(processed_requests)
    except Exception as e:
        health_status["status"] = "ERROR"
        health_status["error"] = str(e)
    return jsonify(health_status)
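
# The health endpoint needs no token, so it can serve as a container
# liveness probe (illustrative):
#
#   curl http://localhost:5000/health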

@app.route('/process/<domain>', methods=['POST'])
@require_token
def process_email(domain):
    """Processes a single e-mail with improved error handling"""
    start_time = time.time()
    try:
        # Validate the JSON payload
        if not request.is_json:
            return jsonify({"error": "Content-Type must be application/json"}), 400
        data = request.get_json()
        # Check the required fields
        required_fields = ['bucket', 'key', 'email_content', 'domain']
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            return jsonify({
                "error": f"Missing fields: {', '.join(missing_fields)}"
            }), 400
        # Duplicate check
        request_id = data.get('request_id', g.request_id)
        etag = data.get('etag')
        if is_duplicate_request(request_id, etag):
            logger.info(f"Duplicate request detected: {request_id}:{etag}")
            return jsonify({
                "action": "duplicate",
                "message": "Request already processed",
                "status": "skipped",
                "request_id": request_id
            })
        # Decode the e-mail content
        try:
            email_base64 = data['email_content']
            # Compression support
            if data.get('compressed', False):
                compressed_data = base64.b64decode(email_base64)
                email_content = gzip.decompress(compressed_data)
                logger.info(f"E-mail decompressed: {data.get('compressed_size', 0)} -> {len(email_content)} bytes")
            else:
                email_content = base64.b64decode(email_base64)
        except Exception as e:
            logger.error(f"Error decoding the e-mail content: {str(e)}")
            return jsonify({
                "error": f"Content decoding failed: {str(e)}"
            }), 400
        # Size validation
        email_size = len(email_content)
        max_size = 25 * 1024 * 1024  # 25 MB
        if email_size > max_size:
            logger.warning(f"E-mail too large: {email_size} bytes")
            return jsonify({
                "action": "too_large",
                "error": f"E-mail too large: {email_size} bytes (max: {max_size})",
                "status": "rejected"
            }), 413
        # Process the e-mail
        logger.info(f"Processing e-mail: {data['key']} ({email_size} bytes)")
        result = process_single_email(
            email_content=email_content,
            bucket=data['bucket'],
            key=data['key'],
            domain_name=domain
        )
        # Add performance metrics
        processing_time = time.time() - start_time
        result.update({
            "processing_time_ms": round(processing_time * 1000, 2),
            "request_id": request_id,
            "email_size": email_size
        })
        # Log level based on the result
        if result.get("status") == "success":
            logger.info(f"E-mail processed successfully in {processing_time:.2f}s")
        elif result.get("status") in ["rejected", "skipped"]:
            logger.info(f"E-mail {result.get('action')}: {result.get('message')}")
        else:
            logger.error(f"E-mail processing failed: {result.get('error')}")
        # HTTP status based on the result
        if result.get("status") == "error":
            return jsonify(result), 500
        elif result.get("action") == "too_large":
            return jsonify(result), 413
        else:
            return jsonify(result), 200
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"Unexpected error after {processing_time:.2f}s: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Internal server error",
            "details": str(e),
            "processing_time_ms": round(processing_time * 1000, 2),
            "request_id": getattr(g, 'request_id', 'unknown')
        }), 500
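
# --- Illustrative client sketch (never called by the API itself) ---
# A minimal example of how a caller might submit a gzip-compressed e-mail to
# POST /process/<domain>. The bucket, key, domain, token, and etag values are
# placeholders, and the third-party `requests` package is assumed to be
# available on the client side.
def _example_process_request():
    import base64
    import gzip
    import requests  # assumed client-side dependency

    raw_email = b"From: a@example.com\r\nTo: info@example.com\r\n\r\nHello"
    compressed = gzip.compress(raw_email)
    payload = {
        "bucket": "example-mail-bucket",  # placeholder
        "key": "emails/abc123",           # placeholder
        "domain": "example.com",          # placeholder
        "email_content": base64.b64encode(compressed).decode("ascii"),
        "compressed": True,
        "compressed_size": len(compressed),
        # request_id/etag enable the server-side duplicate detection
        "request_id": "req-example-1",
        "etag": "d41d8cd98f00b204e9800998ecf8427e",  # placeholder
    }
    return requests.post(
        "http://localhost:5000/process/example.com",
        json=payload,
        headers={"Authorization": "Bearer <API_TOKEN>"},
        timeout=30,
    )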

@app.route('/retry/<email_id>/<domain>', methods=['GET'])
@require_token
def retry_single_email(email_id, domain):
    """
    Retry endpoint for a single e-mail based on its ID.
    Looks the e-mail up in S3 by ID and processes it again.
    """
    start_time = time.time()
    logger.info(f"Retry request for e-mail ID: {email_id} in domain: {domain}")
    try:
        # Load the domain configuration
        all_domains_config = load_domains_config()
        domain_name = domain.lower()
        if domain_name not in all_domains_config:
            logger.error(f"Domain {domain_name} not configured")
            return jsonify({
                "error": f"Domain {domain_name} not configured",
                "status": "error",
                "email_id": email_id
            }), 400
        domain_config = all_domains_config[domain_name]
        bucket = domain_config["bucket"]
        prefix = domain_config["prefix"]
        region = domain_config["region"]
        # Initialize the S3 client
        s3_client = boto3.client('s3', region_name=region)
        # Search for the e-mail by its ID
        logger.info(f"Searching for e-mail with ID {email_id} in bucket {bucket}")
        try:
            # Paginator over all objects in the bucket
            paginator = s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
            found_key = None
            for page in pages:
                if 'Contents' not in page:
                    continue
                for obj in page['Contents']:
                    key = obj['Key']
                    # Skip directories
                    if key.endswith('/'):
                        continue
                    # Match the e-mail ID against the key (often part of the filename).
                    # Check the different possibilities:
                    # 1. The ID is part of the filename
                    if email_id in key:
                        found_key = key
                        break
                    # 2. The ID could be in the e-mail's Message-ID - load and check the headers
                    try:
                        # Load only the headers, for performance
                        response = s3_client.get_object(
                            Bucket=bucket,
                            Key=key,
                            Range='bytes=0-2048'  # only the first 2 KB, for the headers
                        )
                        header_content = response['Body'].read()
                        # Search for the Message-ID
                        if email_id.encode() in header_content:
                            found_key = key
                            break
                    except Exception as header_e:
                        # Header check failed, continue with the next e-mail
                        logger.debug(f"Header check for {key} failed: {str(header_e)}")
                        continue
                if found_key:
                    break
            if not found_key:
                logger.warning(f"E-mail with ID {email_id} not found")
                return jsonify({
                    "error": f"E-mail with ID {email_id} not found",
                    "status": "not_found",
                    "email_id": email_id,
                    "searched_bucket": bucket,
                    "searched_prefix": prefix
                }), 404
            logger.info(f"E-mail found: {found_key}")
        except Exception as search_e:
            logger.error(f"Error searching for the e-mail: {str(search_e)}")
            return jsonify({
                "error": f"Search error: {str(search_e)}",
                "status": "error",
                "email_id": email_id
            }), 500
        # Load and process the e-mail
        try:
            response = s3_client.get_object(Bucket=bucket, Key=found_key)
            email_content = response['Body'].read()
            logger.info(f"E-mail loaded: {len(email_content)} bytes")
            # Process the e-mail (same logic as process_email)
            result = process_single_email(
                email_content=email_content,
                bucket=bucket,
                key=found_key,
                domain_name=domain_name
            )
            # Add performance metrics
            processing_time = time.time() - start_time
            result.update({
                "processing_time_ms": round(processing_time * 1000, 2),
                "request_id": getattr(g, 'request_id', 'retry-request'),
                "email_size": len(email_content),
                "email_id": email_id,
                "s3_key": found_key,
                "retry": True
            })
            # Delete the e-mail from S3 after successful processing
            if result.get('action') == 'stored':
                try:
                    s3_client.delete_object(Bucket=bucket, Key=found_key)
                    logger.info(f"E-mail deleted from S3 after successful retry: {found_key}")
                    result["s3_deleted"] = True
                except Exception as delete_e:
                    logger.warning(f"Could not delete e-mail after retry: {str(delete_e)}")
                    result["s3_deleted"] = False
            # Logging based on the result
            if result.get("status") == "success":
                logger.info(f"Retry successful for e-mail ID {email_id} in {processing_time:.2f}s")
            else:
                logger.warning(f"Retry for e-mail ID {email_id} not successful: {result.get('message')}")
            # HTTP status based on the result
            if result.get("status") == "error":
                return jsonify(result), 500
            else:
                return jsonify(result), 200
        except Exception as process_e:
            logger.error(f"Error loading/processing e-mail {found_key}: {str(process_e)}")
            return jsonify({
                "error": f"Processing error: {str(process_e)}",
                "status": "error",
                "email_id": email_id,
                "s3_key": found_key
            }), 500
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"Unexpected error during retry after {processing_time:.2f}s: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Internal server error during retry",
            "details": str(e),
            "processing_time_ms": round(processing_time * 1000, 2),
            "request_id": getattr(g, 'request_id', 'unknown'),
            "email_id": email_id
        }), 500
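
# Illustrative retry invocation (token and ID are placeholders); the ID is
# matched against S3 keys and against the first 2 KB of each object's headers:
#
#   curl -H "Authorization: Bearer $API_TOKEN" \
#        http://localhost:5000/retry/<message-id-or-key-fragment>/example.com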

@app.route('/stats', methods=['GET'])
@require_token
def get_stats():
    """API statistics and metrics"""
    try:
        stats = {
            "api_version": "2.0",
            "uptime_seconds": int(time.time() - app.start_time) if hasattr(app, 'start_time') else 0,
            "request_cache": {
                "size": len(processed_requests),
                "max_size": REQUEST_CACHE_SIZE,
                "ttl_seconds": REQUEST_CACHE_TTL
            },
            "configuration": {
                "mail_dir": MAIL_DIR,
                "domains": len(load_domains_config())
            }
        }
        # Maildir statistics
        try:
            mail_path = Path(MAIL_DIR)
            if mail_path.exists():
                domain_stats = {}
                for domain_dir in mail_path.iterdir():
                    if domain_dir.is_dir():
                        user_count = len([d for d in domain_dir.iterdir() if d.is_dir()])
                        domain_stats[domain_dir.name] = {"users": user_count}
                stats["maildir_stats"] = domain_stats
        except Exception as e:
            stats["maildir_error"] = str(e)
        return jsonify(stats)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Cache cleanup task (runs every 10 minutes)
def cleanup_cache():
    """Cleans up the request cache periodically"""
    while True:
        try:
            current_time = time.time()
            cutoff_time = current_time - REQUEST_CACHE_TTL
            # Remove old entries; snapshot the items so concurrent writes
            # from request handlers cannot break the iteration
            keys_to_remove = [
                key for key, timestamp in list(processed_requests.items())
                if timestamp < cutoff_time
            ]
            for key in keys_to_remove:
                processed_requests.pop(key, None)
            if keys_to_remove:
                logger.info(f"Cache cleaned: {len(keys_to_remove)} stale entries removed")
        except Exception as e:
            logger.error(f"Error during cache cleanup: {str(e)}")
        # Wait 10 minutes
        time.sleep(600)

# Start the background thread for cache cleanup
cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True)
cleanup_thread.start()

if __name__ == "__main__":
    # Start time for uptime tracking
    app.start_time = time.time()
    # Startup checks
    if not API_TOKEN:
        logger.warning("WARNING: No API_TOKEN defined!")
    domains = load_domains_config()
    logger.info(f"API starting with {len(domains)} configured domains")
    # Start the server
    app.run(host='0.0.0.0', port=5000, debug=False)