docker/dovecot/dovecot_passwd_manager.py

354 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Dovecot Passwd Manager
Verwaltet Benutzerkonten in der Dovecot passwd-Datei basierend auf den konfigurierten
E-Mail-Domains und Benutzernamen. Das Script liest die gleichen Umgebungsvariablen wie
der s3_email_downloader.py und kann separat ausgeführt werden.
Nutzung:
python3 dovecot_passwd_manager.py # Nur Überprüfung, keine Änderungen
python3 dovecot_passwd_manager.py update # Aktualisiert die passwd-Datei
python3 dovecot_passwd_manager.py force # Erzwingt Aktualisierung auch ohne Änderungen
"""
import os
import sys
import json
import logging
import datetime
import subprocess
import filecmp
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from dotenv import load_dotenv
# .env-Datei laden
load_dotenv()
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('dovecot_passwd_manager.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger("dovecot-passwd-manager")
# Konfiguration
MAIL_DIR = os.environ.get('MAIL_DIR', './mail')
DOVECOT_PASSWD_FILE = os.environ.get('DOVECOT_PASSWD_FILE', './config/passwd')
DOVECOT_PASSWD_BACKUP = os.environ.get('DOVECOT_PASSWD_BACKUP', './config/passwd.bak')
DOVECOT_CONTAINER = os.environ.get('DOVECOT_CONTAINER', 'dovecot')
DEFAULT_PASSWORD_PATTERN = os.environ.get('DEFAULT_PASSWORD_PATTERN', '{domain}{year}!')
UID = os.environ.get('MAIL_UID', '1001')
GID = os.environ.get('MAIL_GID', '1000')
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
# Domains-Konfiguration
DOMAINS_CONFIG_FILE = os.environ.get('DOMAINS_CONFIG_FILE', 'domains_config.json')
def load_domains_config():
"""Lädt die Domain-Konfiguration aus einer JSON-Datei oder aus Umgebungsvariablen"""
domains = {}
# Versuchen, Konfiguration aus JSON-Datei zu laden
config_file = Path(DOMAINS_CONFIG_FILE)
if config_file.exists():
try:
with open(config_file, 'r') as f:
domains = json.load(f)
logger.info(f"Domain-Konfiguration aus {DOMAINS_CONFIG_FILE} geladen")
return domains
except Exception as e:
logger.error(f"Fehler beim Laden der Domain-Konfiguration aus {DOMAINS_CONFIG_FILE}: {str(e)}")
# Fallback: Konfiguration aus Umgebungsvariablen
domain_index = 1
while True:
domain_key = f"DOMAIN_{domain_index}"
domain_name = os.environ.get(domain_key)
if not domain_name:
break # Keine weitere Domain-Definition gefunden
bucket = os.environ.get(f"{domain_key}_BUCKET", "")
prefix = os.environ.get(f"{domain_key}_PREFIX", "emails/")
usernames = os.environ.get(f"{domain_key}_USERNAMES", "")
region = os.environ.get(f"{domain_key}_REGION", AWS_REGION)
if domain_name and usernames:
domains[domain_name] = {
"bucket": bucket,
"prefix": prefix,
"usernames": usernames.split(','),
"region": region
}
logger.info(f"Domain {domain_name} aus Umgebungsvariablen konfiguriert")
else:
logger.warning(f"Unvollständige Konfiguration für {domain_name}, wird übersprungen")
domain_index += 1
# Fallback für Abwärtskompatibilität
if not domains:
old_domain = os.environ.get('VALID_DOMAIN', '')
old_bucket = os.environ.get('S3_BUCKET', '')
old_prefix = os.environ.get('EMAIL_PREFIX', 'emails/')
old_usernames = os.environ.get('VALID_USERNAMES', '')
if old_domain and old_usernames:
domains[old_domain] = {
"bucket": old_bucket,
"prefix": old_prefix,
"usernames": old_usernames.split(','),
"region": AWS_REGION
}
logger.info(f"Alte Konfiguration für Domain {old_domain} geladen")
return domains
def generate_password(domain):
"""Generiert ein Passwort nach dem Muster {domain}{year}!"""
current_year = datetime.datetime.now().year
domain_prefix = domain.split('.')[0] # Nur den ersten Teil der Domain verwenden
return DEFAULT_PASSWORD_PATTERN.replace('{domain}', domain_prefix).replace('{year}', str(current_year))
def get_required_emails(domains_config):
"""
Ermittelt eine Liste aller E-Mail-Adressen, die in der passwd-Datei
vorhanden sein sollten, basierend auf der Domain-Konfiguration.
"""
required_emails = []
for domain, config in domains_config.items():
for username in config["usernames"]:
email = f"{username}@{domain}"
required_emails.append(email)
return sorted(required_emails)
def get_existing_emails(passwd_file):
"""
Liest die bestehende passwd-Datei und gibt eine Liste aller
bereits konfigurierten E-Mail-Adressen zurück.
"""
existing_emails = []
if os.path.exists(passwd_file):
try:
with open(passwd_file, 'r') as f:
for line in f:
if line.strip() and ':' in line:
email = line.strip().split(':')[0]
existing_emails.append(email)
except Exception as e:
logger.error(f"Fehler beim Lesen der passwd-Datei: {str(e)}")
return sorted(existing_emails)
def read_existing_entries(passwd_file):
"""
Liest alle bestehenden Einträge aus der passwd-Datei und
gibt ein Dictionary mit E-Mail-Adressen als Schlüssel zurück.
"""
existing_entries = {}
if os.path.exists(passwd_file):
try:
with open(passwd_file, 'r') as f:
for line in f:
if line.strip() and ':' in line:
parts = line.strip().split(':')
email = parts[0]
existing_entries[email] = line.strip()
except Exception as e:
logger.error(f"Fehler beim Lesen der passwd-Datei: {str(e)}")
return existing_entries
def update_dovecot_passwd(domains_config, force=False):
"""
Aktualisiert die Dovecot-Passwortdatei basierend auf der Domain-Konfiguration.
Erstellt eine neue temporäre Datei und vergleicht sie mit der vorhandenen,
um festzustellen, ob ein Reload von Dovecot erforderlich ist.
Returns:
bool: True, wenn Änderungen vorgenommen wurden, sonst False
"""
logger.info("Überprüfe Dovecot-Passwortdatei...")
# Prüfen, ob Aktualisierung überhaupt notwendig ist
required_emails = get_required_emails(domains_config)
existing_emails = get_existing_emails(DOVECOT_PASSWD_FILE)
# Prüfen auf fehlende oder überflüssige E-Mail-Adressen
missing_emails = [email for email in required_emails if email not in existing_emails]
surplus_emails = [email for email in existing_emails if email not in required_emails]
if not missing_emails and not surplus_emails and not force:
logger.info("Alle erforderlichen E-Mail-Adressen sind bereits konfiguriert, keine Änderung notwendig")
return False
if missing_emails:
logger.info(f"Fehlende E-Mail-Adressen: {', '.join(missing_emails)}")
if surplus_emails:
logger.info(f"Überflüssige E-Mail-Adressen: {', '.join(surplus_emails)}")
logger.info("Aktualisiere Dovecot-Passwortdatei...")
# Temporäre Datei für die neue Passwd erstellen
temp_passwd = NamedTemporaryFile(delete=False, mode='w')
temp_passwd_path = temp_passwd.name
try:
# Bestehende Einträge einlesen
existing_entries = read_existing_entries(DOVECOT_PASSWD_FILE)
# Neue Einträge generieren
new_entries = {}
for domain, config in domains_config.items():
domain_password = generate_password(domain)
for username in config["usernames"]:
email = f"{username}@{domain}"
# Wenn der Eintrag bereits existiert, verwenden wir diesen (damit Passwörter erhalten bleiben)
if email in existing_entries:
new_entries[email] = existing_entries[email]
else:
# Format: email:{PLAIN}password:uid:gid::/var/mail/domain/username:/bin/false
new_entry = f"{email}:{{PLAIN}}{domain_password}:{UID}:{GID}::/var/mail/{domain}/{username}:/bin/false"
new_entries[email] = new_entry
logger.info(f"Neuer E-Mail-Account erstellt: {email} mit Standardpasswort")
# Sortierte Einträge in die temporäre Datei schreiben
for email in sorted(new_entries.keys()):
temp_passwd.write(f"{new_entries[email]}\n")
temp_passwd.close()
# Prüfen, ob Änderungen vorgenommen wurden
if os.path.exists(DOVECOT_PASSWD_FILE) and filecmp.cmp(temp_passwd_path, DOVECOT_PASSWD_FILE):
logger.info("Keine inhaltlichen Änderungen an der passwd-Datei erforderlich")
os.unlink(temp_passwd_path)
return False
# Sicherungskopie erstellen
if os.path.exists(DOVECOT_PASSWD_FILE):
try:
shutil.copy2(DOVECOT_PASSWD_FILE, DOVECOT_PASSWD_BACKUP)
logger.info(f"Sicherungskopie erstellt: {DOVECOT_PASSWD_BACKUP}")
except Exception as e:
logger.warning(f"Konnte keine Sicherungskopie erstellen: {str(e)}")
# Neue Datei aktivieren
try:
# Stellen Sie sicher, dass das Verzeichnis existiert
passwd_dir = os.path.dirname(DOVECOT_PASSWD_FILE)
if passwd_dir and not os.path.exists(passwd_dir):
os.makedirs(passwd_dir, exist_ok=True)
shutil.move(temp_passwd_path, DOVECOT_PASSWD_FILE)
# Berechtigungen setzen
os.chmod(DOVECOT_PASSWD_FILE, 0o600) # Nur Besitzer darf lesen/schreiben
logger.info(f"Neue passwd-Datei aktiviert: {DOVECOT_PASSWD_FILE}")
return True
except Exception as e:
logger.error(f"Fehler beim Aktivieren der neuen passwd-Datei: {str(e)}")
return False
except Exception as e:
logger.error(f"Fehler bei der Aktualisierung der passwd-Datei: {str(e)}")
if os.path.exists(temp_passwd_path):
os.unlink(temp_passwd_path)
return False
def reload_dovecot():
"""
Führt einen Reload von Dovecot durch, um die neue Passwortdatei zu aktivieren,
ohne den Container neu starten zu müssen.
"""
try:
logger.info(f"Führe Dovecot-Reload durch...")
result = subprocess.run(
["docker", "exec", DOVECOT_CONTAINER, "doveadm", "reload"],
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
logger.info("Dovecot-Reload erfolgreich durchgeführt")
return True
else:
logger.error(f"Dovecot-Reload fehlgeschlagen: {result.stderr}")
return False
except Exception as e:
logger.error(f"Fehler beim Dovecot-Reload: {str(e)}")
return False
def main():
"""Hauptfunktion"""
logger.info("Dovecot Passwd Manager gestartet")
# Kommandozeilenargumente prüfen
update_mode = False
force_mode = False
if len(sys.argv) > 1:
if sys.argv[1].lower() == "update":
update_mode = True
elif sys.argv[1].lower() == "force":
update_mode = True
force_mode = True
try:
# Domain-Konfigurationen laden
domains_config = load_domains_config()
if not domains_config:
logger.error("Keine Domain-Konfigurationen gefunden. Bitte konfigurieren Sie mindestens eine Domain.")
return
logger.info(f"Folgende Domains werden verarbeitet: {', '.join(domains_config.keys())}")
# Im Nur-Prüfungs-Modus zeigen wir einfach die erforderlichen Änderungen an
if not update_mode:
required_emails = get_required_emails(domains_config)
existing_emails = get_existing_emails(DOVECOT_PASSWD_FILE)
missing_emails = [email for email in required_emails if email not in existing_emails]
surplus_emails = [email for email in existing_emails if email not in required_emails]
if not missing_emails and not surplus_emails:
logger.info("Alle erforderlichen E-Mail-Adressen sind bereits konfiguriert")
else:
if missing_emails:
logger.info(f"Fehlende E-Mail-Adressen: {', '.join(missing_emails)}")
if surplus_emails:
logger.info(f"Überflüssige E-Mail-Adressen: {', '.join(surplus_emails)}")
logger.info("Verwenden Sie 'update' als Parameter, um die Änderungen anzuwenden")
return
# Im Update-Modus führen wir die Änderungen durch
changes_made = update_dovecot_passwd(domains_config, force=force_mode)
if changes_made:
reload_dovecot()
else:
logger.info("Keine Änderungen vorgenommen, Dovecot-Reload nicht erforderlich")
except Exception as e:
logger.error(f"Fehler: {str(e)}")
import traceback
logger.error(traceback.format_exc())
if __name__ == "__main__":
main()