ses2dms
This commit is contained in:
parent
31aea63c61
commit
cbe58d4cb2
|
|
@ -0,0 +1,214 @@
|
|||
# lambda_andreasknuth.py
|
||||
import os
|
||||
import boto3
|
||||
import smtplib
|
||||
import time
|
||||
import traceback
|
||||
from email.parser import BytesParser
|
||||
from email.policy import default
|
||||
from email.utils import getaddresses
|
||||
|
||||
# Module-level S3 client: created once at import time and shared by every
# function in this file that talks to S3.
s3 = boto3.client('s3')
|
||||
|
||||
# Runtime configuration, read once from the process environment at import time
# (set these in the Lambda configuration).
SMTP_HOST = os.getenv('MAILCOW_SMTP_HOST', 'mail.email-srvr.com')
SMTP_PORT = int(os.getenv('MAILCOW_SMTP_PORT', '2525'))  # defaults to the mapped port

# Credentials: the generic SMTP_* variables take precedence; the MAILCOW_*
# variables are consulted only when the generic ones are unset or empty.
SMTP_USER = os.getenv('SMTP_USER') or os.getenv('MAILCOW_SMTP_USER')
SMTP_PASS = os.getenv('SMTP_PASS') or os.getenv('MAILCOW_SMTP_PASS')

# User-metadata key/value written onto an S3 object to flag it as processed
# (only set once at least one recipient has been delivered).
PROCESSED_META_KEY = os.getenv('PROCESSED_META_KEY', 'processed')
PROCESSED_META_VALUE = os.getenv('PROCESSED_META_VALUE', 'true')
|
||||
|
||||
def domain_to_bucket(domain: str) -> str:
    """Return the S3 bucket name used for mail addressed to *domain*.

    Dots become hyphens and the ``-emails`` suffix is appended, e.g.
    ``example.com`` -> ``example-com-emails``.

    The domain is lower-cased first: mail domains are case-insensitive, while
    S3 bucket names may only contain lowercase characters, so a recipient such
    as ``user@Example.COM`` must still resolve to the same bucket.
    """
    return domain.lower().replace('.', '-') + '-emails'
|
||||
|
||||
def bucket_to_domain(bucket: str) -> str:
    """Return the mail domain encoded in an ``<domain-with-hyphens>-emails`` bucket name.

    Only the *trailing* ``-emails`` suffix is stripped. Removing every
    occurrence would corrupt domains that themselves contain an ``emails``
    label (``a-emails-com-emails`` must map to ``a.emails.com``, not ``a.com``).

    Every remaining hyphen is turned into a dot, so a hyphen that was part of
    the original domain name cannot be recovered; the mapping is lossy for
    hyphenated domains.
    """
    suffix = '-emails'
    stem = bucket[:-len(suffix)] if bucket.endswith(suffix) else bucket
    return stem.replace('-', '.')
|
||||
|
||||
def parse_raw_message(raw_bytes: bytes):
    """Parse a raw RFC 5322 message into an ``EmailMessage``.

    Args:
        raw_bytes: The complete message (headers and body) as bytes.

    Returns:
        The parsed message object, or ``None`` when parsing fails. Parsing is
        best-effort: callers are expected to cope with ``None``.
    """
    try:
        return BytesParser(policy=default).parsebytes(raw_bytes)
    except Exception as e:
        # Stay best-effort, but leave a trace in the logs instead of hiding
        # the failure completely.
        print("Failed to parse raw message:", e)
        return None
|
||||
|
||||
def mark_object_processed(bucket: str, key: str):
    """Flag an S3 object as processed by rewriting its user metadata in place.

    The object is copied onto itself with ``MetadataDirective='REPLACE'`` and
    ``PROCESSED_META_KEY`` set to ``PROCESSED_META_VALUE``. If the flag is
    already present nothing is written.

    This function is best-effort: every exception is logged (with traceback)
    and swallowed, so it never raises to the caller.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object to flag.
    """
    # With MetadataDirective='REPLACE' S3 discards ALL existing metadata,
    # including these system-defined headers, unless they are sent again.
    # Without this the object's Content-Type would be reset by the self-copy.
    preserved_headers = (
        'ContentType',
        'ContentEncoding',
        'ContentDisposition',
        'ContentLanguage',
        'CacheControl',
    )
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = head.get('Metadata', {}) or {}
        if current_metadata.get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already marked processed.")
            return

        new_meta = current_metadata.copy()
        new_meta[PROCESSED_META_KEY] = PROCESSED_META_VALUE

        # Only forward headers that the object actually has.
        carried_over = {name: head[name] for name in preserved_headers if head.get(name)}

        # Copy object onto itself replacing metadata
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE',
            **carried_over,
        )
        print(f"Marked {bucket}/{key} as processed.")
    except Exception as e:
        print("Failed to mark processed metadata:", e)
        traceback.print_exc()
