# docker/ses-lambda-new-python/lambda_function.py

import os
import boto3
import smtplib
import time
import traceback
import json
import random
import signal
from email.parser import BytesParser
from email.policy import SMTP, default
from email.utils import getaddresses
s3 = boto3.client('s3')
# Environment variables (set these in the Lambda config)
SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# Metadata key/value to mark processed objects (only set when at least one recipient delivered)
PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
# Retry configuration
MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2'))
# Email size limit (25 MB - Lambda memory safety margin)
MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25'))
MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024
# ============================================================================
# IMPROVEMENT 5: Timeout Protection
# ============================================================================
class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException("Lambda approaching timeout")
def domain_to_bucket(domain: str) -> str:
    return domain.replace('.', '-') + '-emails'

def bucket_to_domain(bucket: str) -> str:
    return bucket.replace('-emails', '').replace('-', '.')
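# Note: this mapping is lossy for domains that already contain hyphens:
# domain_to_bucket('my-site.com') == 'my-site-com-emails', but
# bucket_to_domain('my-site-com-emails') == 'my.site.com'. If hyphenated
# domains are in scope, an explicit lookup would be safer. A minimal sketch,
# assuming a hypothetical DOMAIN_BUCKET_MAP env var holding a JSON object:
#   DOMAIN_BUCKET_MAP = json.loads(os.environ.get('DOMAIN_BUCKET_MAP', '{}'))
#   bucket = DOMAIN_BUCKET_MAP.get(domain) or domain_to_bucket(domain)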
def parse_raw_message(raw_bytes: bytes):
    try:
        # Use the SMTP policy for better compatibility with various email formats
        parsed = BytesParser(policy=SMTP).parsebytes(raw_bytes)
    except Exception as e:
        print(f"Error parsing with SMTP policy: {e}, trying with default policy")
        try:
            parsed = BytesParser(policy=default).parsebytes(raw_bytes)
        except Exception as e2:
            print(f"Error parsing with default policy: {e2}")
            parsed = None
    return parsed
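# Usage sketch (hypothetical message bytes; any RFC 5322 message works):
#   msg = parse_raw_message(b"From: a@example.com\r\nTo: b@example.com\r\n\r\nhi")
#   msg['to']  # -> 'b@example.com'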
def mark_object_processed(bucket: str, key: str):
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already marked processed.")
            return
        new_meta = current_metadata.copy()
        new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
        new_meta['processed_at'] = str(int(time.time()))
        # Remove the processing lock
        new_meta.pop('processing_started', None)
        # Copy the object onto itself, replacing the metadata
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}")
    except Exception as e:
        print("Failed to mark processed metadata:", e)
        traceback.print_exc()
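# Caveat: the self-copy above rewrites the object, so a bucket notification
# configured for all ObjectCreated events (which includes ObjectCreated:Copy)
# re-invokes this Lambda once per copy. The processed/processing_started
# checks in _process_email break that loop, but scoping the S3 trigger to
# ObjectCreated:Put avoids the extra invocations entirely.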
def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
    """Update S3 object metadata with retry information"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        new_meta = current_metadata.copy()
        new_meta['retry_count'] = str(retry_count)
        new_meta['last_retry'] = str(int(time.time()))
        if last_error:
            # S3 metadata values must be ASCII: truncate, strip newlines, force-encode
            safe_error = last_error[:255].replace('\n', ' ')
            new_meta['last_error'] = safe_error.encode('ascii', 'replace').decode('ascii')
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
    except Exception as e:
        print(f"Failed to update retry metadata: {e}")
def get_retry_count(bucket: str, key: str) -> int:
    """Get the current retry count from S3 metadata"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        return int(metadata.get('retry_count', '0'))
    except Exception:
        return 0
def is_temporary_smtp_error(error_code):
    """Check if an SMTP error code indicates a temporary failure (4xx)"""
    if isinstance(error_code, tuple) and len(error_code) >= 1:
        code = error_code[0]
        if isinstance(code, int):
            return 400 <= code < 500
    return False
def is_spam_rejection(error_code):
    """Check if the error is a spam rejection (should not be retried)"""
    if isinstance(error_code, tuple) and len(error_code) >= 2:
        code = error_code[0]
        message = error_code[1]
        # smtplib normally reports the message as bytes; normalize str just in case
        if isinstance(message, str):
            message = message.encode('utf-8', 'replace')
        # 554 with a spam message is permanent - don't retry
        if code == 554 and b'spam' in message.lower():
            return True
    return False
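# smtplib reports per-recipient failures as {rcpt: (code, message)} with the
# message as bytes. Illustrative (not real server) responses:
#   is_temporary_smtp_error((451, b'4.7.1 Greylisted, try again later'))  # True
#   is_spam_rejection((554, b'5.7.1 Message rejected as spam'))           # True
#   is_temporary_smtp_error((550, b'5.1.1 User unknown'))                 # False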
# ============================================================================
# IMPROVEMENT 3: Exponential Backoff with Jitter
# ============================================================================
def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
                          frm_addr, recipients, raw_message, local_helo,
                          max_retries=MAX_RETRIES):
    """Send email with retry logic for temporary failures using exponential backoff"""
    delivered = []
    refused = {}
    last_error = None
    for attempt in range(max_retries + 1):
        if attempt > 0:
            # Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY)
            delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))
            # Add jitter to prevent a thundering herd
            jitter = random.uniform(0, delay * 0.3)  # +0-30% random jitter
            total_delay = delay + jitter
            print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)")
            time.sleep(total_delay)
        try:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
                smtp.ehlo()
                # Try STARTTLS
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    print("STARTTLS not available or failed (continuing):", e)
                # Log in if credentials were provided
                if smtp_user and smtp_pass:
                    try:
                        smtp.login(smtp_user, smtp_pass)
                    except Exception as e:
                        print("SMTP login failed (continuing):", e)
                # Attempt to send
                try:
                    send_result = smtp.sendmail(frm_addr, recipients, raw_message)
                    if isinstance(send_result, dict):
                        # Separate temporary and permanent failures
                        temp_refused = {}
                        perm_refused = {}
                        for rcpt, error in send_result.items():
                            if is_temporary_smtp_error(error):
                                temp_refused[rcpt] = error
                            else:
                                perm_refused[rcpt] = error
                        # If we have temporary failures and retries left, continue
                        if temp_refused and attempt < max_retries:
                            print(f"Temporary failures for {list(temp_refused.keys())}, will retry...")
                            recipients = list(temp_refused.keys())  # Only retry temporary failures
                            refused = perm_refused  # Keep track of permanent failures
                            last_error = str(temp_refused)
                            continue
                        else:
                            # No temporary failures or no more retries
                            refused = send_result
                            delivered = [r for r in recipients if r not in refused]
                            break
                    else:
                        # All delivered successfully
                        delivered = recipients[:]
                        refused = {}
                        break
                except smtplib.SMTPRecipientsRefused as e:
                    print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}")
                    # Check whether the failures are temporary
                    temp_refused = {}
                    perm_refused = {}
                    for rcpt, error in e.recipients.items():
                        if is_temporary_smtp_error(error):
                            temp_refused[rcpt] = error
                        else:
                            perm_refused[rcpt] = error
                    if temp_refused and attempt < max_retries:
                        recipients = list(temp_refused.keys())
                        refused = perm_refused
                        last_error = str(e)
                        continue
                    else:
                        refused = e.recipients
                        delivered = [r for r in recipients if r not in refused]
                        break
                except Exception as e:
                    print(f"SMTP sendmail error on attempt {attempt + 1}: {e}")
                    # A spam rejection is a permanent error and should not be retried
                    if hasattr(e, 'smtp_code') and hasattr(e, 'smtp_error'):
                        if is_spam_rejection((e.smtp_code, e.smtp_error)):
                            print("Email rejected as spam (permanent error), not retrying")
                            refused = {r: (e.smtp_code, e.smtp_error) for r in recipients}
                            delivered = []
                            break
                    # For other errors, check whether a retry is worthwhile
                    if attempt < max_retries:
                        # Only retry if the failure might be temporary
                        error_str = str(e)
                        if '554' in error_str and 'spam' in error_str.lower():
                            print("Email rejected as spam, not retrying")
                            refused = {r: ('spam', str(e)) for r in recipients}
                            delivered = []
                            break
                        else:
                            last_error = str(e)
                            continue
                    else:
                        traceback.print_exc()
                        refused = {r: ('error', str(e)) for r in recipients}
                        delivered = []
                        break
        except Exception as e:
            print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}")
            if attempt < max_retries:
                last_error = str(e)
                continue
            else:
                traceback.print_exc()
                refused = {r: ('connect-error', str(e)) for r in recipients}
                delivered = []
                break
    return delivered, refused
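# Hypothetical helper (not called by the Lambda flow): compute the worst-case
# backoff schedule so the Lambda timeout can be sized accordingly. With the
# defaults (BASE_RETRY_DELAY=2, MAX_SMTP_RETRIES=3) the sleeps are 2s/4s/8s
# plus up to 30% jitter each, i.e. up to ~18.2s of waiting alone.
def _backoff_schedule(max_retries=MAX_RETRIES, base=BASE_RETRY_DELAY):
    """Return [(min_delay, max_delay), ...] for each retry attempt."""
    schedule = []
    for attempt in range(1, max_retries + 1):
        delay = base * (2 ** (attempt - 1))
        schedule.append((delay, delay * 1.3))  # jitter adds 0-30%
    return schedule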
# ============================================================================
# IMPROVEMENT 4: Structured Logging
# ============================================================================
def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int):
    """Log processing results to CloudWatch in a structured format for easier analysis"""
    result = {
        'timestamp': int(time.time()),
        'bucket': bucket,
        'key': key,
        'delivered_count': len(delivered),
        'refused_count': len(refused),
        'retry_count': retry_count,
        'delivered_recipients': delivered,
        'refused_recipients': list(refused.keys()) if refused else [],
        'success': len(delivered) > 0
    }
    # Structured logging for CloudWatch Insights
    print(f"PROCESSING_RESULT: {json.dumps(result)}")
    return result
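# The PROCESSING_RESULT prefix makes these entries easy to isolate with a
# CloudWatch Logs Insights query, for example (sketch):
#   fields @timestamp, @message
#   | filter @message like /PROCESSING_RESULT/
#   | sort @timestamp desc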
# ============================================================================
# MAIN PROCESSING FUNCTION (extracted for timeout handling)
# ============================================================================
def _process_email(event, context):
    """Main email processing logic (extracted for timeout protection)"""
    print("Event:", event)
    ses = None
    try:
        rec = event['Records'][0]
    except Exception as e:
        print("No Records in event:", e)
        return {'statusCode': 400, 'body': 'No Records'}
    # Determine bucket/key and the initial recipients list
    recipients = []
    bucket = None
    key = None
    is_ses_event = False
    if 'ses' in rec:
        # SES event - trusted source, carries the correct recipients
        is_ses_event = True
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = ses['receipt'].get('recipients', [])
        # Assume the first recipient's domain maps to the bucket
        if recipients:
            domain = recipients[0].split('@', 1)[1]
            bucket = domain_to_bucket(domain)
            prefix = msg_id
            print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
            resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
            if 'Contents' not in resp_list or not resp_list['Contents']:
                raise Exception(f"No S3 object under {prefix} in {bucket}")
            key = resp_list['Contents'][0]['Key']
        else:
            raise Exception("SES event but no recipients found")
    elif 's3' in rec:
        # S3 event - recipients must be extracted from the message headers
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        key = s3info['object']['key']
        print(f"S3 event: bucket={bucket}, key={key}")
        # recipients will be parsed from message headers below
    else:
        raise Exception("Unknown event type")
    # ========================================================================
    # IMPROVEMENT 1: Duplicate Prevention with Processing Lock
    # ========================================================================
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        # Check if already processed
        if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            processed_at = metadata.get('processed_at', 'unknown time')
            print(f"Object {key} already processed at {processed_at}")
            return {'statusCode': 200, 'body': 'already processed'}
        # Check if currently being processed (lock mechanism)
        processing_started = metadata.get('processing_started')
        if processing_started:
            processing_age = time.time() - float(processing_started)
            if processing_age < 300:  # 5-minute lock
                print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)")
                return {'statusCode': 200, 'body': 'already being processed'}
            else:
                print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing")
        # Set the processing lock
        new_meta = metadata.copy()
        new_meta['processing_started'] = str(int(time.time()))
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"Set processing lock on {key}")
    except s3.exceptions.ClientError as e:
        # head_object surfaces a missing key as a 404 ClientError, not NoSuchKey
        if e.response.get('Error', {}).get('Code') in ('404', 'NotFound', 'NoSuchKey'):
            print(f"Object {key} no longer exists, skipping")
            return {'statusCode': 404, 'body': 'object not found'}
        print(f"Error checking/setting processing lock: {e}")
        # Continue anyway if the lock fails (better than dropping the email)
    except Exception as e:
        print(f"Error checking/setting processing lock: {e}")
        # Continue anyway if the lock fails (better than dropping the email)
    # Check the retry count - if there were too many retries, give up
    retry_count = get_retry_count(bucket, key)
    if retry_count >= MAX_RETRIES * 2:  # Safety limit
        print(f"Object {key} has been retried {retry_count} times, giving up")
        mark_object_processed(bucket, key)  # Mark as processed to prevent infinite retries
        return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}
    # ========================================================================
    # IMPROVEMENT 2: Memory Optimization with Size Check
    # ========================================================================
    try:
        resp = s3.get_object(Bucket=bucket, Key=key)
        content_length = int(resp.get('ContentLength', 0))
        # Safety check: skip emails larger than MAX_EMAIL_SIZE_MB
        if content_length > MAX_EMAIL_SIZE_BYTES:
            print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB")
            # Mark as processed to prevent infinite retries
            mark_object_processed(bucket, key)
            return {
                'statusCode': 413,  # Payload Too Large
                'body': json.dumps({
                    'error': 'email_too_large',
                    'size_mb': round(content_length/1024/1024, 2),
                    'max_mb': MAX_EMAIL_SIZE_MB
                })
            }
        raw_bytes = resp['Body'].read()
        print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}")
    except Exception as e:
        print(f"ERROR reading from S3: {e}")
        traceback.print_exc()
        return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'}
    parsed = parse_raw_message(raw_bytes)
    subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'
    frm_addr = None
    if parsed:
        from_addrs = getaddresses(parsed.get_all('from', []) or [])
        frm_addr = from_addrs[0][1] if from_addrs else None
    if not frm_addr:
        frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost'))
    print(f"From: {frm_addr}, Subject: {subj}")
    # If recipients were not provided (S3 path), extract them from the headers
    if not recipients and not is_ses_event:
        if parsed:
            expected_domain = bucket_to_domain(bucket).lower()
            # Debug: print raw headers to understand what we're getting
            print("=== DEBUG: Header Analysis ===")
            print(f"Expected domain: {expected_domain}")
            # The email parser is case-insensitive for headers, so one lookup suffices
            to_headers = parsed.get_all('to', []) or []
            cc_headers = parsed.get_all('cc', []) or []
            bcc_headers = parsed.get_all('bcc', []) or []
            if to_headers:
                print(f"Found 'To' header: {to_headers}")
            if cc_headers:
                print(f"Found 'Cc' header: {cc_headers}")
            if bcc_headers:
                print(f"Found 'Bcc' header: {bcc_headers}")
            # Parse addresses from the headers
            to_addrs = [addr for _n, addr in getaddresses(to_headers) if addr]
            cc_addrs = [addr for _n, addr in getaddresses(cc_headers) if addr]
            bcc_addrs = [addr for _n, addr in getaddresses(bcc_headers) if addr]
            all_recipients = to_addrs + cc_addrs + bcc_addrs
            print(f"Parsed recipients - To: {to_addrs}, Cc: {cc_addrs}, Bcc: {bcc_addrs}")
            # Filter recipients to the bucket domain (case-insensitive)
            # and deduplicate using a set (preserving the original case)
            recipients_set = set()
            recipients = []
            for addr in all_recipients:
                # Extract the domain part (everything after @)
                if '@' in addr:
                    addr_lower = addr.lower()
                    addr_domain = addr_lower.split('@')[-1]
                    if addr_domain == expected_domain:
                        # Only add if not already seen (case-insensitive deduplication)
                        if addr_lower not in recipients_set:
                            recipients_set.add(addr_lower)
                            recipients.append(addr)  # Keep original case
                            print(f"Matched recipient: {addr} (domain: {addr_domain})")
                        else:
                            print(f"Skipped duplicate: {addr}")
                    else:
                        print(f"Skipped recipient: {addr} (domain: {addr_domain} != {expected_domain})")
            print(f"Final recipients after domain filter and deduplication: {recipients}")
            # If no recipients were found, try additional headers
            if not recipients:
                print("WARNING: No recipients found in standard headers, checking additional headers...")
                # Check X-Original-To, Delivered-To, and Envelope-To (the lookup is
                # case-insensitive, so lowercase duplicates would double-add recipients)
                fallback_headers = ['X-Original-To', 'Delivered-To', 'Envelope-To']
                for header_name in fallback_headers:
                    header_val = parsed.get(header_name)
                    if header_val:
                        print(f"Found {header_name}: {header_val}")
                        fallback_addrs = [addr for _n, addr in getaddresses([header_val]) if addr]
                        for addr in fallback_addrs:
                            if '@' in addr and addr.split('@')[-1].lower() == expected_domain:
                                recipients.append(addr)
                                print(f"Found recipient in {header_name}: {addr}")
            if not recipients:
                print(f"ERROR: Could not find any recipients for domain {expected_domain}")
                print(f"All addresses found: {all_recipients}")
                # Print all headers for debugging
                print("=== All Email Headers ===")
                for key_h in parsed.keys():
                    print(f"{key_h}: {parsed.get(key_h)}")
                print("=== End Headers ===")
        else:
            print("ERROR: Could not parse email headers")
            recipients = []
    # If we still have no recipients, skip SMTP
    delivered = []
    refused = {}
    if recipients:
        # Use the raw bytes directly (no decoding!)
        raw_message = raw_bytes
        # Determine the HELO hostname
        env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
        derived_local = bucket_to_domain(bucket) if bucket else None
        local_helo = env_local or derived_local or 'localhost'
        print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
        start = time.time()
        # Send with retry logic
        delivered, refused = send_email_with_retry(
            SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS,
            frm_addr, recipients, raw_message, local_helo,
            max_retries=MAX_RETRIES
        )
        print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")
        # Update the retry count if we had temporary failures
        if refused and not delivered:
            temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)]
            if temp_failures:
                update_retry_metadata(bucket, key, retry_count + 1, str(refused))
    else:
        print("No recipients to send to; skipping SMTP.")
    # Only mark as processed if at least one recipient was delivered successfully
    if delivered:
        try:
            mark_object_processed(bucket, key)
        except Exception as e:
            print("Failed to mark object processed after delivery:", e)
    else:
        print("No successful deliveries; NOT setting processed metadata so the message can be re-evaluated later.")
    # Log the structured result
    result = log_processing_result(bucket, key, delivered, refused, retry_count)
    return {
        'statusCode': 200 if delivered else 500,
        'body': json.dumps(result)
    }
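# Note: the head_object/copy_object lock in _process_email is best-effort,
# not atomic. Two concurrent invocations can both pass the metadata check
# before either copy lands, since S3 offers no compare-and-swap on metadata.
# For strict exactly-once semantics a conditional write elsewhere would be
# needed. A minimal sketch, assuming a hypothetical 'email-locks' DynamoDB
# table with primary key 'object_key':
#   ddb = boto3.client('dynamodb')
#   ddb.put_item(TableName='email-locks',
#                Item={'object_key': {'S': f'{bucket}/{key}'}},
#                ConditionExpression='attribute_not_exists(object_key)')
#   # raises ddb.exceptions.ConditionalCheckFailedException if already locked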
# ============================================================================
# LAMBDA HANDLER with TIMEOUT PROTECTION
# ============================================================================
def lambda_handler(event, context):
    """
    Lambda entry point with timeout protection

    Recommended Lambda configuration:
    - Memory: 512 MB
    - Timeout: 60 seconds
    - Environment variables:
      - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS
      - MAX_SMTP_RETRIES=3
      - BASE_RETRY_DELAY=2
      - MAX_EMAIL_SIZE_MB=25
    """
    # Set up timeout protection (stop 5 seconds before the Lambda timeout)
    remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60
    safety_margin = 5  # seconds
    max_execution_time = max(10, remaining_time - safety_margin)
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(int(max_execution_time))
    try:
        return _process_email(event, context)
    except TimeoutException:
        print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting")
        # Don't mark as processed so the message can be retried
        return {
            'statusCode': 408,  # Request Timeout
            'body': json.dumps({
                'error': 'lambda_timeout',
                'execution_time': max_execution_time
            })
        }
    except Exception as e:
        print(f"FATAL ERROR in lambda_handler: {e}")
        traceback.print_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'internal_error',
                'message': str(e)
            })
        }
    finally:
        signal.alarm(0)  # Cancel the alarm
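
# Local smoke test (a sketch: Unix-only because of SIGALRM, and it needs AWS
# credentials plus a real object to get past the S3 calls; the bucket and key
# below are placeholders):
if __name__ == '__main__':
    class _FakeContext:
        """Mimics the single Lambda context method this module uses."""
        def get_remaining_time_in_millis(self):
            return 60_000

    sample_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'example-com-emails'},  # placeholder bucket
                'object': {'key': 'some-ses-message-id'}   # placeholder key
            }
        }]
    }
    print("Worst-case backoff schedule:", _backoff_schedule())
    print(lambda_handler(sample_event, _FakeContext()))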