Andreas Knuth 2025-06-13 16:40:54 -05:00
parent bd6d7a8c92
commit ce87a9e3a5
2 changed files with 68 additions and 93 deletions

View File

@@ -62,7 +62,15 @@ def process_email(domain):
     request_id = data.get('request_id', 'no-request-id')
-    logger.info(f"[{request_id}] INCOMING POST /process/{domain}: payload={data}")
+    payload_summary = {
+        k: (len(v) if k == 'email_content' else v)
+        for k, v in data.items()
+        if k != 'email_content' or isinstance(v, (str, bytes))
+    }
+    logger.info(
+        f"[{request_id}] INCOMING POST /process/{domain}: "
+        f"payload_summary={payload_summary}"
+    )
     content = data.get('email_content')
     compressed = data.get('compressed', False)

View File

@@ -1,128 +1,95 @@
-import time
-import gzip
 import json
 import os
 import urllib.request
 import urllib.error
 import urllib.parse
 import logging
-import ssl
 import boto3
 import base64
+import gzip
+import time
-# Configuration
-API_BASE_URL = os.environ['API_BASE_URL']
-API_TOKEN = os.environ['API_TOKEN']
-DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
-MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10MB default
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
+API_BASE_URL = os.environ['API_BASE_URL']
+API_TOKEN = os.environ['API_TOKEN']
+MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))
 s3_client = boto3.client('s3')
-def bucket_name_to_domain(bucket_name):
-    if bucket_name.endswith('-emails'):
-        domain_part = bucket_name[:-7]
-        return domain_part.replace('-', '.')
-    logger.warning(f"Bucket name {bucket_name} does not match the expected schema")
-    return None
-def mark_email_processed(bucket, key, processor='lambda'):
-    timestamp = str(int(time.time()))
-    try:
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={
-                'processed': 'true',
-                'processed_timestamp': timestamp,
-                'processor': processor
-            },
-            MetadataDirective='REPLACE'
-        )
-        logger.info(f"Email {key} marked as processed by {processor}")
-        return True
-    except Exception as e:
-        logger.error(f"Marking error for {key}: {e}")
-        return False
-def call_api_with_retry(payload, domain, request_id, max_retries=3):
-    sync_url = f"{API_BASE_URL}/process/{domain}"
-    payload_json = json.dumps(payload).encode('utf-8')
-    for attempt in range(max_retries):
-        try:
-            logger.info(f"[{request_id}] Attempt {attempt+1}: POST payload to {sync_url}")
-            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
-            req.add_header('Authorization', f'Bearer {API_TOKEN}')
-            req.add_header('Content-Type', 'application/json')
-            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
-            req.add_header('X-Request-ID', request_id)
-            timeout = 25
-            context = ssl._create_unverified_context() if DEBUG_MODE else None
-            with urllib.request.urlopen(req, timeout=timeout, context=context) as response:
-                code = response.getcode()
-                if code == 200:
-                    logger.info(f"[{request_id}] API response: {code}")
-                    return True
-        except urllib.error.HTTPError as e:
-            if 400 <= e.code < 500:
-                return False
-        except Exception:
-            pass
-        time.sleep(2 ** attempt)
-    return False
+def mark_email_processed(bucket, key, s3_client, processor='lambda'):
+    """Sets the processed flag on the S3 object via metadata."""
+    s3_client.copy_object(
+        Bucket=bucket,
+        Key=key,
+        CopySource={'Bucket': bucket, 'Key': key},
+        Metadata={
+            'processed': 'true',
+            'processed_timestamp': str(int(time.time())),
+            'processor': processor
+        },
+        MetadataDirective='REPLACE'
+    )
+    logger.info(f"Marked S3 object {bucket}/{key} as processed")
+def call_api_once(payload, domain, request_id):
+    """Single-shot POST, no retry."""
+    url = f"{API_BASE_URL}/process/{domain}"
+    data = json.dumps(payload).encode('utf-8')
+    req = urllib.request.Request(url, data=data, method='POST')
+    req.add_header('Authorization', f'Bearer {API_TOKEN}')
+    req.add_header('Content-Type', 'application/json')
+    req.add_header('X-Request-ID', request_id)
+    logger.info(f"[{request_id}] OUTGOING POST {url}: "
+                f"domain={domain}, key={payload['s3_key']}, bucket={payload['s3_bucket']}, "
+                f"orig_size={payload['original_size']}, comp_size={payload['compressed_size']}")
+    with urllib.request.urlopen(req, timeout=25) as resp:
+        code = resp.getcode()
+        if code == 200:
+            logger.info(f"[{request_id}] API returned 200 OK")
+            return True
+        else:
+            body = resp.read().decode('utf-8', errors='ignore')
+            logger.error(f"[{request_id}] API returned {code}: {body}")
+            return False
 def lambda_handler(event, context):
-    request_id = context.aws_request_id
-    logger.info(f"[{request_id}] S3 event received")
-    s3_event = event['Records'][0]['s3']
-    bucket = s3_event['bucket']['name']
-    key = urllib.parse.unquote_plus(s3_event['object']['key'])
-    logger.info(f"[{request_id}] Processing: {bucket}/{key}")
-    domain = bucket_name_to_domain(bucket)
-    if not domain:
-        return {'statusCode': 400, 'body': 'Invalid bucket name'}
-    # HEAD check
+    req_id = context.aws_request_id
+    rec = event['Records'][0]['s3']
+    bucket = rec['bucket']['name']
+    key = urllib.parse.unquote_plus(rec['object']['key'])
+    logger.info(f"[{req_id}] Processing {bucket}/{key}")
     head = s3_client.head_object(Bucket=bucket, Key=key)
     size = head['ContentLength']
     if size > MAX_EMAIL_SIZE:
-        # mark large
-        s3_client.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata={'status': 'too_large', 'size': str(size), 'marked_at': str(int(time.time()))},
-            MetadataDirective='REPLACE'
-        )
-        return {'statusCode': 413, 'body': 'Email too large'}
-    # Load and compress content
+        logger.warning(f"[{req_id}] Email too large: {size} bytes")
+        # optionally mark_large_email(...) here
+        return {'statusCode': 413}
     body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
     compressed = gzip.compress(body)
     payload = {
         's3_bucket': bucket,
         's3_key': key,
-        'domain': domain,
+        'domain': bucket.replace('-', '.').rsplit('.emails', 1)[0],
         'email_content': base64.b64encode(compressed).decode(),
         'compressed': True,
         'etag': head['ETag'].strip('"'),
-        'request_id': request_id,
+        'request_id': req_id,
         'original_size': len(body),
         'compressed_size': len(compressed)
     }
-    logger.info(f"[{request_id}] OUTGOING payload to {API_BASE_URL}/process/{domain}: "
-                f"domain={domain}, key={key}, bucket={bucket}, "
-                f"orig_size={len(body)}, comp_size={len(compressed)}")
-    if call_api_with_retry(payload, domain, request_id):
-        # Inline metadata marking instead of deletion
-        mark_email_processed(bucket, key)
-        return {'statusCode': 200, 'body': json.dumps({'message': 'Email processed and marked', 'request_id': request_id})}
+    # Single API call
+    if call_api_once(payload, payload['domain'], req_id):
+        # Set metadata; mail stays in S3
+        mark_email_processed(bucket, key, s3_client)
+        return {'statusCode': 200, 'body': 'Processed'}
     else:
-        return {'statusCode': 500, 'body': 'API error'}
+        logger.error(f"[{req_id}] API call failed, leaving object unmodified")
+        return {'statusCode': 500, 'body': 'API Error'}