From 286de26c87ad690bc2300170195ddfed39df9d6f Mon Sep 17 00:00:00 2001
From: Andreas Knuth
Date: Thu, 16 Oct 2025 21:34:23 -0500
Subject: [PATCH] actual

---
 ses-lambda-new-python/lambda_function.py | 899 +++++++++--------------
 1 file changed, 331 insertions(+), 568 deletions(-)

diff --git a/ses-lambda-new-python/lambda_function.py b/ses-lambda-new-python/lambda_function.py
index 7170406..6cae4de 100644
--- a/ses-lambda-new-python/lambda_function.py
+++ b/ses-lambda-new-python/lambda_function.py
@@ -1,624 +1,387 @@
 import os
 import boto3
-import smtplib
-import time
-import traceback
 import json
-import random
-import signal
+import time
 from email.parser import BytesParser
-from email.policy import default
-from email.utils import getaddresses
+from email.policy import SMTP as SMTPPolicy
 
 s3 = boto3.client('s3')
+sqs = boto3.client('sqs', region_name='us-east-2')
 
-# Environment variables (set these in the Lambda config)
-SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
-SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))
-SMTP_USER = os.environ.get('SMTP_USER')
-SMTP_PASS = os.environ.get('SMTP_PASS')
+# AWS Region
+AWS_REGION = 'us-east-2'
 
-# Metadata key/value to mark processed objects (only set when at least one recipient delivered)
-PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
-PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
-
-# Retry configuration
-MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
-BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2'))
-
-# Email size limit (25 MB - Lambda memory safety margin)
-MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25'))
-MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024
-
-
-# ============================================================================
-# VERBESSERUNG 5: Timeout Protection
-# ============================================================================
-class TimeoutException(Exception):
-    pass
-
-
-def timeout_handler(signum, frame):
-    raise TimeoutException("Lambda approaching timeout")
+# Metadata Keys
+PROCESSED_KEY = 'processed'
+PROCESSED_VALUE = 'true'
 
 
 def domain_to_bucket(domain: str) -> str:
+    """Convert a domain to its S3 bucket name."""
     return domain.replace('.', '-') + '-emails'
 
 
-def bucket_to_domain(bucket: str) -> str:
-    return bucket.replace('-emails', '').replace('-', '.')
+def domain_to_queue_name(domain: str) -> str:
+    """Convert a domain to its SQS queue name."""
+    return domain.replace('.', '-') + '-queue'
 
 
-def parse_raw_message(raw_bytes: bytes):
+def get_queue_url_for_domain(domain: str) -> str:
+    """
+    Look up the SQS queue URL for a domain.
+    Queue name: domain-with-hyphens-queue
+    """
+    queue_name = domain_to_queue_name(domain)
+
     try:
-        # Use SMTP policy for better compatibility with various email formats
-        from email.policy import SMTP
-        parsed = BytesParser(policy=SMTP).parsebytes(raw_bytes)
-    except Exception as e:
-        print(f"Error parsing with SMTP policy: {e}, trying with default policy")
-        try:
-            parsed = BytesParser(policy=default).parsebytes(raw_bytes)
-        except Exception as e2:
-            print(f"Error parsing with default policy: {e2}")
-            parsed = None
-    return parsed
-
-
-def mark_object_processed(bucket: str, key: str):
-    try:
-        head = s3.head_object(Bucket=bucket, Key=key)
-        current_metadata = head.get('Metadata', {}) or {}
-        if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
-            print(f"Object {key} in {bucket} already marked processed.")
-            return
-        new_meta = current_metadata.copy()
-        new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
-        new_meta['processed_at'] = str(int(time.time()))
-        # Remove processing lock
-        new_meta.pop('processing_started', None)
-        # Copy object onto itself replacing metadata
-        s3.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata=new_meta,
-            MetadataDirective='REPLACE'
-        )
-        print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}")
-    except Exception as e:
-        print("Failed to mark processed metadata:", e)
-        traceback.print_exc()
-
-
-def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
-    """Update S3 object metadata with retry information"""
-    try:
-        head = s3.head_object(Bucket=bucket, Key=key)
-        current_metadata = head.get('Metadata', {}) or {}
-        new_meta = current_metadata.copy()
-        new_meta['retry_count'] = str(retry_count)
-        new_meta['last_retry'] = str(int(time.time()))
-        if last_error:
-            # S3 metadata values must be ASCII, so we encode the error
-            new_meta['last_error'] = last_error[:255].replace('\n', ' ')
+        response = sqs.get_queue_url(QueueName=queue_name)
+        queue_url = response['QueueUrl']
+        print(f"✓ Found queue: {queue_name}")
+        return queue_url
 
-        s3.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata=new_meta,
-            MetadataDirective='REPLACE'
+    except sqs.exceptions.QueueDoesNotExist:
+        raise Exception(
+            f"Queue does not exist: {queue_name} "
+            f"(for domain: {domain})"
         )
-        print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
     except Exception as e:
-        print(f"Failed to update retry metadata: {e}")
+        raise Exception(f"Error getting queue URL for {domain}: {e}")
 
 
-def get_retry_count(bucket: str, key: str) -> int:
-    """Get current retry count from S3 metadata"""
+def is_already_processed(bucket: str, key: str) -> bool:
+    """Check whether this e-mail has already been processed."""
     try:
         head = s3.head_object(Bucket=bucket, Key=key)
         metadata = head.get('Metadata', {}) or {}
-        return int(metadata.get('retry_count', '0'))
-    except Exception:
-        return 0
-
-
-def is_temporary_smtp_error(error_code):
-    """Check if SMTP error code indicates a temporary failure (4xx)"""
-    if isinstance(error_code, tuple) and len(error_code) >= 1:
-        code = error_code[0]
-        if isinstance(code, int):
-            return 400 <= code < 500
-    return False
-
-
-def is_spam_rejection(error_code):
-    """Check if the error is a spam rejection (should not be retried)"""
-    if isinstance(error_code, tuple) and len(error_code) >= 2:
-        code = error_code[0]
-        message = error_code[1]
-        # 554 with spam message is permanent - don't retry
-        if code == 554 and b'spam' in message.lower():
+
+        if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
+            processed_at = metadata.get('processed_at', 'unknown')
+            print(f"✓ Already processed at {processed_at}")
             return True
-    return False
-
-
-# ============================================================================
-# VERBESSERUNG 3: Exponential Backoff with Jitter
-# ============================================================================
-def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
-                          frm_addr, recipients, raw_message, local_helo,
-                          max_retries=MAX_RETRIES):
-    """Send email with retry logic for temporary failures using exponential backoff"""
-
-    delivered = []
-    refused = {}
-    last_error = None
-
-    for attempt in range(max_retries + 1):
-        if attempt > 0:
-            # Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY)
-            delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))
-            # Add jitter to prevent thundering herd
-            jitter = random.uniform(0, delay * 0.3)  # +0-30% random jitter
-            total_delay = delay + jitter
-            print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)")
-            time.sleep(total_delay)
-        try:
-            with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
-                smtp.ehlo()
-
-                # Try STARTTLS
-                try:
-                    smtp.starttls()
-                    smtp.ehlo()
-                except Exception as e:
-                    print("STARTTLS not available or failed (continuing):", e)
-
-                # Login if credentials provided
-                if smtp_user and smtp_pass:
-                    try:
-                        smtp.login(smtp_user, smtp_pass)
-                    except Exception as e:
-                        print("SMTP login failed (continuing):", e)
-
-                # Attempt to send
-                try:
-                    send_result = smtp.sendmail(frm_addr, recipients, raw_message)
-
-                    if isinstance(send_result, dict):
-                        # Separate temporary and permanent failures
-                        temp_refused = {}
-                        perm_refused = {}
-
-                        for rcpt, error in send_result.items():
-                            if is_temporary_smtp_error(error):
-                                temp_refused[rcpt] = error
-                            else:
-                                perm_refused[rcpt] = error
-
-                        # If we have temporary failures and more retries, continue
-                        if temp_refused and attempt < max_retries:
-                            print(f"Temporary failures for {list(temp_refused.keys())}, will retry...")
-                            recipients = list(temp_refused.keys())  # Only retry temporary failures
-                            refused = perm_refused  # Keep track of permanent failures
-                            last_error = str(temp_refused)
-                            continue
-                        else:
-                            # No temporary failures or no more retries
-                            refused = send_result
-                            delivered = [r for r in recipients if r not in refused]
-                            break
-                    else:
-                        # All delivered successfully
-                        delivered = recipients[:]
-                        refused = {}
-                        break
-
-                except smtplib.SMTPRecipientsRefused as e:
-                    print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}")
-
-                    # Check if all are temporary failures
-                    temp_refused = {}
-                    perm_refused = {}
-
-                    for rcpt, error in e.recipients.items():
-                        if is_temporary_smtp_error(error):
-                            temp_refused[rcpt] = error
-                        else:
-                            perm_refused[rcpt] = error
-
-                    if temp_refused and attempt < max_retries:
-                        recipients = list(temp_refused.keys())
-                        refused = perm_refused
-                        last_error = str(e)
-                        continue
-                    else:
-                        refused = e.recipients
-                        delivered = [r for r in recipients if r not in refused]
-                        break
-
-                except Exception as e:
-                    print(f"SMTP sendmail error on attempt {attempt + 1}: {e}")
-
-                    # Check if this is a spam rejection (permanent error that shouldn't be retried)
-                    if hasattr(e, 'smtp_code') and hasattr(e, 'smtp_error'):
-                        if is_spam_rejection((e.smtp_code, e.smtp_error)):
-                            print(f"Email rejected as spam (permanent error), not retrying")
-                            refused = {r: (e.smtp_code, e.smtp_error) for r in recipients}
-                            delivered = []
-                            break
-
-                    # For other errors, check if it's worth retrying
-                    if attempt < max_retries:
-                        # Only retry if it might be temporary
-                        error_str = str(e)
-                        if '554' in error_str and 'spam' in error_str.lower():
-                            print(f"Email rejected as spam, not retrying")
-                            refused = {r: ('spam', str(e)) for r in recipients}
-                            delivered = []
-                            break
-                        else:
-                            last_error = str(e)
-                            continue
-                    else:
-                        traceback.print_exc()
-                        refused = {r: ('error', str(e)) for r in recipients}
-                        delivered = []
-                        break
-
-        except Exception as e:
-            print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}")
-            if attempt < max_retries:
-                last_error = str(e)
-                continue
-            else:
-                traceback.print_exc()
-                refused = {r: ('connect-error', str(e)) for r in recipients}
-                delivered = []
-                break
-
-    return delivered, refused
-
-
-# ============================================================================
-# VERBESSERUNG 4: Structured Logging
-# ============================================================================
-def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int):
-    """Log processing results to CloudWatch in structured format for easier analysis"""
-    result = {
-        'timestamp': int(time.time()),
-        'bucket': bucket,
-        'key': key,
-        'delivered_count': len(delivered),
-        'refused_count': len(refused),
-        'retry_count': retry_count,
-        'delivered_recipients': delivered,
-        'refused_recipients': list(refused.keys()) if refused else [],
-        'success': len(delivered) > 0
-    }
-
-    # Structured logging for CloudWatch Insights
-    print(f"PROCESSING_RESULT: {json.dumps(result)}")
-    return result
-
-
-# ============================================================================
-# MAIN PROCESSING FUNCTION (extracted for timeout handling)
-# ============================================================================
-def _process_email(event, context):
-    """Main email processing logic (extracted for timeout protection)"""
-    print("Event:", event)
-    ses = None
-    try:
-        rec = event['Records'][0]
-    except Exception as e:
-        print("No Records in event:", e)
-        return {'statusCode': 400, 'body': 'No Records'}
-
-    # Determine bucket/key and initial recipients list
-    recipients = []
-    bucket = None
-    key = None
-    is_ses_event = False
-
-    if 'ses' in rec:
-        # SES Event - vertrauenswürdig, hat die korrekten Empfänger
-        is_ses_event = True
-        ses = rec['ses']
-        msg_id = ses['mail']['messageId']
-        recipients = ses['receipt'].get('recipients', [])
-        # assume first recipient domain maps to bucket
-        if recipients:
-            domain = recipients[0].split('@', 1)[1]
-            bucket = domain_to_bucket(domain)
-            prefix = f"{msg_id}"
-            print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
-            resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
-            if 'Contents' not in resp_list or not resp_list['Contents']:
-                raise Exception(f"No S3 object under {prefix} in {bucket}")
-            key = resp_list['Contents'][0]['Key']
-        else:
-            raise Exception("SES event but no recipients found")
-    elif 's3' in rec:
-        # S3 Event - muss Empfänger aus Headers extrahieren
-        s3info = rec['s3']
-        bucket = s3info['bucket']['name']
-        key = s3info['object']['key']
-        print(f"S3 event: bucket={bucket}, key={key}")
-        # recipients will be parsed from message headers below
-    else:
-        raise Exception("Unknown event type")
-
-    # ========================================================================
-    # VERBESSERUNG 1: Duplicate Prevention with Processing Lock
-    # ========================================================================
-    try:
-        head = s3.head_object(Bucket=bucket, Key=key)
-        metadata = head.get('Metadata', {}) or {}
-
-        # Check if already processed
-        if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
-            processed_at = metadata.get('processed_at', 'unknown time')
-            print(f"Object {key} already processed at {processed_at}")
-            return {'statusCode': 200, 'body': 'already processed'}
-
-        # Check if currently being processed (lock mechanism)
-        processing_started = metadata.get('processing_started')
-        if processing_started:
-            processing_age = time.time() - float(processing_started)
-            if processing_age < 300:  # 5 minutes lock
-                print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)")
-                return {'statusCode': 200, 'body': 'already being processed'}
-            else:
-                print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing")
-
-        # Set processing lock
-        new_meta = metadata.copy()
-        new_meta['processing_started'] = str(int(time.time()))
-        s3.copy_object(
-            Bucket=bucket,
-            Key=key,
-            CopySource={'Bucket': bucket, 'Key': key},
-            Metadata=new_meta,
-            MetadataDirective='REPLACE'
-        )
-        print(f"Set processing lock on {key}")
+        return False
 
     except s3.exceptions.NoSuchKey:
-        print(f"Object {key} no longer exists, skipping")
-        return {'statusCode': 404, 'body': 'object not found'}
+        print(f"⚠ Object {key} not found in {bucket}")
+        return True  # If it no longer exists, treat it as processed
+
     except Exception as e:
-        print(f"Error checking/setting processing lock: {e}")
-        # Continue anyway if lock fails (better than dropping email)
+        print(f"⚠ Error checking processed status: {e}")
+        return False
 
-    # Check retry count - if too many retries, give up
-    retry_count = get_retry_count(bucket, key)
-    if retry_count >= MAX_RETRIES * 2:  # Safety limit
-        print(f"Object {key} has been retried {retry_count} times, giving up")
-        mark_object_processed(bucket, key)  # Mark as processed to prevent infinite retries
-        return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}
 
-    # ========================================================================
-    # VERBESSERUNG 2: Memory Optimization with Size Check
-    # ========================================================================
+def set_processing_lock(bucket: str, key: str) -> bool:
+    """
+    Set a processing lock to prevent duplicate processing.
+    Returns: True if the lock was acquired, False if already locked.
+    """
     try:
-        resp = s3.get_object(Bucket=bucket, Key=key)
-        content_length = int(resp.get('ContentLength', 0))
+        head = s3.head_object(Bucket=bucket, Key=key)
+        metadata = head.get('Metadata', {}) or {}
 
-        # Safety check: Skip emails larger than MAX_EMAIL_SIZE_MB
-        if content_length > MAX_EMAIL_SIZE_BYTES:
-            print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB")
-            # Mark as processed to prevent infinite retries
-            mark_object_processed(bucket, key)
-            return {
-                'statusCode': 413,  # Payload Too Large
-                'body': json.dumps({
-                    'error': 'email_too_large',
-                    'size_mb': round(content_length/1024/1024, 2),
-                    'max_mb': MAX_EMAIL_SIZE_MB
-                })
-            }
+        # Check for an existing lock
+        processing_started = metadata.get('processing_started')
+        if processing_started:
+            lock_age = time.time() - float(processing_started)
+
+            if lock_age < 300:  # 5-minute lock
+                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
+                return False
+            else:
+                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")
 
-        raw_bytes = resp['Body'].read()
-        print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}")
+        # Set a new lock
+        new_meta = metadata.copy()
+        new_meta['processing_started'] = str(int(time.time()))
 
-    except Exception as e:
-        print(f"ERROR reading from S3: {e}")
-        traceback.print_exc()
-        return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'}
-
-    parsed = parse_raw_message(raw_bytes)
-    subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'
-    frm_addr = None
-    if parsed:
-        from_addrs = getaddresses(parsed.get_all('from', []) or [])
-        frm_addr = from_addrs[0][1] if from_addrs else None
-    if not frm_addr:
-        frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost'))
-    print(f"From: {frm_addr}, Subject: {subj}")
-
-    # If recipients were not provided (S3 path), extract from headers
-    if not recipients and not is_ses_event:
-        if parsed:
-            expected_domain = bucket_to_domain(bucket).lower()
-
-            # Debug: Print raw headers to understand what we're getting
-            print(f"=== DEBUG: Header Analysis ===")
-            print(f"Expected domain: {expected_domain}")
-
-            # The email parser is case-insensitive for headers, so we only need to check once
-            # Get headers using standard case (parser handles case-insensitivity)
-            to_headers = parsed.get_all('to', []) or []
-            cc_headers = parsed.get_all('cc', []) or []
-            bcc_headers = parsed.get_all('bcc', []) or []
-
-            if to_headers:
-                print(f"Found 'To' header: {to_headers}")
-            if cc_headers:
-                print(f"Found 'Cc' header: {cc_headers}")
-            if bcc_headers:
-                print(f"Found 'Bcc' header: {bcc_headers}")
-
-            # Parse addresses from headers
-            to_addrs = [addr for _n, addr in getaddresses(to_headers) if addr]
-            cc_addrs = [addr for _n, addr in getaddresses(cc_headers) if addr]
-            bcc_addrs = [addr for _n, addr in getaddresses(bcc_headers) if addr]
-
-            all_recipients = to_addrs + cc_addrs + bcc_addrs
-
-            print(f"Parsed recipients - To: {to_addrs}, Cc: {cc_addrs}, Bcc: {bcc_addrs}")
-
-            # Filter recipients to bucket domain with case-insensitive comparison
-            # and deduplicate using a set (preserving case)
-            recipients_set = set()
-            recipients = []
-            for addr in all_recipients:
-                # Extract domain part (everything after @)
-                if '@' in addr:
-                    addr_lower = addr.lower()
-                    addr_domain = addr_lower.split('@')[-1]
-                    if addr_domain == expected_domain:
-                        # Only add if not already in set (case-insensitive deduplication)
-                        if addr_lower not in recipients_set:
-                            recipients_set.add(addr_lower)
-                            recipients.append(addr)  # Keep original case
-                            print(f"Matched recipient: {addr} (domain: {addr_domain})")
-                        else:
-                            print(f"Skipped duplicate: {addr}")
-                    else:
-                        print(f"Skipped recipient: {addr} (domain: {addr_domain} != {expected_domain})")
-
-            print(f"Final recipients after domain filter and deduplication: {recipients}")
-
-            # If no recipients found, try additional headers
-            if not recipients:
-                print("WARNING: No recipients found in standard headers, checking additional headers...")
-
-                # Check for X-Original-To, Delivered-To, Envelope-To
-                fallback_headers = ['X-Original-To', 'Delivered-To', 'Envelope-To',
-                                    'x-original-to', 'delivered-to', 'envelope-to']
-
-                for header_name in fallback_headers:
-                    header_val = parsed.get(header_name)
-                    if header_val:
-                        print(f"Found {header_name}: {header_val}")
-                        fallback_addrs = [addr for _n, addr in getaddresses([header_val]) if addr]
-                        for addr in fallback_addrs:
-                            if '@' in addr and addr.split('@')[-1].lower() == expected_domain:
-                                recipients.append(addr)
-                                print(f"Found recipient in {header_name}: {addr}")
-
-                if not recipients:
-                    print(f"ERROR: Could not find any recipients for domain {expected_domain}")
-                    print(f"All addresses found: {all_recipients}")
-                    # Print all headers for debugging
-                    print("=== All Email Headers ===")
-                    for key_h in parsed.keys():
-                        print(f"{key_h}: {parsed.get(key_h)}")
-                    print("=== End Headers ===")
-        else:
-            print("ERROR: Could not parse email headers")
-            recipients = []
-
-    # If after all we have no recipients, skip SMTP
-    delivered = []
-    refused = {}
-    if recipients:
-        # Use raw bytes directly (no decoding!)
-        raw_message = raw_bytes
-
-        # Determine HELO hostname
-        env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
-        derived_local = bucket_to_domain(bucket) if bucket else None
-        local_helo = env_local or derived_local or 'localhost'
-
-        print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
-        start = time.time()
-
-        # Send with retry logic
-        delivered, refused = send_email_with_retry(
-            SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS,
-            frm_addr, recipients, raw_message, local_helo,
-            max_retries=MAX_RETRIES
+        s3.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata=new_meta,
+            MetadataDirective='REPLACE'
         )
-        print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")
+        print(f"✓ Processing lock set")
+        return True
 
-        # Update retry count if we had temporary failures
-        if refused and not delivered:
-            temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)]
-            if temp_failures:
-                update_retry_metadata(bucket, key, retry_count + 1, str(refused))
-    else:
-        print("No recipients to send to; skipping SMTP.")
+    except Exception as e:
+        print(f"⚠ Error setting processing lock: {e}")
+        return True  # On error, process anyway (better than losing the mail)
 
-    # Only mark as processed if at least one recipient was delivered successfully
-    if delivered:
-        try:
-            mark_object_processed(bucket, key)
-        except Exception as e:
-            print("Failed to mark object processed after delivery:", e)
-    else:
-        print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")
 
-    # Log structured result
-    result = log_processing_result(bucket, key, delivered, refused, retry_count)
+def mark_as_queued(bucket: str, key: str, queue_name: str):
+    """Mark the e-mail as enqueued."""
+    try:
+        head = s3.head_object(Bucket=bucket, Key=key)
+        metadata = head.get('Metadata', {}) or {}
+
+        metadata['queued_at'] = str(int(time.time()))
+        metadata['queued_to'] = queue_name
+        metadata['status'] = 'queued'
+        metadata.pop('processing_started', None)  # Remove the lock
+
+        s3.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata=metadata,
+            MetadataDirective='REPLACE'
+        )
+
+        print(f"✓ Marked as queued to {queue_name}")
+
+    except Exception as e:
+        print(f"⚠ Failed to mark as queued: {e}")
+
+
+def send_to_queue(queue_url: str, bucket: str, key: str,
+                  from_addr: str, recipient: str, domain: str,
+                  subject: str, message_id: str):
+    """
+    Send the e-mail job to the domain-specific SQS queue.
+    """
 
-    return {
-        'statusCode': 200 if delivered else 500,
-        'body': json.dumps(result)
+    # Extract the queue name from the URL (for logging)
+    queue_name = queue_url.split('/')[-1]
+
+    message = {
+        'bucket': bucket,
+        'key': key,
+        'from': from_addr,
+        'recipient': recipient,  # Exactly one recipient
+        'domain': domain,
+        'subject': subject,
+        'message_id': message_id,
+        'timestamp': int(time.time())
     }
-
-
-# ============================================================================
-# LAMBDA HANDLER with TIMEOUT PROTECTION
-# ============================================================================
-def lambda_handler(event, context):
-    """
-    Lambda entry point with timeout protection
-    Recommended Lambda Configuration:
-    - Memory: 512 MB
-    - Timeout: 60 seconds
-    - Environment Variables:
-      - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS
-      - MAX_SMTP_RETRIES=3
-      - BASE_RETRY_DELAY=2
-      - MAX_EMAIL_SIZE_MB=25
-    """
-
-    # Set up timeout protection (stop 5 seconds before Lambda timeout)
-    remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60
-    safety_margin = 5  # seconds
-    max_execution_time = max(10, remaining_time - safety_margin)
-    signal.signal(signal.SIGALRM, timeout_handler)
-    signal.alarm(int(max_execution_time))
 
     try:
-        return _process_email(event, context)
-    except TimeoutException:
-        print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting")
-        # Don't mark as processed so it can be retried
+        response = sqs.send_message(
+            QueueUrl=queue_url,
+            MessageBody=json.dumps(message, ensure_ascii=False),
+            MessageAttributes={
+                'domain': {
+                    'StringValue': domain,
+                    'DataType': 'String'
+                },
+                'bucket': {
+                    'StringValue': bucket,
+                    'DataType': 'String'
+                },
+                'recipient': {
+                    'StringValue': recipient,
+                    'DataType': 'String'
+                },
+                'message_id': {
+                    'StringValue': message_id,
+                    'DataType': 'String'
+                }
+            }
+        )
+
+        sqs_message_id = response['MessageId']
+        print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
+
+        # Mark as queued
+        mark_as_queued(bucket, key, queue_name)
+
+        return sqs_message_id
+
+    except Exception as e:
+        print(f"✗ Failed to queue message: {e}")
+        raise
+
+
+def lambda_handler(event, context):
+    """
+    Lambda handler for SES events.
+    IMPORTANT: SES invokes this Lambda once PER recipient!
+    Each event carries exactly one recipient in receipt.recipients.
+    """
+
+    print(f"{'='*70}")
+    print(f"Lambda invoked: {context.aws_request_id}")
+    print(f"Region: {AWS_REGION}")
+    print(f"{'='*70}")
+
+    # Parse the SES event
+    try:
+        record = event['Records'][0]
+        ses = record['ses']
+    except (KeyError, IndexError) as e:
+        print(f"✗ Invalid event structure: {e}")
        return {
-            'statusCode': 408,  # Request Timeout
+            'statusCode': 400,
+            'body': json.dumps({'error': 'Invalid SES event'})
+        }
+
+    mail = ses['mail']
+    receipt = ses['receipt']
+
+    message_id = mail['messageId']
+    source = mail['source']
+    timestamp = mail.get('timestamp', '')
+
+    # ✨ IMPORTANT: receipt.recipients contains ONLY the recipient for THIS event
+    # (do NOT use mail.destination - that holds all original recipients)
+    recipients = receipt.get('recipients', [])
+
+    if not recipients or len(recipients) != 1:
+        print(f"✗ Unexpected recipients count: {len(recipients)}")
+        return {
+            'statusCode': 400,
             'body': json.dumps({
-                'error': 'lambda_timeout',
-                'execution_time': max_execution_time
+                'error': 'Expected exactly 1 recipient',
+                'found': len(recipients)
             })
         }
+
+    # SES guarantees exactly one recipient per event
+    recipient = recipients[0]
+    domain = recipient.split('@')[1]
+    bucket = domain_to_bucket(domain)
+
+    print(f"\n📧 Email Event:")
+    print(f"   MessageId: {message_id}")
+    print(f"   From: {source}")
+    print(f"   To: {recipient}")
+    print(f"   Domain: {domain}")
+    print(f"   Bucket: {bucket}")
+    print(f"   Timestamp: {timestamp}")
+
+    # Determine the queue for the domain
+    try:
+        queue_url = get_queue_url_for_domain(domain)
+        queue_name = queue_url.split('/')[-1]
+        print(f"   Queue: {queue_name}")
     except Exception as e:
-        print(f"FATAL ERROR in lambda_handler: {e}")
-        traceback.print_exc()
+        print(f"\n✗ ERROR: {e}")
         return {
             'statusCode': 500,
             'body': json.dumps({
-                'error': 'internal_error',
+                'error': 'queue_not_configured',
+                'domain': domain,
+                'recipient': recipient,
                 'message': str(e)
             })
         }
-    finally:
-        signal.alarm(0)  # Cancel alarm
\ No newline at end of file
+
+    # Find the S3 object
+    try:
+        print(f"\n📦 Searching S3...")
+        response = s3.list_objects_v2(
+            Bucket=bucket,
+            Prefix=message_id,
+            MaxKeys=1
+        )
+
+        if 'Contents' not in response or not response['Contents']:
+            raise Exception(f"No S3 object found for message {message_id}")
+
+        key = response['Contents'][0]['Key']
+        size = response['Contents'][0]['Size']
+        print(f"   Found: s3://{bucket}/{key}")
+        print(f"   Size: {size:,} bytes ({size/1024:.1f} KB)")
+
+    except Exception as e:
+        print(f"\n✗ S3 ERROR: {e}")
+        return {
+            'statusCode': 404,
+            'body': json.dumps({
+                'error': 's3_object_not_found',
+                'message_id': message_id,
+                'bucket': bucket,
+                'details': str(e)
+            })
+        }
+
+    # Duplicate check
+    print(f"\n🔍 Checking for duplicates...")
+    if is_already_processed(bucket, key):
+        print(f"   Already processed, skipping")
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'already_processed',
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
+
+    # Set the processing lock
+    print(f"\n🔒 Setting processing lock...")
+    if not set_processing_lock(bucket, key):
+        print(f"   Already being processed by another instance")
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'already_processing',
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
+
+    # Load the e-mail to extract the subject (optional, for better logging)
+    subject = '(unknown)'
+    try:
+        print(f"\n📖 Reading email for metadata...")
+        obj = s3.get_object(Bucket=bucket, Key=key)
+        raw_bytes = obj['Body'].read()
+
+        # Parse only the headers (faster)
+        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
+        subject = parsed.get('subject', '(no subject)')
+
+        print(f"   Subject: {subject}")
+
+    except Exception as e:
+        print(f"   ⚠ Could not parse email (continuing): {e}")
+
+    # Enqueue to the domain-specific queue
+    try:
+        print(f"\n📤 Queuing to {queue_name}...")
+
+        sqs_message_id = send_to_queue(
+            queue_url=queue_url,
+            bucket=bucket,
+            key=key,
+            from_addr=source,
+            recipient=recipient,  # Exactly one recipient
+            domain=domain,
+            subject=subject,
+            message_id=message_id
+        )
+
+        print(f"\n{'='*70}")
+        print(f"✅ SUCCESS - Email queued for delivery")
+        print(f"{'='*70}\n")
+
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'queued',
+                'message_id': message_id,
+                'sqs_message_id': sqs_message_id,
+                'queue': queue_name,
+                'domain': domain,
+                'recipient': recipient,
+                'subject': subject
+            })
+        }
+
+    except Exception as e:
+        print(f"\n{'='*70}")
+        print(f"✗ FAILED TO QUEUE")
+        print(f"{'='*70}")
+        print(f"Error: {e}")
+
+        return {
+            'statusCode': 500,
+            'body': json.dumps({
+                'error': 'failed_to_queue',
+                'message': str(e),
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
\ No newline at end of file
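
For reference, below is a minimal sketch of the JSON body that send_to_queue() from this patch places on the per-domain SQS queue. The field names mirror the message dict built in the patch; every value shown is a made-up placeholder (hypothetical domain, key, and addresses), not data from a real event.

# Illustrative sketch only: example of one queued job as a downstream worker would see it.
import json

example_sqs_body = {
    "bucket": "example-com-emails",              # domain_to_bucket("example.com") - hypothetical
    "key": "d41d8cd98f00b204e9800998ecf8427e",   # S3 key located via the messageId prefix - hypothetical
    "from": "sender@example.org",
    "recipient": "user@example.com",             # exactly one recipient per message
    "domain": "example.com",
    "subject": "(no subject)",
    "message_id": "d41d8cd98f00b204e9800998ecf8427e",
    "timestamp": 1760000000,
}

# Round-trip through JSON, as a consumer of the queue would do with the SQS message body.
job = json.loads(json.dumps(example_sqs_body, ensure_ascii=False))
print(job["recipient"], "->", f"s3://{job['bucket']}/{job['key']}")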