|
||||
|
||||
def _send_via_smtp(frm_addr, recipients, raw_bytes, local_helo):
    """Relay one raw message over SMTP and report the per-recipient outcome.

    Args:
        frm_addr: Envelope sender passed to ``MAIL FROM``.
        recipients: Envelope recipients passed to ``RCPT TO``.
        raw_bytes: The message exactly as stored, as bytes.
        local_helo: Hostname announced in HELO/EHLO.

    Returns:
        ``(delivered, refused)`` where ``delivered`` is the list of accepted
        recipients and ``refused`` maps each rejected recipient to a
        ``(code, message)`` tuple.
    """
    delivered = []
    refused = {}
    send_attempted = False
    try:
        # Pass local_hostname so the server receives a proper FQDN in HELO/EHLO
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30, local_hostname=local_helo) as smtp:
            smtp.ehlo()
            # Try STARTTLS; if it fails, continue (server might not offer it).
            # NOTE(review): when STARTTLS fails, the login below sends the
            # credentials over an unencrypted connection - confirm this is
            # acceptable for the configured relay.
            try:
                smtp.starttls()
                smtp.ehlo()
            except Exception as e:
                print("STARTTLS not available or failed (continuing):", e)

            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    print("SMTP login failed (continuing):", e)

            send_attempted = True
            try:
                # Hand the ORIGINAL bytes to sendmail. A str message is encoded
                # with the ascii codec by smtplib, which fails on any 8-bit
                # content, and a lossy decode would corrupt the message.
                send_result = smtp.sendmail(frm_addr, recipients, raw_bytes)
                # sendmail returns dict of refused recipients
                refused = send_result if isinstance(send_result, dict) else {}
                delivered = [r for r in recipients if r not in refused]
            except smtplib.SMTPRecipientsRefused as e:
                # Raised when every recipient was rejected.
                print("SMTPRecipientsRefused:", e)
                refused = e.recipients
                delivered = [r for r in recipients if r not in refused]
            except Exception as e:
                print("SMTP sendmail error:", e)
                refused = {r: ('error', str(e)) for r in recipients}
                delivered = []
    except Exception as e:
        if send_attempted:
            # The failure happened while closing the session, after the send
            # outcome was already recorded: keep that outcome so an accepted
            # message is not reported as undelivered (and relayed again later).
            print("SMTP session error after send (keeping send result):", e)
        else:
            print("Error connecting to SMTP host:", e)
            refused = {r: ('connect-error', str(e)) for r in recipients}
            delivered = []
    return delivered, refused


