Compare commits


No commits in common. "4388f6efc2122d2c34e33817dd7fe108cfa94365" and "dc57e080309c322049f6d42a9042cb41d9c85e08" have entirely different histories.

1 changed file with 22 additions and 61 deletions


@@ -8,9 +8,6 @@ import urllib.parse
 import logging
 import boto3
 import base64
-from email.parser import BytesParser
-from email.policy import default
-from email.utils import getaddresses
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -21,24 +18,20 @@ MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
 s3_client = boto3.client('s3')
 
-def mark_email_processed(bucket, key, metadata, s3_client, processor='lambda'):
+def mark_email_processed(bucket, key, s3_client, processor='lambda'):
     """Set the processed flag on the S3 object via metadata."""
-    try:
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={
-                'processed': metadata,
-                'processed_timestamp': str(int(time.time())),
-                'processor': processor
-            },
-            MetadataDirective='REPLACE'
-        )
-        logger.info(f"Marked S3 object {bucket}/{key} as {metadata}")
-    except Exception as e:
-        logger.error(f"Error marking {bucket}/{key}: {e}")
+    s3_client.copy_object(
+        Bucket=bucket,
+        Key=key,
+        CopySource={'Bucket': bucket, 'Key': key},
+        Metadata={
+            'processed': 'true',
+            'processed_timestamp': str(int(time.time())),
+            'processor': processor
+        },
+        MetadataDirective='REPLACE'
+    )
+    logger.info(f"Marked S3 object {bucket}/{key} as processed")
 
 def call_api_once(payload, domain, request_id):
     """Single-shot POST, no retry."""
@@ -62,7 +55,6 @@ def call_api_once(payload, domain, request_id)
         logger.error(f"[{request_id}] API returned {code}: {body}")
         return False
 
-
 def lambda_handler(event, context):
     req_id = context.aws_request_id
     rec = event['Records'][0]['s3']
@@ -72,31 +64,14 @@
     # Head check
     head = s3_client.head_object(Bucket=bucket, Key=key)
-    metadata = head.get('Metadata', {})
-    if metadata.get('processed') == 'true':
-        logger.info(f"[{req_id}] Skipping already processed object")
-        return {'statusCode': 200, 'body': 'Already processed'}
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
         logger.warning(f"[{req_id}] Email too large: {size} bytes")
-        # possibly mark_large_email(...) here
         return {'statusCode': 413}
 
-    # Load email content
+    # Load and compress content
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
-
-    # 1) Parse and log from/to
-    try:
-        msg = BytesParser(policy=default).parsebytes(body)
-        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
-        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
-        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
-    except Exception as e:
-        logger.error(f"[{req_id}] Error parsing the email: {e}")
-        from_addr = ''
-        to_addrs = []
-
-    # 2) Compress and build the payload
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
@@ -110,25 +85,11 @@ def lambda_handler(event, context):
         'compressed_size': len(compressed)
     }
 
-    # 3) Single API call
-    try:
-        success = call_api_once(payload, payload['domain'], req_id)
-    except Exception as e:
-        logger.error(f"[{req_id}] API call exception: {e}")
-        success = False
-
-    # 4) Handling after the API call
-    if success:
-        # normal processed
-        mark_email_processed(bucket, key, 'true', s3_client)
+    # Single API call
+    if call_api_once(payload, payload['domain'], req_id):
+        # Set metadata; the mail stays in S3
+        mark_email_processed(bucket, key, s3_client)
+        return {'statusCode': 200, 'body': 'Processed'}
     else:
-        # only if there are to_addrs
-        if to_addrs:
-            bucket_domain = payload['domain']
-            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
-            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
-            mark_email_processed(bucket, key, status, s3_client)
-        else:
-            logger.info(f"[{req_id}] No recipients, nothing to mark")
-    return {'statusCode': 200, 'body': 'Done'}
+        logger.error(f"[{req_id}] API call failed, leaving object unmodified")
+        return {'statusCode': 500, 'body': 'API Error'}
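
A note on the simplified mark_email_processed: S3 user metadata cannot be edited in place, so the function rewrites the object onto itself via copy_object with MetadataDirective='REPLACE', exactly as the hunk shows. A minimal standalone sketch of that round trip follows; the bucket and key names are made-up placeholders, and it assumes valid AWS credentials and an existing object:

import time
import boto3

s3 = boto3.client('s3')
bucket, key = 'example-mail-bucket', 'incoming/msg-0001'  # hypothetical names

# Rewriting the object onto itself is how S3 user metadata gets changed;
# REPLACE discards any metadata keys that are not re-sent.
s3.copy_object(
    Bucket=bucket,
    Key=key,
    CopySource={'Bucket': bucket, 'Key': key},
    Metadata={'processed': 'true', 'processed_timestamp': str(int(time.time()))},
    MetadataDirective='REPLACE'
)

# head_object exposes user metadata as a plain dict (keys come back lowercased).
print(s3.head_object(Bucket=bucket, Key=key)['Metadata'].get('processed'))  # -> 'true'

Because REPLACE drops keys that are not re-sent, the old per-status values ('unknownUser'/'unknownDomain') written by the removed branch would not survive a later rewrite anyway; the new version only ever writes 'true'.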
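To exercise the new single-call path locally, the handler can be invoked with a hand-built S3 notification event. The record shape below is the standard S3 event structure; the bucket/key values are placeholders, and the sketch assumes lambda_handler is importable and that the S3 calls either hit a real test object or a stubbed s3_client:

from types import SimpleNamespace

# Hypothetical local smoke test for lambda_handler (names are placeholders).
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'example-mail-bucket'},
            'object': {'key': 'incoming/msg-0001'}
        }
    }]
}
context = SimpleNamespace(aws_request_id='local-test')

# Per the diff: 200/'Processed' when the API call succeeds (object marked),
# 500/'API Error' when it fails (object left unmodified for a retry).
print(lambda_handler(event, context))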