docker/ses-lambda-new-python/lambda_function.py

455 lines
19 KiB
Python

import os
import boto3
import smtplib
import time
import traceback
import json
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses
s3 = boto3.client('s3')
# Environment variables (set these in the Lambda config)
SMTP_HOST = os.environ.get('SMTP_HOST', 'mail.email-srvr.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '2525')) # default to your mapped port
SMTP_USER = os.environ.get('SMTP_USER') or os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS') or os.environ.get('SMTP_PASS')
# Metadata key/value to mark processed objects (only set when at least one recipient delivered)
PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')
# Retry configuration
MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
RETRY_DELAYS = [1, 5, 15] # Sekunden zwischen Versuchen
def domain_to_bucket(domain: str) -> str:
return domain.replace('.', '-') + '-emails'
def bucket_to_domain(bucket: str) -> str:
return bucket.replace('-emails', '').replace('-', '.')
def parse_raw_message(raw_bytes: bytes):
try:
# Use SMTP policy for better compatibility with various email formats
from email.policy import SMTP
parsed = BytesParser(policy=SMTP).parsebytes(raw_bytes)
except Exception as e:
print(f"Error parsing with SMTP policy: {e}, trying with default policy")
try:
parsed = BytesParser(policy=default).parsebytes(raw_bytes)
except Exception as e2:
print(f"Error parsing with default policy: {e2}")
parsed = None
return parsed
def mark_object_processed(bucket: str, key: str):
try:
head = s3.head_object(Bucket=bucket, Key=key)
current_metadata = head.get('Metadata', {}) or {}
if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
print(f"Object {key} in {bucket} already marked processed.")
return
new_meta = current_metadata.copy()
new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
# Copy object onto itself replacing metadata
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=new_meta,
MetadataDirective='REPLACE'
)
print(f"Marked {bucket}/{key} as processed.")
except Exception as e:
print("Failed to mark processed metadata:", e)
traceback.print_exc()
def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
"""Update S3 object metadata with retry information"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
current_metadata = head.get('Metadata', {}) or {}
new_meta = current_metadata.copy()
new_meta['retry_count'] = str(retry_count)
new_meta['last_retry'] = str(int(time.time()))
if last_error:
# S3 metadata values must be ASCII, so we encode the error
new_meta['last_error'] = last_error[:255].replace('\n', ' ')
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=new_meta,
MetadataDirective='REPLACE'
)
print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
except Exception as e:
print(f"Failed to update retry metadata: {e}")
def get_retry_count(bucket: str, key: str) -> int:
"""Get current retry count from S3 metadata"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
return int(metadata.get('retry_count', '0'))
except Exception:
return 0
def is_temporary_smtp_error(error_code):
"""Check if SMTP error code indicates a temporary failure (4xx)"""
if isinstance(error_code, tuple) and len(error_code) >= 1:
code = error_code[0]
if isinstance(code, int):
return 400 <= code < 500
return False
def is_spam_rejection(error_code):
"""Check if the error is a spam rejection (should not be retried)"""
if isinstance(error_code, tuple) and len(error_code) >= 2:
code = error_code[0]
message = error_code[1]
# 554 with spam message is permanent - don't retry
if code == 554 and b'spam' in message.lower():
return True
return False
def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
frm_addr, recipients, raw_message, local_helo,
max_retries=MAX_RETRIES):
"""Send email with retry logic for temporary failures"""
delivered = []
refused = {}
last_error = None
for attempt in range(max_retries + 1):
if attempt > 0:
delay = RETRY_DELAYS[min(attempt - 1, len(RETRY_DELAYS) - 1)]
print(f"Retry attempt {attempt}/{max_retries} after {delay}s delay...")
time.sleep(delay)
try:
with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
smtp.ehlo()
# Try STARTTLS
try:
smtp.starttls()
smtp.ehlo()
except Exception as e:
print("STARTTLS not available or failed (continuing):", e)
# Login if credentials provided
if smtp_user and smtp_pass:
try:
smtp.login(smtp_user, smtp_pass)
except Exception as e:
print("SMTP login failed (continuing):", e)
# Attempt to send
try:
send_result = smtp.sendmail(frm_addr, recipients, raw_message)
if isinstance(send_result, dict):
# Separate temporary and permanent failures
temp_refused = {}
perm_refused = {}
for rcpt, error in send_result.items():
if is_temporary_smtp_error(error):
temp_refused[rcpt] = error
else:
perm_refused[rcpt] = error
# If we have temporary failures and more retries, continue
if temp_refused and attempt < max_retries:
print(f"Temporary failures for {list(temp_refused.keys())}, will retry...")
recipients = list(temp_refused.keys()) # Only retry temporary failures
refused = perm_refused # Keep track of permanent failures
last_error = str(temp_refused)
continue
else:
# No temporary failures or no more retries
refused = send_result
delivered = [r for r in recipients if r not in refused]
break
else:
# All delivered successfully
delivered = recipients[:]
refused = {}
break
except smtplib.SMTPRecipientsRefused as e:
print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}")
# Check if all are temporary failures
temp_refused = {}
perm_refused = {}
for rcpt, error in e.recipients.items():
if is_temporary_smtp_error(error):
temp_refused[rcpt] = error
else:
perm_refused[rcpt] = error
if temp_refused and attempt < max_retries:
recipients = list(temp_refused.keys())
refused = perm_refused
last_error = str(e)
continue
else:
refused = e.recipients
delivered = [r for r in recipients if r not in refused]
break
except Exception as e:
print(f"SMTP sendmail error on attempt {attempt + 1}: {e}")
# Check if this is a spam rejection (permanent error that shouldn't be retried)
if hasattr(e, 'smtp_code') and hasattr(e, 'smtp_error'):
if is_spam_rejection((e.smtp_code, e.smtp_error)):
print(f"Email rejected as spam (permanent error), not retrying")
refused = {r: (e.smtp_code, e.smtp_error) for r in recipients}
delivered = []
break
# For other errors, check if it's worth retrying
if attempt < max_retries:
# Only retry if it might be temporary
error_str = str(e)
if '554' in error_str and 'spam' in error_str.lower():
print(f"Email rejected as spam, not retrying")
refused = {r: ('spam', str(e)) for r in recipients}
delivered = []
break
else:
last_error = str(e)
continue
else:
traceback.print_exc()
refused = {r: ('error', str(e)) for r in recipients}
delivered = []
break
except Exception as e:
print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}")
if attempt < max_retries:
last_error = str(e)
continue
else:
traceback.print_exc()
refused = {r: ('connect-error', str(e)) for r in recipients}
delivered = []
break
return delivered, refused
def lambda_handler(event, context):
print("Event:", event)
ses = None
try:
rec = event['Records'][0]
except Exception as e:
print("No Records in event:", e)
return {'statusCode': 400, 'body': 'No Records'}
# Determine bucket/key and initial recipients list
recipients = []
bucket = None
key = None
is_ses_event = False
if 'ses' in rec:
# SES Event - vertrauenswürdig, hat die korrekten Empfänger
is_ses_event = True
ses = rec['ses']
msg_id = ses['mail']['messageId']
recipients = ses['receipt'].get('recipients', [])
# assume first recipient domain maps to bucket
if recipients:
domain = recipients[0].split('@', 1)[1]
bucket = domain_to_bucket(domain)
prefix = f"emails/{msg_id}"
print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
if 'Contents' not in resp_list or not resp_list['Contents']:
raise Exception(f"No S3 object under {prefix} in {bucket}")
key = resp_list['Contents'][0]['Key']
else:
raise Exception("SES event but no recipients found")
elif 's3' in rec:
# S3 Event - muss Empfänger aus Headers extrahieren
s3info = rec['s3']
bucket = s3info['bucket']['name']
key = s3info['object']['key']
print(f"S3 event: bucket={bucket}, key={key}")
# recipients will be parsed from message headers below
else:
raise Exception("Unknown event type")
# Check if already processed
try:
head = s3.head_object(Bucket=bucket, Key=key)
if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
print(f"Object {key} in {bucket} already processed. Exiting.")
return {'statusCode': 200, 'body': 'already processed'}
except Exception as e:
print("head_object error (continuing):", e)
# Check retry count - if too many retries, give up
retry_count = get_retry_count(bucket, key)
if retry_count >= MAX_RETRIES * 2: # Safety limit
print(f"Object {key} has been retried {retry_count} times, giving up")
return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}
# Get raw mail bytes
resp = s3.get_object(Bucket=bucket, Key=key)
raw_bytes = resp['Body'].read()
print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}")
parsed = parse_raw_message(raw_bytes)
subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'
frm_addr = None
if parsed:
from_addrs = getaddresses(parsed.get_all('from', []) or [])
frm_addr = from_addrs[0][1] if from_addrs else None
if not frm_addr:
frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost'))
print(f"From: {frm_addr}, Subject: {subj}")
# If recipients were not provided (S3 path), extract from headers
if not recipients and not is_ses_event:
if parsed:
expected_domain = bucket_to_domain(bucket).lower()
# Debug: Print raw headers to understand what we're getting
print(f"=== DEBUG: Header Analysis ===")
print(f"Expected domain: {expected_domain}")
# The email parser is case-insensitive for headers, so we only need to check once
# Get headers using standard case (parser handles case-insensitivity)
to_headers = parsed.get_all('to', []) or []
cc_headers = parsed.get_all('cc', []) or []
bcc_headers = parsed.get_all('bcc', []) or []
if to_headers:
print(f"Found 'To' header: {to_headers}")
if cc_headers:
print(f"Found 'Cc' header: {cc_headers}")
if bcc_headers:
print(f"Found 'Bcc' header: {bcc_headers}")
# Parse addresses from headers
to_addrs = [addr for _n, addr in getaddresses(to_headers) if addr]
cc_addrs = [addr for _n, addr in getaddresses(cc_headers) if addr]
bcc_addrs = [addr for _n, addr in getaddresses(bcc_headers) if addr]
all_recipients = to_addrs + cc_addrs + bcc_addrs
print(f"Parsed recipients - To: {to_addrs}, Cc: {cc_addrs}, Bcc: {bcc_addrs}")
# Filter recipients to bucket domain with case-insensitive comparison
# and deduplicate using a set (preserving case)
recipients_set = set()
recipients = []
for addr in all_recipients:
# Extract domain part (everything after @)
if '@' in addr:
addr_lower = addr.lower()
addr_domain = addr_lower.split('@')[-1]
if addr_domain == expected_domain:
# Only add if not already in set (case-insensitive deduplication)
if addr_lower not in recipients_set:
recipients_set.add(addr_lower)
recipients.append(addr) # Keep original case
print(f"Matched recipient: {addr} (domain: {addr_domain})")
else:
print(f"Skipped duplicate: {addr}")
else:
print(f"Skipped recipient: {addr} (domain: {addr_domain} != {expected_domain})")
print(f"Final recipients after domain filter and deduplication: {recipients}")
# If no recipients found, try additional headers
if not recipients:
print("WARNING: No recipients found in standard headers, checking additional headers...")
# Check for X-Original-To, Delivered-To, Envelope-To
fallback_headers = ['X-Original-To', 'Delivered-To', 'Envelope-To',
'x-original-to', 'delivered-to', 'envelope-to']
for header_name in fallback_headers:
header_val = parsed.get(header_name)
if header_val:
print(f"Found {header_name}: {header_val}")
fallback_addrs = [addr for _n, addr in getaddresses([header_val]) if addr]
for addr in fallback_addrs:
if '@' in addr and addr.split('@')[-1].lower() == expected_domain:
recipients.append(addr)
print(f"Found recipient in {header_name}: {addr}")
if not recipients:
print(f"ERROR: Could not find any recipients for domain {expected_domain}")
print(f"All addresses found: {all_recipients}")
# Print all headers for debugging
print("=== All Email Headers ===")
for key in parsed.keys():
print(f"{key}: {parsed.get(key)}")
print("=== End Headers ===")
else:
print("ERROR: Could not parse email headers")
recipients = []
# If after all we have no recipients, skip SMTP
delivered = []
refused = {}
if recipients:
# Use raw bytes directly (no decoding!)
raw_message = raw_bytes
# Determine HELO hostname
env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
derived_local = bucket_to_domain(bucket) if bucket else None
local_helo = env_local or derived_local or 'localhost'
print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
start = time.time()
# Send with retry logic
delivered, refused = send_email_with_retry(
SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS,
frm_addr, recipients, raw_message, local_helo,
max_retries=MAX_RETRIES
)
print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")
# Update retry count if we had temporary failures
if refused and not delivered:
temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)]
if temp_failures:
update_retry_metadata(bucket, key, retry_count + 1, str(refused))
else:
print("No recipients to send to; skipping SMTP.")
# Only mark as processed if at least one recipient was delivered successfully
if delivered:
try:
mark_object_processed(bucket, key)
except Exception as e:
print("Failed to mark object processed after delivery:", e)
else:
print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")
return {
'statusCode': 200,
'body': json.dumps({
'processed': bool(delivered),
'delivered': delivered,
'refused_count': len(refused),
'retry_count': retry_count
})
}