#!/usr/bin/env python3 """ S3 E-Mail Downloader REST API Diese API ermöglicht den Abruf von E-Mails aus S3-Buckets für eine bestimmte Domain. Die E-Mails werden im Maildir-Format für Dovecot gespeichert. Endpunkte: GET /health - API-Statusprüfung POST /sync/{domain} - Synchronisiert E-Mails für eine bestimmte Domain Authentifizierung: Der Endpunkt /sync/{domain} erfordert einen gültigen API-Token im Header. """ import boto3 import os import time import logging import json import re import hashlib import sys from pathlib import Path from email.parser import BytesParser from email import policy from dotenv import load_dotenv from flask import Flask, request, jsonify, abort from functools import wraps # .env-Datei laden load_dotenv() # Flask-App initialisieren app = Flask(__name__) # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('s3_email_downloader_api.log'), logging.StreamHandler() ] ) logger = logging.getLogger("s3-email-downloader-api") # Hauptkonfiguration MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region API_TOKEN = os.environ.get('API_TOKEN') # API-Token für die Authentifizierung # Status-Datei für die Synchronisation STATUS_FILE = Path('sync_status.json') # Token-Authentifizierung def require_token(f): @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): logger.warning("Fehlender oder falsch formatierter Authorization-Header") abort(401, description="Fehlender Authorization-Header") token = auth_header[7:] # "Bearer " entfernen if not API_TOKEN or token != API_TOKEN: logger.warning("Ungültiger API-Token") abort(403, description="Ungültiger API-Token") return f(*args, **kwargs) return decorated_function def load_domains_config(): """Lädt die Domain-Konfiguration aus Umgebungsvariablen""" domains = {} # Konfiguration aus Umgebungsvariablen laden domain_index = 1 while True: domain_key = f"DOMAIN_{domain_index}" domain_name = os.environ.get(domain_key) if not domain_name: break # Keine weitere Domain-Definition gefunden bucket = os.environ.get(f"{domain_key}_BUCKET", "") prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/") usernames = os.environ.get(f"{domain_key}_USERNAMES", "") region = os.environ.get(f"{domain_key}_REGION", AWS_REGION) if bucket and usernames: domains[domain_name.lower()] = { "bucket": bucket, "prefix": prefix, "usernames": usernames.split(','), "region": region } logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert") else: logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen") domain_index += 1 return domains def load_sync_status(): """Lädt den letzten Synchronisationsstatus""" if STATUS_FILE.exists(): try: with open(STATUS_FILE, 'r') as f: status = json.load(f) return status.get('last_sync', {}) except Exception as e: logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}") return {} def save_sync_status(last_sync): """Speichert den aktuellen Synchronisationsstatus""" try: with open(STATUS_FILE, 'w') as f: json.dump({ 'last_sync': last_sync, 'last_sync_time': time.time() }, f) except Exception as e: logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}") def extract_email_address(address): """Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat""" if not address: return None # Einfacher Fall: nur E-Mail-Adresse if '@' in address and '<' not in address: return address.strip() # Komplexer Fall: "Name " match = re.search(r'<([^>]+)>', address) if match: return match.group(1) # Fallback return address.strip() def is_valid_recipient(to_address, domains_config): """ Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username) Gibt ein Tuple zurück: (ist_gültig, domain_name) """ email = extract_email_address(to_address) # E-Mail-Adresse aufteilen if not email or '@' not in email: return False, None username, domain = email.split('@', 1) # Prüfen, ob die Domain in der Konfiguration existiert if domain.lower() in domains_config: domain_config = domains_config[domain.lower()] # Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist if username.lower() in [u.lower() for u in domain_config["usernames"]]: return True, domain.lower() return False, None def get_maildir_path(to_address, mail_dir): """ Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse Format: {mail_dir}/domain.com/user/ """ email = extract_email_address(to_address) # E-Mail-Adresse aufteilen if '@' in email: user, domain = email.split('@', 1) else: return None # Ungültige E-Mail-Adresse # Pfad erstellen mail_dir_path = Path(mail_dir) domain_dir = mail_dir_path / domain user_dir = domain_dir / user # Maildir-Struktur sicherstellen for directory in [mail_dir_path, domain_dir, user_dir]: directory.mkdir(parents=True, exist_ok=True) os.chmod(directory, 0o775) # rwxrwxr-x # Maildir-Unterverzeichnisse for subdir in ['cur', 'new', 'tmp']: #(user_dir / subdir).mkdir(exist_ok=True) subdir_path = user_dir / subdir subdir_path.mkdir(exist_ok=True) return user_dir def store_email(email_content, to_address, message_id, s3_key, mail_dir): """Speichert eine E-Mail im Maildir-Format""" try: # Maildir-Pfad ermitteln maildir = get_maildir_path(to_address, mail_dir) if not maildir: logger.error(f"Konnte Maildir für {to_address} nicht ermitteln") return False # Eindeutigen Dateinamen generieren timestamp = int(time.time()) hostname = 'mail' unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest() # Maildir-Dateiname im Format: timestamp.unique_id.hostname:2, filename = f"{timestamp}.{unique_id}.{hostname}:2," # E-Mail in "new" speichern email_path = maildir / 'new' / filename with open(email_path, 'wb') as f: f.write(email_content) logger.info(f"E-Mail gespeichert: {email_path}") return True except Exception as e: logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}") return False def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info, auto_delete=True): """Löscht E-Mails aus dem S3-Bucket""" if not emails_to_delete: return 0 # Für API immer automatisch löschen, wenn der Parameter gesetzt ist if not auto_delete: logger.info(f"Automatisches Löschen deaktiviert, {len(emails_to_delete)} E-Mails werden nicht gelöscht.") return 0 # Löschung durchführen deleted_count = 0 for key in emails_to_delete: try: s3_client.delete_object(Bucket=bucket, Key=key) logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}") deleted_count += 1 except Exception as e: logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}") return deleted_count def process_domain(domain_name, auto_delete=True): """Verarbeitet eine einzelne Domain""" # Domain-Konfigurationen laden all_domains_config = load_domains_config() # Prüfen, ob die Domain konfiguriert ist domain_name = domain_name.lower() if domain_name not in all_domains_config: logger.error(f"Domain {domain_name} nicht konfiguriert") return { "error": f"Domain {domain_name} nicht konfiguriert", "status": "error" } domain_config = all_domains_config[domain_name] bucket = domain_config["bucket"] prefix = domain_config["prefix"] region = domain_config["region"] logger.info(f"Verarbeite Domain: {domain_name}") logger.info(f" Bucket: {bucket}") logger.info(f" Präfix: {prefix}") logger.info(f" Region: {region}") logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}") logger.info(f" Automatisches Löschen: {auto_delete}") try: # S3-Client initialisieren s3 = boto3.client('s3', region_name=region) # Letzten Synchronisationsstatus laden last_sync = load_sync_status() # Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden paginator = s3.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=bucket, Prefix=prefix) new_emails = 0 total_emails = 0 emails_to_delete = [] email_info = {} domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")} # Alle Seiten durchlaufen for page in pages: if 'Contents' not in page: continue objects = page['Contents'] total_emails += len(objects) for obj in objects: key = obj['Key'] # Verzeichnisse überspringen if key.endswith('/'): continue # Eindeutiger Key für die Synchronisation sync_key = f"{bucket}:{key}" # Prüfen, ob die E-Mail bereits synchronisiert wurde if sync_key in domain_last_sync: logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen") continue try: # E-Mail aus S3 laden response = s3.get_object(Bucket=bucket, Key=key) email_content = response['Body'].read() # Header parsen try: headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) to_address = headers.get('To', '') from_address = headers.get('From', '') date = headers.get('Date', '') message_id = headers.get('Message-ID', '') # E-Mail-Informationen speichern email_info[key] = { 'to': to_address, 'from': from_address, 'date': date } # Alle Domains-Konfigurationen für die Validierung verwenden is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config) if is_valid: logger.info(f"Gültige E-Mail für: {to_address}") # Nur speichern, wenn die E-Mail zur aktuellen Domain gehört if recipient_domain == domain_name: # E-Mail speichern if store_email(email_content, to_address, message_id, key, MAIL_DIR): # Status aktualisieren last_sync[sync_key] = { 'timestamp': time.time(), 'to': to_address, 'message_id': message_id, 'domain': domain_name, 'bucket': bucket } new_emails += 1 logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})") # Status nach jeweils 10 E-Mails speichern if new_emails % 10 == 0: save_sync_status(last_sync) logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher") else: # Gültige E-Mail, aber für eine andere Domain - nicht löschen! logger.info(f"E-Mail {key} ist für Domain {recipient_domain}, wird übersprungen (nicht gelöscht)") else: logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}") emails_to_delete.append(key) except Exception as e: logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}") emails_to_delete.append(key) except Exception as e: logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}") # Status speichern save_sync_status(last_sync) # Ungültige E-Mails löschen deleted_count = 0 if emails_to_delete: deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info, auto_delete) logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}") logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.") return { "status": "success", "domain": domain_name, "total_emails": total_emails, "new_emails": new_emails, "emails_to_delete": len(emails_to_delete), "deleted_emails": deleted_count } except Exception as e: logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}") import traceback logger.error(traceback.format_exc()) return { "status": "error", "domain": domain_name, "error": str(e), "traceback": traceback.format_exc() } # API-Endpunkte @app.route('/health', methods=['GET']) def health_check(): """Gesundheitsprüfung für die API""" return jsonify({ "status": "OK", "message": "S3 E-Mail Downloader API ist aktiv" }) @app.route('/sync/', methods=['POST']) @require_token def sync_domain(domain): """Synchronisiert E-Mails für eine bestimmte Domain""" # Automatisches Löschen aus den Anfrageparametern auto_delete = request.args.get('auto_delete', 'true').lower() in ['true', '1', 't', 'y', 'yes'] # Domain-Verarbeitung starten result = process_domain(domain, auto_delete) # Erfolg oder Fehler zurückgeben if result.get("status") == "error": return jsonify(result), 500 else: return jsonify(result) if __name__ == "__main__": # Überprüfen, ob ein API-Token gesetzt ist if not API_TOKEN: logger.warning("WARNUNG: Kein API_TOKEN in Umgebungsvariablen definiert. API ist ungeschützt!") # Flask-Server starten app.run(host='0.0.0.0', port=5000, debug=False)