import os import boto3 import smtplib import time import traceback import json import random import signal from email.parser import BytesParser from email.policy import default from email.utils import getaddresses s3 = boto3.client('s3') # Environment variables (set these in the Lambda config) SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com') SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525')) SMTP_USER = os.environ.get('SMTP_USER') SMTP_PASS = os.environ.get('SMTP_PASS') # Metadata key/value to mark processed objects (only set when at least one recipient delivered) PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed') PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true') # Retry configuration MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3')) BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2')) # Email size limit (25 MB - Lambda memory safety margin) MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25')) MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024 # ============================================================================ # VERBESSERUNG 5: Timeout Protection # ============================================================================ class TimeoutException(Exception): pass def timeout_handler(signum, frame): raise TimeoutException("Lambda approaching timeout") def domain_to_bucket(domain: str) -> str: return domain.replace('.', '-') + '-emails' def bucket_to_domain(bucket: str) -> str: return bucket.replace('-emails', '').replace('-', '.') def parse_raw_message(raw_bytes: bytes): try: # Use SMTP policy for better compatibility with various email formats from email.policy import SMTP parsed = BytesParser(policy=SMTP).parsebytes(raw_bytes) except Exception as e: print(f"Error parsing with SMTP policy: {e}, trying with default policy") try: parsed = BytesParser(policy=default).parsebytes(raw_bytes) except Exception as e2: print(f"Error parsing with default policy: {e2}") parsed = None return parsed def mark_object_processed(bucket: str, key: str): try: head = s3.head_object(Bucket=bucket, Key=key) current_metadata = head.get('Metadata', {}) or {} if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: print(f"Object {key} in {bucket} already marked processed.") return new_meta = current_metadata.copy() new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE new_meta['processed_at'] = str(int(time.time())) # Remove processing lock new_meta.pop('processing_started', None) # Copy object onto itself replacing metadata s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=new_meta, MetadataDirective='REPLACE' ) print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}") except Exception as e: print("Failed to mark processed metadata:", e) traceback.print_exc() def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None): """Update S3 object metadata with retry information""" try: head = s3.head_object(Bucket=bucket, Key=key) current_metadata = head.get('Metadata', {}) or {} new_meta = current_metadata.copy() new_meta['retry_count'] = str(retry_count) new_meta['last_retry'] = str(int(time.time())) if last_error: # S3 metadata values must be ASCII, so we encode the error new_meta['last_error'] = last_error[:255].replace('\n', ' ') s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=new_meta, MetadataDirective='REPLACE' ) print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}") except Exception as e: print(f"Failed to update retry metadata: {e}") def get_retry_count(bucket: str, key: str) -> int: """Get current retry count from S3 metadata""" try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} return int(metadata.get('retry_count', '0')) except Exception: return 0 def is_temporary_smtp_error(error_code): """Check if SMTP error code indicates a temporary failure (4xx)""" if isinstance(error_code, tuple) and len(error_code) >= 1: code = error_code[0] if isinstance(code, int): return 400 <= code < 500 return False def is_spam_rejection(error_code): """Check if the error is a spam rejection (should not be retried)""" if isinstance(error_code, tuple) and len(error_code) >= 2: code = error_code[0] message = error_code[1] # 554 with spam message is permanent - don't retry if code == 554 and b'spam' in message.lower(): return True return False # ============================================================================ # VERBESSERUNG 3: Exponential Backoff with Jitter # ============================================================================ def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass, frm_addr, recipients, raw_message, local_helo, max_retries=MAX_RETRIES): """Send email with retry logic for temporary failures using exponential backoff""" delivered = [] refused = {} last_error = None for attempt in range(max_retries + 1): if attempt > 0: # Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY) delay = BASE_RETRY_DELAY * (2 ** (attempt - 1)) # Add jitter to prevent thundering herd jitter = random.uniform(0, delay * 0.3) # +0-30% random jitter total_delay = delay + jitter print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)") time.sleep(total_delay) try: with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp: smtp.ehlo() # Try STARTTLS try: smtp.starttls() smtp.ehlo() except Exception as e: print("STARTTLS not available or failed (continuing):", e) # Login if credentials provided if smtp_user and smtp_pass: try: smtp.login(smtp_user, smtp_pass) except Exception as e: print("SMTP login failed (continuing):", e) # Attempt to send try: send_result = smtp.sendmail(frm_addr, recipients, raw_message) if isinstance(send_result, dict): # Separate temporary and permanent failures temp_refused = {} perm_refused = {} for rcpt, error in send_result.items(): if is_temporary_smtp_error(error): temp_refused[rcpt] = error else: perm_refused[rcpt] = error # If we have temporary failures and more retries, continue if temp_refused and attempt < max_retries: print(f"Temporary failures for {list(temp_refused.keys())}, will retry...") recipients = list(temp_refused.keys()) # Only retry temporary failures refused = perm_refused # Keep track of permanent failures last_error = str(temp_refused) continue else: # No temporary failures or no more retries refused = send_result delivered = [r for r in recipients if r not in refused] break else: # All delivered successfully delivered = recipients[:] refused = {} break except smtplib.SMTPRecipientsRefused as e: print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}") # Check if all are temporary failures temp_refused = {} perm_refused = {} for rcpt, error in e.recipients.items(): if is_temporary_smtp_error(error): temp_refused[rcpt] = error else: perm_refused[rcpt] = error if temp_refused and attempt < max_retries: recipients = list(temp_refused.keys()) refused = perm_refused last_error = str(e) continue else: refused = e.recipients delivered = [r for r in recipients if r not in refused] break except Exception as e: print(f"SMTP sendmail error on attempt {attempt + 1}: {e}") # Check if this is a spam rejection (permanent error that shouldn't be retried) if hasattr(e, 'smtp_code') and hasattr(e, 'smtp_error'): if is_spam_rejection((e.smtp_code, e.smtp_error)): print(f"Email rejected as spam (permanent error), not retrying") refused = {r: (e.smtp_code, e.smtp_error) for r in recipients} delivered = [] break # For other errors, check if it's worth retrying if attempt < max_retries: # Only retry if it might be temporary error_str = str(e) if '554' in error_str and 'spam' in error_str.lower(): print(f"Email rejected as spam, not retrying") refused = {r: ('spam', str(e)) for r in recipients} delivered = [] break else: last_error = str(e) continue else: traceback.print_exc() refused = {r: ('error', str(e)) for r in recipients} delivered = [] break except Exception as e: print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}") if attempt < max_retries: last_error = str(e) continue else: traceback.print_exc() refused = {r: ('connect-error', str(e)) for r in recipients} delivered = [] break return delivered, refused # ============================================================================ # VERBESSERUNG 4: Structured Logging # ============================================================================ def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int): """Log processing results to CloudWatch in structured format for easier analysis""" result = { 'timestamp': int(time.time()), 'bucket': bucket, 'key': key, 'delivered_count': len(delivered), 'refused_count': len(refused), 'retry_count': retry_count, 'delivered_recipients': delivered, 'refused_recipients': list(refused.keys()) if refused else [], 'success': len(delivered) > 0 } # Structured logging for CloudWatch Insights print(f"PROCESSING_RESULT: {json.dumps(result)}") return result # ============================================================================ # MAIN PROCESSING FUNCTION (extracted for timeout handling) # ============================================================================ def _process_email(event, context): """Main email processing logic (extracted for timeout protection)""" print("Event:", event) ses = None try: rec = event['Records'][0] except Exception as e: print("No Records in event:", e) return {'statusCode': 400, 'body': 'No Records'} # Determine bucket/key and initial recipients list recipients = [] bucket = None key = None is_ses_event = False if 'ses' in rec: # SES Event - vertrauenswürdig, hat die korrekten Empfänger is_ses_event = True ses = rec['ses'] msg_id = ses['mail']['messageId'] recipients = ses['receipt'].get('recipients', []) # assume first recipient domain maps to bucket if recipients: domain = recipients[0].split('@', 1)[1] bucket = domain_to_bucket(domain) prefix = f"{msg_id}" print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}") resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) if 'Contents' not in resp_list or not resp_list['Contents']: raise Exception(f"No S3 object under {prefix} in {bucket}") key = resp_list['Contents'][0]['Key'] else: raise Exception("SES event but no recipients found") elif 's3' in rec: # S3 Event - muss Empfänger aus Headers extrahieren s3info = rec['s3'] bucket = s3info['bucket']['name'] key = s3info['object']['key'] print(f"S3 event: bucket={bucket}, key={key}") # recipients will be parsed from message headers below else: raise Exception("Unknown event type") # ======================================================================== # VERBESSERUNG 1: Duplicate Prevention with Processing Lock # ======================================================================== try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} # Check if already processed if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: processed_at = metadata.get('processed_at', 'unknown time') print(f"Object {key} already processed at {processed_at}") return {'statusCode': 200, 'body': 'already processed'} # Check if currently being processed (lock mechanism) processing_started = metadata.get('processing_started') if processing_started: processing_age = time.time() - float(processing_started) if processing_age < 300: # 5 minutes lock print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)") return {'statusCode': 200, 'body': 'already being processed'} else: print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing") # Set processing lock new_meta = metadata.copy() new_meta['processing_started'] = str(int(time.time())) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=new_meta, MetadataDirective='REPLACE' ) print(f"Set processing lock on {key}") except s3.exceptions.NoSuchKey: print(f"Object {key} no longer exists, skipping") return {'statusCode': 404, 'body': 'object not found'} except Exception as e: print(f"Error checking/setting processing lock: {e}") # Continue anyway if lock fails (better than dropping email) # Check retry count - if too many retries, give up retry_count = get_retry_count(bucket, key) if retry_count >= MAX_RETRIES * 2: # Safety limit print(f"Object {key} has been retried {retry_count} times, giving up") mark_object_processed(bucket, key) # Mark as processed to prevent infinite retries return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'} # ======================================================================== # VERBESSERUNG 2: Memory Optimization with Size Check # ======================================================================== try: resp = s3.get_object(Bucket=bucket, Key=key) content_length = int(resp.get('ContentLength', 0)) # Safety check: Skip emails larger than MAX_EMAIL_SIZE_MB if content_length > MAX_EMAIL_SIZE_BYTES: print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB") # Mark as processed to prevent infinite retries mark_object_processed(bucket, key) return { 'statusCode': 413, # Payload Too Large 'body': json.dumps({ 'error': 'email_too_large', 'size_mb': round(content_length/1024/1024, 2), 'max_mb': MAX_EMAIL_SIZE_MB }) } raw_bytes = resp['Body'].read() print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}") except Exception as e: print(f"ERROR reading from S3: {e}") traceback.print_exc() return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'} parsed = parse_raw_message(raw_bytes) subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)' frm_addr = None if parsed: from_addrs = getaddresses(parsed.get_all('from', []) or []) frm_addr = from_addrs[0][1] if from_addrs else None if not frm_addr: frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost')) print(f"From: {frm_addr}, Subject: {subj}") # If recipients were not provided (S3 path), extract from headers if not recipients and not is_ses_event: if parsed: expected_domain = bucket_to_domain(bucket).lower() # Debug: Print raw headers to understand what we're getting print(f"=== DEBUG: Header Analysis ===") print(f"Expected domain: {expected_domain}") # The email parser is case-insensitive for headers, so we only need to check once # Get headers using standard case (parser handles case-insensitivity) to_headers = parsed.get_all('to', []) or [] cc_headers = parsed.get_all('cc', []) or [] bcc_headers = parsed.get_all('bcc', []) or [] if to_headers: print(f"Found 'To' header: {to_headers}") if cc_headers: print(f"Found 'Cc' header: {cc_headers}") if bcc_headers: print(f"Found 'Bcc' header: {bcc_headers}") # Parse addresses from headers to_addrs = [addr for _n, addr in getaddresses(to_headers) if addr] cc_addrs = [addr for _n, addr in getaddresses(cc_headers) if addr] bcc_addrs = [addr for _n, addr in getaddresses(bcc_headers) if addr] all_recipients = to_addrs + cc_addrs + bcc_addrs print(f"Parsed recipients - To: {to_addrs}, Cc: {cc_addrs}, Bcc: {bcc_addrs}") # Filter recipients to bucket domain with case-insensitive comparison # and deduplicate using a set (preserving case) recipients_set = set() recipients = [] for addr in all_recipients: # Extract domain part (everything after @) if '@' in addr: addr_lower = addr.lower() addr_domain = addr_lower.split('@')[-1] if addr_domain == expected_domain: # Only add if not already in set (case-insensitive deduplication) if addr_lower not in recipients_set: recipients_set.add(addr_lower) recipients.append(addr) # Keep original case print(f"Matched recipient: {addr} (domain: {addr_domain})") else: print(f"Skipped duplicate: {addr}") else: print(f"Skipped recipient: {addr} (domain: {addr_domain} != {expected_domain})") print(f"Final recipients after domain filter and deduplication: {recipients}") # If no recipients found, try additional headers if not recipients: print("WARNING: No recipients found in standard headers, checking additional headers...") # Check for X-Original-To, Delivered-To, Envelope-To fallback_headers = ['X-Original-To', 'Delivered-To', 'Envelope-To', 'x-original-to', 'delivered-to', 'envelope-to'] for header_name in fallback_headers: header_val = parsed.get(header_name) if header_val: print(f"Found {header_name}: {header_val}") fallback_addrs = [addr for _n, addr in getaddresses([header_val]) if addr] for addr in fallback_addrs: if '@' in addr and addr.split('@')[-1].lower() == expected_domain: recipients.append(addr) print(f"Found recipient in {header_name}: {addr}") if not recipients: print(f"ERROR: Could not find any recipients for domain {expected_domain}") print(f"All addresses found: {all_recipients}") # Print all headers for debugging print("=== All Email Headers ===") for key_h in parsed.keys(): print(f"{key_h}: {parsed.get(key_h)}") print("=== End Headers ===") else: print("ERROR: Could not parse email headers") recipients = [] # If after all we have no recipients, skip SMTP delivered = [] refused = {} if recipients: # Use raw bytes directly (no decoding!) raw_message = raw_bytes # Determine HELO hostname env_local = os.environ.get('SMTP_LOCAL_HOSTNAME') derived_local = bucket_to_domain(bucket) if bucket else None local_helo = env_local or derived_local or 'localhost' print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}") start = time.time() # Send with retry logic delivered, refused = send_email_with_retry( SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS, frm_addr, recipients, raw_message, local_helo, max_retries=MAX_RETRIES ) print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}") # Update retry count if we had temporary failures if refused and not delivered: temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)] if temp_failures: update_retry_metadata(bucket, key, retry_count + 1, str(refused)) else: print("No recipients to send to; skipping SMTP.") # Only mark as processed if at least one recipient was delivered successfully if delivered: try: mark_object_processed(bucket, key) except Exception as e: print("Failed to mark object processed after delivery:", e) else: print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.") # Log structured result result = log_processing_result(bucket, key, delivered, refused, retry_count) return { 'statusCode': 200 if delivered else 500, 'body': json.dumps(result) } # ============================================================================ # LAMBDA HANDLER with TIMEOUT PROTECTION # ============================================================================ def lambda_handler(event, context): """ Lambda entry point with timeout protection Recommended Lambda Configuration: - Memory: 512 MB - Timeout: 60 seconds - Environment Variables: - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS - MAX_SMTP_RETRIES=3 - BASE_RETRY_DELAY=2 - MAX_EMAIL_SIZE_MB=25 """ # Set up timeout protection (stop 5 seconds before Lambda timeout) remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60 safety_margin = 5 # seconds max_execution_time = max(10, remaining_time - safety_margin) signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(int(max_execution_time)) try: return _process_email(event, context) except TimeoutException: print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting") # Don't mark as processed so it can be retried return { 'statusCode': 408, # Request Timeout 'body': json.dumps({ 'error': 'lambda_timeout', 'execution_time': max_execution_time }) } except Exception as e: print(f"FATAL ERROR in lambda_handler: {e}") traceback.print_exc() return { 'statusCode': 500, 'body': json.dumps({ 'error': 'internal_error', 'message': str(e) }) } finally: signal.alarm(0) # Cancel alarm