import boto3
import json
import time
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from botocore.exceptions import ClientError

# AWS region
AWS_REGION = 'us-east-2'

s3 = boto3.client('s3')
sqs = boto3.client('sqs', region_name=AWS_REGION)

# DynamoDB table holding records of outbound sends
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')

# Metadata keys
PROCESSED_KEY = 'processed'
PROCESSED_VALUE = 'true'


def is_ses_autoresponse(parsed):
    """Return True if the message looks like an SES out-of-office auto-response."""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    return (
        'mailer-daemon@us-east-2.amazonses.com' in from_h
        and 'auto-replied' in auto_sub
    )


def domain_to_bucket(domain: str) -> str:
    """Convert a domain to its S3 bucket name."""
    domain = domain.lower()
    return domain.replace('.', '-') + '-emails'


def domain_to_queue_name(domain: str) -> str:
    """Convert a domain to its SQS queue name."""
    domain = domain.lower()
    return domain.replace('.', '-') + '-queue'


def get_queue_url_for_domain(domain: str) -> str:
    """Look up the SQS queue URL for a domain."""
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print(f"✓ Found queue: {queue_name}")
        return queue_url
    except sqs.exceptions.QueueDoesNotExist:
        raise Exception(
            f"Queue does not exist: {queue_name} "
            f"(for domain: {domain.lower()})"
        )
    except Exception as e:
        raise Exception(f"Error getting queue URL for {domain.lower()}: {e}")


def is_already_processed(bucket: str, key: str) -> bool:
    """Check whether the email has already been processed."""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
            processed_at = metadata.get('processed_at', 'unknown')
            print(f"✓ Already processed at {processed_at}")
            return True
        return False
    except ClientError as e:
        # head_object does not raise s3.exceptions.NoSuchKey (only get_object
        # does); a missing key surfaces as a generic ClientError with a 404 code.
        if e.response.get('Error', {}).get('Code') in ('404', 'NotFound', 'NoSuchKey'):
            print(f"⚠ Object {key} not found in {bucket}")
            return True
        print(f"⚠ Error checking processed status: {e}")
        return False
    except Exception as e:
        print(f"⚠ Error checking processed status: {e}")
        return False


def set_processing_lock(bucket: str, key: str) -> bool:
    """
    Set a processing lock to prevent duplicate processing.

    Returns: True if the lock was acquired, False if already locked.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # Check for an existing lock
        processing_started = metadata.get('processing_started')
        if processing_started:
            lock_age = time.time() - float(processing_started)
            if lock_age < 300:  # 5-minute lock
                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
                return False
            else:
                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")

        # Acquire a new lock by rewriting the object metadata in place
        new_meta = metadata.copy()
        new_meta['processing_started'] = str(int(time.time()))
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print("✓ Processing lock set")
        return True
    except Exception as e:
        # Fail open: on errors we proceed rather than drop the email.
        print(f"⚠ Error setting processing lock: {e}")
        return True


def mark_as_queued(bucket: str, key: str, queue_name: str):
    """Mark the email as enqueued."""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['queued_at'] = str(int(time.time()))
        metadata['queued_to'] = queue_name
        metadata['status'] = 'queued'
        metadata.pop('processing_started', None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Marked as queued to {queue_name}")
    except Exception as e:
        print(f"⚠ Failed to mark as queued: {e}")
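
# The S3 metadata lock above is a head_object/copy_object read-modify-write,
# so it is not atomic: two concurrent invocations can both pass the check
# before either copy lands. A minimal sketch of an atomic alternative using a
# DynamoDB conditional put follows. The 'ses-processing-locks' table and its
# 'LockKey' hash key are assumptions for illustration, not part of this
# deployment.
def try_acquire_lock_dynamo(bucket: str, key: str, ttl_seconds: int = 300) -> bool:
    lock_table = dynamo.Table('ses-processing-locks')  # hypothetical table
    now = int(time.time())
    try:
        lock_table.put_item(
            Item={'LockKey': f'{bucket}/{key}', 'acquired_at': now},
            # Succeed only if no lock row exists, or the existing one is stale.
            ConditionExpression='attribute_not_exists(LockKey) OR acquired_at < :stale',
            ExpressionAttributeValues={':stale': now - ttl_seconds},
        )
        return True
    except dynamo.meta.client.exceptions.ConditionalCheckFailedException:
        return False
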
def send_to_queue(queue_url: str, bucket: str, key: str, from_addr: str,
                  recipients: list, domain: str, subject: str, message_id: str):
    """
    Send an email job to the domain-specific SQS queue.

    ONE message with ALL recipients for this domain.
    """
    queue_name = queue_url.split('/')[-1]
    message = {
        'bucket': bucket,
        'key': key,
        'from': from_addr,
        'recipients': recipients,  # list of all recipients
        'domain': domain,
        'subject': subject,
        'message_id': message_id,
        'timestamp': int(time.time())
    }
    try:
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message, ensure_ascii=False),
            MessageAttributes={
                'domain': {'StringValue': domain, 'DataType': 'String'},
                'bucket': {'StringValue': bucket, 'DataType': 'String'},
                'recipient_count': {
                    'StringValue': str(len(recipients)),
                    'DataType': 'Number'
                },
                'message_id': {'StringValue': message_id, 'DataType': 'String'}
            }
        )
        sqs_message_id = response['MessageId']
        print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
        print(f"   Recipients: {len(recipients)} - {', '.join(recipients)}")

        # Mark as queued
        mark_as_queued(bucket, key, queue_name)

        return sqs_message_id
    except Exception as e:
        print(f"✗ Failed to queue message: {e}")
        raise
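
# A minimal sketch of the consuming side, to show how the message body built
# in send_to_queue is meant to be used. The worker loop, polling parameters,
# and delete-on-success policy are assumptions about the separate consumer
# process, not part of this handler.
def consume_one(queue_url: str):
    resp = sqs.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=10
    )
    for msg in resp.get('Messages', []):
        job = json.loads(msg['Body'])
        # Fetch the raw RFC 5322 bytes that this handler located (and possibly
        # rewrote) in S3.
        raw = s3.get_object(Bucket=job['bucket'], Key=job['key'])['Body'].read()
        print(f"would deliver {len(raw)} bytes to {job['recipients']}")
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
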
def lambda_handler(event, context):
    """
    Lambda handler for SES events.

    One domain per event = one queue message with all recipients.
    """
    print(f"{'='*70}")
    print(f"Lambda invoked: {context.aws_request_id}")
    print(f"Region: {AWS_REGION}")
    print(f"{'='*70}")

    # Parse the SES event
    try:
        record = event['Records'][0]
        ses = record['ses']
        mail = ses['mail']
        receipt = ses['receipt']
        message_id = mail['messageId']
        source = mail['source']
    except (KeyError, IndexError) as e:
        print(f"✗ Invalid event structure: {e}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid SES event'})
        }

    timestamp = mail.get('timestamp', '')
    recipients = receipt.get('recipients', [])

    # Early logging: S3 key and recipients
    print(f"\n🔑 S3 Key: {message_id}")
    print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")

    if not recipients:
        print("✗ No recipients found in event")
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': 'No recipients in event',
                'message_id': message_id
            })
        }

    # Extract the domain (all recipients share the same domain!)
    domain = recipients[0].split('@')[1].lower()
    bucket = domain_to_bucket(domain)

    print("\n📧 Email Event:")
    print(f"   MessageId: {message_id}")
    print(f"   From: {source}")
    print(f"   Domain: {domain}")
    print(f"   Bucket: {bucket}")
    print(f"   Timestamp: {timestamp}")
    print(f"   Recipients: {len(recipients)}")

    # Look up the queue for the domain
    try:
        queue_url = get_queue_url_for_domain(domain)
        queue_name = queue_url.split('/')[-1]
        print(f"   Queue: {queue_name}")
    except Exception as e:
        print(f"\n✗ ERROR: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'queue_not_configured',
                'domain': domain,
                'recipients': recipients,
                'message': str(e)
            })
        }

    # Locate the S3 object
    try:
        print("\n📦 Searching S3...")
        response = s3.list_objects_v2(
            Bucket=bucket,
            Prefix=message_id,
            MaxKeys=1
        )
        if 'Contents' not in response or not response['Contents']:
            raise Exception(f"No S3 object found for message {message_id}")
        key = response['Contents'][0]['Key']
        size = response['Contents'][0]['Size']
        print(f"   Found: s3://{bucket}/{key}")
        print(f"   Size: {size:,} bytes ({size/1024:.1f} KB)")
    except Exception as e:
        print(f"\n✗ S3 ERROR: {e}")
        return {
            'statusCode': 404,
            'body': json.dumps({
                'error': 's3_object_not_found',
                'message_id': message_id,
                'bucket': bucket,
                'details': str(e)
            })
        }

    # Duplicate check
    print("\n🔍 Checking for duplicates...")
    if is_already_processed(bucket, key):
        print("   Already processed, skipping")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'already_processed',
                'message_id': message_id,
                'recipients': recipients
            })
        }

    # Set the processing lock
    print("\n🔒 Setting processing lock...")
    if not set_processing_lock(bucket, key):
        print("   Already being processed by another instance")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'already_processing',
                'message_id': message_id,
                'recipients': recipients
            })
        }

    # Load the email to extract the subject
    subject = '(unknown)'
    raw_bytes = b''
    parsed = None
    modified = False
    try:
        print("\n📖 Reading email for metadata...")
        obj = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = obj['Body'].read()
        metadata = obj.get('Metadata', {}) or {}

        # Parse headers
        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
        subject = parsed.get('subject', '(no subject)')
        print(f"   Subject: {subject}")

        # 🔁 Detect an SES auto-response
        if is_ses_autoresponse(parsed):
            print("   Detected SES auto-response (out-of-office)")

            # Get the original mail's Message-ID from In-Reply-To / References
            in_reply_to = (parsed.get('In-Reply-To') or '').strip()
            if not in_reply_to:
                refs = (parsed.get('References') or '').strip()
                # take the first ID from References
                in_reply_to = refs.split()[0] if refs else ''

            if in_reply_to.startswith('<') and '>' in in_reply_to:
                lookup_id = in_reply_to[1:in_reply_to.find('>')]
            else:
                lookup_id = in_reply_to

            original = None
            if lookup_id:
                try:
                    res = msg_table.get_item(Key={'MessageId': lookup_id})
                    original = res.get('Item')
                    print(f"   Dynamo lookup for {lookup_id}: {'hit' if original else 'miss'}")
                except Exception as e:
                    print(f"⚠ Dynamo lookup failed: {e}")

            if original:
                orig_from = original.get('source', '')
                destinations = original.get('destinations', []) or []
                # simple approach: take the first recipient
                orig_to = destinations[0] if destinations else ''
                # the domain was already extracted from recipients[0] above
                display = f"Out of Office from {orig_to}" if orig_to else "Out of Office"

                # preserve the original information
                parsed['X-SES-Original-From'] = parsed.get('From', '')
                parsed['X-SES-Original-Recipient'] = orig_to

                # Make the From header user-friendly
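                # An RFC 5322 From header with a display name has the form:
                #   "Display Name" <local@domain>
                # Assumption: the address portion of this rewrite was lost
                # from the original source, so it is rebuilt from orig_to.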
"freundlich" machen parsed.replace_header('From', f'"{display}" ') # Antworten trotzdem an den Absender deiner ursprünglichen Mail if orig_from: parsed['Reply-To'] = orig_from subj = parsed.get('Subject', 'out of office') if not subj.lower().startswith('out of office'): parsed.replace_header('Subject', f"Out of office: {subj}") # geänderte Mail zurück in Bytes raw_bytes = parsed.as_bytes() modified = True print(" Auto-response rewritten for delivery to user inbox") else: print(" No original send record found for auto-response") # Wenn wir die Mail verändert haben, aktualisieren wir das S3-Objekt if modified: s3.put_object( Bucket=bucket, Key=key, Body=raw_bytes, Metadata=metadata ) print(" Updated S3 object with rewritten auto-response") except Exception as e: print(f" ⚠ Could not parse email (continuing): {e}") # In Queue einreihen (EINE Message mit ALLEN Recipients) try: print(f"\n📤 Queuing to {queue_name}...") sqs_message_id = send_to_queue( queue_url=queue_url, bucket=bucket, key=key, from_addr=source, recipients=recipients, # ALLE Recipients domain=domain, subject=subject, message_id=message_id ) print(f"\n{'='*70}") print(f"✅ SUCCESS - Email queued for delivery") print(f"{'='*70}\n") return { 'statusCode': 200, 'body': json.dumps({ 'status': 'queued', 'message_id': message_id, 'sqs_message_id': sqs_message_id, 'queue': queue_name, 'domain': domain, 'recipients': recipients, 'recipient_count': len(recipients), 'subject': subject }) } except Exception as e: print(f"\n{'='*70}") print(f"✗ FAILED TO QUEUE") print(f"{'='*70}") print(f"Error: {e}") return { 'statusCode': 500, 'body': json.dumps({ 'error': 'failed_to_queue', 'message': str(e), 'message_id': message_id, 'recipients': recipients }) }