From ce87a9e3a545a0b58e37e4a0d1617ccb4bf21953 Mon Sep 17 00:00:00 2001
From: Andreas Knuth
Date: Fri, 13 Jun 2025 16:40:54 -0500
Subject: [PATCH] BugFixes

---
 email_api/email_api/app.py    |  10 ++-
 ses-lambda/lambda-function.py | 151 +++++++++++++---------------------
 2 files changed, 68 insertions(+), 93 deletions(-)

diff --git a/email_api/email_api/app.py b/email_api/email_api/app.py
index f65f46d..083c737 100644
--- a/email_api/email_api/app.py
+++ b/email_api/email_api/app.py
@@ -62,7 +62,15 @@ def process_email(domain):
 
     request_id = data.get('request_id', 'no-request-id')
 
-    logger.info(f"[{request_id}] INCOMING POST /process/{domain}: payload={data}")
+    payload_summary = {
+        k: (len(v) if k == 'email_content' else v)
+        for k, v in data.items()
+        if k != 'email_content' or isinstance(v, (str, bytes))
+    }
+    logger.info(
+        f"[{request_id}] INCOMING POST /process/{domain}: "
+        f"payload_summary={payload_summary}"
+    )
 
     content = data.get('email_content')
     compressed = data.get('compressed', False)
diff --git a/ses-lambda/lambda-function.py b/ses-lambda/lambda-function.py
index c685446..29e8255 100644
--- a/ses-lambda/lambda-function.py
+++ b/ses-lambda/lambda-function.py
@@ -1,128 +1,95 @@
+import time
+import gzip
 import json
 import os
 import urllib.request
 import urllib.error
 import urllib.parse
 import logging
-import ssl
 import boto3
 import base64
-import gzip
-import time
-
-# Konfiguration
-API_BASE_URL = os.environ['API_BASE_URL']
-API_TOKEN = os.environ['API_TOKEN']
-DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
-MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10MB Standard
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
+API_BASE_URL = os.environ['API_BASE_URL']
+API_TOKEN = os.environ['API_TOKEN']
+MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
+
 s3_client = boto3.client('s3')
 
-def bucket_name_to_domain(bucket_name):
-    if bucket_name.endswith('-emails'):
-        domain_part = bucket_name[:-7]
-        return domain_part.replace('-', '.')
-    logger.warning(f"Bucket-Name {bucket_name} entspricht nicht dem erwarteten Schema")
-    return None
+def mark_email_processed(bucket, key, s3_client, processor='lambda'):
+    """Set the processed flag on the S3 object via metadata."""
+    s3_client.copy_object(
+        Bucket=bucket,
+        Key=key,
+        CopySource={'Bucket': bucket, 'Key': key},
+        Metadata={
+            'processed': 'true',
+            'processed_timestamp': str(int(time.time())),
+            'processor': processor
+        },
+        MetadataDirective='REPLACE'
+    )
+    logger.info(f"Marked S3 object {bucket}/{key} as processed")
+
+def call_api_once(payload, domain, request_id):
+    """Single-shot POST, no retry."""
+    url = f"{API_BASE_URL}/process/{domain}"
+    data = json.dumps(payload).encode('utf-8')
+    req = urllib.request.Request(url, data=data, method='POST')
+    req.add_header('Authorization', f'Bearer {API_TOKEN}')
+    req.add_header('Content-Type', 'application/json')
+    req.add_header('X-Request-ID', request_id)
+    logger.info(f"[{request_id}] OUTGOING POST {url}: "
+                f"domain={domain}, key={payload['s3_key']}, bucket={payload['s3_bucket']}, "
+                f"orig_size={payload['original_size']}, comp_size={payload['compressed_size']}")
 
-def mark_email_processed(bucket, key, processor='lambda'):
-    timestamp = str(int(time.time()))
-    try:
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={
-                'processed': 'true',
-                'processed_timestamp': timestamp,
-                'processor': processor
-            },
-            MetadataDirective='REPLACE'
-        )
-        logger.info(f"E-Mail {key} als verarbeitet markiert durch {processor}")
-        return True
-    except Exception as e:
-        logger.error(f"Markierungsfehler für {key}: {e}")
-        return False
-
-
-def call_api_with_retry(payload, domain, request_id, max_retries=3):
-    sync_url = f"{API_BASE_URL}/process/{domain}"
-    payload_json = json.dumps(payload).encode('utf-8')
-    for attempt in range(max_retries):
-        try:
-            logger.info(f"[{request_id}] Attempt {attempt+1}: POST payload to {sync_url}")
-            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
-            req.add_header('Authorization', f'Bearer {API_TOKEN}')
-            req.add_header('Content-Type', 'application/json')
-            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
-            req.add_header('X-Request-ID', request_id)
-            timeout = 25
-            context = ssl._create_unverified_context() if DEBUG_MODE else None
-            with urllib.request.urlopen(req, timeout=timeout, context=context) as response:
-                code = response.getcode()
-                if code == 200:
-                    logger.info(f"[{request_id}] API response: {response_code}")
-                    return True
-        except urllib.error.HTTPError as e:
-            if 400 <= e.code < 500:
-                return False
-        except Exception:
-            pass
-        time.sleep(2 ** attempt)
-    return False
-
+    with urllib.request.urlopen(req, timeout=25) as resp:
+        code = resp.getcode()
+        if code == 200:
+            logger.info(f"[{request_id}] API returned 200 OK")
+            return True
+        else:
+            body = resp.read().decode('utf-8', errors='ignore')
+            logger.error(f"[{request_id}] API returned {code}: {body}")
+            return False
 
 def lambda_handler(event, context):
-    request_id = context.aws_request_id
-    logger.info(f"[{request_id}] S3-Ereignis empfangen")
-    s3_event = event['Records'][0]['s3']
-    bucket = s3_event['bucket']['name']
-    key = urllib.parse.unquote_plus(s3_event['object']['key'])
-    logger.info(f"[{request_id}] Verarbeite: {bucket}/{key}")
-
-    domain = bucket_name_to_domain(bucket)
-    if not domain:
-        return {'statusCode': 400, 'body': 'Ungültiger Bucket-Name'}
+    req_id = context.aws_request_id
+    rec = event['Records'][0]['s3']
+    bucket = rec['bucket']['name']
+    key = urllib.parse.unquote_plus(rec['object']['key'])
+    logger.info(f"[{req_id}] Processing {bucket}/{key}")
 
+    # HEAD check: reject oversized mails before downloading the body
     head = s3_client.head_object(Bucket=bucket, Key=key)
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
-        # mark large
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={'status': 'too_large', 'size': str(size), 'marked_at': str(int(time.time()))},
-            MetadataDirective='REPLACE'
-        )
-        return {'statusCode': 413, 'body': 'E-Mail zu groß'}
+        logger.warning(f"[{req_id}] Email too large: {size} bytes")
+        # optionally call mark_large_email(...) here
+        return {'statusCode': 413}
 
+    # Load the content and compress it
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
         's3_key': key,
-        'domain': domain,
+        'domain': bucket.replace('-', '.').rsplit('.emails', 1)[0],
         'email_content': base64.b64encode(compressed).decode(),
         'compressed': True,
         'etag': head['ETag'].strip('"'),
-        'request_id': request_id,
+        'request_id': req_id,
         'original_size': len(body),
         'compressed_size': len(compressed)
     }
-
-    logger.info(f"[{request_id}] OUTGOING payload to {API_BASE_URL}/process/{domain}: "
-                f"domain={domain}, key={key}, bucket={bucket}, "
-                f"orig_size={len(body)}, comp_size={len(compressed)}")
 
-    if call_api_with_retry(payload, domain, request_id):
-        # Inline metadata marking statt Löschung
-        mark_email_processed(bucket, key)
-        return {'statusCode': 200, 'body': json.dumps({'message': 'E-Mail verarbeitet und markiert', 'request_id': request_id})}
+    # Single API call
+    if call_api_once(payload, payload['domain'], req_id):
+        # Set metadata; the mail stays in S3
+        mark_email_processed(bucket, key, s3_client)
+        return {'statusCode': 200, 'body': 'Processed'}
     else:
-        return {'statusCode': 500, 'body': 'API-Fehler'}
\ No newline at end of file
+        logger.error(f"[{req_id}] API call failed, leaving object unmodified")
+        return {'statusCode': 500, 'body': 'API Error'}
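
---

Notes on the new code (illustrative sketches, not part of the applied diff):

The payload now ships email_content as base64-encoded gzip data with compressed=True. On the API side, app.py reads email_content and the compressed flag, so the decode step presumably looks like the following minimal sketch (the decode_email_content helper name is hypothetical):

import base64
import gzip

def decode_email_content(data):
    # data is the parsed JSON body of POST /process/<domain>
    raw = base64.b64decode(data['email_content'])
    if data.get('compressed', False):
        raw = gzip.decompress(raw)
    return raw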
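The inline 'domain' expression replaces the removed bucket_name_to_domain() helper. For bucket names that follow the <domain-with-dashes>-emails schema the two agree, which a quick standalone check confirms; note, however, that non-conforming bucket names no longer short-circuit to the old 400 response:

# Assumes the '<domain-with-dashes>-emails' bucket naming schema.
for bucket in ['example-com-emails', 'mail-example-co-uk-emails']:
    inline = bucket.replace('-', '.').rsplit('.emails', 1)[0]
    legacy = bucket[:-7].replace('-', '.')  # old bucket_name_to_domain()
    assert inline == legacy, (bucket, inline, legacy)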
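call_api_once no longer swallows exceptions the way call_api_with_retry did: urllib.request.urlopen raises HTTPError for 4xx/5xx responses, so the function's else branch only sees non-200 success codes, and an error status propagates out of lambda_handler as an unhandled exception rather than the 500 return. If the 500 path should also cover HTTP errors, one option is a caller-side guard (sketch only, using the module's existing names):

try:
    ok = call_api_once(payload, payload['domain'], req_id)
except urllib.error.HTTPError as e:
    # 4xx/5xx from the API surfaces here, not in call_api_once's else branch
    logger.error(f"[{req_id}] API returned {e.code}: "
                 f"{e.read().decode('utf-8', errors='ignore')}")
    ok = False
except urllib.error.URLError as e:
    # DNS failure, refused connection, timeout, etc.
    logger.error(f"[{req_id}] API unreachable: {e.reason}")
    ok = False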
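One operational caveat: mark_email_processed rewrites the object in place via copy_object with MetadataDirective='REPLACE'. Depending on the bucket's notification configuration, that copy can emit another s3:ObjectCreated event and re-invoke this function on the same key. If that applies here, an idempotency check at the top of lambda_handler would break the loop (sketch; it reuses the head_object response the handler already fetches):

head = s3_client.head_object(Bucket=bucket, Key=key)
if head.get('Metadata', {}).get('processed') == 'true':
    # Object was already handled by a previous invocation; do nothing
    logger.info(f"[{req_id}] {key} already marked processed, skipping")
    return {'statusCode': 200, 'body': 'Already processed'}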