actual state
This commit is contained in:
parent
b1ef230144
commit
6ae4da137e
|
|
@ -1,10 +1,10 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
S3 E-Mail Downloader Script
|
||||
Multi-Domain S3 E-Mail Downloader Script
|
||||
|
||||
Dieses Script lädt E-Mails aus einem S3-Bucket herunter und speichert sie im
|
||||
Maildir-Format für Dovecot. Es filtert nach bestimmten Domains und Benutzernamen
|
||||
und kann nicht passende E-Mails löschen.
|
||||
Dieses Script lädt E-Mails aus mehreren S3-Buckets herunter und speichert sie im
|
||||
Maildir-Format für Dovecot. Es unterstützt mehrere Domains, jede mit eigenen
|
||||
Bucket-Einstellungen und Benutzerlisten.
|
||||
|
||||
Nutzung:
|
||||
python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen
|
||||
|
|
@ -38,16 +38,9 @@ logging.basicConfig(
|
|||
)
|
||||
logger = logging.getLogger("s3-email-downloader")
|
||||
|
||||
# Konfiguration aus Umgebungsvariablen
|
||||
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
|
||||
S3_BUCKET = os.environ.get('S3_BUCKET', '')
|
||||
EMAIL_PREFIX = os.environ.get('EMAIL_PREFIX', 'emails/')
|
||||
# Hauptkonfiguration
|
||||
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
|
||||
VALID_DOMAIN = os.environ.get('VALID_DOMAIN', 'bizmatch.net')
|
||||
VALID_USERNAMES = os.environ.get('VALID_USERNAMES', 'accounting,info,sales,support,test1,test2,test3')
|
||||
|
||||
# Umwandlung der komma-getrennten Liste in ein Set für schnellere Suche
|
||||
VALID_USERNAMES_SET = set(VALID_USERNAMES.split(','))
|
||||
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region, kann pro Domain überschrieben werden
|
||||
|
||||
# Status-Datei für die Synchronisation
|
||||
STATUS_FILE = Path('sync_status.json')
|
||||
|
|
@ -55,6 +48,77 @@ STATUS_FILE = Path('sync_status.json')
|
|||
# Prüfen, ob automatische Löschung aktiviert ist
|
||||
AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y'
|
||||
|
||||
# Multi-Domain-Konfiguration
|
||||
# Die Konfiguration kann über Umgebungsvariablen oder eine separate JSON-Datei erfolgen
|
||||
DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json')
|
||||
|
||||
def load_domains_config():
|
||||
"""Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen"""
|
||||
domains = {}
|
||||
|
||||
# Versuchen, Konfiguration aus JSON-Datei zu laden
|
||||
config_file = Path(DOMAINS_CONFIG_FILE)
|
||||
if config_file.exists():
|
||||
try:
|
||||
with open(config_file, 'r') as f:
|
||||
domains = json.load(f)
|
||||
logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen")
|
||||
return domains
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}")
|
||||
|
||||
# Fallback: Konfiguration aus Umgebungsvariablen
|
||||
# Format für Umgebungsvariablen:
|
||||
# DOMAIN_1=bizmatch.net
|
||||
# DOMAIN_1_BUCKET=bizmatch-emails
|
||||
# DOMAIN_1_PREFIX=emails/
|
||||
# DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3
|
||||
# DOMAIN_1_REGION=us-east-2 # Optional, überschreibt AWS_REGION
|
||||
|
||||
domain_index = 1
|
||||
while True:
|
||||
domain_key = f"DOMAIN_{domain_index}"
|
||||
domain_name = os.environ.get(domain_key)
|
||||
|
||||
if not domain_name:
|
||||
break # Keine weitere Domain-Definition gefunden
|
||||
|
||||
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
|
||||
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
|
||||
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
|
||||
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
|
||||
|
||||
if bucket and usernames:
|
||||
domains[domain_name] = {
|
||||
"bucket": bucket,
|
||||
"prefix": prefix,
|
||||
"usernames": usernames.split(','),
|
||||
"region": region
|
||||
}
|
||||
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
|
||||
else:
|
||||
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
|
||||
|
||||
domain_index += 1
|
||||
|
||||
# Fallback für Abwärtskompatibilität: Alte Konfiguration aus Umgebungsvariablen
|
||||
if not domains:
|
||||
old_domain = os.environ.get('VALID_DOMAIN', '')
|
||||
old_bucket = os.environ.get('S3_BUCKET', '')
|
||||
old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/')
|
||||
old_usernames = os.environ.get('VALID_USERNAMES', '')
|
||||
|
||||
if old_domain and old_bucket and old_usernames:
|
||||
domains[old_domain] = {
|
||||
"bucket": old_bucket,
|
||||
"prefix": old_prefix,
|
||||
"usernames": old_usernames.split(','),
|
||||
"region": AWS_REGION
|
||||
}
|
||||
logger.info(f"Alte Konfiguration für Domain {old_domain} geladen")
|
||||
|
||||
return domains
|
||||
|
||||
def load_sync_status():
|
||||
"""Lädt den letzten Synchronisationsstatus"""
|
||||
if STATUS_FILE.exists():
|
||||
|
|
@ -81,7 +145,7 @@ def save_sync_status(last_sync):
|
|||
def extract_email_address(address):
|
||||
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
|
||||
if not address:
|
||||
return 'unknown@bizmatch.net'
|
||||
return None
|
||||
|
||||
# Einfacher Fall: nur E-Mail-Adresse
|
||||
if '@' in address and '<' not in address:
|
||||
|
|
@ -95,18 +159,27 @@ def extract_email_address(address):
|
|||
# Fallback
|
||||
return address.strip()
|
||||
|
||||
def is_valid_recipient(to_address):
|
||||
"""Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)"""
|
||||
def is_valid_recipient(to_address, domains_config):
|
||||
"""
|
||||
Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)
|
||||
Gibt ein Tuple zurück: (ist_gültig, domain_name)
|
||||
"""
|
||||
email = extract_email_address(to_address)
|
||||
|
||||
# E-Mail-Adresse aufteilen
|
||||
if '@' not in email:
|
||||
return False
|
||||
if not email or '@' not in email:
|
||||
return False, None
|
||||
|
||||
username, domain = email.split('@', 1)
|
||||
|
||||
# Domain und Username prüfen
|
||||
return domain.lower() == VALID_DOMAIN.lower() and username.lower() in VALID_USERNAMES_SET
|
||||
# Prüfen, ob die Domain in der Konfiguration existiert
|
||||
if domain.lower() in domains_config:
|
||||
domain_config = domains_config[domain.lower()]
|
||||
# Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist
|
||||
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
|
||||
return True, domain.lower()
|
||||
|
||||
return False, None
|
||||
|
||||
def get_maildir_path(to_address, mail_dir):
|
||||
"""
|
||||
|
|
@ -119,7 +192,7 @@ def get_maildir_path(to_address, mail_dir):
|
|||
if '@' in email:
|
||||
user, domain = email.split('@', 1)
|
||||
else:
|
||||
user, domain = email, VALID_DOMAIN
|
||||
return None # Ungültige E-Mail-Adresse
|
||||
|
||||
# Pfad erstellen
|
||||
mail_dir_path = Path(mail_dir)
|
||||
|
|
@ -141,10 +214,13 @@ def store_email(email_content, to_address, message_id, s3_key, mail_dir):
|
|||
try:
|
||||
# Maildir-Pfad ermitteln
|
||||
maildir = get_maildir_path(to_address, mail_dir)
|
||||
if not maildir:
|
||||
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
|
||||
return False
|
||||
|
||||
# Eindeutigen Dateinamen generieren
|
||||
timestamp = int(time.time())
|
||||
hostname = 'bizmatch'
|
||||
hostname = 'mail'
|
||||
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
|
||||
|
||||
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
|
||||
|
|
@ -163,14 +239,14 @@ def store_email(email_content, to_address, message_id, s3_key, mail_dir):
|
|||
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
|
||||
return False
|
||||
|
||||
def delete_s3_emails(s3, emails_to_delete, email_info):
|
||||
def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info):
|
||||
"""Löscht E-Mails aus dem S3-Bucket"""
|
||||
if not emails_to_delete:
|
||||
return 0
|
||||
|
||||
if not AUTO_DELETE:
|
||||
# Bestätigung einholen
|
||||
print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht:")
|
||||
print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht aus Bucket {bucket}:")
|
||||
for key in emails_to_delete:
|
||||
info = email_info.get(key, {})
|
||||
from_addr = info.get('from', 'Unbekannt')
|
||||
|
|
@ -190,42 +266,39 @@ def delete_s3_emails(s3, emails_to_delete, email_info):
|
|||
deleted_count = 0
|
||||
for key in emails_to_delete:
|
||||
try:
|
||||
s3.delete_object(Bucket=S3_BUCKET, Key=key)
|
||||
logger.info(f"E-Mail gelöscht: {key}")
|
||||
s3_client.delete_object(Bucket=bucket, Key=key)
|
||||
logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}")
|
||||
deleted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Löschen der E-Mail {key}: {str(e)}")
|
||||
logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}")
|
||||
|
||||
return deleted_count
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion"""
|
||||
# Prüfen, ob die erforderlichen Umgebungsvariablen gesetzt sind
|
||||
if not S3_BUCKET:
|
||||
logger.error("S3_BUCKET Umgebungsvariable nicht gesetzt")
|
||||
return
|
||||
|
||||
logger.info(f"S3 E-Mail Downloader gestartet. Bucket: {S3_BUCKET}, Präfix: {EMAIL_PREFIX}")
|
||||
logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen")
|
||||
logger.info(f"Gültige Domain: {VALID_DOMAIN}")
|
||||
logger.info(f"Gültige Usernames: {VALID_USERNAMES}")
|
||||
logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}")
|
||||
def process_domain(domain_name, domain_config, last_sync):
|
||||
"""Verarbeitet eine einzelne Domain"""
|
||||
bucket = domain_config["bucket"]
|
||||
prefix = domain_config["prefix"]
|
||||
region = domain_config["region"]
|
||||
|
||||
logger.info(f"Verarbeite Domain: {domain_name}")
|
||||
logger.info(f" Bucket: {bucket}")
|
||||
logger.info(f" Präfix: {prefix}")
|
||||
logger.info(f" Region: {region}")
|
||||
logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}")
|
||||
|
||||
try:
|
||||
# S3-Client initialisieren
|
||||
s3 = boto3.client('s3', region_name=AWS_REGION)
|
||||
|
||||
# Letzten Synchronisationsstatus laden
|
||||
last_sync = load_sync_status()
|
||||
s3 = boto3.client('s3', region_name=region)
|
||||
|
||||
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
|
||||
paginator = s3.get_paginator('list_objects_v2')
|
||||
pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=EMAIL_PREFIX)
|
||||
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
|
||||
|
||||
new_emails = 0
|
||||
total_emails = 0
|
||||
emails_to_delete = []
|
||||
email_info = {}
|
||||
domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}
|
||||
|
||||
# Alle Seiten durchlaufen
|
||||
for page in pages:
|
||||
|
|
@ -242,14 +315,17 @@ def main():
|
|||
if key.endswith('/'):
|
||||
continue
|
||||
|
||||
# Eindeutiger Key für die Synchronisation
|
||||
sync_key = f"{bucket}:{key}"
|
||||
|
||||
# Prüfen, ob die E-Mail bereits synchronisiert wurde
|
||||
if key in last_sync:
|
||||
if sync_key in domain_last_sync:
|
||||
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
|
||||
continue
|
||||
|
||||
try:
|
||||
# E-Mail aus S3 laden
|
||||
response = s3.get_object(Bucket=S3_BUCKET, Key=key)
|
||||
response = s3.get_object(Bucket=bucket, Key=key)
|
||||
email_content = response['Body'].read()
|
||||
|
||||
# Header parsen
|
||||
|
|
@ -259,6 +335,7 @@ def main():
|
|||
from_address = headers.get('From', '')
|
||||
date = headers.get('Date', '')
|
||||
message_id = headers.get('Message-ID', '')
|
||||
|
||||
# E-Mail-Informationen speichern
|
||||
email_info[key] = {
|
||||
'to': to_address,
|
||||
|
|
@ -266,17 +343,21 @@ def main():
|
|||
'date': date
|
||||
}
|
||||
|
||||
# Prüfen, ob die Empfängeradresse gültig ist
|
||||
if is_valid_recipient(to_address):
|
||||
# Alle Domains-Konfigurationen für die Validierung verwenden
|
||||
is_valid, recipient_domain = is_valid_recipient(to_address, {domain_name: domain_config})
|
||||
|
||||
if is_valid:
|
||||
logger.info(f"Gültige E-Mail für: {to_address}")
|
||||
|
||||
# E-Mail speichern
|
||||
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
|
||||
# Status aktualisieren
|
||||
last_sync[key] = {
|
||||
last_sync[sync_key] = {
|
||||
'timestamp': time.time(),
|
||||
'to': to_address,
|
||||
'message_id': message_id
|
||||
'message_id': message_id,
|
||||
'domain': domain_name,
|
||||
'bucket': bucket
|
||||
}
|
||||
new_emails += 1
|
||||
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
|
||||
|
|
@ -296,15 +377,52 @@ def main():
|
|||
except Exception as e:
|
||||
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
|
||||
|
||||
# Abschließend den Status speichern
|
||||
save_sync_status(last_sync)
|
||||
|
||||
# Ungültige E-Mails löschen
|
||||
if emails_to_delete:
|
||||
deleted_count = delete_s3_emails(s3, emails_to_delete, email_info)
|
||||
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht")
|
||||
deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info)
|
||||
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}")
|
||||
|
||||
logger.info(f"Verarbeitung abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
|
||||
logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
|
||||
return new_emails, total_emails
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
return 0, 0
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion"""
|
||||
logger.info("Multi-Domain S3 E-Mail Downloader gestartet")
|
||||
logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen")
|
||||
logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}")
|
||||
|
||||
try:
|
||||
# Domain-Konfigurationen laden
|
||||
domains_config = load_domains_config()
|
||||
|
||||
if not domains_config:
|
||||
logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.")
|
||||
return
|
||||
|
||||
logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}")
|
||||
|
||||
# Letzten Synchronisationsstatus laden
|
||||
last_sync = load_sync_status()
|
||||
|
||||
# Jede Domain einzeln verarbeiten
|
||||
total_new_emails = 0
|
||||
total_all_emails = 0
|
||||
|
||||
for domain_name, domain_config in domains_config.items():
|
||||
new_emails, all_emails = process_domain(domain_name, domain_config, last_sync)
|
||||
total_new_emails += new_emails
|
||||
total_all_emails += all_emails
|
||||
|
||||
# Nach jeder Domain den Status speichern
|
||||
save_sync_status(last_sync)
|
||||
|
||||
logger.info(f"Gesamtverarbeitung abgeschlossen. Insgesamt {total_all_emails} E-Mails gefunden, {total_new_emails} neue heruntergeladen.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler: {str(e)}")
|
||||
|
|
|
|||
|
|
@ -1,433 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Multi-Domain S3 E-Mail Downloader Script
|
||||
|
||||
Dieses Script lädt E-Mails aus mehreren S3-Buckets herunter und speichert sie im
|
||||
Maildir-Format für Dovecot. Es unterstützt mehrere Domains, jede mit eigenen
|
||||
Bucket-Einstellungen und Benutzerlisten.
|
||||
|
||||
Nutzung:
|
||||
python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen
|
||||
python3 s3_email_downloader.py y # Ohne Bestätigungsabfrage für Löschungen
|
||||
"""
|
||||
|
||||
import boto3
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
import hashlib
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from email.parser import BytesParser
|
||||
from email import policy
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# .env-Datei laden
|
||||
load_dotenv()
|
||||
|
||||
# Logging konfigurieren
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('s3_email_downloader.log'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger("s3-email-downloader")
|
||||
|
||||
# Hauptkonfiguration
|
||||
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
|
||||
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region, kann pro Domain überschrieben werden
|
||||
|
||||
# Status-Datei für die Synchronisation
|
||||
STATUS_FILE = Path('sync_status.json')
|
||||
|
||||
# Prüfen, ob automatische Löschung aktiviert ist
|
||||
AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y'
|
||||
|
||||
# Multi-Domain-Konfiguration
|
||||
# Die Konfiguration kann über Umgebungsvariablen oder eine separate JSON-Datei erfolgen
|
||||
DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json')
|
||||
|
||||
def load_domains_config():
|
||||
"""Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen"""
|
||||
domains = {}
|
||||
|
||||
# Versuchen, Konfiguration aus JSON-Datei zu laden
|
||||
config_file = Path(DOMAINS_CONFIG_FILE)
|
||||
if config_file.exists():
|
||||
try:
|
||||
with open(config_file, 'r') as f:
|
||||
domains = json.load(f)
|
||||
logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen")
|
||||
return domains
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}")
|
||||
|
||||
# Fallback: Konfiguration aus Umgebungsvariablen
|
||||
# Format für Umgebungsvariablen:
|
||||
# DOMAIN_1=bizmatch.net
|
||||
# DOMAIN_1_BUCKET=bizmatch-emails
|
||||
# DOMAIN_1_PREFIX=emails/
|
||||
# DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3
|
||||
# DOMAIN_1_REGION=us-east-2 # Optional, überschreibt AWS_REGION
|
||||
|
||||
domain_index = 1
|
||||
while True:
|
||||
domain_key = f"DOMAIN_{domain_index}"
|
||||
domain_name = os.environ.get(domain_key)
|
||||
|
||||
if not domain_name:
|
||||
break # Keine weitere Domain-Definition gefunden
|
||||
|
||||
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
|
||||
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
|
||||
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
|
||||
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
|
||||
|
||||
if bucket and usernames:
|
||||
domains[domain_name] = {
|
||||
"bucket": bucket,
|
||||
"prefix": prefix,
|
||||
"usernames": usernames.split(','),
|
||||
"region": region
|
||||
}
|
||||
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
|
||||
else:
|
||||
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
|
||||
|
||||
domain_index += 1
|
||||
|
||||
# Fallback für Abwärtskompatibilität: Alte Konfiguration aus Umgebungsvariablen
|
||||
if not domains:
|
||||
old_domain = os.environ.get('VALID_DOMAIN', '')
|
||||
old_bucket = os.environ.get('S3_BUCKET', '')
|
||||
old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/')
|
||||
old_usernames = os.environ.get('VALID_USERNAMES', '')
|
||||
|
||||
if old_domain and old_bucket and old_usernames:
|
||||
domains[old_domain] = {
|
||||
"bucket": old_bucket,
|
||||
"prefix": old_prefix,
|
||||
"usernames": old_usernames.split(','),
|
||||
"region": AWS_REGION
|
||||
}
|
||||
logger.info(f"Alte Konfiguration für Domain {old_domain} geladen")
|
||||
|
||||
return domains
|
||||
|
||||
def load_sync_status():
|
||||
"""Lädt den letzten Synchronisationsstatus"""
|
||||
if STATUS_FILE.exists():
|
||||
try:
|
||||
with open(STATUS_FILE, 'r') as f:
|
||||
status = json.load(f)
|
||||
return status.get('last_sync', {})
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}")
|
||||
|
||||
return {}
|
||||
|
||||
def save_sync_status(last_sync):
|
||||
"""Speichert den aktuellen Synchronisationsstatus"""
|
||||
try:
|
||||
with open(STATUS_FILE, 'w') as f:
|
||||
json.dump({
|
||||
'last_sync': last_sync,
|
||||
'last_sync_time': time.time()
|
||||
}, f)
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}")
|
||||
|
||||
def extract_email_address(address):
|
||||
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
|
||||
if not address:
|
||||
return None
|
||||
|
||||
# Einfacher Fall: nur E-Mail-Adresse
|
||||
if '@' in address and '<' not in address:
|
||||
return address.strip()
|
||||
|
||||
# Komplexer Fall: "Name <email@domain.com>"
|
||||
match = re.search(r'<([^>]+)>', address)
|
||||
if match:
|
||||
return match.group(1)
|
||||
|
||||
# Fallback
|
||||
return address.strip()
|
||||
|
||||
def is_valid_recipient(to_address, domains_config):
|
||||
"""
|
||||
Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)
|
||||
Gibt ein Tuple zurück: (ist_gültig, domain_name)
|
||||
"""
|
||||
email = extract_email_address(to_address)
|
||||
|
||||
# E-Mail-Adresse aufteilen
|
||||
if not email or '@' not in email:
|
||||
return False, None
|
||||
|
||||
username, domain = email.split('@', 1)
|
||||
|
||||
# Prüfen, ob die Domain in der Konfiguration existiert
|
||||
if domain.lower() in domains_config:
|
||||
domain_config = domains_config[domain.lower()]
|
||||
# Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist
|
||||
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
|
||||
return True, domain.lower()
|
||||
|
||||
return False, None
|
||||
|
||||
def get_maildir_path(to_address, mail_dir):
|
||||
"""
|
||||
Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse
|
||||
Format: {mail_dir}/domain.com/user/
|
||||
"""
|
||||
email = extract_email_address(to_address)
|
||||
|
||||
# E-Mail-Adresse aufteilen
|
||||
if '@' in email:
|
||||
user, domain = email.split('@', 1)
|
||||
else:
|
||||
return None # Ungültige E-Mail-Adresse
|
||||
|
||||
# Pfad erstellen
|
||||
mail_dir_path = Path(mail_dir)
|
||||
domain_dir = mail_dir_path / domain
|
||||
user_dir = domain_dir / user
|
||||
|
||||
# Maildir-Struktur sicherstellen
|
||||
for directory in [mail_dir_path, domain_dir, user_dir]:
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Maildir-Unterverzeichnisse
|
||||
for subdir in ['cur', 'new', 'tmp']:
|
||||
(user_dir / subdir).mkdir(exist_ok=True)
|
||||
|
||||
return user_dir
|
||||
|
||||
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
|
||||
"""Speichert eine E-Mail im Maildir-Format"""
|
||||
try:
|
||||
# Maildir-Pfad ermitteln
|
||||
maildir = get_maildir_path(to_address, mail_dir)
|
||||
if not maildir:
|
||||
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
|
||||
return False
|
||||
|
||||
# Eindeutigen Dateinamen generieren
|
||||
timestamp = int(time.time())
|
||||
hostname = 'mail'
|
||||
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
|
||||
|
||||
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
|
||||
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
|
||||
|
||||
# E-Mail in "new" speichern
|
||||
email_path = maildir / 'new' / filename
|
||||
|
||||
with open(email_path, 'wb') as f:
|
||||
f.write(email_content)
|
||||
|
||||
logger.info(f"E-Mail gespeichert: {email_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
|
||||
return False
|
||||
|
||||
def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info):
|
||||
"""Löscht E-Mails aus dem S3-Bucket"""
|
||||
if not emails_to_delete:
|
||||
return 0
|
||||
|
||||
if not AUTO_DELETE:
|
||||
# Bestätigung einholen
|
||||
print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht aus Bucket {bucket}:")
|
||||
for key in emails_to_delete:
|
||||
info = email_info.get(key, {})
|
||||
from_addr = info.get('from', 'Unbekannt')
|
||||
to_addr = info.get('to', 'Unbekannt')
|
||||
date = info.get('date', 'Unbekannt')
|
||||
print(f" - {key}")
|
||||
print(f" Von: {from_addr}")
|
||||
print(f" An: {to_addr}")
|
||||
print(f" Datum: {date}\n")
|
||||
|
||||
confirmation = input("\nMöchten Sie diese E-Mails wirklich löschen? (j/n): ")
|
||||
if confirmation.lower() not in ['j', 'ja', 'y', 'yes']:
|
||||
logger.info("Löschung abgebrochen")
|
||||
return 0
|
||||
|
||||
# Löschung durchführen
|
||||
deleted_count = 0
|
||||
for key in emails_to_delete:
|
||||
try:
|
||||
s3_client.delete_object(Bucket=bucket, Key=key)
|
||||
logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}")
|
||||
deleted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}")
|
||||
|
||||
return deleted_count
|
||||
|
||||
def process_domain(domain_name, domain_config, last_sync):
|
||||
"""Verarbeitet eine einzelne Domain"""
|
||||
bucket = domain_config["bucket"]
|
||||
prefix = domain_config["prefix"]
|
||||
region = domain_config["region"]
|
||||
|
||||
logger.info(f"Verarbeite Domain: {domain_name}")
|
||||
logger.info(f" Bucket: {bucket}")
|
||||
logger.info(f" Präfix: {prefix}")
|
||||
logger.info(f" Region: {region}")
|
||||
logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}")
|
||||
|
||||
try:
|
||||
# S3-Client initialisieren
|
||||
s3 = boto3.client('s3', region_name=region)
|
||||
|
||||
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
|
||||
paginator = s3.get_paginator('list_objects_v2')
|
||||
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
|
||||
|
||||
new_emails = 0
|
||||
total_emails = 0
|
||||
emails_to_delete = []
|
||||
email_info = {}
|
||||
domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}
|
||||
|
||||
# Alle Seiten durchlaufen
|
||||
for page in pages:
|
||||
if 'Contents' not in page:
|
||||
continue
|
||||
|
||||
objects = page['Contents']
|
||||
total_emails += len(objects)
|
||||
|
||||
for obj in objects:
|
||||
key = obj['Key']
|
||||
|
||||
# Verzeichnisse überspringen
|
||||
if key.endswith('/'):
|
||||
continue
|
||||
|
||||
# Eindeutiger Key für die Synchronisation
|
||||
sync_key = f"{bucket}:{key}"
|
||||
|
||||
# Prüfen, ob die E-Mail bereits synchronisiert wurde
|
||||
if sync_key in domain_last_sync:
|
||||
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
|
||||
continue
|
||||
|
||||
try:
|
||||
# E-Mail aus S3 laden
|
||||
response = s3.get_object(Bucket=bucket, Key=key)
|
||||
email_content = response['Body'].read()
|
||||
|
||||
# Header parsen
|
||||
try:
|
||||
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
|
||||
to_address = headers.get('To', '')
|
||||
from_address = headers.get('From', '')
|
||||
date = headers.get('Date', '')
|
||||
message_id = headers.get('Message-ID', '')
|
||||
|
||||
# E-Mail-Informationen speichern
|
||||
email_info[key] = {
|
||||
'to': to_address,
|
||||
'from': from_address,
|
||||
'date': date
|
||||
}
|
||||
|
||||
# Alle Domains-Konfigurationen für die Validierung verwenden
|
||||
is_valid, recipient_domain = is_valid_recipient(to_address, {domain_name: domain_config})
|
||||
|
||||
if is_valid:
|
||||
logger.info(f"Gültige E-Mail für: {to_address}")
|
||||
|
||||
# E-Mail speichern
|
||||
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
|
||||
# Status aktualisieren
|
||||
last_sync[sync_key] = {
|
||||
'timestamp': time.time(),
|
||||
'to': to_address,
|
||||
'message_id': message_id,
|
||||
'domain': domain_name,
|
||||
'bucket': bucket
|
||||
}
|
||||
new_emails += 1
|
||||
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
|
||||
|
||||
# Status nach jeweils 10 E-Mails speichern
|
||||
if new_emails % 10 == 0:
|
||||
save_sync_status(last_sync)
|
||||
logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher")
|
||||
else:
|
||||
logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}")
|
||||
emails_to_delete.append(key)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
|
||||
emails_to_delete.append(key)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
|
||||
|
||||
# Ungültige E-Mails löschen
|
||||
if emails_to_delete:
|
||||
deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info)
|
||||
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}")
|
||||
|
||||
logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
|
||||
return new_emails, total_emails
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
return 0, 0
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion"""
|
||||
logger.info("Multi-Domain S3 E-Mail Downloader gestartet")
|
||||
logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen")
|
||||
logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}")
|
||||
|
||||
try:
|
||||
# Domain-Konfigurationen laden
|
||||
domains_config = load_domains_config()
|
||||
|
||||
if not domains_config:
|
||||
logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.")
|
||||
return
|
||||
|
||||
logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}")
|
||||
|
||||
# Letzten Synchronisationsstatus laden
|
||||
last_sync = load_sync_status()
|
||||
|
||||
# Jede Domain einzeln verarbeiten
|
||||
total_new_emails = 0
|
||||
total_all_emails = 0
|
||||
|
||||
for domain_name, domain_config in domains_config.items():
|
||||
new_emails, all_emails = process_domain(domain_name, domain_config, last_sync)
|
||||
total_new_emails += new_emails
|
||||
total_all_emails += all_emails
|
||||
|
||||
# Nach jeder Domain den Status speichern
|
||||
save_sync_status(last_sync)
|
||||
|
||||
logger.info(f"Gesamtverarbeitung abgeschlossen. Insgesamt {total_all_emails} E-Mails gefunden, {total_new_emails} neue heruntergeladen.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler: {str(e)}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue