new processing
This commit is contained in:
parent 1b05ae48ad
commit 991047d286
@@ -1,28 +0,0 @@
#!/bin/bash

# Set the absolute path to the project directory
PROJECT_DIR="/home/aknuth/git/docker/dovecot"

# Change into the project directory
cd "$PROJECT_DIR" || { echo "Project directory not found"; exit 1; }

# Activate the virtual environment
source venv/bin/activate || { echo "Could not activate the virtual environment"; exit 1; }

# Run the Python script
# Pass 'y' as a parameter if deletions should happen automatically without confirmation
python3 s3_email_downloader.py y

# Set the correct permissions for Dovecot in the Docker container
MAIL_DIR="${PROJECT_DIR}/mail"  # Adjust to the actual path
USER_ID=1000   # User ID for aknuth
GROUP_ID=1000  # Group ID for aknuth

echo "Setting permissions for Dovecot in Docker..."
find "$MAIL_DIR" -type d -exec chmod 0755 {} \;  # rwxr-xr-x for directories
find "$MAIL_DIR" -type f -exec chmod 0644 {} \;  # rw-r--r-- for files
# chown -R $USER_ID:$GROUP_ID "$MAIL_DIR"
echo "Permissions set."

# Deactivate the virtual environment (optional)
deactivate
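The wrapper above is clearly meant for unattended runs. As a minimal sketch of how it might be scheduled from cron - the script file name, interval, and log path below are assumptions, since the diff does not show them:

# Register a hypothetical cron entry (script name, schedule, and log path are assumptions)
( crontab -l 2>/dev/null; echo '*/5 * * * * /home/aknuth/git/docker/dovecot/sync_mail.sh >> /var/log/s3_email_sync.log 2>&1' ) | crontab -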
@@ -1,440 +0,0 @@
#!/usr/bin/env python3
"""
Multi-domain S3 e-mail downloader script

This script downloads e-mails from multiple S3 buckets and stores them in
Maildir format for Dovecot. It supports multiple domains, each with its own
bucket settings and user list.

Usage:
    python3 s3_email_downloader.py      # With confirmation prompt for deletions
    python3 s3_email_downloader.py y    # Without confirmation prompt for deletions
"""

import boto3
import os
import time
import logging
import json
import re
import hashlib
import sys
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv

# Load the .env file
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('s3_email_downloader.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("s3-email-downloader")

# Main configuration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail')  # Path to the Docker volume
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')  # Default region, can be overridden per domain

# Status file for synchronisation
STATUS_FILE = Path('sync_status.json')

# Check whether automatic deletion is enabled
AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y'

# Multi-domain configuration
# The configuration can come from environment variables or a separate JSON file
DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json')

def load_domains_config():
    """Loads the domain configuration from a JSON file or from environment variables"""
    domains = {}

    # Try to load the configuration from the JSON file
    config_file = Path(DOMAINS_CONFIG_FILE)
    if config_file.exists():
        try:
            with open(config_file, 'r') as f:
                domains = json.load(f)
            logger.info(f"Domain configuration loaded from {DOMAINS_CONFIG_FILE}")
            return domains
        except Exception as e:
            logger.error(f"Error loading domain configuration from {DOMAINS_CONFIG_FILE}: {str(e)}")

    # Fallback: configuration from environment variables
    # Environment variable format:
    # DOMAIN_1=bizmatch.net
    # DOMAIN_1_BUCKET=bizmatch-emails
    # DOMAIN_1_PREFIX=emails/
    # DOMAIN_1_USERNAMES=accounting,info,sales,support,test1,test2,test3
    # DOMAIN_1_REGION=us-east-2  # Optional, overrides AWS_REGION

    domain_index = 1
    while True:
        domain_key = f"DOMAIN_{domain_index}"
        domain_name = os.environ.get(domain_key)

        if not domain_name:
            break  # No further domain definition found

        bucket = os.environ.get(f"{domain_key}_BUCKET", "")
        prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
        usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
        region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)

        if bucket and usernames:
            domains[domain_name] = {
                "bucket": bucket,
                "prefix": prefix,
                "usernames": usernames.split(','),
                "region": region
            }
            logger.info(f"Domain {domain_name} configured from environment variables")
        else:
            logger.warning(f"Incomplete configuration for {domain_name}, skipping")

        domain_index += 1

    # Fallback for backwards compatibility: old configuration from environment variables
    if not domains:
        old_domain = os.environ.get('VALID_DOMAIN', '')
        old_bucket = os.environ.get('S3_BUCKET', '')
        old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/')
        old_usernames = os.environ.get('VALID_USERNAMES', '')

        if old_domain and old_bucket and old_usernames:
            domains[old_domain] = {
                "bucket": old_bucket,
                "prefix": old_prefix,
                "usernames": old_usernames.split(','),
                "region": AWS_REGION
            }
            logger.info(f"Legacy configuration for domain {old_domain} loaded")

    return domains

def load_sync_status():
    """Loads the last synchronisation status"""
    if STATUS_FILE.exists():
        try:
            with open(STATUS_FILE, 'r') as f:
                status = json.load(f)
            return status.get('last_sync', {})
        except Exception as e:
            logger.error(f"Error loading the sync status: {str(e)}")

    return {}

def save_sync_status(last_sync):
    """Saves the current synchronisation status"""
    try:
        with open(STATUS_FILE, 'w') as f:
            json.dump({
                'last_sync': last_sync,
                'last_sync_time': time.time()
            }, f)
    except Exception as e:
        logger.error(f"Error saving the sync status: {str(e)}")

def extract_email_address(address):
    """Extracts the e-mail address from a complex address format"""
    if not address:
        return None

    # Simple case: just an e-mail address
    if '@' in address and '<' not in address:
        return address.strip()

    # Complex case: "Name <email@domain.com>"
    match = re.search(r'<([^>]+)>', address)
    if match:
        return match.group(1)

    # Fallback
    return address.strip()

def is_valid_recipient(to_address, domains_config):
    """
    Checks whether the recipient address is valid (matching domain and username)
    Returns a tuple: (is_valid, domain_name)
    """
    email = extract_email_address(to_address)

    # Split the e-mail address
    if not email or '@' not in email:
        return False, None

    username, domain = email.split('@', 1)

    # Check whether the domain exists in the configuration
    if domain.lower() in domains_config:
        domain_config = domains_config[domain.lower()]
        # Check whether the username is in the list of valid usernames
        if username.lower() in [u.lower() for u in domain_config["usernames"]]:
            return True, domain.lower()

    return False, None

def get_maildir_path(to_address, mail_dir):
    """
    Determines the Maildir path based on the recipient address
    Format: {mail_dir}/domain.com/user/
    """
    email = extract_email_address(to_address)

    # Split the e-mail address
    if '@' in email:
        user, domain = email.split('@', 1)
    else:
        return None  # Invalid e-mail address

    # Build the path
    mail_dir_path = Path(mail_dir)
    domain_dir = mail_dir_path / domain
    user_dir = domain_dir / user

    # Ensure the Maildir structure exists
    for directory in [mail_dir_path, domain_dir, user_dir]:
        directory.mkdir(parents=True, exist_ok=True)

    # Maildir subdirectories
    for subdir in ['cur', 'new', 'tmp']:
        (user_dir / subdir).mkdir(exist_ok=True)

    return user_dir

def store_email(email_content, to_address, message_id, s3_key, mail_dir):
    """Stores an e-mail in Maildir format"""
    try:
        # Determine the Maildir path
        maildir = get_maildir_path(to_address, mail_dir)
        if not maildir:
            logger.error(f"Could not determine Maildir for {to_address}")
            return False

        # Generate a unique file name
        timestamp = int(time.time())
        hostname = 'mail'
        unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()

        # Maildir file name in the format: timestamp.unique_id.hostname:2,
        filename = f"{timestamp}.{unique_id}.{hostname}:2,"

        # Store the e-mail in "new"
        email_path = maildir / 'new' / filename

        with open(email_path, 'wb') as f:
            f.write(email_content)

        logger.info(f"E-mail stored: {email_path}")
        return True

    except Exception as e:
        logger.error(f"Error storing e-mail {s3_key}: {str(e)}")
        return False

def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info):
    """Deletes e-mails from the S3 bucket"""
    if not emails_to_delete:
        return 0

    if not AUTO_DELETE:
        # Ask for confirmation
        print(f"\nThe following {len(emails_to_delete)} e-mails will be deleted from bucket {bucket}:")
        for key in emails_to_delete:
            info = email_info.get(key, {})
            from_addr = info.get('from', 'unknown')
            to_addr = info.get('to', 'unknown')
            date = info.get('date', 'unknown')
            print(f"  - {key}")
            print(f"    From: {from_addr}")
            print(f"    To: {to_addr}")
            print(f"    Date: {date}\n")

        confirmation = input("\nDo you really want to delete these e-mails? (y/n): ")
        if confirmation.lower() not in ['j', 'ja', 'y', 'yes']:
            logger.info("Deletion aborted")
            return 0

    # Perform the deletion
    deleted_count = 0
    for key in emails_to_delete:
        try:
            s3_client.delete_object(Bucket=bucket, Key=key)
            logger.info(f"E-mail deleted: {key} from bucket {bucket}")
            deleted_count += 1
        except Exception as e:
            logger.error(f"Error deleting e-mail {key} from bucket {bucket}: {str(e)}")

    return deleted_count

def process_domain(domain_name, domain_config, last_sync, all_domains_config):
    """Processes a single domain"""
    bucket = domain_config["bucket"]
    prefix = domain_config["prefix"]
    region = domain_config["region"]

    logger.info(f"Processing domain: {domain_name}")
    logger.info(f"  Bucket: {bucket}")
    logger.info(f"  Prefix: {prefix}")
    logger.info(f"  Region: {region}")
    logger.info(f"  Valid usernames: {', '.join(domain_config['usernames'])}")

    try:
        # Initialise the S3 client
        s3 = boto3.client('s3', region_name=region)

        # To process all e-mails we have to use the paginator
        paginator = s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

        new_emails = 0
        total_emails = 0
        emails_to_delete = []
        email_info = {}
        domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}

        # Iterate over all pages
        for page in pages:
            if 'Contents' not in page:
                continue

            objects = page['Contents']
            total_emails += len(objects)

            for obj in objects:
                key = obj['Key']

                # Skip directories
                if key.endswith('/'):
                    continue

                # Unique key for synchronisation
                sync_key = f"{bucket}:{key}"

                # Check whether the e-mail has already been synchronised
                if sync_key in domain_last_sync:
                    logger.debug(f"E-mail {key} already synchronised - skipped")
                    continue

                try:
                    # Load the e-mail from S3
                    response = s3.get_object(Bucket=bucket, Key=key)
                    email_content = response['Body'].read()

                    # Parse the headers
                    try:
                        headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
                        to_address = headers.get('To', '')
                        from_address = headers.get('From', '')
                        date = headers.get('Date', '')
                        message_id = headers.get('Message-ID', '')

                        # Store e-mail information
                        email_info[key] = {
                            'to': to_address,
                            'from': from_address,
                            'date': date
                        }

                        # Use all domain configurations for validation
                        # BUGFIX: here we validate against ALL configured domains
                        is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)

                        if is_valid:
                            logger.info(f"Valid e-mail for: {to_address}")

                            # Only store the e-mail if it belongs to the current domain
                            if recipient_domain == domain_name:
                                # Store the e-mail
                                if store_email(email_content, to_address, message_id, key, MAIL_DIR):
                                    # Update the status
                                    last_sync[sync_key] = {
                                        'timestamp': time.time(),
                                        'to': to_address,
                                        'message_id': message_id,
                                        'domain': domain_name,
                                        'bucket': bucket
                                    }
                                    new_emails += 1
                                    logger.info(f"E-mail {key} successfully synchronised ({new_emails})")

                                    # Save the status after every 10 e-mails
                                    if new_emails % 10 == 0:
                                        save_sync_status(last_sync)
                                        logger.info(f"Intermediate save: {new_emails} new e-mails so far")
                            else:
                                # Valid e-mail, but for another domain - do not delete!
                                logger.info(f"E-mail {key} belongs to domain {recipient_domain}, skipped (not deleted)")
                        else:
                            logger.info(f"Invalid recipient address: {to_address} for {key}")
                            emails_to_delete.append(key)

                    except Exception as e:
                        logger.error(f"Error parsing e-mail headers {key}: {str(e)}")
                        emails_to_delete.append(key)

                except Exception as e:
                    logger.error(f"Error processing e-mail {key}: {str(e)}")

        # Delete invalid e-mails
        if emails_to_delete:
            deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info)
            logger.info(f"Deleted {deleted_count} of {len(emails_to_delete)} invalid e-mails from bucket {bucket}")

        logger.info(f"Processing for domain {domain_name} finished. Found {total_emails} e-mails in total, downloaded {new_emails} new ones.")
        return new_emails, total_emails

    except Exception as e:
        logger.error(f"Error processing domain {domain_name}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return 0, 0

def main():
    """Main function"""
    logger.info("Multi-domain S3 e-mail downloader started")
    logger.info(f"E-mails will be downloaded to {MAIL_DIR}")
    logger.info(f"Automatic deletion: {'yes' if AUTO_DELETE else 'no (with confirmation)'}")

    try:
        # Load the domain configurations
        domains_config = load_domains_config()

        if not domains_config:
            logger.error("No domain configurations found. Please configure at least one domain.")
            return

        logger.info(f"The following domains will be processed: {', '.join(domains_config.keys())}")

        # Load the last synchronisation status
        last_sync = load_sync_status()

        # Process each domain individually
        total_new_emails = 0
        total_all_emails = 0

        for domain_name, domain_config in domains_config.items():
            # BUGFIX: pass all_domains_config instead of only the current domain configuration
            new_emails, all_emails = process_domain(domain_name, domain_config, last_sync, domains_config)
            total_new_emails += new_emails
            total_all_emails += all_emails

            # Save the status after each domain
            save_sync_status(last_sync)

        logger.info(f"Overall processing finished. Found {total_all_emails} e-mails in total, downloaded {total_new_emails} new ones.")

    except Exception as e:
        logger.error(f"Error: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

if __name__ == "__main__":
    main()
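For reference, load_domains_config() above first looks for a JSON file. A hedged sketch of a matching domains_config.json, using the illustrative domain, bucket, and usernames from the environment-variable comments (adjust to your setup):

# Write an example domains_config.json; all values are illustrative
cat > domains_config.json <<'EOF'
{
  "bizmatch.net": {
    "bucket": "bizmatch-emails",
    "prefix": "emails/",
    "usernames": ["accounting", "info", "sales", "support"],
    "region": "us-east-2"
  }
}
EOF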
@@ -1,442 +0,0 @@
#!/usr/bin/env python3
"""
S3 e-mail downloader REST API

This API allows fetching e-mails from S3 buckets for a given domain.
The e-mails are stored in Maildir format for Dovecot.

Endpoints:
    GET /health - API status check
    POST /sync/{domain} - Synchronises e-mails for a given domain

Authentication:
    The /sync/{domain} endpoint requires a valid API token in the header.
"""

import boto3
import os
import time
import logging
import json
import re
import hashlib
import sys
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv
from flask import Flask, request, jsonify, abort
from functools import wraps

# Load the .env file
load_dotenv()

# Initialise the Flask app
app = Flask(__name__)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('s3_email_downloader_api.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("s3-email-downloader-api")

# Main configuration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail')  # Path to the Docker volume
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')  # Default region
API_TOKEN = os.environ.get('API_TOKEN')  # API token for authentication

# Status file for synchronisation
STATUS_FILE = Path('sync_status.json')

# Token authentication
def require_token(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')

        if not auth_header or not auth_header.startswith('Bearer '):
            logger.warning("Missing or malformed Authorization header")
            abort(401, description="Missing Authorization header")

        token = auth_header[7:]  # Strip "Bearer "

        if not API_TOKEN or token != API_TOKEN:
            logger.warning("Invalid API token")
            abort(403, description="Invalid API token")

        return f(*args, **kwargs)
    return decorated_function

def load_domains_config():
    """Loads the domain configuration from environment variables"""
    domains = {}

    # Load the configuration from environment variables
    domain_index = 1
    while True:
        domain_key = f"DOMAIN_{domain_index}"
        domain_name = os.environ.get(domain_key)

        if not domain_name:
            break  # No further domain definition found

        bucket = os.environ.get(f"{domain_key}_BUCKET", "")
        prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
        usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
        region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)

        if bucket and usernames:
            domains[domain_name.lower()] = {
                "bucket": bucket,
                "prefix": prefix,
                "usernames": usernames.split(','),
                "region": region
            }
            logger.info(f"Domain {domain_name} configured from environment variables")
        else:
            logger.warning(f"Incomplete configuration for {domain_name}, skipping")

        domain_index += 1

    return domains

def load_sync_status():
    """Loads the last synchronisation status"""
    if STATUS_FILE.exists():
        try:
            with open(STATUS_FILE, 'r') as f:
                status = json.load(f)
            return status.get('last_sync', {})
        except Exception as e:
            logger.error(f"Error loading the sync status: {str(e)}")

    return {}

def save_sync_status(last_sync):
    """Saves the current synchronisation status"""
    try:
        with open(STATUS_FILE, 'w') as f:
            json.dump({
                'last_sync': last_sync,
                'last_sync_time': time.time()
            }, f)
    except Exception as e:
        logger.error(f"Error saving the sync status: {str(e)}")

def extract_email_address(address):
    """Extracts the e-mail address from a complex address format"""
    if not address:
        return None

    # Simple case: just an e-mail address
    if '@' in address and '<' not in address:
        return address.strip()

    # Complex case: "Name <email@domain.com>"
    match = re.search(r'<([^>]+)>', address)
    if match:
        return match.group(1)

    # Fallback
    return address.strip()

def is_valid_recipient(to_address, domains_config):
    """
    Checks whether the recipient address is valid (matching domain and username)
    Returns a tuple: (is_valid, domain_name)
    """
    email = extract_email_address(to_address)

    # Split the e-mail address
    if not email or '@' not in email:
        return False, None

    username, domain = email.split('@', 1)

    # Check whether the domain exists in the configuration
    if domain.lower() in domains_config:
        domain_config = domains_config[domain.lower()]
        # Check whether the username is in the list of valid usernames
        if username.lower() in [u.lower() for u in domain_config["usernames"]]:
            return True, domain.lower()

    return False, None

def get_maildir_path(to_address, mail_dir):
    """
    Determines the Maildir path based on the recipient address
    Format: {mail_dir}/domain.com/user/
    """
    email = extract_email_address(to_address)

    # Split the e-mail address
    if '@' in email:
        user, domain = email.split('@', 1)
    else:
        return None  # Invalid e-mail address

    # Build the path
    mail_dir_path = Path(mail_dir)
    domain_dir = mail_dir_path / domain
    user_dir = domain_dir / user

    # Ensure the Maildir structure exists
    for directory in [mail_dir_path, domain_dir, user_dir]:
        directory.mkdir(parents=True, exist_ok=True)
        os.chmod(directory, 0o775)  # rwxrwxr-x

    # Maildir subdirectories
    for subdir in ['cur', 'new', 'tmp']:
        #(user_dir / subdir).mkdir(exist_ok=True)
        subdir_path = user_dir / subdir
        subdir_path.mkdir(exist_ok=True)

    return user_dir

def store_email(email_content, to_address, message_id, s3_key, mail_dir):
    """Stores an e-mail in Maildir format"""
    try:
        # Determine the Maildir path
        maildir = get_maildir_path(to_address, mail_dir)
        if not maildir:
            logger.error(f"Could not determine Maildir for {to_address}")
            return False

        # Generate a unique file name
        timestamp = int(time.time())
        hostname = 'mail'
        unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()

        # Maildir file name in the format: timestamp.unique_id.hostname:2,
        filename = f"{timestamp}.{unique_id}.{hostname}:2,"

        # Store the e-mail in "new"
        email_path = maildir / 'new' / filename

        with open(email_path, 'wb') as f:
            f.write(email_content)

        logger.info(f"E-mail stored: {email_path}")
        return True

    except Exception as e:
        logger.error(f"Error storing e-mail {s3_key}: {str(e)}")
        return False

def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info, auto_delete=True):
    """Deletes e-mails from the S3 bucket"""
    if not emails_to_delete:
        return 0

    # For the API, always delete automatically when the parameter is set
    if not auto_delete:
        logger.info(f"Automatic deletion disabled, {len(emails_to_delete)} e-mails will not be deleted.")
        return 0

    # Perform the deletion
    deleted_count = 0
    for key in emails_to_delete:
        try:
            s3_client.delete_object(Bucket=bucket, Key=key)
            logger.info(f"E-mail deleted: {key} from bucket {bucket}")
            deleted_count += 1
        except Exception as e:
            logger.error(f"Error deleting e-mail {key} from bucket {bucket}: {str(e)}")

    return deleted_count

def process_domain(domain_name, auto_delete=True):
    """Processes a single domain"""
    # Load the domain configurations
    all_domains_config = load_domains_config()

    # Check whether the domain is configured
    domain_name = domain_name.lower()
    if domain_name not in all_domains_config:
        logger.error(f"Domain {domain_name} not configured")
        return {
            "error": f"Domain {domain_name} not configured",
            "status": "error"
        }

    domain_config = all_domains_config[domain_name]
    bucket = domain_config["bucket"]
    prefix = domain_config["prefix"]
    region = domain_config["region"]

    logger.info(f"Processing domain: {domain_name}")
    logger.info(f"  Bucket: {bucket}")
    logger.info(f"  Prefix: {prefix}")
    logger.info(f"  Region: {region}")
    logger.info(f"  Valid usernames: {', '.join(domain_config['usernames'])}")
    logger.info(f"  Automatic deletion: {auto_delete}")

    try:
        # Initialise the S3 client
        s3 = boto3.client('s3', region_name=region)

        # Load the last synchronisation status
        last_sync = load_sync_status()

        # To process all e-mails we have to use the paginator
        paginator = s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

        new_emails = 0
        total_emails = 0
        emails_to_delete = []
        email_info = {}
        domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}

        # Iterate over all pages
        for page in pages:
            if 'Contents' not in page:
                continue

            objects = page['Contents']
            total_emails += len(objects)

            for obj in objects:
                key = obj['Key']

                # Skip directories
                if key.endswith('/'):
                    continue

                # Unique key for synchronisation
                sync_key = f"{bucket}:{key}"

                # Check whether the e-mail has already been synchronised
                if sync_key in domain_last_sync:
                    logger.debug(f"E-mail {key} already synchronised - skipped")
                    continue

                try:
                    # Load the e-mail from S3
                    response = s3.get_object(Bucket=bucket, Key=key)
                    email_content = response['Body'].read()

                    # Parse the headers
                    try:
                        headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
                        to_address = headers.get('To', '')
                        from_address = headers.get('From', '')
                        date = headers.get('Date', '')
                        message_id = headers.get('Message-ID', '')

                        # Store e-mail information
                        email_info[key] = {
                            'to': to_address,
                            'from': from_address,
                            'date': date
                        }

                        # Use all domain configurations for validation
                        is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)

                        if is_valid:
                            logger.info(f"Valid e-mail for: {to_address}")

                            # Only store the e-mail if it belongs to the current domain
                            if recipient_domain == domain_name:
                                # Store the e-mail
                                if store_email(email_content, to_address, message_id, key, MAIL_DIR):
                                    # Update the status
                                    last_sync[sync_key] = {
                                        'timestamp': time.time(),
                                        'to': to_address,
                                        'message_id': message_id,
                                        'domain': domain_name,
                                        'bucket': bucket
                                    }
                                    new_emails += 1
                                    logger.info(f"E-mail {key} successfully synchronised ({new_emails})")

                                    # Save the status after every 10 e-mails
                                    if new_emails % 10 == 0:
                                        save_sync_status(last_sync)
                                        logger.info(f"Intermediate save: {new_emails} new e-mails so far")
                            else:
                                # Valid e-mail, but for another domain - do not delete!
                                logger.info(f"E-mail {key} belongs to domain {recipient_domain}, skipped (not deleted)")
                        else:
                            logger.info(f"Invalid recipient address: {to_address} for {key}")
                            emails_to_delete.append(key)

                    except Exception as e:
                        logger.error(f"Error parsing e-mail headers {key}: {str(e)}")
                        emails_to_delete.append(key)

                except Exception as e:
                    logger.error(f"Error processing e-mail {key}: {str(e)}")

        # Save the status
        save_sync_status(last_sync)

        # Delete invalid e-mails
        deleted_count = 0
        if emails_to_delete:
            deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info, auto_delete)
            logger.info(f"Deleted {deleted_count} of {len(emails_to_delete)} invalid e-mails from bucket {bucket}")

        logger.info(f"Processing for domain {domain_name} finished. Found {total_emails} e-mails in total, downloaded {new_emails} new ones.")

        return {
            "status": "success",
            "domain": domain_name,
            "total_emails": total_emails,
            "new_emails": new_emails,
            "emails_to_delete": len(emails_to_delete),
            "deleted_emails": deleted_count
        }

    except Exception as e:
        logger.error(f"Error processing domain {domain_name}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

        return {
            "status": "error",
            "domain": domain_name,
            "error": str(e),
            "traceback": traceback.format_exc()
        }

# API endpoints

@app.route('/health', methods=['GET'])
def health_check():
    """Health check for the API"""
    return jsonify({
        "status": "OK",
        "message": "S3 e-mail downloader API is up"
    })

@app.route('/sync/<domain>', methods=['POST'])
@require_token
def sync_domain(domain):
    """Synchronises e-mails for a given domain"""
    # Automatic deletion from the request parameters
    auto_delete = request.args.get('auto_delete', 'true').lower() in ['true', '1', 't', 'y', 'yes']

    # Start the domain processing
    result = process_domain(domain, auto_delete)

    # Return success or error
    if result.get("status") == "error":
        return jsonify(result), 500
    else:
        return jsonify(result)

if __name__ == "__main__":
    # Check whether an API token is set
    if not API_TOKEN:
        logger.warning("WARNING: No API_TOKEN defined in the environment variables. The API is unprotected!")

    # Start the Flask server
    app.run(host='0.0.0.0', port=5000, debug=False)
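A sync for one domain can then be triggered over HTTP. A minimal sketch, assuming the server is reachable on localhost with the port from app.run() above, $API_TOKEN holds the configured token, and bizmatch.net is a configured domain:

# Trigger a sync without deleting invalid mails (auto_delete defaults to true)
curl -X POST \
  -H "Authorization: Bearer $API_TOKEN" \
  "http://localhost:5000/sync/bizmatch.net?auto_delete=false"

The auto_delete query parameter maps directly to the process_domain(domain, auto_delete) call in sync_domain().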
@ -0,0 +1,717 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Optimierte S3 E-Mail Processor REST API
|
||||||
|
Mit Kompression, verbesserter Fehlerbehandlung und Request-Tracking
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
|
import gzip
|
||||||
|
import boto3
|
||||||
|
from pathlib import Path
|
||||||
|
from email.parser import BytesParser
|
||||||
|
from email import policy
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from flask import Flask, request, jsonify, abort
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
|
# .env-Datei laden
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# Flask-App initialisieren
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
# Logging konfigurieren mit Request-ID-Support
|
||||||
|
class RequestIDFilter(logging.Filter):
|
||||||
|
def filter(self, record):
|
||||||
|
from flask import g
|
||||||
|
record.request_id = getattr(g, 'request_id', 'no-request')
|
||||||
|
return True
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - [%(request_id)s] - %(name)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler('s3_email_processor_api.log'),
|
||||||
|
logging.StreamHandler()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger("s3-email-processor-api")
|
||||||
|
logger.addFilter(RequestIDFilter())
|
||||||
|
|
||||||
|
# Konfiguration
|
||||||
|
MAIL_DIR = os.environ.get('MAIL_DIR', './mail')
|
||||||
|
API_TOKEN = os.environ.get('API_TOKEN')
|
||||||
|
|
||||||
|
# Request-Tracking für Duplikat-Erkennung
|
||||||
|
processed_requests = {}
|
||||||
|
REQUEST_CACHE_SIZE = 1000
|
||||||
|
REQUEST_CACHE_TTL = 3600 # 1 Stunde
|
||||||
|
|
||||||
|
def require_token(f):
|
||||||
|
@wraps(f)
|
||||||
|
def decorated_function(*args, **kwargs):
|
||||||
|
auth_header = request.headers.get('Authorization')
|
||||||
|
|
||||||
|
if not auth_header or not auth_header.startswith('Bearer '):
|
||||||
|
logger.warning("Fehlender Authorization-Header")
|
||||||
|
abort(401, description="Fehlender Authorization-Header")
|
||||||
|
|
||||||
|
token = auth_header[7:]
|
||||||
|
|
||||||
|
if not API_TOKEN or token != API_TOKEN:
|
||||||
|
logger.warning("Ungültiger API-Token")
|
||||||
|
abort(403, description="Ungültiger API-Token")
|
||||||
|
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
return decorated_function
|
||||||
|
|
||||||
|
def setup_request_context():
|
||||||
|
"""Setzt Request-Kontext für Logging auf"""
|
||||||
|
from flask import g
|
||||||
|
g.request_id = request.headers.get('X-Request-ID', f'req-{int(time.time())}-{id(request)}')
|
||||||
|
|
||||||
|
@app.before_request
|
||||||
|
def before_request():
|
||||||
|
setup_request_context()
|
||||||
|
|
||||||
|
def is_duplicate_request(request_id, etag):
|
||||||
|
"""Prüft, ob Request bereits verarbeitet wurde"""
|
||||||
|
if not request_id or not etag:
|
||||||
|
return False
|
||||||
|
|
||||||
|
key = f"{request_id}:{etag}"
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
# Cache cleanup
|
||||||
|
if len(processed_requests) > REQUEST_CACHE_SIZE:
|
||||||
|
cutoff_time = current_time - REQUEST_CACHE_TTL
|
||||||
|
processed_requests.clear() # Einfache Lösung: kompletten Cache leeren
|
||||||
|
|
||||||
|
# Duplikat-Check
|
||||||
|
if key in processed_requests:
|
||||||
|
last_processed = processed_requests[key]
|
||||||
|
if current_time - last_processed < REQUEST_CACHE_TTL:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Request als verarbeitet markieren
|
||||||
|
processed_requests[key] = current_time
|
||||||
|
return False
|
||||||
|
|
||||||
|
def load_domains_config():
|
||||||
|
"""Lädt die Domain-Konfiguration aus Umgebungsvariablen"""
|
||||||
|
domains = {}
|
||||||
|
|
||||||
|
domain_index = 1
|
||||||
|
while True:
|
||||||
|
domain_key = f"DOMAIN_{domain_index}"
|
||||||
|
domain_name = os.environ.get(domain_key)
|
||||||
|
|
||||||
|
if not domain_name:
|
||||||
|
break
|
||||||
|
|
||||||
|
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
|
||||||
|
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
|
||||||
|
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
|
||||||
|
region = os.environ.get(f"{domain_key}_REGION", "us-east-2")
|
||||||
|
|
||||||
|
if bucket and usernames:
|
||||||
|
domains[domain_name.lower()] = {
|
||||||
|
"bucket": bucket,
|
||||||
|
"prefix": prefix,
|
||||||
|
"usernames": usernames.split(','),
|
||||||
|
"region": region
|
||||||
|
}
|
||||||
|
logger.info(f"Domain {domain_name} konfiguriert")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Unvollständige Konfiguration für {domain_name}")
|
||||||
|
|
||||||
|
domain_index += 1
|
||||||
|
|
||||||
|
return domains
|
||||||
|
|
||||||
|
def extract_email_address(address):
|
||||||
|
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
|
||||||
|
if not address:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if '@' in address and '<' not in address:
|
||||||
|
return address.strip()
|
||||||
|
|
||||||
|
match = re.search(r'<([^>]+)>', address)
|
||||||
|
if match:
|
||||||
|
return match.group(1)
|
||||||
|
|
||||||
|
return address.strip()
|
||||||
|
|
||||||
|
def is_valid_recipient(to_address, domains_config):
|
||||||
|
"""Prüft, ob die Empfängeradresse gültig ist"""
|
||||||
|
email = extract_email_address(to_address)
|
||||||
|
|
||||||
|
if not email or '@' not in email:
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
username, domain = email.split('@', 1)
|
||||||
|
|
||||||
|
if domain.lower() in domains_config:
|
||||||
|
domain_config = domains_config[domain.lower()]
|
||||||
|
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
|
||||||
|
return True, domain.lower()
|
||||||
|
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
def get_maildir_path(to_address, mail_dir):
|
||||||
|
"""Ermittelt den Pfad im Maildir-Format"""
|
||||||
|
email = extract_email_address(to_address)
|
||||||
|
|
||||||
|
if '@' in email:
|
||||||
|
user, domain = email.split('@', 1)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
mail_dir_path = Path(mail_dir)
|
||||||
|
domain_dir = mail_dir_path / domain
|
||||||
|
user_dir = domain_dir / user
|
||||||
|
|
||||||
|
# Maildir-Struktur erstellen
|
||||||
|
for directory in [mail_dir_path, domain_dir, user_dir]:
|
||||||
|
directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(directory, 0o775)
|
||||||
|
|
||||||
|
# Maildir-Unterverzeichnisse
|
||||||
|
for subdir in ['cur', 'new', 'tmp']:
|
||||||
|
subdir_path = user_dir / subdir
|
||||||
|
subdir_path.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
return user_dir
|
||||||
|
|
||||||
|
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
|
||||||
|
"""Speichert eine E-Mail im Maildir-Format"""
|
||||||
|
try:
|
||||||
|
maildir = get_maildir_path(to_address, mail_dir)
|
||||||
|
if not maildir:
|
||||||
|
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Eindeutigen Dateinamen generieren
|
||||||
|
timestamp = int(time.time())
|
||||||
|
hostname = 'mail'
|
||||||
|
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
|
||||||
|
|
||||||
|
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
|
||||||
|
email_path = maildir / 'new' / filename
|
||||||
|
|
||||||
|
with open(email_path, 'wb') as f:
|
||||||
|
f.write(email_content)
|
||||||
|
|
||||||
|
os.chmod(email_path, 0o664)
|
||||||
|
|
||||||
|
logger.info(f"E-Mail gespeichert: {email_path} ({len(email_content)} Bytes)")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def process_single_email(email_content, bucket, key, domain_name):
|
||||||
|
"""Verarbeitet eine einzelne E-Mail mit verbesserter Fehlerbehandlung"""
|
||||||
|
try:
|
||||||
|
all_domains_config = load_domains_config()
|
||||||
|
|
||||||
|
domain_name = domain_name.lower()
|
||||||
|
if domain_name not in all_domains_config:
|
||||||
|
logger.error(f"Domain {domain_name} nicht konfiguriert")
|
||||||
|
return {
|
||||||
|
"action": "error",
|
||||||
|
"error": f"Domain {domain_name} nicht konfiguriert",
|
||||||
|
"status": "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Header parsen mit besserer Fehlerbehandlung
|
||||||
|
try:
|
||||||
|
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
|
||||||
|
to_address = headers.get('To', '')
|
||||||
|
from_address = headers.get('From', '')
|
||||||
|
date = headers.get('Date', '')
|
||||||
|
message_id = headers.get('Message-ID', '')
|
||||||
|
subject = headers.get('Subject', '')
|
||||||
|
|
||||||
|
# Header-Validierung
|
||||||
|
if not to_address:
|
||||||
|
logger.warning(f"E-Mail ohne To-Header: {key}")
|
||||||
|
return {
|
||||||
|
"action": "invalid",
|
||||||
|
"error": "Fehlender To-Header",
|
||||||
|
"status": "rejected"
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(f"Verarbeite: '{subject}' von {from_address} an {to_address}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
|
||||||
|
return {
|
||||||
|
"action": "invalid",
|
||||||
|
"error": f"Header-Parsing-Fehler: {str(e)}",
|
||||||
|
"status": "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Empfänger validieren
|
||||||
|
is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)
|
||||||
|
|
||||||
|
if not is_valid:
|
||||||
|
logger.info(f"Ungültige Empfängeradresse: {to_address}")
|
||||||
|
return {
|
||||||
|
"action": "invalid",
|
||||||
|
"message": f"Ungültige Empfängeradresse: {to_address}",
|
||||||
|
"status": "rejected",
|
||||||
|
"recipient": to_address,
|
||||||
|
"reason": "invalid_recipient"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Domain-Zugehörigkeit prüfen
|
||||||
|
if recipient_domain != domain_name:
|
||||||
|
logger.info(f"E-Mail gehört zu Domain {recipient_domain}, nicht zu {domain_name}")
|
||||||
|
return {
|
||||||
|
"action": "wrong_domain",
|
||||||
|
"message": f"E-Mail gehört zu Domain {recipient_domain}",
|
||||||
|
"status": "skipped",
|
||||||
|
"expected_domain": domain_name,
|
||||||
|
"actual_domain": recipient_domain
|
||||||
|
}
|
||||||
|
|
||||||
|
# E-Mail speichern
|
||||||
|
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
|
||||||
|
logger.info(f"E-Mail erfolgreich gespeichert für {to_address}")
|
||||||
|
return {
|
||||||
|
"action": "stored",
|
||||||
|
"message": f"E-Mail erfolgreich gespeichert",
|
||||||
|
"status": "success",
|
||||||
|
"recipient": to_address,
|
||||||
|
"sender": from_address,
|
||||||
|
"subject": subject,
|
||||||
|
"size": len(email_content)
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
"action": "error",
|
||||||
|
"error": "Fehler beim Speichern der E-Mail",
|
||||||
|
"status": "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unerwarteter Fehler bei E-Mail {key}: {str(e)}", exc_info=True)
|
||||||
|
return {
|
||||||
|
"action": "error",
|
||||||
|
"error": str(e),
|
||||||
|
"status": "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
# API-Endpunkte
|
||||||
|
|
||||||
|
@app.route('/health', methods=['GET'])
|
||||||
|
def health_check():
|
||||||
|
"""Erweiterte Gesundheitsprüfung"""
|
||||||
|
from flask import g
|
||||||
|
|
||||||
|
# Basis-Gesundheitscheck
|
||||||
|
health_status = {
|
||||||
|
"status": "OK",
|
||||||
|
"message": "S3 E-Mail Processor API ist aktiv",
|
||||||
|
"timestamp": int(time.time()),
|
||||||
|
"version": "2.0",
|
||||||
|
"request_id": getattr(g, 'request_id', 'health-check')
|
||||||
|
}
|
||||||
|
|
||||||
|
# Erweiterte Checks
|
||||||
|
try:
|
||||||
|
# Maildir-Zugriff prüfen
|
||||||
|
mail_path = Path(MAIL_DIR)
|
||||||
|
if mail_path.exists() and mail_path.is_dir():
|
||||||
|
health_status["mail_dir"] = "accessible"
|
||||||
|
else:
|
||||||
|
health_status["mail_dir"] = "not_accessible"
|
||||||
|
health_status["status"] = "WARNING"
|
||||||
|
|
||||||
|
# Domain-Konfiguration prüfen
|
||||||
|
domains = load_domains_config()
|
||||||
|
health_status["configured_domains"] = len(domains)
|
||||||
|
health_status["domains"] = list(domains.keys())
|
||||||
|
|
||||||
|
# Cache-Status
|
||||||
|
health_status["request_cache_size"] = len(processed_requests)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
health_status["status"] = "ERROR"
|
||||||
|
health_status["error"] = str(e)
|
||||||
|
|
||||||
|
return jsonify(health_status)
|
||||||
|
|
||||||
|
@app.route('/process/<domain>', methods=['POST'])
|
||||||
|
@require_token
|
||||||
|
def process_email(domain):
|
||||||
|
"""Verarbeitet eine einzelne E-Mail mit verbesserter Fehlerbehandlung"""
|
||||||
|
from flask import g
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# JSON-Payload validieren
|
||||||
|
if not request.is_json:
|
||||||
|
return jsonify({"error": "Content-Type muss application/json sein"}), 400
|
||||||
|
|
||||||
|
data = request.get_json()
|
||||||
|
|
||||||
|
# Erforderliche Felder prüfen
|
||||||
|
required_fields = ['bucket', 'key', 'email_content', 'domain']
|
||||||
|
missing_fields = [field for field in required_fields if field not in data]
|
||||||
|
if missing_fields:
|
||||||
|
return jsonify({
|
||||||
|
"error": f"Fehlende Felder: {', '.join(missing_fields)}"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
# Duplikat-Check
|
||||||
|
request_id = data.get('request_id', g.request_id)
|
||||||
|
etag = data.get('etag')
|
||||||
|
|
||||||
|
if is_duplicate_request(request_id, etag):
|
||||||
|
logger.info(f"Duplikat-Request erkannt: {request_id}:{etag}")
|
||||||
|
return jsonify({
|
||||||
|
"action": "duplicate",
|
||||||
|
"message": "Request bereits verarbeitet",
|
||||||
|
"status": "skipped",
|
||||||
|
"request_id": request_id
|
||||||
|
})
|
||||||
|
|
||||||
|
# E-Mail-Content dekodieren
|
||||||
|
try:
|
||||||
|
email_base64 = data['email_content']
|
||||||
|
|
||||||
|
# Kompression-Support
|
||||||
|
if data.get('compressed', False):
|
||||||
|
compressed_data = base64.b64decode(email_base64)
|
||||||
|
email_content = gzip.decompress(compressed_data)
|
||||||
|
logger.info(f"E-Mail dekomprimiert: {data.get('compressed_size', 0)} -> {len(email_content)} Bytes")
|
||||||
|
else:
|
||||||
|
email_content = base64.b64decode(email_base64)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Fehler beim Dekodieren des E-Mail-Contents: {str(e)}")
|
||||||
|
return jsonify({
|
||||||
|
"error": f"Content-Dekodierung fehlgeschlagen: {str(e)}"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
# Größen-Validierung
|
||||||
|
email_size = len(email_content)
|
||||||
|
max_size = 25 * 1024 * 1024 # 25MB
|
||||||
|
if email_size > max_size:
|
||||||
|
logger.warning(f"E-Mail zu groß: {email_size} Bytes")
|
||||||
|
return jsonify({
|
||||||
|
"action": "too_large",
|
||||||
|
"error": f"E-Mail zu groß: {email_size} Bytes (max: {max_size})",
|
||||||
|
"status": "rejected"
|
||||||
|
}), 413
|
||||||
|
|
||||||
|
# E-Mail verarbeiten
|
||||||
|
logger.info(f"Verarbeite E-Mail: {data['key']} ({email_size} Bytes)")
|
||||||
|
result = process_single_email(
|
||||||
|
email_content=email_content,
|
||||||
|
bucket=data['bucket'],
|
||||||
|
key=data['key'],
|
||||||
|
domain_name=domain
|
||||||
|
)
|
||||||
|
|
||||||
|
# Performance-Metriken hinzufügen
|
||||||
|
processing_time = time.time() - start_time
|
||||||
|
result.update({
|
||||||
|
"processing_time_ms": round(processing_time * 1000, 2),
|
||||||
|
"request_id": request_id,
|
||||||
|
"email_size": email_size
|
||||||
|
})
|
||||||
|
|
||||||
|
# Log-Level basierend auf Ergebnis
|
||||||
|
if result.get("status") == "success":
|
||||||
|
logger.info(f"E-Mail erfolgreich verarbeitet in {processing_time:.2f}s")
|
||||||
|
elif result.get("status") in ["rejected", "skipped"]:
|
||||||
|
logger.info(f"E-Mail {result.get('action')}: {result.get('message')}")
|
||||||
|
else:
|
||||||
|
logger.error(f"E-Mail-Verarbeitung fehlgeschlagen: {result.get('error')}")
|
||||||
|
|
||||||
|
# HTTP-Status basierend auf Ergebnis
|
||||||
|
if result.get("status") == "error":
|
||||||
|
return jsonify(result), 500
|
||||||
|
elif result.get("action") == "too_large":
|
||||||
|
return jsonify(result), 413
|
||||||
|
else:
|
||||||
|
return jsonify(result), 200
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
processing_time = time.time() - start_time
|
||||||
|
logger.error(f"Unerwarteter Fehler nach {processing_time:.2f}s: {str(e)}", exc_info=True)
|
||||||
|
return jsonify({
|
||||||
|
"error": "Interner Server-Fehler",
|
||||||
|
"details": str(e),
|
||||||
|
"processing_time_ms": round(processing_time * 1000, 2),
|
||||||
|
"request_id": getattr(g, 'request_id', 'unknown')
|
||||||
|
}), 500
|
||||||
|
|
||||||
|

@app.route('/retry/<email_id>/<domain>', methods=['GET'])
@require_token
def retry_single_email(email_id, domain):
    """
    Retry endpoint for a single e-mail, addressed by ID.

    Looks the e-mail up in S3 by its ID and processes it again.
    """
    from flask import g

    start_time = time.time()
    logger.info(f"Retry-Request für E-Mail-ID: {email_id} in Domain: {domain}")

    try:
        # Load the domain configuration
        all_domains_config = load_domains_config()
        domain_name = domain.lower()

        if domain_name not in all_domains_config:
            logger.error(f"Domain {domain_name} nicht konfiguriert")
            return jsonify({
                "error": f"Domain {domain_name} nicht konfiguriert",
                "status": "error",
                "email_id": email_id
            }), 400

        domain_config = all_domains_config[domain_name]
        bucket = domain_config["bucket"]
        prefix = domain_config["prefix"]
        region = domain_config["region"]

        # Initialize the S3 client
        s3_client = boto3.client('s3', region_name=region)

        # Locate the e-mail by its ID
        logger.info(f"Suche E-Mail mit ID {email_id} in Bucket {bucket}")

        try:
            # Paginate over all objects in the bucket
            paginator = s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

            found_key = None

            for page in pages:
                if 'Contents' not in page:
                    continue

                for obj in page['Contents']:
                    key = obj['Key']

                    # Skip directory placeholders
                    if key.endswith('/'):
                        continue

                    # Match the e-mail ID against the object; two strategies:
                    # 1. The ID is part of the key (often the file name).
                    if email_id in key:
                        found_key = key
                        break

                    # 2. The ID may appear in the Message-ID header, so fetch
                    #    and inspect only the headers.
                    try:
                        # Load only the first 2 KB for performance
                        response = s3_client.get_object(
                            Bucket=bucket,
                            Key=key,
                            Range='bytes=0-2047'
                        )
                        header_content = response['Body'].read()

                        # Search the headers for the Message-ID
                        if email_id.encode() in header_content:
                            found_key = key
                            break

                    except Exception as header_e:
                        # Header check failed; move on to the next e-mail
                        logger.debug(f"Header-Check für {key} fehlgeschlagen: {str(header_e)}")
                        continue

                if found_key:
                    break

            if not found_key:
                logger.warning(f"E-Mail mit ID {email_id} nicht gefunden")
                return jsonify({
                    "error": f"E-Mail mit ID {email_id} nicht gefunden",
                    "status": "not_found",
                    "email_id": email_id,
                    "searched_bucket": bucket,
                    "searched_prefix": prefix
                }), 404

            logger.info(f"E-Mail gefunden: {found_key}")

        except Exception as search_e:
            logger.error(f"Fehler beim Suchen der E-Mail: {str(search_e)}")
            return jsonify({
                "error": f"Fehler beim Suchen: {str(search_e)}",
                "status": "error",
                "email_id": email_id
            }), 500

        # Load and process the e-mail
        try:
            response = s3_client.get_object(Bucket=bucket, Key=found_key)
            email_content = response['Body'].read()

            logger.info(f"E-Mail geladen: {len(email_content)} Bytes")

            # Process the e-mail (same logic as process_email)
            result = process_single_email(
                email_content=email_content,
                bucket=bucket,
                key=found_key,
                domain_name=domain_name
            )

            # Attach performance metrics
            processing_time = time.time() - start_time
            result.update({
                "processing_time_ms": round(processing_time * 1000, 2),
                "request_id": getattr(g, 'request_id', 'retry-request'),
                "email_size": len(email_content),
                "email_id": email_id,
                "s3_key": found_key,
                "retry": True
            })

            # Delete the e-mail from S3 after successful processing
            if result.get('action') == 'stored':
                try:
                    s3_client.delete_object(Bucket=bucket, Key=found_key)
                    logger.info(f"E-Mail nach erfolgreichem Retry aus S3 gelöscht: {found_key}")
                    result["s3_deleted"] = True
                except Exception as delete_e:
                    logger.warning(f"Konnte E-Mail nach Retry nicht löschen: {str(delete_e)}")
                    result["s3_deleted"] = False

            # Log according to the outcome
            if result.get("status") == "success":
                logger.info(f"Retry erfolgreich für E-Mail-ID {email_id} in {processing_time:.2f}s")
            else:
                logger.warning(f"Retry für E-Mail-ID {email_id} nicht erfolgreich: {result.get('message')}")

            # HTTP status according to the outcome
            if result.get("status") == "error":
                return jsonify(result), 500
            else:
                return jsonify(result), 200

        except Exception as process_e:
            logger.error(f"Fehler beim Laden/Verarbeiten der E-Mail {found_key}: {str(process_e)}")
            return jsonify({
                "error": f"Verarbeitungsfehler: {str(process_e)}",
                "status": "error",
                "email_id": email_id,
                "s3_key": found_key
            }), 500

    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"Unerwarteter Fehler bei Retry nach {processing_time:.2f}s: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Interner Server-Fehler beim Retry",
            "details": str(e),
            "processing_time_ms": round(processing_time * 1000, 2),
            "request_id": getattr(g, 'request_id', 'unknown'),
            "email_id": email_id
        }), 500
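
# Usage sketch for the retry endpoint (illustrative; how require_token expects
# the token to be passed is not visible in this excerpt, so the header below
# is an assumption):
#
#   curl -H "Authorization: Bearer $API_TOKEN" \
#        "http://localhost:5000/retry/<message-id>/example.com"
#
# Responses: 200 with the processing result, 404 if no S3 object matches the
# ID, 400 for an unconfigured domain, 500 on search or processing errors.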

@app.route('/stats', methods=['GET'])
@require_token
def get_stats():
    """API statistics and metrics"""
    try:
        stats = {
            "api_version": "2.0",
            "uptime_seconds": int(time.time() - app.start_time) if hasattr(app, 'start_time') else 0,
            "request_cache": {
                "size": len(processed_requests),
                "max_size": REQUEST_CACHE_SIZE,
                "ttl_seconds": REQUEST_CACHE_TTL
            },
            "configuration": {
                "mail_dir": MAIL_DIR,
                "domains": len(load_domains_config())
            }
        }

        # Maildir statistics
        try:
            mail_path = Path(MAIL_DIR)
            if mail_path.exists():
                domain_stats = {}
                for domain_dir in mail_path.iterdir():
                    if domain_dir.is_dir():
                        user_count = len([d for d in domain_dir.iterdir() if d.is_dir()])
                        domain_stats[domain_dir.name] = {"users": user_count}

                stats["maildir_stats"] = domain_stats
        except Exception as e:
            stats["maildir_error"] = str(e)

        return jsonify(stats)

    except Exception as e:
        return jsonify({"error": str(e)}), 500
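
# Example /stats response shape (values are illustrative only):
#
#   {
#     "api_version": "2.0",
#     "uptime_seconds": 3600,
#     "request_cache": {"size": 12, "max_size": 1000, "ttl_seconds": 300},
#     "configuration": {"mail_dir": "./mail", "domains": 2},
#     "maildir_stats": {"example.com": {"users": 3}}
#   }
#
# max_size and ttl_seconds echo REQUEST_CACHE_SIZE and REQUEST_CACHE_TTL,
# whose actual values are defined elsewhere in this file.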

# Cache cleanup task (runs every 10 minutes)
import threading


def cleanup_cache():
    """Periodically purges the request cache."""
    while True:
        try:
            current_time = time.time()
            cutoff_time = current_time - REQUEST_CACHE_TTL

            # Remove stale entries; snapshot the items first so request
            # handlers mutating the dict concurrently cannot break iteration
            keys_to_remove = [
                key for key, timestamp in list(processed_requests.items())
                if timestamp < cutoff_time
            ]

            for key in keys_to_remove:
                processed_requests.pop(key, None)

            if keys_to_remove:
                logger.info(f"Cache bereinigt: {len(keys_to_remove)} alte Einträge entfernt")

        except Exception as e:
            logger.error(f"Fehler bei Cache-Bereinigung: {str(e)}")

        # Wait 10 minutes
        time.sleep(600)


# Start the background thread for cache cleanup
cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True)
cleanup_thread.start()
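
# Note: daemon=True ties the cleanup thread to the process lifetime, and under
# Gunicorn (see the run script below) each worker gets its own
# processed_requests dict plus its own cleanup thread, so the cache is
# per-worker rather than shared. A minimal lock-based variant, sketched here
# as an assumption rather than as what the code above does, would serialize
# access between request handlers and the cleanup thread:
#
#   _cache_lock = threading.Lock()
#
#   def evict_expired():
#       cutoff = time.time() - REQUEST_CACHE_TTL
#       with _cache_lock:
#           for key in [k for k, ts in processed_requests.items() if ts < cutoff]:
#               processed_requests.pop(key, None)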

if __name__ == "__main__":
    # Start time for uptime tracking
    app.start_time = time.time()

    # Startup checks
    if not API_TOKEN:
        logger.warning("WARNUNG: Kein API_TOKEN definiert!")

    domains = load_domains_config()
    logger.info(f"API startet mit {len(domains)} konfigurierten Domains")

    # Start the server
    app.run(host='0.0.0.0', port=5000, debug=False)
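
# Dev entry point only: the run script below starts the app under Gunicorn,
# where this block never executes. In that case app.start_time is never set
# and /stats falls back to uptime_seconds = 0; setting app.start_time at
# module import time would be one way to keep the metric meaningful there.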
@@ -64,7 +64,7 @@ if ! grep -q "API_TOKEN" "$ENV_FILE"; then
 fi
 
 # Check that the Python script exists
-API_SCRIPT="s3_email_downloader_api.py"
+API_SCRIPT="s3_email_processor_api.py"
 if [ ! -f "$API_SCRIPT" ]; then
     echo "Fehler: $API_SCRIPT nicht gefunden!"
     exit 1
@@ -72,4 +72,4 @@ fi
 
 # Start the API in production mode with Gunicorn
 echo "Starte S3 Email Downloader API auf Port $PORT..."
-exec gunicorn --bind "0.0.0.0:$PORT" --workers 2 "s3_email_downloader_api:app"
+exec gunicorn --bind "0.0.0.0:$PORT" --workers 2 "s3_email_processor_api:app"