updated Lambda & REST API

Andreas Knuth 2025-06-13 11:00:52 -05:00
parent 0d0391b6ee
commit fc6fa76bc0
2 changed files with 152 additions and 489 deletions

File 1/2: REST API (Flask)

@@ -1,5 +1,5 @@
 import sys
-from flask import Flask, request, jsonify, g
+from flask import Flask, request, jsonify
 import smtplib
 import base64
 import gzip
@@ -13,56 +13,31 @@ from email.parser import BytesParser
 from email.policy import default
 from email.utils import getaddresses
-# Check the Python version
 if sys.version_info < (3, 12):
     raise RuntimeError("Python 3.12 or higher required")
-# Load the .env file
 load_dotenv()
 app = Flask(__name__)
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-# Configuration
-SMTP_HOST = "localhost"  # host network mode
-SMTP_PORT = 25  # fixed to port 25, no TLS
+SMTP_HOST = "localhost"
+SMTP_PORT = 25
 API_TOKEN = os.environ.get('API_TOKEN')
-MAIL_DIR = "/var/mail"  # default maildir path
 AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
-if not API_TOKEN:
-    raise ValueError("API_TOKEN environment variable not set")
-# Initialize the S3 client
 s3_client = boto3.client('s3', region_name=AWS_REGION)
-# Cache for processed requests
 processed_requests = {}
+logger.info(f"API_TOKEN loaded: {API_TOKEN}")
 def load_domains_config():
-    """Loads the domain configuration"""
-    # A real configuration file could be loaded here;
-    # hardcoded as an example for now
-    return {
-        "andreasknuth.de": {"bucket": "andreasknuth-de-emails"},
-        # more domains can be added here
-    }
+    return {"andreasknuth.de": {"bucket": "andreasknuth-de-emails"}}
-def get_bucket_name_for_domain(domain):
-    """Converts a domain name to an S3 bucket name"""
-    return domain.replace(".", "-") + "-emails"
-def mark_email_as_processed(bucket_name, key):
-    """Marks an email as successfully processed"""
+def mark_email_as_processed(bucket, key):
     try:
-        # Add metadata marking the email as processed
         s3_client.copy_object(
-            Bucket=bucket_name,
+            Bucket=bucket,
             Key=key,
-            CopySource={'Bucket': bucket_name, 'Key': key},
+            CopySource={'Bucket': bucket, 'Key': key},
             Metadata={
                 'processed': 'true',
                 'processed_timestamp': str(int(time.time())),
@@ -70,302 +45,82 @@ def mark_email_as_processed(bucket_name, key):
             },
             MetadataDirective='REPLACE'
         )
-        logger.info(f"Email {key} marked as processed")
         return True
     except Exception as e:
-        logger.error(f"Error marking email {key}: {str(e)}")
+        logger.error(f"Marking error: {e}")
         return False
-def get_unprocessed_emails(bucket_name):
-    """Fetches all unprocessed emails from an S3 bucket"""
-    try:
-        unprocessed_emails = []
-        paginator = s3_client.get_paginator('list_objects_v2')
-        for page in paginator.paginate(Bucket=bucket_name):
-            if 'Contents' not in page:
-                continue
-            for obj in page['Contents']:
-                key = obj['Key']
-                # Check the email's metadata
-                try:
-                    head_response = s3_client.head_object(Bucket=bucket_name, Key=key)
-                    metadata = head_response.get('Metadata', {})
-                    # If not marked as processed, add it to the list
-                    if metadata.get('processed') != 'true':
-                        unprocessed_emails.append({
-                            'key': key,
-                            'size': obj['Size'],
-                            'last_modified': obj['LastModified'].isoformat(),
-                            'metadata': metadata
-                        })
-                except Exception as e:
-                    logger.warning(f"Error checking metadata for {key}: {str(e)}")
-                    # If the metadata cannot be read, treat the email as unprocessed
-                    unprocessed_emails.append({
-                        'key': key,
-                        'size': obj['Size'],
-                        'last_modified': obj['LastModified'].isoformat(),
-                        'metadata': {}
-                    })
-        return unprocessed_emails
-    except Exception as e:
-        logger.error(f"Error fetching unprocessed emails: {str(e)}")
-        return []
-@app.route('/health', methods=['GET'])
-def health_check():
-    """Extended health check"""
-    # Basic health check
-    health_status = {
-        "status": "OK",
-        "message": "S3 email processor API is up",
-        "timestamp": int(time.time()),
-        "version": "2.0",
-        "request_id": getattr(g, 'request_id', 'health-check')
-    }
-    # Extended checks
-    try:
-        # Check maildir access
-        mail_path = Path(MAIL_DIR)
-        if mail_path.exists() and mail_path.is_dir():
-            health_status["mail_dir"] = "accessible"
-        else:
-            health_status["mail_dir"] = "not_accessible"
-            health_status["status"] = "WARNING"
-        # Check the domain configuration
-        domains = load_domains_config()
-        health_status["configured_domains"] = len(domains)
-        health_status["domains"] = list(domains.keys())
-        # Cache status
-        health_status["request_cache_size"] = len(processed_requests)
-        # Test the SMTP connection
-        try:
-            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as smtp:
-                smtp.noop()
-            health_status["smtp_connection"] = "OK"
-        except Exception as e:
-            health_status["smtp_connection"] = f"ERROR: {str(e)}"
-            health_status["status"] = "WARNING"
-        # Test the S3 connection
-        try:
-            s3_client.list_buckets()
-            health_status["s3_connection"] = "OK"
-        except Exception as e:
-            health_status["s3_connection"] = f"ERROR: {str(e)}"
-            health_status["status"] = "WARNING"
-    except Exception as e:
-        health_status["status"] = "ERROR"
-        health_status["error"] = str(e)
-    return jsonify(health_status)
-@app.route('/retry/<domain>', methods=['GET'])
-def retry_domain_emails(domain):
-    """Processes all unprocessed emails for a domain"""
-    auth_header = request.headers.get('Authorization')
-    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
-        return jsonify({'error': 'Unauthorized'}), 401
-    request_id = f"retry-{domain}-{int(time.time())}"
-    bucket_name = get_bucket_name_for_domain(domain)
-    logger.info(f"[{request_id}] Retry processing for domain: {domain}, bucket: {bucket_name}")
-    try:
-        # Fetch all unprocessed emails
-        unprocessed_emails = get_unprocessed_emails(bucket_name)
-        if not unprocessed_emails:
-            return jsonify({
-                'message': f'No unprocessed emails found for domain {domain}',
-                'domain': domain,
-                'bucket': bucket_name,
-                'processed_count': 0,
-                'request_id': request_id
-            }), 200
-        processed_count = 0
-        failed_count = 0
-        errors = []
-        for email_info in unprocessed_emails:
-            key = email_info['key']
-            try:
-                logger.info(f"[{request_id}] Processing email: {key}")
-                # Load the email from S3
-                response = s3_client.get_object(Bucket=bucket_name, Key=key)
-                email_content = response['Body'].read()
-                # Parse the email
-                email_msg = BytesParser(policy=default).parsebytes(email_content)
-                # Extract the From address
-                from_headers = email_msg.get_all('from', [])
-                if from_headers:
-                    from_addr = getaddresses(from_headers)[0][1]
-                else:
-                    from_addr = f'retry@{domain}'
-                # Extract the recipients
-                to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
-                cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
-                bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
-                recipients = to_addrs + cc_addrs + bcc_addrs
-                if not recipients:
-                    raise ValueError(f"No recipients found in email {key}")
-                # Forward via SMTP using sendmail()
-                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
-                    smtp.sendmail(from_addr, recipients, email_content)
-                # Mark as processed
-                mark_email_as_processed(bucket_name, key)
-                processed_count += 1
-                logger.info(f"[{request_id}] Email {key} processed successfully")
-            except Exception as e:
-                failed_count += 1
-                error_msg = f"Error with email {key}: {str(e)}"
-                errors.append(error_msg)
-                logger.error(f"[{request_id}] {error_msg}")
-        result = {
-            'message': f'Retry processing for domain {domain} finished',
-            'domain': domain,
-            'bucket': bucket_name,
-            'total_found': len(unprocessed_emails),
-            'processed_count': processed_count,
-            'failed_count': failed_count,
-            'request_id': request_id
-        }
-        if errors:
-            result['errors'] = errors
-        return jsonify(result), 200
-    except Exception as e:
-        logger.error(f"[{request_id}] Error during retry processing: {str(e)}")
-        return jsonify({
-            'error': str(e),
-            'domain': domain,
-            'bucket': bucket_name,
-            'request_id': request_id
-        }), 500
 @app.route('/process/<domain>', methods=['POST'])
 def process_email(domain):
-    """Processes a single email from the Lambda function"""
-    auth_header = request.headers.get('Authorization')
-    if not auth_header or auth_header != f'Bearer {API_TOKEN}':
-        return jsonify({'error': 'Unauthorized'}), 401
-    data = request.get_json()
-    if not data:
-        return jsonify({'error': 'Invalid JSON payload'}), 400
-    request_id = data.get('request_id')
-    email_content = data.get('email_content')
-    compressed = data.get('compressed', False)
-    s3_key = data.get('s3_key')  # S3 key for the marker feature
-    s3_bucket = data.get('s3_bucket')  # S3 bucket for the marker feature
-    logger.info(f"[{request_id}] Processing email for domain: {domain}")
-    if s3_key:
-        logger.info(f"[{request_id}] S3 location: {s3_bucket}/{s3_key}")
-    try:
-        # Decompress if compressed
-        if compressed:
-            email_bytes = base64.b64decode(email_content)
-            email_content = gzip.decompress(email_bytes)
-        else:
-            email_content = base64.b64decode(email_content)
-        # Parse the email headers
-        email_msg = BytesParser(policy=default).parsebytes(email_content)
-        # Extract the From address (consistent with the Lambda)
-        from_headers = email_msg.get_all('from', [])
-        if from_headers:
-            from_addr = getaddresses(from_headers)[0][1]
-        else:
-            from_addr = f'lambda@{domain}'  # fallback using the domain
-        # Extract recipients from all relevant headers (consistent with the Lambda)
-        to_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('to', []))]
-        cc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('cc', []))]
-        bcc_addrs = [addr for _name, addr in getaddresses(email_msg.get_all('bcc', []))]
-        recipients = to_addrs + cc_addrs + bcc_addrs
-        # Debug output for the recipients
-        logger.info(f"[{request_id}] To addresses: {to_addrs}")
-        logger.info(f"[{request_id}] CC addresses: {cc_addrs}")
-        logger.info(f"[{request_id}] BCC addresses: {bcc_addrs}")
-        logger.info(f"[{request_id}] All recipients: {recipients}")
-        if not recipients:
-            # Extra debug info
-            logger.error(f"[{request_id}] Available headers: {list(email_msg.keys())}")
-            logger.error(f"[{request_id}] Raw To header: {email_msg.get('To')}")
-            logger.error(f"[{request_id}] Raw CC header: {email_msg.get('Cc')}")
-            raise ValueError("No recipients (To/CC/BCC) found in the email")
-        logger.info(f"[{request_id}] From: {from_addr}, recipients: {recipients}")
-        # Forward to Postfix using sendmail() instead of manual SMTP commands
-        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
-            smtp.sendmail(from_addr, recipients, email_content)
-        logger.info(f"[{request_id}] Email forwarded to Postfix for {domain} - {len(recipients)} recipients")
-        # Mark the email as processed if S3 information is present
-        marked_as_processed = False
-        if s3_key and s3_bucket:
-            if mark_email_as_processed(s3_bucket, s3_key):
-                logger.info(f"[{request_id}] Email {s3_key} marked as processed")
-                marked_as_processed = True
-            else:
-                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
-        elif s3_key:
-            # Fallback: derive the bucket name from the domain
-            bucket_name = get_bucket_name_for_domain(domain)
-            if mark_email_as_processed(bucket_name, s3_key):
-                logger.info(f"[{request_id}] Email {s3_key} marked as processed (derived bucket: {bucket_name})")
-                marked_as_processed = True
-            else:
-                logger.warning(f"[{request_id}] Could not mark email {s3_key} as processed")
-        return jsonify({
-            'message': 'Email processed',
-            'request_id': request_id,
-            'domain': domain,
-            'recipients_count': len(recipients),
-            'recipients': recipients,
-            'marked_as_processed': marked_as_processed,
-            's3_bucket': s3_bucket,
-            's3_key': s3_key
-        }), 200
-    except Exception as e:
-        logger.error(f"[{request_id}] Error processing email: {str(e)}")
-        return jsonify({'error': str(e), 'request_id': request_id}), 500
+    auth = request.headers.get('Authorization')
+    if auth != f'Bearer {API_TOKEN}':
+        return jsonify({'error': 'Unauthorized'}), 401
+    data = request.get_json()
+    if not data:
+        return jsonify({'error': 'Invalid payload'}), 400
+    content = data.get('email_content')
+    compressed = data.get('compressed', False)
+    raw = base64.b64decode(content)
+    email_bytes = gzip.decompress(raw) if compressed else raw
+    msg = BytesParser(policy=default).parsebytes(email_bytes)
+    from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else f'lambda@{domain}'
+    recipients = []
+    for hdr in ('to', 'cc', 'bcc'):
+        recipients += [addr for _n, addr in getaddresses(msg.get_all(hdr, []))]
+    if not recipients:
+        return jsonify({'error': 'No recipients'}), 400
+    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
+        smtp.sendmail(from_addr, recipients, email_bytes)
+    # No marking here anymore; the Lambda takes care of it
+    return jsonify({'message': 'Email forwarded', 'recipients': recipients}), 200
+
+@app.route('/retry/<domain>', methods=['GET'])
+def retry_domain_emails(domain):
+    auth = request.headers.get('Authorization')
+    if auth != f'Bearer {API_TOKEN}':
+        return jsonify({'error': 'Unauthorized'}), 401
+    bucket = domain.replace('.', '-') + '-emails'
+    unprocessed = []
+    paginator = s3_client.get_paginator('list_objects_v2')
+    for page in paginator.paginate(Bucket=bucket):
+        for obj in page.get('Contents', []):
+            head = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
+            if head.get('Metadata', {}).get('processed') != 'true':
+                unprocessed.append(obj['Key'])
+    results = {'processed': [], 'failed': []}
+    for key in unprocessed:
+        try:
+            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+            msg = BytesParser(policy=default).parsebytes(body)
+            from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else f'retry@{domain}'
+            to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
+            cc_addrs = [addr for _n, addr in getaddresses(msg.get_all('cc', []))]
+            bcc_addrs = [addr for _n, addr in getaddresses(msg.get_all('bcc', []))]
+            recipients = to_addrs + cc_addrs + bcc_addrs
+            with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
+                smtp.sendmail(from_addr, recipients, body)
+            if mark_email_as_processed(bucket, key):
+                results['processed'].append(key)
+            else:
+                results['failed'].append(key)
+        except Exception as e:
+            results['failed'].append(f"{key}: {e}")
+    return jsonify(results), 200
+
+@app.route('/health', methods=['GET'])
+def health_check():
+    return jsonify({'status': 'OK'}), 200
 if __name__ == '__main__':
     app.run(host='0.0.0.0', port=5000)
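
For manual testing, the reworked routes can be exercised directly. A minimal sketch, assuming the API runs on localhost:5000 and API_TOKEN is exported; the sample message and the requests dependency are illustrative choices, not part of the commit:

import base64
import gzip
import os

import requests  # any HTTP client works; requests is just an example

API = "http://localhost:5000"  # assumed local test instance
HEADERS = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}

# Encode a message the same way the Lambda does: gzip, then base64.
raw = b"From: a@andreasknuth.de\r\nTo: b@andreasknuth.de\r\nSubject: Test\r\n\r\nHello"
payload = {
    "email_content": base64.b64encode(gzip.compress(raw)).decode(),
    "compressed": True,
}

r = requests.post(f"{API}/process/andreasknuth.de", json=payload, headers=HEADERS)
print(r.status_code, r.json())  # expect 200 and the extracted recipient list

r = requests.get(f"{API}/retry/andreasknuth.de", headers=HEADERS)
print(r.json())  # {'processed': [...], 'failed': [...]}

Note that /process no longer touches the S3 object: after a successful forward, the Lambda records the processed marker itself.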

File 2/2: Lambda function

@@ -1,4 +1,3 @@
-# Optimized Lambda with improvements
 import json
 import os
 import urllib.request
@@ -8,9 +7,8 @@ import logging
 import ssl
 import boto3
 import base64
-import hashlib
-import time
 import gzip
+import time
 # Configuration
 API_BASE_URL = os.environ['API_BASE_URL']
@@ -21,194 +19,104 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10MB default
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
+s3_client = boto3.client('s3')
 def bucket_name_to_domain(bucket_name):
-    """Converts an S3 bucket name to a domain name"""
-    # Example: andreasknuth-de-emails -> andreasknuth.de
     if bucket_name.endswith('-emails'):
-        # Strip the "-emails" suffix
-        domain_part = bucket_name[:-7]  # remove the last 7 characters ("-emails")
-        # Replace the remaining "-" with "."
-        domain = domain_part.replace('-', '.')
-        return domain
-    else:
-        # Fallback: the bucket name does not match the expected scheme
-        logger.warning(f"Bucket name {bucket_name} does not match the expected scheme")
-        return None
+        domain_part = bucket_name[:-7]
+        return domain_part.replace('-', '.')
+    logger.warning(f"Bucket name {bucket_name} does not match the expected scheme")
+    return None
-def lambda_handler(event, context):
-    """Optimized Lambda handler with improved error handling"""
-    # Unique request ID for tracking
-    request_id = context.aws_request_id
-    logger.info(f"[{request_id}] S3 event received")
-    try:
-        # Extract the S3 event details
-        s3_event = event['Records'][0]['s3']
-        bucket_name = s3_event['bucket']['name']
-        object_key = urllib.parse.unquote_plus(s3_event['object']['key'])
-        logger.info(f"[{request_id}] Processing: {bucket_name}/{object_key}")
-        # Derive the domain from the bucket name
-        domain = bucket_name_to_domain(bucket_name)
-        if not domain:
-            logger.error(f"[{request_id}] Could not derive domain from bucket name {bucket_name}")
-            return {'statusCode': 400, 'body': json.dumps('Invalid bucket name')}
-        logger.info(f"[{request_id}] Derived domain: {domain}")
-        # Duplicate check via the S3 ETag
-        s3_client = boto3.client('s3')
-        head_response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
-        etag = head_response['ETag'].strip('"')
-        content_length = head_response['ContentLength']
-        # Check the email size
-        if content_length > MAX_EMAIL_SIZE:
-            logger.warning(f"[{request_id}] Email too large: {content_length} bytes (max: {MAX_EMAIL_SIZE})")
-            # Mark large emails instead of deleting them
-            mark_large_email(s3_client, bucket_name, object_key, content_length)
-            return {'statusCode': 413, 'body': json.dumps('Email too large')}
-        # Load the email
-        try:
-            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
-            email_content = response['Body'].read()
-            logger.info(f"[{request_id}] Email loaded: {len(email_content)} bytes")
-        except Exception as e:
-            logger.error(f"[{request_id}] Error loading the email: {str(e)}")
-            return {'statusCode': 500, 'body': json.dumps(f'S3 error: {str(e)}')}
-        # Compress the email content for transfer
-        compressed_content = gzip.compress(email_content)
-        email_base64 = base64.b64encode(compressed_content).decode('utf-8')
-        # Build the payload
-        payload = {
-            's3_bucket': bucket_name,  # S3 bucket for the marker feature
-            's3_key': object_key,  # S3 key for the marker feature
-            'domain': domain,
-            'email_content': email_base64,
-            'compressed': True,  # flag for the API
-            'etag': etag,
-            'request_id': request_id,
-            'original_size': len(email_content),
-            'compressed_size': len(compressed_content)
-        }
-        # Call the API with retry logic
-        success = call_api_with_retry(payload, domain, request_id)
-        if success:
-            # On success: delete the email from S3
-            try:
-                s3_client.delete_object(Bucket=bucket_name, Key=object_key)
-                logger.info(f"[{request_id}] Email processed successfully and deleted")
-                return {
-                    'statusCode': 200,
-                    'body': json.dumps({
-                        'message': 'Email processed successfully',
-                        'request_id': request_id,
-                        'domain': domain
-                    })
-                }
-            except Exception as e:
-                logger.error(f"[{request_id}] Could not delete the email: {str(e)}")
-                # Still report success, since the email was processed
-                return {'statusCode': 200, 'body': json.dumps('Processed, but not deleted')}
-        else:
-            # On failure: leave the email in S3 for a later retry
-            logger.error(f"[{request_id}] Email stays in S3 for retry: {bucket_name}/{object_key}")
-            return {'statusCode': 500, 'body': json.dumps('API error')}
-    except Exception as e:
-        logger.error(f"[{request_id}] Unexpected error: {str(e)}", exc_info=True)
-        return {'statusCode': 500, 'body': json.dumps(f'Lambda error: {str(e)}')}
-def call_api_with_retry(payload, domain, request_id, max_retries=3):
-    """API call with retry logic"""
-    sync_url = f"{API_BASE_URL}/process/{domain}"
-    payload_json = json.dumps(payload).encode('utf-8')
-    for attempt in range(max_retries):
-        logger.info(f"[{request_id}] API call attempt {attempt + 1}/{max_retries}")
-        try:
-            # Build the request
-            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
-            req.add_header('Authorization', f'Bearer {API_TOKEN}')
-            req.add_header('Content-Type', 'application/json')
-            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
-            req.add_header('X-Request-ID', request_id)
-            # SSL context
-            ssl_context = ssl._create_unverified_context() if DEBUG_MODE else None
-            # API request with timeout
-            timeout = 25  # the Lambda has a 30s timeout, so slightly less
-            if DEBUG_MODE and ssl_context:
-                with urllib.request.urlopen(req, context=ssl_context, timeout=timeout) as response:
-                    response_body = response.read().decode('utf-8')
-                    response_code = response.getcode()
-            else:
-                with urllib.request.urlopen(req, timeout=timeout) as response:
-                    response_body = response.read().decode('utf-8')
-                    response_code = response.getcode()
-            # Successful response
-            if response_code == 200:
-                logger.info(f"[{request_id}] API success after attempt {attempt + 1}")
-                if DEBUG_MODE:
-                    logger.info(f"[{request_id}] API response: {response_body}")
-                return True
-            else:
-                logger.warning(f"[{request_id}] API response {response_code}: {response_body}")
-        except urllib.error.HTTPError as e:
-            error_body = ""
-            try:
-                error_body = e.read().decode('utf-8')
-            except:
-                pass
-            logger.error(f"[{request_id}] HTTP error {e.code} (attempt {attempt + 1}): {e.reason}")
-            logger.error(f"[{request_id}] Error details: {error_body}")
-            # Do not retry on 4xx errors (client errors)
-            if 400 <= e.code < 500:
-                logger.error(f"[{request_id}] Client error - no retry")
-                return False
-        except Exception as e:
-            logger.error(f"[{request_id}] Network error (attempt {attempt + 1}): {str(e)}")
-        # Wait before the next attempt (exponential backoff)
-        if attempt < max_retries - 1:
-            wait_time = 2 ** attempt  # 1s, 2s, 4s
-            logger.info(f"[{request_id}] Waiting {wait_time}s before the next attempt")
-            time.sleep(wait_time)
-    logger.error(f"[{request_id}] API call failed after {max_retries} attempts")
-    return False
-def mark_large_email(s3_client, bucket, key, size):
-    """Marks large emails with metadata instead of deleting them"""
-    try:
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={
-                'status': 'too_large',
-                'size': str(size),
-                'marked_at': str(int(time.time()))
-            },
-            MetadataDirective='REPLACE'
-        )
-        logger.info(f"Email marked as too large: {key} ({size} bytes)")
-        return True
-    except Exception as e:
-        logger.error(f"Could not mark the email: {str(e)}")
-        return False
+def mark_email_processed(bucket, key, processor='lambda'):
+    timestamp = str(int(time.time()))
+    try:
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={
+                'processed': 'true',
+                'processed_timestamp': timestamp,
+                'processor': processor
+            },
+            MetadataDirective='REPLACE'
+        )
+        logger.info(f"Email {key} marked as processed by {processor}")
+    except Exception as e:
+        logger.error(f"Marking error for {key}: {e}")
+def call_api_with_retry(payload, domain, request_id, max_retries=3):
+    sync_url = f"{API_BASE_URL}/process/{domain}"
+    payload_json = json.dumps(payload).encode('utf-8')
+    for attempt in range(max_retries):
+        try:
+            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
+            req.add_header('Authorization', f'Bearer {API_TOKEN}')
+            req.add_header('Content-Type', 'application/json')
+            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
+            req.add_header('X-Request-ID', request_id)
+            timeout = 25
+            context = ssl._create_unverified_context() if DEBUG_MODE else None
+            with urllib.request.urlopen(req, timeout=timeout, context=context) as response:
+                code = response.getcode()
+                if code == 200:
+                    return True
+        except urllib.error.HTTPError as e:
+            if 400 <= e.code < 500:
+                return False
+        except Exception:
+            pass
+        time.sleep(2 ** attempt)
+    return False
+def lambda_handler(event, context):
+    request_id = context.aws_request_id
+    logger.info(f"[{request_id}] S3 event received")
+    s3_event = event['Records'][0]['s3']
+    bucket = s3_event['bucket']['name']
+    key = urllib.parse.unquote_plus(s3_event['object']['key'])
+    logger.info(f"[{request_id}] Processing: {bucket}/{key}")
+    domain = bucket_name_to_domain(bucket)
+    if not domain:
+        return {'statusCode': 400, 'body': 'Invalid bucket name'}
+    head = s3_client.head_object(Bucket=bucket, Key=key)
+    size = head['ContentLength']
+    if size > MAX_EMAIL_SIZE:
+        # Mark as too large
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={'status': 'too_large', 'size': str(size), 'marked_at': str(int(time.time()))},
+            MetadataDirective='REPLACE'
+        )
+        return {'statusCode': 413, 'body': 'Email too large'}
+    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+    compressed = gzip.compress(body)
+    payload = {
+        's3_bucket': bucket,
+        's3_key': key,
+        'domain': domain,
+        'email_content': base64.b64encode(compressed).decode(),
+        'compressed': True,
+        'etag': head['ETag'].strip('"'),
+        'request_id': request_id,
+        'original_size': len(body),
+        'compressed_size': len(compressed)
+    }
+    if call_api_with_retry(payload, domain, request_id):
+        # Mark in place with metadata instead of deleting
+        mark_email_processed(bucket, key)
+        return {'statusCode': 200, 'body': json.dumps({'message': 'Email processed and marked', 'request_id': request_id})}
+    else:
+        return {'statusCode': 500, 'body': 'API error'}
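
To smoke-test the handler outside AWS, it can be invoked with a hand-built S3 event. A minimal sketch, assuming the module is importable as lambda_function (a placeholder name), API_BASE_URL and API_TOKEN are set, and the caller has S3 access; FakeContext stands in for the context object AWS normally supplies:

from lambda_function import lambda_handler  # placeholder module name

class FakeContext:
    # The handler only reads aws_request_id from the context.
    aws_request_id = "local-test-0001"

event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "andreasknuth-de-emails"},
            "object": {"key": "incoming/test-mail"},  # keys are URL-decoded via unquote_plus
        }
    }]
}

print(lambda_handler(event, FakeContext()))
# Success path: the object is forwarded to the REST API, then marked
# processed in place, and a 200 response is returned.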