This commit is contained in:
Andreas Knuth 2025-06-12 19:12:21 -05:00
parent 9287e9be8b
commit 24b05aa210
4 changed files with 694 additions and 4 deletions

View File

@@ -6,10 +6,16 @@ services:
    network_mode: host
    volumes:
      - ./email_api:/app
      - /var/mail:/var/mail  # Maildir access for the health check
    working_dir: /app
    env_file:
      - .env
    environment:
      - API_TOKEN=${API_TOKEN}
      - AWS_REGION=${AWS_REGION}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    command: >
      bash -c "pip install --upgrade pip &&
      pip install flask python-dotenv boto3 &&
      python app.py"

View File

@@ -1,10 +1,13 @@
import sys
from flask import Flask, request, jsonify, g
import smtplib
import base64
import gzip
import logging
import os
import time
import boto3
from pathlib import Path
from dotenv import load_dotenv
from email.parser import BytesParser
from email.policy import default
@@ -25,13 +28,253 @@ logger = logging.getLogger(__name__)
SMTP_HOST = "localhost"  # host network mode
SMTP_PORT = 25           # fixed to port 25, no TLS
API_TOKEN = os.environ.get('API_TOKEN')
MAIL_DIR = "/var/mail"   # default Maildir path
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')

if not API_TOKEN:
    raise ValueError("API_TOKEN environment variable not set")

# Initialize the S3 client
s3_client = boto3.client('s3', region_name=AWS_REGION)

# Cache of processed requests
processed_requests = {}

logger.info("API_TOKEN loaded")  # log presence only, never the token value itself
def load_domains_config():
    """Loads the domain configuration."""
    # A real configuration file could be loaded here;
    # for now the mapping is hardcoded as an example.
    return {
        "andreasknuth.de": {"bucket": "andreasknuth-de-emails"},
        # additional domains can be added here
    }
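
The docstring above leaves loading a real configuration file as a TODO. A minimal sketch of what that could look like, assuming a hypothetical domains.json next to app.py (the file name, path, and json import are assumptions, not part of this commit):

# Sketch only: DOMAINS_CONFIG_PATH and domains.json are hypothetical.
import json

DOMAINS_CONFIG_PATH = Path(__file__).parent / "domains.json"

def load_domains_config():
    """Loads the domain configuration from a JSON file, falling back to an empty mapping."""
    try:
        with open(DOMAINS_CONFIG_PATH) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}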
def get_bucket_name_for_domain(domain):
    """Converts a domain name to an S3 bucket name."""
    # Note: this mapping is not reversible for domains that themselves contain
    # hyphens (e.g. wetter-playadelingles.de), since "." and "-" both become "-".
    return domain.replace(".", "-") + "-emails"
def mark_email_as_processed(bucket_name, key):
    """Marks an email as successfully processed."""
    try:
        # Copy the object onto itself with replaced metadata to flag it as processed
        s3_client.copy_object(
            Bucket=bucket_name,
            Key=key,
            CopySource={'Bucket': bucket_name, 'Key': key},
            Metadata={
                'processed': 'true',
                'processed_timestamp': str(int(time.time())),
                'processor': 'rest-api'
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Email {key} marked as processed")
        return True
    except Exception as e:
        logger.error(f"Error marking email {key}: {str(e)}")
        return False
def get_unprocessed_emails(bucket_name):
    """Fetches all unprocessed emails from an S3 bucket."""
    try:
        unprocessed_emails = []
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            if 'Contents' not in page:
                continue
            for obj in page['Contents']:
                key = obj['Key']
                # Check the object's metadata (one HEAD request per object)
                try:
                    head_response = s3_client.head_object(Bucket=bucket_name, Key=key)
                    metadata = head_response.get('Metadata', {})
                    # If not marked as processed, add it to the list
                    if metadata.get('processed') != 'true':
                        unprocessed_emails.append({
                            'key': key,
                            'size': obj['Size'],
                            'last_modified': obj['LastModified'].isoformat(),
                            'metadata': metadata
                        })
                except Exception as e:
                    logger.warning(f"Error checking metadata for {key}: {str(e)}")
                    # If the metadata cannot be read, treat the email as unprocessed
                    unprocessed_emails.append({
                        'key': key,
                        'size': obj['Size'],
                        'last_modified': obj['LastModified'].isoformat(),
                        'metadata': {}
                    })
        return unprocessed_emails
    except Exception as e:
        logger.error(f"Error fetching unprocessed emails: {str(e)}")
        return []
@app.route('/health', methods=['GET'])
def health_check():
    """Extended health check."""
    # Basic health status
    health_status = {
        "status": "OK",
        "message": "S3 email processor API is up",
        "timestamp": int(time.time()),
        "version": "2.0",
        "request_id": getattr(g, 'request_id', 'health-check')
    }
    # Extended checks
    try:
        # Check Maildir access
        mail_path = Path(MAIL_DIR)
        if mail_path.exists() and mail_path.is_dir():
            health_status["mail_dir"] = "accessible"
        else:
            health_status["mail_dir"] = "not_accessible"
            health_status["status"] = "WARNING"

        # Check the domain configuration
        domains = load_domains_config()
        health_status["configured_domains"] = len(domains)
        health_status["domains"] = list(domains.keys())

        # Cache status
        health_status["request_cache_size"] = len(processed_requests)

        # Test the SMTP connection
        try:
            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as smtp:
                smtp.noop()
            health_status["smtp_connection"] = "OK"
        except Exception as e:
            health_status["smtp_connection"] = f"ERROR: {str(e)}"
            health_status["status"] = "WARNING"

        # Test the S3 connection
        try:
            s3_client.list_buckets()
            health_status["s3_connection"] = "OK"
        except Exception as e:
            health_status["s3_connection"] = f"ERROR: {str(e)}"
            health_status["status"] = "WARNING"
    except Exception as e:
        health_status["status"] = "ERROR"
        health_status["error"] = str(e)

    return jsonify(health_status)
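
A quick way to poke the health endpoint from the host; the base URL and port are assumptions (the port the Flask app binds to is not visible in this diff):

# Sketch only: base URL and port are assumptions.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:5000/health", timeout=10) as resp:
    print(json.dumps(json.loads(resp.read().decode('utf-8')), indent=2))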
@app.route('/retry/<domain>', methods=['GET'])
def retry_domain_emails(domain):
    """Processes all unprocessed emails for a domain."""
    auth_header = request.headers.get('Authorization')
    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
        return jsonify({'error': 'Unauthorized'}), 401

    request_id = f"retry-{domain}-{int(time.time())}"
    bucket_name = get_bucket_name_for_domain(domain)
    logger.info(f"[{request_id}] Retry processing for domain: {domain}, bucket: {bucket_name}")

    try:
        # Fetch all unprocessed emails
        unprocessed_emails = get_unprocessed_emails(bucket_name)
        if not unprocessed_emails:
            return jsonify({
                'message': f'No unprocessed emails found for domain {domain}',
                'domain': domain,
                'bucket': bucket_name,
                'processed_count': 0,
                'request_id': request_id
            }), 200

        processed_count = 0
        failed_count = 0
        errors = []
        for email_info in unprocessed_emails:
            key = email_info['key']
            try:
                logger.info(f"[{request_id}] Processing email: {key}")
                # Load the email from S3
                response = s3_client.get_object(Bucket=bucket_name, Key=key)
                email_content = response['Body'].read()
                # Parse the email
                email_msg = BytesParser(policy=default).parsebytes(email_content)
                # Extract the From address
                from_headers = email_msg.get_all('from', [])
                if from_headers:
                    from_addr = getaddresses(from_headers)[0][1]
                else:
                    from_addr = f'retry@{domain}'
                # Extract the recipients
                to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
                cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
                bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
                recipients = to_addrs + cc_addrs + bcc_addrs
                if not recipients:
                    raise ValueError(f"No recipients found in email {key}")
                # Forward via SMTP; sendmail() (instead of raw mail/rcpt/data calls)
                # raises if the sender or any recipient is refused
                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
                    smtp.sendmail(from_addr, recipients, email_content)
                # Mark as processed
                mark_email_as_processed(bucket_name, key)
                processed_count += 1
                logger.info(f"[{request_id}] Email {key} processed successfully")
            except Exception as e:
                failed_count += 1
                error_msg = f"Error processing email {key}: {str(e)}"
                errors.append(error_msg)
                logger.error(f"[{request_id}] {error_msg}")

        result = {
            'message': f'Retry processing for domain {domain} finished',
            'domain': domain,
            'bucket': bucket_name,
            'total_found': len(unprocessed_emails),
            'processed_count': processed_count,
            'failed_count': failed_count,
            'request_id': request_id
        }
        if errors:
            result['errors'] = errors
        return jsonify(result), 200
    except Exception as e:
        logger.error(f"[{request_id}] Error during retry processing: {str(e)}")
        return jsonify({
            'error': str(e),
            'domain': domain,
            'bucket': bucket_name,
            'request_id': request_id
        }), 500
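
The retry endpoint can be triggered the same way, this time with the Bearer token the route checks; again, host and port are assumptions, not part of this commit:

# Sketch only: base URL and port are assumptions.
import json
import os
import urllib.request

req = urllib.request.Request("http://localhost:5000/retry/andreasknuth.de")
req.add_header('Authorization', f"Bearer {os.environ['API_TOKEN']}")
with urllib.request.urlopen(req, timeout=60) as resp:
    print(json.dumps(json.loads(resp.read().decode('utf-8')), indent=2))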
@app.route('/process/<domain>', methods=['POST'])
def process_email(domain):
    """Processes a single email sent by the Lambda function."""
    auth_header = request.headers.get('Authorization')
    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
        return jsonify({'error': 'Unauthorized'}), 401
@@ -43,8 +286,12 @@ def process_email(domain):
    request_id = data.get('request_id')
    email_content = data.get('email_content')
    compressed = data.get('compressed', False)
    s3_key = data.get('s3_key')        # S3 key for the processed-marker feature
    s3_bucket = data.get('s3_bucket')  # S3 bucket for the processed-marker feature

    logger.info(f"[{request_id}] Processing email for domain: {domain}")
    if s3_key:
        logger.info(f"[{request_id}] S3 location: {s3_bucket}/{s3_key}")

    try:
        # Decompress if the content is compressed
@@ -62,7 +309,7 @@ def process_email(domain):
        if from_headers:
            from_addr = getaddresses(from_headers)[0][1]
        else:
            from_addr = f'lambda@{domain}'  # fallback that includes the domain

        # Extract recipients from all relevant headers (consistent with the Lambda)
        to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
@@ -94,11 +341,32 @@ def process_email(domain):
        logger.info(f"[{request_id}] Email forwarded to Postfix for {domain} - {len(recipients)} recipients")

        # Mark the email as processed if S3 information was provided
        marked_as_processed = False
        if s3_key and s3_bucket:
            if mark_email_as_processed(s3_bucket, s3_key):
                logger.info(f"[{request_id}] Email {s3_key} marked as processed")
                marked_as_processed = True
            else:
                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
        elif s3_key:
            # Fallback: derive the bucket name from the domain
            bucket_name = get_bucket_name_for_domain(domain)
            if mark_email_as_processed(bucket_name, s3_key):
                logger.info(f"[{request_id}] Email {s3_key} marked as processed (derived bucket: {bucket_name})")
                marked_as_processed = True
            else:
                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")

        return jsonify({
            'message': 'Email processed',
            'request_id': request_id,
            'domain': domain,
            'recipients_count': len(recipients),
            'recipients': recipients,
            'marked_as_processed': marked_as_processed,
            's3_bucket': s3_bucket,
            's3_key': s3_key
        }), 200
    except Exception as e:
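
For local testing of /process/<domain> without the Lambda, a minimal sketch that builds the payload shape the route expects; the key, request ID, and message content are hypothetical:

# Sketch only: all values are illustrative.
import base64
import gzip
import json

raw_email = b"From: test@andreasknuth.de\r\nTo: someone@andreasknuth.de\r\nSubject: ping\r\n\r\nhello\r\n"
payload = {
    'request_id': 'local-test-1',
    'email_content': base64.b64encode(gzip.compress(raw_email)).decode('utf-8'),
    'compressed': True,
    's3_bucket': 'andreasknuth-de-emails',  # optional: enables the processed marker
    's3_key': 'incoming/example-key',       # hypothetical key
}
print(json.dumps(payload))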

View File

@@ -0,0 +1,214 @@
# Optimized Lambda with improvements
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import ssl
import boto3
import base64
import hashlib
import time
import gzip

# Configuration
API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10 MB default

logger = logging.getLogger()
logger.setLevel(logging.INFO)
def bucket_name_to_domain(bucket_name):
    """Converts an S3 bucket name to a domain name."""
    # Example: andreasknuth-de-emails -> andreasknuth.de
    if bucket_name.endswith('-emails'):
        # Strip the "-emails" suffix (the last 7 characters)
        domain_part = bucket_name[:-7]
        # Replace all remaining "-" with "."
        # Caveat: this is ambiguous for domains that themselves contain hyphens,
        # which is what the map-based variant in the next file handles explicitly.
        domain = domain_part.replace('-', '.')
        return domain
    else:
        # Fallback: the bucket name does not match the expected naming scheme
        logger.warning(f"Bucket name {bucket_name} does not match the expected naming scheme")
        return None
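
To make the caveat above concrete, the round trip for a hyphenated domain (values taken from the map in the fourth file of this commit):

# Sketch only: demonstrates the ambiguity noted above.
print(bucket_name_to_domain('andreasknuth-de-emails'))           # andreasknuth.de (correct)
print(bucket_name_to_domain('wetter-playadelingles-de-emails'))  # wetter.playadelingles.de (wrong: should be wetter-playadelingles.de)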
def lambda_handler(event, context):
    """Optimized Lambda handler with improved error handling."""
    # Unique request ID for tracking
    request_id = context.aws_request_id
    logger.info(f"[{request_id}] S3 event received")
    try:
        # Extract the S3 event details
        s3_event = event['Records'][0]['s3']
        bucket_name = s3_event['bucket']['name']
        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")

        # Derive the domain automatically from the bucket name
        domain = bucket_name_to_domain(bucket_name)
        if not domain:
            logger.error(f"[{request_id}] Could not derive domain from bucket name {bucket_name}")
            return {'statusCode': 400, 'body': json.dumps('Invalid bucket name')}
        logger.info(f"[{request_id}] Derived domain: {domain}")

        # Fetch the ETag (forwarded in the payload for duplicate detection) and the size
        s3_client = boto3.client('s3')
        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
        etag = head_response['ETag'].strip('"')
        content_length = head_response['ContentLength']

        # Check the email size
        if content_length > MAX_EMAIL_SIZE:
            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
            # Mark large emails instead of deleting them
            mark_large_email(s3_client, bucket_name, object_key, content_length)
            return {'statusCode': 413, 'body': json.dumps('Email too large')}

        # Load the email
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
            email_content = response['Body'].read()
            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
        except Exception as e:
            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}

        # Compress the email content for transfer
        compressed_content = gzip.compress(email_content)
        email_base64 = base64.b64encode(compressed_content).decode('utf-8')

        # Build the payload
        payload = {
            's3_bucket': bucket_name,  # S3 bucket for the processed-marker feature
            's3_key': object_key,      # S3 key for the processed-marker feature
            'domain': domain,
            'email_content': email_base64,
            'compressed': True,  # flag for the API
            'etag': etag,
            'request_id': request_id,
            'original_size': len(email_content),
            'compressed_size': len(compressed_content)
        }

        # Call the API with retry logic
        success = call_api_with_retry(payload, domain, request_id)
        if success:
            # On success: delete the email from S3
            try:
                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
                logger.info(f"[{request_id}] Email processed and deleted successfully")
                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'message': 'Email processed successfully',
                        'request_id': request_id,
                        'domain': domain
                    })
                }
            except Exception as e:
                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
                # Still report success, since the email was processed
                return {'statusCode': 200, 'body': json.dumps('Processed, but not deleted')}
        else:
            # On failure: leave the email in S3 for a later retry
            logger.error(f"[{request_id}] Email stays in S3 for retry: {bucket_name}/{object_key}")
            return {'statusCode': 500, 'body': json.dumps('API error')}
    except Exception as e:
        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
def call_api_with_retry(payload, domain, request_id, max_retries=3):
    """API call with retry logic."""
    sync_url = f"{API_BASE_URL}/process/{domain}"
    payload_json = json.dumps(payload).encode('utf-8')
    for attempt in range(max_retries):
        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
        try:
            # Build the request
            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
            req.add_header('Authorization', f'Bearer {API_TOKEN}')
            req.add_header('Content-Type', 'application/json')
            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
            req.add_header('X-Request-ID', request_id)

            # SSL context (certificate verification is skipped only in debug mode)
            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None

            # API request with timeout
            timeout = 25  # Lambda has a 30 s timeout, so stay slightly below it
            if DEBUG_MODE and ssl_context:
                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
                    response_body = response.read().decode('utf-8')
                    response_code = response.getcode()
            else:
                with urllib.request.urlopen(req, timeout=timeout) as response:
                    response_body = response.read().decode('utf-8')
                    response_code = response.getcode()

            # Successful response
            if response_code == 200:
                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
                if DEBUG_MODE:
                    logger.info(f"[{request_id}] API response: {response_body}")
                return True
            else:
                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
        except urllib.error.HTTPError as e:
            error_body = ""
            try:
                error_body = e.read().decode('utf-8')
            except Exception:
                pass
            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
            logger.error(f"[{request_id}] Error details: {error_body}")
            # Do not retry on 4xx errors (client errors)
            if 400 <= e.code < 500:
                logger.error(f"[{request_id}] Client error - no retry")
                return False
        except Exception as e:
            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
        # Wait before the next attempt (exponential backoff)
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, then 2s (doubles each attempt)
            logger.info(f"[{request_id}] Waiting {wait_time}s before the next attempt")
            time.sleep(wait_time)
    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
    return False
def mark_large_email(s3_client, bucket, key, size):
    """Marks large emails with metadata instead of deleting them."""
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'status': 'too_large',
                'size': str(size),
                'marked_at': str(int(time.time()))
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Email marked as too large: {key} ({size} bytes)")
    except Exception as e:
        logger.error(f"Could not mark the email: {str(e)}")

View File

@@ -0,0 +1,202 @@
# Optimized Lambda with improvements
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import ssl
import boto3
import base64
import hashlib
import time  # needed by mark_large_email and the retry backoff
import gzip

# Configuration
API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10 MB default

# Bucket-to-domain mapping
# Unlike the convention-based variant in the previous file, an explicit map also
# handles domains that themselves contain hyphens (e.g. wetter-playadelingles.de).
BUCKET_DOMAIN_MAP = {
    'bizmatch-ses-mails': 'bizmatch.net',
    'haiky-app-emails-339712845857': 'haiky.app',
    'andreasknuth-de-emails': 'andreasknuth.de',
    'wetter-playadelingles-de-emails': 'wetter-playadelingles.de',
}

logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Optimized Lambda handler with improved error handling."""
    # Unique request ID for tracking
    request_id = context.aws_request_id
    logger.info(f"[{request_id}] S3 event received")
    try:
        # Extract the S3 event details
        s3_event = event['Records'][0]['s3']
        bucket_name = s3_event['bucket']['name']
        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")

        # Look up the domain
        domain = BUCKET_DOMAIN_MAP.get(bucket_name)
        if not domain:
            logger.warning(f"[{request_id}] No domain mapping for bucket {bucket_name}")
            return {'statusCode': 400, 'body': json.dumps('Unknown bucket')}

        # Fetch the ETag (forwarded in the payload for duplicate detection) and the size
        s3_client = boto3.client('s3')
        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
        etag = head_response['ETag'].strip('"')
        content_length = head_response['ContentLength']

        # Check the email size
        if content_length > MAX_EMAIL_SIZE:
            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
            # Mark large emails instead of deleting them
            mark_large_email(s3_client, bucket_name, object_key, content_length)
            return {'statusCode': 413, 'body': json.dumps('Email too large')}

        # Load the email
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
            email_content = response['Body'].read()
            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
        except Exception as e:
            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}

        # Compress the email content for transfer
        compressed_content = gzip.compress(email_content)
        email_base64 = base64.b64encode(compressed_content).decode('utf-8')

        # Build the payload
        payload = {
            'bucket': bucket_name,
            'key': object_key,
            'domain': domain,
            'email_content': email_base64,
            'compressed': True,  # flag for the API
            'etag': etag,
            'request_id': request_id,
            'original_size': len(email_content),
            'compressed_size': len(compressed_content)
        }

        # Call the API with retry logic
        success = call_api_with_retry(payload, domain, request_id)
        if success:
            # On success: delete the email from S3
            try:
                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
                logger.info(f"[{request_id}] Email processed and deleted successfully")
                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'message': 'Email processed successfully',
                        'request_id': request_id
                    })
                }
            except Exception as e:
                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
                # Still report success, since the email was processed
                return {'statusCode': 200, 'body': json.dumps('Processed, but not deleted')}
        else:
            # On failure: leave the email in S3 for a later retry
            return {'statusCode': 500, 'body': json.dumps('API error')}
    except Exception as e:
        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
def call_api_with_retry(payload, domain, request_id, max_retries=3):
    """API call with retry logic."""
    sync_url = f"{API_BASE_URL}/process/{domain}"
    payload_json = json.dumps(payload).encode('utf-8')
    for attempt in range(max_retries):
        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
        try:
            # Build the request
            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
            req.add_header('Authorization', f'Bearer {API_TOKEN}')
            req.add_header('Content-Type', 'application/json')
            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
            req.add_header('X-Request-ID', request_id)

            # SSL context (certificate verification is skipped only in debug mode)
            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None

            # API request with timeout
            timeout = 25  # Lambda has a 30 s timeout, so stay slightly below it
            if DEBUG_MODE and ssl_context:
                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
                    response_body = response.read().decode('utf-8')
                    response_code = response.getcode()
            else:
                with urllib.request.urlopen(req, timeout=timeout) as response:
                    response_body = response.read().decode('utf-8')
                    response_code = response.getcode()

            # Successful response
            if response_code == 200:
                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
                return True
            else:
                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
        except urllib.error.HTTPError as e:
            error_body = ""
            try:
                error_body = e.read().decode('utf-8')
            except Exception:
                pass
            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
            logger.error(f"[{request_id}] Error details: {error_body}")
            # Do not retry on 4xx errors (client errors)
            if 400 <= e.code < 500:
                logger.error(f"[{request_id}] Client error - no retry")
                return False
        except Exception as e:
            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
        # Wait before the next attempt (exponential backoff)
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, then 2s (doubles each attempt)
            logger.info(f"[{request_id}] Waiting {wait_time}s before the next attempt")
            time.sleep(wait_time)
    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
    return False
def mark_large_email(s3_client, bucket, key, size):
    """Marks large emails with metadata instead of deleting them."""
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'status': 'too_large',
                'size': str(size),
                'marked_at': str(int(time.time()))
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Email marked as too large: {key} ({size} bytes)")
    except Exception as e:
        logger.error(f"Could not mark the email: {str(e)}")