diff --git a/email_api/email_api/app.py b/email_api/email_api/app.py
index 84b045a..e58bbfd 100644
--- a/email_api/email_api/app.py
+++ b/email_api/email_api/app.py
@@ -1,5 +1,5 @@
 import sys
-from flask import Flask, request, jsonify, g
+from flask import Flask, request, jsonify
 import smtplib
 import base64
 import gzip
@@ -13,56 +13,31 @@ from email.parser import BytesParser
 from email.policy import default
 from email.utils import getaddresses
 
-# Check the Python version
 if sys.version_info < (3, 12):
     raise RuntimeError("Python 3.12 or higher required")
 
-# Load the .env file
 load_dotenv()
-
 app = Flask(__name__)
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-# Configuration
-SMTP_HOST = "localhost"  # host network mode
-SMTP_PORT = 25  # fixed to port 25, no TLS
+SMTP_HOST = "localhost"
+SMTP_PORT = 25
 API_TOKEN = os.environ.get('API_TOKEN')
-MAIL_DIR = "/var/mail"  # default Maildir path
 AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
 
 if not API_TOKEN:
     raise ValueError("API_TOKEN environment variable not set")
 
-# Initialize the S3 client
 s3_client = boto3.client('s3', region_name=AWS_REGION)
 
-# Cache for processed requests
 processed_requests = {}
 
-logger.info(f"API_TOKEN loaded: {API_TOKEN}")
-
 def load_domains_config():
-    """Loads the domain configuration"""
-    # A real configuration file could be loaded here
-    # hardcoded as an example for now
-    return {
-        "andreasknuth.de": {"bucket": "andreasknuth-de-emails"},
-        # more domains can be added here
-    }
+    return {"andreasknuth.de": {"bucket": "andreasknuth-de-emails"}}
 
-def get_bucket_name_for_domain(domain):
-    """Converts a domain name to an S3 bucket name"""
-    return domain.replace(".", "-") + "-emails"
-
-def mark_email_as_processed(bucket_name, key):
-    """Marks an email as successfully processed"""
+def mark_email_as_processed(bucket, key):
     try:
-        # Add metadata marking the email as processed
        s3_client.copy_object(
-            Bucket=bucket_name,
+            Bucket=bucket,
             Key=key,
-            CopySource={'Bucket': bucket_name, 'Key': key},
+            CopySource={'Bucket': bucket, 'Key': key},
             Metadata={
                 'processed': 'true',
                 'processed_timestamp': str(int(time.time())),
@@ -70,302 +45,82 @@ def mark_email_as_processed(bucket_name, key):
             },
             MetadataDirective='REPLACE'
         )
-        logger.info(f"Email {key} marked as processed")
         return True
     except Exception as e:
-        logger.error(f"Error marking email {key}: {str(e)}")
+        logger.error(f"Error marking email: {e}")
         return False
 
-def get_unprocessed_emails(bucket_name):
-    """Fetches all unprocessed emails from an S3 bucket"""
-    try:
-        unprocessed_emails = []
-        paginator = s3_client.get_paginator('list_objects_v2')
-
-        for page in paginator.paginate(Bucket=bucket_name):
-            if 'Contents' not in page:
-                continue
-
-            for obj in page['Contents']:
-                key = obj['Key']
-
-                # Check the email's metadata
-                try:
-                    head_response = s3_client.head_object(Bucket=bucket_name, Key=key)
-                    metadata = head_response.get('Metadata', {})
-
-                    # If not marked as processed, add it to the list
-                    if metadata.get('processed') != 'true':
-                        unprocessed_emails.append({
-                            'key': key,
-                            'size': obj['Size'],
-                            'last_modified': obj['LastModified'].isoformat(),
-                            'metadata': metadata
-                        })
-                except Exception as e:
-                    logger.warning(f"Error checking metadata for {key}: {str(e)}")
-                    # If the metadata cannot be read, treat the email as unprocessed
-                    unprocessed_emails.append({
-                        'key': key,
-                        'size': obj['Size'],
-                        'last_modified': obj['LastModified'].isoformat(),
-                        'metadata': {}
-                    })
-
-        return unprocessed_emails
-    except Exception as e:
-        logger.error(f"Error fetching unprocessed emails: {str(e)}")
-        return []
-
-@app.route('/health', methods=['GET'])
-def health_check():
-    """Extended health check"""
-
-    # Basic health check
-    health_status = {
-        "status": "OK",
-        "message": "S3 email processor API is active",
-        "timestamp": int(time.time()),
-        "version": "2.0",
-        "request_id": getattr(g, 'request_id', 'health-check')
-    }
-
-    # Extended checks
-    try:
-        # Check Maildir access
-        mail_path = Path(MAIL_DIR)
-        if mail_path.exists() and mail_path.is_dir():
-            health_status["mail_dir"] = "accessible"
-        else:
-            health_status["mail_dir"] = "not_accessible"
-            health_status["status"] = "WARNING"
-
-        # Check the domain configuration
-        domains = load_domains_config()
-        health_status["configured_domains"] = len(domains)
-        health_status["domains"] = list(domains.keys())
-
-        # Cache status
-        health_status["request_cache_size"] = len(processed_requests)
-
-        # Test the SMTP connection
-        try:
-            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as smtp:
-                smtp.noop()
-            health_status["smtp_connection"] = "OK"
-        except Exception as e:
-            health_status["smtp_connection"] = f"ERROR: {str(e)}"
-            health_status["status"] = "WARNING"
-
-        # Test the S3 connection
-        try:
-            s3_client.list_buckets()
-            health_status["s3_connection"] = "OK"
-        except Exception as e:
-            health_status["s3_connection"] = f"ERROR: {str(e)}"
-            health_status["status"] = "WARNING"
-
-    except Exception as e:
-        health_status["status"] = "ERROR"
-        health_status["error"] = str(e)
-
-    return jsonify(health_status)
-
-@app.route('/retry/<domain>', methods=['GET'])
-def retry_domain_emails(domain):
-    """Processes all unprocessed emails for a domain"""
-    auth_header = request.headers.get('Authorization')
-    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
-        return jsonify({'error': 'Unauthorized'}), 401
-
-    request_id = f"retry-{domain}-{int(time.time())}"
-    bucket_name = get_bucket_name_for_domain(domain)
-
-    logger.info(f"[{request_id}] Retry processing for domain: {domain}, bucket: {bucket_name}")
-
-    try:
-        # Fetch all unprocessed emails
-        unprocessed_emails = get_unprocessed_emails(bucket_name)
-
-        if not unprocessed_emails:
-            return jsonify({
-                'message': f'No unprocessed emails found for domain {domain}',
-                'domain': domain,
-                'bucket': bucket_name,
-                'processed_count': 0,
-                'request_id': request_id
-            }), 200
-
-        processed_count = 0
-        failed_count = 0
-        errors = []
-
-        for email_info in unprocessed_emails:
-            key = email_info['key']
-
-            try:
-                logger.info(f"[{request_id}] Processing email: {key}")
-
-                # Load the email from S3
-                response = s3_client.get_object(Bucket=bucket_name, Key=key)
-                email_content = response['Body'].read()
-
-                # Parse the email
-                email_msg = BytesParser(policy=default).parsebytes(email_content)
-
-                # Extract the From address
-                from_headers = email_msg.get_all('from', [])
-                if from_headers:
-                    from_addr = getaddresses(from_headers)[0][1]
-                else:
-                    from_addr = f'retry@{domain}'
-
-                # Extract the recipients
-                to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
-                cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
-                bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
-                recipients = to_addrs + cc_addrs + bcc_addrs
-
-                if not recipients:
-                    raise ValueError(f"No recipients found in email {key}")
-
-                # Forward via SMTP - use sendmail()
-                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
-                    smtp.sendmail(from_addr, recipients, email_content)
-
-                # Mark as processed
-                mark_email_as_processed(bucket_name, key)
-                processed_count += 1
-
-                logger.info(f"[{request_id}] Email {key} processed successfully")
-
-            except Exception as e:
-                failed_count += 1
-                error_msg = f"Error for email {key}: {str(e)}"
-                errors.append(error_msg)
-                logger.error(f"[{request_id}] {error_msg}")
-
-        result = {
-            'message': f'Retry processing for domain {domain} completed',
-            'domain': domain,
-            'bucket': bucket_name,
-            'total_found': len(unprocessed_emails),
-            'processed_count': processed_count,
-            'failed_count': failed_count,
-            'request_id': request_id
-        }
-
-        if errors:
-            result['errors'] = errors
-
-        return jsonify(result), 200
-
-    except Exception as e:
-        logger.error(f"[{request_id}] Error during retry processing: {str(e)}")
-        return jsonify({
-            'error': str(e),
-            'domain': domain,
-            'bucket': bucket_name,
-            'request_id': request_id
-        }), 500
-
 @app.route('/process/<domain>', methods=['POST'])
 def process_email(domain):
-    """Processes a single email from the Lambda function"""
-    auth_header = request.headers.get('Authorization')
-    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
+    auth = request.headers.get('Authorization')
+    if auth != f'Bearer {API_TOKEN}':
         return jsonify({'error': 'Unauthorized'}), 401
 
     data = request.get_json()
     if not data:
-        return jsonify({'error': 'Invalid JSON payload'}), 400
+        return jsonify({'error': 'Invalid payload'}), 400
 
-    request_id = data.get('request_id')
-    email_content = data.get('email_content')
+    content = data.get('email_content')
+    if not content:
+        return jsonify({'error': 'Missing email_content'}), 400
     compressed = data.get('compressed', False)
-    s3_key = data.get('s3_key')  # S3 key for the marker functionality
-    s3_bucket = data.get('s3_bucket')  # S3 bucket for the marker functionality
+    raw = base64.b64decode(content)
+    email_bytes = gzip.decompress(raw) if compressed else raw
 
-    logger.info(f"[{request_id}] Processing email for domain: {domain}")
-    if s3_key:
-        logger.info(f"[{request_id}] S3 location: {s3_bucket}/{s3_key}")
+    msg = BytesParser(policy=default).parsebytes(email_bytes)
+    from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else f'lambda@{domain}'
+    recipients = []
+    for hdr in ('to', 'cc', 'bcc'):
+        recipients += [addr for _n, addr in getaddresses(msg.get_all(hdr, []))]
 
-    try:
-        # Decompress if compressed
-        if compressed:
-            email_bytes = base64.b64decode(email_content)
-            email_content = gzip.decompress(email_bytes)
-        else:
-            email_content = base64.b64decode(email_content)
+    if not recipients:
+        return jsonify({'error': 'No recipients'}), 400
 
-        # Parse the email headers
-        email_msg = BytesParser(policy=default).parsebytes(email_content)
-
-        # Extract the From address (consistent with the Lambda)
-        from_headers = email_msg.get_all('from', [])
-        if from_headers:
-            from_addr = getaddresses(from_headers)[0][1]
-        else:
-            from_addr = f'lambda@{domain}'  # fallback using the domain
-
-        # Extract recipients from all relevant headers (consistent with the Lambda)
-        to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
-        cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
-        bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
-        recipients = to_addrs + cc_addrs + bcc_addrs
-
-        # Debug output for recipients
-        logger.info(f"[{request_id}] To addresses: {to_addrs}")
-        logger.info(f"[{request_id}] CC addresses: {cc_addrs}")
-        logger.info(f"[{request_id}] BCC addresses: {bcc_addrs}")
-        logger.info(f"[{request_id}] All recipients: {recipients}")
-
-        if not recipients:
-            # Additional debug info
-            logger.error(f"[{request_id}] Available headers: {list(email_msg.keys())}")
-            logger.error(f"[{request_id}] Raw To header: {email_msg.get('To')}")
-            logger.error(f"[{request_id}] Raw CC header: {email_msg.get('Cc')}")
-            raise ValueError("No recipients (To/CC/BCC) found in the email")
+    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
+        smtp.sendmail(from_addr, recipients, email_bytes)
 
-        logger.info(f"[{request_id}] From: {from_addr}, Recipients: {recipients}")
+    # No marking here anymore; the Lambda takes care of it
+    return jsonify({'message': 'Email forwarded', 'recipients': recipients}), 200
 
-        # Forward to Postfix - use sendmail() instead of manual SMTP commands
-        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
-            smtp.sendmail(from_addr, recipients, email_content)
-
-        logger.info(f"[{request_id}] Email forwarded to Postfix for {domain} - {len(recipients)} recipients")
+@app.route('/retry/<domain>', methods=['GET'])
+def retry_domain_emails(domain):
+    auth = request.headers.get('Authorization')
+    if auth != f'Bearer {API_TOKEN}':
+        return jsonify({'error': 'Unauthorized'}), 401
 
-        # Mark the email as processed if S3 information is present
-        marked_as_processed = False
-        if s3_key and s3_bucket:
-            if mark_email_as_processed(s3_bucket, s3_key):
-                logger.info(f"[{request_id}] Email {s3_key} marked as processed")
-                marked_as_processed = True
+    bucket = domain.replace('.', '-') + '-emails'
+    unprocessed = []
+    paginator = s3_client.get_paginator('list_objects_v2')
+    for page in paginator.paginate(Bucket=bucket):
+        for obj in page.get('Contents', []):
+            head = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
+            if head.get('Metadata', {}).get('processed') != 'true':
+                unprocessed.append(obj['Key'])
+
+    results = {'processed': [], 'failed': []}
+    for key in unprocessed:
+        try:
+            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+            msg = BytesParser(policy=default).parsebytes(body)
+            from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else f'retry@{domain}'
+            to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
+            cc_addrs = [addr for _n, addr in getaddresses(msg.get_all('cc', []))]
+            bcc_addrs = [addr for _n, addr in getaddresses(msg.get_all('bcc', []))]
+            recipients = to_addrs + cc_addrs + bcc_addrs
+
+            with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
+                smtp.sendmail(from_addr, recipients, body)
+
+            if mark_email_as_processed(bucket, key):
+                results['processed'].append(key)
             else:
-                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
-        elif s3_key:
-            # Fallback: derive the bucket name from the domain
-            bucket_name = get_bucket_name_for_domain(domain)
-            if mark_email_as_processed(bucket_name, s3_key):
-                logger.info(f"[{request_id}] Email {s3_key} marked as processed (derived bucket: {bucket_name})")
-                marked_as_processed = True
-            else:
-                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
+                results['failed'].append(key)
+        except Exception as e:
+            results['failed'].append(f"{key}: {e}")
 
-        return jsonify({
-            'message': 'Email processed',
-            'request_id': request_id,
-            'domain': domain,
-            'recipients_count': len(recipients),
-            'recipients': recipients,
-            'marked_as_processed': marked_as_processed,
-            's3_bucket': s3_bucket,
-            's3_key': s3_key
-        }), 200
-
-    except Exception as e:
-        logger.error(f"[{request_id}] Error processing email: {str(e)}")
-        return jsonify({'error': str(e), 'request_id': request_id}), 500
+    return jsonify(results), 200
+
+@app.route('/health', methods=['GET'])
+def health_check():
+    return jsonify({'status': 'OK'}), 200
 
 if __name__ == '__main__':
-    app.run(host='0.0.0.0', port=5000)
\ No newline at end of file
+    app.run(host='0.0.0.0', port=5000)
diff --git a/ses-lambda/lambda-function.py b/ses-lambda/lambda-function.py
index b538b9d..cf4276c 100644
--- a/ses-lambda/lambda-function.py
+++ b/ses-lambda/lambda-function.py
@@ -1,4 +1,3 @@
-# Optimized Lambda with improvements
 import json
 import os
 import urllib.request
@@ -8,9 +7,8 @@ import logging
 import ssl
 import boto3
 import base64
-import hashlib
-import time
 import gzip
+import time
 
 # Configuration
 API_BASE_URL = os.environ['API_BASE_URL']
@@ -21,194 +19,104 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10MB default
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
+s3_client = boto3.client('s3')
+
 def bucket_name_to_domain(bucket_name):
-    """Converts an S3 bucket name to a domain name"""
-    # Example: andreasknuth-de-emails -> andreasknuth.de
     if bucket_name.endswith('-emails'):
-        # Strip the "-emails" suffix
-        domain_part = bucket_name[:-7]  # remove the last 7 characters ("-emails")
-        # Replace all remaining "-" with "."
-        domain = domain_part.replace('-', '.')
-        return domain
-    else:
-        # Fallback: the bucket name does not match the expected schema
-        logger.warning(f"Bucket name {bucket_name} does not match the expected schema")
-        return None
+        domain_part = bucket_name[:-7]
+        return domain_part.replace('-', '.')
+    logger.warning(f"Bucket name {bucket_name} does not match the expected schema")
+    return None
 
-def lambda_handler(event, context):
-    """Optimized Lambda handler with improved error handling"""
-
-    # Unique request ID for tracking
-    request_id = context.aws_request_id
-    logger.info(f"[{request_id}] S3 event received")
-
-    try:
-        # Extract the S3 event details
-        s3_event = event['Records'][0]['s3']
-        bucket_name = s3_event['bucket']['name']
-        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
-
-        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")
-
-        # Derive the domain automatically from the bucket name
-        domain = bucket_name_to_domain(bucket_name)
-        if not domain:
-            logger.error(f"[{request_id}] Could not derive domain from bucket name {bucket_name}")
-            return {'statusCode': 400, 'body': json.dumps('Invalid bucket name')}
-
-        logger.info(f"[{request_id}] Derived domain: {domain}")
-
-        # Duplicate check via the S3 ETag
-        s3_client = boto3.client('s3')
-        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
-        etag = head_response['ETag'].strip('"')
-        content_length = head_response['ContentLength']
-
-        # Check the email size
-        if content_length > MAX_EMAIL_SIZE:
-            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
-            # Mark large emails instead of deleting them
-            mark_large_email(s3_client, bucket_name, object_key, content_length)
-            return {'statusCode': 413, 'body': json.dumps('Email too large')}
-
-        # Load the email
-        try:
-            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
-            email_content = response['Body'].read()
-            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
-
-        except Exception as e:
-            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
-            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}
-
-        # Compress the email content for transfer
-        compressed_content = gzip.compress(email_content)
-        email_base64 = base64.b64encode(compressed_content).decode('utf-8')
-
-        # Build the payload
-        payload = {
-            's3_bucket': bucket_name,  # S3 bucket for the marker functionality
-            's3_key': object_key,  # S3 key for the marker functionality
-            'domain': domain,
-            'email_content': email_base64,
-            'compressed': True,  # flag for the API
-            'etag': etag,
-            'request_id': request_id,
-            'original_size': len(email_content),
-            'compressed_size': len(compressed_content)
-        }
-
-        # Call the API with retry logic
-        success = call_api_with_retry(payload, domain, request_id)
-
-        if success:
-            # On success: delete the email from S3
-            try:
-                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
-                logger.info(f"[{request_id}] Email processed successfully and deleted")
-                return {
-                    'statusCode': 200,
-                    'body': json.dumps({
-                        'message': 'Email processed successfully',
-                        'request_id': request_id,
-                        'domain': domain
-                    })
-                }
-            except Exception as e:
-                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
-                # Report success anyway, since the email was processed
-                return {'statusCode': 200, 'body': json.dumps('Processed but not deleted')}
-        else:
-            # On failure: leave the email in S3 for a later retry
-            logger.error(f"[{request_id}] Email stays in S3 for retry: {bucket_name}/{object_key}")
-            return {'statusCode': 500, 'body': json.dumps('API error')}
-
-    except Exception as e:
-        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
-        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
-
-def call_api_with_retry(payload, domain, request_id, max_retries=3):
-    """API call with retry logic"""
-
-    sync_url = f"{API_BASE_URL}/process/{domain}"
-    payload_json = json.dumps(payload).encode('utf-8')
-
-    for attempt in range(max_retries):
-        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
-
-        try:
-            # Build the request
-            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
-            req.add_header('Authorization', f'Bearer {API_TOKEN}')
-            req.add_header('Content-Type', 'application/json')
-            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
-            req.add_header('X-Request-ID', request_id)
-
-            # SSL context
-            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None
-
-            # API request with timeout
-            timeout = 25  # Lambda has a 30s timeout, so slightly less
-
-            if DEBUG_MODE and ssl_context:
-                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
-                    response_body = response.read().decode('utf-8')
-                    response_code = response.getcode()
-            else:
-                with urllib.request.urlopen(req, timeout=timeout) as response:
-                    response_body = response.read().decode('utf-8')
-                    response_code = response.getcode()
-
-            # Successful response
-            if response_code == 200:
-                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
-                if DEBUG_MODE:
-                    logger.info(f"[{request_id}] API response: {response_body}")
-                return True
-            else:
-                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
-
-        except urllib.error.HTTPError as e:
-            error_body = ""
-            try:
-                error_body = e.read().decode('utf-8')
-            except:
-                pass
-
-            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
-            logger.error(f"[{request_id}] Error details: {error_body}")
-
-            # Do not retry on 4xx errors (client errors)
-            if 400 <= e.code < 500:
-                logger.error(f"[{request_id}] Client error - no retry")
-                return False
-
-        except Exception as e:
-            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
-
-        # Wait before the next attempt (exponential backoff)
-        if attempt < max_retries - 1:
-            wait_time = 2 ** attempt  # 1s, 2s, 4s
-            logger.info(f"[{request_id}] Waiting {wait_time}s before next attempt")
-            time.sleep(wait_time)
-
-    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
-    return False
-
-def mark_large_email(s3_client, bucket, key, size):
-    """Marks large emails with metadata instead of deleting them"""
+def mark_email_processed(bucket, key, processor='lambda'):
+    timestamp = str(int(time.time()))
     try:
         s3_client.copy_object(
             Bucket=bucket,
             Key=key,
             CopySource={'Bucket': bucket, 'Key': key},
             Metadata={
-                'status': 'too_large',
-                'size': str(size),
-                'marked_at': str(int(time.time()))
+                'processed': 'true',
+                'processed_timestamp': timestamp,
+                'processor': processor
             },
             MetadataDirective='REPLACE'
         )
-        logger.info(f"Email marked as too large: {key} ({size} bytes)")
+        logger.info(f"Email {key} marked as processed by {processor}")
+        return True
     except Exception as e:
-        logger.error(f"Could not mark the email: {str(e)}")
\ No newline at end of file
+        logger.error(f"Marking error for {key}: {e}")
+        return False
+
+
+def call_api_with_retry(payload, domain, request_id, max_retries=3):
+    sync_url = f"{API_BASE_URL}/process/{domain}"
+    payload_json = json.dumps(payload).encode('utf-8')
+    for attempt in range(max_retries):
+        try:
+            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
+            req.add_header('Authorization', f'Bearer {API_TOKEN}')
+            req.add_header('Content-Type', 'application/json')
+            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
+            req.add_header('X-Request-ID', request_id)
+            timeout = 25
+            context = ssl._create_unverified_context() if DEBUG_MODE else None
+            with urllib.request.urlopen(req, timeout=timeout, context=context) as response:
+                code = response.getcode()
+                if code == 200:
+                    return True
+        except urllib.error.HTTPError as e:
+            if 400 <= e.code < 500:
+                return False
+        except Exception:
+            pass
+        if attempt < max_retries - 1:
+            time.sleep(2 ** attempt)
+    return False
+
+
+def lambda_handler(event, context):
+    request_id = context.aws_request_id
+    logger.info(f"[{request_id}] S3 event received")
+    s3_event = event['Records'][0]['s3']
+    bucket = s3_event['bucket']['name']
+    key = urllib.parse.unquote_plus(s3_event['object']['key'])
+    logger.info(f"[{request_id}] Processing: {bucket}/{key}")
+
+    domain = bucket_name_to_domain(bucket)
+    if not domain:
+        return {'statusCode': 400, 'body': 'Invalid bucket name'}
+
+    head = s3_client.head_object(Bucket=bucket, Key=key)
+    size = head['ContentLength']
+    if size > MAX_EMAIL_SIZE:
+        # Mark oversized emails instead of deleting them
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={'status': 'too_large', 'size': str(size), 'marked_at': str(int(time.time()))},
+            MetadataDirective='REPLACE'
+        )
+        return {'statusCode': 413, 'body': 'Email too large'}
+
+    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+    compressed = gzip.compress(body)
+    payload = {
+        's3_bucket': bucket,
+        's3_key': key,
+        'domain': domain,
+        'email_content': base64.b64encode(compressed).decode(),
+        'compressed': True,
+        'etag': head['ETag'].strip('"'),
+        'request_id': request_id,
+        'original_size': len(body),
+        'compressed_size': len(compressed)
+    }
+
+    if call_api_with_retry(payload, domain, request_id):
+        # Mark via inline metadata instead of deleting
+        mark_email_processed(bucket, key)
+        return {'statusCode': 200, 'body': json.dumps({'message': 'Email processed and marked', 'request_id': request_id})}
+    else:
+        return {'statusCode': 500, 'body': 'API error'}
\ No newline at end of file
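Both sides now converge on the same S3 metadata convention: the Lambda
stamps processed=true via copy_object after a successful API call, and
/retry/<domain> skips any object whose HEAD shows that marker. A small
inspection sketch, assuming configured AWS credentials; the bucket and key
are placeholders:

import boto3

s3 = boto3.client("s3")

# HEAD the object and read the user metadata written with
# MetadataDirective='REPLACE'; S3 returns user metadata keys lowercased.
head = s3.head_object(Bucket="andreasknuth-de-emails", Key="example-key")
meta = head.get("Metadata", {})
if meta.get("processed") == "true":
    print("processed at", meta.get("processed_timestamp"),
          "by", meta.get("processor"))
else:
    print("unprocessed - GET /retry/andreasknuth.de would pick it up")

One caveat worth verifying in the bucket notification configuration:
copy_object rewrites the object in place, so a trigger that also fires on
s3:ObjectCreated:Copy would invoke the handler again for objects it just
marked; filtering the event types, or having lambda_handler skip objects
whose metadata already says processed, avoids that loop.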