import time
import gzip
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import boto3
import base64
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

logger = logging.getLogger()
logger.setLevel(logging.INFO)

API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))

s3_client = boto3.client('s3')


def mark_email_processed(bucket, key, status, s3_client, processor='lambda'):
    """Set the processed flag on the S3 object via its metadata."""
    try:
        # Copy the object onto itself with MetadataDirective=REPLACE to rewrite
        # the user metadata in place.
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'processed': status,
                'processed_timestamp': str(int(time.time())),
                'processor': processor
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Marked S3 object {bucket}/{key} as {status}")
    except Exception as e:
        logger.error(f"Error marking {bucket}/{key}: {e}")


def call_api_once(payload, domain, request_id):
    """Single-shot POST, no retries."""
    url = f"{API_BASE_URL}/process/{domain}"
    data = json.dumps(payload).encode('utf-8')
    req = urllib.request.Request(url, data=data, method='POST')
    req.add_header('Authorization', f'Bearer {API_TOKEN}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-Request-ID', request_id)
    logger.info(f"[{request_id}] OUTGOING POST {url}: "
                f"domain={domain}, key={payload['s3_key']}, bucket={payload['s3_bucket']}, "
                f"orig_size={payload['original_size']}, comp_size={payload['compressed_size']}")
    try:
        with urllib.request.urlopen(req, timeout=25) as resp:
            code = resp.getcode()
            if code == 200:
                logger.info(f"[{request_id}] API returned 200 OK")
                return True
            body = resp.read().decode('utf-8', errors='ignore')
            logger.error(f"[{request_id}] API returned {code}: {body}")
            return False
    except urllib.error.HTTPError as e:
        # urlopen raises HTTPError for non-2xx responses, so log status and body
        # here instead of letting it bubble up as a generic exception.
        body = e.read().decode('utf-8', errors='ignore')
        logger.error(f"[{request_id}] API returned {e.code}: {body}")
        return False


def lambda_handler(event, context):
    req_id = context.aws_request_id
    rec = event['Records'][0]['s3']
    bucket = rec['bucket']['name']
    key = urllib.parse.unquote_plus(rec['object']['key'])
    logger.info(f"[{req_id}] Processing {bucket}/{key}")

    # HEAD check: skip objects already marked as processed
    head = s3_client.head_object(Bucket=bucket, Key=key)
    metadata = head.get('Metadata', {})
    if metadata.get('processed') == 'true':
        logger.info(f"[{req_id}] Skipping already processed object")
        return {'statusCode': 200, 'body': 'Already processed'}

    size = head['ContentLength']
    if size > MAX_EMAIL_SIZE:
        logger.warning(f"[{req_id}] Email too large: {size} bytes")
        return {'statusCode': 413}

    # Load the email content
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()

    # 1) Parse and log from/to
    try:
        msg = BytesParser(policy=default).parsebytes(body)
        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
    except Exception as e:
        logger.error(f"[{req_id}] Error parsing email: {e}")
        from_addr = ''
        to_addrs = []

    # 2) Compress and build the payload
    compressed = gzip.compress(body)
    payload = {
        's3_bucket': bucket,
        's3_key': key,
        # Derive the domain from the bucket name, e.g. "example-com-emails"
        # -> "example.com.emails" -> "example.com"
        'domain': bucket.replace('-', '.').rsplit('.emails', 1)[0],
        'email_content': base64.b64encode(compressed).decode(),
        'compressed': True,
        'etag': head['ETag'].strip('"'),
        'request_id': req_id,
        'original_size': len(body),
        'compressed_size': len(compressed)
    }

    # 3) Single API call
    try:
        success = call_api_once(payload, payload['domain'], req_id)
    except Exception as e:
        logger.error(f"[{req_id}] API call exception: {e}")
        success = False

    # 4) Handle the API-call result
    if success:
        # processed normally
        mark_email_processed(bucket, key, 'true', s3_client)
    else:
        # only mark when there are recipient addresses
        if to_addrs:
            bucket_domain = payload['domain']
            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
            mark_email_processed(bucket, key, status, s3_client)
        else:
            logger.info(f"[{req_id}] No recipients, nothing to mark")

    return {'statusCode': 200, 'body': 'Done'}