docker/ses-lambda/lambda-function.py


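"""SES inbound email forwarder.

Triggered by S3 object-created events on SES mail buckets; forwards each raw
email (gzip-compressed, base64-encoded) to the processing API and marks the
S3 object as processed via metadata instead of deleting it.
"""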
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import ssl
import boto3
import base64
import gzip
import time

# Configuration
API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
DEBUG_MODE = os.environ.get('DEBUG_MODE', 'false').lower() == 'true'
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))  # 10 MiB default
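# Example environment configuration (hypothetical values, for illustration only):
#   API_BASE_URL=https://email-api.example.com
#   API_TOKEN=<bearer token for the processing API>
#   DEBUG_MODE=false          # 'true' disables TLS verification for API calls
#   MAX_EMAIL_SIZE=10485760   # 10 MiB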
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')

def bucket_name_to_domain(bucket_name):
    """Derive the mail domain from the bucket name, e.g. 'example-com-emails' -> 'example.com'."""
    if bucket_name.endswith('-emails'):
        domain_part = bucket_name[:-len('-emails')]
        return domain_part.replace('-', '.')
    logger.warning(f"Bucket name {bucket_name} does not match the expected naming scheme")
    return None

def mark_email_processed(bucket, key, processor='lambda'):
    """Mark an email object as processed via an in-place metadata copy instead of deleting it."""
    timestamp = str(int(time.time()))
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'processed': 'true',
                'processed_timestamp': timestamp,
                'processor': processor
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Email {key} marked as processed by {processor}")
        return True
    except Exception as e:
        logger.error(f"Failed to mark {key} as processed: {e}")
        return False

def call_api_with_retry(payload, domain, request_id, max_retries=3):
    """POST the payload to the sync API, retrying with exponential backoff on transient errors."""
    sync_url = f"{API_BASE_URL}/process/{domain}"
    payload_json = json.dumps(payload).encode('utf-8')
    for attempt in range(max_retries):
        try:
            logger.info(f"[{request_id}] Attempt {attempt + 1}: POST payload to {sync_url}")
            req = urllib.request.Request(sync_url, data=payload_json, method="POST")
            req.add_header('Authorization', f'Bearer {API_TOKEN}')
            req.add_header('Content-Type', 'application/json')
            req.add_header('User-Agent', f'AWS-Lambda-EmailProcessor/2.0-{request_id}')
            req.add_header('X-Request-ID', request_id)
            timeout = 25
            # In debug mode only: skip TLS certificate verification.
            context = ssl._create_unverified_context() if DEBUG_MODE else None
            with urllib.request.urlopen(req, timeout=timeout, context=context) as response:
                code = response.getcode()
                if code == 200:
                    logger.info(f"[{request_id}] API response: {code}")
                    return True
                logger.warning(f"[{request_id}] Unexpected API response: {code}")
        except urllib.error.HTTPError as e:
            if 400 <= e.code < 500:
                # Client errors are permanent; retrying will not help.
                logger.error(f"[{request_id}] Client error {e.code}, giving up")
                return False
            logger.warning(f"[{request_id}] Server error {e.code}, will retry")
        except Exception as e:
            logger.warning(f"[{request_id}] Request failed: {e}")
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
    return False

def lambda_handler(event, context):
    request_id = context.aws_request_id
    logger.info(f"[{request_id}] S3 event received")
    s3_event = event['Records'][0]['s3']
    bucket = s3_event['bucket']['name']
    # S3 event keys are URL-encoded; '+' stands for a space.
    key = urllib.parse.unquote_plus(s3_event['object']['key'])
    logger.info(f"[{request_id}] Processing: {bucket}/{key}")
    domain = bucket_name_to_domain(bucket)
    if not domain:
        return {'statusCode': 400, 'body': 'Invalid bucket name'}
    head = s3_client.head_object(Bucket=bucket, Key=key)
    size = head['ContentLength']
    if size > MAX_EMAIL_SIZE:
        # Mark oversized emails via metadata and skip processing.
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={'status': 'too_large', 'size': str(size), 'marked_at': str(int(time.time()))},
            MetadataDirective='REPLACE'
        )
        return {'statusCode': 413, 'body': 'Email too large'}
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
    # Gzip the raw email and base64-encode it for the JSON payload.
    compressed = gzip.compress(body)
    payload = {
        's3_bucket': bucket,
        's3_key': key,
        'domain': domain,
        'email_content': base64.b64encode(compressed).decode(),
        'compressed': True,
        'etag': head['ETag'].strip('"'),
        'request_id': request_id,
        'original_size': len(body),
        'compressed_size': len(compressed)
    }
    logger.info(f"[{request_id}] OUTGOING payload to {API_BASE_URL}/process/{domain}: "
                f"domain={domain}, key={key}, bucket={bucket}, "
                f"orig_size={len(body)}, comp_size={len(compressed)}")
    if call_api_with_retry(payload, domain, request_id):
        # Mark in place via metadata instead of deleting the object.
        mark_email_processed(bucket, key)
        return {'statusCode': 200, 'body': json.dumps({'message': 'Email processed and marked', 'request_id': request_id})}
    else:
        return {'statusCode': 500, 'body': 'API error'}
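
# --- Local test sketch (not part of the deployed function; assumes the
# API_BASE_URL/API_TOKEN env vars and AWS credentials are available).
# The bucket and key below are hypothetical examples.
if __name__ == '__main__':
    class _FakeContext:
        aws_request_id = 'local-test'

    test_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'example-com-emails'},
                'object': {'key': 'incoming/test-message'}
            }
        }]
    }
    print(lambda_handler(test_event, _FakeContext()))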