This commit is contained in:
Andreas Knuth 2025-10-16 21:34:23 -05:00
parent cd731c502b
commit 286de26c87
1 changed file with 331 additions and 568 deletions

View File

@@ -1,624 +1,387 @@
import os
import boto3
import smtplib
import time
import traceback
import json
import random
import signal
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses
from email.policy import SMTP as SMTPPolicy
s3 = boto3.client('s3')
sqs = boto3.client('sqs', region_name='us-east-2')
# Environment variables (set these in the Lambda config)
SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# AWS Region
AWS_REGION = 'us-east-2'
# Metadata key/value to mark processed objects (only set when at least one recipient delivered)
PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
# Retry configuration
MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2'))
# Email size limit (25 MB - Lambda memory safety margin)
MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25'))
MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024
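# A minimal sketch of how these variables might be set from the CLI (the function name
# "email-forwarder" is an assumption, not taken from this repo):
#   aws lambda update-function-configuration \
#     --function-name email-forwarder \
#     --environment "Variables={SMTP_HOST=mail.email-srvr.com,SMTP_PORT=2525,MAX_SMTP_RETRIES=3,MAX_EMAIL_SIZE_MB=25}"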
# ============================================================================
# IMPROVEMENT 5: Timeout Protection
# ============================================================================
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException("Lambda approaching timeout")
# Metadata Keys
PROCESSED_KEY = 'processed'
PROCESSED_VALUE = 'true'
def domain_to_bucket(domain: str) -> str:
"""Converts a domain to an S3 bucket name"""
return domain.replace('.', '-') + '-emails'
def bucket_to_domain(bucket: str) -> str:
return bucket.replace('-emails', '').replace('-', '.')
def domain_to_queue_name(domain: str) -> str:
"""Converts a domain to an SQS queue name"""
return domain.replace('.', '-') + '-queue'
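# Illustrative mapping (domain is a placeholder): "example.com" becomes bucket
# "example-com-emails" and queue "example-com-queue". Note that bucket_to_domain() can only
# round-trip domains that contain no hyphens of their own, since every '-' maps back to '.'.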
def parse_raw_message(raw_bytes: bytes):
    try:
        # Use SMTP policy for better compatibility with various email formats
        from email.policy import SMTP
        parsed = BytesParser(policy=SMTP).parsebytes(raw_bytes)
    except Exception as e:
        print(f"Error parsing with SMTP policy: {e}, trying with default policy")
        try:
            parsed = BytesParser(policy=default).parsebytes(raw_bytes)
        except Exception as e2:
            print(f"Error parsing with default policy: {e2}")
            parsed = None
    return parsed
def get_queue_url_for_domain(domain: str) -> str:
    """
    Looks up the SQS queue URL for a domain
    Queue name: domain-with-hyphens-queue
    """
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print(f"✓ Found queue: {queue_name}")
        return queue_url
    except sqs.exceptions.QueueDoesNotExist:
        raise Exception(
            f"Queue does not exist: {queue_name} "
            f"(for domain: {domain})"
        )
    except Exception as e:
        raise Exception(f"Error getting queue URL for {domain}: {e}")
def mark_object_processed(bucket: str, key: str):
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already marked processed.")
            return
        new_meta = current_metadata.copy()
        new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
        new_meta['processed_at'] = str(int(time.time()))
        # Remove processing lock
        new_meta.pop('processing_started', None)
        # Copy object onto itself replacing metadata
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}")
    except Exception as e:
        print("Failed to mark processed metadata:", e)
        traceback.print_exc()
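# After marking, a head_object call on the message would return user metadata roughly like
# {"processed": "true", "processed_at": "1729111111"} (values illustrative), which is what
# the duplicate checks below look for on later invocations.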
def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
"""Update S3 object metadata with retry information"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
current_metadata = head.get('Metadata', {}) or {}
new_meta = current_metadata.copy()
new_meta['retry_count'] = str(retry_count)
new_meta['last_retry'] = str(int(time.time()))
if last_error:
# S3 metadata values must be ASCII, so we encode the error
new_meta['last_error'] = last_error[:255].replace('\n', ' ')
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=new_meta,
MetadataDirective='REPLACE'
)
print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
except Exception as e:
print(f"Failed to update retry metadata: {e}")
def get_retry_count(bucket: str, key: str) -> int:
    """Get current retry count from S3 metadata"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        return int(metadata.get('retry_count', '0'))
    except Exception:
        return 0
def is_temporary_smtp_error(error_code):
    """Check if SMTP error code indicates a temporary failure (4xx)"""
    if isinstance(error_code, tuple) and len(error_code) >= 1:
        code = error_code[0]
        if isinstance(code, int):
            return 400 <= code < 500
    return False
def is_spam_rejection(error_code):
    """Check if the error is a spam rejection (should not be retried)"""
    if isinstance(error_code, tuple) and len(error_code) >= 2:
        code = error_code[0]
        message = error_code[1]
        # 554 with spam message is permanent - don't retry
        if code == 554 and b'spam' in message.lower():
            return True
    return False
def is_already_processed(bucket: str, key: str) -> bool:
    """Checks whether the e-mail has already been processed"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
            processed_at = metadata.get('processed_at', 'unknown')
            print(f"✓ Already processed at {processed_at}")
            return True
        return False
    except s3.exceptions.NoSuchKey:
        print(f"⚠ Object {key} not found in {bucket}")
        return True  # If the object no longer exists, treat it as processed
    except Exception as e:
        print(f"⚠ Error checking processed status: {e}")
        return False
# ============================================================================
# IMPROVEMENT 3: Exponential Backoff with Jitter
# ============================================================================
def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
frm_addr, recipients, raw_message, local_helo,
max_retries=MAX_RETRIES):
"""Send email with retry logic for temporary failures using exponential backoff"""
delivered = []
refused = {}
last_error = None
for attempt in range(max_retries + 1):
if attempt > 0:
# Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY)
delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))
# Add jitter to prevent thundering herd
jitter = random.uniform(0, delay * 0.3) # +0-30% random jitter
total_delay = delay + jitter
print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)")
time.sleep(total_delay)
try:
with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
smtp.ehlo()
# Try STARTTLS
try:
smtp.starttls()
smtp.ehlo()
except Exception as e:
print("STARTTLS not available or failed (continuing):", e)
# Login if credentials provided
if smtp_user and smtp_pass:
try:
smtp.login(smtp_user, smtp_pass)
except Exception as e:
print("SMTP login failed (continuing):", e)
# Attempt to send
try:
send_result = smtp.sendmail(frm_addr, recipients, raw_message)
if isinstance(send_result, dict):
# Separate temporary and permanent failures
temp_refused = {}
perm_refused = {}
for rcpt, error in send_result.items():
if is_temporary_smtp_error(error):
temp_refused[rcpt] = error
else:
perm_refused[rcpt] = error
# If we have temporary failures and more retries, continue
if temp_refused and attempt < max_retries:
print(f"Temporary failures for {list(temp_refused.keys())}, will retry...")
recipients = list(temp_refused.keys()) # Only retry temporary failures
refused = perm_refused # Keep track of permanent failures
last_error = str(temp_refused)
continue
else:
# No temporary failures or no more retries
refused = send_result
delivered = [r for r in recipients if r not in refused]
break
else:
# All delivered successfully
delivered = recipients[:]
refused = {}
break
except smtplib.SMTPRecipientsRefused as e:
print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}")
# Check if all are temporary failures
temp_refused = {}
perm_refused = {}
for rcpt, error in e.recipients.items():
if is_temporary_smtp_error(error):
temp_refused[rcpt] = error
else:
perm_refused[rcpt] = error
if temp_refused and attempt < max_retries:
recipients = list(temp_refused.keys())
refused = perm_refused
last_error = str(e)
continue
else:
refused = e.recipients
delivered = [r for r in recipients if r not in refused]
break
except Exception as e:
print(f"SMTP sendmail error on attempt {attempt + 1}: {e}")
# Check if this is a spam rejection (permanent error that shouldn't be retried)
if hasattr(e, 'smtp_code') and hasattr(e, 'smtp_error'):
if is_spam_rejection((e.smtp_code, e.smtp_error)):
print(f"Email rejected as spam (permanent error), not retrying")
refused = {r: (e.smtp_code, e.smtp_error) for r in recipients}
delivered = []
break
# For other errors, check if it's worth retrying
if attempt < max_retries:
# Only retry if it might be temporary
error_str = str(e)
if '554' in error_str and 'spam' in error_str.lower():
print(f"Email rejected as spam, not retrying")
refused = {r: ('spam', str(e)) for r in recipients}
delivered = []
break
else:
last_error = str(e)
continue
else:
traceback.print_exc()
refused = {r: ('error', str(e)) for r in recipients}
delivered = []
break
except Exception as e:
print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}")
if attempt < max_retries:
last_error = str(e)
continue
else:
traceback.print_exc()
refused = {r: ('connect-error', str(e)) for r in recipients}
delivered = []
break
return delivered, refused
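# Worked example of the backoff above with the default BASE_RETRY_DELAY=2: retry 1 waits
# 2s plus up to 0.6s jitter, retry 2 waits 4s plus up to 1.2s, retry 3 waits 8s plus up to
# 2.4s, so a fully exhausted retry loop adds roughly 14-18s on top of the SMTP timeouts.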
# ============================================================================
# IMPROVEMENT 4: Structured Logging
# ============================================================================
def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int):
"""Log processing results to CloudWatch in structured format for easier analysis"""
result = {
'timestamp': int(time.time()),
'bucket': bucket,
'key': key,
'delivered_count': len(delivered),
'refused_count': len(refused),
'retry_count': retry_count,
'delivered_recipients': delivered,
'refused_recipients': list(refused.keys()) if refused else [],
'success': len(delivered) > 0
}
# Structured logging for CloudWatch Insights
print(f"PROCESSING_RESULT: {json.dumps(result)}")
return result
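# A sketch of a CloudWatch Logs Insights query over these log lines (run against the
# Lambda's /aws/lambda/<function> log group; the group name here is an assumption):
#   fields @timestamp, @message
#   | filter @message like /PROCESSING_RESULT/
#   | sort @timestamp desc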
# ============================================================================
# MAIN PROCESSING FUNCTION (extracted for timeout handling)
# ============================================================================
def _process_email(event, context):
"""Main email processing logic (extracted for timeout protection)"""
print("Event:", event)
ses = None
try:
rec = event['Records'][0]
except Exception as e:
print("No Records in event:", e)
return {'statusCode': 400, 'body': 'No Records'}
# Determine bucket/key and initial recipients list
recipients = []
bucket = None
key = None
is_ses_event = False
if 'ses' in rec:
# SES event - trusted, carries the correct recipients
is_ses_event = True
ses = rec['ses']
msg_id = ses['mail']['messageId']
recipients = ses['receipt'].get('recipients', [])
# assume first recipient domain maps to bucket
if recipients:
domain = recipients[0].split('@', 1)[1]
bucket = domain_to_bucket(domain)
prefix = f"{msg_id}"
print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
if 'Contents' not in resp_list or not resp_list['Contents']:
raise Exception(f"No S3 object under {prefix} in {bucket}")
key = resp_list['Contents'][0]['Key']
else:
raise Exception("SES event but no recipients found")
elif 's3' in rec:
# S3 event - recipients must be extracted from the headers
s3info = rec['s3']
bucket = s3info['bucket']['name']
key = s3info['object']['key']
print(f"S3 event: bucket={bucket}, key={key}")
# recipients will be parsed from message headers below
else:
raise Exception("Unknown event type")
# ========================================================================
# IMPROVEMENT 1: Duplicate Prevention with Processing Lock
# ========================================================================
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
# Check if already processed
if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
processed_at = metadata.get('processed_at', 'unknown time')
print(f"Object {key} already processed at {processed_at}")
return {'statusCode': 200, 'body': 'already processed'}
# Check if currently being processed (lock mechanism)
processing_started = metadata.get('processing_started')
if processing_started:
processing_age = time.time() - float(processing_started)
if processing_age < 300: # 5 minutes lock
print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)")
return {'statusCode': 200, 'body': 'already being processed'}
else:
print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing")
# Set processing lock
new_meta = metadata.copy()
new_meta['processing_started'] = str(int(time.time()))
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=new_meta,
MetadataDirective='REPLACE'
)
print(f"Set processing lock on {key}")
except s3.exceptions.NoSuchKey:
print(f"Object {key} no longer exists, skipping")
return {'statusCode': 404, 'body': 'object not found'}
except Exception as e:
print(f"Error checking/setting processing lock: {e}")
# Continue anyway if lock fails (better than dropping email)
# Check retry count - if too many retries, give up
retry_count = get_retry_count(bucket, key)
if retry_count >= MAX_RETRIES * 2: # Safety limit
print(f"Object {key} has been retried {retry_count} times, giving up")
mark_object_processed(bucket, key) # Mark as processed to prevent infinite retries
return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}
# ========================================================================
# IMPROVEMENT 2: Memory Optimization with Size Check
# ========================================================================
try:
resp = s3.get_object(Bucket=bucket, Key=key)
content_length = int(resp.get('ContentLength', 0))
# Safety check: Skip emails larger than MAX_EMAIL_SIZE_MB
if content_length > MAX_EMAIL_SIZE_BYTES:
print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB")
# Mark as processed to prevent infinite retries
mark_object_processed(bucket, key)
return {
'statusCode': 413, # Payload Too Large
'body': json.dumps({
'error': 'email_too_large',
'size_mb': round(content_length/1024/1024, 2),
'max_mb': MAX_EMAIL_SIZE_MB
})
}
raw_bytes = resp['Body'].read()
print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}")
except Exception as e:
print(f"ERROR reading from S3: {e}")
traceback.print_exc()
return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'}
parsed = parse_raw_message(raw_bytes)
subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'
frm_addr = None
if parsed:
from_addrs = getaddresses(parsed.get_all('from', []) or [])
frm_addr = from_addrs[0][1] if from_addrs else None
if not frm_addr:
frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost'))
print(f"From: {frm_addr}, Subject: {subj}")
# If recipients were not provided (S3 path), extract from headers
if not recipients and not is_ses_event:
if parsed:
expected_domain = bucket_to_domain(bucket).lower()
# Debug: Print raw headers to understand what we're getting
print(f"=== DEBUG: Header Analysis ===")
print(f"Expected domain: {expected_domain}")
# The email parser is case-insensitive for headers, so we only need to check once
# Get headers using standard case (parser handles case-insensitivity)
to_headers = parsed.get_all('to', []) or []
cc_headers = parsed.get_all('cc', []) or []
bcc_headers = parsed.get_all('bcc', []) or []
if to_headers:
print(f"Found 'To' header: {to_headers}")
if cc_headers:
print(f"Found 'Cc' header: {cc_headers}")
if bcc_headers:
print(f"Found 'Bcc' header: {bcc_headers}")
# Parse addresses from headers
to_addrs = [addr for _n, addr in getaddresses(to_headers) if addr]
cc_addrs = [addr for _n, addr in getaddresses(cc_headers) if addr]
bcc_addrs = [addr for _n, addr in getaddresses(bcc_headers) if addr]
all_recipients = to_addrs + cc_addrs + bcc_addrs
print(f"Parsed recipients - To: {to_addrs}, Cc: {cc_addrs}, Bcc: {bcc_addrs}")
# Filter recipients to bucket domain with case-insensitive comparison
# and deduplicate using a set (preserving case)
recipients_set = set()
recipients = []
for addr in all_recipients:
# Extract domain part (everything after @)
if '@' in addr:
addr_lower = addr.lower()
addr_domain = addr_lower.split('@')[-1]
if addr_domain == expected_domain:
# Only add if not already in set (case-insensitive deduplication)
if addr_lower not in recipients_set:
recipients_set.add(addr_lower)
recipients.append(addr) # Keep original case
print(f"Matched recipient: {addr} (domain: {addr_domain})")
else:
print(f"Skipped duplicate: {addr}")
else:
print(f"Skipped recipient: {addr} (domain: {addr_domain} != {expected_domain})")
print(f"Final recipients after domain filter and deduplication: {recipients}")
# If no recipients found, try additional headers
if not recipients:
print("WARNING: No recipients found in standard headers, checking additional headers...")
# Check for X-Original-To, Delivered-To, Envelope-To
fallback_headers = ['X-Original-To', 'Delivered-To', 'Envelope-To',
'x-original-to', 'delivered-to', 'envelope-to']
for header_name in fallback_headers:
header_val = parsed.get(header_name)
if header_val:
print(f"Found {header_name}: {header_val}")
fallback_addrs = [addr for _n, addr in getaddresses([header_val]) if addr]
for addr in fallback_addrs:
if '@' in addr and addr.split('@')[-1].lower() == expected_domain:
recipients.append(addr)
print(f"Found recipient in {header_name}: {addr}")
if not recipients:
print(f"ERROR: Could not find any recipients for domain {expected_domain}")
print(f"All addresses found: {all_recipients}")
# Print all headers for debugging
print("=== All Email Headers ===")
for key_h in parsed.keys():
print(f"{key_h}: {parsed.get(key_h)}")
print("=== End Headers ===")
else:
print("ERROR: Could not parse email headers")
recipients = []
# If after all we have no recipients, skip SMTP
delivered = []
refused = {}
if recipients:
# Use raw bytes directly (no decoding!)
raw_message = raw_bytes
# Determine HELO hostname
env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
derived_local = bucket_to_domain(bucket) if bucket else None
local_helo = env_local or derived_local or 'localhost'
print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
start = time.time()
# Send with retry logic
delivered, refused = send_email_with_retry(
SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS,
frm_addr, recipients, raw_message, local_helo,
max_retries=MAX_RETRIES
)
print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")
# Update retry count if we had temporary failures
if refused and not delivered:
temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)]
if temp_failures:
update_retry_metadata(bucket, key, retry_count + 1, str(refused))
else:
print("No recipients to send to; skipping SMTP.")
# Only mark as processed if at least one recipient was delivered successfully
if delivered:
try:
mark_object_processed(bucket, key)
except Exception as e:
print("Failed to mark object processed after delivery:", e)
else:
print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")
# Log structured result
result = log_processing_result(bucket, key, delivered, refused, retry_count)
return {
'statusCode': 200 if delivered else 500,
'body': json.dumps(result)
}
def set_processing_lock(bucket: str, key: str) -> bool:
    """
    Sets a processing lock to prevent duplicate processing
    Returns: True if the lock was set, False if already locked
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        # Check for an existing lock
        processing_started = metadata.get('processing_started')
        if processing_started:
            lock_age = time.time() - float(processing_started)
            if lock_age < 300:  # 5-minute lock
                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
                return False
            else:
                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")
        # Set a new lock
        new_meta = metadata.copy()
        new_meta['processing_started'] = str(int(time.time()))
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Processing lock set")
        return True
    except Exception as e:
        print(f"⚠ Error setting processing lock: {e}")
        return True  # On error, process anyway (better than losing the mail)
def mark_as_queued(bucket: str, key: str, queue_name: str):
"""Marks the e-mail as enqueued"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['queued_at'] = str(int(time.time()))
metadata['queued_to'] = queue_name
metadata['status'] = 'queued'
metadata.pop('processing_started', None) # Remove the lock
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
print(f"✓ Marked as queued to {queue_name}")
except Exception as e:
print(f"⚠ Failed to mark as queued: {e}")
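# After this call the object's user metadata would look roughly like
# {"queued_at": "1729111111", "queued_to": "example-com-queue", "status": "queued"}
# (values illustrative). Since copy_object rewrites the object, the usual S3 limits apply:
# user-defined metadata must stay ASCII and within 2 KB in total.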
def send_to_queue(queue_url: str, bucket: str, key: str,
from_addr: str, recipient: str, domain: str,
subject: str, message_id: str):
"""
Sends the e-mail job to the domain-specific SQS queue
"""
# Extract the queue name from the URL (for logging)
queue_name = queue_url.split('/')[-1]
message = {
'bucket': bucket,
'key': key,
'from': from_addr,
'recipient': recipient, # only 1 recipient
'domain': domain,
'subject': subject,
'message_id': message_id,
'timestamp': int(time.time())
}
try:
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message, ensure_ascii=False),
MessageAttributes={
'domain': {
'StringValue': domain,
'DataType': 'String'
},
'bucket': {
'StringValue': bucket,
'DataType': 'String'
},
'recipient': {
'StringValue': recipient,
'DataType': 'String'
},
'message_id': {
'StringValue': message_id,
'DataType': 'String'
}
}
)
sqs_message_id = response['MessageId']
print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
# Mark as queued
mark_as_queued(bucket, key, queue_name)
return sqs_message_id
except Exception as e:
print(f"✗ Failed to queue message: {e}")
raise
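# For reference, a worker polling this queue would receive a MessageBody shaped like the
# dict built above (values are placeholders, not real data):
#   {"bucket": "example-com-emails", "key": "abc123...", "from": "sender@example.org",
#    "recipient": "user@example.com", "domain": "example.com", "subject": "Hello",
#    "message_id": "abc123...", "timestamp": 1729111111}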
# ============================================================================
# LAMBDA HANDLER with TIMEOUT PROTECTION
# ============================================================================
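# For orientation, a trimmed sketch of the SES receipt event the handlers below expect
# (field values are placeholders):
#   {"Records": [{"ses": {
#       "mail": {"messageId": "abc123", "source": "sender@example.org",
#                "timestamp": "2025-10-16T12:00:00.000Z"},
#       "receipt": {"recipients": ["user@example.com"]}}}]}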
def lambda_handler(event, context):
    """
    Lambda entry point with timeout protection
    Recommended Lambda Configuration:
    - Memory: 512 MB
    - Timeout: 60 seconds
    - Environment Variables:
      - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS
      - MAX_SMTP_RETRIES=3
      - BASE_RETRY_DELAY=2
      - MAX_EMAIL_SIZE_MB=25
    """
    # Set up timeout protection (stop 5 seconds before Lambda timeout)
    remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60
    safety_margin = 5  # seconds
    max_execution_time = max(10, remaining_time - safety_margin)
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(int(max_execution_time))
    try:
        return _process_email(event, context)
    except TimeoutException:
        print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting")
        # Don't mark as processed so it can be retried
        return {
            'statusCode': 408,  # Request Timeout
            'body': json.dumps({
                'error': 'lambda_timeout',
                'execution_time': max_execution_time
            })
        }
    except Exception as e:
        print(f"FATAL ERROR in lambda_handler: {e}")
        traceback.print_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'internal_error',
                'message': str(e)
            })
        }
    finally:
        signal.alarm(0)  # Cancel alarm
def lambda_handler(event, context):
"""
Lambda handler for SES events
IMPORTANT: SES invokes this Lambda once PER recipient!
Each event carries exactly 1 recipient in receipt.recipients
"""
print(f"{'='*70}")
print(f"Lambda invoked: {context.aws_request_id}")
print(f"Region: {AWS_REGION}")
print(f"{'='*70}")
# Parse the SES event
try:
record = event['Records'][0]
ses = record['ses']
except (KeyError, IndexError) as e:
print(f"✗ Invalid event structure: {e}")
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid SES event'})
}
mail = ses['mail']
receipt = ses['receipt']
message_id = mail['messageId']
source = mail['source']
timestamp = mail.get('timestamp', '')
# ✨ IMPORTANT: receipt.recipients contains ONLY the recipient for THIS event
# (do NOT use mail.destination - that holds all original recipients)
recipients = receipt.get('recipients', [])
if not recipients or len(recipients) != 1:
print(f"✗ Unexpected recipients count: {len(recipients)}")
return {
'statusCode': 400,
'body': json.dumps({
'error': 'Expected exactly 1 recipient',
'found': len(recipients)
})
}
# SES guarantees: exactly 1 recipient per event
recipient = recipients[0]
domain = recipient.split('@')[1]
bucket = domain_to_bucket(domain)
print(f"\n📧 Email Event:")
print(f" MessageId: {message_id}")
print(f" From: {source}")
print(f" To: {recipient}")
print(f" Domain: {domain}")
print(f" Bucket: {bucket}")
print(f" Timestamp: {timestamp}")
# Determine the queue for the domain
try:
queue_url = get_queue_url_for_domain(domain)
queue_name = queue_url.split('/')[-1]
print(f" Queue: {queue_name}")
except Exception as e:
print(f"\n✗ ERROR: {e}")
return {
'statusCode': 500,
'body': json.dumps({
'error': 'queue_not_configured',
'domain': domain,
'recipient': recipient,
'message': str(e)
})
}
# Find the S3 object
try:
print(f"\n📦 Searching S3...")
response = s3.list_objects_v2(
Bucket=bucket,
Prefix=message_id,
MaxKeys=1
)
if 'Contents' not in response or not response['Contents']:
raise Exception(f"No S3 object found for message {message_id}")
key = response['Contents'][0]['Key']
size = response['Contents'][0]['Size']
print(f" Found: s3://{bucket}/{key}")
print(f" Size: {size:,} bytes ({size/1024:.1f} KB)")
except Exception as e:
print(f"\n✗ S3 ERROR: {e}")
return {
'statusCode': 404,
'body': json.dumps({
'error': 's3_object_not_found',
'message_id': message_id,
'bucket': bucket,
'details': str(e)
})
}
# Duplicate Check
print(f"\n🔍 Checking for duplicates...")
if is_already_processed(bucket, key):
print(f" Already processed, skipping")
return {
'statusCode': 200,
'body': json.dumps({
'status': 'already_processed',
'message_id': message_id,
'recipient': recipient
})
}
# Set the processing lock
print(f"\n🔒 Setting processing lock...")
if not set_processing_lock(bucket, key):
print(f" Already being processed by another instance")
return {
'statusCode': 200,
'body': json.dumps({
'status': 'already_processing',
'message_id': message_id,
'recipient': recipient
})
}
# Load the e-mail to extract the subject (optional, for better logging)
subject = '(unknown)'
try:
print(f"\n📖 Reading email for metadata...")
obj = s3.get_object(Bucket=bucket, Key=key)
raw_bytes = obj['Body'].read()
# Parse only the headers (faster)
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
subject = parsed.get('subject', '(no subject)')
print(f" Subject: {subject}")
except Exception as e:
print(f" ⚠ Could not parse email (continuing): {e}")
# Enqueue into the domain-specific queue
try:
print(f"\n📤 Queuing to {queue_name}...")
sqs_message_id = send_to_queue(
queue_url=queue_url,
bucket=bucket,
key=key,
from_addr=source,
recipient=recipient, # only 1 recipient
domain=domain,
subject=subject,
message_id=message_id
)
print(f"\n{'='*70}")
print(f"✅ SUCCESS - Email queued for delivery")
print(f"{'='*70}\n")
return {
'statusCode': 200,
'body': json.dumps({
'status': 'queued',
'message_id': message_id,
'sqs_message_id': sqs_message_id,
'queue': queue_name,
'domain': domain,
'recipient': recipient,
'subject': subject
})
}
except Exception as e:
print(f"\n{'='*70}")
print(f"✗ FAILED TO QUEUE")
print(f"{'='*70}")
print(f"Error: {e}")
return {
'statusCode': 500,
'body': json.dumps({
'error': 'failed_to_queue',
'message': str(e),
'message_id': message_id,
'recipient': recipient
})
}