aktueller Code

This commit is contained in:
Andreas Knuth 2025-11-14 13:39:04 -06:00
parent e4ed492e78
commit a3e86add49
1 changed files with 36 additions and 36 deletions

View File

@ -18,19 +18,18 @@ PROCESSED_VALUE = 'true'
def domain_to_bucket(domain: str) -> str: def domain_to_bucket(domain: str) -> str:
"""Konvertiert Domain zu S3 Bucket Namen""" """Konvertiert Domain zu S3 Bucket Namen"""
domain = domain.lower()
return domain.replace('.', '-') + '-emails' return domain.replace('.', '-') + '-emails'
def domain_to_queue_name(domain: str) -> str: def domain_to_queue_name(domain: str) -> str:
"""Konvertiert Domain zu SQS Queue Namen""" """Konvertiert Domain zu SQS Queue Namen"""
domain = domain.lower()
return domain.replace('.', '-') + '-queue' return domain.replace('.', '-') + '-queue'
def get_queue_url_for_domain(domain: str) -> str: def get_queue_url_for_domain(domain: str) -> str:
""" """Ermittelt SQS Queue URL für Domain"""
Ermittelt SQS Queue URL für Domain
Queue Name: domain-mit-bindestrichen-queue
"""
queue_name = domain_to_queue_name(domain) queue_name = domain_to_queue_name(domain)
try: try:
@ -42,10 +41,10 @@ def get_queue_url_for_domain(domain: str) -> str:
except sqs.exceptions.QueueDoesNotExist: except sqs.exceptions.QueueDoesNotExist:
raise Exception( raise Exception(
f"Queue does not exist: {queue_name} " f"Queue does not exist: {queue_name} "
f"(for domain: {domain})" f"(for domain: {domain.lower()})"
) )
except Exception as e: except Exception as e:
raise Exception(f"Error getting queue URL for {domain}: {e}") raise Exception(f"Error getting queue URL for {domain.lower()}: {e}")
def is_already_processed(bucket: str, key: str) -> bool: def is_already_processed(bucket: str, key: str) -> bool:
@ -63,7 +62,7 @@ def is_already_processed(bucket: str, key: str) -> bool:
except s3.exceptions.NoSuchKey: except s3.exceptions.NoSuchKey:
print(f"⚠ Object {key} not found in {bucket}") print(f"⚠ Object {key} not found in {bucket}")
return True # Wenn nicht existiert, als verarbeitet betrachten return True
except Exception as e: except Exception as e:
print(f"⚠ Error checking processed status: {e}") print(f"⚠ Error checking processed status: {e}")
@ -107,7 +106,7 @@ def set_processing_lock(bucket: str, key: str) -> bool:
except Exception as e: except Exception as e:
print(f"⚠ Error setting processing lock: {e}") print(f"⚠ Error setting processing lock: {e}")
return True # Bei Fehler trotzdem verarbeiten (besser als Mail verlieren) return True
def mark_as_queued(bucket: str, key: str, queue_name: str): def mark_as_queued(bucket: str, key: str, queue_name: str):
@ -119,7 +118,7 @@ def mark_as_queued(bucket: str, key: str, queue_name: str):
metadata['queued_at'] = str(int(time.time())) metadata['queued_at'] = str(int(time.time()))
metadata['queued_to'] = queue_name metadata['queued_to'] = queue_name
metadata['status'] = 'queued' metadata['status'] = 'queued'
metadata.pop('processing_started', None) # Lock entfernen metadata.pop('processing_started', None)
s3.copy_object( s3.copy_object(
Bucket=bucket, Bucket=bucket,
@ -136,20 +135,20 @@ def mark_as_queued(bucket: str, key: str, queue_name: str):
def send_to_queue(queue_url: str, bucket: str, key: str, def send_to_queue(queue_url: str, bucket: str, key: str,
from_addr: str, recipient: str, domain: str, from_addr: str, recipients: list, domain: str,
subject: str, message_id: str): subject: str, message_id: str):
""" """
Sendet E-Mail-Job in domain-spezifische SQS Queue Sendet E-Mail-Job in domain-spezifische SQS Queue
EINE Message mit ALLEN Recipients für diese Domain
""" """
# Queue Name aus URL extrahieren (für Logging)
queue_name = queue_url.split('/')[-1] queue_name = queue_url.split('/')[-1]
message = { message = {
'bucket': bucket, 'bucket': bucket,
'key': key, 'key': key,
'from': from_addr, 'from': from_addr,
'recipient': recipient, # Nur 1 Empfänger 'recipients': recipients, # Liste aller Empfänger
'domain': domain, 'domain': domain,
'subject': subject, 'subject': subject,
'message_id': message_id, 'message_id': message_id,
@ -169,9 +168,9 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
'StringValue': bucket, 'StringValue': bucket,
'DataType': 'String' 'DataType': 'String'
}, },
'recipient': { 'recipient_count': {
'StringValue': recipient, 'StringValue': str(len(recipients)),
'DataType': 'String' 'DataType': 'Number'
}, },
'message_id': { 'message_id': {
'StringValue': message_id, 'StringValue': message_id,
@ -182,6 +181,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
sqs_message_id = response['MessageId'] sqs_message_id = response['MessageId']
print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}") print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
print(f" Recipients: {len(recipients)} - {', '.join(recipients)}")
# Als queued markieren # Als queued markieren
mark_as_queued(bucket, key, queue_name) mark_as_queued(bucket, key, queue_name)
@ -196,8 +196,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
def lambda_handler(event, context): def lambda_handler(event, context):
""" """
Lambda Handler für SES Events Lambda Handler für SES Events
WICHTIG: SES ruft Lambda einmal PRO Empfänger auf! Eine Domain pro Event = eine Queue Message mit allen Recipients
Jedes Event hat genau 1 Empfänger in receipt.recipients
""" """
print(f"{'='*70}") print(f"{'='*70}")
@ -222,33 +221,33 @@ def lambda_handler(event, context):
message_id = mail['messageId'] message_id = mail['messageId']
source = mail['source'] source = mail['source']
timestamp = mail.get('timestamp', '') timestamp = mail.get('timestamp', '')
# ✨ WICHTIG: receipt.recipients enthält NUR den Empfänger für DIESES Event
# (NICHT mail.destination verwenden - das hat alle Original-Empfänger)
recipients = receipt.get('recipients', []) recipients = receipt.get('recipients', [])
if not recipients or len(recipients) != 1: # FRÜHES LOGGING: S3 Key und Recipients
print(f"✗ Unexpected recipients count: {len(recipients)}") print(f"\n🔑 S3 Key: {message_id}")
print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")
if not recipients:
print(f"✗ No recipients found in event")
return { return {
'statusCode': 400, 'statusCode': 400,
'body': json.dumps({ 'body': json.dumps({
'error': 'Expected exactly 1 recipient', 'error': 'No recipients in event',
'found': len(recipients) 'message_id': message_id
}) })
} }
# SES garantiert: genau 1 Empfänger pro Event # Domain extrahieren (alle Recipients haben gleiche Domain!)
recipient = recipients[0] domain = recipients[0].split('@')[1].lower()
domain = recipient.split('@')[1]
bucket = domain_to_bucket(domain) bucket = domain_to_bucket(domain)
print(f"\n📧 Email Event:") print(f"\n📧 Email Event:")
print(f" MessageId: {message_id}") print(f" MessageId: {message_id}")
print(f" From: {source}") print(f" From: {source}")
print(f" To: {recipient}")
print(f" Domain: {domain}") print(f" Domain: {domain}")
print(f" Bucket: {bucket}") print(f" Bucket: {bucket}")
print(f" Timestamp: {timestamp}") print(f" Timestamp: {timestamp}")
print(f" Recipients: {len(recipients)}")
# Queue für Domain ermitteln # Queue für Domain ermitteln
try: try:
@ -262,7 +261,7 @@ def lambda_handler(event, context):
'body': json.dumps({ 'body': json.dumps({
'error': 'queue_not_configured', 'error': 'queue_not_configured',
'domain': domain, 'domain': domain,
'recipient': recipient, 'recipients': recipients,
'message': str(e) 'message': str(e)
}) })
} }
@ -305,7 +304,7 @@ def lambda_handler(event, context):
'body': json.dumps({ 'body': json.dumps({
'status': 'already_processed', 'status': 'already_processed',
'message_id': message_id, 'message_id': message_id,
'recipient': recipient 'recipients': recipients
}) })
} }
@ -318,11 +317,11 @@ def lambda_handler(event, context):
'body': json.dumps({ 'body': json.dumps({
'status': 'already_processing', 'status': 'already_processing',
'message_id': message_id, 'message_id': message_id,
'recipient': recipient 'recipients': recipients
}) })
} }
# E-Mail laden um Subject zu extrahieren (optional, für besseres Logging) # E-Mail laden um Subject zu extrahieren
subject = '(unknown)' subject = '(unknown)'
try: try:
print(f"\n📖 Reading email for metadata...") print(f"\n📖 Reading email for metadata...")
@ -338,7 +337,7 @@ def lambda_handler(event, context):
except Exception as e: except Exception as e:
print(f" ⚠ Could not parse email (continuing): {e}") print(f" ⚠ Could not parse email (continuing): {e}")
# In domain-spezifische Queue einreihen # In Queue einreihen (EINE Message mit ALLEN Recipients)
try: try:
print(f"\n📤 Queuing to {queue_name}...") print(f"\n📤 Queuing to {queue_name}...")
@ -347,7 +346,7 @@ def lambda_handler(event, context):
bucket=bucket, bucket=bucket,
key=key, key=key,
from_addr=source, from_addr=source,
recipient=recipient, # Nur 1 Empfänger recipients=recipients, # ALLE Recipients
domain=domain, domain=domain,
subject=subject, subject=subject,
message_id=message_id message_id=message_id
@ -365,7 +364,8 @@ def lambda_handler(event, context):
'sqs_message_id': sqs_message_id, 'sqs_message_id': sqs_message_id,
'queue': queue_name, 'queue': queue_name,
'domain': domain, 'domain': domain,
'recipient': recipient, 'recipients': recipients,
'recipient_count': len(recipients),
'subject': subject 'subject': subject
}) })
} }
@ -382,6 +382,6 @@ def lambda_handler(event, context):
'error': 'failed_to_queue', 'error': 'failed_to_queue',
'message': str(e), 'message': str(e),
'message_id': message_id, 'message_id': message_id,
'recipient': recipient 'recipients': recipients
}) })
} }