# docker/ses-lambda-python/lambda_function.py
#
# 152 lines
# 5.9 KiB
# Python

import os
import smtplib
import time
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses
from urllib.parse import unquote_plus

import boto3
import requests
# Module-level S3 client shared by every function in this file.
s3 = boto3.client("s3")

# Mailcow SMTP target. The host is mandatory (a missing variable raises
# KeyError at import time); the port falls back to 587.
MAILCOW_HOST = os.environ["MAILCOW_SMTP_HOST"]
MAILCOW_PORT = int(os.getenv("MAILCOW_SMTP_PORT", 587))

# Optional SMTP credentials; login is skipped unless both are set.
SMTP_USER = os.getenv("MAILCOW_SMTP_USER")
SMTP_PASS = os.getenv("MAILCOW_SMTP_PASS")

# API key sent as the X-API-Key header when listing Mailcow mailboxes.
MAILCOW_API_KEY = os.getenv("MAILCOW_API_KEY")
def domain_to_bucket(domain):
    """Map a mail domain to its S3 bucket name.

    Every dot in *domain* becomes a hyphen and ``-emails`` is appended,
    e.g. ``example.com`` -> ``example-com-emails``.
    """
    hyphenated = "-".join(domain.split("."))
    return f"{hyphenated}-emails"
def bucket_to_domain(bucket):
    """Map an S3 bucket name back to the mail domain it was derived from.

    Strips one trailing ``-emails`` suffix (if present) and turns every
    remaining hyphen into a dot, e.g. ``example-com-emails`` ->
    ``example.com``.

    Only the trailing suffix is removed; an ``-emails`` fragment elsewhere in
    the name is part of the domain and is kept.

    NOTE: the mapping is lossy for domains that themselves contain hyphens
    (``my-site.com`` and ``my.site.com`` share the same bucket name), so such
    a domain cannot be recovered exactly from the bucket name.
    """
    suffix = '-emails'
    if bucket.endswith(suffix):
        bucket = bucket[:-len(suffix)]
    return bucket.replace('-', '.')
def get_valid_inboxes():
    """Fetch the set of active Mailcow mailbox addresses.

    Calls the Mailcow ``get/mailbox/all`` API endpoint with
    ``MAILCOW_API_KEY`` in the ``X-API-Key`` header and keeps every entry
    whose ``active_int`` equals ``1``.

    Returns:
        set[str]: Lower-cased ``username`` values of the active mailboxes.

    Raises:
        Exception: If the HTTP request fails, the server answers with an
            error status, or the payload does not have the expected shape.
            The underlying error is attached as ``__cause__``.
    """
    # NOTE(review): the API host is hard-coded here and is independent of
    # MAILCOW_SMTP_HOST -- confirm both point at the same Mailcow instance.
    url = 'https://mail.email-srvr.com/api/v1/get/mailbox/all'
    headers = {'X-API-Key': MAILCOW_API_KEY}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        mailboxes = response.json()
        return {mb['username'].lower() for mb in mailboxes if mb['active_int'] == 1}
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError/KeyError/TypeError/AttributeError cover a body that is
        # not JSON or not a list of mailbox objects (e.g. an API error
        # object), so callers see one failure mode instead of a raw
        # traceback from the comprehension.
        print(f"Fehler beim Abrufen der Postfächer: {e}")
        raise Exception("Konnte gültige Postfächer nicht abrufen") from e
def _locate_mail_object(rec):
    """Resolve one event record to ``(bucket, key, recipients)``.

    For an SES receipt record the bucket is derived from the first
    recipient's domain and the key is the first object found under
    ``emails/<messageId>``; the SES envelope recipients are returned.
    For an S3 record the bucket and key come from the record itself and the
    recipient list is empty (it is read from the mail headers later).

    Raises:
        Exception: If the record is neither an SES nor an S3 record, or no
            object exists under the expected prefix.
    """
    if 'ses' in rec:
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = list(ses['receipt']['recipients'])
        # S3 bucket names are lower-case, so normalise the domain before
        # mapping it to a bucket name.
        domain = recipients[0].rsplit('@', 1)[-1].lower()
        bucket = domain_to_bucket(domain)
        prefix = f"emails/{msg_id}"
        print(f"SES-Receipt erkannt, domain={domain}, bucket={bucket}, prefix={prefix}")
        resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        contents = resp_list.get('Contents')
        if not contents:
            raise Exception(f"Kein Objekt unter Prefix {prefix} in Bucket {bucket} gefunden")
        return bucket, contents[0]['Key'], recipients
    if 's3' in rec:
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(s3info['object']['key'])
        print("S3-Put erkannt, bucket =", bucket, "key =", key)
        return bucket, key, []
    raise Exception("Unbekannter Event-Typ")