def lambda_handler(event, context):
    """Lambda entry point: relay a stored inbound mail to the SMTP server.

    Only the first entry of ``event['Records']`` is handled. Two record
    shapes are recognised:

    * a record with an ``'ses'`` key: recipients come from the SES receipt,
      the bucket is derived from the first recipient's domain, and the object
      is the first key listed under ``emails/<messageId>``;
    * a record with an ``'s3'`` key: bucket and key come from the
      notification, recipients are read from the To/Cc/Bcc headers and
      restricted to the bucket's domain.

    The object is flagged as processed only when at least one recipient was
    accepted by the SMTP server.

    Returns:
        A dict with ``statusCode`` and ``body``: 400 when the event has no
        records, otherwise 200 with a delivery summary.

    Raises:
        Exception: for an unknown record type, an SES record without
            recipients, or when no S3 object exists for the SES message.
            Errors from ``s3.get_object`` also propagate.
    """
    # Function-scope import: keeps this block self-contained.
    from urllib.parse import unquote_plus

    print("Event:", event)
    try:
        rec = event['Records'][0]
    except Exception as e:
        print("No Records in event:", e)
        return {'statusCode': 400, 'body': 'No Records'}

    # Determine bucket/key and initial recipients list
    ses = None
    recipients = []
    bucket = None
    key = None

    if 'ses' in rec:
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = ses['receipt'].get('recipients', [])
        if not recipients:
            raise Exception("SES event but no recipients found")
        # assume first recipient domain maps to bucket
        domain = recipients[0].split('@', 1)[1]
        bucket = domain_to_bucket(domain)
        prefix = f"emails/{msg_id}"
        print(f"SES event: domain={domain}, bucket={bucket}, prefix={prefix}")
        resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        contents = resp_list.get('Contents') or []
        if not contents:
            raise Exception(f"No S3 object under {prefix} in {bucket}")
        key = contents[0]['Key']
    elif 's3' in rec:
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded (spaces
        # arrive as '+'); decode before using the key against the S3 API.
        key = unquote_plus(s3info['object']['key'])
        print(f"S3 event: bucket={bucket}, key={key}")
        # recipients will be parsed from message headers below
    else:
        raise Exception("Unknown event type")

    # Check if already processed (only to avoid unnecessary work; we still honor processed semantics)
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        if head.get('Metadata', {}).get(PROCESSED_META_KEY) == PROCESSED_META_VALUE:
            print(f"Object {key} in {bucket} already processed. Exiting.")
            return {'statusCode': 200, 'body': 'already processed'}
    except Exception as e:
        # If head_object fails, continue and try to process (log for debugging)
        print("head_object error (continuing):", e)

    # Get raw mail bytes
    resp = s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = resp['Body'].read()
    print(f"Loaded {len(raw_bytes)} bytes from s3://{bucket}/{key}")

    parsed = parse_raw_message(raw_bytes)
    subj = parsed.get('subject', '(no subject)') if parsed else '(no subject)'

    # Envelope sender: first From header address, then the SES envelope
    # source, then a synthetic noreply@<bucket domain> address.
    frm_addr = None
    if parsed:
        from_addrs = getaddresses(parsed.get_all('from', []) or [])
        frm_addr = from_addrs[0][1] if from_addrs else None
    if not frm_addr:
        frm_addr = (ses['mail'].get('source') if ses else None) or (
            'noreply@' + (bucket_to_domain(bucket) if bucket else 'localhost')
        )
    print(f"From: {frm_addr}, Subject: {subj}")

    # If recipients were not provided (S3 path), extract from headers
    if not recipients:
        if parsed:
            header_recipients = []
            for header in ('to', 'cc', 'bcc'):
                header_recipients.extend(
                    addr for _n, addr in getaddresses(parsed.get_all(header, []) or [])
                )
            recipients = header_recipients
            print("Recipients from headers:", recipients)
            # filter recipients to bucket domain (safety)
            expected_domain = bucket_to_domain(bucket)
            recipients = [r for r in recipients if r.lower().split('@')[-1] == expected_domain]
            print(f"Recipients after domain filter ({expected_domain}): {recipients}")
        else:
            print("No parsed headers and no recipients provided; nothing to do.")
            recipients = []

    # If after all we have no recipients, skip SMTP
    delivered = []
    refused = {}
    if recipients:
        # Determine a sensible local hostname (HELO). Prefer explicit ENV,
        # else use domain of bucket (recipient domain).
        env_local = os.environ.get('SMTP_LOCAL_HOSTNAME')
        derived_local = bucket_to_domain(bucket) if bucket else None
        local_helo = env_local or derived_local or 'localhost'

        print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}")
        start = time.time()
        delivered, refused = _send_via_smtp(frm_addr, recipients, raw_bytes, local_helo)
        print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}")
    else:
        print("No recipients to send to; skipping SMTP.")

    # Only mark as processed if at least one recipient was delivered successfully
    if delivered:
        try:
            mark_object_processed(bucket, key)
        except Exception as e:
            print("Failed to mark object processed after delivery:", e)
    else:
        print("No successful deliveries; NOT setting processed metadata so message can be re-evaluated later.")

    return {
        'statusCode': 200,
        'body': f"processed={bool(delivered)}, delivered={delivered}, refused_count={len(refused)}"
    }
|
||||
Loading…
Reference in New Issue