actual function
This commit is contained in:
parent
286de26c87
commit
3b3d20f89a
|
|
@ -18,19 +18,18 @@ PROCESSED_VALUE = 'true'
|
||||||
|
|
||||||
def domain_to_bucket(domain: str) -> str:
|
def domain_to_bucket(domain: str) -> str:
|
||||||
"""Konvertiert Domain zu S3 Bucket Namen"""
|
"""Konvertiert Domain zu S3 Bucket Namen"""
|
||||||
|
domain = domain.lower()
|
||||||
return domain.replace('.', '-') + '-emails'
|
return domain.replace('.', '-') + '-emails'
|
||||||
|
|
||||||
|
|
||||||
def domain_to_queue_name(domain: str) -> str:
|
def domain_to_queue_name(domain: str) -> str:
|
||||||
"""Konvertiert Domain zu SQS Queue Namen"""
|
"""Konvertiert Domain zu SQS Queue Namen"""
|
||||||
|
domain = domain.lower()
|
||||||
return domain.replace('.', '-') + '-queue'
|
return domain.replace('.', '-') + '-queue'
|
||||||
|
|
||||||
|
|
||||||
def get_queue_url_for_domain(domain: str) -> str:
|
def get_queue_url_for_domain(domain: str) -> str:
|
||||||
"""
|
"""Ermittelt SQS Queue URL für Domain"""
|
||||||
Ermittelt SQS Queue URL für Domain
|
|
||||||
Queue Name: domain-mit-bindestrichen-queue
|
|
||||||
"""
|
|
||||||
queue_name = domain_to_queue_name(domain)
|
queue_name = domain_to_queue_name(domain)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -42,10 +41,10 @@ def get_queue_url_for_domain(domain: str) -> str:
|
||||||
except sqs.exceptions.QueueDoesNotExist:
|
except sqs.exceptions.QueueDoesNotExist:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
f"Queue does not exist: {queue_name} "
|
f"Queue does not exist: {queue_name} "
|
||||||
f"(for domain: {domain})"
|
f"(for domain: {domain.lower()})"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise Exception(f"Error getting queue URL for {domain}: {e}")
|
raise Exception(f"Error getting queue URL for {domain.lower()}: {e}")
|
||||||
|
|
||||||
|
|
||||||
def is_already_processed(bucket: str, key: str) -> bool:
|
def is_already_processed(bucket: str, key: str) -> bool:
|
||||||
|
|
@ -63,7 +62,7 @@ def is_already_processed(bucket: str, key: str) -> bool:
|
||||||
|
|
||||||
except s3.exceptions.NoSuchKey:
|
except s3.exceptions.NoSuchKey:
|
||||||
print(f"⚠ Object {key} not found in {bucket}")
|
print(f"⚠ Object {key} not found in {bucket}")
|
||||||
return True # Wenn nicht existiert, als verarbeitet betrachten
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"⚠ Error checking processed status: {e}")
|
print(f"⚠ Error checking processed status: {e}")
|
||||||
|
|
@ -107,7 +106,7 @@ def set_processing_lock(bucket: str, key: str) -> bool:
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"⚠ Error setting processing lock: {e}")
|
print(f"⚠ Error setting processing lock: {e}")
|
||||||
return True # Bei Fehler trotzdem verarbeiten (besser als Mail verlieren)
|
return True
|
||||||
|
|
||||||
|
|
||||||
def mark_as_queued(bucket: str, key: str, queue_name: str):
|
def mark_as_queued(bucket: str, key: str, queue_name: str):
|
||||||
|
|
@ -119,7 +118,7 @@ def mark_as_queued(bucket: str, key: str, queue_name: str):
|
||||||
metadata['queued_at'] = str(int(time.time()))
|
metadata['queued_at'] = str(int(time.time()))
|
||||||
metadata['queued_to'] = queue_name
|
metadata['queued_to'] = queue_name
|
||||||
metadata['status'] = 'queued'
|
metadata['status'] = 'queued'
|
||||||
metadata.pop('processing_started', None) # Lock entfernen
|
metadata.pop('processing_started', None)
|
||||||
|
|
||||||
s3.copy_object(
|
s3.copy_object(
|
||||||
Bucket=bucket,
|
Bucket=bucket,
|
||||||
|
|
@ -136,20 +135,20 @@ def mark_as_queued(bucket: str, key: str, queue_name: str):
|
||||||
|
|
||||||
|
|
||||||
def send_to_queue(queue_url: str, bucket: str, key: str,
|
def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||||
from_addr: str, recipient: str, domain: str,
|
from_addr: str, recipients: list, domain: str,
|
||||||
subject: str, message_id: str):
|
subject: str, message_id: str):
|
||||||
"""
|
"""
|
||||||
Sendet E-Mail-Job in domain-spezifische SQS Queue
|
Sendet E-Mail-Job in domain-spezifische SQS Queue
|
||||||
|
EINE Message mit ALLEN Recipients für diese Domain
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Queue Name aus URL extrahieren (für Logging)
|
|
||||||
queue_name = queue_url.split('/')[-1]
|
queue_name = queue_url.split('/')[-1]
|
||||||
|
|
||||||
message = {
|
message = {
|
||||||
'bucket': bucket,
|
'bucket': bucket,
|
||||||
'key': key,
|
'key': key,
|
||||||
'from': from_addr,
|
'from': from_addr,
|
||||||
'recipient': recipient, # Nur 1 Empfänger
|
'recipients': recipients, # Liste aller Empfänger
|
||||||
'domain': domain,
|
'domain': domain,
|
||||||
'subject': subject,
|
'subject': subject,
|
||||||
'message_id': message_id,
|
'message_id': message_id,
|
||||||
|
|
@ -169,9 +168,9 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||||
'StringValue': bucket,
|
'StringValue': bucket,
|
||||||
'DataType': 'String'
|
'DataType': 'String'
|
||||||
},
|
},
|
||||||
'recipient': {
|
'recipient_count': {
|
||||||
'StringValue': recipient,
|
'StringValue': str(len(recipients)),
|
||||||
'DataType': 'String'
|
'DataType': 'Number'
|
||||||
},
|
},
|
||||||
'message_id': {
|
'message_id': {
|
||||||
'StringValue': message_id,
|
'StringValue': message_id,
|
||||||
|
|
@ -182,6 +181,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||||
|
|
||||||
sqs_message_id = response['MessageId']
|
sqs_message_id = response['MessageId']
|
||||||
print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
|
print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
|
||||||
|
print(f" Recipients: {len(recipients)} - {', '.join(recipients)}")
|
||||||
|
|
||||||
# Als queued markieren
|
# Als queued markieren
|
||||||
mark_as_queued(bucket, key, queue_name)
|
mark_as_queued(bucket, key, queue_name)
|
||||||
|
|
@ -196,8 +196,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||||
def lambda_handler(event, context):
|
def lambda_handler(event, context):
|
||||||
"""
|
"""
|
||||||
Lambda Handler für SES Events
|
Lambda Handler für SES Events
|
||||||
WICHTIG: SES ruft Lambda einmal PRO Empfänger auf!
|
Eine Domain pro Event = eine Queue Message mit allen Recipients
|
||||||
Jedes Event hat genau 1 Empfänger in receipt.recipients
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
print(f"{'='*70}")
|
print(f"{'='*70}")
|
||||||
|
|
@ -222,33 +221,33 @@ def lambda_handler(event, context):
|
||||||
message_id = mail['messageId']
|
message_id = mail['messageId']
|
||||||
source = mail['source']
|
source = mail['source']
|
||||||
timestamp = mail.get('timestamp', '')
|
timestamp = mail.get('timestamp', '')
|
||||||
|
|
||||||
# ✨ WICHTIG: receipt.recipients enthält NUR den Empfänger für DIESES Event
|
|
||||||
# (NICHT mail.destination verwenden - das hat alle Original-Empfänger)
|
|
||||||
recipients = receipt.get('recipients', [])
|
recipients = receipt.get('recipients', [])
|
||||||
|
|
||||||
if not recipients or len(recipients) != 1:
|
# FRÜHES LOGGING: S3 Key und Recipients
|
||||||
print(f"✗ Unexpected recipients count: {len(recipients)}")
|
print(f"\n🔑 S3 Key: {message_id}")
|
||||||
|
print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")
|
||||||
|
|
||||||
|
if not recipients:
|
||||||
|
print(f"✗ No recipients found in event")
|
||||||
return {
|
return {
|
||||||
'statusCode': 400,
|
'statusCode': 400,
|
||||||
'body': json.dumps({
|
'body': json.dumps({
|
||||||
'error': 'Expected exactly 1 recipient',
|
'error': 'No recipients in event',
|
||||||
'found': len(recipients)
|
'message_id': message_id
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
# SES garantiert: genau 1 Empfänger pro Event
|
# Domain extrahieren (alle Recipients haben gleiche Domain!)
|
||||||
recipient = recipients[0]
|
domain = recipients[0].split('@')[1].lower()
|
||||||
domain = recipient.split('@')[1]
|
|
||||||
bucket = domain_to_bucket(domain)
|
bucket = domain_to_bucket(domain)
|
||||||
|
|
||||||
print(f"\n📧 Email Event:")
|
print(f"\n📧 Email Event:")
|
||||||
print(f" MessageId: {message_id}")
|
print(f" MessageId: {message_id}")
|
||||||
print(f" From: {source}")
|
print(f" From: {source}")
|
||||||
print(f" To: {recipient}")
|
|
||||||
print(f" Domain: {domain}")
|
print(f" Domain: {domain}")
|
||||||
print(f" Bucket: {bucket}")
|
print(f" Bucket: {bucket}")
|
||||||
print(f" Timestamp: {timestamp}")
|
print(f" Timestamp: {timestamp}")
|
||||||
|
print(f" Recipients: {len(recipients)}")
|
||||||
|
|
||||||
# Queue für Domain ermitteln
|
# Queue für Domain ermitteln
|
||||||
try:
|
try:
|
||||||
|
|
@ -262,7 +261,7 @@ def lambda_handler(event, context):
|
||||||
'body': json.dumps({
|
'body': json.dumps({
|
||||||
'error': 'queue_not_configured',
|
'error': 'queue_not_configured',
|
||||||
'domain': domain,
|
'domain': domain,
|
||||||
'recipient': recipient,
|
'recipients': recipients,
|
||||||
'message': str(e)
|
'message': str(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -305,7 +304,7 @@ def lambda_handler(event, context):
|
||||||
'body': json.dumps({
|
'body': json.dumps({
|
||||||
'status': 'already_processed',
|
'status': 'already_processed',
|
||||||
'message_id': message_id,
|
'message_id': message_id,
|
||||||
'recipient': recipient
|
'recipients': recipients
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -318,11 +317,11 @@ def lambda_handler(event, context):
|
||||||
'body': json.dumps({
|
'body': json.dumps({
|
||||||
'status': 'already_processing',
|
'status': 'already_processing',
|
||||||
'message_id': message_id,
|
'message_id': message_id,
|
||||||
'recipient': recipient
|
'recipients': recipients
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
# E-Mail laden um Subject zu extrahieren (optional, für besseres Logging)
|
# E-Mail laden um Subject zu extrahieren
|
||||||
subject = '(unknown)'
|
subject = '(unknown)'
|
||||||
try:
|
try:
|
||||||
print(f"\n📖 Reading email for metadata...")
|
print(f"\n📖 Reading email for metadata...")
|
||||||
|
|
@ -338,7 +337,7 @@ def lambda_handler(event, context):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f" ⚠ Could not parse email (continuing): {e}")
|
print(f" ⚠ Could not parse email (continuing): {e}")
|
||||||
|
|
||||||
# In domain-spezifische Queue einreihen
|
# In Queue einreihen (EINE Message mit ALLEN Recipients)
|
||||||
try:
|
try:
|
||||||
print(f"\n📤 Queuing to {queue_name}...")
|
print(f"\n📤 Queuing to {queue_name}...")
|
||||||
|
|
||||||
|
|
@ -347,7 +346,7 @@ def lambda_handler(event, context):
|
||||||
bucket=bucket,
|
bucket=bucket,
|
||||||
key=key,
|
key=key,
|
||||||
from_addr=source,
|
from_addr=source,
|
||||||
recipient=recipient, # Nur 1 Empfänger
|
recipients=recipients, # ALLE Recipients
|
||||||
domain=domain,
|
domain=domain,
|
||||||
subject=subject,
|
subject=subject,
|
||||||
message_id=message_id
|
message_id=message_id
|
||||||
|
|
@ -365,7 +364,8 @@ def lambda_handler(event, context):
|
||||||
'sqs_message_id': sqs_message_id,
|
'sqs_message_id': sqs_message_id,
|
||||||
'queue': queue_name,
|
'queue': queue_name,
|
||||||
'domain': domain,
|
'domain': domain,
|
||||||
'recipient': recipient,
|
'recipients': recipients,
|
||||||
|
'recipient_count': len(recipients),
|
||||||
'subject': subject
|
'subject': subject
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -382,6 +382,6 @@ def lambda_handler(event, context):
|
||||||
'error': 'failed_to_queue',
|
'error': 'failed_to_queue',
|
||||||
'message': str(e),
|
'message': str(e),
|
||||||
'message_id': message_id,
|
'message_id': message_id,
|
||||||
'recipient': recipient
|
'recipients': recipients
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue