442 lines
16 KiB
Python
Executable File
442 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
S3 E-Mail Downloader REST API
|
|
|
|
Diese API ermöglicht den Abruf von E-Mails aus S3-Buckets für eine bestimmte Domain.
|
|
Die E-Mails werden im Maildir-Format für Dovecot gespeichert.
|
|
|
|
Endpunkte:
|
|
GET /health - API-Statusprüfung
|
|
POST /sync/{domain} - Synchronisiert E-Mails für eine bestimmte Domain
|
|
|
|
Authentifizierung:
|
|
Der Endpunkt /sync/{domain} erfordert einen gültigen API-Token im Header.
|
|
"""
|
|
|
|
import boto3
|
|
import os
|
|
import time
|
|
import logging
|
|
import json
|
|
import re
|
|
import hashlib
|
|
import sys
|
|
from pathlib import Path
|
|
from email.parser import BytesParser
|
|
from email import policy
|
|
from dotenv import load_dotenv
|
|
from flask import Flask, request, jsonify, abort
|
|
from functools import wraps
|
|
|
|
# .env-Datei laden
|
|
load_dotenv()
|
|
|
|
# Flask-App initialisieren
|
|
app = Flask(__name__)
|
|
|
|
# Logging konfigurieren
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('s3_email_downloader_api.log'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger("s3-email-downloader-api")
|
|
|
|
# Hauptkonfiguration
|
|
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
|
|
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region
|
|
API_TOKEN = os.environ.get('API_TOKEN') # API-Token für die Authentifizierung
|
|
|
|
# Status-Datei für die Synchronisation
|
|
STATUS_FILE = Path('sync_status.json')
|
|
|
|
# Token-Authentifizierung
|
|
def require_token(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
auth_header = request.headers.get('Authorization')
|
|
|
|
if not auth_header or not auth_header.startswith('Bearer '):
|
|
logger.warning("Fehlender oder falsch formatierter Authorization-Header")
|
|
abort(401, description="Fehlender Authorization-Header")
|
|
|
|
token = auth_header[7:] # "Bearer " entfernen
|
|
|
|
if not API_TOKEN or token != API_TOKEN:
|
|
logger.warning("Ungültiger API-Token")
|
|
abort(403, description="Ungültiger API-Token")
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def load_domains_config():
|
|
"""Lädt die Domain-Konfiguration aus Umgebungsvariablen"""
|
|
domains = {}
|
|
|
|
# Konfiguration aus Umgebungsvariablen laden
|
|
domain_index = 1
|
|
while True:
|
|
domain_key = f"DOMAIN_{domain_index}"
|
|
domain_name = os.environ.get(domain_key)
|
|
|
|
if not domain_name:
|
|
break # Keine weitere Domain-Definition gefunden
|
|
|
|
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
|
|
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
|
|
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
|
|
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
|
|
|
|
if bucket and usernames:
|
|
domains[domain_name.lower()] = {
|
|
"bucket": bucket,
|
|
"prefix": prefix,
|
|
"usernames": usernames.split(','),
|
|
"region": region
|
|
}
|
|
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
|
|
else:
|
|
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
|
|
|
|
domain_index += 1
|
|
|
|
return domains
|
|
|
|
def load_sync_status():
|
|
"""Lädt den letzten Synchronisationsstatus"""
|
|
if STATUS_FILE.exists():
|
|
try:
|
|
with open(STATUS_FILE, 'r') as f:
|
|
status = json.load(f)
|
|
return status.get('last_sync', {})
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}")
|
|
|
|
return {}
|
|
|
|
def save_sync_status(last_sync):
|
|
"""Speichert den aktuellen Synchronisationsstatus"""
|
|
try:
|
|
with open(STATUS_FILE, 'w') as f:
|
|
json.dump({
|
|
'last_sync': last_sync,
|
|
'last_sync_time': time.time()
|
|
}, f)
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}")
|
|
|
|
def extract_email_address(address):
|
|
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
|
|
if not address:
|
|
return None
|
|
|
|
# Einfacher Fall: nur E-Mail-Adresse
|
|
if '@' in address and '<' not in address:
|
|
return address.strip()
|
|
|
|
# Komplexer Fall: "Name <email@domain.com>"
|
|
match = re.search(r'<([^>]+)>', address)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
# Fallback
|
|
return address.strip()
|
|
|
|
def is_valid_recipient(to_address, domains_config):
|
|
"""
|
|
Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)
|
|
Gibt ein Tuple zurück: (ist_gültig, domain_name)
|
|
"""
|
|
email = extract_email_address(to_address)
|
|
|
|
# E-Mail-Adresse aufteilen
|
|
if not email or '@' not in email:
|
|
return False, None
|
|
|
|
username, domain = email.split('@', 1)
|
|
|
|
# Prüfen, ob die Domain in der Konfiguration existiert
|
|
if domain.lower() in domains_config:
|
|
domain_config = domains_config[domain.lower()]
|
|
# Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist
|
|
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
|
|
return True, domain.lower()
|
|
|
|
return False, None
|
|
|
|
def get_maildir_path(to_address, mail_dir):
|
|
"""
|
|
Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse
|
|
Format: {mail_dir}/domain.com/user/
|
|
"""
|
|
email = extract_email_address(to_address)
|
|
|
|
# E-Mail-Adresse aufteilen
|
|
if '@' in email:
|
|
user, domain = email.split('@', 1)
|
|
else:
|
|
return None # Ungültige E-Mail-Adresse
|
|
|
|
# Pfad erstellen
|
|
mail_dir_path = Path(mail_dir)
|
|
domain_dir = mail_dir_path / domain
|
|
user_dir = domain_dir / user
|
|
|
|
# Maildir-Struktur sicherstellen
|
|
for directory in [mail_dir_path, domain_dir, user_dir]:
|
|
directory.mkdir(parents=True, exist_ok=True)
|
|
os.chmod(directory, 0o775) # rwxrwxr-x
|
|
|
|
# Maildir-Unterverzeichnisse
|
|
for subdir in ['cur', 'new', 'tmp']:
|
|
#(user_dir / subdir).mkdir(exist_ok=True)
|
|
subdir_path = user_dir / subdir
|
|
subdir_path.mkdir(exist_ok=True)
|
|
|
|
return user_dir
|
|
|
|
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
|
|
"""Speichert eine E-Mail im Maildir-Format"""
|
|
try:
|
|
# Maildir-Pfad ermitteln
|
|
maildir = get_maildir_path(to_address, mail_dir)
|
|
if not maildir:
|
|
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
|
|
return False
|
|
|
|
# Eindeutigen Dateinamen generieren
|
|
timestamp = int(time.time())
|
|
hostname = 'mail'
|
|
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
|
|
|
|
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
|
|
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
|
|
|
|
# E-Mail in "new" speichern
|
|
email_path = maildir / 'new' / filename
|
|
|
|
with open(email_path, 'wb') as f:
|
|
f.write(email_content)
|
|
|
|
logger.info(f"E-Mail gespeichert: {email_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
|
|
return False
|
|
|
|
def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info, auto_delete=True):
|
|
"""Löscht E-Mails aus dem S3-Bucket"""
|
|
if not emails_to_delete:
|
|
return 0
|
|
|
|
# Für API immer automatisch löschen, wenn der Parameter gesetzt ist
|
|
if not auto_delete:
|
|
logger.info(f"Automatisches Löschen deaktiviert, {len(emails_to_delete)} E-Mails werden nicht gelöscht.")
|
|
return 0
|
|
|
|
# Löschung durchführen
|
|
deleted_count = 0
|
|
for key in emails_to_delete:
|
|
try:
|
|
s3_client.delete_object(Bucket=bucket, Key=key)
|
|
logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}")
|
|
deleted_count += 1
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}")
|
|
|
|
return deleted_count
|
|
|
|
def process_domain(domain_name, auto_delete=True):
|
|
"""Verarbeitet eine einzelne Domain"""
|
|
# Domain-Konfigurationen laden
|
|
all_domains_config = load_domains_config()
|
|
|
|
# Prüfen, ob die Domain konfiguriert ist
|
|
domain_name = domain_name.lower()
|
|
if domain_name not in all_domains_config:
|
|
logger.error(f"Domain {domain_name} nicht konfiguriert")
|
|
return {
|
|
"error": f"Domain {domain_name} nicht konfiguriert",
|
|
"status": "error"
|
|
}
|
|
|
|
domain_config = all_domains_config[domain_name]
|
|
bucket = domain_config["bucket"]
|
|
prefix = domain_config["prefix"]
|
|
region = domain_config["region"]
|
|
|
|
logger.info(f"Verarbeite Domain: {domain_name}")
|
|
logger.info(f" Bucket: {bucket}")
|
|
logger.info(f" Präfix: {prefix}")
|
|
logger.info(f" Region: {region}")
|
|
logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}")
|
|
logger.info(f" Automatisches Löschen: {auto_delete}")
|
|
|
|
try:
|
|
# S3-Client initialisieren
|
|
s3 = boto3.client('s3', region_name=region)
|
|
|
|
# Letzten Synchronisationsstatus laden
|
|
last_sync = load_sync_status()
|
|
|
|
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
|
|
paginator = s3.get_paginator('list_objects_v2')
|
|
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
|
|
|
|
new_emails = 0
|
|
total_emails = 0
|
|
emails_to_delete = []
|
|
email_info = {}
|
|
domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}
|
|
|
|
# Alle Seiten durchlaufen
|
|
for page in pages:
|
|
if 'Contents' not in page:
|
|
continue
|
|
|
|
objects = page['Contents']
|
|
total_emails += len(objects)
|
|
|
|
for obj in objects:
|
|
key = obj['Key']
|
|
|
|
# Verzeichnisse überspringen
|
|
if key.endswith('/'):
|
|
continue
|
|
|
|
# Eindeutiger Key für die Synchronisation
|
|
sync_key = f"{bucket}:{key}"
|
|
|
|
# Prüfen, ob die E-Mail bereits synchronisiert wurde
|
|
if sync_key in domain_last_sync:
|
|
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
|
|
continue
|
|
|
|
try:
|
|
# E-Mail aus S3 laden
|
|
response = s3.get_object(Bucket=bucket, Key=key)
|
|
email_content = response['Body'].read()
|
|
|
|
# Header parsen
|
|
try:
|
|
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
|
|
to_address = headers.get('To', '')
|
|
from_address = headers.get('From', '')
|
|
date = headers.get('Date', '')
|
|
message_id = headers.get('Message-ID', '')
|
|
|
|
# E-Mail-Informationen speichern
|
|
email_info[key] = {
|
|
'to': to_address,
|
|
'from': from_address,
|
|
'date': date
|
|
}
|
|
|
|
# Alle Domains-Konfigurationen für die Validierung verwenden
|
|
is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)
|
|
|
|
if is_valid:
|
|
logger.info(f"Gültige E-Mail für: {to_address}")
|
|
|
|
# Nur speichern, wenn die E-Mail zur aktuellen Domain gehört
|
|
if recipient_domain == domain_name:
|
|
# E-Mail speichern
|
|
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
|
|
# Status aktualisieren
|
|
last_sync[sync_key] = {
|
|
'timestamp': time.time(),
|
|
'to': to_address,
|
|
'message_id': message_id,
|
|
'domain': domain_name,
|
|
'bucket': bucket
|
|
}
|
|
new_emails += 1
|
|
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
|
|
|
|
# Status nach jeweils 10 E-Mails speichern
|
|
if new_emails % 10 == 0:
|
|
save_sync_status(last_sync)
|
|
logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher")
|
|
else:
|
|
# Gültige E-Mail, aber für eine andere Domain - nicht löschen!
|
|
logger.info(f"E-Mail {key} ist für Domain {recipient_domain}, wird übersprungen (nicht gelöscht)")
|
|
else:
|
|
logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}")
|
|
emails_to_delete.append(key)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
|
|
emails_to_delete.append(key)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
|
|
|
|
# Status speichern
|
|
save_sync_status(last_sync)
|
|
|
|
# Ungültige E-Mails löschen
|
|
deleted_count = 0
|
|
if emails_to_delete:
|
|
deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info, auto_delete)
|
|
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}")
|
|
|
|
logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
|
|
|
|
return {
|
|
"status": "success",
|
|
"domain": domain_name,
|
|
"total_emails": total_emails,
|
|
"new_emails": new_emails,
|
|
"emails_to_delete": len(emails_to_delete),
|
|
"deleted_emails": deleted_count
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|
|
|
|
return {
|
|
"status": "error",
|
|
"domain": domain_name,
|
|
"error": str(e),
|
|
"traceback": traceback.format_exc()
|
|
}
|
|
|
|
# API-Endpunkte
|
|
|
|
@app.route('/health', methods=['GET'])
|
|
def health_check():
|
|
"""Gesundheitsprüfung für die API"""
|
|
return jsonify({
|
|
"status": "OK",
|
|
"message": "S3 E-Mail Downloader API ist aktiv"
|
|
})
|
|
|
|
@app.route('/sync/<domain>', methods=['POST'])
|
|
@require_token
|
|
def sync_domain(domain):
|
|
"""Synchronisiert E-Mails für eine bestimmte Domain"""
|
|
# Automatisches Löschen aus den Anfrageparametern
|
|
auto_delete = request.args.get('auto_delete', 'true').lower() in ['true', '1', 't', 'y', 'yes']
|
|
|
|
# Domain-Verarbeitung starten
|
|
result = process_domain(domain, auto_delete)
|
|
|
|
# Erfolg oder Fehler zurückgeben
|
|
if result.get("status") == "error":
|
|
return jsonify(result), 500
|
|
else:
|
|
return jsonify(result)
|
|
|
|
if __name__ == "__main__":
|
|
# Überprüfen, ob ein API-Token gesetzt ist
|
|
if not API_TOKEN:
|
|
logger.warning("WARNUNG: Kein API_TOKEN in Umgebungsvariablen definiert. API ist ungeschützt!")
|
|
|
|
# Flask-Server starten
|
|
app.run(host='0.0.0.0', port=5000, debug=False) |