import base64
import functools
import gzip
import hmac
import logging
import os
import smtplib
import sys
import time
import zlib
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

import boto3
import requests
from flask import Flask, jsonify, request

if sys.version_info < (3, 12):
    raise RuntimeError("Python 3.12 oder höher erforderlich")

# --- Logging with timestamps ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# python-dotenv is optional: when it is installed, variables from a .env file
# are loaded before the configuration below is read from the environment.
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None
if load_dotenv:
    load_dotenv()

app = Flask(__name__)

SMTP_HOST = "localhost"
SMTP_PORT = 25
API_TOKEN = os.environ.get('API_TOKEN')
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
# Mandatory settings: importing this module raises KeyError if they are missing.
API_KEY = os.environ['MAILCOW_API_KEY']
MAILCOW_API = os.environ['MAILCOW_API']

s3_client = boto3.client('s3', region_name=AWS_REGION)


# --- Internal helpers --------------------------------------------------------

def _is_authorized() -> bool:
    """Return True if the current request carries ``Authorization: Bearer <API_TOKEN>``.

    An unset/empty API_TOKEN rejects every request, so a client cannot
    authenticate with the literal header ``Bearer None``.
    """
    if not API_TOKEN:
        return False
    supplied = request.headers.get('Authorization', '')
    # Constant-time comparison so the token cannot be guessed via response timing.
    return hmac.compare_digest(supplied.encode(), f'Bearer {API_TOKEN}'.encode())


def _bucket_name(domain: str) -> str:
    """Return the S3 bucket holding the mails of `domain` ('a.b' -> 'a-b-emails')."""
    return domain.replace('.', '-') + '-emails'


def _mailcow_get(path: str):
    """GET ``MAILCOW_API + path`` with the API key and return the decoded JSON.

    Raises requests.HTTPError on non-2xx responses and other requests
    exceptions on network errors or after the 5 s timeout.
    """
    resp = requests.get(f"{MAILCOW_API}{path}", headers={'X-API-Key': API_KEY}, timeout=5)
    resp.raise_for_status()
    return resp.json()


def _as_records(payload) -> list[dict]:
    """Return the dict entries of a Mailcow list response.

    The lookups below expect a JSON list of objects; any other shape yields no
    records instead of failing with AttributeError.
    """
    if isinstance(payload, list):
        return [item for item in payload if isinstance(item, dict)]
    return []


def _mailbox_local_parts(domain: str) -> set[str]:
    """Return the lower-cased local parts of all mailboxes Mailcow lists for `domain`."""
    mailboxes = _as_records(_mailcow_get(f"/get/mailbox/all/{domain}"))
    return {(m.get('local_part') or '').lower() for m in mailboxes} - {''}


def _existing_local_parts(domain: str) -> set[str]:
    """Return the mailbox local parts of `domain`, or an empty set if the domain is unknown.

    The domain is checked first so the mailbox listing is only fetched for known domains.
    """
    if not domain_exists(domain):
        logger.info(f"Domain '{domain}' unknown – skip mailbox lookup")
        return set()
    return _mailbox_local_parts(domain)


def _sender_address(msg, fallback):
    """Return the address of the first From header entry, or `fallback` if there is none."""
    from_headers = msg.get_all('from')
    if not from_headers:
        return fallback
    parsed = getaddresses(from_headers)
    return parsed[0][1] if parsed else fallback


def _collect_recipients(msg) -> list[str]:
    """Return every address from the To, Cc and Bcc headers, in header order."""
    return [
        addr
        for header in ('to', 'cc', 'bcc')
        for _name, addr in getaddresses(msg.get_all(header, []))
    ]


