From b9066a8f590a166b384a9d48b964f05837dcceb9 Mon Sep 17 00:00:00 2001
From: Andreas Knuth
Date: Thu, 9 Oct 2025 17:58:27 -0500
Subject: [PATCH] Harden SES forwarder Lambda: retry backoff with jitter,
 duplicate-processing lock, size limit, structured logging, timeout guard

---
 ses-lambda-new-python/lambda_function.py | 225 ++++++++++++++++++++---
 1 file changed, 197 insertions(+), 28 deletions(-)

diff --git a/ses-lambda-new-python/lambda_function.py b/ses-lambda-new-python/lambda_function.py
index e2426ee..be51094 100644
--- a/ses-lambda-new-python/lambda_function.py
+++ b/ses-lambda-new-python/lambda_function.py
@@ -4,6 +4,8 @@ import smtplib
 import time
 import traceback
 import json
+import random
+import signal
 from email.parser import BytesParser
 from email.policy import default
 from email.utils import getaddresses
@@ -12,9 +14,9 @@ s3 = boto3.client('s3')
 
 # Environment variables (set these in the Lambda config)
 SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
-SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))  # default to your mapped port
-SMTP_USER = os.environ.get('SMTP_USER') or os.environ.get('SMTP_USER')
-SMTP_PASS = os.environ.get('SMTP_PASS') or os.environ.get('SMTP_PASS')
+SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))
+SMTP_USER = os.environ.get('SMTP_USER')
+SMTP_PASS = os.environ.get('SMTP_PASS')
 
 # Metadata key/value to mark processed objects (only set when at least one recipient delivered)
 PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
@@ -22,14 +24,32 @@ PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
 
 # Retry configuration
 MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
-RETRY_DELAYS = [1, 5, 15]  # seconds between attempts
+BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2'))
+
+# Email size limit (default 25 MB, leaving a safety margin within Lambda memory)
+MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25'))
+MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024
+
+
+# ============================================================================
+# IMPROVEMENT 5: Timeout Protection
+# ============================================================================
+class TimeoutException(Exception):
+    pass
+
+
+def timeout_handler(signum, frame):
+    raise TimeoutException("Lambda approaching timeout")
+
 
 def domain_to_bucket(domain: str) -> str:
     return domain.replace('.', '-') + '-emails'
 
+
 def bucket_to_domain(bucket: str) -> str:
     return bucket.replace('-emails', '').replace('-', '.')
 
+
 def parse_raw_message(raw_bytes: bytes):
     try:
         # Use SMTP policy for better compatibility with various email formats
@@ -44,6 +64,7 @@ def parse_raw_message(raw_bytes: bytes):
         parsed = None
     return parsed
 
+
 def mark_object_processed(bucket: str, key: str):
     try:
         head = s3.head_object(Bucket=bucket, Key=key)
@@ -53,6 +74,9 @@ def mark_object_processed(bucket: str, key: str):
             return
         new_meta = current_metadata.copy()
         new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
+        new_meta['processed_at'] = str(int(time.time()))
+        # Remove processing lock
+        new_meta.pop('processing_started', None)
         # Copy object onto itself replacing metadata
         s3.copy_object(
             Bucket=bucket,
@@ -61,11 +85,12 @@ def mark_object_processed(bucket: str, key: str):
             Metadata=new_meta,
             MetadataDirective='REPLACE'
         )
-        print(f"Marked {bucket}/{key} as processed.")
+        print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}")
     except Exception as e:
         print("Failed to mark processed metadata:", e)
         traceback.print_exc()
 
+
 def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
     """Update S3 object metadata with retry information"""
    try:
@@ -89,6 +114,7 @@ def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: s
     except Exception as e:
         print(f"Failed to update retry metadata: {e}")
 
+
 def get_retry_count(bucket: str, key: str) -> int:
     """Get current retry count from S3 metadata"""
     try:
