email-amazon/lambda_function.py

419 lines
14 KiB
Python

import os
import boto3
import json
import time
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
# Region used for the SQS client and the DynamoDB resource
AWS_REGION = 'us-east-2'

# Module-level service clients shared by every function below
s3 = boto3.client('s3')
sqs = boto3.client('sqs', region_name=AWS_REGION)

# DynamoDB table that is queried by MessageId when a bounce/auto-reply
# has to be mapped back to the original outbound send
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')

# S3 user-metadata key/value pair that marks an object as already processed
PROCESSED_KEY = 'processed'
PROCESSED_VALUE = 'true'


def is_ses_bounce_or_autoreply(parsed):
    """Return True if the message looks like an SES bounce or an auto-reply.

    A message qualifies when either of the following holds (both checks are
    case-insensitive):

    * the ``From`` header contains ``mailer-daemon@`` as well as
      ``amazonses.com``;
    * the ``Auto-Submitted`` header contains ``auto-replied`` or
      ``auto-generated``.

    Args:
        parsed: Parsed message (anything offering a mapping-style ``get``).

    Returns:
        bool: True for a detected bounce/auto-reply, False otherwise.
    """
    sender = (parsed.get('From') or '').lower()
    if 'mailer-daemon@' in sender and 'amazonses.com' in sender:
        return True

    auto_submitted = (parsed.get('Auto-Submitted') or '').lower()
    return any(
        marker in auto_submitted
        for marker in ('auto-replied', 'auto-generated')
    )


def _normalize_message_id(token):
    """Reduce one Message-ID token to the part in front of its '@'.

    Surrounding angle brackets are removed first (``<id@host>`` -> ``id@host``),
    then everything from the first ``@`` onwards is dropped, because the
    lookup key is the bare ID without the ``@...amazonses.com`` suffix.
    """
    if token.startswith('<') and '>' in token:
        token = token[1:token.find('>')]
    return token.split('@')[0]


def extract_original_message_id(parsed):
    """Extract the original message ID from In-Reply-To or References.

    ``In-Reply-To`` is tried first; ``References`` is the fallback, of which
    only the FIRST listed ID is used. A header whose value normalizes to an
    empty string (e.g. ``<>`` or ``<@host>``) is treated as absent, so the
    function never returns an empty ID.

    Args:
        parsed: Parsed message (anything offering a mapping-style ``get``).

    Returns:
        str | None: The bare message ID (brackets and ``@host`` removed), or
        None when neither header yields a usable ID.
    """
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        msg_id = _normalize_message_id(in_reply_to)
        if msg_id:
            return msg_id

    # Fallback: References header, first ID only
    references = (parsed.get('References') or '').split()
    if references:
        return _normalize_message_id(references[0]) or None

    return None


def domain_to_bucket(domain: str) -> str:
    """Convert a domain into its S3 bucket name.

    The domain is lower-cased, every dot becomes a hyphen and the suffix
    ``-emails`` is appended (``Example.com`` -> ``example-com-emails``).
    """
    labels = domain.lower().split('.')
    return f"{'-'.join(labels)}-emails"


def domain_to_queue_name(domain: str) -> str:
    """Convert a domain into its SQS queue name.

    The domain is lower-cased, every dot becomes a hyphen and the suffix
    ``-queue`` is appended (``Example.com`` -> ``example-com-queue``).
    """
    labels = domain.lower().split('.')
    return f"{'-'.join(labels)}-queue"


def get_queue_url_for_domain(domain: str) -> str:
    """Resolve the SQS queue URL that belongs to a domain.

    The queue name is derived via ``domain_to_queue_name`` and looked up with
    ``sqs.get_queue_url``.

    Args:
        domain: Recipient domain (case-insensitive).

    Returns:
        str: URL of the domain's queue.

    Raises:
        Exception: If the queue does not exist or the lookup fails for any
            other reason. The underlying error is attached as ``__cause__``.
    """
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
    except sqs.exceptions.QueueDoesNotExist as e:
        # Chain explicitly so the traceback shows the AWS error as the cause
        raise Exception(
            f"Queue does not exist: {queue_name} "
            f"(for domain: {domain.lower()})"
        ) from e
    except Exception as e:
        raise Exception(
            f"Error getting queue URL for {domain.lower()}: {e}"
        ) from e
    else:
        print(f"✓ Found queue: {queue_name}")
        return queue_url


