diff --git a/DMS/docker-compose.yml b/DMS/docker-compose.yml index 4f5e870..3101df5 100644 --- a/DMS/docker-compose.yml +++ b/DMS/docker-compose.yml @@ -27,8 +27,7 @@ services: - ENABLE_OPENDMARC=0 - ENABLE_POLICYD_SPF=0 - ENABLE_AMAVIS=0 - - ENABLE_SPAMASSASSIN=0 - - ENABLE_POSTGREY=0 + - ENABLE_SPAMASSASSIN=0 - ENABLE_POSTGREY=0 - RSPAMD_GREYLISTING=0 - ENABLE_CLAMAV=0 #- ENABLE_FAIL2BAN=1 @@ -57,12 +56,7 @@ services: - POSTFIX_MAILBOX_SIZE_LIMIT=0 - POSTFIX_MESSAGE_SIZE_LIMIT=0 - SPOOF_PROTECTION=0 - - ENABLE_SRS=1 - - SRS_EXCLUDE_DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net,hotshpotshgallery.com - - SRS_SENDER_CLASSES=envelope_sender - - SRS_SECRET=EBk/ndWRA2s8ZMQFIXq0mJnS6SRbgoj77wv00PZNpNw= - - SRS_DOMAINNAME=email-srvr.com - #- SRS_DOMAINNAME=bayarea-cc.com + - ENABLE_SRS=0 # Debug-Einstellungen - LOG_LEVEL=info cap_add: @@ -118,4 +112,4 @@ services: networks: mail_network: - external: true \ No newline at end of file + external: true \ No newline at end of file diff --git a/lambda_function.py b/lambda_function.py index 872792e..3eb945f 100644 --- a/lambda_function.py +++ b/lambda_function.py @@ -1,419 +1,123 @@ +import json import os import boto3 -import json +import uuid +import logging +from datetime import datetime +from botocore.exceptions import ClientError import time -from email.parser import BytesParser -from email.policy import SMTP as SMTPPolicy +import random -s3 = boto3.client('s3') -sqs = boto3.client('sqs', region_name='us-east-2') +# Logging konfigurieren +logger = logging.getLogger() +logger.setLevel(logging.INFO) -# AWS Region -AWS_REGION = 'us-east-2' +sqs = boto3.client('sqs') -# Dynamo Table -dynamo = boto3.resource('dynamodb', region_name=AWS_REGION) -msg_table = dynamo.Table('ses-outbound-messages') +# Retry-Konfiguration +MAX_RETRIES = 3 +BASE_BACKOFF = 1 # Sekunden -# Metadata Keys -PROCESSED_KEY = 'processed' -PROCESSED_VALUE = 'true' +def exponential_backoff(attempt): + """Exponential Backoff mit Jitter""" + return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1) -def is_ses_bounce_or_autoreply(parsed): - """Erkennt SES Bounces und Auto-Replies""" - from_h = (parsed.get('From') or '').lower() - auto_sub = (parsed.get('Auto-Submitted') or '').lower() - - # SES MAILER-DAEMON oder Auto-Submitted Header - is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h - is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub - - return is_mailer_daemon or is_auto_replied - - -def extract_original_message_id(parsed): - """Extrahiert die ursprüngliche Message-ID aus In-Reply-To oder References""" - # Versuche In-Reply-To - in_reply_to = (parsed.get('In-Reply-To') or '').strip() - if in_reply_to: - msg_id = in_reply_to - if msg_id.startswith('<') and '>' in msg_id: - msg_id = msg_id[1:msg_id.find('>')] - - # ✅ WICHTIG: Entferne @amazonses.com Suffix falls vorhanden - if '@' in msg_id: - msg_id = msg_id.split('@')[0] - - return msg_id - - # Fallback: References Header (nimm die ERSTE ID) - refs = (parsed.get('References') or '').strip() - if refs: - first_ref = refs.split()[0] - if first_ref.startswith('<') and '>' in first_ref: - first_ref = first_ref[1:first_ref.find('>')] - - # ✅ WICHTIG: Entferne @amazonses.com Suffix falls vorhanden - if '@' in first_ref: - first_ref = first_ref.split('@')[0] - - return first_ref - - return None - - -def domain_to_bucket(domain: str) -> str: - """Konvertiert Domain zu S3 Bucket Namen""" - domain = domain.lower() - return domain.replace('.', '-') + '-emails' - - -def domain_to_queue_name(domain: str) -> str: - """Konvertiert Domain zu SQS Queue Namen""" - domain = domain.lower() - return domain.replace('.', '-') + '-queue' - - -def get_queue_url_for_domain(domain: str) -> str: - """Ermittelt SQS Queue URL für Domain""" - queue_name = domain_to_queue_name(domain) - +def get_queue_url(domain): + """ + Generiert Queue-Namen aus Domain und holt URL. + Konvention: domain.tld -> domain-tld-queue + """ + queue_name = domain.replace('.', '-') + '-queue' try: response = sqs.get_queue_url(QueueName=queue_name) - queue_url = response['QueueUrl'] - print(f"✓ Found queue: {queue_name}") - return queue_url - - except sqs.exceptions.QueueDoesNotExist: - raise Exception( - f"Queue does not exist: {queue_name} " - f"(for domain: {domain.lower()})" - ) - except Exception as e: - raise Exception(f"Error getting queue URL for {domain.lower()}: {e}") - - -def is_already_processed(bucket: str, key: str) -> bool: - """Prüft ob E-Mail bereits verarbeitet wurde""" - try: - head = s3.head_object(Bucket=bucket, Key=key) - metadata = head.get('Metadata', {}) or {} - - if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE: - processed_at = metadata.get('processed_at', 'unknown') - print(f"✓ Already processed at {processed_at}") - return True - - return False - - except s3.exceptions.NoSuchKey: - print(f"⚠ Object {key} not found in {bucket}") - return True - - except Exception as e: - print(f"⚠ Error checking processed status: {e}") - return False - - -def set_processing_lock(bucket: str, key: str) -> bool: - """ - Setzt Processing Lock um Duplicate Processing zu verhindern - Returns: True wenn Lock erfolgreich gesetzt, False wenn bereits locked - """ - try: - head = s3.head_object(Bucket=bucket, Key=key) - metadata = head.get('Metadata', {}) or {} - - # Prüfe auf existierenden Lock - processing_started = metadata.get('processing_started') - if processing_started: - lock_age = time.time() - float(processing_started) - - if lock_age < 300: # 5 Minuten Lock - print(f"⚠ Processing lock active (age: {lock_age:.0f}s)") - return False - else: - print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding") - - # Setze neuen Lock - new_meta = metadata.copy() - new_meta['processing_started'] = str(int(time.time())) - - s3.copy_object( - Bucket=bucket, - Key=key, - CopySource={'Bucket': bucket, 'Key': key}, - Metadata=new_meta, - MetadataDirective='REPLACE' - ) - - print(f"✓ Processing lock set") - return True - - except Exception as e: - print(f"⚠ Error setting processing lock: {e}") - return True - - -def mark_as_queued(bucket: str, key: str, queue_name: str): - """Markiert E-Mail als in Queue eingereiht""" - try: - head = s3.head_object(Bucket=bucket, Key=key) - metadata = head.get('Metadata', {}) or {} - - metadata['queued_at'] = str(int(time.time())) - metadata['queued_to'] = queue_name - metadata['status'] = 'queued' - metadata.pop('processing_started', None) - - s3.copy_object( - Bucket=bucket, - Key=key, - CopySource={'Bucket': bucket, 'Key': key}, - Metadata=metadata, - MetadataDirective='REPLACE' - ) - - print(f"✓ Marked as queued to {queue_name}") - - except Exception as e: - print(f"⚠ Failed to mark as queued: {e}") - - -def send_to_queue(queue_url: str, bucket: str, key: str, - from_addr: str, recipients: list, domain: str, - subject: str, message_id: str): - """ - Sendet E-Mail-Job in domain-spezifische SQS Queue - EINE Message mit ALLEN Recipients für diese Domain - """ - - queue_name = queue_url.split('/')[-1] - - message = { - 'bucket': bucket, - 'key': key, - 'from': from_addr, - 'recipients': recipients, - 'domain': domain, - 'subject': subject, - 'message_id': message_id, - 'timestamp': int(time.time()) - } - - try: - response = sqs.send_message( - QueueUrl=queue_url, - MessageBody=json.dumps(message, ensure_ascii=False), - MessageAttributes={ - 'domain': { - 'StringValue': domain, - 'DataType': 'String' - }, - 'bucket': { - 'StringValue': bucket, - 'DataType': 'String' - }, - 'recipient_count': { - 'StringValue': str(len(recipients)), - 'DataType': 'Number' - }, - 'message_id': { - 'StringValue': message_id, - 'DataType': 'String' - } - } - ) - - sqs_message_id = response['MessageId'] - print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}") - print(f" Recipients: {len(recipients)} - {', '.join(recipients)}") - - mark_as_queued(bucket, key, queue_name) - - return sqs_message_id - - except Exception as e: - print(f"✗ Failed to queue message: {e}") - raise - + return response['QueueUrl'] + except ClientError as e: + if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': + logger.error(f"Queue nicht gefunden für Domain: {domain}") + raise ValueError(f"Keine Queue für Domain {domain}") + else: + raise def lambda_handler(event, context): """ - Lambda Handler für SES Inbound Events + Nimmt SES Event entgegen, extrahiert Domain dynamisch, + verpackt Metadaten als 'Fake SNS' und sendet an die domain-spezifische SQS. + Mit integrierter Retry-Logik für SQS-Send. """ - - print(f"{'='*70}") - print(f"Lambda invoked: {context.aws_request_id}") - print(f"Region: {AWS_REGION}") - print(f"{'='*70}") - - # SES Event parsen try: - record = event['Records'][0] - ses = record['ses'] - except (KeyError, IndexError) as e: - print(f"✗ Invalid event structure: {e}") - return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid SES event'})} - - mail = ses['mail'] - receipt = ses['receipt'] - - message_id = mail['messageId'] - source = mail['source'] - timestamp = mail.get('timestamp', '') - recipients = receipt.get('recipients', []) - - print(f"\n🔑 S3 Key: {message_id}") - print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}") - - if not recipients: - print(f"✗ No recipients found in event") - return {'statusCode': 400, 'body': json.dumps({'error': 'No recipients'})} - - # Domain extrahieren - domain = recipients[0].split('@')[1].lower() - bucket = domain_to_bucket(domain) - - print(f"\n📧 Email Event:") - print(f" MessageId: {message_id}") - print(f" From: {source}") - print(f" Domain: {domain}") - print(f" Bucket: {bucket}") - - # Queue ermitteln - try: - queue_url = get_queue_url_for_domain(domain) - queue_name = queue_url.split('/')[-1] - except Exception as e: - print(f"\n✗ Queue ERROR: {e}") - return {'statusCode': 500, 'body': json.dumps({'error': str(e)})} - - # S3 Object finden - try: - response = s3.list_objects_v2(Bucket=bucket, Prefix=message_id, MaxKeys=1) + records = event.get('Records', []) + logger.info(f"Received event with {len(records)} records.") - if 'Contents' not in response: - raise Exception(f"No S3 object found for {message_id}") - - key = response['Contents'][0]['Key'] - size = response['Contents'][0]['Size'] - print(f" Found: s3://{bucket}/{key} ({size/1024:.1f} KB)") - - except Exception as e: - print(f"\n✗ S3 ERROR: {e}") - return {'statusCode': 404, 'body': json.dumps({'error': str(e)})} - - # Duplicate Check - if is_already_processed(bucket, key): - return {'statusCode': 200, 'body': json.dumps({'status': 'already_processed'})} - - # Processing Lock - if not set_processing_lock(bucket, key): - return {'statusCode': 200, 'body': json.dumps({'status': 'already_processing'})} - - # E-Mail laden und ggf. umschreiben - subject = '(unknown)' - modified = False - - try: - print(f"\n📖 Reading email...") - obj = s3.get_object(Bucket=bucket, Key=key) - raw_bytes = obj['Body'].read() - metadata = obj.get('Metadata', {}) or {} - - parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) - subject = parsed.get('Subject', '(no subject)') - print(f" Subject: {subject}") - - # 🔁 Auto-Response / Bounce Detection - if is_ses_bounce_or_autoreply(parsed): - print(f" 🔍 Detected auto-response/bounce from SES") + for record in records: + ses_data = record.get('ses', {}) + if not ses_data: + logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}") + continue - # Extrahiere ursprüngliche Message-ID - original_msg_id = extract_original_message_id(parsed) + mail = ses_data.get('mail', {}) + receipt = ses_data.get('receipt', {}) - if original_msg_id: - print(f" 📋 Original MessageId: {original_msg_id}") - + # Domain extrahieren (aus erstem Recipient) + recipients = receipt.get('recipients', []) or mail.get('destination', []) + if not recipients: + logger.warning("No recipients in event - skipping") + continue + + first_recipient = recipients[0] + domain = first_recipient.split('@')[-1].lower() + if not domain: + logger.error("Could not extract domain from recipient") + continue + + # Wichtige Metadaten loggen + msg_id = mail.get('messageId', 'unknown') + source = mail.get('source', 'unknown') + logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}") + logger.info(f" From: {source}") + logger.info(f" To: {recipients}") + + # SES JSON als String serialisieren + ses_json_string = json.dumps(ses_data) + + # Payload Größe loggen und checken (Safeguard) + payload_size = len(ses_json_string.encode('utf-8')) + logger.info(f" Metadata Payload Size: {payload_size} bytes") + if payload_size > 200000: # Arbitrary Limit < SQS 256KB + raise ValueError("Payload too large for SQS") + + # Fake SNS Payload + fake_sns_payload = { + "Type": "Notification", + "MessageId": str(uuid.uuid4()), + "TopicArn": "arn:aws:sns:ses-shim:global-topic", + "Subject": "Amazon SES Email Receipt Notification", + "Message": ses_json_string, + "Timestamp": datetime.utcnow().isoformat() + "Z" + } + + # Queue URL dynamisch holen + queue_url = get_queue_url(domain) + + # SQS Send mit Retries + attempt = 0 + while attempt < MAX_RETRIES: try: - # Hole Original-Send aus DynamoDB - result = msg_table.get_item(Key={'MessageId': original_msg_id}) - original_send = result.get('Item') - - if original_send: - orig_source = original_send.get('source', '') - orig_destinations = original_send.get('destinations', []) - - print(f" ✓ Found original send:") - print(f" Original From: {orig_source}") - print(f" Original To: {orig_destinations}") - - # **WICHTIG**: Der erste Empfänger war der eigentliche Empfänger - original_recipient = orig_destinations[0] if orig_destinations else '' - - if original_recipient: - # Absender umschreiben auf ursprünglichen Empfänger - original_from = parsed.get('From', '') - parsed['X-Original-SES-From'] = original_from - parsed['X-Original-MessageId'] = original_msg_id - - # **From auf den ursprünglichen Empfänger setzen** - parsed.replace_header('From', original_recipient) - - # Reply-To optional beibehalten - if not parsed.get('Reply-To'): - parsed['Reply-To'] = original_recipient - - # Subject anpassen falls nötig - if 'delivery status notification' in subject.lower(): - parsed.replace_header('Subject', f"Delivery Status: {orig_destinations[0]}") - - raw_bytes = parsed.as_bytes() - modified = True - - print(f" ✅ Rewritten: From={original_recipient}") - else: - print(f" ⚠ No DynamoDB record found for {original_msg_id}") - - except Exception as e: - print(f" ⚠ DynamoDB lookup failed: {e}") - else: - print(f" ⚠ Could not extract original Message-ID") + sqs.send_message( + QueueUrl=queue_url, + MessageBody=json.dumps(fake_sns_payload) + ) + logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}") + break + except ClientError as e: + attempt += 1 + error_code = e.response['Error']['Code'] + logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}") + if attempt == MAX_RETRIES: + raise + time.sleep(exponential_backoff(attempt)) - # S3 aktualisieren falls modified - if modified: - s3.put_object(Bucket=bucket, Key=key, Body=raw_bytes, Metadata=metadata) - print(f" 💾 Updated S3 object with rewritten email") - - except Exception as e: - print(f" ⚠ Email parsing error: {e}") + return {'status': 'ok'} - # In Queue einreihen - try: - sqs_message_id = send_to_queue( - queue_url=queue_url, - bucket=bucket, - key=key, - from_addr=source, - recipients=recipients, - domain=domain, - subject=subject, - message_id=message_id - ) - - print(f"\n✅ SUCCESS - Queued for delivery\n") - - return { - 'statusCode': 200, - 'body': json.dumps({ - 'status': 'queued', - 'message_id': message_id, - 'sqs_message_id': sqs_message_id, - 'modified': modified - }) - } - except Exception as e: - print(f"\n✗ QUEUE FAILED: {e}") - return {'statusCode': 500, 'body': json.dumps({'error': str(e)})} \ No newline at end of file + logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True) + raise e \ No newline at end of file diff --git a/worker.py b/worker.py index 3988171..654e766 100755 --- a/worker.py +++ b/worker.py @@ -35,12 +35,15 @@ SMTP_PASS = os.environ.get('SMTP_PASS') shutdown_requested = False # DynamoDB Ressource für Bounce-Lookup +# DynamoDB Ressource für Bounce-Lookup und Rules try: dynamo = boto3.resource('dynamodb', region_name=AWS_REGION) msg_table = dynamo.Table('ses-outbound-messages') + rules_table = dynamo.Table('email-rules') # Neu: Für OOO/Forwards except Exception as e: log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING') msg_table = None + rules_table = None def get_bucket_name(domain): """Konvention: domain.tld -> domain-tld-emails""" @@ -511,7 +514,66 @@ def process_message(message_body: dict, receive_count: int) -> bool: log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING') from_addr_final = from_addr - # 5. SMTP VERSAND (Loop über Recipients) + # 5. OOO & FORWARD LOGIC (neu, vor SMTP-Versand) + if rules_table and not is_ses_bounce_or_autoreply(parsed): # Vermeide Loops bei Bounces/Auto-Replies + for recipient in recipients: + try: + rule = rules_table.get_item(Key={'email_address': recipient}).get('Item', {}) + + # OOO handling + if rule.get('ooo_active', False): + ooo_msg = rule.get('ooo_message', 'Default OOO message.') + content_type = rule.get('ooo_content_type', 'text') # Default: text + sender = parsed.get('From') # Original-Sender + + reply_subject = f"Out of Office: {subject}" + original_body = str(parsed.get_payload(decode=True)) # Original für Quote + + if content_type == 'html': + reply_body = {'Html': {'Data': f"
{ooo_msg}
Original Message:"}} + else: + reply_body = {'Text': {'Data': f"{ooo_msg}\n\nOriginal Message:\nSubject: {parsed.get('Subject')}\nFrom: {sender}\n\n{original_body}"}} + + ses.send_email( + Source=recipient, # Verifizierte eigene Adresse + Destination={'ToAddresses': [sender]}, + Message={ + 'Subject': {'Data': reply_subject}, + 'Body': reply_body # Dynamisch Text oder Html + }, + ReplyToAddresses=[recipient] # Optional: Für Replies + ) + log(f"✓ Sent OOO reply to {sender} from {recipient}") + + # Forward handling + forwards = rule.get('forwards', []) + if forwards: + original_from = parsed.get('From') # Für Headers + fwd_subject = f"FWD: {subject}" + fwd_body_text = f"Forwarded from: {original_from}\n\n{original_body}" + fwd_body = {'Text': {'Data': fwd_body_text}} # Erweiterbar auf HTML + + for forward_to in forwards: + ses.send_email( + Source=recipient, # Verifizierte eigene Adresse + Destination={'ToAddresses': [forward_to]}, + Message={ + 'Subject': {'Data': fwd_subject}, + 'Body': fwd_body + }, + ReplyToAddresses=[original_from] # Original-Sender für Replies + ) + log(f"✓ Forwarded to {forward_to} from {recipient} (original: {original_from})") + + except boto3.exceptions.ClientError as e: + if 'MessageRejected' in str(e): + log(f"⚠ SES rejected: {e}. Check verification.", 'ERROR') + else: + log(f"⚠ Rules error for {recipient}: {e}", 'WARNING') + except Exception as e: + log(f"⚠ General error: {e}", 'WARNING') + + # 6. SMTP VERSAND (Loop über Recipients) log(f"📤 Sending to {len(recipients)} recipient(s)...") successful = []
Subject: {parsed.get('Subject')}
From: {sender}
{original_body}