From 218e8063368056ac83f3d0ea4dab38cf38df3015 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Mon, 17 Mar 2025 15:40:42 +0100 Subject: [PATCH] downloader for many domains --- dovecot/s3_email_downloader1.py | 433 ++++++++++++++++++++++++++++++++ 1 file changed, 433 insertions(+) create mode 100644 dovecot/s3_email_downloader1.py diff --git a/dovecot/s3_email_downloader1.py b/dovecot/s3_email_downloader1.py new file mode 100644 index 0000000..dfb900e --- /dev/null +++ b/dovecot/s3_email_downloader1.py @@ -0,0 +1,433 @@ +#!/usr/bin/env python3 +""" +Multi-Domain S3 E-Mail Downloader Script + +Dieses Script lädt E-Mails aus mehreren S3-Buckets herunter und speichert sie im +Maildir-Format für Dovecot. Es unterstützt mehrere Domains, jede mit eigenen +Bucket-Einstellungen und Benutzerlisten. + +Nutzung: + python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen + python3 s3_email_downloader.py y # Ohne Bestätigungsabfrage für Löschungen +""" + +import boto3 +import os +import time +import logging +import json +import re +import hashlib +import sys +from pathlib import Path +from email.parser import BytesParser +from email import policy +from dotenv import load_dotenv + +# .env-Datei laden +load_dotenv() + +# Logging konfigurieren +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('s3_email_downloader.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger("s3-email-downloader") + +# Hauptkonfiguration +MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume +AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region, kann pro Domain überschrieben werden + +# Status-Datei für die Synchronisation +STATUS_FILE = Path('sync_status.json') + +# Prüfen, ob automatische Löschung aktiviert ist +AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y' + +# Multi-Domain-Konfiguration +# Die Konfiguration kann über Umgebungsvariablen oder eine separate JSON-Datei erfolgen +DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json') + +def load_domains_config(): + """Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen""" + domains = {} + + # Versuchen, Konfiguration aus JSON-Datei zu laden + config_file = Path(DOMAINS_CONFIG_FILE) + if config_file.exists(): + try: + with open(config_file, 'r') as f: + domains = json.load(f) + logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen") + return domains + except Exception as e: + logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}") + + # Fallback: Konfiguration aus Umgebungsvariablen + # Format für Umgebungsvariablen: + # DOMAIN_1=bizmatch.net + # DOMAIN_1_BUCKET=bizmatch-emails + # DOMAIN_1_PREFIX=emails/ + # DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3 + # DOMAIN_1_REGION=us-east-2 # Optional, überschreibt AWS_REGION + + domain_index = 1 + while True: + domain_key = f"DOMAIN_{domain_index}" + domain_name = os.environ.get(domain_key) + + if not domain_name: + break # Keine weitere Domain-Definition gefunden + + bucket = os.environ.get(f"{domain_key}_BUCKET", "") + prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/") + usernames = os.environ.get(f"{domain_key}_USERNAMES", "") + region = os.environ.get(f"{domain_key}_REGION", AWS_REGION) + + if bucket and usernames: + domains[domain_name] = { + "bucket": bucket, + "prefix": prefix, + "usernames": usernames.split(','), + "region": region + } + logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert") + else: + logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen") + + domain_index += 1 + + # Fallback für Abwärtskompatibilität: Alte Konfiguration aus Umgebungsvariablen + if not domains: + old_domain = os.environ.get('VALID_DOMAIN', '') + old_bucket = os.environ.get('S3_BUCKET', '') + old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/') + old_usernames = os.environ.get('VALID_USERNAMES', '') + + if old_domain and old_bucket and old_usernames: + domains[old_domain] = { + "bucket": old_bucket, + "prefix": old_prefix, + "usernames": old_usernames.split(','), + "region": AWS_REGION + } + logger.info(f"Alte Konfiguration für Domain {old_domain} geladen") + + return domains + +def load_sync_status(): + """Lädt den letzten Synchronisationsstatus""" + if STATUS_FILE.exists(): + try: + with open(STATUS_FILE, 'r') as f: + status = json.load(f) + return status.get('last_sync', {}) + except Exception as e: + logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}") + + return {} + +def save_sync_status(last_sync): + """Speichert den aktuellen Synchronisationsstatus""" + try: + with open(STATUS_FILE, 'w') as f: + json.dump({ + 'last_sync': last_sync, + 'last_sync_time': time.time() + }, f) + except Exception as e: + logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}") + +def extract_email_address(address): + """Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat""" + if not address: + return None + + # Einfacher Fall: nur E-Mail-Adresse + if '@' in address and '<' not in address: + return address.strip() + + # Komplexer Fall: "Name " + match = re.search(r'<([^>]+)>', address) + if match: + return match.group(1) + + # Fallback + return address.strip() + +def is_valid_recipient(to_address, domains_config): + """ + Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username) + Gibt ein Tuple zurück: (ist_gültig, domain_name) + """ + email = extract_email_address(to_address) + + # E-Mail-Adresse aufteilen + if not email or '@' not in email: + return False, None + + username, domain = email.split('@', 1) + + # Prüfen, ob die Domain in der Konfiguration existiert + if domain.lower() in domains_config: + domain_config = domains_config[domain.lower()] + # Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist + if username.lower() in [u.lower() for u in domain_config["usernames"]]: + return True, domain.lower() + + return False, None + +def get_maildir_path(to_address, mail_dir): + """ + Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse + Format: {mail_dir}/domain.com/user/ + """ + email = extract_email_address(to_address) + + # E-Mail-Adresse aufteilen + if '@' in email: + user, domain = email.split('@', 1) + else: + return None # Ungültige E-Mail-Adresse + + # Pfad erstellen + mail_dir_path = Path(mail_dir) + domain_dir = mail_dir_path / domain + user_dir = domain_dir / user + + # Maildir-Struktur sicherstellen + for directory in [mail_dir_path, domain_dir, user_dir]: + directory.mkdir(parents=True, exist_ok=True) + + # Maildir-Unterverzeichnisse + for subdir in ['cur', 'new', 'tmp']: + (user_dir / subdir).mkdir(exist_ok=True) + + return user_dir + +def store_email(email_content, to_address, message_id, s3_key, mail_dir): + """Speichert eine E-Mail im Maildir-Format""" + try: + # Maildir-Pfad ermitteln + maildir = get_maildir_path(to_address, mail_dir) + if not maildir: + logger.error(f"Konnte Maildir für {to_address} nicht ermitteln") + return False + + # Eindeutigen Dateinamen generieren + timestamp = int(time.time()) + hostname = 'mail' + unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest() + + # Maildir-Dateiname im Format: timestamp.unique_id.hostname:2, + filename = f"{timestamp}.{unique_id}.{hostname}:2," + + # E-Mail in "new" speichern + email_path = maildir / 'new' / filename + + with open(email_path, 'wb') as f: + f.write(email_content) + + logger.info(f"E-Mail gespeichert: {email_path}") + return True + + except Exception as e: + logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}") + return False + +def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info): + """Löscht E-Mails aus dem S3-Bucket""" + if not emails_to_delete: + return 0 + + if not AUTO_DELETE: + # Bestätigung einholen + print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht aus Bucket {bucket}:") + for key in emails_to_delete: + info = email_info.get(key, {}) + from_addr = info.get('from', 'Unbekannt') + to_addr = info.get('to', 'Unbekannt') + date = info.get('date', 'Unbekannt') + print(f" - {key}") + print(f" Von: {from_addr}") + print(f" An: {to_addr}") + print(f" Datum: {date}\n") + + confirmation = input("\nMöchten Sie diese E-Mails wirklich löschen? (j/n): ") + if confirmation.lower() not in ['j', 'ja', 'y', 'yes']: + logger.info("Löschung abgebrochen") + return 0 + + # Löschung durchführen + deleted_count = 0 + for key in emails_to_delete: + try: + s3_client.delete_object(Bucket=bucket, Key=key) + logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}") + deleted_count += 1 + except Exception as e: + logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}") + + return deleted_count + +def process_domain(domain_name, domain_config, last_sync): + """Verarbeitet eine einzelne Domain""" + bucket = domain_config["bucket"] + prefix = domain_config["prefix"] + region = domain_config["region"] + + logger.info(f"Verarbeite Domain: {domain_name}") + logger.info(f" Bucket: {bucket}") + logger.info(f" Präfix: {prefix}") + logger.info(f" Region: {region}") + logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}") + + try: + # S3-Client initialisieren + s3 = boto3.client('s3', region_name=region) + + # Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden + paginator = s3.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + + new_emails = 0 + total_emails = 0 + emails_to_delete = [] + email_info = {} + domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")} + + # Alle Seiten durchlaufen + for page in pages: + if 'Contents' not in page: + continue + + objects = page['Contents'] + total_emails += len(objects) + + for obj in objects: + key = obj['Key'] + + # Verzeichnisse überspringen + if key.endswith('/'): + continue + + # Eindeutiger Key für die Synchronisation + sync_key = f"{bucket}:{key}" + + # Prüfen, ob die E-Mail bereits synchronisiert wurde + if sync_key in domain_last_sync: + logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen") + continue + + try: + # E-Mail aus S3 laden + response = s3.get_object(Bucket=bucket, Key=key) + email_content = response['Body'].read() + + # Header parsen + try: + headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True) + to_address = headers.get('To', '') + from_address = headers.get('From', '') + date = headers.get('Date', '') + message_id = headers.get('Message-ID', '') + + # E-Mail-Informationen speichern + email_info[key] = { + 'to': to_address, + 'from': from_address, + 'date': date + } + + # Alle Domains-Konfigurationen für die Validierung verwenden + is_valid, recipient_domain = is_valid_recipient(to_address, {domain_name: domain_config}) + + if is_valid: + logger.info(f"Gültige E-Mail für: {to_address}") + + # E-Mail speichern + if store_email(email_content, to_address, message_id, key, MAIL_DIR): + # Status aktualisieren + last_sync[sync_key] = { + 'timestamp': time.time(), + 'to': to_address, + 'message_id': message_id, + 'domain': domain_name, + 'bucket': bucket + } + new_emails += 1 + logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})") + + # Status nach jeweils 10 E-Mails speichern + if new_emails % 10 == 0: + save_sync_status(last_sync) + logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher") + else: + logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}") + emails_to_delete.append(key) + + except Exception as e: + logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}") + emails_to_delete.append(key) + + except Exception as e: + logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}") + + # Ungültige E-Mails löschen + if emails_to_delete: + deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info) + logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}") + + logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.") + return new_emails, total_emails + + except Exception as e: + logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return 0, 0 + +def main(): + """Hauptfunktion""" + logger.info("Multi-Domain S3 E-Mail Downloader gestartet") + logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen") + logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}") + + try: + # Domain-Konfigurationen laden + domains_config = load_domains_config() + + if not domains_config: + logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.") + return + + logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}") + + # Letzten Synchronisationsstatus laden + last_sync = load_sync_status() + + # Jede Domain einzeln verarbeiten + total_new_emails = 0 + total_all_emails = 0 + + for domain_name, domain_config in domains_config.items(): + new_emails, all_emails = process_domain(domain_name, domain_config, last_sync) + total_new_emails += new_emails + total_all_emails += all_emails + + # Nach jeder Domain den Status speichern + save_sync_status(last_sync) + + logger.info(f"Gesamtverarbeitung abgeschlossen. Insgesamt {total_all_emails} E-Mails gefunden, {total_new_emails} neue heruntergeladen.") + + except Exception as e: + logger.error(f"Fehler: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + +if __name__ == "__main__": + main() \ No newline at end of file