def _filter_valid_recipients(domain: str, recipients: list[str], fetch_local_parts) -> list[str]:
    """Return the recipients of `domain` that have an existing mailbox.

    Addresses of other domains (or without an '@') are dropped silently; addresses
    of `domain` without a mailbox are logged and dropped. Order is preserved and
    duplicates (case-insensitive) are removed so each mailbox is addressed once.
    `fetch_local_parts` is a zero-argument callable returning the set of
    lower-cased existing local parts; it is only called when at least one
    recipient belongs to `domain`, so unrelated mails cause no API calls.
    """
    wanted = domain.lower()
    candidates = []
    for addr in recipients:
        # Split at the LAST '@' – quoted local parts may themselves contain '@'.
        local, sep, dom = addr.rpartition('@')
        if sep and local and dom.lower() == wanted:
            candidates.append((addr, local.lower()))
    if not candidates:
        return []

    existing = fetch_local_parts()
    valid, seen = [], set()
    for addr, local in candidates:
        if local not in existing:
            logger.info(f"Skipping non-existent inbox: {addr}")
        elif addr.lower() not in seen:
            seen.add(addr.lower())
            valid.append(addr)
    return valid


def _record_failure(results: dict, bucket: str, key: str, status: str, info: dict) -> None:
    """Mark `key` with `status` and list it under both 'processed' and 'failed' in `results`."""
    mark_email_as_processed(bucket, key, status)
    results['processed'].append(key)
    results['failed'].append({'key': key, 'status': status, **info})


# --- Public helpers ----------------------------------------------------------

def domain_exists(domain):
    """Return True if `domain` is configured in Mailcow (case-insensitive, via /get/domain/all).

    Raises requests exceptions on HTTP or network errors.
    """
    wanted = domain.lower()
    return any(
        (d.get('domain_name') or '').lower() == wanted
        for d in _as_records(_mailcow_get("/get/domain/all"))
    )


def inbox_exists(domain, local_part):
    """Return True if `domain` exists in Mailcow AND ``local_part@domain`` has a mailbox.

    The comparison is case-insensitive. Raises requests exceptions on HTTP or network errors.
    """
    return local_part.lower() in _existing_local_parts(domain)


def mark_email_as_processed(bucket, key, status, processor='rest-api'):
    """Set the ``processed`` metadata of S3 object `bucket`/`key` to `status`.

    The object is copied onto itself with MetadataDirective='REPLACE', writing
    ``processed``, ``processed_timestamp`` (Unix seconds) and ``processor``.
    Best effort: returns True on success; on any error it logs and returns False.
    """
    # NOTE(review): 'REPLACE' discards any previous user metadata and, per S3
    # CopyObject semantics, resets system metadata such as Content-Type unless
    # it is passed again – confirm nothing downstream relies on it.
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'processed': status,
                'processed_timestamp': str(int(time.time())),
                'processor': processor,
            },
            MetadataDirective='REPLACE',
        )
        return True
    except Exception as e:
        logger.error(f"Fehler beim Markieren {bucket}/{key}: {e}")
        return False


# --- Routes ------------------------------------------------------------------

@app.route('/stats/<domain>', methods=['GET'])
def stats_domain(domain):
    """Report how many mails in the domain's S3 bucket reached each processing status.

    Every object's ``processed`` metadata is read (one HEAD request per object).
    For 'unknownDomain' and 'unknownUser' mails the message is downloaded and
    its From/To addresses are listed under ``details``. Objects with any other
    status count only towards ``total_messages``.
    """
    if not _is_authorized():
        return jsonify({'error': 'Unauthorized'}), 401

    bucket = _bucket_name(domain)
    paginator = s3_client.get_paginator('list_objects_v2')

    total = 0
    counts = {'true': 0, 'unknownDomain': 0, 'unknownUser': 0}
    details = {'unknownDomain': [], 'unknownUser': []}

    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            key = obj['Key']
            total += 1
            head = s3_client.head_object(Bucket=bucket, Key=key)
            status = head.get('Metadata', {}).get('processed', 'none')
            if status not in counts:
                # Other statuses (e.g. not yet processed) are deliberately ignored.
                continue
            counts[status] += 1
            if status not in details:
                continue

            # Failed deliveries: parse the mail to report sender and recipients.
            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
            try:
                msg = BytesParser(policy=default).parsebytes(body)
                from_addr = _sender_address(msg, None)
                to_addrs = [addr for _name, addr in getaddresses(msg.get_all('to', []))]
            except Exception as e:
                logger.error(f"Fehler beim Parsen {bucket}/{key}: {e}")
                from_addr = None
                to_addrs = []
            details[status].append({'key': key, 'from': from_addr, 'to': to_addrs})

    result = {
        'domain': domain,
        'total_messages': total,
        'successful': counts['true'],
        'wrong_domain': counts['unknownDomain'],
        'unknown_user': counts['unknownUser'],
        'details': details,
    }
    logger.info(f"Stats for {domain}: {result}")
    return jsonify(result), 200


