From cbe58d4cb28a016038ffb5b78d27173558c7bf38 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Sun, 31 Aug 2025 16:11:14 -0500 Subject: [PATCH] ses2dms --- ses-lambda-new-python/lambda_function.py | 214 +++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 ses-lambda-new-python/lambda_function.py diff --git a/ses-lambda-new-python/lambda_function.py b/ses-lambda-new-python/lambda_function.py new file mode 100644 index 0000000..ebec032 --- /dev/null +++ b/ses-lambda-new-python/lambda_function.py @@ -0,0 +1,214 @@ +# lambda_andreasknuth.py +import os +import boto3 +import smtplib +import time +import traceback +from email.parser import BytesParser +from email.policy import default +from email.utils import getaddresses + +s3 = boto3.client('s3') + +# Environment variables (set these in the Lambda config) +SMTP_HOST = os.environ.get('MAILCOW_SMTP_HOST', 'mail.email-srvr.com') +SMTP_PORT = int(os.environ.get('MAILCOW_SMTP_PORT', '2525')) # default to your mapped port +SMTP_USER = os.environ.get('SMTP_USER') or os.environ.get('MAILCOW_SMTP_USER') +SMTP_PASS = os.environ.get('SMTP_PASS') or os.environ.get('MAILCOW_SMTP_PASS') + +# Metadata key/value to mark processed objects (only set when at least one recipient delivered) +PROCESSED_META_KEY = os.environ.get('PROCESSED_META_KEY', 'processed') +PROCESSED_META_VALUE = os.environ.get('PROCESSED_META_VALUE', 'true') + +def domain_to_bucket(domain: str) -> str: + return domain.replace('.', '-') + '-emails' + +def bucket_to_domain(bucket: str) -> str: + return bucket.replace('-emails', '').replace('-', '.') + +def parse_raw_message(raw_bytes: bytes): + try: + parsed = BytesParser(policy=default).parsebytes(raw_bytes) + except Exception: + parsed = None + return parsed + +def mark_object_processed(bucket: str, key: str): + try: + head = s3.head_object(Bucket=bucket, Key=key) + current_metadata = head.get('Metadata', {}) or {} + if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: + print(f"Object {key} in {bucket} already marked processed.") + return + new_meta = current_metadata.copy() + new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE + # Copy object onto itself replacing metadata + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=new_meta, + MetadataDirective='REPLACE' + ) + print(f"Marked {bucket}/{key} as processed.") + except Exception as e: + print("Failed to mark processed metadata:", e) + traceback.print_exc() + +def lambda_handler(event, context): + print("Event:", event) + ses = None + try: + rec = event['Records'][0] + except Exception as e: + print("No Records in event:", e) + return {'statusCode': 400, 'body': 'No Records'} + + # Determine bucket/key and initial recipients list + recipients = [] + bucket = None + key = None + + if 'ses' in rec: + ses = rec['ses'] + msg_id = ses['mail']['messageId'] + recipients = ses['receipt'].get('recipients', []) + # assume first recipient domain maps to bucket + if recipients: + domain = recipients[0].split('@', 1)[1] + bucket = domain_to_bucket(domain) + prefix = f"emails/{msg_id}" + print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}") + resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) + if 'Contents' not in resp_list or not resp_list['Contents']: + raise Exception(f"No S3 object under {prefix} in {bucket}") + key = resp_list['Contents'][0]['Key'] + else: + raise Exception("SES event but no recipients found") + elif 's3' in rec: + s3info = rec['s3'] + bucket = s3info['bucket']['name'] + key = s3info['object']['key'] + print(f"S3 event: bucket={bucket}, key={key}") + # recipients will be parsed from message headers below + else: + raise Exception("Unknown event type") + + # Check if already processed (only to avoid unnecessary work; we still honor processed semantics) + try: + head = s3.head_object(Bucket=bucket, Key=key) + if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE: + print(f"Object {key} in {bucket} already processed. Exiting.") + return {'statusCode': 200, 'body': 'already processed'} + except Exception as e: + # If head_object fails, continue and try to process (log for debugging) + print("head_object error (continuing):", e) + + # Get raw mail bytes + resp = s3.get_object(Bucket=bucket, Key=key) + raw_bytes = resp['Body'].read() + print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}") + + parsed = parse_raw_message(raw_bytes) + subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)' + frm_addr = None + if parsed: + from_addrs = getaddresses(parsed.get_all('from', []) or []) + frm_addr = from_addrs[0][1] if from_addrs else None + if not frm_addr: + # fallback: try envelope sender if present in SES event + frm_addr = (ses['mail'].get('source') if ses else None) or ('noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost')) + print(f"From: {frm_addr}, Subject: {subj}") + + # If recipients were not provided (S3 path), extract from headers + if not recipients: + if parsed: + to_addrs = [addr for _n, addr in getaddresses(parsed.get_all('to', []) or [])] + cc_addrs = [addr for _n, addr in getaddresses(parsed.get_all('cc', []) or [])] + bcc_addrs = [addr for _n, addr in getaddresses(parsed.get_all('bcc', []) or [])] + recipients = to_addrs + cc_addrs + bcc_addrs + print("Recipients from headers:", recipients) + # filter recipients to bucket domain (safety) + expected_domain = bucket_to_domain(bucket) + recipients = [r for r in recipients if r.lower().split('@')[-1] == expected_domain] + print(f"Recipients after domain filter ({expected_domain}): {recipients}") + else: + print("No parsed headers and no recipients provided; nothing to do.") + recipients = [] + + # If after all we have no recipients, skip SMTP + delivered = [] + refused = {} + if recipients: + # Prepare message string for sendmail (decode bytes safely) + try: + raw_message = raw_bytes.decode('utf-8') + except Exception: + raw_message = raw_bytes.decode('utf-8', errors='replace') + + # Determine a sensible local hostname (HELO). Prefer explicit ENV, else use domain of bucket (recipient domain). + env_local = os.environ.get('SMTP_LOCAL_HOSTNAME') + derived_local = bucket_to_domain(bucket) if bucket else None + local_helo = env_local or derived_local or 'localhost' + + print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}") + start = time.time() + try: + # Pass local_hostname so the server receives a proper FQDN in HELO/EHLO + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30, local_hostname=local_helo) as smtp: + smtp.ehlo() + # Try STARTTLS; if it fails, continue (server might be implicit TLS port) + try: + smtp.starttls() + smtp.ehlo() + except Exception as e: + print("STARTTLS not available or failed (continuing):", e) + + if SMTP_USER and SMTP_PASS: + try: + smtp.login(SMTP_USER, SMTP_PASS) + except Exception as e: + print("SMTP login failed (continuing):", e) + + try: + send_result = smtp.sendmail(frm_addr, recipients, raw_message) + # sendmail returns dict of refused recipients + if isinstance(send_result, dict): + refused = send_result + delivered = [r for r in recipients if r not in refused] + else: + # Unexpected but treat all as delivered + delivered = recipients[:] + refused = {} + except smtplib.SMTPRecipientsRefused as e: + print("SMTPRecipientsRefused:", e) + try: + refused = e.recipients + except Exception: + refused = {r: ('550', 'Recipient refused') for r in recipients} + delivered = [r for r in recipients if r not in refused] + except Exception as e: + print("SMTP sendmail error:", e) + refused = {r: ('error', str(e)) for r in recipients} + delivered = [] + except Exception as e: + print("Error connecting to SMTP host:", e) + refused = {r: ('connect-error', str(e)) for r in recipients} + delivered = [] + print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}") + else: + print("No recipients to send to; skipping SMTP.") + + # Only mark as processed if at least one recipient was delivered successfully + if delivered: + try: + mark_object_processed(bucket, key) + except Exception as e: + print("Failed to mark object processed after delivery:", e) + else: + print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.") + + return { + 'statusCode': 200, + 'body': f"processed={bool(delivered)}, delivered={delivered}, refused_count={len(refused)}" + }