downloader for many domains

This commit is contained in:
Andreas Knuth 2025-03-17 15:40:42 +01:00
parent ff8d54817a
commit 218e806336
1 changed files with 433 additions and 0 deletions

View File

@ -0,0 +1,433 @@
#!/usr/bin/env python3
"""
Multi-Domain S3 E-Mail Downloader Script
Dieses Script lädt E-Mails aus mehreren S3-Buckets herunter und speichert sie im
Maildir-Format für Dovecot. Es unterstützt mehrere Domains, jede mit eigenen
Bucket-Einstellungen und Benutzerlisten.
Nutzung:
python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen
python3 s3_email_downloader.py y # Ohne Bestätigungsabfrage für Löschungen
"""
import boto3
import os
import time
import logging
import json
import re
import hashlib
import sys
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv
# .env-Datei laden
load_dotenv()
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('s3_email_downloader.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger("s3-email-downloader")
# Hauptkonfiguration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region, kann pro Domain überschrieben werden
# Status-Datei für die Synchronisation
STATUS_FILE = Path('sync_status.json')
# Prüfen, ob automatische Löschung aktiviert ist
AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y'
# Multi-Domain-Konfiguration
# Die Konfiguration kann über Umgebungsvariablen oder eine separate JSON-Datei erfolgen
DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json')
def load_domains_config():
"""Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen"""
domains = {}
# Versuchen, Konfiguration aus JSON-Datei zu laden
config_file = Path(DOMAINS_CONFIG_FILE)
if config_file.exists():
try:
with open(config_file, 'r') as f:
domains = json.load(f)
logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen")
return domains
except Exception as e:
logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}")
# Fallback: Konfiguration aus Umgebungsvariablen
# Format für Umgebungsvariablen:
# DOMAIN_1=bizmatch.net
# DOMAIN_1_BUCKET=bizmatch-emails
# DOMAIN_1_PREFIX=emails/
# DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3
# DOMAIN_1_REGION=us-east-2 # Optional, überschreibt AWS_REGION
domain_index = 1
while True:
domain_key = f"DOMAIN_{domain_index}"
domain_name = os.environ.get(domain_key)
if not domain_name:
break # Keine weitere Domain-Definition gefunden
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
if bucket and usernames:
domains[domain_name] = {
"bucket": bucket,
"prefix": prefix,
"usernames": usernames.split(','),
"region": region
}
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
else:
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
domain_index += 1
# Fallback für Abwärtskompatibilität: Alte Konfiguration aus Umgebungsvariablen
if not domains:
old_domain = os.environ.get('VALID_DOMAIN', '')
old_bucket = os.environ.get('S3_BUCKET', '')
old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/')
old_usernames = os.environ.get('VALID_USERNAMES', '')
if old_domain and old_bucket and old_usernames:
domains[old_domain] = {
"bucket": old_bucket,
"prefix": old_prefix,
"usernames": old_usernames.split(','),
"region": AWS_REGION
}
logger.info(f"Alte Konfiguration für Domain {old_domain} geladen")
return domains
def load_sync_status():
"""Lädt den letzten Synchronisationsstatus"""
if STATUS_FILE.exists():
try:
with open(STATUS_FILE, 'r') as f:
status = json.load(f)
return status.get('last_sync', {})
except Exception as e:
logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}")
return {}
def save_sync_status(last_sync):
"""Speichert den aktuellen Synchronisationsstatus"""
try:
with open(STATUS_FILE, 'w') as f:
json.dump({
'last_sync': last_sync,
'last_sync_time': time.time()
}, f)
except Exception as e:
logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}")
def extract_email_address(address):
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
if not address:
return None
# Einfacher Fall: nur E-Mail-Adresse
if '@' in address and '<' not in address:
return address.strip()
# Komplexer Fall: "Name <email@domain.com>"
match = re.search(r'<([^>]+)>', address)
if match:
return match.group(1)
# Fallback
return address.strip()
def is_valid_recipient(to_address, domains_config):
"""
Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)
Gibt ein Tuple zurück: (ist_gültig, domain_name)
"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if not email or '@' not in email:
return False, None
username, domain = email.split('@', 1)
# Prüfen, ob die Domain in der Konfiguration existiert
if domain.lower() in domains_config:
domain_config = domains_config[domain.lower()]
# Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
return True, domain.lower()
return False, None
def get_maildir_path(to_address, mail_dir):
"""
Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse
Format: {mail_dir}/domain.com/user/
"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if '@' in email:
user, domain = email.split('@', 1)
else:
return None # Ungültige E-Mail-Adresse
# Pfad erstellen
mail_dir_path = Path(mail_dir)
domain_dir = mail_dir_path / domain
user_dir = domain_dir / user
# Maildir-Struktur sicherstellen
for directory in [mail_dir_path, domain_dir, user_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Maildir-Unterverzeichnisse
for subdir in ['cur', 'new', 'tmp']:
(user_dir / subdir).mkdir(exist_ok=True)
return user_dir
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
"""Speichert eine E-Mail im Maildir-Format"""
try:
# Maildir-Pfad ermitteln
maildir = get_maildir_path(to_address, mail_dir)
if not maildir:
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
return False
# Eindeutigen Dateinamen generieren
timestamp = int(time.time())
hostname = 'mail'
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
# E-Mail in "new" speichern
email_path = maildir / 'new' / filename
with open(email_path, 'wb') as f:
f.write(email_content)
logger.info(f"E-Mail gespeichert: {email_path}")
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
return False
def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info):
"""Löscht E-Mails aus dem S3-Bucket"""
if not emails_to_delete:
return 0
if not AUTO_DELETE:
# Bestätigung einholen
print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht aus Bucket {bucket}:")
for key in emails_to_delete:
info = email_info.get(key, {})
from_addr = info.get('from', 'Unbekannt')
to_addr = info.get('to', 'Unbekannt')
date = info.get('date', 'Unbekannt')
print(f" - {key}")
print(f" Von: {from_addr}")
print(f" An: {to_addr}")
print(f" Datum: {date}\n")
confirmation = input("\nMöchten Sie diese E-Mails wirklich löschen? (j/n): ")
if confirmation.lower() not in ['j', 'ja', 'y', 'yes']:
logger.info("Löschung abgebrochen")
return 0
# Löschung durchführen
deleted_count = 0
for key in emails_to_delete:
try:
s3_client.delete_object(Bucket=bucket, Key=key)
logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}")
deleted_count += 1
except Exception as e:
logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}")
return deleted_count
def process_domain(domain_name, domain_config, last_sync):
"""Verarbeitet eine einzelne Domain"""
bucket = domain_config["bucket"]
prefix = domain_config["prefix"]
region = domain_config["region"]
logger.info(f"Verarbeite Domain: {domain_name}")
logger.info(f" Bucket: {bucket}")
logger.info(f" Präfix: {prefix}")
logger.info(f" Region: {region}")
logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}")
try:
# S3-Client initialisieren
s3 = boto3.client('s3', region_name=region)
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
new_emails = 0
total_emails = 0
emails_to_delete = []
email_info = {}
domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}
# Alle Seiten durchlaufen
for page in pages:
if 'Contents' not in page:
continue
objects = page['Contents']
total_emails += len(objects)
for obj in objects:
key = obj['Key']
# Verzeichnisse überspringen
if key.endswith('/'):
continue
# Eindeutiger Key für die Synchronisation
sync_key = f"{bucket}:{key}"
# Prüfen, ob die E-Mail bereits synchronisiert wurde
if sync_key in domain_last_sync:
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
continue
try:
# E-Mail aus S3 laden
response = s3.get_object(Bucket=bucket, Key=key)
email_content = response['Body'].read()
# Header parsen
try:
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
to_address = headers.get('To', '')
from_address = headers.get('From', '')
date = headers.get('Date', '')
message_id = headers.get('Message-ID', '')
# E-Mail-Informationen speichern
email_info[key] = {
'to': to_address,
'from': from_address,
'date': date
}
# Alle Domains-Konfigurationen für die Validierung verwenden
is_valid, recipient_domain = is_valid_recipient(to_address, {domain_name: domain_config})
if is_valid:
logger.info(f"Gültige E-Mail für: {to_address}")
# E-Mail speichern
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
# Status aktualisieren
last_sync[sync_key] = {
'timestamp': time.time(),
'to': to_address,
'message_id': message_id,
'domain': domain_name,
'bucket': bucket
}
new_emails += 1
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
# Status nach jeweils 10 E-Mails speichern
if new_emails % 10 == 0:
save_sync_status(last_sync)
logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher")
else:
logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
# Ungültige E-Mails löschen
if emails_to_delete:
deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info)
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}")
logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
return new_emails, total_emails
except Exception as e:
logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return 0, 0
def main():
"""Hauptfunktion"""
logger.info("Multi-Domain S3 E-Mail Downloader gestartet")
logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen")
logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}")
try:
# Domain-Konfigurationen laden
domains_config = load_domains_config()
if not domains_config:
logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.")
return
logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}")
# Letzten Synchronisationsstatus laden
last_sync = load_sync_status()
# Jede Domain einzeln verarbeiten
total_new_emails = 0
total_all_emails = 0
for domain_name, domain_config in domains_config.items():
new_emails, all_emails = process_domain(domain_name, domain_config, last_sync)
total_new_emails += new_emails
total_all_emails += all_emails
# Nach jeder Domain den Status speichern
save_sync_status(last_sync)
logger.info(f"Gesamtverarbeitung abgeschlossen. Insgesamt {total_all_emails} E-Mails gefunden, {total_new_emails} neue heruntergeladen.")
except Exception as e:
logger.error(f"Fehler: {str(e)}")
import traceback
logger.error(traceback.format_exc())
if __name__ == "__main__":
main()