Andreas Knuth 2025-07-21 11:40:40 -05:00
parent 7012f1ffd3
commit 5fadef1aac
18 changed files with 148 additions and 135 deletions

.gitignore vendored

@@ -4,3 +4,5 @@ auth
.venv*
__pycache__
node_modules
ses-lambda-python/*
!ses-lambda-python/lambda_function.py

ses-lambda-python/lambda_function.py

@@ -0,0 +1,145 @@
import os
import boto3
import smtplib
import time
import requests
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

s3 = boto3.client('s3')

MAILCOW_HOST = os.environ['MAILCOW_SMTP_HOST']
MAILCOW_PORT = int(os.environ.get('MAILCOW_SMTP_PORT', 587))
SMTP_USER = os.environ.get('MAILCOW_SMTP_USER')
SMTP_PASS = os.environ.get('MAILCOW_SMTP_PASS')
MAILCOW_API_KEY = os.environ.get('MAILCOW_API_KEY')


def domain_to_bucket(domain):
    """Map a recipient domain to the name of its S3 mail bucket."""
    return domain.replace('.', '-') + '-emails'
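
# e.g. domain_to_bucket("example.com") -> "example-com-emails"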


def get_valid_inboxes():
    """Return the set of active mailbox addresses known to Mailcow (lower-cased)."""
    url = 'https://mail.email-srvr.com/api/v1/get/mailbox/all'
    headers = {'X-API-Key': MAILCOW_API_KEY}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        mailboxes = response.json()
        return {mb['username'].lower() for mb in mailboxes if mb['active_int'] == 1}
    except requests.RequestException as e:
        print(f"Error fetching mailboxes: {e}")
        raise Exception("Could not fetch valid mailboxes")


def lambda_handler(event, context):
    rec = event['Records'][0]
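    # Two trigger shapes are supported: an SES receipt rule (the stored object
    # is found via its messageId prefix) and a direct S3 put notification
    # (bucket and key come straight from the event record).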
    if 'ses' in rec:
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = ses['receipt']['recipients']
        first_recipient = recipients[0]
        domain = first_recipient.split('@')[1]
        bucket = domain_to_bucket(domain)
        prefix = f"emails/{msg_id}"
        print(f"SES receipt detected, domain={domain}, bucket={bucket}, prefix={prefix}")
        resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' not in resp_list or not resp_list['Contents']:
            raise Exception(f"No object found under prefix {prefix} in bucket {bucket}")
        key = resp_list['Contents'][0]['Key']
    elif 's3' in rec:
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        key = s3info['object']['key']
        print("S3 put detected, bucket =", bucket, "key =", key)
        recipients = []
    else:
        raise Exception("Unknown event type")
    # Check whether the object has already been processed
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        if resp.get('Metadata', {}).get('processed') == 'true':
            print(f"Object {key} already processed (processed=true), skipping")
            return {
                'statusCode': 200,
                'body': f"Object {key} already processed, not forwarding again"
            }
    except Exception as e:
        print(f"Error checking metadata: {e}")
        # Continue if the metadata cannot be read

    # Fetch the raw mail from S3
    resp = s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = resp['Body'].read()
    print(f"Email loaded: {len(raw_bytes)} bytes")

    # Parse for logging
    parsed = BytesParser(policy=default).parsebytes(raw_bytes)
    subj = parsed.get('subject', '(no subject)')
    from_headers = getaddresses(parsed.get_all('from', []))
    frm_addr = from_headers[0][1] if from_headers else ''
    print(f"Parsed: From={frm_addr} Subject={subj}")

    # Pull recipients from the headers if SES did not provide them
    if not recipients:
        to_addrs = [addr for _name, addr in getaddresses(parsed.get_all('to', []))]
        cc_addrs = [addr for _name, addr in getaddresses(parsed.get_all('cc', []))]
        bcc_addrs = [addr for _name, addr in getaddresses(parsed.get_all('bcc', []))]
        recipients = to_addrs + cc_addrs + bcc_addrs
        print("Recipients from headers:", recipients)
    if not recipients:
        raise Exception("No recipients found, aborting")

    # Fetch valid mailboxes and filter the recipients
    valid_inboxes = get_valid_inboxes()
    valid_recipients = [rcpt for rcpt in recipients if rcpt.lower() in valid_inboxes]
    print(f"Valid recipients: {valid_recipients}")
    if not valid_recipients:
        raise Exception("No valid mailboxes found for the recipients, aborting")

    # SMTP connection and envelope
    start = time.time()
    print("=== SMTP: connecting to", MAILCOW_HOST, "port", MAILCOW_PORT)
    with smtplib.SMTP(MAILCOW_HOST, MAILCOW_PORT, timeout=30) as smtp:
        smtp.ehlo()
        smtp.starttls()
        smtp.ehlo()
        if SMTP_USER and SMTP_PASS:
            smtp.login(SMTP_USER, SMTP_PASS)
        print("=== SMTP: MAIL FROM", frm_addr)
        smtp.mail(frm_addr)
        for rcpt in valid_recipients:
            print("=== SMTP: RCPT TO", rcpt)
            smtp.rcpt(rcpt)
        smtp.data(raw_bytes)
    print(f"SMTP transfer completed in {time.time()-start:.2f}s ...")
    # Add the metadata flag "processed": "true"
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = resp.get('Metadata', {})
        new_metadata = current_metadata.copy()
        new_metadata['processed'] = 'true'
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_metadata,
            MetadataDirective='REPLACE'
        )
        print("Metadata 'processed:true' added.")
    except Exception as e:
        print(f"Error writing metadata: {e}")
        raise  # Re-raise so the failure remains visible in CloudWatch

    return {
        'statusCode': 200,
        'body': f"Email successfully forwarded to {MAILCOW_HOST}:{MAILCOW_PORT} ..."
    }

View File

@@ -1,134 +0,0 @@
import time
import gzip
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import logging
import boto3
import base64
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

logger = logging.getLogger()
logger.setLevel(logging.INFO)

API_BASE_URL = os.environ['API_BASE_URL']
API_TOKEN = os.environ['API_TOKEN']
MAX_EMAIL_SIZE = int(os.environ.get('MAX_EMAIL_SIZE', '10485760'))

s3_client = boto3.client('s3')


def mark_email_processed(bucket, key, metadata, s3_client, processor='lambda'):
    """Set the processed flag on the S3 object via metadata."""
    try:
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata={
                'processed': metadata,
                'processed_timestamp': str(int(time.time())),
                'processor': processor
            },
            MetadataDirective='REPLACE'
        )
        logger.info(f"Marked S3 object {bucket}/{key} as {metadata}")
    except Exception as e:
        logger.error(f"Error marking {bucket}/{key}: {e}")


def call_api_once(payload, domain, request_id):
    """Single-shot POST, no retry."""
    url = f"{API_BASE_URL}/process/{domain}"
    data = json.dumps(payload).encode('utf-8')
    req = urllib.request.Request(url, data=data, method='POST')
    req.add_header('Authorization', f'Bearer {API_TOKEN}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-Request-ID', request_id)
    logger.info(f"[{request_id}] OUTGOING POST {url}: "
                f"domain={domain}, key={payload['s3_key']}, bucket={payload['s3_bucket']}, "
                f"orig_size={payload['original_size']}, comp_size={payload['compressed_size']}")
    with urllib.request.urlopen(req, timeout=25) as resp:
        code = resp.getcode()
        if code == 200:
            logger.info(f"[{request_id}] API returned 200 OK")
            return True
        else:
            body = resp.read().decode('utf-8', errors='ignore')
            logger.error(f"[{request_id}] API returned {code}: {body}")
            return False
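
# Note: urlopen raises HTTPError for 4xx/5xx responses, so the non-200 branch
# above only sees unusual 2xx/3xx codes; hard failures surface as exceptions
# in lambda_handler.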


def lambda_handler(event, context):
    req_id = context.aws_request_id
    rec = event['Records'][0]['s3']
    bucket = rec['bucket']['name']
    key = urllib.parse.unquote_plus(rec['object']['key'])
    logger.info(f"[{req_id}] Processing {bucket}/{key}")

    # Head check: skip already processed or oversized objects
    head = s3_client.head_object(Bucket=bucket, Key=key)
    metadata = head.get('Metadata', {})
    if metadata.get('processed') == 'true':
        logger.info(f"[{req_id}] Skipping already processed object")
        return {'statusCode': 200, 'body': 'Already processed'}
    size = head['ContentLength']
    if size > MAX_EMAIL_SIZE:
        logger.warning(f"[{req_id}] Email too large: {size} bytes")
        return {'statusCode': 413}

    # Load the email content
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()

    # 1) Parse and log from/to
    try:
        msg = BytesParser(policy=default).parsebytes(body)
        from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else ''
        to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
        logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}")
    except Exception as e:
        logger.error(f"[{req_id}] Error parsing the email: {e}")
        from_addr = ''
        to_addrs = []

    # 2) Compress and build the payload
    compressed = gzip.compress(body)
    payload = {
        's3_bucket': bucket,
        's3_key': key,
        'domain': bucket.replace('-', '.').rsplit('.emails', 1)[0],
        'email_content': base64.b64encode(compressed).decode(),
        'compressed': True,
        'etag': head['ETag'].strip('"'),
        'request_id': req_id,
        'original_size': len(body),
        'compressed_size': len(compressed)
    }
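    # The 'domain' entry reverses the bucket naming scheme:
    # 'example-com-emails' -> 'example.com.emails' -> 'example.com'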

    # 3) Single API call
    try:
        success = call_api_once(payload, payload['domain'], req_id)
    except Exception as e:
        logger.error(f"[{req_id}] API call exception: {e}")
        success = False

    # 4) Handling after the API call
    if success:
        # Processed normally
        mark_email_processed(bucket, key, 'true', s3_client)
    else:
        # Only mark if there are recipients
        if to_addrs:
            bucket_domain = payload['domain']
            domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr]
            status = 'unknownUser' if bucket_domain in domains else 'unknownDomain'
            mark_email_processed(bucket, key, status, s3_client)
        else:
            logger.info(f"[{req_id}] No recipients, nothing to mark")
    return {'statusCode': 200, 'body': 'Done'}