@app.route('/process/<domain>', methods=['POST'])
def process_email(domain):
    """Forward one submitted mail to the existing mailboxes of `domain` via local SMTP.

    JSON body: ``email_content`` (base64 string, required), ``compressed``
    (truthy -> the decoded content is gzip-compressed) and ``request_id``
    (optional, only used in logs).

    Responses: 401 bad token; 400 invalid payload, undecodable content or no
    recipients; 404 no recipient of `domain` has a mailbox; 502 SMTP delivery
    failed; 200 with ``forwarded_to`` listing the addresses sent to.
    """
    if not _is_authorized():
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True: a malformed or non-JSON body yields None and gets a 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'Invalid payload'}), 400

    request_id = data.get('request_id', 'no-request-id')
    # Log the payload without the (potentially large) mail body – only its length.
    payload_summary = {
        k: (len(v) if k == 'email_content' else v)
        for k, v in data.items()
        if k != 'email_content' or isinstance(v, (str, bytes))
    }
    logger.info(
        f"[{request_id}] INCOMING POST /process/{domain}: "
        f"payload_summary={payload_summary}"
    )

    # 1) Decode (base64, optionally gzip) and parse the mail.
    content = data.get('email_content')
    if not isinstance(content, str) or not content:
        return jsonify({'error': 'Missing email_content'}), 400
    try:
        raw = base64.b64decode(content)
        email_bytes = gzip.decompress(raw) if data.get('compressed', False) else raw
    except (ValueError, OSError, EOFError, zlib.error) as e:
        # binascii.Error is a ValueError; gzip.BadGzipFile is an OSError.
        logger.warning(f"[{request_id}] Could not decode email_content: {e}")
        return jsonify({'error': 'Invalid email_content'}), 400

    msg = BytesParser(policy=default).parsebytes(email_bytes)
    from_addr = _sender_address(msg, f'lambda@{domain}')
    recipients = _collect_recipients(msg)
    if not recipients:
        return jsonify({'error': 'No recipients'}), 400

    # 2) Keep only recipients of the requested domain that have a mailbox.
    valid_recipients = _filter_valid_recipients(
        domain, recipients, lambda: _existing_local_parts(domain)
    )
    if not valid_recipients:
        logger.info(f"[{request_id}] Keine gültigen Inboxes für {domain} – skip.")
        return jsonify({'message': 'No valid inboxes – skipped'}), 404

    # 3) Deliver to the filtered addresses.
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
            refused = smtp.sendmail(from_addr, valid_recipients, email_bytes)
    except OSError as e:  # smtplib.SMTPException subclasses OSError
        logger.error(f"[{request_id}] SMTP delivery failed: {e}")
        return jsonify({'error': 'SMTP delivery failed', 'detail': str(e)}), 502
    if refused:
        # sendmail() only raises when ALL recipients are refused; log partial refusals.
        logger.warning(f"[{request_id}] Some recipients were refused: {refused}")

    return jsonify({
        'message': 'Email forwarded',
        'forwarded_to': valid_recipients,
    }), 200


