#!/usr/bin/env python3 """ Multi-Domain S3 E-Mail Downloader Script Dieses Script lädt E-Mails aus mehreren S3-Buckets herunter und speichert sie im Maildir-Format für Dovecot. Es unterstützt mehrere Domains, jede mit eigenen Bucket-Einstellungen und Benutzerlisten. Nutzung: python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen python3 s3_email_downloader.py y # Ohne Bestätigungsabfrage für Löschungen """ import boto3 import os import time import logging import json import re import hashlib import sys from pathlib import Path from email.parser import BytesParser from email import policy from dotenv import load_dotenv # .env-Datei laden load_dotenv() # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('s3_email_downloader.log'), logging.StreamHandler() ] ) logger = logging.getLogger("s3-email-downloader") # Hauptkonfiguration MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region, kann pro Domain überschrieben werden # Status-Datei für die Synchronisation STATUS_FILE = Path('sync_status.json') # Prüfen, ob automatische Löschung aktiviert ist AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y' # Multi-Domain-Konfiguration # Die Konfiguration kann über Umgebungsvariablen oder eine separate JSON-Datei erfolgen DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json') def load_domains_config(): """Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen""" domains = {} # Versuchen, Konfiguration aus JSON-Datei zu laden config_file = Path(DOMAINS_CONFIG_FILE) if config_file.exists(): try: with open(config_file, 'r') as f: domains = json.load(f) logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen") return domains except Exception as e: logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}") # Fallback: Konfiguration aus Umgebungsvariablen # Format für Umgebungsvariablen: # DOMAIN_1=bizmatch.net # DOMAIN_1_BUCKET=bizmatch-emails # DOMAIN_1_PREFIX=emails/ # DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3 # DOMAIN_1_REGION=us-east-2 # Optional, überschreibt AWS_REGION domain_index = 1 while True: domain_key = f"DOMAIN_{domain_index}" domain_name = os.environ.get(domain_key) if not domain_name: break # Keine weitere Domain-Definition gefunden bucket = os.environ.get(f"{domain_key}_BUCKET", "") prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/") usernames = os.environ.get(f"{domain_key}_USERNAMES", "") region = os.environ.get(f"{domain_key}_REGION", AWS_REGION) if bucket and usernames: domains[domain_name] = { "bucket": bucket, "prefix": prefix, "usernames": usernames.split(','), "region": region } logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert") else: logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen") domain_index += 1 # Fallback für Abwärtskompatibilität: Alte Konfiguration aus Umgebungsvariablen if not domains: old_domain = os.environ.get('VALID_DOMAIN', '') old_bucket = os.environ.get('S3_BUCKET', '') old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/') old_usernames = os.environ.get('VALID_USERNAMES', '') if old_domain and old_bucket and old_usernames: domains[old_domain] = { "bucket": old_bucket, "prefix": old_prefix, "usernames": old_usernames.split(','), "region": AWS_REGION } logger.info(f"Alte Konfiguration für Domain {old_domain} geladen") return domains def load_sync_status(): """Lädt den letzten Synchronisationsstatus""" if STATUS_FILE.exists(): try: with open(STATUS_FILE, 'r') as f: status = json.load(f) return status.get('last_sync', {}) except Exception as e: logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}") return {} def save_sync_status(last_sync): """Speichert den aktuellen Synchronisationsstatus""" try: with open(STATUS_FILE, 'w') as f: json.dump({ 'last_sync': last_sync, 'last_sync_time': time.time() }, f) except Exception as e: logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}") def extract_email_address(address): """Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat""" if not address: return None # Einfacher Fall: nur E-Mail-Adresse if '@' in address and '<' not in address: return address.strip() # Komplexer Fall: "Name " match = re.search(r'<([^>]+)>', address) if match: return match.group(1) # Fallback return address.strip() def is_valid_recipient(to_address, domains_config): """ Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username) Gibt ein Tuple zurück: (ist_gültig, domain_name) """ email = extract_email_address(to_address) # E-Mail-Adresse aufteilen if not email or '@' not in email: return False, None username, domain = email.split('@', 1) # Prüfen, ob die Domain in der Konfiguration existiert if domain.lower() in domains_config: domain_config = domains_config[domain.lower()] # Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist if username.lower() in [u.lower() for u in domain_config["usernames"]]: return True, domain.lower() return False, None def get_maildir_path(to_address, mail_dir): """ Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse Format: {mail_dir}/domain.com/user/ """ email = extract_email_address(to_address) # E-Mail-Adresse aufteilen if '@' in email: user, domain = email.split('@', 1) else: return None # Ungültige E-Mail-Adresse # Pfad erstellen mail_dir_path = Path(mail_dir) domain_dir = mail_dir_path / domain user_dir = domain_dir / user # Maildir-Struktur sicherstellen for directory in [mail_dir_path, domain_dir, user_dir]: directory.mkdir(parents=True, exist_ok=True) # Maildir-Unterverzeichnisse for subdir in ['cur', 'new', 'tmp']: (user_dir / subdir).mkdir(exist_ok=True) return user_dir def store_email(email_content, to_address, message_id, s3_key, mail_dir): """Speichert eine E-Mail im Maildir-Format""" try: # Maildir-Pfad ermitteln maildir = get_maildir_path(to_address, mail_dir) if not maildir: logger.error(f"Konnte Maildir für {to_address} nicht ermitteln") return False # Eindeutigen Dateinamen generieren timestamp = int(time.time()) hostname = 'mail' unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest() # Maildir-Dateiname im Format: timestamp.unique_id.hostname:2, filename = f"{timestamp}.{unique_id}.{hostname}:2," # E-Mail in "new" speichern email_path = maildir / 'new' / filename with open(email_path, 'wb') as f: f.write(email_content) logger.info(f"E-Mail gespeichert: {email_path}") return True except Exception as e: logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}") return False def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info): """Löscht E-Mails aus dem S3-Bucket""" if not emails_to_delete: return 0 if not AUTO_DELETE: # Bestätigung einholen print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht aus Bucket {bucket}:") for key in emails_to_delete: info = email_info.get(key, {}) from_addr = info.get('from', 'Unbekannt') to_addr = info.get('to', 'Unbekannt') date = info.get('date', 'Unbekannt') print(f" - {key}") print(f" Von: {from_addr}") print(f" An: {to_addr}") print(f" Datum: {date}\n") confirmation = input("\nMöchten Sie diese E-Mails wirklich löschen? (j/n): ") if confirmation.lower() not in ['j', 'ja', 'y', 'yes']: logger.info("Löschung abgebrochen") return 0 # Löschung durchführen deleted_count = 0 for key in emails_to_delete: try: s3_client.delete_object(Bucket=bucket, Key=key) logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}") deleted_count += 1 except Exception as e: logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}") return deleted_count def process_domain(domain_name, domain_config, last_sync, all_domains_config): """Verarbeitet eine einzelne Domain""" bucket = domain_config["bucket"] prefix = domain_config["prefix"] region = domain_config["region"] logger.info(f"Verarbeite Domain: {domain_name}") logger.info(f" Bucket: {bucket}") logger.info(f" Präfix: {prefix}") logger.info(f" Region: {region}") logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}") try: # S3-Client initialisieren s3 = boto3.client('s3', region_name=region) # Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden paginator = s3.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=bucket, Prefix=prefix) new_emails = 0 total_emails = 0 emails_to_delete = [] email_info = {} domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")} # Alle Seiten durchlaufen for page in pages: if 'Contents' not in page: continue objects = page['Contents'] total_emails += len(objects) for obj in objects: key = obj['Key'] # Verzeichnisse überspringen if key.endswith('/'): continue # Eindeutiger Key für die Synchronisation sync_key = f"{bucket}:{key}" # Prüfen, ob die E-Mail bereits synchronisiert wurde if sync_key in domain_last_sync: logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen") continue try: # E-Mail aus S3 laden response = s3.get_object(Bucket=bucket, Key=key) email_content = response['Body'].read() # Header parsen try: headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) to_address = headers.get('To', '') from_address = headers.get('From', '') date = headers.get('Date', '') message_id = headers.get('Message-ID', '') # E-Mail-Informationen speichern email_info[key] = { 'to': to_address, 'from': from_address, 'date': date } # Alle Domains-Konfigurationen für die Validierung verwenden # BUGFIX: Hier prüfen wir gegen ALLE konfigurierten Domains is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config) if is_valid: logger.info(f"Gültige E-Mail für: {to_address}") # Nur speichern, wenn die E-Mail zur aktuellen Domain gehört if recipient_domain == domain_name: # E-Mail speichern if store_email(email_content, to_address, message_id, key, MAIL_DIR): # Status aktualisieren last_sync[sync_key] = { 'timestamp': time.time(), 'to': to_address, 'message_id': message_id, 'domain': domain_name, 'bucket': bucket } new_emails += 1 logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})") # Status nach jeweils 10 E-Mails speichern if new_emails % 10 == 0: save_sync_status(last_sync) logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher") else: # Gültige E-Mail, aber für eine andere Domain - nicht löschen! logger.info(f"E-Mail {key} ist für Domain {recipient_domain}, wird übersprungen (nicht gelöscht)") else: logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}") emails_to_delete.append(key) except Exception as e: logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}") emails_to_delete.append(key) except Exception as e: logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}") # Ungültige E-Mails löschen if emails_to_delete: deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info) logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}") logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.") return new_emails, total_emails except Exception as e: logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}") import traceback logger.error(traceback.format_exc()) return 0, 0 def main(): """Hauptfunktion""" logger.info("Multi-Domain S3 E-Mail Downloader gestartet") logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen") logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}") try: # Domain-Konfigurationen laden domains_config = load_domains_config() if not domains_config: logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.") return logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}") # Letzten Synchronisationsstatus laden last_sync = load_sync_status() # Jede Domain einzeln verarbeiten total_new_emails = 0 total_all_emails = 0 for domain_name, domain_config in domains_config.items(): # BUGFIX: all_domains_config übergeben statt nur der aktuellen Domain-Konfiguration new_emails, all_emails = process_domain(domain_name, domain_config, last_sync, domains_config) total_new_emails += new_emails total_all_emails += all_emails # Nach jeder Domain den Status speichern save_sync_status(last_sync) logger.info(f"Gesamtverarbeitung abgeschlossen. Insgesamt {total_all_emails} E-Mails gefunden, {total_new_emails} neue heruntergeladen.") except Exception as e: logger.error(f"Fehler: {str(e)}") import traceback logger.error(traceback.format_exc()) if __name__ == "__main__": main()