import json
import time

import boto3
from botocore.exceptions import ClientError
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy

# AWS region
AWS_REGION = 'us-east-2'

s3 = boto3.client('s3')
sqs = boto3.client('sqs', region_name=AWS_REGION)

# DynamoDB table with metadata about outbound SES sends (keyed by MessageId)
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')

# S3 user-metadata keys used for idempotency tracking
PROCESSED_KEY = 'processed'
PROCESSED_VALUE = 'true'


def is_ses_bounce_or_autoreply(parsed):
    """Detects SES bounces and auto-replies."""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()

    # SES MAILER-DAEMON sender or an Auto-Submitted header
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub

    return is_mailer_daemon or is_auto_replied


def extract_original_message_id(parsed):
    """Extracts the original message ID from In-Reply-To or References."""
    # Try In-Reply-To first
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        msg_id = in_reply_to
        if msg_id.startswith('<') and '>' in msg_id:
            msg_id = msg_id[1:msg_id.find('>')]
        # Important: strip the @amazonses.com suffix if present
        if '@' in msg_id:
            msg_id = msg_id.split('@')[0]
        return msg_id

    # Fallback: References header (take the FIRST id, i.e. the thread root)
    refs = (parsed.get('References') or '').strip()
    if refs:
        first_ref = refs.split()[0]
        if first_ref.startswith('<') and '>' in first_ref:
            first_ref = first_ref[1:first_ref.find('>')]
        # Important: strip the @amazonses.com suffix if present
        if '@' in first_ref:
            first_ref = first_ref.split('@')[0]
        return first_ref

    return None
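
# A minimal sketch of the normalization above (all values made up):
#
#     from email.message import EmailMessage
#     m = EmailMessage()
#     m['In-Reply-To'] = '<0100018c9f3a2b4c-example@email.amazonses.com>'
#     assert extract_original_message_id(m) == '0100018c9f3a2b4c-example'
#
# The bare id (without the @... suffix) is what the DynamoDB lookup in
# lambda_handler uses as its 'MessageId' key.
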
def domain_to_bucket(domain: str) -> str:
    """Converts a domain to its S3 bucket name."""
    domain = domain.lower()
    return domain.replace('.', '-') + '-emails'


def domain_to_queue_name(domain: str) -> str:
    """Converts a domain to its SQS queue name."""
    domain = domain.lower()
    return domain.replace('.', '-') + '-queue'


def get_queue_url_for_domain(domain: str) -> str:
    """Resolves the SQS queue URL for a domain."""
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print(f"✓ Found queue: {queue_name}")
        return queue_url
    except sqs.exceptions.QueueDoesNotExist:
        raise Exception(
            f"Queue does not exist: {queue_name} "
            f"(for domain: {domain.lower()})"
        )
    except Exception as e:
        raise Exception(f"Error getting queue URL for {domain.lower()}: {e}")


def is_already_processed(bucket: str, key: str) -> bool:
    """Checks whether the email has already been processed."""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
            processed_at = metadata.get('processed_at', 'unknown')
            print(f"✓ Already processed at {processed_at}")
            return True
        return False
    except ClientError as e:
        # head_object reports a missing key as a generic 404 ClientError,
        # not as s3.exceptions.NoSuchKey
        if e.response.get('Error', {}).get('Code') in ('404', 'NoSuchKey', 'NotFound'):
            print(f"⚠ Object {key} not found in {bucket}")
            return True
        print(f"⚠ Error checking processed status: {e}")
        return False
    except Exception as e:
        print(f"⚠ Error checking processed status: {e}")
        return False


def set_processing_lock(bucket: str, key: str) -> bool:
    """
    Sets a processing lock to prevent duplicate processing.

    Returns:
        True if the lock was acquired, False if already locked.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # Check for an existing lock
        processing_started = metadata.get('processing_started')
        if processing_started:
            lock_age = time.time() - float(processing_started)
            if lock_age < 300:  # 5-minute lock
                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
                return False
            else:
                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")

        # Set a new lock. Note: head_object + copy_object is not atomic,
        # so this is a best-effort lock, not strict mutual exclusion.
        new_meta = metadata.copy()
        new_meta['processing_started'] = str(int(time.time()))

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print("✓ Processing lock set")
        return True
    except Exception as e:
        print(f"⚠ Error setting processing lock: {e}")
        return True


def mark_as_queued(bucket: str, key: str, queue_name: str):
    """Marks the email as enqueued."""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        metadata['queued_at'] = str(int(time.time()))
        metadata['queued_to'] = queue_name
        metadata['status'] = 'queued'
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Marked as queued to {queue_name}")
    except Exception as e:
        print(f"⚠ Failed to mark as queued: {e}")


def send_to_queue(queue_url: str, bucket: str, key: str, from_addr: str,
                  recipients: list, domain: str, subject: str,
                  message_id: str):
    """
    Sends an email job to the domain-specific SQS queue:
    ONE message carrying ALL recipients for this domain.
    """
    queue_name = queue_url.split('/')[-1]

    message = {
        'bucket': bucket,
        'key': key,
        'from': from_addr,
        'recipients': recipients,
        'domain': domain,
        'subject': subject,
        'message_id': message_id,
        'timestamp': int(time.time())
    }

    try:
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message, ensure_ascii=False),
            MessageAttributes={
                'domain': {
                    'StringValue': domain,
                    'DataType': 'String'
                },
                'bucket': {
                    'StringValue': bucket,
                    'DataType': 'String'
                },
                'recipient_count': {
                    'StringValue': str(len(recipients)),
                    'DataType': 'Number'
                },
                'message_id': {
                    'StringValue': message_id,
                    'DataType': 'String'
                }
            }
        )
        sqs_message_id = response['MessageId']
        print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
        print(f"  Recipients: {len(recipients)} - {', '.join(recipients)}")

        mark_as_queued(bucket, key, queue_name)
        return sqs_message_id
    except Exception as e:
        print(f"✗ Failed to queue message: {e}")
        raise
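
# Illustrative body of a queued message as produced by send_to_queue,
# for consumers of the per-domain queue (all values below are made up):
#
#   {
#     "bucket": "example-com-emails",
#     "key": "0100018c9f3a2b4c-example",
#     "from": "sender@elsewhere.example",
#     "recipients": ["user@example.com"],
#     "domain": "example.com",
#     "subject": "Hello",
#     "message_id": "0100018c9f3a2b4c-example",
#     "timestamp": 1700000000
#   }
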
def lambda_handler(event, context):
    """Lambda handler for SES inbound events."""
    print(f"{'='*70}")
    print(f"Lambda invoked: {context.aws_request_id}")
    print(f"Region: {AWS_REGION}")
    print(f"{'='*70}")

    # Parse the SES event
    try:
        record = event['Records'][0]
        ses = record['ses']
    except (KeyError, IndexError) as e:
        print(f"✗ Invalid event structure: {e}")
        return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid SES event'})}

    mail = ses['mail']
    receipt = ses['receipt']

    message_id = mail['messageId']
    source = mail['source']
    timestamp = mail.get('timestamp', '')
    recipients = receipt.get('recipients', [])

    print(f"\n🔑 S3 Key: {message_id}")
    print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")

    if not recipients:
        print("✗ No recipients found in event")
        return {'statusCode': 400, 'body': json.dumps({'error': 'No recipients'})}

    # Extract the domain
    domain = recipients[0].split('@')[1].lower()
    bucket = domain_to_bucket(domain)

    print("\n📧 Email Event:")
    print(f"   MessageId: {message_id}")
    print(f"   From: {source}")
    print(f"   Domain: {domain}")
    print(f"   Bucket: {bucket}")

    # Resolve the queue
    try:
        queue_url = get_queue_url_for_domain(domain)
        queue_name = queue_url.split('/')[-1]
    except Exception as e:
        print(f"\n✗ Queue ERROR: {e}")
        return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}

    # Locate the S3 object
    try:
        response = s3.list_objects_v2(Bucket=bucket, Prefix=message_id, MaxKeys=1)
        if 'Contents' not in response:
            raise Exception(f"No S3 object found for {message_id}")

        key = response['Contents'][0]['Key']
        size = response['Contents'][0]['Size']
        print(f"   Found: s3://{bucket}/{key} ({size/1024:.1f} KB)")
    except Exception as e:
        print(f"\n✗ S3 ERROR: {e}")
        return {'statusCode': 404, 'body': json.dumps({'error': str(e)})}

    # Duplicate check
    if is_already_processed(bucket, key):
        return {'statusCode': 200, 'body': json.dumps({'status': 'already_processed'})}

    # Processing lock
    if not set_processing_lock(bucket, key):
        return {'statusCode': 200, 'body': json.dumps({'status': 'already_processing'})}

    # Load the email and rewrite it if necessary
    subject = '(unknown)'
    modified = False
    try:
        print("\n📖 Reading email...")
        obj = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = obj['Body'].read()
        metadata = obj.get('Metadata', {}) or {}

        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
        subject = parsed.get('Subject', '(no subject)')
        print(f"   Subject: {subject}")

        # 🔁 Auto-response / bounce detection
        if is_ses_bounce_or_autoreply(parsed):
            print("   🔍 Detected auto-response/bounce from SES")

            # Extract the original message ID
            original_msg_id = extract_original_message_id(parsed)

            if original_msg_id:
                print(f"   📋 Original MessageId: {original_msg_id}")

                try:
                    # Fetch the original send from DynamoDB
                    result = msg_table.get_item(Key={'MessageId': original_msg_id})
                    original_send = result.get('Item')

                    if original_send:
                        orig_source = original_send.get('source', '')
                        orig_destinations = original_send.get('destinations', [])

                        print("   ✓ Found original send:")
                        print(f"     Original From: {orig_source}")
                        print(f"     Original To: {orig_destinations}")

                        # Important: the first destination was the actual recipient
                        original_recipient = orig_destinations[0] if orig_destinations else ''

                        if original_recipient:
                            # Rewrite the sender to the original recipient,
                            # preserving the SES originals in X- headers
                            original_from = parsed.get('From', '')
                            parsed['X-Original-SES-From'] = original_from
                            parsed['X-Original-MessageId'] = original_msg_id

                            # Set From to the original recipient
                            parsed.replace_header('From', original_recipient)

                            # Keep an existing Reply-To, otherwise set it as well
                            if not parsed.get('Reply-To'):
                                parsed['Reply-To'] = original_recipient

                            # Adjust the subject if necessary
                            if 'delivery status notification' in subject.lower():
                                parsed.replace_header(
                                    'Subject',
                                    f"Delivery Status: {orig_destinations[0]}"
                                )

                            raw_bytes = parsed.as_bytes()
                            modified = True
                            print(f"   ✅ Rewritten: From={original_recipient}")
                    else:
                        print(f"   ⚠ No DynamoDB record found for {original_msg_id}")
                except Exception as e:
                    print(f"   ⚠ DynamoDB lookup failed: {e}")
            else:
                print("   ⚠ Could not extract original Message-ID")

        # Update the S3 object if the email was modified
        if modified:
            s3.put_object(Bucket=bucket, Key=key, Body=raw_bytes, Metadata=metadata)
            print("   💾 Updated S3 object with rewritten email")
    except Exception as e:
        print(f"   ⚠ Email parsing error: {e}")

    # Enqueue
    try:
        sqs_message_id = send_to_queue(
            queue_url=queue_url,
            bucket=bucket,
            key=key,
            from_addr=source,
            recipients=recipients,
            domain=domain,
            subject=subject,
            message_id=message_id
        )

        print("\n✅ SUCCESS - Queued for delivery\n")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'queued',
                'message_id': message_id,
                'sqs_message_id': sqs_message_id,
                'modified': modified
            })
        }
    except Exception as e:
        print(f"\n✗ QUEUE FAILED: {e}")
        return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}
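

# --- Local smoke test (illustrative sketch, not invoked by Lambda) ---
# Assumes real AWS credentials and an existing bucket, queue, and table for
# the domain; the event below uses made-up placeholder values.
if __name__ == '__main__':
    class _FakeContext:
        aws_request_id = 'local-test'

    sample_event = {
        'Records': [{
            'ses': {
                'mail': {
                    'messageId': 'example-message-id',
                    'source': 'sender@elsewhere.example',
                    'timestamp': '2024-01-01T00:00:00.000Z',
                },
                'receipt': {
                    'recipients': ['user@example.com'],
                },
            }
        }]
    }

    print(lambda_handler(sample_event, _FakeContext()))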