diff --git a/.gitignore b/.gitignore index 4f87aee..091bf1b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ auth .env .venv* __pycache__ -node_modules \ No newline at end of file +node_modules +ses-lambda-python/* +!ses-lambda-python/lambda_function.py \ No newline at end of file diff --git a/ses-lambda/email2json.js b/ses-lambda-nodejs/email2json.js similarity index 100% rename from ses-lambda/email2json.js rename to ses-lambda-nodejs/email2json.js diff --git a/ses-lambda/email2json1.js b/ses-lambda-nodejs/email2json1.js similarity index 100% rename from ses-lambda/email2json1.js rename to ses-lambda-nodejs/email2json1.js diff --git a/ses-lambda/eml/1.eml b/ses-lambda-nodejs/eml/1.eml similarity index 100% rename from ses-lambda/eml/1.eml rename to ses-lambda-nodejs/eml/1.eml diff --git a/ses-lambda/eml/2.eml b/ses-lambda-nodejs/eml/2.eml similarity index 100% rename from ses-lambda/eml/2.eml rename to ses-lambda-nodejs/eml/2.eml diff --git a/ses-lambda/eml/3.eml b/ses-lambda-nodejs/eml/3.eml similarity index 100% rename from ses-lambda/eml/3.eml rename to ses-lambda-nodejs/eml/3.eml diff --git a/ses-lambda/eml/4.eml b/ses-lambda-nodejs/eml/4.eml similarity index 100% rename from ses-lambda/eml/4.eml rename to ses-lambda-nodejs/eml/4.eml diff --git a/ses-lambda/eml/5.eml b/ses-lambda-nodejs/eml/5.eml similarity index 100% rename from ses-lambda/eml/5.eml rename to ses-lambda-nodejs/eml/5.eml diff --git a/ses-lambda/eml/6.eml b/ses-lambda-nodejs/eml/6.eml similarity index 100% rename from ses-lambda/eml/6.eml rename to ses-lambda-nodejs/eml/6.eml diff --git a/ses-lambda/eml/7.eml b/ses-lambda-nodejs/eml/7.eml similarity index 100% rename from ses-lambda/eml/7.eml rename to ses-lambda-nodejs/eml/7.eml diff --git a/ses-lambda/eml/nodemailer.eml b/ses-lambda-nodejs/eml/nodemailer.eml similarity index 100% rename from ses-lambda/eml/nodemailer.eml rename to ses-lambda-nodejs/eml/nodemailer.eml diff --git a/ses-lambda/index.js b/ses-lambda-nodejs/index.js 
import os
import boto3
import smtplib
import time
import requests
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

s3 = boto3.client('s3')

# Configuration from the Lambda environment. MAILCOW_SMTP_HOST is required
# and read eagerly so a misconfigured function fails fast at cold start.
MAILCOW_HOST = os.environ['MAILCOW_SMTP_HOST']
MAILCOW_PORT = int(os.environ.get('MAILCOW_SMTP_PORT', 587))
SMTP_USER = os.environ.get('MAILCOW_SMTP_USER')
SMTP_PASS = os.environ.get('MAILCOW_SMTP_PASS')
MAILCOW_API_KEY = os.environ.get('MAILCOW_API_KEY')


def domain_to_bucket(domain):
    """Map a mail domain to its S3 bucket name, e.g. 'a.b' -> 'a-b-emails'."""
    return domain.replace('.', '-') + '-emails'


def get_valid_inboxes():
    """Return the set of active mailbox addresses (lower-cased) from the mailcow API.

    Raises:
        Exception: when the mailcow API is unreachable or returns an error
            status (chained to the underlying requests exception).
    """
    # NOTE(review): the API host is hard-coded here rather than derived from
    # MAILCOW_HOST -- confirm this is intentional.
    url = 'https://mail.email-srvr.com/api/v1/get/mailbox/all'
    headers = {'X-API-Key': MAILCOW_API_KEY}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        mailboxes = response.json()
        # active_int == 1 marks an enabled mailbox in the mailcow API response.
        return {mb['username'].lower() for mb in mailboxes if mb['active_int'] == 1}
    except requests.RequestException as e:
        print(f"Fehler beim Abrufen der Postfächer: {e}")
        # Chain the cause so CloudWatch shows the original network/HTTP error.
        raise Exception("Konnte gültige Postfächer nicht abrufen") from e


def lambda_handler(event, context):
    """Forward a raw e-mail stored in S3 to the mailcow SMTP server.

    Supports two trigger shapes in ``event['Records'][0]``:
      * an SES receipt record -- the bucket is derived from the first
        recipient's domain and the object located under ``emails/<messageId>``;
      * a plain S3 put record -- bucket/key are taken directly from the event
        and the recipients are recovered from the To/Cc/Bcc headers.

    Already-processed objects (S3 metadata ``processed=true``) are skipped;
    after a successful SMTP hand-off the flag is written back to the object.

    Returns:
        dict: statusCode/body summary for successful or skipped invocations.

    Raises:
        Exception: unknown event type, missing object, no From header, no
            (valid) recipients, or failure to persist the processed flag.
    """
    rec = event['Records'][0]

    if 'ses' in rec:
        # SES receipt: locate the stored raw mail via the message id prefix.
        ses = rec['ses']
        msg_id = ses['mail']['messageId']
        recipients = ses['receipt']['recipients']
        first_recipient = recipients[0]
        domain = first_recipient.split('@')[1]
        bucket = domain_to_bucket(domain)
        prefix = f"emails/{msg_id}"
        print(f"SES-Receipt erkannt, domain={domain}, bucket={bucket}, prefix={prefix}")

        resp_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' not in resp_list or not resp_list['Contents']:
            raise Exception(f"Kein Objekt unter Prefix {prefix} in Bucket {bucket} gefunden")
        key = resp_list['Contents'][0]['Key']

    elif 's3' in rec:
        # Direct S3 put: recipients are unknown here and recovered from the
        # message headers further down.
        s3info = rec['s3']
        bucket = s3info['bucket']['name']
        key = s3info['object']['key']
        print("S3-Put erkannt, bucket =", bucket, "key =", key)
        recipients = []

    else:
        raise Exception("Unbekannter Event-Typ")

    # Skip objects that were already forwarded (idempotency guard).
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        if resp.get('Metadata', {}).get('processed') == 'true':
            print(f"Objekt {key} bereits verarbeitet (processed=true), überspringe Verarbeitung")
            return {
                'statusCode': 200,
                'body': f"Objekt {key} bereits verarbeitet, keine erneute Weiterleitung"
            }
    except Exception as e:
        print(f"Fehler beim Prüfen der Metadaten: {e}")
        # Deliberately continue if the metadata cannot be read.

    # Fetch the raw RFC 5322 message from S3.
    resp = s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = resp['Body'].read()
    print(f"E-Mail geladen: {len(raw_bytes)} Bytes")

    # Parse headers for logging and for the SMTP envelope sender.
    parsed = BytesParser(policy=default).parsebytes(raw_bytes)
    subj = parsed.get('subject', '(kein Subject)')
    from_headers = parsed.get_all('from', [])
    if not from_headers:
        # Previously an opaque IndexError; fail with a clear message instead.
        raise Exception("Kein From-Header in der E-Mail gefunden, Abbruch")
    frm_addr = getaddresses(from_headers)[0][1]
    print(f"Parsed: From={frm_addr} Subject={subj}")

    # Recipients from headers when the event did not supply them (S3 path).
    if not recipients:
        to_addrs = [addr for _name, addr in getaddresses(parsed.get_all('to', []))]
        cc_addrs = [addr for _name, addr in getaddresses(parsed.get_all('cc', []))]
        bcc_addrs = [addr for _name, addr in getaddresses(parsed.get_all('bcc', []))]
        recipients = to_addrs + cc_addrs + bcc_addrs
        print("Empfänger aus Headern:", recipients)

    if not recipients:
        raise Exception("Keine Empfänger gefunden, Abbruch")

    # Keep only recipients that map to an active mailcow mailbox.
    valid_inboxes = get_valid_inboxes()
    valid_recipients = [rcpt for rcpt in recipients if rcpt.lower() in valid_inboxes]
    print(f"Gültige Empfänger: {valid_recipients}")

    if not valid_recipients:
        raise Exception("Keine gültigen Postfächer für die Empfänger gefunden, Abbruch")

    # SMTP hand-off. sendmail() checks every reply code for us: it raises if
    # the sender or all recipients are refused, and returns a dict of any
    # partially refused recipients -- the previous manual mail()/rcpt()/data()
    # sequence silently ignored refused recipients.
    start = time.time()
    print("=== SMTP: Verbinde zu", MAILCOW_HOST, "Port", MAILCOW_PORT)
    with smtplib.SMTP(MAILCOW_HOST, MAILCOW_PORT, timeout=30) as smtp:
        smtp.ehlo()
        smtp.starttls()
        smtp.ehlo()
        if SMTP_USER and SMTP_PASS:
            smtp.login(SMTP_USER, SMTP_PASS)

        print("=== SMTP: MAIL FROM", frm_addr)
        for rcpt in valid_recipients:
            print("=== SMTP: RCPT TO", rcpt)

        refused = smtp.sendmail(frm_addr, valid_recipients, raw_bytes)
        if refused:
            print("=== SMTP: abgelehnte Empfänger:", refused)

    print(f"SMTP-Transfer in {time.time()-start:.2f}s abgeschlossen ...")

    # Persist the processed flag by copying the object onto itself with
    # replaced metadata (S3 metadata is immutable in place).
    # NOTE(review): MetadataDirective='REPLACE' also resets ContentType and
    # other system metadata to defaults -- confirm that is acceptable here.
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        current_metadata = resp.get('Metadata', {})
        new_metadata = current_metadata.copy()
        new_metadata['processed'] = 'true'
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_metadata,
            MetadataDirective='REPLACE'
        )
        print("Metadatum 'processed:true' hinzugefügt.")
    except Exception as e:
        print(f"Fehler beim Schreiben des Metadatums: {e}")
        raise  # Re-raise so the failure stays visible in CloudWatch.

    return {
        'statusCode': 200,
        'body': f"E-Mail erfolgreich an {MAILCOW_HOST}:{MAILCOW_PORT} weitergeleitet ..."
    }
logger.info(f"[{request_id}] API returned 200 OK") - return True - else: - body = resp.read().decode('utf-8', errors='ignore') - logger.error(f"[{request_id}] API returned {code}: {body}") - return False - - -def lambda_handler(event, context): - req_id = context.aws_request_id - rec = event['Records'][0]['s3'] - bucket = rec['bucket']['name'] - key = urllib.parse.unquote_plus(rec['object']['key']) - logger.info(f"[{req_id}] Processing {bucket}/{key}") - - # Kopf-Check - head = s3_client.head_object(Bucket=bucket, Key=key) - metadata = head.get('Metadata', {}) - if metadata.get('processed') == 'true': - logger.info(f"[{req_id}] Skipping already processed object") - return {'statusCode': 200, 'body': 'Already processed'} - - size = head['ContentLength'] - if size > MAX_EMAIL_SIZE: - logger.warning(f"[{req_id}] Email too large: {size} bytes") - return {'statusCode': 413} - - # E-Mail Inhalt laden - body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read() - - # 1) Parsen und Loggen von from/to - try: - msg = BytesParser(policy=default).parsebytes(body) - from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else '' - to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))] - logger.info(f"[{req_id}] Parsed email: from={from_addr}, to={to_addrs}") - except Exception as e: - logger.error(f"[{req_id}] Fehler beim Parsen der Email: {e}") - from_addr = '' - to_addrs = [] - - # 2) Komprimieren und Payload bauen - compressed = gzip.compress(body) - payload = { - 's3_bucket': bucket, - 's3_key': key, - 'domain': bucket.replace('-', '.').rsplit('.emails',1)[0], - 'email_content': base64.b64encode(compressed).decode(), - 'compressed': True, - 'etag': head['ETag'].strip('"'), - 'request_id': req_id, - 'original_size': len(body), - 'compressed_size': len(compressed) - } - - # 3) Single API call - try: - success = call_api_once(payload, payload['domain'], req_id) - except Exception as e: - logger.error(f"[{req_id}] 
API-Call-Exception: {e}") - success = False - - # 4) Handling nach API-Call - if success: - # normal processed - mark_email_processed(bucket, key, 'true', s3_client) - else: - # nur wenn es to_addrs gibt - if to_addrs: - bucket_domain = payload['domain'] - domains = [addr.split('@')[-1] for addr in to_addrs if '@' in addr] - status = 'unknownUser' if bucket_domain in domains else 'unknownDomain' - mark_email_processed(bucket, key, status, s3_client) - else: - logger.info(f"[{req_id}] Keine Empfänger, kein Markieren") - - return {'statusCode': 200, 'body': 'Done'} \ No newline at end of file