def is_already_processed(bucket: str, key: str) -> bool:
    """Check whether the e-mail object has already been processed.

    An object counts as processed when its S3 user metadata contains
    ``PROCESSED_KEY`` with the value ``PROCESSED_VALUE``.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the stored e-mail.

    Returns:
        bool: True if the object is flagged as processed OR no longer exists
        (nothing left to do for it); False if it is unprocessed or its state
        could not be determined.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
    except s3.exceptions.ClientError as e:
        # A HEAD response carries no error body, so a missing key surfaces
        # as a generic ClientError with code "404" rather than as the
        # modeled NoSuchKey exception. Accept all spellings of "not found".
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        if error_code in ('404', 'NoSuchKey', 'NotFound'):
            print(f"⚠ Object {key} not found in {bucket}")
            return True
        print(f"⚠ Error checking processed status: {e}")
        return False
    except Exception as e:
        # Best effort: an unknown state is treated as "not processed"
        print(f"⚠ Error checking processed status: {e}")
        return False

    metadata = head.get('Metadata', {}) or {}
    if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
        processed_at = metadata.get('processed_at', 'unknown')
        print(f"✓ Already processed at {processed_at}")
        return True
    return False


def set_processing_lock(bucket: str, key: str, lock_ttl: int = 300) -> bool:
    """Set a processing lock on the object to prevent duplicate processing.

    The lock is the user-metadata entry ``processing_started`` (epoch
    seconds), written by copying the object onto itself with replaced
    metadata. A lock younger than ``lock_ttl`` seconds blocks the caller; an
    older or unparseable one is considered stale and overwritten.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the stored e-mail.
        lock_ttl: Seconds for which an existing lock is honoured
            (default 300, i.e. 5 minutes).

    Returns:
        bool: True if the lock was set, or if setting it failed with an error
        (fail-open, so processing continues); False if another active lock
        exists.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # Check for an existing lock
        processing_started = metadata.get('processing_started')
        if processing_started:
            try:
                lock_age = time.time() - float(processing_started)
            except (TypeError, ValueError):
                # A corrupt timestamp must not skip writing a fresh lock
                lock_age = None

            if lock_age is None:
                print(f"⚠ Unparseable lock timestamp {processing_started!r}, overriding")
            elif lock_age < lock_ttl:
                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
                return False
            else:
                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")

        # Write the new lock. MetadataDirective='REPLACE' swaps the object's
        # metadata for exactly what is passed here, so the existing user
        # metadata is carried over explicitly.
        # NOTE(review): only user metadata is re-sent; confirm that
        # Content-Type and similar headers need not be preserved.
        new_meta = dict(metadata)
        new_meta['processing_started'] = str(int(time.time()))
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Processing lock set")
        return True
    except Exception as e:
        # Fail-open: a lock failure must not stop the e-mail from being queued
        print(f"⚠ Error setting processing lock: {e}")
        return True


def mark_as_queued(bucket: str, key: str, queue_name: str):
    """Mark the e-mail object as enqueued.

    Rewrites the object's user metadata (self-copy with replaced metadata):
    ``queued_at``, ``queued_to`` and ``status='queued'`` are set and the
    ``processing_started`` lock entry is removed. Failures are only logged;
    nothing is raised.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the stored e-mail.
        queue_name: Name of the queue the job was sent to.
    """
    try:
        existing = s3.head_object(Bucket=bucket, Key=key).get('Metadata', {}) or {}

        # Carry everything over except the processing lock, then add the
        # queue bookkeeping fields
        updated = {
            name: value
            for name, value in existing.items()
            if name != 'processing_started'
        }
        updated.update({
            'queued_at': str(int(time.time())),
            'queued_to': queue_name,
            'status': 'queued',
        })

        source_ref = {'Bucket': bucket, 'Key': key}
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource=source_ref,
            Metadata=updated,
            MetadataDirective='REPLACE',
        )
        print(f"✓ Marked as queued to {queue_name}")
    except Exception as err:
        print(f"⚠ Failed to mark as queued: {err}")