def _recipients_from_headers(parsed, bucket):
    """Collect To/Cc/Bcc addresses that belong to the bucket's domain."""
    recipients = [
        addr
        for header in ('to', 'cc', 'bcc')
        for _name, addr in getaddresses(parsed.get_all(header, []))
    ]
    print("Empfänger aus Headern:", recipients)
    # In the S3 flow, keep only recipients whose domain matches the bucket.
    # Entries without an '@' (e.g. empty group addresses) are dropped instead
    # of crashing the domain comparison.
    expected_domain = bucket_to_domain(bucket)
    recipients = [
        rcpt for rcpt in recipients
        if '@' in rcpt and rcpt.lower().rsplit('@', 1)[1] == expected_domain
    ]
    print(f"Empfänger nach Domain-Filter ({expected_domain}): {recipients}")
    return recipients


def _forward_via_smtp(sender, recipients, raw_bytes):
    """Relay the raw message to Mailcow over STARTTLS.

    ``SMTP.sendmail`` raises if the sender or every recipient is refused;
    recipients refused individually are logged.
    """
    start = time.time()
    print("=== SMTP: Verbinde zu", MAILCOW_HOST, "Port", MAILCOW_PORT)
    with smtplib.SMTP(MAILCOW_HOST, MAILCOW_PORT, timeout=30) as smtp:
        smtp.ehlo()
        smtp.starttls()
        smtp.ehlo()
        if SMTP_USER and SMTP_PASS:
            smtp.login(SMTP_USER, SMTP_PASS)
        print("=== SMTP: MAIL FROM", sender)
        for rcpt in recipients:
            print("=== SMTP: RCPT TO", rcpt)
        refused = smtp.sendmail(sender, recipients, raw_bytes)
    for rcpt, (code, reply) in refused.items():
        print(f"=== SMTP: RCPT TO {rcpt} abgelehnt: {code} {reply!r}")
    print(f"SMTP-Transfer in {time.time()-start:.2f}s abgeschlossen ...")


def _mark_processed(bucket, key):
    """Set the ``processed=true`` metadata entry on the object.

    The object is copied onto itself with replaced metadata; existing
    user metadata entries are carried over.
    """
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        new_metadata = dict(resp.get('Metadata', {}))
        new_metadata['processed'] = 'true'
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_metadata,
            MetadataDirective='REPLACE'
        )
        print("Metadatum 'processed:true' hinzugefügt.")
    except Exception as e:
        print(f"Fehler beim Schreiben des Metadatums: {e}")
        raise


def lambda_handler(event, context):
    """Forward a stored raw e-mail to Mailcow and mark it as processed.

    Handles the first record of *event*, which must be either an SES receipt
    record or an S3 record. The raw message is loaded from S3, its recipients
    are restricted to active Mailcow mailboxes, and the message is relayed
    via SMTP. Afterwards the object is tagged with ``processed=true``
    metadata so that a later invocation for the same object is skipped.

    Args:
        event: Lambda event containing a ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200 and a ``body`` describing the outcome.

    Raises:
        Exception: For an unknown record type, a missing S3 object, a failed
            mailbox lookup, SMTP errors, or a failed metadata update.
    """
    rec = event['Records'][0]
    bucket, key, recipients = _locate_mail_object(rec)

    # Skip objects that were already forwarded. A failing metadata check is
    # only logged so that processing still goes ahead.
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        if resp.get('Metadata', {}).get('processed') == 'true':
            print(f"Objekt {key} bereits verarbeitet (processed=true), überspringe Verarbeitung")
            return {
                'statusCode': 200,
                'body': f"Objekt {key} bereits verarbeitet, keine erneute Weiterleitung"
            }
    except Exception as e:
        print(f"Fehler beim Prüfen der Metadaten: {e}")

    # Fetch the raw mail from S3.
    resp = s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = resp['Body'].read()
    print(f"E-Mail geladen: {len(raw_bytes)} Bytes")

    # Parse the headers for logging and for the envelope sender.
    parsed = BytesParser(policy=default).parsebytes(raw_bytes)
    subj = parsed.get('subject', '(kein Subject)')
    from_addrs = getaddresses(parsed.get_all('from', []))
    frm_addr = from_addrs[0][1] if from_addrs else ''
    if not frm_addr:
        # No usable From header: fall back to the SES envelope sender when
        # available, otherwise to the empty (null) sender.
        frm_addr = rec.get('ses', {}).get('mail', {}).get('source', '')
    print(f"Parsed: From={frm_addr} Subject={subj}")

    # Take the recipients from the headers when SES did not supply them.
    if not recipients:
        recipients = _recipients_from_headers(parsed, bucket)

    # Bound up front so the final response works on every path.
    valid_recipients = []
    if not recipients:
        print("Keine Empfänger gefunden, setze Metadatum und überspringe SMTP")
    else:
        # Keep only recipients that are active Mailcow mailboxes.
        valid_inboxes = get_valid_inboxes()
        valid_recipients = [rcpt for rcpt in recipients if rcpt.lower() in valid_inboxes]
        print(f"Gültige Empfänger: {valid_recipients}")
        if valid_recipients:
            _forward_via_smtp(frm_addr, valid_recipients, raw_bytes)
        else:
            print("Keine gültigen Postfächer für die Empfänger gefunden, setze Metadatum und überspringe SMTP")

    _mark_processed(bucket, key)
    return {
        'statusCode': 200,
        'body': f"E-Mail verarbeitet für {bucket}, SMTP-Weiterleitung: {bool(valid_recipients)}"
    }