import time
import gzip
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import boto3
import botocore.exceptions
import base64

logger = logging.getLogger()
logger.setLevel(logging.INFO)

API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))

s3_client = boto3.client('s3')


def mark_email_processed(bucket, key, s3_client, processor='lambda'):
    """Tag the S3 object as processed via a metadata-only self-copy."""
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'processed': 'true',
                'processed_timestamp': str(int(time.time())),
                'processor': processor
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Marked S3 object {bucket}/{key} as processed")
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error marking {bucket}/{key} as processed: {e}")


def call_api_once(payload, domain, request_id):
    """Single-shot POST, no retries."""
    url = f"{API_BASE_URL}/process/{domain}"
    data = json.dumps(payload).encode('utf-8')
    req = urllib.request.Request(url, data=data, method='POST')
    req.add_header('Authorization', f'Bearer {API_TOKEN}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-Request-ID', request_id)

    logger.info(f"[{request_id}] OUTGOING POST {url}: "
                f"domain={domain}, key={payload['s3_key']}, bucket={payload['s3_bucket']}, "
                f"orig_size={payload['original_size']}, comp_size={payload['compressed_size']}")

    try:
        with urllib.request.urlopen(req, timeout=25) as resp:
            code = resp.getcode()
            if code == 200:
                logger.info(f"[{request_id}] API returned 200 OK")
                return True
            body = resp.read().decode('utf-8', errors='ignore')
            logger.error(f"[{request_id}] API returned {code}: {body}")
            return False
    except urllib.error.HTTPError as e:
        # urlopen raises HTTPError for 4xx/5xx responses instead of returning them
        body = e.read().decode('utf-8', errors='ignore')
        logger.error(f"[{request_id}] API returned {e.code}: {body}")
        return False
    except urllib.error.URLError as e:
        logger.error(f"[{request_id}] API request failed: {e.reason}")
        return False


def lambda_handler(event, context):
    req_id = context.aws_request_id
    rec = event['Records'][0]['s3']
    bucket = rec['bucket']['name']
    key = urllib.parse.unquote_plus(rec['object']['key'])
    logger.info(f"[{req_id}] Processing {bucket}/{key}")

    # HEAD check: skip objects already marked as processed
    head = s3_client.head_object(Bucket=bucket, Key=key)
    metadata = head.get('Metadata', {})
    if metadata.get('processed') == 'true':
        logger.info(f"[{req_id}] Skipping already processed object")
        return {'statusCode': 200, 'body': 'Already processed'}

    size = head['ContentLength']
    if size > MAX_EMAIL_SIZE:
        logger.warning(f"[{req_id}] Email too large: {size} bytes")
        # optionally mark_large_email(...) here
        return {'statusCode': 413}

    # Load and compress the email content
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
    compressed = gzip.compress(body)

    payload = {
        's3_bucket': bucket,
        's3_key': key,
        # bucket name like "example-com-emails" maps to domain "example.com"
        'domain': bucket.replace('-', '.').rsplit('.emails', 1)[0],
        'email_content': base64.b64encode(compressed).decode(),
        'compressed': True,
        'etag': head['ETag'].strip('"'),
        'request_id': req_id,
        'original_size': len(body),
        'compressed_size': len(compressed)
    }

    # Single API call
    if call_api_once(payload, payload['domain'], req_id):
        # Set metadata only; the mail itself stays in S3
        mark_email_processed(bucket, key, s3_client)
        return {'statusCode': 200, 'body': 'Processed'}
    else:
        logger.error(f"[{req_id}] API call failed, leaving object unmodified")
        return {'statusCode': 500, 'body': 'API Error'}
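

# ---------------------------------------------------------------------------
# Illustrative local smoke test (not part of the original handler; a minimal
# sketch only). It assumes valid AWS credentials, the API_BASE_URL/API_TOKEN
# environment variables, and an existing object at the hypothetical bucket and
# key below. It feeds a hand-built S3 put-event and a stub context into
# lambda_handler so the flow can be exercised outside of Lambda.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    class _StubContext:
        # Only attribute that lambda_handler actually reads
        aws_request_id = 'local-smoke-test'

    sample_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'example-com-emails'},       # hypothetical bucket
                'object': {'key': 'incoming/message-001.eml'}   # hypothetical key
            }
        }]
    }
    print(lambda_handler(sample_event, _StubContext()))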