s3 downloader API + startScript

This commit is contained in:
Andreas Knuth 2025-03-22 22:04:18 +01:00
parent ba36b6753a
commit 7873002167
4 changed files with 517 additions and 1 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
*.jar *.jar
auth auth
.env .env
.venv*
__pycache__

View File

@ -0,0 +1,439 @@
#!/usr/bin/env python3
"""
S3 E-Mail Downloader REST API
Diese API ermöglicht den Abruf von E-Mails aus S3-Buckets für eine bestimmte Domain.
Die E-Mails werden im Maildir-Format für Dovecot gespeichert.
Endpunkte:
GET /health - API-Statusprüfung
POST /sync/{domain} - Synchronisiert E-Mails für eine bestimmte Domain
Authentifizierung:
Der Endpunkt /sync/{domain} erfordert einen gültigen API-Token im Header.
"""
import boto3
import os
import time
import logging
import json
import re
import hashlib
import sys
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv
from flask import Flask, request, jsonify, abort
from functools import wraps
# .env-Datei laden
load_dotenv()
# Flask-App initialisieren
app = Flask(__name__)
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('s3_email_downloader_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger("s3-email-downloader-api")
# Hauptkonfiguration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2') # Standard-Region
API_TOKEN = os.environ.get('API_TOKEN') # API-Token für die Authentifizierung
# Status-Datei für die Synchronisation
STATUS_FILE = Path('sync_status.json')
# Token-Authentifizierung
def require_token(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.warning("Fehlender oder falsch formatierter Authorization-Header")
abort(401, description="Fehlender Authorization-Header")
token = auth_header[7:] # "Bearer " entfernen
if not API_TOKEN or token != API_TOKEN:
logger.warning("Ungültiger API-Token")
abort(403, description="Ungültiger API-Token")
return f(*args, **kwargs)
return decorated_function
def load_domains_config():
"""Lädt die Domain-Konfiguration aus Umgebungsvariablen"""
domains = {}
# Konfiguration aus Umgebungsvariablen laden
domain_index = 1
while True:
domain_key = f"DOMAIN_{domain_index}"
domain_name = os.environ.get(domain_key)
if not domain_name:
break # Keine weitere Domain-Definition gefunden
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
if bucket and usernames:
domains[domain_name.lower()] = {
"bucket": bucket,
"prefix": prefix,
"usernames": usernames.split(','),
"region": region
}
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
else:
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
domain_index += 1
return domains
def load_sync_status():
"""Lädt den letzten Synchronisationsstatus"""
if STATUS_FILE.exists():
try:
with open(STATUS_FILE, 'r') as f:
status = json.load(f)
return status.get('last_sync', {})
except Exception as e:
logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}")
return {}
def save_sync_status(last_sync):
"""Speichert den aktuellen Synchronisationsstatus"""
try:
with open(STATUS_FILE, 'w') as f:
json.dump({
'last_sync': last_sync,
'last_sync_time': time.time()
}, f)
except Exception as e:
logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}")
def extract_email_address(address):
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
if not address:
return None
# Einfacher Fall: nur E-Mail-Adresse
if '@' in address and '<' not in address:
return address.strip()
# Komplexer Fall: "Name <email@domain.com>"
match = re.search(r'<([^>]+)>', address)
if match:
return match.group(1)
# Fallback
return address.strip()
def is_valid_recipient(to_address, domains_config):
"""
Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)
Gibt ein Tuple zurück: (ist_gültig, domain_name)
"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if not email or '@' not in email:
return False, None
username, domain = email.split('@', 1)
# Prüfen, ob die Domain in der Konfiguration existiert
if domain.lower() in domains_config:
domain_config = domains_config[domain.lower()]
# Prüfen, ob der Benutzername in der Liste der gültigen Benutzernamen ist
if username.lower() in [u.lower() for u in domain_config["usernames"]]:
return True, domain.lower()
return False, None
def get_maildir_path(to_address, mail_dir):
"""
Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse
Format: {mail_dir}/domain.com/user/
"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if '@' in email:
user, domain = email.split('@', 1)
else:
return None # Ungültige E-Mail-Adresse
# Pfad erstellen
mail_dir_path = Path(mail_dir)
domain_dir = mail_dir_path / domain
user_dir = domain_dir / user
# Maildir-Struktur sicherstellen
for directory in [mail_dir_path, domain_dir, user_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Maildir-Unterverzeichnisse
for subdir in ['cur', 'new', 'tmp']:
(user_dir / subdir).mkdir(exist_ok=True)
return user_dir
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
"""Speichert eine E-Mail im Maildir-Format"""
try:
# Maildir-Pfad ermitteln
maildir = get_maildir_path(to_address, mail_dir)
if not maildir:
logger.error(f"Konnte Maildir für {to_address} nicht ermitteln")
return False
# Eindeutigen Dateinamen generieren
timestamp = int(time.time())
hostname = 'mail'
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
# E-Mail in "new" speichern
email_path = maildir / 'new' / filename
with open(email_path, 'wb') as f:
f.write(email_content)
logger.info(f"E-Mail gespeichert: {email_path}")
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
return False
def delete_s3_emails(s3_client, bucket, emails_to_delete, email_info, auto_delete=True):
"""Löscht E-Mails aus dem S3-Bucket"""
if not emails_to_delete:
return 0
# Für API immer automatisch löschen, wenn der Parameter gesetzt ist
if not auto_delete:
logger.info(f"Automatisches Löschen deaktiviert, {len(emails_to_delete)} E-Mails werden nicht gelöscht.")
return 0
# Löschung durchführen
deleted_count = 0
for key in emails_to_delete:
try:
s3_client.delete_object(Bucket=bucket, Key=key)
logger.info(f"E-Mail gelöscht: {key} aus Bucket {bucket}")
deleted_count += 1
except Exception as e:
logger.error(f"Fehler beim Löschen der E-Mail {key} aus Bucket {bucket}: {str(e)}")
return deleted_count
def process_domain(domain_name, auto_delete=True):
"""Verarbeitet eine einzelne Domain"""
# Domain-Konfigurationen laden
all_domains_config = load_domains_config()
# Prüfen, ob die Domain konfiguriert ist
domain_name = domain_name.lower()
if domain_name not in all_domains_config:
logger.error(f"Domain {domain_name} nicht konfiguriert")
return {
"error": f"Domain {domain_name} nicht konfiguriert",
"status": "error"
}
domain_config = all_domains_config[domain_name]
bucket = domain_config["bucket"]
prefix = domain_config["prefix"]
region = domain_config["region"]
logger.info(f"Verarbeite Domain: {domain_name}")
logger.info(f" Bucket: {bucket}")
logger.info(f" Präfix: {prefix}")
logger.info(f" Region: {region}")
logger.info(f" Gültige Usernames: {', '.join(domain_config['usernames'])}")
logger.info(f" Automatisches Löschen: {auto_delete}")
try:
# S3-Client initialisieren
s3 = boto3.client('s3', region_name=region)
# Letzten Synchronisationsstatus laden
last_sync = load_sync_status()
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
new_emails = 0
total_emails = 0
emails_to_delete = []
email_info = {}
domain_last_sync = {k: v for k, v in last_sync.items() if k.startswith(f"{bucket}:")}
# Alle Seiten durchlaufen
for page in pages:
if 'Contents' not in page:
continue
objects = page['Contents']
total_emails += len(objects)
for obj in objects:
key = obj['Key']
# Verzeichnisse überspringen
if key.endswith('/'):
continue
# Eindeutiger Key für die Synchronisation
sync_key = f"{bucket}:{key}"
# Prüfen, ob die E-Mail bereits synchronisiert wurde
if sync_key in domain_last_sync:
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
continue
try:
# E-Mail aus S3 laden
response = s3.get_object(Bucket=bucket, Key=key)
email_content = response['Body'].read()
# Header parsen
try:
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
to_address = headers.get('To', '')
from_address = headers.get('From', '')
date = headers.get('Date', '')
message_id = headers.get('Message-ID', '')
# E-Mail-Informationen speichern
email_info[key] = {
'to': to_address,
'from': from_address,
'date': date
}
# Alle Domains-Konfigurationen für die Validierung verwenden
is_valid, recipient_domain = is_valid_recipient(to_address, all_domains_config)
if is_valid:
logger.info(f"Gültige E-Mail für: {to_address}")
# Nur speichern, wenn die E-Mail zur aktuellen Domain gehört
if recipient_domain == domain_name:
# E-Mail speichern
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
# Status aktualisieren
last_sync[sync_key] = {
'timestamp': time.time(),
'to': to_address,
'message_id': message_id,
'domain': domain_name,
'bucket': bucket
}
new_emails += 1
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
# Status nach jeweils 10 E-Mails speichern
if new_emails % 10 == 0:
save_sync_status(last_sync)
logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher")
else:
# Gültige E-Mail, aber für eine andere Domain - nicht löschen!
logger.info(f"E-Mail {key} ist für Domain {recipient_domain}, wird übersprungen (nicht gelöscht)")
else:
logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
# Status speichern
save_sync_status(last_sync)
# Ungültige E-Mails löschen
deleted_count = 0
if emails_to_delete:
deleted_count = delete_s3_emails(s3, bucket, emails_to_delete, email_info, auto_delete)
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht aus Bucket {bucket}")
logger.info(f"Verarbeitung für Domain {domain_name} abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
return {
"status": "success",
"domain": domain_name,
"total_emails": total_emails,
"new_emails": new_emails,
"emails_to_delete": len(emails_to_delete),
"deleted_emails": deleted_count
}
except Exception as e:
logger.error(f"Fehler bei der Verarbeitung der Domain {domain_name}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return {
"status": "error",
"domain": domain_name,
"error": str(e),
"traceback": traceback.format_exc()
}
# API-Endpunkte
@app.route('/health', methods=['GET'])
def health_check():
"""Gesundheitsprüfung für die API"""
return jsonify({
"status": "OK",
"message": "S3 E-Mail Downloader API ist aktiv"
})
@app.route('/sync/<domain>', methods=['POST'])
@require_token
def sync_domain(domain):
"""Synchronisiert E-Mails für eine bestimmte Domain"""
# Automatisches Löschen aus den Anfrageparametern
auto_delete = request.args.get('auto_delete', 'true').lower() in ['true', '1', 't', 'y', 'yes']
# Domain-Verarbeitung starten
result = process_domain(domain, auto_delete)
# Erfolg oder Fehler zurückgeben
if result.get("status") == "error":
return jsonify(result), 500
else:
return jsonify(result)
if __name__ == "__main__":
# Überprüfen, ob ein API-Token gesetzt ist
if not API_TOKEN:
logger.warning("WARNUNG: Kein API_TOKEN in Umgebungsvariablen definiert. API ist ungeschützt!")
# Flask-Server starten
app.run(host='0.0.0.0', port=5000, debug=False)

0
dovecot/setup_email_domain.sh Normal file → Executable file
View File

75
dovecot/start_email_api.sh Executable file
View File

@ -0,0 +1,75 @@
#!/bin/bash
# start_email_api.sh
# Dieses Script startet die S3 Email Downloader REST API in einer virtuellen Python-Umgebung
# Verzeichnis, in dem sich das Script befindet
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
# Name der virtuellen Umgebung
VENV_NAME="venv"
# Python-Executable (falls spezifische Version benötigt wird)
PYTHON_EXEC="python3"
# API Port
PORT=${EMAIL_API_PORT:-5000}
# Umgebungsvariablen Datei
ENV_FILE=".env"
# Prüfen, ob virtuelle Umgebung existiert
if [ ! -d "$VENV_NAME" ]; then
echo "Virtuelle Umgebung wird erstellt..."
$PYTHON_EXEC -m venv "$VENV_NAME"
if [ $? -ne 0 ]; then
echo "Fehler beim Erstellen der virtuellen Umgebung!"
exit 1
fi
fi
# Virtuelle Umgebung aktivieren
source "$VENV_NAME/bin/activate"
# Abhängigkeiten installieren, falls erforderlich
echo "Abhängigkeiten werden installiert..."
pip install --upgrade pip
pip install boto3 flask python-dotenv gunicorn
# Prüfen, ob .env-Datei existiert
if [ ! -f "$ENV_FILE" ]; then
echo "WARNUNG: $ENV_FILE nicht gefunden!"
echo "Bitte erstellen Sie eine .env-Datei mit den erforderlichen Konfigurationen."
echo "Beispiel:"
echo "API_TOKEN=ihr_geheimer_token"
echo "MAIL_DIR=./mail"
echo "AWS_REGION=us-east-2"
echo "AWS_ACCESS_KEY_ID=your_access_key"
echo "AWS_SECRET_ACCESS_KEY=your_secret_key"
echo "DOMAIN_1=example.com"
echo "DOMAIN_1_BUCKET=example-bucket"
echo "DOMAIN_1_USERNAMES=user1,user2,user3"
exit 1
fi
# Überprüfen, ob API_TOKEN in der .env-Datei gesetzt ist
if ! grep -q "API_TOKEN" "$ENV_FILE"; then
echo "WARNUNG: API_TOKEN nicht in $ENV_FILE gefunden!"
echo "Die API wird ohne Token-Schutz gestartet. Dies ist unsicher für Produktionsumgebungen."
read -p "Möchten Sie fortfahren? (j/n): " choice
if [[ ! "$choice" =~ ^[jJyY]$ ]]; then
echo "Abbruch."
exit 1
fi
fi
# Prüfen, ob das Python-Script existiert
API_SCRIPT="s3_email_downloader_api.py"
if [ ! -f "$API_SCRIPT" ]; then
echo "Fehler: $API_SCRIPT nicht gefunden!"
exit 1
fi
# API im Produktionsmodus mit Gunicorn starten
echo "Starte S3 Email Downloader API auf Port $PORT..."
exec gunicorn --bind "0.0.0.0:$PORT" --workers 2 "s3_email_downloader_api:app"