@app.route('/retry/<domain>', methods=['GET'])
def retry_domain_emails(domain):
    """Re-deliver every mail in the domain's bucket whose ``processed`` metadata is not 'true'.

    Each mail that gets a final status (delivered -> 'true', or given up as
    'unknownDomain'/'unknownUser') is listed in ``processed``; given-up mails
    are additionally listed in ``failed`` with the reason. Mails that could not
    be read or parsed appear only in ``failed`` and stay unmarked, so a later
    retry picks them up again. Kept as GET for compatibility with existing callers.
    """
    if not _is_authorized():
        return jsonify({'error': 'Unauthorized'}), 401

    # 1) Check the domain up front.
    if not domain_exists(domain):
        logger.info(f"Retry aborted: unknown domain '{domain}'")
        return jsonify({'error': f"Unknown domain '{domain}'"}), 404

    bucket = _bucket_name(domain)
    paginator = s3_client.get_paginator('list_objects_v2')

    # 2) Collect all keys not yet marked as successfully processed.
    unprocessed = []
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            head = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
            if head.get('Metadata', {}).get('processed') != 'true':
                unprocessed.append(obj['Key'])

    request_id = f"retry-{domain}-{int(time.time())}"
    logger.info(f"[{request_id}] RETRY for domain={domain}, keys={unprocessed}")

    results = {'processed': [], 'failed': []}
    wanted = domain.lower()
    # The mailbox list is fetched at most once per run. functools.cache does not
    # store exceptions, so a failed lookup is retried for the next key.
    fetch_local_parts = functools.cache(lambda: _mailbox_local_parts(domain))

    for key in unprocessed:
        try:
            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
            msg = BytesParser(policy=default).parsebytes(body)
            from_addr = _sender_address(msg, f'retry@{domain}')
            recipients = _collect_recipients(msg)

            if not recipients:
                # No addresses at all -> give up on this mail.
                _record_failure(results, bucket, key, 'unknownDomain',
                                {'reason': 'no recipients'})
                continue

            # 3) Domain match: only mails addressed to the requested domain.
            domains_in_mail = {
                addr.rpartition('@')[2].lower() for addr in recipients if '@' in addr
            }
            if wanted not in domains_in_mail:
                _record_failure(results, bucket, key, 'unknownDomain',
                                {'from': from_addr, 'to': recipients})
                continue

            # 4) Inbox check: only existing mailboxes are allowed.
            valid_recipients = _filter_valid_recipients(domain, recipients, fetch_local_parts)
            if not valid_recipients:
                _record_failure(results, bucket, key, 'unknownUser',
                                {'from': from_addr, 'to': recipients})
                continue

            # 5) Deliver to the validated addresses.
            try:
                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
                    partially_refused = smtp.sendmail(from_addr, valid_recipients, body)
            except smtplib.SMTPRecipientsRefused as e:
                # The SMTP server rejected every recipient.
                refused = {
                    addr: {
                        'code': code,
                        'message': (reply.decode('utf-8', 'ignore')
                                    if isinstance(reply, bytes) else str(reply)),
                    }
                    for addr, (code, reply) in e.recipients.items()
                }
                _record_failure(results, bucket, key, 'unknownUser',
                                {'from': from_addr, 'to': valid_recipients, 'refused': refused})
            except Exception as e:
                # NOTE(review): every other SMTP error (including a local MTA that is
                # down) is recorded as unknownDomain; the mail stays eligible for the
                # next retry because its status is not 'true'.
                _record_failure(results, bucket, key, 'unknownDomain',
                                {'from': from_addr, 'to': valid_recipients, 'error': str(e)})
            else:
                if partially_refused:
                    logger.warning(f"[{request_id}] {key}: some recipients refused: {partially_refused}")
                mark_email_as_processed(bucket, key, 'true')
                results['processed'].append(key)
        except Exception as e:
            # Parsing, S3 or Mailcow lookup error: leave the object unmarked.
            results['failed'].append({'key': key, 'error': str(e)})

    return jsonify(results), 200


@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: always answers 200 with ``{"status": "OK"}``."""
    return jsonify({'status': 'OK'}), 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)