From a3e86add4904452785d78786475d96d4aee4c8a1 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Fri, 14 Nov 2025 13:39:04 -0600 Subject: [PATCH] aktueller Code --- lambda_function.py | 72 +++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/lambda_function.py b/lambda_function.py index 6cae4de..64ffc63 100644 --- a/lambda_function.py +++ b/lambda_function.py @@ -18,19 +18,18 @@ PROCESSED_VALUE = 'true' def domain_to_bucket(domain: str) -> str: """Konvertiert Domain zu S3 Bucket Namen""" + domain = domain.lower() return domain.replace('.', '-') + '-emails' def domain_to_queue_name(domain: str) -> str: """Konvertiert Domain zu SQS Queue Namen""" + domain = domain.lower() return domain.replace('.', '-') + '-queue' def get_queue_url_for_domain(domain: str) -> str: - """ - Ermittelt SQS Queue URL für Domain - Queue Name: domain-mit-bindestrichen-queue - """ + """Ermittelt SQS Queue URL für Domain""" queue_name = domain_to_queue_name(domain) try: @@ -42,10 +41,10 @@ def get_queue_url_for_domain(domain: str) -> str: except sqs.exceptions.QueueDoesNotExist: raise Exception( f"Queue does not exist: {queue_name} " - f"(for domain: {domain})" + f"(for domain: {domain.lower()})" ) except Exception as e: - raise Exception(f"Error getting queue URL for {domain}: {e}") + raise Exception(f"Error getting queue URL for {domain.lower()}: {e}") def is_already_processed(bucket: str, key: str) -> bool: @@ -63,7 +62,7 @@ def is_already_processed(bucket: str, key: str) -> bool: except s3.exceptions.NoSuchKey: print(f"⚠ Object {key} not found in {bucket}") - return True # Wenn nicht existiert, als verarbeitet betrachten + return True except Exception as e: print(f"⚠ Error checking processed status: {e}") @@ -107,7 +106,7 @@ def set_processing_lock(bucket: str, key: str) -> bool: except Exception as e: print(f"⚠ Error setting processing lock: {e}") - return True # Bei Fehler trotzdem verarbeiten (besser als Mail verlieren) + return True def mark_as_queued(bucket: str, key: str, queue_name: str): @@ -119,7 +118,7 @@ def mark_as_queued(bucket: str, key: str, queue_name: str): metadata['queued_at'] = str(int(time.time())) metadata['queued_to'] = queue_name metadata['status'] = 'queued' - metadata.pop('processing_started', None) # Lock entfernen + metadata.pop('processing_started', None) s3.copy_object( Bucket=bucket, @@ -136,20 +135,20 @@ def mark_as_queued(bucket: str, key: str, queue_name: str): def send_to_queue(queue_url: str, bucket: str, key: str, - from_addr: str, recipient: str, domain: str, + from_addr: str, recipients: list, domain: str, subject: str, message_id: str): """ Sendet E-Mail-Job in domain-spezifische SQS Queue + EINE Message mit ALLEN Recipients für diese Domain """ - # Queue Name aus URL extrahieren (für Logging) queue_name = queue_url.split('/')[-1] message = { 'bucket': bucket, 'key': key, 'from': from_addr, - 'recipient': recipient, # Nur 1 Empfänger + 'recipients': recipients, # Liste aller Empfänger 'domain': domain, 'subject': subject, 'message_id': message_id, @@ -169,9 +168,9 @@ def send_to_queue(queue_url: str, bucket: str, key: str, 'StringValue': bucket, 'DataType': 'String' }, - 'recipient': { - 'StringValue': recipient, - 'DataType': 'String' + 'recipient_count': { + 'StringValue': str(len(recipients)), + 'DataType': 'Number' }, 'message_id': { 'StringValue': message_id, @@ -182,6 +181,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str, sqs_message_id = response['MessageId'] print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}") + print(f" Recipients: {len(recipients)} - {', '.join(recipients)}") # Als queued markieren mark_as_queued(bucket, key, queue_name) @@ -196,8 +196,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str, def lambda_handler(event, context): """ Lambda Handler für SES Events - WICHTIG: SES ruft Lambda einmal PRO Empfänger auf! - Jedes Event hat genau 1 Empfänger in receipt.recipients + Eine Domain pro Event = eine Queue Message mit allen Recipients """ print(f"{'='*70}") @@ -222,33 +221,33 @@ def lambda_handler(event, context): message_id = mail['messageId'] source = mail['source'] timestamp = mail.get('timestamp', '') - - # ✨ WICHTIG: receipt.recipients enthält NUR den Empfänger für DIESES Event - # (NICHT mail.destination verwenden - das hat alle Original-Empfänger) recipients = receipt.get('recipients', []) - if not recipients or len(recipients) != 1: - print(f"✗ Unexpected recipients count: {len(recipients)}") + # FRÜHES LOGGING: S3 Key und Recipients + print(f"\n🔑 S3 Key: {message_id}") + print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}") + + if not recipients: + print(f"✗ No recipients found in event") return { 'statusCode': 400, 'body': json.dumps({ - 'error': 'Expected exactly 1 recipient', - 'found': len(recipients) + 'error': 'No recipients in event', + 'message_id': message_id }) } - # SES garantiert: genau 1 Empfänger pro Event - recipient = recipients[0] - domain = recipient.split('@')[1] + # Domain extrahieren (alle Recipients haben gleiche Domain!) + domain = recipients[0].split('@')[1].lower() bucket = domain_to_bucket(domain) print(f"\n📧 Email Event:") print(f" MessageId: {message_id}") print(f" From: {source}") - print(f" To: {recipient}") print(f" Domain: {domain}") print(f" Bucket: {bucket}") print(f" Timestamp: {timestamp}") + print(f" Recipients: {len(recipients)}") # Queue für Domain ermitteln try: @@ -262,7 +261,7 @@ def lambda_handler(event, context): 'body': json.dumps({ 'error': 'queue_not_configured', 'domain': domain, - 'recipient': recipient, + 'recipients': recipients, 'message': str(e) }) } @@ -305,7 +304,7 @@ def lambda_handler(event, context): 'body': json.dumps({ 'status': 'already_processed', 'message_id': message_id, - 'recipient': recipient + 'recipients': recipients }) } @@ -318,11 +317,11 @@ def lambda_handler(event, context): 'body': json.dumps({ 'status': 'already_processing', 'message_id': message_id, - 'recipient': recipient + 'recipients': recipients }) } - # E-Mail laden um Subject zu extrahieren (optional, für besseres Logging) + # E-Mail laden um Subject zu extrahieren subject = '(unknown)' try: print(f"\n📖 Reading email for metadata...") @@ -338,7 +337,7 @@ def lambda_handler(event, context): except Exception as e: print(f" ⚠ Could not parse email (continuing): {e}") - # In domain-spezifische Queue einreihen + # In Queue einreihen (EINE Message mit ALLEN Recipients) try: print(f"\n📤 Queuing to {queue_name}...") @@ -347,7 +346,7 @@ def lambda_handler(event, context): bucket=bucket, key=key, from_addr=source, - recipient=recipient, # Nur 1 Empfänger + recipients=recipients, # ALLE Recipients domain=domain, subject=subject, message_id=message_id @@ -365,7 +364,8 @@ def lambda_handler(event, context): 'sqs_message_id': sqs_message_id, 'queue': queue_name, 'domain': domain, - 'recipient': recipient, + 'recipients': recipients, + 'recipient_count': len(recipients), 'subject': subject }) } @@ -382,6 +382,6 @@ def lambda_handler(event, context): 'error': 'failed_to_queue', 'message': str(e), 'message_id': message_id, - 'recipient': recipient + 'recipients': recipients }) } \ No newline at end of file