"""AWS Lambda handler that relays raw e-mails stored in S3 to an SMTP server.

The function is triggered either by an SES receipt event or by an S3 object
event. It loads the raw message from S3, submits it unchanged over SMTP and
records the outcome in the S3 object's user metadata.
"""

import json
import os
import smtplib
import time
import traceback
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses
from urllib.parse import unquote_plus

import boto3

s3 = boto3.client('s3')

# Environment variables (set these in the Lambda config)
SMTP_HOST = os.environ.get('MAILCOW_SMTP_HOST', 'mail.email-srvr.com')
SMTP_PORT = int(os.environ.get('MAILCOW_SMTP_PORT', '2525'))  # default to your mapped port
SMTP_USER = os.environ.get('SMTP_USER') or os.environ.get('MAILCOW_SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS') or os.environ.get('MAILCOW_SMTP_PASS')

# Metadata key/value to mark processed objects (only set when at least one recipient delivered)
PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed')
PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true')

# Retry configuration
MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
RETRY_DELAYS = [1, 5, 15]  # seconds to wait between attempts

# Suffix appended to the dashed domain name to form the bucket name.
_BUCKET_SUFFIX = '-emails'


def domain_to_bucket(domain: str) -> str:
    """Return the bucket name for *domain* (dots become dashes, plus suffix)."""
    return domain.replace('.', '-') + _BUCKET_SUFFIX


def bucket_to_domain(bucket: str) -> str:
    """Return the domain encoded in *bucket* (inverse of ``domain_to_bucket``).

    Only the trailing suffix is stripped, so a domain label that itself
    contains ``-emails`` is left intact. Domains containing a literal hyphen
    cannot be recovered, because the bucket encoding maps '.' to '-'.
    """
    if bucket.endswith(_BUCKET_SUFFIX):
        bucket = bucket[:-len(_BUCKET_SUFFIX)]
    return bucket.replace('-', '.')


def parse_raw_message(raw_bytes: bytes):
    """Parse raw RFC 5322 bytes; return the message object or None on failure."""
    try:
        return BytesParser(policy=default).parsebytes(raw_bytes)
    except Exception as e:
        # Best effort: callers fall back to defaults when parsing fails.
        print("Failed to parse raw message:", e)
        return None


def _ascii_metadata_value(text, limit: int = 255) -> str:
    """Flatten *text* to one line of ASCII, truncated to *limit* characters."""
    flattened = ' '.join(str(text).split())
    return flattened.encode('ascii', 'replace').decode('ascii')[:limit]


def _write_object_metadata(bucket: str, key: str, metadata: dict) -> None:
    """Replace the user metadata of an object by copying it onto itself."""
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={'Bucket': bucket, 'Key': key},
        Metadata=metadata,
        MetadataDirective='REPLACE'
    )


def mark_object_processed(bucket: str, key: str):
    """Set the processed marker in the object's metadata (best effort).

    Errors are logged and swallowed so a metadata failure never aborts the
    handler after a successful delivery.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already marked processed.")
            return
        new_meta = current_metadata.copy()
        new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE
        _write_object_metadata(bucket, key, new_meta)
        print(f"Marked {bucket}/{key} as processed.")
    except Exception as e:
        print("Failed to mark processed metadata:", e)
        traceback.print_exc()


def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
    """Update S3 object metadata with retry information (best effort).

    Writes ``retry_count``, ``last_retry`` (epoch seconds) and, when given,
    ``last_error``. Errors are logged and swallowed.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        new_meta = current_metadata.copy()
        new_meta['retry_count'] = str(retry_count)
        new_meta['last_retry'] = str(int(time.time()))
        if last_error:
            # Metadata travels in HTTP headers: keep the value single-line ASCII.
            new_meta['last_error'] = _ascii_metadata_value(last_error)
        _write_object_metadata(bucket, key, new_meta)
        print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
    except Exception as e:
        print(f"Failed to update retry metadata: {e}")


def get_retry_count(bucket: str, key: str) -> int:
    """Get current retry count from S3 metadata; 0 when missing or unreadable."""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        return int(metadata.get('retry_count', '0'))
    except Exception:
        return 0