def send_to_queue(queue_url: str, bucket: str, key: str,
                  from_addr: str, recipients: list, domain: str,
                  subject: str, message_id: str):
    """Send the e-mail job to the domain-specific SQS queue.

    ONE message is sent that carries ALL recipients for the domain. After a
    successful send the S3 object is marked as queued.

    Args:
        queue_url: URL of the target queue.
        bucket: S3 bucket that holds the e-mail.
        key: S3 object key of the e-mail.
        from_addr: Envelope sender.
        recipients: Recipient addresses for this domain.
        domain: Recipient domain.
        subject: Subject line of the e-mail.
        message_id: SES message ID.

    Returns:
        str: The SQS ``MessageId`` of the queued job.

    Raises:
        Exception: Any error raised while building or sending the message is
            logged and re-raised unchanged.
    """
    # The queue name is the last path segment of the queue URL
    queue_name = queue_url.rsplit('/', 1)[-1]

    payload = {
        'bucket': bucket,
        'key': key,
        'from': from_addr,
        'recipients': recipients,
        'domain': domain,
        'subject': subject,
        'message_id': message_id,
        'timestamp': int(time.time())
    }

    try:
        body = json.dumps(payload, ensure_ascii=False)
        attributes = {
            'domain': {'StringValue': domain, 'DataType': 'String'},
            'bucket': {'StringValue': bucket, 'DataType': 'String'},
            'recipient_count': {
                'StringValue': str(len(recipients)),
                'DataType': 'Number'
            },
            'message_id': {'StringValue': message_id, 'DataType': 'String'},
        }
        result = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=body,
            MessageAttributes=attributes,
        )
        sqs_message_id = result['MessageId']

        print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
        print(f" Recipients: {len(recipients)} - {', '.join(recipients)}")

        mark_as_queued(bucket, key, queue_name)
        return sqs_message_id
    except Exception as err:
        print(f"✗ Failed to queue message: {err}")
        raise


def _rewrite_auto_response(parsed, subject):
    """Rewrite the sender of an SES bounce/auto-reply in place.

    Looks up the original outbound send in ``msg_table`` via the ID taken
    from In-Reply-To/References. If a record with at least one destination
    exists, ``From`` (and ``Reply-To`` when absent) is set to the first
    destination, the previous sender and the original ID are kept in
    ``X-Original-SES-From`` / ``X-Original-MessageId``, and a
    "delivery status notification" subject is replaced.

    Args:
        parsed: Parsed message; mutated in place.
        subject: Subject of the message as read before any rewrite.

    Returns:
        bytes | None: The re-serialized message when it was rewritten,
        otherwise None. Lookup/rewrite errors are logged and yield None.
    """
    original_msg_id = extract_original_message_id(parsed)
    if not original_msg_id:
        print(f" ⚠ Could not extract original Message-ID")
        return None

    print(f" 📋 Original MessageId: {original_msg_id}")
    try:
        # Fetch the original send from DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        original_send = result.get('Item')
        if not original_send:
            print(f" ⚠ No DynamoDB record found for {original_msg_id}")
            return None

        orig_source = original_send.get('source', '')
        orig_destinations = original_send.get('destinations', [])
        print(f" ✓ Found original send:")
        print(f" Original From: {orig_source}")
        print(f" Original To: {orig_destinations}")

        # IMPORTANT: the first destination was the actual recipient
        original_recipient = orig_destinations[0] if orig_destinations else ''
        if not original_recipient:
            return None

        # Keep the SES sender for reference, then make the message appear
        # to come from the original recipient
        parsed['X-Original-SES-From'] = parsed.get('From', '')
        parsed['X-Original-MessageId'] = original_msg_id
        parsed.replace_header('From', original_recipient)

        # Leave an existing Reply-To untouched
        if not parsed.get('Reply-To'):
            parsed['Reply-To'] = original_recipient

        if 'delivery status notification' in subject.lower():
            parsed.replace_header('Subject', f"Delivery Status: {original_recipient}")

        rewritten = parsed.as_bytes()
        print(f" ✅ Rewritten: From={original_recipient}")
        return rewritten
    except Exception as e:
        print(f" ⚠ DynamoDB lookup failed: {e}")
        return None


