diff --git a/email_api/docker-compose.yml b/email_api/docker-compose.yml
index f5ecaf5..c69ab97 100644
--- a/email_api/docker-compose.yml
+++ b/email_api/docker-compose.yml
@@ -6,10 +6,16 @@ services:
     network_mode: host
     volumes:
       - ./email_api:/app
+      - /var/mail:/var/mail  # Maildir access for the health check
    working_dir: /app
    env_file:
      - .env
    environment:
      - API_TOKEN=${API_TOKEN}
+      - AWS_REGION=${AWS_REGION}
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
     command: >
-      bash -c "pip install --upgrade pip && pip install flask python-dotenv && python app.py"
\ No newline at end of file
+      bash -c "pip install --upgrade pip &&
+      pip install flask python-dotenv boto3 &&
+      python app.py"
\ No newline at end of file
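
The compose service now expects AWS credentials alongside the API token. A minimal placeholder sketch of the `.env` file the `env_file`/`environment` entries above assume (all values are illustrative, not real credentials; the region only needs to match where the mail buckets live):

    # .env (placeholder values; supply your own)
    API_TOKEN=change-me
    AWS_REGION=us-east-1
    AWS_ACCESS_KEY_ID=AKIA-PLACEHOLDER
    AWS_SECRET_ACCESS_KEY=placeholder-secret
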
diff --git a/email_api/email_api/app.py b/email_api/email_api/app.py
index b38e121..098dc28 100644
--- a/email_api/email_api/app.py
+++ b/email_api/email_api/app.py
@@ -1,10 +1,13 @@
 import sys
-from flask import Flask, request, jsonify
+from flask import Flask, request, jsonify, g
 import smtplib
 import base64
 import gzip
 import logging
 import os
+import time
+import boto3
+from pathlib import Path
 from dotenv import load_dotenv
 from email.parser import BytesParser
 from email.policy import default
@@ -25,13 +28,253 @@ logger = logging.getLogger(__name__)
 SMTP_HOST = "localhost"  # host network mode
 SMTP_PORT = 25  # fixed to port 25, no TLS
 API_TOKEN = os.environ.get('API_TOKEN')
+MAIL_DIR = "/var/mail"  # default Maildir path
+AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
+
 if not API_TOKEN:
     raise ValueError("API_TOKEN environment variable not set")
 
+# Initialize the S3 client
+s3_client = boto3.client('s3', region_name=AWS_REGION)
+
+# Cache of processed requests
+processed_requests = {}
+
 logger.info(f"API_TOKEN loaded: {API_TOKEN}")
 
+def load_domains_config():
+    """Load the domain configuration"""
+    # A real configuration file could be loaded here;
+    # hardcoded as an example for now
+    return {
+        "andreasknuth.de": {"bucket": "andreasknuth-de-emails"},
+        # Additional domains can be added here
+    }
+
+def get_bucket_name_for_domain(domain):
+    """Convert a domain name to an S3 bucket name"""
+    return domain.replace(".", "-") + "-emails"
+
+def mark_email_as_processed(bucket_name, key):
+    """Mark an email as successfully processed"""
+    try:
+        # Add metadata marking the email as processed
+        s3_client.copy_object(
+            Bucket=bucket_name,
+            Key=key,
+            CopySource={'Bucket': bucket_name, 'Key': key},
+            Metadata={
+                'processed': 'true',
+                'processed_timestamp': str(int(time.time())),
+                'processor': 'rest-api'
+            },
+            MetadataDirective='REPLACE'
+        )
+        logger.info(f"Email {key} marked as processed")
+        return True
+    except Exception as e:
+        logger.error(f"Error marking email {key}: {str(e)}")
+        return False
+
+def get_unprocessed_emails(bucket_name):
+    """Fetch all unprocessed emails from an S3 bucket"""
+    try:
+        unprocessed_emails = []
+        paginator = s3_client.get_paginator('list_objects_v2')
+
+        for page in paginator.paginate(Bucket=bucket_name):
+            if 'Contents' not in page:
+                continue
+
+            for obj in page['Contents']:
+                key = obj['Key']
+
+                # Check the email's metadata
+                try:
+                    head_response = s3_client.head_object(Bucket=bucket_name, Key=key)
+                    metadata = head_response.get('Metadata', {})
+
+                    # Not marked as processed yet, so add it to the list
+                    if metadata.get('processed') != 'true':
+                        unprocessed_emails.append({
+                            'key': key,
+                            'size': obj['Size'],
+                            'last_modified': obj['LastModified'].isoformat(),
+                            'metadata': metadata
+                        })
+                except Exception as e:
+                    logger.warning(f"Error checking metadata for {key}: {str(e)}")
+                    # If the metadata cannot be read, treat the email as unprocessed
+                    unprocessed_emails.append({
+                        'key': key,
+                        'size': obj['Size'],
+                        'last_modified': obj['LastModified'].isoformat(),
+                        'metadata': {}
+                    })
+
+        return unprocessed_emails
+    except Exception as e:
+        logger.error(f"Error fetching unprocessed emails: {str(e)}")
+        return []
+
+@app.route('/health', methods=['GET'])
+def health_check():
+    """Extended health check"""
+
+    # Basic health status
+    health_status = {
+        "status": "OK",
+        "message": "S3 email processor API is up",
+        "timestamp": int(time.time()),
+        "version": "2.0",
+        "request_id": getattr(g, 'request_id', 'health-check')
+    }
+
+    # Extended checks
+    try:
+        # Check Maildir access
+        mail_path = Path(MAIL_DIR)
+        if mail_path.exists() and mail_path.is_dir():
+            health_status["mail_dir"] = "accessible"
+        else:
+            health_status["mail_dir"] = "not_accessible"
+            health_status["status"] = "WARNING"
+
+        # Check the domain configuration
+        domains = load_domains_config()
+        health_status["configured_domains"] = len(domains)
+        health_status["domains"] = list(domains.keys())
+
+        # Cache status
+        health_status["request_cache_size"] = len(processed_requests)
+
+        # Test the SMTP connection
+        try:
+            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as smtp:
+                smtp.noop()
+            health_status["smtp_connection"] = "OK"
+        except Exception as e:
+            health_status["smtp_connection"] = f"ERROR: {str(e)}"
+            health_status["status"] = "WARNING"
+
+        # Test the S3 connection
+        try:
+            s3_client.list_buckets()
+            health_status["s3_connection"] = "OK"
+        except Exception as e:
+            health_status["s3_connection"] = f"ERROR: {str(e)}"
+            health_status["status"] = "WARNING"
+
+    except Exception as e:
+        health_status["status"] = "ERROR"
+        health_status["error"] = str(e)
+
+    return jsonify(health_status)
+
+@app.route('/retry/<domain>', methods=['GET'])
+def retry_domain_emails(domain):
+    """Process all unprocessed emails for a domain"""
+    auth_header = request.headers.get('Authorization')
+    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
+        return jsonify({'error': 'Unauthorized'}), 401
+
+    request_id = f"retry-{domain}-{int(time.time())}"
+    bucket_name = get_bucket_name_for_domain(domain)
+
+    logger.info(f"[{request_id}] Retry processing for domain: {domain}, bucket: {bucket_name}")
+
+    try:
+        # Fetch all unprocessed emails
+        unprocessed_emails = get_unprocessed_emails(bucket_name)
+
+        if not unprocessed_emails:
+            return jsonify({
+                'message': f'No unprocessed emails found for domain {domain}',
+                'domain': domain,
+                'bucket': bucket_name,
+                'processed_count': 0,
+                'request_id': request_id
+            }), 200
+
+        processed_count = 0
+        failed_count = 0
+        errors = []
+
+        for email_info in unprocessed_emails:
+            key = email_info['key']
+
+            try:
+                logger.info(f"[{request_id}] Processing email: {key}")
+
+                # Load the email from S3
+                response = s3_client.get_object(Bucket=bucket_name, Key=key)
+                email_content = response['Body'].read()
+
+                # Parse the email
+                email_msg = BytesParser(policy=default).parsebytes(email_content)
+
+                # Extract the From address
+                from_headers = email_msg.get_all('from', [])
+                if from_headers:
+                    from_addr = getaddresses(from_headers)[0][1]
+                else:
+                    from_addr = f'retry@{domain}'
+
+                # Extract the recipients
+                to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
+                cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
+                bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
+                recipients = to_addrs + cc_addrs + bcc_addrs
+
+                if not recipients:
+                    raise ValueError(f"No recipients found in email {key}")
+
+                # Forward via SMTP
+                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
+                    smtp.mail(from_addr)
+                    for recipient in recipients:
+                        smtp.rcpt(recipient)
+                    smtp.data(email_content)
+
+                # Mark as processed
+                mark_email_as_processed(bucket_name, key)
+                processed_count += 1
+
+                logger.info(f"[{request_id}] Email {key} processed successfully")
+
+            except Exception as e:
+                failed_count += 1
+                error_msg = f"Error for email {key}: {str(e)}"
+                errors.append(error_msg)
+                logger.error(f"[{request_id}] {error_msg}")
+
+        result = {
+            'message': f'Retry processing for domain {domain} finished',
+            'domain': domain,
+            'bucket': bucket_name,
+            'total_found': len(unprocessed_emails),
+            'processed_count': processed_count,
+            'failed_count': failed_count,
+            'request_id': request_id
+        }
+
+        if errors:
+            result['errors'] = errors
+
+        return jsonify(result), 200
+
+    except Exception as e:
+        logger.error(f"[{request_id}] Error during retry processing: {str(e)}")
+        return jsonify({
+            'error': str(e),
+            'domain': domain,
+            'bucket': bucket_name,
+            'request_id': request_id
+        }), 500
+
 @app.route('/process/<domain>', methods=['POST'])
 def process_email(domain):
+    """Process a single email sent by the Lambda function"""
     auth_header = request.headers.get('Authorization')
     if not auth_header or auth_header != f'Bearer {API_TOKEN}':
         return jsonify({'error': 'Unauthorized'}), 401
@@ -43,8 +286,12 @@ def process_email(domain):
     request_id = data.get('request_id')
     email_content = data.get('email_content')
     compressed = data.get('compressed', False)
+    s3_key = data.get('s3_key')  # S3 key for the marker functionality
+    s3_bucket = data.get('s3_bucket')  # S3 bucket for the marker functionality
 
     logger.info(f"[{request_id}] Processing email for domain: {domain}")
+    if s3_key:
+        logger.info(f"[{request_id}] S3 Location: {s3_bucket}/{s3_key}")
 
     try:
         # Decompress if compressed
@@ -62,7 +309,7 @@
         if from_headers:
             from_addr = getaddresses(from_headers)[0][1]
         else:
-            from_addr = 'lambda@andreasknuth.de'  # fallback
+            from_addr = f'lambda@{domain}'  # fallback using the domain
 
         # Extract recipients from all relevant headers (consistent with the Lambda)
         to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
@@ -94,11 +341,32 @@
         logger.info(f"[{request_id}] Email forwarded to Postfix for {domain} - {len(recipients)} recipients")
 
+        # Mark the email as processed if S3 information is available
+        marked_as_processed = False
+        if s3_key and s3_bucket:
+            if mark_email_as_processed(s3_bucket, s3_key):
+                logger.info(f"[{request_id}] Email {s3_key} marked as processed")
+                marked_as_processed = True
+            else:
+                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
+        elif s3_key:
+            # Fallback: derive the bucket name from the domain
+            bucket_name = get_bucket_name_for_domain(domain)
+            if mark_email_as_processed(bucket_name, s3_key):
+                logger.info(f"[{request_id}] Email {s3_key} marked as processed (derived bucket: {bucket_name})")
+                marked_as_processed = True
+            else:
+                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
+
         return jsonify({
             'message': 'Email processed',
             'request_id': request_id,
+            'domain': domain,
             'recipients_count': len(recipients),
-            'recipients': recipients
+            'recipients': recipients,
+            'marked_as_processed': marked_as_processed,
+            's3_bucket': s3_bucket,
+            's3_key': s3_key
         }), 200
 
     except Exception as e:
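
With these app.py changes, the service exposes two new operational endpoints: `/health` and `/retry/<domain>`. A minimal smoke-test sketch using only the standard library; the base URL and port are assumptions (the `app.run(...)` call is outside this diff), and `API_TOKEN` must be exported in the calling shell:

    import json
    import os
    import urllib.request

    BASE_URL = "http://localhost:5000"  # assumption: Flask default port
    TOKEN = os.environ["API_TOKEN"]

    def get(path):
        req = urllib.request.Request(f"{BASE_URL}{path}")
        req.add_header("Authorization", f"Bearer {TOKEN}")
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode("utf-8"))

    print(get("/health"))                 # extended health report (no auth check in the handler)
    print(get("/retry/andreasknuth.de"))  # reprocess unmarked emails for one domain

One design note: `/retry/<domain>` changes state over GET; switching it to POST would be a small, conventional follow-up.
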
diff --git a/ses-lambda/lambda-function.py b/ses-lambda/lambda-function.py
new file mode 100644
index 0000000..b538b9d
--- /dev/null
+++ b/ses-lambda/lambda-function.py
@@ -0,0 +1,214 @@
+# Optimized Lambda with improvements
+import json
+import os
+import urllib.request
+import urllib.error
+import urllib.parse
+import logging
+import ssl
+import boto3
+import base64
+import hashlib
+import time
+import gzip
+
+# Configuration
+API_BASE_URL = os.environ['API_BASE_URL']
+API_TOKEN = os.environ['API_TOKEN']
+DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
+MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10 MB default
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+def bucket_name_to_domain(bucket_name):
+    """Convert an S3 bucket name to a domain name"""
+    # Example: andreasknuth-de-emails -> andreasknuth.de
+    if bucket_name.endswith('-emails'):
+        # Strip the "-emails" suffix
+        domain_part = bucket_name[:-7]  # drop the last 7 characters ("-emails")
+        # Replace the remaining "-" with "."
+        domain = domain_part.replace('-', '.')
+        return domain
+    else:
+        # Fallback: the bucket name does not match the expected scheme
+        logger.warning(f"Bucket name {bucket_name} does not match the expected scheme")
+        return None
+
+def lambda_handler(event, context):
+    """Optimized Lambda handler with improved error handling"""
+
+    # Unique request ID for tracking
+    request_id = context.aws_request_id
+    logger.info(f"[{request_id}] S3 event received")
+
+    try:
+        # Extract the S3 event details
+        s3_event = event['Records'][0]['s3']
+        bucket_name = s3_event['bucket']['name']
+        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
+
+        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")
+
+        # Derive the domain automatically from the bucket name
+        domain = bucket_name_to_domain(bucket_name)
+        if not domain:
+            logger.error(f"[{request_id}] Could not derive a domain from bucket name {bucket_name}")
+            return {'statusCode': 400, 'body': json.dumps('Invalid bucket name')}
+
+        logger.info(f"[{request_id}] Derived domain: {domain}")
+
+        # Duplicate check via the S3 ETag
+        s3_client = boto3.client('s3')
+        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
+        etag = head_response['ETag'].strip('"')
+        content_length = head_response['ContentLength']
+
+        # Check the email size
+        if content_length > MAX_EMAIL_SIZE:
+            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
+            # Mark large emails instead of deleting them
+            mark_large_email(s3_client, bucket_name, object_key, content_length)
+            return {'statusCode': 413, 'body': json.dumps('Email too large')}
+
+        # Load the email
+        try:
+            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
+            email_content = response['Body'].read()
+            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
+
+        except Exception as e:
+            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
+            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}
+
+        # Compress the email content for transfer
+        compressed_content = gzip.compress(email_content)
+        email_base64 = base64.b64encode(compressed_content).decode('utf-8')
+
+        # Build the payload
+        payload = {
+            's3_bucket': bucket_name,  # S3 bucket for the marker functionality
+            's3_key': object_key,      # S3 key for the marker functionality
+            'domain': domain,
+            'email_content': email_base64,
+            'compressed': True,  # flag for the API
+            'etag': etag,
+            'request_id': request_id,
+            'original_size': len(email_content),
+            'compressed_size': len(compressed_content)
+        }
+
+        # Call the API with retry logic
+        success = call_api_with_retry(payload, domain, request_id)
+
+        if success:
+            # On success: delete the email from S3
+            try:
+                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
+                logger.info(f"[{request_id}] Email processed and deleted successfully")
+                return {
+                    'statusCode': 200,
+                    'body': json.dumps({
+                        'message': 'Email processed successfully',
+                        'request_id': request_id,
+                        'domain': domain
+                    })
+                }
+            except Exception as e:
+                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
+                # Still report success, since the email was processed
+                return {'statusCode': 200, 'body': json.dumps('Processed, but not deleted')}
+        else:
+            # On failure: leave the email in S3 for a later retry
+            logger.error(f"[{request_id}] Email stays in S3 for retry: {bucket_name}/{object_key}")
+            return {'statusCode': 500, 'body': json.dumps('API error')}
+
+    except Exception as e:
+        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
+        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
+
+def call_api_with_retry(payload, domain, request_id, max_retries=3):
+    """API call with retry logic"""
+
+    sync_url = f"{API_BASE_URL}/process/{domain}"
+    payload_json = json.dumps(payload).encode('utf-8')
+
+    for attempt in range(max_retries):
+        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
+
+        try:
+            # Build the request
+            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
+            req.add_header('Authorization', f'Bearer {API_TOKEN}')
+            req.add_header('Content-Type', 'application/json')
+            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
+            req.add_header('X-Request-ID', request_id)
+
+            # SSL context (certificate verification disabled only in debug mode)
+            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None
+
+            # API request with timeout
+            timeout = 25  # Lambda itself times out at 30 s, so stay a bit below that
+
+            if DEBUG_MODE and ssl_context:
+                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
+                    response_body = response.read().decode('utf-8')
+                    response_code = response.getcode()
+            else:
+                with urllib.request.urlopen(req, timeout=timeout) as response:
+                    response_body = response.read().decode('utf-8')
+                    response_code = response.getcode()
+
+            # Successful response
+            if response_code == 200:
+                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
+                if DEBUG_MODE:
+                    logger.info(f"[{request_id}] API response: {response_body}")
+                return True
+            else:
+                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
+
+        except urllib.error.HTTPError as e:
+            error_body = ""
+            try:
+                error_body = e.read().decode('utf-8')
+            except Exception:
+                pass
+
+            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
+            logger.error(f"[{request_id}] Error details: {error_body}")
+
+            # Do not retry on 4xx errors (client errors)
+            if 400 <= e.code < 500:
+                logger.error(f"[{request_id}] Client error - no retry")
+                return False
+
+        except Exception as e:
+            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
+
+        # Wait before the next attempt (exponential backoff)
+        if attempt < max_retries - 1:
+            wait_time = 2 ** attempt  # 1 s, then 2 s; no sleep after the final attempt
+            logger.info(f"[{request_id}] Waiting {wait_time}s before the next attempt")
+            time.sleep(wait_time)
+
+    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
+    return False
+
+def mark_large_email(s3_client, bucket, key, size):
+    """Mark large emails with metadata instead of deleting them"""
+    try:
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={
+                'status': 'too_large',
+                'size': str(size),
+                'marked_at': str(int(time.time()))
+            },
+            MetadataDirective='REPLACE'
+        )
+        logger.info(f"Email marked as too large: {key} ({size} bytes)")
+    except Exception as e:
+        logger.error(f"Could not mark email: {str(e)}")
\ No newline at end of file
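
The handler above can be exercised locally before deployment. A hedged sketch: it assumes AWS credentials plus `API_BASE_URL`/`API_TOKEN` are present in the environment, and that the bucket and object named in the synthetic event actually exist (the key shown is hypothetical):

    import importlib.util
    from types import SimpleNamespace

    # Load the hyphenated module by path (it cannot be imported by name)
    spec = importlib.util.spec_from_file_location("ses_lambda", "ses-lambda/lambda-function.py")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)  # runs module-level code, so the env vars must be set

    # Minimal synthetic S3 put event, matching exactly the fields the handler reads
    event = {
        "Records": [{
            "s3": {
                "bucket": {"name": "andreasknuth-de-emails"},
                "object": {"key": "incoming/example-object"}  # hypothetical key
            }
        }]
    }
    context = SimpleNamespace(aws_request_id="local-test-1")  # handler only reads aws_request_id

    print(mod.lambda_handler(event, context))
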
diff --git a/ses-lambda/lambda_funtion_old.py b/ses-lambda/lambda_funtion_old.py
new file mode 100644
index 0000000..6073c2e
--- /dev/null
+++ b/ses-lambda/lambda_funtion_old.py
@@ -0,0 +1,202 @@
+# Optimized Lambda with improvements
+import json
+import os
+import urllib.request
+import urllib.error
+import urllib.parse
+import logging
+import ssl
+import boto3
+import base64
+import hashlib
+
+# Configuration
+API_BASE_URL = os.environ['API_BASE_URL']
+API_TOKEN = os.environ['API_TOKEN']
+DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
+MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10 MB default
+
+# Bucket-to-domain mapping
+BUCKET_DOMAIN_MAP = {
+    'bizmatch-ses-mails': 'bizmatch.net',
+    'haiky-app-emails-339712845857': 'haiky.app',
+    'andreasknuth-de-emails': 'andreasknuth.de',
+    'wetter-playadelingles-de-emails': 'wetter-playadelingles.de',
+}
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+def lambda_handler(event, context):
+    """Optimized Lambda handler with improved error handling"""
+
+    # Unique request ID for tracking
+    request_id = context.aws_request_id
+    logger.info(f"[{request_id}] S3 event received")
+
+    try:
+        # Extract the S3 event details
+        s3_event = event['Records'][0]['s3']
+        bucket_name = s3_event['bucket']['name']
+        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
+
+        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")
+
+        # Determine the domain
+        domain = BUCKET_DOMAIN_MAP.get(bucket_name)
+        if not domain:
+            logger.warning(f"[{request_id}] No domain mapping for bucket {bucket_name}")
+            return {'statusCode': 400, 'body': json.dumps('Unknown bucket')}
+
+        # Duplicate check via the S3 ETag
+        s3_client = boto3.client('s3')
+        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
+        etag = head_response['ETag'].strip('"')
+        content_length = head_response['ContentLength']
+
+        # Check the email size
+        if content_length > MAX_EMAIL_SIZE:
+            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
+            # Mark large emails instead of deleting them
+            mark_large_email(s3_client, bucket_name, object_key, content_length)
+            return {'statusCode': 413, 'body': json.dumps('Email too large')}
+
+        # Load the email
+        try:
+            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
+            email_content = response['Body'].read()
+            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
+
+        except Exception as e:
+            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
+            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}
+
+        # Compress the email content for transfer (optional)
+        import gzip
+        compressed_content = gzip.compress(email_content)
+        email_base64 = base64.b64encode(compressed_content).decode('utf-8')
+
+        # Build the payload
+        payload = {
+            'bucket': bucket_name,
+            'key': object_key,
+            'domain': domain,
+            'email_content': email_base64,
+            'compressed': True,  # flag for the API
+            'etag': etag,
+            'request_id': request_id,
+            'original_size': len(email_content),
+            'compressed_size': len(compressed_content)
+        }
+
+        # Call the API with retry logic
+        success = call_api_with_retry(payload, domain, request_id)
+
+        if success:
+            # On success: delete the email from S3
+            try:
+                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
+                logger.info(f"[{request_id}] Email processed and deleted successfully")
+                return {
+                    'statusCode': 200,
+                    'body': json.dumps({
+                        'message': 'Email processed successfully',
+                        'request_id': request_id
+                    })
+                }
+            except Exception as e:
+                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
+                # Still report success, since the email was processed
+                return {'statusCode': 200, 'body': json.dumps('Processed, but not deleted')}
+        else:
+            # On failure: leave the email in S3 for a later retry
+            return {'statusCode': 500, 'body': json.dumps('API error')}
+
+    except Exception as e:
+        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
+        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
+
+def call_api_with_retry(payload, domain, request_id, max_retries=3):
+    """API call with retry logic"""
+
+    sync_url = f"{API_BASE_URL}/process/{domain}"
+    payload_json = json.dumps(payload).encode('utf-8')
+
+    for attempt in range(max_retries):
+        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
+
+        try:
+            # Build the request
+            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
+            req.add_header('Authorization', f'Bearer {API_TOKEN}')
+            req.add_header('Content-Type', 'application/json')
+            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
+            req.add_header('X-Request-ID', request_id)
+
+            # SSL context
+            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None
+
+            # API request with timeout
+            timeout = 25  # Lambda itself times out at 30 s, so stay a bit below that
+
+            if DEBUG_MODE and ssl_context:
+                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
+                    response_body = response.read().decode('utf-8')
+                    response_code = response.getcode()
+            else:
+                with urllib.request.urlopen(req, timeout=timeout) as response:
+                    response_body = response.read().decode('utf-8')
+                    response_code = response.getcode()
+
+            # Successful response
+            if response_code == 200:
+                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
+                return True
+            else:
+                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
+
+        except urllib.error.HTTPError as e:
+            error_body = ""
+            try:
+                error_body = e.read().decode('utf-8')
+            except:
+                pass
+
+            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
+            logger.error(f"[{request_id}] Error details: {error_body}")
+
+            # Do not retry on 4xx errors (client errors)
+            if 400 <= e.code < 500:
+                logger.error(f"[{request_id}] Client error - no retry")
+                return False
+
+        except Exception as e:
+            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
+
+        # Wait before the next attempt (exponential backoff)
+        if attempt < max_retries - 1:
+            wait_time = 2 ** attempt  # 1 s, then 2 s
+            logger.info(f"[{request_id}] Waiting {wait_time}s before the next attempt")
+            import time
+            time.sleep(wait_time)
+
+    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
+    return False
+
+def mark_large_email(s3_client, bucket, key, size):
+    """Mark large emails with metadata instead of deleting them"""
+    try:
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={
+                'status': 'too_large',
+                'size': str(size),
+                'marked_at': str(int(time.time()))
+            },
+            MetadataDirective='REPLACE'
+        )
+        logger.info(f"Email marked as too large: {key} ({size} bytes)")
+    except Exception as e:
+        logger.error(f"Could not mark email: {str(e)}")
\ No newline at end of file
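
Both Lambda versions and the REST API share one convention: processing state lives in S3 object metadata (`processed`, `status`, timestamps), written via `copy_object` with `MetadataDirective='REPLACE'`. A hedged snippet for inspecting that marker on a stored message; bucket, key, and region are placeholders:

    import boto3

    s3 = boto3.client("s3", region_name="us-east-1")  # assumption: region
    head = s3.head_object(Bucket="andreasknuth-de-emails",
                          Key="incoming/example-object")  # hypothetical key
    print(head.get("Metadata", {}))
    # e.g. {'processed': 'true', 'processed_timestamp': '1700000000', 'processor': 'rest-api'}

One caveat: a single `copy_object` call rewrites the object and is limited to sources up to 5 GB, which is comfortably above the MAX_EMAIL_SIZE used here.
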