diff --git a/ses-lambda/lambda-function.py b/ses-lambda/lambda-function.py
index 998c53e..67c010d 100644
--- a/ses-lambda/lambda-function.py
+++ b/ses-lambda/lambda-function.py
@@ -8,6 +8,9 @@ import urllib.parse
 import logging
 import boto3
 import base64
+from email.parser import BytesParser
+from email.policy import default
+from email.utils import getaddresses
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -18,23 +21,25 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
 
 s3_client = boto3.client('s3')
 
-def mark_email_processed(bucket, key, s3_client, processor='lambda'):
+def mark_email_processed(bucket, key, status, s3_client, processor='lambda'):
+    """Set the processed flag on the S3 object via metadata."""
     try:
         s3_client.copy_object(
             Bucket=bucket,
             Key=key,
             CopySource={'Bucket': bucket, 'Key': key},
             Metadata={
-                'processed': 'true',
+                'processed': status,
                 'processed_timestamp': str(int(time.time())),
                 'processor': processor
             },
             MetadataDirective='REPLACE'
         )
-        logger.info(f"Marked S3 object {bucket}/{key} as processed")
-    except botocore.exceptions.ClientError as e:
+        logger.info(f"Marked S3 object {bucket}/{key} as {status}")
+    except Exception as e:
         logger.error(f"Error marking {bucket}/{key}: {e}")
-
+
+
 def call_api_once(payload, domain, request_id):
     """Single-shot POST, no retry."""
     url = f"{API_BASE_URL}/process/{domain}"
@@ -57,6 +62,7 @@ def call_api_once(payload, domain, request_id):
         logger.error(f"[{request_id}] API returned {code}: {body}")
         return False
 
+
 def lambda_handler(event, context):
     req_id = context.aws_request_id
     rec = event['Records'][0]['s3']
@@ -66,7 +72,6 @@ def lambda_handler(event, context):
 
     # HEAD check
     head = s3_client.head_object(Bucket=bucket, Key=key)
-
     metadata = head.get('Metadata', {})
     if metadata.get('processed') == 'true':
         logger.info(f"[{req_id}] Skipping already processed object")
@@ -75,11 +80,23 @@ def lambda_handler(event, context):
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
         logger.warning(f"[{req_id}] Email too large: {size} bytes")
-        # possibly mark_large_email(...) here
         return {'statusCode': 413}
 
-    # Load and compress content
+    # Load email content
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+
+    # 1) Parse and log from/to
+    try:
+        msg = BytesParser(policy=default).parsebytes(body)
+        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
+        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
+        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
+    except Exception as e:
+        logger.error(f"[{req_id}] Error parsing email: {e}")
+        from_addr = ''
+        to_addrs = []
+
+    # 2) Compress and build payload
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
@@ -93,17 +110,25 @@ def lambda_handler(event, context):
         'compressed_size': len(compressed)
     }
 
-    # Single API call
+    # 3) Single API call
     try:
         success = call_api_once(payload, payload['domain'], req_id)
     except Exception as e:
-        logger.error(f"API call exception: {e}")
+        logger.error(f"[{req_id}] API call exception: {e}")
         success = False
 
-    # Return 200 whether the call succeeded or not
+    # 4) Post-API-call handling
     if success:
-        try:
-            mark_email_processed(bucket, key, s3_client)
-        except Exception as e:
-            logger.error(f"Marking failed: {e}")
-    return {'statusCode': 200, 'body': 'Done'}
+        # processed normally
+        mark_email_processed(bucket, key, 'true', s3_client)
+    else:
+        # classify only if there are recipients
+        if to_addrs:
+            bucket_domain = payload['domain']
+            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
+            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
+            mark_email_processed(bucket, key, status, s3_client)
+        else:
+            logger.info(f"[{req_id}] No recipients, not marking")
+
+    return {'statusCode': 200, 'body': 'Done'}
\ No newline at end of file
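
For a quick local smoke test of the handler above, the sketch below feeds it the minimal shape of an S3 ObjectCreated notification. This is an illustrative sketch, not part of the change: the bucket and key names, the module path, and the SimpleNamespace stand-in for the Lambda context are made-up assumptions, and it presumes the handler reads the standard bucket.name/object.key fields from the event. Running it performs real S3 and API calls, so point it at test resources or stub boto3 (for example with moto), with AWS credentials and a region configured.

    # local_smoke_test.py -- illustrative sketch only (hypothetical names throughout)
    import importlib.util
    from types import SimpleNamespace

    # lambda-function.py has a hyphen in its name, so load it via importlib
    # instead of a plain import statement.
    spec = importlib.util.spec_from_file_location(
        'lambda_function', 'ses-lambda/lambda-function.py'
    )
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    # Minimal S3 ObjectCreated event: only the fields the handler reads.
    event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'my-ses-inbox'},    # hypothetical bucket
                'object': {'key': 'incoming/abc123'},  # hypothetical key
            }
        }]
    }

    # The handler only uses aws_request_id from the Lambda context object.
    ctx = SimpleNamespace(aws_request_id='local-test-1')

    print(mod.lambda_handler(event, ctx))

After a run, the marker written by mark_email_processed can be checked with aws s3api head-object --bucket my-ses-inbox --key incoming/abc123. One caveat about the marking approach itself: copy_object with MetadataDirective='REPLACE' replaces all user-defined metadata on the object, so any metadata keys beyond the three set here are dropped.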