def lambda_handler(event, context):
    """Lambda handler for SES inbound events.

    Steps: validate the event, derive bucket and queue from the domain of
    the first recipient, locate the stored e-mail in S3, skip it when it is
    already processed or locked, rewrite SES bounces/auto-replies so they
    appear to come from the original recipient, and finally enqueue one job
    for the domain's worker queue.

    Args:
        event: SES event; only ``Records[0]`` is evaluated.
        context: Lambda context (``aws_request_id`` is logged).

    Returns:
        dict: ``{'statusCode': int, 'body': <JSON string>}`` with 200 on
        success/skip, 400 for an invalid event, 404 when the S3 object is
        missing and 500 for queue errors.
    """
    print('=' * 70)
    print(f"Lambda invoked: {context.aws_request_id}")
    print(f"Region: {AWS_REGION}")
    print('=' * 70)

    # Parse the SES event. Every required field is read inside the guard so
    # a malformed event yields a 400 instead of an unhandled exception.
    try:
        ses = event['Records'][0]['ses']
        mail = ses['mail']
        message_id = mail['messageId']
        source = mail['source']
        recipients = ses['receipt'].get('recipients', []) or []
    except (KeyError, IndexError, TypeError, AttributeError) as e:
        print(f"✗ Invalid event structure: {e}")
        return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid SES event'})}

    print(f"\n🔑 S3 Key: {message_id}")
    print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")

    if not recipients:
        print(f"✗ No recipients found in event")
        return {'statusCode': 400, 'body': json.dumps({'error': 'No recipients'})}

    # Extract the domain: everything after the LAST '@', since a domain
    # itself can never contain one
    _, at_sign, domain = recipients[0].rpartition('@')
    domain = domain.lower()
    if not at_sign or not domain:
        print(f"✗ Invalid recipient address: {recipients[0]}")
        return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid recipient address'})}
    bucket = domain_to_bucket(domain)

    print(f"\n📧 Email Event:")
    print(f" MessageId: {message_id}")
    print(f" From: {source}")
    print(f" Domain: {domain}")
    print(f" Bucket: {bucket}")

    # Resolve the queue
    try:
        queue_url = get_queue_url_for_domain(domain)
    except Exception as e:
        print(f"\n✗ Queue ERROR: {e}")
        return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}

    # Locate the S3 object (its key starts with the SES message ID)
    try:
        response = s3.list_objects_v2(Bucket=bucket, Prefix=message_id, MaxKeys=1)
        if 'Contents' not in response:
            raise Exception(f"No S3 object found for {message_id}")
        first_object = response['Contents'][0]
        key = first_object['Key']
        size = first_object['Size']
        print(f" Found: s3://{bucket}/{key} ({size/1024:.1f} KB)")
    except Exception as e:
        print(f"\n✗ S3 ERROR: {e}")
        return {'statusCode': 404, 'body': json.dumps({'error': str(e)})}

    # Duplicate check
    if is_already_processed(bucket, key):
        return {'statusCode': 200, 'body': json.dumps({'status': 'already_processed'})}

    # Processing lock
    if not set_processing_lock(bucket, key):
        return {'statusCode': 200, 'body': json.dumps({'status': 'already_processing'})}

    # Load the e-mail and rewrite it if necessary. Any failure here is
    # logged only: the e-mail is still queued in its stored form.
    subject = '(unknown)'
    modified = False
    try:
        print(f"\n📖 Reading email...")
        obj = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = obj['Body'].read()
        metadata = obj.get('Metadata', {}) or {}

        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
        subject = parsed.get('Subject', '(no subject)')
        print(f" Subject: {subject}")

        # Auto-response / bounce detection
        if is_ses_bounce_or_autoreply(parsed):
            print(f" 🔍 Detected auto-response/bounce from SES")
            rewritten = _rewrite_auto_response(parsed, subject)
            if rewritten is not None:
                s3.put_object(Bucket=bucket, Key=key, Body=rewritten, Metadata=metadata)
                # Flag only after the write succeeded, so the response never
                # reports a rewrite that is not actually stored in S3
                modified = True
                print(f" 💾 Updated S3 object with rewritten email")
    except Exception as e:
        print(f" ⚠ Email parsing error: {e}")

    # Enqueue
    try:
        sqs_message_id = send_to_queue(
            queue_url=queue_url,
            bucket=bucket,
            key=key,
            from_addr=source,
            recipients=recipients,
            domain=domain,
            subject=subject,
            message_id=message_id
        )
        print(f"\n✅ SUCCESS - Queued for delivery\n")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'queued',
                'message_id': message_id,
                'sqs_message_id': sqs_message_id,
                'modified': modified
            })
        }
    except Exception as e:
        print(f"\n✗ QUEUE FAILED: {e}")
        return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}