@@ -98,6 +124,7 @@ def get_retry_count(bucket: str, key: str) -> int:
     except Exception:
         return 0
 
+
 def is_temporary_smtp_error(error_code):
     """Check if SMTP error code indicates a temporary failure (4xx)"""
     if isinstance(error_code, tuple) and len(error_code) >= 1:
@@ -106,6 +133,7 @@ def is_temporary_smtp_error(error_code):
         return 400 <= code < 500
     return False
 
+
 def is_spam_rejection(error_code):
     """Check if the error is a spam rejection (should not be retried)"""
     if isinstance(error_code, tuple) and len(error_code) >= 2:
@@ -116,10 +144,14 @@ def is_spam_rejection(error_code):
             return True
     return False
 
+
+# ============================================================================
+# IMPROVEMENT 3: Exponential Backoff with Jitter
+# ============================================================================
 def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
                           frm_addr, recipients, raw_message, local_helo,
                           max_retries=MAX_RETRIES):
-    """Send email with retry logic for temporary failures"""
+    """Send email with retry logic for temporary failures using exponential backoff"""
     delivered = []
     refused = {}
 
@@ -127,9 +159,13 @@ def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
 
     for attempt in range(max_retries + 1):
         if attempt > 0:
-            delay = RETRY_DELAYS[min(attempt - 1, len(RETRY_DELAYS) - 1)]
-            print(f"Retry attempt {attempt}/{max_retries} after {delay}s delay...")
-            time.sleep(delay)
+            # Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY)
+            delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))
+            # Add jitter to prevent thundering herd
+            jitter = random.uniform(0, delay * 0.3)  # +0-30% random jitter
+            total_delay = delay + jitter
+            print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)")
+            time.sleep(total_delay)
 
         try:
             with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
@@ -247,7 +283,34 @@ def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
 
     return delivered, refused
 
