parse body, new metadata

Andreas Knuth 2025-06-14 20:27:56 -05:00
parent fdbc32bed9
commit 4388f6efc2
1 changed file with 41 additions and 16 deletions


@@ -8,6 +8,9 @@ import urllib.parse
 import logging
 import boto3
 import base64
+from email.parser import BytesParser
+from email.policy import default
+from email.utils import getaddresses
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -18,23 +21,25 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
 s3_client = boto3.client('s3')
-def mark_email_processed(bucket, key, s3_client, processor='lambda'):
+def mark_email_processed(bucket, key, metadata, s3_client, processor='lambda'):
+    """Set the processed flag on the S3 object via metadata."""
     try:
         s3_client.copy_object(
             Bucket=bucket,
             Key=key,
             CopySource={'Bucket': bucket, 'Key': key},
             Metadata={
-                'processed': 'true',
+                'processed': metadata,
                 'processed_timestamp': str(int(time.time())),
                 'processor': processor
             },
             MetadataDirective='REPLACE'
         )
-        logger.info(f"Marked S3 object {bucket}/{key} as processed")
-    except botocore.exceptions.ClientError as e:
+        logger.info(f"Marked S3 object {bucket}/{key} as {metadata}")
+    except Exception as e:
         logger.error(f"Error marking {bucket}/{key}: {e}")
 def call_api_once(payload, domain, request_id):
     """Single-shot POST, no retry."""
     url = f"{API_BASE_URL}/process/{domain}"
@@ -57,6 +62,7 @@ def call_api_once(payload, domain, request_id):
         logger.error(f"[{request_id}] API returned {code}: {body}")
         return False
 
 def lambda_handler(event, context):
     req_id = context.aws_request_id
     rec = event['Records'][0]['s3']
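This hunk only shows the tail of call_api_once; the function body is unchanged by the commit and not visible here. For orientation, a sketch of what a single-shot POST with no retry could look like, consistent with the visible pieces (the URL built from API_BASE_URL, the "API returned {code}: {body}" log, False on failure). The urllib usage, header names, and timeout are assumptions, not the committed code:

import json
import urllib.request
import urllib.error

def call_api_once(payload, domain, request_id):
    """Single-shot POST, no retry (sketch; the real body is not in this hunk)."""
    url = f"{API_BASE_URL}/process/{domain}"
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json',
                 'X-Request-Id': request_id},  # header name is an assumption
        method='POST',
    )
    try:
        # Non-2xx responses raise HTTPError, so reaching the return means success.
        with urllib.request.urlopen(req, timeout=10) as resp:  # timeout assumed
            return 200 <= resp.status < 300
    except urllib.error.HTTPError as e:
        code, body = e.code, e.read().decode('utf-8', 'replace')
        logger.error(f"[{request_id}] API returned {code}: {body}")
        return False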
@@ -66,7 +72,6 @@ def lambda_handler(event, context):
     # Header check
     head = s3_client.head_object(Bucket=bucket, Key=key)
     metadata = head.get('Metadata', {})
     if metadata.get('processed') == 'true':
         logger.info(f"[{req_id}] Skipping already processed object")
@@ -75,11 +80,23 @@ def lambda_handler(event, context):
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
         logger.warning(f"[{req_id}] Email too large: {size} bytes")
+        # possibly call mark_large_email(...) here
         return {'statusCode': 413}
-    # Load and compress content
+    # Load email content
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+    # 1) Parse and log from/to
+    try:
+        msg = BytesParser(policy=default).parsebytes(body)
+        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
+        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
+        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
+    except Exception as e:
+        logger.error(f"[{req_id}] Error parsing email: {e}")
+        from_addr = ''
+        to_addrs = []
+    # 2) Compress and build payload
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
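The indexing in the new parsing block is dense: getaddresses flattens header values into (display-name, address) tuples, which is why [0][1] yields the first sender's bare address and the comprehension yields bare recipient addresses. A self-contained demonstration using only the standard library:

from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

raw = (b"From: Alice Example <alice@example.com>\r\n"
       b"To: bob@example.net, Carol <carol@example.org>\r\n"
       b"Subject: demo\r\n\r\nbody\r\n")

msg = BytesParser(policy=default).parsebytes(raw)

# getaddresses() yields (display-name, address) tuples, so [0][1]
# is the first sender's bare address.
from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
to_addrs = [addr for _name, addr in getaddresses(msg.get_all('to', []))]

print(from_addr)  # alice@example.com
print(to_addrs)   # ['bob@example.net', 'carol@example.org']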
@@ -93,17 +110,25 @@ def lambda_handler(event, context):
         'compressed_size': len(compressed)
     }
-    # Single API call
+    # 3) Single API call
     try:
         success = call_api_once(payload, payload['domain'], req_id)
     except Exception as e:
-        logger.error(f"API call exception: {e}")
+        logger.error(f"[{req_id}] API call exception: {e}")
         success = False
-    # Return 200 whether success is True or False
+    # 4) Handle the API-call result
     if success:
-        try:
-            mark_email_processed(bucket, key, s3_client)
-        except Exception as e:
-            logger.error(f"Marking failed: {e}")
+        # processed normally
+        mark_email_processed(bucket, key, 'true', s3_client)
+    else:
+        # only if there are to_addrs
+        if to_addrs:
+            bucket_domain = payload['domain']
+            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
+            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
+            mark_email_processed(bucket, key, status, s3_client)
+        else:
+            logger.info(f"[{req_id}] No recipients, nothing to mark")
     return {'statusCode': 200, 'body': 'Done'}
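The failure branch classifies the miss: if the bucket's domain appears among the recipient domains, the API knew the domain but not the user ('unknownUser'); otherwise no recipient matched the domain at all ('unknownDomain'). The same logic as a pure function with examples; the function name and factoring are illustrative, not part of the commit:

def classify_failure(bucket_domain, to_addrs):
    """Mirror of the commit's branch logic; name and factoring are mine."""
    domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
    # Recipient matched our domain but the API rejected it -> user unknown.
    # No recipient in our domain at all -> domain unknown.
    return 'unknownUser' if bucket_domain in domains else 'unknownDomain'

assert classify_failure('example.com', ['bob@example.com']) == 'unknownUser'
assert classify_failure('example.com', ['bob@other.org']) == 'unknownDomain'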