commit b9066a8f59 (parent 5533fbff14)
@@ -4,6 +4,8 @@ import smtplib
 import time
 import traceback
 import json
+import random
+import signal
 from email.parser import BytesParser
 from email.policy import default
 from email.utils import getaddresses
@@ -12,9 +14,9 @@ s3 = boto3.client('s3')
 
 # Environment variables (set these in the Lambda config)
 SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
-SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))  # default to your mapped port
-SMTP_USER = os.environ.get('SMTP_USER') or os.environ.get('SMTP_USER')
-SMTP_PASS = os.environ.get('SMTP_PASS') or os.environ.get('SMTP_PASS')
+SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525'))
+SMTP_USER = os.environ.get('SMTP_USER')
+SMTP_PASS = os.environ.get('SMTP_PASS')
 
 # Metadata key/value to mark processed objects (only set when at least one recipient delivered)
 PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
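Since SMTP_USER and SMTP_PASS now default to None, a fail-fast guard makes misconfiguration visible at cold start instead of at the first send. A minimal optional sketch, not part of this commit:

    import os

    for name in ('SMTP_USER', 'SMTP_PASS'):
        if not os.environ.get(name):
            # surfaces in CloudWatch at import time rather than mid-delivery
            print(f"WARNING: {name} is not set; SMTP authentication will likely be skipped or fail")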
@@ -22,14 +24,32 @@ PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
 
 # Retry configuration
 MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
-RETRY_DELAYS = [1, 5, 15]  # seconds between attempts
+BASE_RETRY_DELAY = int(os.environ.get('BASE_RETRY_DELAY', '2'))
+
+# Email size limit (25 MB - Lambda memory safety margin)
+MAX_EMAIL_SIZE_MB = int(os.environ.get('MAX_EMAIL_SIZE_MB', '25'))
+MAX_EMAIL_SIZE_BYTES = MAX_EMAIL_SIZE_MB * 1024 * 1024
+
+
+# ============================================================================
+# IMPROVEMENT 5: Timeout Protection
+# ============================================================================
+class TimeoutException(Exception):
+    pass
+
+
+def timeout_handler(signum, frame):
+    raise TimeoutException("Lambda approaching timeout")
+
 
 def domain_to_bucket(domain: str) -> str:
     return domain.replace('.', '-') + '-emails'
 
 
 def bucket_to_domain(bucket: str) -> str:
     return bucket.replace('-emails', '').replace('-', '.')
 
 
 def parse_raw_message(raw_bytes: bytes):
     try:
         # Use SMTP policy for better compatibility with various email formats
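The SIGALRM pattern added here can be exercised outside Lambda; a minimal standalone sketch (signal.alarm is Unix-only and must run on the main thread, which is where Lambda invokes handlers):

    import signal
    import time

    class TimeoutException(Exception):
        pass

    def timeout_handler(signum, frame):
        raise TimeoutException("Lambda approaching timeout")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(2)  # ask the OS to deliver SIGALRM in ~2 seconds
    try:
        time.sleep(10)  # stands in for slow SMTP/S3 work
    except TimeoutException:
        print("caught timeout, exiting gracefully")
    finally:
        signal.alarm(0)  # always cancel a pending alarm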
@@ -44,6 +64,7 @@ def parse_raw_message(raw_bytes: bytes):
         parsed = None
     return parsed
 
 
 def mark_object_processed(bucket: str, key: str):
     try:
         head = s3.head_object(Bucket=bucket, Key=key)
@@ -53,6 +74,9 @@ def mark_object_processed(bucket: str, key: str):
             return
         new_meta = current_metadata.copy()
         new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
+        new_meta['processed_at'] = str(int(time.time()))
+        # Remove processing lock
+        new_meta.pop('processing_started', None)
         # Copy object onto itself replacing metadata
         s3.copy_object(
             Bucket=bucket,
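S3 object metadata cannot be edited in place, which is why this function copies the object onto itself with MetadataDirective='REPLACE' (a self-copy works for objects up to 5 GB, comfortably above the 25 MB email cap). A minimal sketch of reading the flag back, with placeholder bucket and key names:

    import boto3

    s3 = boto3.client('s3')
    head = s3.head_object(Bucket='example-com-emails', Key='some-message-id')
    # custom metadata comes back lower-cased, without the x-amz-meta- prefix
    meta = head.get('Metadata', {})
    if meta.get('processed') == 'true':
        print('already processed at', meta.get('processed_at'))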
@@ -61,11 +85,12 @@ def mark_object_processed(bucket: str, key: str):
             Metadata=new_meta,
             MetadataDirective='REPLACE'
         )
-        print(f"Marked {bucket}/{key} as processed.")
+        print(f"Marked {bucket}/{key} as processed at {new_meta['processed_at']}")
     except Exception as e:
         print("Failed to mark processed metadata:", e)
         traceback.print_exc()
 
 
 def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
     """Update S3 object metadata with retry information"""
     try:
@@ -89,6 +114,7 @@ def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
     except Exception as e:
         print(f"Failed to update retry metadata: {e}")
 
 
 def get_retry_count(bucket: str, key: str) -> int:
     """Get current retry count from S3 metadata"""
     try:
@@ -98,6 +124,7 @@ def get_retry_count(bucket: str, key: str) -> int:
     except Exception:
         return 0
 
 
 def is_temporary_smtp_error(error_code):
     """Check if SMTP error code indicates a temporary failure (4xx)"""
     if isinstance(error_code, tuple) and len(error_code) >= 1:
@@ -106,6 +133,7 @@ def is_temporary_smtp_error(error_code):
         return 400 <= code < 500
     return False
 
 
 def is_spam_rejection(error_code):
     """Check if the error is a spam rejection (should not be retried)"""
     if isinstance(error_code, tuple) and len(error_code) >= 2:
@@ -116,10 +144,14 @@ def is_spam_rejection(error_code):
             return True
     return False
 
 
+# ============================================================================
+# IMPROVEMENT 3: Exponential Backoff with Jitter
+# ============================================================================
 def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
                           frm_addr, recipients, raw_message, local_helo,
                           max_retries=MAX_RETRIES):
-    """Send email with retry logic for temporary failures"""
+    """Send email with retry logic for temporary failures using exponential backoff"""
 
     delivered = []
     refused = {}
@@ -127,9 +159,13 @@ def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
 
     for attempt in range(max_retries + 1):
         if attempt > 0:
-            delay = RETRY_DELAYS[min(attempt - 1, len(RETRY_DELAYS) - 1)]
-            print(f"Retry attempt {attempt}/{max_retries} after {delay}s delay...")
-            time.sleep(delay)
+            # Exponential backoff with jitter: 2s, 4s, 8s (configurable via BASE_RETRY_DELAY)
+            delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))
+            # Add jitter to prevent thundering herd
+            jitter = random.uniform(0, delay * 0.3)  # +0-30% random jitter
+            total_delay = delay + jitter
+            print(f"Retry attempt {attempt}/{max_retries} after {total_delay:.1f}s delay (base: {delay}s + jitter: {jitter:.1f}s)")
+            time.sleep(total_delay)
 
         try:
             with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
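The resulting schedule is easy to sanity-check without sending anything; a minimal sketch assuming the defaults above (BASE_RETRY_DELAY=2, MAX_RETRIES=3):

    import random

    BASE_RETRY_DELAY = 2
    for attempt in range(1, 4):
        delay = BASE_RETRY_DELAY * (2 ** (attempt - 1))  # 2s, 4s, 8s
        jitter = random.uniform(0, delay * 0.3)          # up to +30%
        print(f"attempt {attempt}: sleep {delay + jitter:.1f}s")

In the worst case the three retry sleeps alone take about 18 seconds (14s base plus full jitter), which has to fit inside the Lambda timeout budget enforced further down.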
@@ -247,7 +283,34 @@ def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
 
     return delivered, refused
 
-def lambda_handler(event, context):
+
+# ============================================================================
+# IMPROVEMENT 4: Structured Logging
+# ============================================================================
+def log_processing_result(bucket: str, key: str, delivered: list, refused: dict, retry_count: int):
+    """Log processing results to CloudWatch in structured format for easier analysis"""
+    result = {
+        'timestamp': int(time.time()),
+        'bucket': bucket,
+        'key': key,
+        'delivered_count': len(delivered),
+        'refused_count': len(refused),
+        'retry_count': retry_count,
+        'delivered_recipients': delivered,
+        'refused_recipients': list(refused.keys()) if refused else [],
+        'success': len(delivered) > 0
+    }
+
+    # Structured logging for CloudWatch Insights
+    print(f"PROCESSING_RESULT: {json.dumps(result)}")
+    return result
+
+
+# ============================================================================
+# MAIN PROCESSING FUNCTION (extracted for timeout handling)
+# ============================================================================
+def _process_email(event, context):
+    """Main email processing logic (extracted for timeout protection)"""
     print("Event:", event)
     ses = None
     try:
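The PROCESSING_RESULT prefix makes these lines queryable from CloudWatch Logs Insights; a sketch using boto3 (the log group name is a placeholder; Lambda writes to /aws/lambda/<function-name>):

    import time
    import boto3

    logs = boto3.client('logs')
    resp = logs.start_query(
        logGroupName='/aws/lambda/your-email-forwarder',  # placeholder name
        startTime=int(time.time()) - 3600,  # last hour
        endTime=int(time.time()),
        queryString='fields @timestamp, @message | filter @message like /PROCESSING_RESULT/',
    )
    # poll logs.get_query_results(queryId=resp['queryId']) until the query completes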
@@ -290,25 +353,83 @@ def lambda_handler(event, context):
         else:
             raise Exception("Unknown event type")
 
-        # Check if already processed
+        # ====================================================================
+        # IMPROVEMENT 1: Duplicate Prevention with Processing Lock
+        # ====================================================================
         try:
             head = s3.head_object(Bucket=bucket, Key=key)
-            if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
-                print(f"Object {key} in {bucket} already processed. Exiting.")
+            metadata = head.get('Metadata', {}) or {}
+
+            # Check if already processed
+            if metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
+                processed_at = metadata.get('processed_at', 'unknown time')
+                print(f"Object {key} already processed at {processed_at}")
                 return {'statusCode': 200, 'body': 'already processed'}
+
+            # Check if currently being processed (lock mechanism)
+            processing_started = metadata.get('processing_started')
+            if processing_started:
+                processing_age = time.time() - float(processing_started)
+                if processing_age < 300:  # 5 minutes lock
+                    print(f"Object {key} is being processed by another Lambda (started {processing_age:.0f}s ago)")
+                    return {'statusCode': 200, 'body': 'already being processed'}
+                else:
+                    print(f"Stale processing lock detected ({processing_age:.0f}s old), continuing")
+
+            # Set processing lock
+            new_meta = metadata.copy()
+            new_meta['processing_started'] = str(int(time.time()))
+            s3.copy_object(
+                Bucket=bucket,
+                Key=key,
+                CopySource={'Bucket': bucket, 'Key': key},
+                Metadata=new_meta,
+                MetadataDirective='REPLACE'
+            )
+            print(f"Set processing lock on {key}")
+
+        except s3.exceptions.NoSuchKey:  # note: head_object reports missing keys as a generic ClientError ('404'), so this clause may not fire
+            print(f"Object {key} no longer exists, skipping")
+            return {'statusCode': 404, 'body': 'object not found'}
         except Exception as e:
-            print("head_object error (continuing):", e)
+            print(f"Error checking/setting processing lock: {e}")
+            # Continue anyway if lock fails (better than dropping email)
 
         # Check retry count - if too many retries, give up
         retry_count = get_retry_count(bucket, key)
         if retry_count >= MAX_RETRIES * 2:  # Safety limit
             print(f"Object {key} has been retried {retry_count} times, giving up")
             mark_object_processed(bucket, key)  # Mark as processed to prevent infinite retries
             return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}
 
-        # Get raw mail bytes
-        resp = s3.get_object(Bucket=bucket, Key=key)
-        raw_bytes = resp['Body'].read()
-        print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}")
+        # ====================================================================
+        # IMPROVEMENT 2: Memory Optimization with Size Check
+        # ====================================================================
+        try:
+            resp = s3.get_object(Bucket=bucket, Key=key)
+            content_length = int(resp.get('ContentLength', 0))
+
+            # Safety check: Skip emails larger than MAX_EMAIL_SIZE_MB
+            if content_length > MAX_EMAIL_SIZE_BYTES:
+                print(f"ERROR: Email too large ({content_length/1024/1024:.1f} MB), maximum is {MAX_EMAIL_SIZE_MB} MB")
+                # Mark as processed to prevent infinite retries
+                mark_object_processed(bucket, key)
+                return {
+                    'statusCode': 413,  # Payload Too Large
+                    'body': json.dumps({
+                        'error': 'email_too_large',
+                        'size_mb': round(content_length/1024/1024, 2),
+                        'max_mb': MAX_EMAIL_SIZE_MB
+                    })
+                }
+
+            raw_bytes = resp['Body'].read()
+            print(f"Loaded {len(raw_bytes)} bytes ({len(raw_bytes)/1024:.1f} KB) from s3://{bucket}/{key}")
+
+        except Exception as e:
+            print(f"ERROR reading from S3: {e}")
+            traceback.print_exc()
+            return {'statusCode': 500, 'body': f'S3 read error: {str(e)}'}
 
         parsed = parse_raw_message(raw_bytes)
         subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'
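The lock semantics above reduce to three states, which can be unit-tested without S3; a minimal sketch (lock_state is an illustrative helper, not part of the handler; the keys match PROCESSED_META_KEY='processed' and the 300s TTL above):

    import time

    def lock_state(metadata: dict, now: float = None, ttl: int = 300) -> str:
        """Return 'processed', 'locked', or 'free' for an object's metadata."""
        now = now if now is not None else time.time()
        if metadata.get('processed') == 'true':
            return 'processed'
        started = metadata.get('processing_started')
        if started and now - float(started) < ttl:
            return 'locked'
        return 'free'  # covers both no lock and the stale-lock case

    assert lock_state({'processed': 'true'}) == 'processed'
    assert lock_state({'processing_started': str(int(time.time()))}) == 'locked'
    assert lock_state({'processing_started': '0'}) == 'free'  # stale lock

Note the HEAD-then-COPY sequence is not atomic: two invocations that read the object at nearly the same moment can both proceed, so the lock narrows the duplicate-delivery window rather than closing it.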
@@ -396,8 +517,8 @@ def lambda_handler(event, context):
         print(f"All addresses found: {all_recipients}")
         # Print all headers for debugging
         print("=== All Email Headers ===")
-        for key in parsed.keys():
-            print(f"{key}: {parsed.get(key)}")
+        for key_h in parsed.keys():
+            print(f"{key_h}: {parsed.get(key_h)}")
         print("=== End Headers ===")
     else:
         print("ERROR: Could not parse email headers")
@@ -444,12 +565,60 @@ def lambda_handler(event, context):
         else:
             print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")
 
+        # Log structured result
+        result = log_processing_result(bucket, key, delivered, refused, retry_count)
+
         return {
-            'statusCode': 200,
-            'body': json.dumps({
-                'processed': bool(delivered),
-                'delivered': delivered,
-                'refused_count': len(refused),
-                'retry_count': retry_count
-            })
+            'statusCode': 200 if delivered else 500,
+            'body': json.dumps(result)
         }
+
+
+# ============================================================================
+# LAMBDA HANDLER with TIMEOUT PROTECTION
+# ============================================================================
+def lambda_handler(event, context):
+    """
+    Lambda entry point with timeout protection
+
+    Recommended Lambda Configuration:
+    - Memory: 512 MB
+    - Timeout: 60 seconds
+    - Environment Variables:
+      - SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS
+      - MAX_SMTP_RETRIES=3
+      - BASE_RETRY_DELAY=2
+      - MAX_EMAIL_SIZE_MB=25
+    """
+
+    # Set up timeout protection (stop 5 seconds before Lambda timeout)
+    remaining_time = context.get_remaining_time_in_millis() / 1000 if context else 60
+    safety_margin = 5  # seconds
+    max_execution_time = max(10, remaining_time - safety_margin)
+
+    signal.signal(signal.SIGALRM, timeout_handler)
+    signal.alarm(int(max_execution_time))
+
+    try:
+        return _process_email(event, context)
+    except TimeoutException:
+        print(f"WARNING: Lambda approaching timeout after {max_execution_time}s, gracefully exiting")
+        # Don't mark as processed so it can be retried
+        return {
+            'statusCode': 408,  # Request Timeout
+            'body': json.dumps({
+                'error': 'lambda_timeout',
+                'execution_time': max_execution_time
+            })
+        }
+    except Exception as e:
+        print(f"FATAL ERROR in lambda_handler: {e}")
+        traceback.print_exc()
+        return {
+            'statusCode': 500,
+            'body': json.dumps({
+                'error': 'internal_error',
+                'message': str(e)
+            })
+        }
+    finally:
+        signal.alarm(0)  # Cancel alarm
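For a quick smoke test of the new entry point, the handler can be invoked locally with a stub context; a minimal sketch (the empty event is a placeholder and will take the error path, which is itself useful for checking the JSON error envelope):

    class FakeContext:
        @staticmethod
        def get_remaining_time_in_millis():
            return 60_000  # pretend a full 60s budget remains

    print(lambda_handler({}, FakeContext()))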