import os import boto3 import json import time from email.parser import BytesParser from email.policy import SMTP as SMTPPolicy s3 = boto3.client('s3') sqs = boto3.client('sqs', region_name='us-east-2') # AWS Region AWS_REGION = 'us-east-2' # Metadata Keys PROCESSED_KEY = 'processed' PROCESSED_VALUE = 'true' def domain_to_bucket(domain: str) -> str: """Konvertiert Domain zu S3 Bucket Namen""" return domain.replace('.', '-') + '-emails' def domain_to_queue_name(domain: str) -> str: """Konvertiert Domain zu SQS Queue Namen""" return domain.replace('.', '-') + '-queue' def get_queue_url_for_domain(domain: str) -> str: """ Ermittelt SQS Queue URL für Domain Queue Name: domain-mit-bindestrichen-queue """ queue_name = domain_to_queue_name(domain) try: response = sqs.get_queue_url(QueueName=queue_name) queue_url = response['QueueUrl'] print(f"✓ Found queue: {queue_name}") return queue_url except sqs.exceptions.QueueDoesNotExist: raise Exception( f"Queue does not exist: {queue_name} " f"(for domain: {domain})" ) except Exception as e: raise Exception(f"Error getting queue URL for {domain}: {e}") def is_already_processed(bucket: str, key: str) -> bool: """Prüft ob E-Mail bereits verarbeitet wurde""" try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE: processed_at = metadata.get('processed_at', 'unknown') print(f"✓ Already processed at {processed_at}") return True return False except s3.exceptions.NoSuchKey: print(f"⚠ Object {key} not found in {bucket}") return True # Wenn nicht existiert, als verarbeitet betrachten except Exception as e: print(f"⚠ Error checking processed status: {e}") return False def set_processing_lock(bucket: str, key: str) -> bool: """ Setzt Processing Lock um Duplicate Processing zu verhindern Returns: True wenn Lock erfolgreich gesetzt, False wenn bereits locked """ try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} # Prüfe auf existierenden Lock processing_started = metadata.get('processing_started') if processing_started: lock_age = time.time() - float(processing_started) if lock_age < 300: # 5 Minuten Lock print(f"⚠ Processing lock active (age: {lock_age:.0f}s)") return False else: print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding") # Setze neuen Lock new_meta = metadata.copy() new_meta['processing_started'] = str(int(time.time())) s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=new_meta, MetadataDirective='REPLACE' ) print(f"✓ Processing lock set") return True except Exception as e: print(f"⚠ Error setting processing lock: {e}") return True # Bei Fehler trotzdem verarbeiten (besser als Mail verlieren) def mark_as_queued(bucket: str, key: str, queue_name: str): """Markiert E-Mail als in Queue eingereiht""" try: head = s3.head_object(Bucket=bucket, Key=key) metadata = head.get('Metadata', {}) or {} metadata['queued_at'] = str(int(time.time())) metadata['queued_to'] = queue_name metadata['status'] = 'queued' metadata.pop('processing_started', None) # Lock entfernen s3.copy_object( Bucket=bucket, Key=key, CopySource={'Bucket': bucket, 'Key': key}, Metadata=metadata, MetadataDirective='REPLACE' ) print(f"✓ Marked as queued to {queue_name}") except Exception as e: print(f"⚠ Failed to mark as queued: {e}") def send_to_queue(queue_url: str, bucket: str, key: str, from_addr: str, recipient: str, domain: str, subject: str, message_id: str): """ Sendet E-Mail-Job in domain-spezifische SQS Queue """ # Queue Name aus URL extrahieren (für Logging) queue_name = queue_url.split('/')[-1] message = { 'bucket': bucket, 'key': key, 'from': from_addr, 'recipient': recipient, # Nur 1 Empfänger 'domain': domain, 'subject': subject, 'message_id': message_id, 'timestamp': int(time.time()) } try: response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(message, ensure_ascii=False), MessageAttributes={ 'domain': { 'StringValue': domain, 'DataType': 'String' }, 'bucket': { 'StringValue': bucket, 'DataType': 'String' }, 'recipient': { 'StringValue': recipient, 'DataType': 'String' }, 'message_id': { 'StringValue': message_id, 'DataType': 'String' } } ) sqs_message_id = response['MessageId'] print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}") # Als queued markieren mark_as_queued(bucket, key, queue_name) return sqs_message_id except Exception as e: print(f"✗ Failed to queue message: {e}") raise def lambda_handler(event, context): """ Lambda Handler für SES Events WICHTIG: SES ruft Lambda einmal PRO Empfänger auf! Jedes Event hat genau 1 Empfänger in receipt.recipients """ print(f"{'='*70}") print(f"Lambda invoked: {context.request_id}") print(f"Region: {AWS_REGION}") print(f"{'='*70}") # SES Event parsen try: record = event['Records'][0] ses = record['ses'] except (KeyError, IndexError) as e: print(f"✗ Invalid event structure: {e}") return { 'statusCode': 400, 'body': json.dumps({'error': 'Invalid SES event'}) } mail = ses['mail'] receipt = ses['receipt'] message_id = mail['messageId'] source = mail['source'] timestamp = mail.get('timestamp', '') # ✨ WICHTIG: receipt.recipients enthält NUR den Empfänger für DIESES Event # (NICHT mail.destination verwenden - das hat alle Original-Empfänger) recipients = receipt.get('recipients', []) if not recipients or len(recipients) != 1: print(f"✗ Unexpected recipients count: {len(recipients)}") return { 'statusCode': 400, 'body': json.dumps({ 'error': 'Expected exactly 1 recipient', 'found': len(recipients) }) } # SES garantiert: genau 1 Empfänger pro Event recipient = recipients[0] domain = recipient.split('@')[1] bucket = domain_to_bucket(domain) print(f"\n📧 Email Event:") print(f" MessageId: {message_id}") print(f" From: {source}") print(f" To: {recipient}") print(f" Domain: {domain}") print(f" Bucket: {bucket}") print(f" Timestamp: {timestamp}") # Queue für Domain ermitteln try: queue_url = get_queue_url_for_domain(domain) queue_name = queue_url.split('/')[-1] print(f" Queue: {queue_name}") except Exception as e: print(f"\n✗ ERROR: {e}") return { 'statusCode': 500, 'body': json.dumps({ 'error': 'queue_not_configured', 'domain': domain, 'recipient': recipient, 'message': str(e) }) } # S3 Object finden try: print(f"\n📦 Searching S3...") response = s3.list_objects_v2( Bucket=bucket, Prefix=message_id, MaxKeys=1 ) if 'Contents' not in response or not response['Contents']: raise Exception(f"No S3 object found for message {message_id}") key = response['Contents'][0]['Key'] size = response['Contents'][0]['Size'] print(f" Found: s3://{bucket}/{key}") print(f" Size: {size:,} bytes ({size/1024:.1f} KB)") except Exception as e: print(f"\n✗ S3 ERROR: {e}") return { 'statusCode': 404, 'body': json.dumps({ 'error': 's3_object_not_found', 'message_id': message_id, 'bucket': bucket, 'details': str(e) }) } # Duplicate Check print(f"\n🔍 Checking for duplicates...") if is_already_processed(bucket, key): print(f" Already processed, skipping") return { 'statusCode': 200, 'body': json.dumps({ 'status': 'already_processed', 'message_id': message_id, 'recipient': recipient }) } # Processing Lock setzen print(f"\n🔒 Setting processing lock...") if not set_processing_lock(bucket, key): print(f" Already being processed by another instance") return { 'statusCode': 200, 'body': json.dumps({ 'status': 'already_processing', 'message_id': message_id, 'recipient': recipient }) } # E-Mail laden um Subject zu extrahieren (optional, für besseres Logging) subject = '(unknown)' try: print(f"\n📖 Reading email for metadata...") obj = s3.get_object(Bucket=bucket, Key=key) raw_bytes = obj['Body'].read() # Nur Headers parsen (schneller) parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) subject = parsed.get('subject', '(no subject)') print(f" Subject: {subject}") except Exception as e: print(f" ⚠ Could not parse email (continuing): {e}") # In domain-spezifische Queue einreihen try: print(f"\n📤 Queuing to {queue_name}...") sqs_message_id = send_to_queue( queue_url=queue_url, bucket=bucket, key=key, from_addr=source, recipient=recipient, # Nur 1 Empfänger domain=domain, subject=subject, message_id=message_id ) print(f"\n{'='*70}") print(f"✅ SUCCESS - Email queued for delivery") print(f"{'='*70}\n") return { 'statusCode': 200, 'body': json.dumps({ 'status': 'queued', 'message_id': message_id, 'sqs_message_id': sqs_message_id, 'queue': queue_name, 'domain': domain, 'recipient': recipient, 'subject': subject }) } except Exception as e: print(f"\n{'='*70}") print(f"✗ FAILED TO QUEUE") print(f"{'='*70}") print(f"Error: {e}") return { 'statusCode': 500, 'body': json.dumps({ 'error': 'failed_to_queue', 'message': str(e), 'message_id': message_id, 'recipient': recipient }) }