def is_temporary_smtp_error(error_code):
    """Check if SMTP error code indicates a temporary failure (4xx).

    *error_code* is expected to be a ``(code, message)`` tuple; anything else,
    including tuples whose first item is not an int, counts as not temporary.
    """
    if isinstance(error_code, tuple) and len(error_code) >= 1:
        code = error_code[0]
        if isinstance(code, int):
            return 400 <= code < 500
    return False


def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass, frm_addr, recipients,
                          raw_message, local_helo, max_retries=MAX_RETRIES):
    """Send email with retry logic for temporary failures.

    Each attempt opens a fresh SMTP connection, tries STARTTLS and login on a
    best-effort basis, and submits the message to the recipients still
    pending. Only recipients refused with a 4xx code are retried. Results are
    accumulated over all attempts, so recipients accepted on an early attempt
    stay in ``delivered`` even if later attempts fail.

    Returns:
        ``(delivered, refused)``: a list of accepted recipient addresses and a
        dict mapping every other recipient to its ``(code, message)`` tuple.
    """
    if isinstance(recipients, str):
        recipients = [recipients]
    pending = list(recipients)
    delivered = []
    perm_refused = {}
    if not pending:
        return delivered, perm_refused

    for attempt in range(max_retries + 1):
        if attempt > 0:
            delay = RETRY_DELAYS[min(attempt - 1, len(RETRY_DELAYS) - 1)]
            print(f"Retry attempt {attempt}/{max_retries} after {delay}s delay...")
            time.sleep(delay)

        send_result = None   # dict of refused recipients once sendmail has answered
        failure = None       # exception that prevented this attempt from completing
        failure_tag = None   # 'error' (during sendmail) or 'connect-error'

        try:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30, local_hostname=local_helo) as smtp:
                smtp.ehlo()

                # Try STARTTLS
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    print("STARTTLS not available or failed (continuing):", e)

                # Login if credentials provided
                if smtp_user and smtp_pass:
                    try:
                        smtp.login(smtp_user, smtp_pass)
                    except Exception as e:
                        print("SMTP login failed (continuing):", e)

                # Attempt to send
                try:
                    send_result = smtp.sendmail(frm_addr, pending, raw_message) or {}
                except smtplib.SMTPRecipientsRefused as e:
                    # Every pending recipient was refused; treat like a full refusal dict.
                    print(f"SMTPRecipientsRefused on attempt {attempt + 1}: {e}")
                    send_result = dict(e.recipients)
                except Exception as e:
                    print(f"SMTP sendmail error on attempt {attempt + 1}: {e}")
                    failure, failure_tag = e, 'error'
        except Exception as e:
            if send_result is None and failure is None:
                print(f"Error connecting to SMTP host on attempt {attempt + 1}: {e}")
                failure, failure_tag = e, 'connect-error'
            else:
                # The outcome of sendmail is already known; a failure while
                # closing the connection must not trigger a resend.
                print("Ignoring error while closing SMTP connection:", e)

        if failure is not None:
            if attempt < max_retries:
                continue
            traceback.print_exception(type(failure), failure, failure.__traceback__)
            refused = dict(perm_refused)
            refused.update({r: (failure_tag, str(failure)) for r in pending})
            return delivered, refused

        # Separate temporary and permanent failures
        temp_refused = {}
        for rcpt, error in send_result.items():
            if is_temporary_smtp_error(error):
                temp_refused[rcpt] = error
            else:
                perm_refused[rcpt] = error
        delivered.extend(r for r in pending if r not in send_result)

        if temp_refused and attempt < max_retries:
            print(f"Temporary failures for {list(temp_refused.keys())}, will retry...")
            pending = list(temp_refused)  # Only retry temporary failures
            continue

        # No temporary failures or no more retries
        refused = dict(perm_refused)
        refused.update(temp_refused)
        return delivered, refused

    return delivered, perm_refused


