#!/usr/bin/env python3 """ S3 E-Mail Downloader Script Dieses Script lädt E-Mails aus einem S3-Bucket herunter und speichert sie im Maildir-Format für Dovecot. Es berücksichtigt die Docker-Container-Umgebung. Verwendung: python3 s3_email_downloader.py Umgebungsvariablen: AWS_ACCESS_KEY_ID: AWS Zugriffsschlüssel AWS_SECRET_ACCESS_KEY: AWS geheimer Zugriffsschlüssel AWS_REGION: AWS-Region (Standard: us-east-2) S3_BUCKET: S3-Bucket-Name EMAIL_PREFIX: Präfix für E-Mails im S3-Bucket (Standard: emails/) MAIL_DIR: Lokales Verzeichnis für E-Mails (Standard: ./data/mail) """ import boto3 import os import time import logging import json import re import hashlib from pathlib import Path from email.parser import BytesParser from email import policy from dotenv import load_dotenv load_dotenv() # .env-Datei laden # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('s3_email_downloader.log'), logging.StreamHandler() ] ) logger = logging.getLogger("s3-email-downloader") # Konfiguration aus Umgebungsvariablen AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') S3_BUCKET = os.environ.get('S3_BUCKET', '') EMAIL_PREFIX = os.environ.get('EMAIL_PREFIX', 'emails/') MAIL_DIR = os.environ.get('MAIL_DIR', './data/mail') # Pfad zum Docker-Volume # Status-Datei für die Synchronisation STATUS_FILE = Path('sync_status.json') def load_sync_status(): """Lädt den letzten Synchronisationsstatus""" if STATUS_FILE.exists(): try: with open(STATUS_FILE, 'r') as f: status = json.load(f) return status.get('last_sync', {}) except Exception as e: logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}") return {} def save_sync_status(last_sync): """Speichert den aktuellen Synchronisationsstatus""" try: with open(STATUS_FILE, 'w') as f: json.dump({ 'last_sync': last_sync, 'last_sync_time': time.time() }, f) except Exception as e: logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}") def extract_email_address(address): """Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat""" if not address: return 'unknown@bizmatch.net' # Einfacher Fall: nur E-Mail-Adresse if '@' in address and '<' not in address: return address.strip() # Komplexer Fall: "Name " match = re.search(r'<([^>]+)>', address) if match: return match.group(1) # Fallback return address.strip() def get_maildir_path(to_address, mail_dir): """ Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse Format: {mail_dir}/domain.com/user/ """ email = extract_email_address(to_address) # E-Mail-Adresse aufteilen if '@' in email: user, domain = email.split('@', 1) else: user, domain = email, 'bizmatch.net' # Pfad erstellen mail_dir_path = Path(mail_dir) domain_dir = mail_dir_path / domain user_dir = domain_dir / user # Maildir-Struktur sicherstellen for directory in [mail_dir_path, domain_dir, user_dir]: directory.mkdir(parents=True, exist_ok=True) # Maildir-Unterverzeichnisse for subdir in ['cur', 'new', 'tmp']: (user_dir / subdir).mkdir(exist_ok=True) return user_dir def store_email(email_content, to_address, message_id, s3_key, mail_dir): """Speichert eine E-Mail im Maildir-Format""" try: # Maildir-Pfad ermitteln maildir = get_maildir_path(to_address, mail_dir) # Eindeutigen Dateinamen generieren timestamp = int(time.time()) hostname = 'bizmatch' unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest() # Maildir-Dateiname im Format: timestamp.unique_id.hostname:2, filename = f"{timestamp}.{unique_id}.{hostname}:2," # E-Mail in "new" speichern email_path = maildir / 'new' / filename with open(email_path, 'wb') as f: f.write(email_content) logger.info(f"E-Mail gespeichert: {email_path}") return True except Exception as e: logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}") return False def main(): """Hauptfunktion""" # Prüfen, ob die erforderlichen Umgebungsvariablen gesetzt sind if not S3_BUCKET: logger.error("S3_BUCKET Umgebungsvariable nicht gesetzt") return logger.info(f"S3 E-Mail Downloader gestartet. Bucket: {S3_BUCKET}, Präfix: {EMAIL_PREFIX}") logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen") try: # S3-Client initialisieren s3 = boto3.client('s3', region_name=AWS_REGION) # Letzten Synchronisationsstatus laden last_sync = load_sync_status() # Alle E-Mails im Bucket auflisten paginator = s3.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=EMAIL_PREFIX) new_emails = 0 for page in pages: if 'Contents' not in page: logger.error(f"Keine 'Contents' in der Seite gefunden") else: logger.info(f"Gefunden: {len(page['Contents'])} Objekte in der Seite") # Ersten 5 Objekte anzeigen for i, obj in enumerate(page['Contents'][:5]): logger.info(f"Objekt {i}: {obj['Key']}") for obj in page['Contents']: key = obj['Key'] if not key.endswith('/'): continue logger.info(f"Verarbeite Objekt mit Key: {key}") # Prüfen, ob die E-Mail bereits synchronisiert wurde if key in last_sync: logger.info(f"E-Mail {key} bereits synchronisiert - ÜBERSPRUNGEN") continue else: logger.info(f"E-Mail {key} wird heruntergeladen...") try: # E-Mail aus S3 laden logger.info(f"Versuche, E-Mail aus S3 zu laden: {key}") response = s3.get_object(Bucket=S3_BUCKET, Key=key) email_content = response['Body'].read() logger.info(f"E-Mail erfolgreich geladen, Größe: {len(email_content)} Bytes") # Debug: Erste 100 Bytes anzeigen logger.info(f"E-Mail-Anfang: {email_content[:100]}") # Header parsen headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) # Empfänger-Adresse extrahieren to_address = headers.get('To', '') message_id = headers.get('Message-ID', '') logger.info(f"Empfänger: '{to_address}', Message-ID: '{message_id}'") # Überprüfe MAIL_DIR logger.info(f"MAIL_DIR ist: {MAIL_DIR}") mail_dir_path = Path(MAIL_DIR) logger.info(f"MAIL_DIR existiert: {mail_dir_path.exists()}, ist Verzeichnis: {mail_dir_path.is_dir() if mail_dir_path.exists() else 'N/A'}") # E-Mail speichern result = store_email(email_content, to_address, message_id, key, MAIL_DIR) logger.info(f"Ergebnis des Speicherns: {result}") if result: # Status aktualisieren last_sync[key] = { 'timestamp': time.time(), 'to': to_address, 'message_id': message_id } new_emails += 1 logger.info(f"E-Mail {key} erfolgreich synchronisiert") else: logger.error(f"E-Mail {key} konnte nicht gespeichert werden") except Exception as e: logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}") import traceback logger.error(traceback.format_exc()) try: # E-Mail aus S3 laden response = s3.get_object(Bucket=S3_BUCKET, Key=key) email_content = response['Body'].read() # Header parsen headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) # Empfänger-Adresse extrahieren to_address = headers.get('To', '') message_id = headers.get('Message-ID', '') # E-Mail speichern if store_email(email_content, to_address, message_id, key, MAIL_DIR): # Status aktualisieren last_sync[key] = { 'timestamp': time.time(), 'to': to_address, 'message_id': message_id } new_emails += 1 except Exception as e: logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}") # Status speichern logger.info(f"Aktueller Status enthält {len(last_sync)} E-Mails") save_sync_status(last_sync) logger.info(f"{new_emails} neue E-Mails heruntergeladen") except Exception as e: logger.error(f"Fehler: {str(e)}") if __name__ == "__main__": main()