123 lines
4.3 KiB
Python
123 lines
4.3 KiB
Python
import json
|
|
import os
|
|
import boto3
|
|
import uuid
|
|
import logging
|
|
from datetime import datetime
|
|
from botocore.exceptions import ClientError
|
|
import time
|
|
import random
|
|
|
|
# Configure logging: the root logger emits INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# SQS client, created once when the module is imported.
sqs = boto3.client('sqs')

# Retry configuration for SQS sends.
MAX_RETRIES = 3
BASE_BACKOFF = 1  # seconds
|
|
|
|
def exponential_backoff(attempt):
    """Return the delay in seconds to wait for the given retry attempt.

    The deterministic part is ``BASE_BACKOFF * 2 ** attempt``; a random value
    drawn from ``random.uniform(0, 1)`` is added on top as jitter.
    """
    base_delay = BASE_BACKOFF * (2 ** attempt)
    jitter = random.uniform(0, 1)
    return base_delay + jitter
|
|
|
|
def get_queue_url(domain):
    """Look up the URL of the SQS queue that belongs to ``domain``.

    The queue name is derived by convention: every ``.`` in the domain is
    replaced by ``-`` and ``-queue`` is appended
    (``example.com`` -> ``example-com-queue``).

    Args:
        domain: Recipient domain, e.g. ``"example.com"``.

    Returns:
        The ``QueueUrl`` value returned by ``sqs.get_queue_url``.

    Raises:
        ValueError: If SQS reports that the queue does not exist.
        ClientError: Any other SQS client error, re-raised unchanged.
    """
    # A missing queue is reported under different error codes depending on
    # the wire protocol botocore uses for SQS: the legacy query protocol uses
    # the long form, the JSON protocol the short one. Accept both so the
    # ValueError contract holds regardless of the installed botocore version.
    missing_queue_codes = (
        'AWS.SimpleQueueService.NonExistentQueue',
        'QueueDoesNotExist',
    )

    queue_name = domain.replace('.', '-') + '-queue'
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in missing_queue_codes:
            logger.error(f"Queue nicht gefunden für Domain: {domain}")
            # Chain the original error so the SQS details stay in the traceback.
            raise ValueError(f"Keine Queue für Domain {domain}") from e
        raise
    return response['QueueUrl']
|
|
|
|
def _extract_recipient_domain(recipient):
    """Return the lower-cased domain part of an e-mail address, or '' if there is none."""
    _local_part, separator, domain = recipient.rpartition('@')
    if not separator:
        # No '@' at all: there is no domain to route on.
        return ''
    return domain.lower()


def _build_fake_sns_message(ses_json_string):
    """Wrap the serialized SES data in an SNS-notification-shaped envelope and return it as JSON."""
    # Local import: the file only imports ``datetime`` from the datetime module.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop the tzinfo so the rendered text keeps the original
    # "<naive isoformat>Z" shape.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    fake_sns_payload = {
        "Type": "Notification",
        "MessageId": str(uuid.uuid4()),
        "TopicArn": "arn:aws:sns:ses-shim:global-topic",
        "Subject": "Amazon SES Email Receipt Notification",
        "Message": ses_json_string,
        "Timestamp": timestamp,
    }
    return json.dumps(fake_sns_payload)


def _send_with_retry(queue_url, message_body, msg_id):
    """Send ``message_body`` to ``queue_url``, making up to MAX_RETRIES attempts on ClientError."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}")
            if attempt == MAX_RETRIES:
                # Attempts exhausted: let the caller see the last error.
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}")
            return


def lambda_handler(event, context):
    """Forward SES receipt events to the SQS queue of the recipient's domain.

    For every entry in ``event['Records']`` the handler takes the ``ses``
    data, derives the target domain from the first recipient, wraps the
    serialized SES data in an SNS-style notification envelope ("fake SNS")
    and sends it to the queue resolved by ``get_queue_url``. Sending is
    retried on ``ClientError`` with exponential backoff.

    Records without ``ses`` data, without recipients, or whose first
    recipient has no domain part are logged and skipped.

    Args:
        event: Invocation event; a dict with an optional ``Records`` list.
        context: Lambda context object (not used).

    Returns:
        ``{'status': 'ok'}`` once all records have been processed.

    Raises:
        ValueError: If the serialized SES data exceeds the size safeguard, or
            if ``get_queue_url`` finds no queue for the domain.
        ClientError: If sending still fails after ``MAX_RETRIES`` attempts.
        Exception: Any other error is logged with traceback and re-raised.
    """
    # Safeguard on the serialized SES data only. The final message body is
    # larger (envelope fields plus JSON escaping of the embedded string), so
    # this is a coarse guard rather than an exact SQS limit check.
    max_metadata_bytes = 200000

    try:
        records = event.get('Records', [])
        logger.info(f"Received event with {len(records)} records.")

        for record in records:
            ses_data = record.get('ses', {})
            if not ses_data:
                logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}")
                continue

            mail = ses_data.get('mail', {})
            receipt = ses_data.get('receipt', {})

            # Routing uses the first recipient only; prefer the receipt's
            # recipients and fall back to the mail destination list.
            recipients = receipt.get('recipients', []) or mail.get('destination', [])
            if not recipients:
                logger.warning("No recipients in event - skipping")
                continue

            domain = _extract_recipient_domain(recipients[0])
            if not domain:
                logger.error("Could not extract domain from recipient")
                continue

            # Log the key metadata.
            msg_id = mail.get('messageId', 'unknown')
            source = mail.get('source', 'unknown')
            logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
            logger.info(f" From: {source}")
            logger.info(f" To: {recipients}")

            # Serialize the SES data; it becomes the envelope's "Message" string.
            ses_json_string = json.dumps(ses_data)

            payload_size = len(ses_json_string.encode('utf-8'))
            logger.info(f" Metadata Payload Size: {payload_size} bytes")
            if payload_size > max_metadata_bytes:
                raise ValueError("Payload too large for SQS")

            # Build the body once so retries resend the identical message
            # (same MessageId and Timestamp) without re-serializing.
            message_body = _build_fake_sns_message(ses_json_string)

            queue_url = get_queue_url(domain)
            _send_with_retry(queue_url, message_body, msg_id)

        return {'status': 'ok'}

    except Exception as e:
        logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True)
        raise