docker/dovecot/s3_email_downloader.py

315 lines
11 KiB
Python

#!/usr/bin/env python3
"""
S3 E-Mail Downloader Script
Dieses Script lädt E-Mails aus einem S3-Bucket herunter und speichert sie im
Maildir-Format für Dovecot. Es filtert nach bestimmten Domains und Benutzernamen
und kann nicht passende E-Mails löschen.
Nutzung:
python3 s3_email_downloader.py # Mit Bestätigungsabfrage für Löschungen
python3 s3_email_downloader.py y # Ohne Bestätigungsabfrage für Löschungen
"""
import boto3
import os
import time
import logging
import json
import re
import hashlib
import sys
from pathlib import Path
from email.parser import BytesParser
from email import policy
from dotenv import load_dotenv
# .env-Datei laden
load_dotenv()
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('s3_email_downloader.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger("s3-email-downloader")
# Konfiguration aus Umgebungsvariablen
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
S3_BUCKET = os.environ.get('S3_BUCKET', '')
EMAIL_PREFIX = os.environ.get('EMAIL_PREFIX', 'emails/')
MAIL_DIR = os.environ.get('MAIL_DIR', './mail') # Pfad zum Docker-Volume
VALID_DOMAIN = os.environ.get('VALID_DOMAIN', 'bizmatch.net')
VALID_USERNAMES = os.environ.get('VALID_USERNAMES', 'accounting,info,sales,support,test1,test2,test3')
# Umwandlung der komma-getrennten Liste in ein Set für schnellere Suche
VALID_USERNAMES_SET = set(VALID_USERNAMES.split(','))
# Status-Datei für die Synchronisation
STATUS_FILE = Path('sync_status.json')
# Prüfen, ob automatische Löschung aktiviert ist
AUTO_DELETE = len(sys.argv) > 1 and sys.argv[1].lower() == 'y'
def load_sync_status():
"""Lädt den letzten Synchronisationsstatus"""
if STATUS_FILE.exists():
try:
with open(STATUS_FILE, 'r') as f:
status = json.load(f)
return status.get('last_sync', {})
except Exception as e:
logger.error(f"Fehler beim Laden des Sync-Status: {str(e)}")
return {}
def save_sync_status(last_sync):
"""Speichert den aktuellen Synchronisationsstatus"""
try:
with open(STATUS_FILE, 'w') as f:
json.dump({
'last_sync': last_sync,
'last_sync_time': time.time()
}, f)
except Exception as e:
logger.error(f"Fehler beim Speichern des Sync-Status: {str(e)}")
def extract_email_address(address):
"""Extrahiert die E-Mail-Adresse aus einem komplexen Adressformat"""
if not address:
return 'unknown@bizmatch.net'
# Einfacher Fall: nur E-Mail-Adresse
if '@' in address and '<' not in address:
return address.strip()
# Komplexer Fall: "Name <email@domain.com>"
match = re.search(r'<([^>]+)>', address)
if match:
return match.group(1)
# Fallback
return address.strip()
def is_valid_recipient(to_address):
"""Prüft, ob die Empfängeradresse gültig ist (passende Domain und Username)"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if '@' not in email:
return False
username, domain = email.split('@', 1)
# Domain und Username prüfen
return domain.lower() == VALID_DOMAIN.lower() and username.lower() in VALID_USERNAMES_SET
def get_maildir_path(to_address, mail_dir):
"""
Ermittelt den Pfad im Maildir-Format basierend auf der Empfängeradresse
Format: {mail_dir}/domain.com/user/
"""
email = extract_email_address(to_address)
# E-Mail-Adresse aufteilen
if '@' in email:
user, domain = email.split('@', 1)
else:
user, domain = email, VALID_DOMAIN
# Pfad erstellen
mail_dir_path = Path(mail_dir)
domain_dir = mail_dir_path / domain
user_dir = domain_dir / user
# Maildir-Struktur sicherstellen
for directory in [mail_dir_path, domain_dir, user_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Maildir-Unterverzeichnisse
for subdir in ['cur', 'new', 'tmp']:
(user_dir / subdir).mkdir(exist_ok=True)
return user_dir
def store_email(email_content, to_address, message_id, s3_key, mail_dir):
"""Speichert eine E-Mail im Maildir-Format"""
try:
# Maildir-Pfad ermitteln
maildir = get_maildir_path(to_address, mail_dir)
# Eindeutigen Dateinamen generieren
timestamp = int(time.time())
hostname = 'bizmatch'
unique_id = hashlib.md5(f"{s3_key}:{timestamp}".encode()).hexdigest()
# Maildir-Dateiname im Format: timestamp.unique_id.hostname:2,
filename = f"{timestamp}.{unique_id}.{hostname}:2,"
# E-Mail in "new" speichern
email_path = maildir / 'new' / filename
with open(email_path, 'wb') as f:
f.write(email_content)
logger.info(f"E-Mail gespeichert: {email_path}")
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der E-Mail {s3_key}: {str(e)}")
return False
def delete_s3_emails(s3, emails_to_delete, email_info):
"""Löscht E-Mails aus dem S3-Bucket"""
if not emails_to_delete:
return 0
if not AUTO_DELETE:
# Bestätigung einholen
print(f"\nFolgende {len(emails_to_delete)} E-Mails werden gelöscht:")
for key in emails_to_delete:
info = email_info.get(key, {})
from_addr = info.get('from', 'Unbekannt')
to_addr = info.get('to', 'Unbekannt')
date = info.get('date', 'Unbekannt')
print(f" - {key}")
print(f" Von: {from_addr}")
print(f" An: {to_addr}")
print(f" Datum: {date}\n")
confirmation = input("\nMöchten Sie diese E-Mails wirklich löschen? (j/n): ")
if confirmation.lower() not in ['j', 'ja', 'y', 'yes']:
logger.info("Löschung abgebrochen")
return 0
# Löschung durchführen
deleted_count = 0
for key in emails_to_delete:
try:
s3.delete_object(Bucket=S3_BUCKET, Key=key)
logger.info(f"E-Mail gelöscht: {key}")
deleted_count += 1
except Exception as e:
logger.error(f"Fehler beim Löschen der E-Mail {key}: {str(e)}")
return deleted_count
def main():
"""Hauptfunktion"""
# Prüfen, ob die erforderlichen Umgebungsvariablen gesetzt sind
if not S3_BUCKET:
logger.error("S3_BUCKET Umgebungsvariable nicht gesetzt")
return
logger.info(f"S3 E-Mail Downloader gestartet. Bucket: {S3_BUCKET}, Präfix: {EMAIL_PREFIX}")
logger.info(f"E-Mails werden nach {MAIL_DIR} heruntergeladen")
logger.info(f"Gültige Domain: {VALID_DOMAIN}")
logger.info(f"Gültige Usernames: {VALID_USERNAMES}")
logger.info(f"Automatisches Löschen: {'Ja' if AUTO_DELETE else 'Nein (mit Bestätigung)'}")
try:
# S3-Client initialisieren
s3 = boto3.client('s3', region_name=AWS_REGION)
# Letzten Synchronisationsstatus laden
last_sync = load_sync_status()
# Um alle E-Mails zu verarbeiten, müssen wir den Paginator verwenden
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=EMAIL_PREFIX)
new_emails = 0
total_emails = 0
emails_to_delete = []
email_info = {}
# Alle Seiten durchlaufen
for page in pages:
if 'Contents' not in page:
continue
objects = page['Contents']
total_emails += len(objects)
for obj in objects:
key = obj['Key']
# Verzeichnisse überspringen
if key.endswith('/'):
continue
# Prüfen, ob die E-Mail bereits synchronisiert wurde
if key in last_sync:
logger.debug(f"E-Mail {key} bereits synchronisiert - übersprungen")
continue
try:
# E-Mail aus S3 laden
response = s3.get_object(Bucket=S3_BUCKET, Key=key)
email_content = response['Body'].read()
# Header parsen
try:
headers = BytesParser(policy=policy.default).parsebytes(email_content, headersonly=True)
to_address = headers.get('To', '')
from_address = headers.get('From', '')
date = headers.get('Date', '')
message_id = headers.get('Message-ID', '')
# E-Mail-Informationen speichern
email_info[key] = {
'to': to_address,
'from': from_address,
'date': date
}
# Prüfen, ob die Empfängeradresse gültig ist
if is_valid_recipient(to_address):
logger.info(f"Gültige E-Mail für: {to_address}")
# E-Mail speichern
if store_email(email_content, to_address, message_id, key, MAIL_DIR):
# Status aktualisieren
last_sync[key] = {
'timestamp': time.time(),
'to': to_address,
'message_id': message_id
}
new_emails += 1
logger.info(f"E-Mail {key} erfolgreich synchronisiert ({new_emails})")
# Status nach jeweils 10 E-Mails speichern
if new_emails % 10 == 0:
save_sync_status(last_sync)
logger.info(f"Zwischenspeicherung: {new_emails} neue E-Mails bisher")
else:
logger.info(f"Ungültige Empfängeradresse: {to_address} für {key}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler beim Parsen der E-Mail-Header {key}: {str(e)}")
emails_to_delete.append(key)
except Exception as e:
logger.error(f"Fehler bei der Verarbeitung der E-Mail {key}: {str(e)}")
# Abschließend den Status speichern
save_sync_status(last_sync)
# Ungültige E-Mails löschen
if emails_to_delete:
deleted_count = delete_s3_emails(s3, emails_to_delete, email_info)
logger.info(f"Insgesamt {deleted_count} von {len(emails_to_delete)} ungültigen E-Mails gelöscht")
logger.info(f"Verarbeitung abgeschlossen. Insgesamt {total_emails} E-Mails gefunden, {new_emails} neue heruntergeladen.")
except Exception as e:
logger.error(f"Fehler: {str(e)}")
import traceback
logger.error(traceback.format_exc())
if __name__ == "__main__":
main()