Compare commits

...

3 Commits

Author SHA1 Message Date
Andreas Knuth 4388f6efc2 parse body, new metadata 2025-06-14 20:27:56 -05:00
Andreas Knuth fdbc32bed9 improved error handling 2025-06-14 20:22:34 -05:00
Andreas Knuth 4943bccb3e fixes 2025-06-13 17:27:05 -05:00
1 changed file with 61 additions and 22 deletions

@@ -8,6 +8,9 @@ import urllib.parse
 import logging
 import boto3
 import base64
+from email.parser import BytesParser
+from email.policy import default
+from email.utils import getaddresses
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -18,20 +21,24 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
 s3_client = boto3.client('s3')
 
-def mark_email_processed(bucket, key, s3_client, processor='lambda'):
+def mark_email_processed(bucket, key, metadata, s3_client, processor='lambda'):
     """Sets the processed flag on the S3 object via metadata."""
-    s3_client.copy_object(
-        Bucket=bucket,
-        Key=key,
-        CopySource={'Bucket': bucket, 'Key': key},
-        Metadata={
-            'processed': 'true',
-            'processed_timestamp': str(int(time.time())),
-            'processor': processor
-        },
-        MetadataDirective='REPLACE'
-    )
-    logger.info(f"Marked S3 object {bucket}/{key} as processed")
+    try:
+        s3_client.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata={
+                'processed': metadata,
+                'processed_timestamp': str(int(time.time())),
+                'processor': processor
+            },
+            MetadataDirective='REPLACE'
+        )
+        logger.info(f"Marked S3 object {bucket}/{key} as {metadata}")
+    except Exception as e:
+        logger.error(f"Error marking {bucket}/{key}: {e}")
 
 def call_api_once(payload, domain, request_id):
     """Single-shot POST, no retry."""
@@ -55,6 +62,7 @@ def call_api_once(payload, domain, request_id):
         logger.error(f"[{request_id}] API returned {code}: {body}")
         return False
 
+
 def lambda_handler(event, context):
     req_id = context.aws_request_id
     rec = event['Records'][0]['s3']
@@ -64,14 +72,31 @@ def lambda_handler(event, context):
     # HEAD check
     head = s3_client.head_object(Bucket=bucket, Key=key)
+    metadata = head.get('Metadata', {})
+    if metadata.get('processed') == 'true':
+        logger.info(f"[{req_id}] Skipping already processed object")
+        return {'statusCode': 200, 'body': 'Already processed'}
+
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
         logger.warning(f"[{req_id}] Email too large: {size} bytes")
+        # optionally mark_large_email(...) here
         return {'statusCode': 413}
 
-    # Load and compress content
+    # Load email content
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
 
+    # 1) Parse and log from/to
+    try:
+        msg = BytesParser(policy=default).parsebytes(body)
+        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
+        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
+        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
+    except Exception as e:
+        logger.error(f"[{req_id}] Error parsing email: {e}")
+        from_addr = ''
+        to_addrs = []
+
+    # 2) Compress and build payload
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
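
The header parsing added above uses only the stdlib email package. A standalone sketch of how it behaves on a sample raw message (hypothetical addresses, same calls as in the diff):

    from email.parser import BytesParser
    from email.policy import default
    from email.utils import getaddresses

    raw = (b"From: Alice <alice@example.com>\r\n"
           b"To: bob@example.com, Carol <carol@example.org>\r\n"
           b"Subject: test\r\n"
           b"\r\n"
           b"hello\r\n")
    msg = BytesParser(policy=default).parsebytes(raw)
    from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
    to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
    print(from_addr)  # alice@example.com
    print(to_addrs)   # ['bob@example.com', 'carol@example.org']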
@@ -85,11 +110,25 @@ def lambda_handler(event, context):
         'compressed_size': len(compressed)
     }
 
-    # Single API call
-    if call_api_once(payload, payload['domain'], req_id):
-        # set metadata; mail stays in S3
-        mark_email_processed(bucket, key, s3_client)
-        return {'statusCode': 200, 'body': 'Processed'}
+    # 3) Single API call
+    try:
+        success = call_api_once(payload, payload['domain'], req_id)
+    except Exception as e:
+        logger.error(f"[{req_id}] API call exception: {e}")
+        success = False
+
+    # 4) Handling after the API call
+    if success:
+        # processed normally
+        mark_email_processed(bucket, key, 'true', s3_client)
     else:
-        logger.error(f"[{req_id}] API call failed, leaving object unmodified")
-        return {'statusCode': 500, 'body': 'API Error'}
+        # only if there are recipients
+        if to_addrs:
+            bucket_domain = payload['domain']
+            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
+            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
+            mark_email_processed(bucket, key, status, s3_client)
+        else:
+            logger.info(f"[{req_id}] No recipients, not marking")
+
+    return {'statusCode': 200, 'body': 'Done'}
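
Note on the new failure path: the handler now returns 200 in every case, so a failed API call no longer surfaces as a Lambda error (and is not retried); the object is instead tagged unknownUser or unknownDomain. The classification logic, isolated as a standalone sketch (hypothetical sample data):

    def classify_failure(bucket_domain, to_addrs):
        # Mirrors the post-API-call branch: a recipient domain matching the
        # bucket's domain means the user is unknown; otherwise the domain is.
        domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
        return 'unknownUser' if bucket_domain in domains else 'unknownDomain'

    print(classify_failure('example.com', ['bob@example.com']))  # unknownUser
    print(classify_failure('example.com', ['bob@other.org']))    # unknownDomain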