def lambda_handler(event, context):
    """Lambda entry point: relay the stored message referenced by *event*.

    Handles the first record only. For an SES record the bucket is derived
    from the first recipient's domain and the object is looked up under
    ``emails/<messageId>``. For an S3 record the bucket and key come from the
    event and the recipients are read from the message headers.

    Returns a dict with ``statusCode`` and ``body``. Raises ``Exception`` for
    unknown event types, SES events without usable recipients, or a missing
    S3 object.
    """
    print("Event:", event)
    ses = None
    try:
        rec = event['Records'][0]
    except Exception as e:
        print("No Records in event:", e)
        return {'statusCode': 400, 'body': 'No Records'}

    # Determine bucket/key and initial recipients list
    recipients = []
    bucket = None
    key = None

    if 'ses' in rec:
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = ses['receipt'].get('recipients', [])
        if not recipients:
            raise Exception("SES event but no recipients found")
        # assume first recipient domain maps to bucket
        _local, at_sign, domain = recipients[0].rpartition('@')
        if not at_sign or not domain:
            raise Exception(f"SES recipient has no domain part: {recipients[0]!r}")
        bucket = domain_to_bucket(domain)
        prefix = f"emails/{msg_id}"
        print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
        resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' not in resp_list or not resp_list['Contents']:
            raise Exception(f"No S3 object under {prefix} in {bucket}")
        key = resp_list['Contents'][0]['Key']
    elif 's3' in rec:
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(s3info['object']['key'])
        print(f"S3 event: bucket={bucket}, key={key}")
        # recipients will be parsed from message headers below
    else:
        raise Exception("Unknown event type")

    # Check if already processed
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already processed. Exiting.")
            return {'statusCode': 200, 'body': 'already processed'}
    except Exception as e:
        print("head_object error (continuing):", e)

    # Check retry count - if too many retries, give up
    retry_count = get_retry_count(bucket, key)
    if retry_count >= MAX_RETRIES * 2:  # Safety limit
        print(f"Object {key} has been retried {retry_count} times, giving up")
        return {'statusCode': 200, 'body': f'max retries exceeded ({retry_count})'}

    # Get raw mail bytes
    resp = s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = resp['Body'].read()
    print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}")

    parsed = parse_raw_message(raw_bytes)
    subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'

    # Envelope sender: From header, then SES source, then a synthetic address.
    frm_addr = None
    if parsed:
        from_addrs = getaddresses(parsed.get_all('from', []) or [])
        frm_addr = from_addrs[0][1] if from_addrs else None
    if not frm_addr:
        frm_addr = (ses['mail'].get('source') if ses else None) or (
            'noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost'))
    print(f"From: {frm_addr}, Subject: {subj}")

    # If recipients were not provided (S3 path), extract from headers
    if not recipients:
        if parsed:
            to_addrs = [addr for _n, addr in getaddresses(parsed.get_all('to', []) or [])]
            cc_addrs = [addr for _n, addr in getaddresses(parsed.get_all('cc', []) or [])]
            bcc_addrs = [addr for _n, addr in getaddresses(parsed.get_all('bcc', []) or [])]
            recipients = to_addrs + cc_addrs + bcc_addrs
            print("Recipients from headers:", recipients)
            # filter recipients to bucket domain (safety)
            expected_domain = bucket_to_domain(bucket)
            recipients = [r for r in recipients if r.lower().split('@')[-1] == expected_domain]
            print(f"Recipients after domain filter ({expected_domain}): {recipients}")
        else:
            print("No parsed headers and no recipients provided; nothing to do.")
            recipients = []

    # If after all we have no recipients, skip SMTP
    delivered = []
    refused = {}
    if recipients:
        # Use raw bytes directly (no decoding!)
        raw_message = raw_bytes

        # Determine HELO hostname
        env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
        derived_local = bucket_to_domain(bucket) if bucket else None
        local_helo = env_local or derived_local or 'localhost'

        print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
        start = time.time()

        # Send with retry logic
        delivered, refused = send_email_with_retry(
            SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS,
            frm_addr, recipients, raw_message, local_helo,
            max_retries=MAX_RETRIES
        )
        print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")

        # Update retry count if we had temporary failures
        if refused and not delivered:
            temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)]
            if temp_failures:
                update_retry_metadata(bucket, key, retry_count + 1, str(refused))
    else:
        print("No recipients to send to; skipping SMTP.")

    # Only mark as processed if at least one recipient was delivered successfully
    if delivered:
        try:
            mark_object_processed(bucket, key)
        except Exception as e:
            print("Failed to mark object processed after delivery:", e)
    else:
        print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': bool(delivered),
            'delivered': delivered,
            'refused_count': len(refused),
            'retry_count': retry_count
        })
    }