-def lambda_handler(event, context):
+
+# ============================================================================
+# IMPROVEMENT 4: Structured Logging
+# ============================================================================
+def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int):
+    """Log processing results to CloudWatch in structured format for easier analysis"""
+    result = {
+        'timestamp': int(time.time()),
+        'bucket': bucket,
+        'key': key,
+        'delivered_count': len(delivered),
+        'refused_count': len(refused),
+        'retry_count': retry_count,
+        'delivered_recipients': delivered,
+        'refused_recipients': list(refused.keys()) if refused else [],
+        'success': len(delivered) > 0
+    }
+
+    # Structured logging for CloudWatch Insights
+    print(f"PROCESSING_RESULT: {json.dumps(result)}")
+    return result
+
+
+# ============================================================================
+# MAIN PROCESSING FUNCTION (extracted for timeout handling)
+# ============================================================================
+def _process_email(event, context):
+    """Main email processing logic (extracted for timeout protection)"""
print("Event:", event) ses = None try: @@ -290,25 +353,83 @@ def lambda_handler(event, context): else: raise Exception("Unknown event type") - # Check if already processed + # ======================================================================== + # VERBESSERUNG 1: Duplicate Prevention with Processing Lock + # ======================================================================== try: head = s3.head_object(Bucket=bucket, Key=key) - if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: - print(f"Object {key} in {bucket} already processed. Exiting.") + metadata = head.get('Metadata', {}) or {} + + # Check if already processed + if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: + processed_at = metadata.get('processed_at', 'unknown time') + print(f"Object {key} already processed at {processed_at}") return {'statusCode': 200, 'body': 'already processed'} + + # Check if currently being processed (lock mechanism) + processing_started = metadata.get('processing_started') + if processing_started: + processing_age = time.time() - float(processing_started) + if processing_age < 300: # 5 minutes lock + print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)") + return {'statusCode': 200, 'body': 'already being processed'} + else: + print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing") + + # Set processing lock + new_meta = metadata.copy() + new_meta['processing_started'] = str(int(time.time())) + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=new_meta, + MetadataDirective='REPLACE' + ) + print(f"Set processing lock on {key}") + + except s3.exceptions.NoSuchKey: + print(f"Object {key} no longer exists, skipping") + return {'statusCode': 404, 'body': 'object not found'} except Exception as e: - print("head_object error (continuing):", e) + print(f"Error checking/setting processing lock: {e}") + # Continue anyway if lock fails (better than dropping email) # Check retry count - if too many retries, give up retry_count = get_retry_count(bucket, key) if retry_count >= MAX_RETRIES * 2: # Safety limit print(f"Object {key} has been retried {retry_count} times, giving up") + mark_object_processed(bucket, key) # Mark as processed to prevent infinite retries return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'} - # Get raw mail bytes - resp = s3.get_object(Bucket=bucket, Key=key) - raw_bytes = resp['Body'].read() - print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}") + # ======================================================================== + # VERBESSERUNG 2: Memory Optimization with Size Check + # ======================================================================== + try: + resp = s3.get_object(Bucket=bucket, Key=key) + content_length = int(resp.get('ContentLength', 0)) + + # Safety check: Skip emails larger than MAX_EMAIL_SIZE_MB + if content_length > MAX_EMAIL_SIZE_BYTES: + print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB") + # Mark as processed to prevent infinite retries + mark_object_processed(bucket, key) + return { + 'statusCode': 413, # Payload Too Large + 'body': json.dumps({ + 'error': 'email_too_large', + 'size_mb': round(content_length/1024/1024, 2), + 'max_mb': MAX_EMAIL_SIZE_MB + }) + } + + raw_bytes = resp['Body'].read() + print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}") + + except Exception as e: + 
print(f"ERROR reading from S3: {e}") + traceback.print_exc() + return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'} parsed = parse_raw_message(raw_bytes) subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)' @@ -396,8 +517,8 @@ def lambda_handler(event, context): print(f"All addresses found: {all_recipients}") # Print all headers for debugging print("=== All Email Headers ===") - for key in parsed.keys(): - print(f"{key}: {parsed.get(key)}") + for key_h in parsed.keys(): + print(f"{key_h}: {parsed.get(key_h)}") print("=== End Headers ===") else: print("ERROR: Could not parse email headers") @@ -444,12 +565,60 @@ def lambda_handler(event, context): else: print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.") + # Log structured result + result = log_processing_result(bucket, key, delivered, refused, retry_count) + return { - 'statusCode': 200, - 'body': json.dumps({ - 'processed': bool(delivered), - 'delivered': delivered, - 'refused_count': len(refused), - 'retry_count': retry_count - }) - } \ No newline at end of file + 'statusCode': 200 if delivered else 500, + 'body': json.dumps(result) + } + + +# ============================================================================ +# LAMBDA HANDLER with TIMEOUT PROTECTION +# ============================================================================ +def lambda_handler(event, context): + """ + Lambda entry point with timeout protection + Recommended Lambda Configuration: + - Memory: 512 MB + - Timeout: 60 seconds + - Environment Variables: + - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS + - MAX_SMTP_RETRIES=3 + - BASE_RETRY_DELAY=2 + - MAX_EMAIL_SIZE_MB=25 + """ + + # Set up timeout protection (stop 5 seconds before Lambda timeout) + remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60 + safety_margin = 5 # seconds + max_execution_time = max(10, remaining_time - safety_margin) + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(max_execution_time)) + + try: + return _process_email(event, context) + except TimeoutException: + print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting") + # Don't mark as processed so it can be retried + return { + 'statusCode': 408, # Request Timeout + 'body': json.dumps({ + 'error': 'lambda_timeout', + 'execution_time': max_execution_time + }) + } + except Exception as e: + print(f"FATAL ERROR in lambda_handler: {e}") + traceback.print_exc() + return { + 'statusCode': 500, + 'body': json.dumps({ + 'error': 'internal_error', + 'message': str(e) + }) + } + finally: + signal.alarm(0) # Cancel alarm \ No newline at end of file