worker with ooo & forward logic

Andreas Knuth 2025-12-17 12:25:56 -06:00
parent 19b4bb1471
commit 93f2c0c3bd
3 changed files with 167 additions and 407 deletions

View File

@@ -27,8 +27,7 @@ services:
- ENABLE_OPENDMARC=0
- ENABLE_POLICYD_SPF=0
- ENABLE_AMAVIS=0
- ENABLE_SPAMASSASSIN=0
- ENABLE_POSTGREY=0
- RSPAMD_GREYLISTING=0
- ENABLE_CLAMAV=0
#- ENABLE_FAIL2BAN=1
@@ -57,12 +56,7 @@ services:
- POSTFIX_MAILBOX_SIZE_LIMIT=0
- POSTFIX_MESSAGE_SIZE_LIMIT=0
- SPOOF_PROTECTION=0
- ENABLE_SRS=1
- SRS_EXCLUDE_DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net,hotshpotshgallery.com
- SRS_SENDER_CLASSES=envelope_sender
- SRS_SECRET=EBk/ndWRA2s8ZMQFIXq0mJnS6SRbgoj77wv00PZNpNw=
- SRS_DOMAINNAME=email-srvr.com
#- SRS_DOMAINNAME=bayarea-cc.com
- ENABLE_SRS=0
# Debug settings
- LOG_LEVEL=info
cap_add:

View File

@@ -1,419 +1,123 @@
import json
import os
import time
import uuid
import random
import logging
from datetime import datetime

import boto3
from botocore.exceptions import ClientError
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy

# Logging configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS region and clients
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3')
sqs = boto3.client('sqs', region_name=AWS_REGION)

# DynamoDB table with the original outbound sends
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')

# Retry configuration
MAX_RETRIES = 3
BASE_BACKOFF = 1  # seconds

# Metadata keys
PROCESSED_KEY = 'processed'
PROCESSED_VALUE = 'true'

def is_ses_bounce_or_autoreply(parsed):
    """Detects SES bounces and auto-replies"""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    # SES MAILER-DAEMON or Auto-Submitted header
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
    return is_mailer_daemon or is_auto_replied

def extract_original_message_id(parsed):
    """Extracts the original message ID from In-Reply-To or References"""
    # Try In-Reply-To first
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        msg_id = in_reply_to
        if msg_id.startswith('<') and '>' in msg_id:
            msg_id = msg_id[1:msg_id.find('>')]
        # ✅ IMPORTANT: strip the @amazonses.com suffix if present
        if '@' in msg_id:
            msg_id = msg_id.split('@')[0]
        return msg_id
    # Fallback: References header (take the FIRST ID)
    refs = (parsed.get('References') or '').strip()
    if refs:
        first_ref = refs.split()[0]
        if first_ref.startswith('<') and '>' in first_ref:
            first_ref = first_ref[1:first_ref.find('>')]
        # ✅ IMPORTANT: strip the @amazonses.com suffix if present
        if '@' in first_ref:
            first_ref = first_ref.split('@')[0]
        return first_ref
    return None
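# Example (hypothetical ID): an In-Reply-To of
#   '<0100018d4f2ab-example@email.amazonses.com>'
# yields '0100018d4f2ab-example', i.e. the bare SES MessageId
# that the send path presumably stored in DynamoDB.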

def domain_to_bucket(domain: str) -> str:
    """Converts a domain to its S3 bucket name"""
    domain = domain.lower()
    return domain.replace('.', '-') + '-emails'

def domain_to_queue_name(domain: str) -> str:
    """Converts a domain to its SQS queue name"""
    domain = domain.lower()
    return domain.replace('.', '-') + '-queue'

def get_queue_url_for_domain(domain: str) -> str:
    """Resolves the SQS queue URL for a domain"""
    queue_name = domain_to_queue_name(domain)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print(f"✓ Found queue: {queue_name}")
        return queue_url
    except sqs.exceptions.QueueDoesNotExist:
        raise Exception(
            f"Queue does not exist: {queue_name} "
            f"(for domain: {domain.lower()})"
        )
    except Exception as e:
        raise Exception(f"Error getting queue URL for {domain.lower()}: {e}")

def exponential_backoff(attempt):
    """Exponential backoff with jitter"""
    return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1)

def get_queue_url(domain):
    """
    Generates the queue name from the domain and fetches its URL.
    Convention: domain.tld -> domain-tld-queue
    """
    queue_name = domain.replace('.', '-') + '-queue'
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except ClientError as e:
        if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
            logger.error(f"Queue not found for domain: {domain}")
            raise ValueError(f"No queue for domain {domain}")
        raise

def is_already_processed(bucket: str, key: str) -> bool:
    """Checks whether the e-mail has already been processed"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        if metadata.get(PROCESSED_KEY) == PROCESSED_VALUE:
            processed_at = metadata.get('processed_at', 'unknown')
            print(f"✓ Already processed at {processed_at}")
            return True
        return False
    except s3.exceptions.NoSuchKey:
        print(f"⚠ Object {key} not found in {bucket}")
        return True
    except Exception as e:
        print(f"⚠ Error checking processed status: {e}")
        return False

def set_processing_lock(bucket: str, key: str) -> bool:
    """
    Sets a processing lock to prevent duplicate processing.
    Returns: True if the lock was acquired, False if already locked.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        # Check for an existing lock
        processing_started = metadata.get('processing_started')
        if processing_started:
            lock_age = time.time() - float(processing_started)
            if lock_age < 300:  # 5 minute lock
                print(f"⚠ Processing lock active (age: {lock_age:.0f}s)")
                return False
            else:
                print(f"⚠ Stale lock detected ({lock_age:.0f}s old), overriding")
        # Set a new lock
        new_meta = metadata.copy()
        new_meta['processing_started'] = str(int(time.time()))
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Processing lock set")
        return True
    except Exception as e:
        print(f"⚠ Error setting processing lock: {e}")
        return True
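# Note (assumption): this lock is advisory only. s3.copy_object performs a
# read-modify-write of the metadata, so two concurrent invocations can still
# both "acquire" the lock; strict exactly-once handling would need a
# conditional write, e.g. a DynamoDB put_item with attribute_not_exists().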

def mark_as_queued(bucket: str, key: str, queue_name: str):
    """Marks the e-mail as enqueued"""
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['queued_at'] = str(int(time.time()))
        metadata['queued_to'] = queue_name
        metadata['status'] = 'queued'
        metadata.pop('processing_started', None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        print(f"✓ Marked as queued to {queue_name}")
    except Exception as e:
        print(f"⚠ Failed to mark as queued: {e}")

def send_to_queue(queue_url: str, bucket: str, key: str,
                  from_addr: str, recipients: list, domain: str,
                  subject: str, message_id: str):
    """
    Sends an e-mail job to the domain-specific SQS queue:
    ONE message with ALL recipients for this domain.
    """
    queue_name = queue_url.split('/')[-1]
    message = {
        'bucket': bucket,
        'key': key,
        'from': from_addr,
        'recipients': recipients,
        'domain': domain,
        'subject': subject,
        'message_id': message_id,
        'timestamp': int(time.time())
    }
    try:
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message, ensure_ascii=False),
            MessageAttributes={
                'domain': {
                    'StringValue': domain,
                    'DataType': 'String'
                },
                'bucket': {
                    'StringValue': bucket,
                    'DataType': 'String'
                },
                'recipient_count': {
                    'StringValue': str(len(recipients)),
                    'DataType': 'Number'
                },
                'message_id': {
                    'StringValue': message_id,
                    'DataType': 'String'
                }
            }
        )
        sqs_message_id = response['MessageId']
        print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
        print(f"  Recipients: {len(recipients)} - {', '.join(recipients)}")
        mark_as_queued(bucket, key, queue_name)
        return sqs_message_id
    except Exception as e:
        print(f"✗ Failed to queue message: {e}")
        raise

def find_s3_key(bucket: str, message_id: str) -> str:
    """Finds the stored raw e-mail object for an SES message ID"""
    response = s3.list_objects_v2(Bucket=bucket, Prefix=message_id, MaxKeys=1)
    if 'Contents' not in response:
        raise Exception(f"No S3 object found for {message_id}")
    key = response['Contents'][0]['Key']
    size = response['Contents'][0]['Size']
    print(f"  Found: s3://{bucket}/{key} ({size/1024:.1f} KB)")
    return key

def rewrite_bounce_from_original(parsed, original_msg_id: str, subject: str):
    """
    Looks up the original outbound send in DynamoDB and rewrites a
    bounce/auto-reply so that From points at the original recipient.
    Returns the rewritten message bytes, or None if nothing was changed.
    """
    try:
        # Fetch the original send from DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        original_send = result.get('Item')
    except Exception as e:
        print(f"  ⚠ DynamoDB lookup failed: {e}")
        return None
    if not original_send:
        print(f"  ⚠ No DynamoDB record found for {original_msg_id}")
        return None
    orig_source = original_send.get('source', '')
    orig_destinations = original_send.get('destinations', [])
    print(f"  ✓ Found original send:")
    print(f"    Original From: {orig_source}")
    print(f"    Original To: {orig_destinations}")
    # **IMPORTANT**: the first recipient was the actual recipient
    original_recipient = orig_destinations[0] if orig_destinations else ''
    if not original_recipient:
        return None
    # Rewrite the sender to the original recipient
    original_from = parsed.get('From', '')
    parsed['X-Original-SES-From'] = original_from
    parsed['X-Original-MessageId'] = original_msg_id
    # **Set From to the original recipient**
    parsed.replace_header('From', original_recipient)
    # Optionally keep a Reply-To
    if not parsed.get('Reply-To'):
        parsed['Reply-To'] = original_recipient
    # Adjust the subject if needed
    if 'delivery status notification' in subject.lower():
        parsed.replace_header('Subject', f"Delivery Status: {orig_destinations[0]}")
    print(f"  ✅ Rewritten: From={original_recipient}")
    return parsed.as_bytes()

def lambda_handler(event, context):
    """
    Lambda handler for SES inbound events.
    Takes the SES event, extracts the domain dynamically, wraps the
    metadata as a 'fake SNS' notification and sends it to the
    domain-specific SQS queue, with built-in retry logic for the SQS send.
    """
    records = event.get('Records', [])
    logger.info(f"Received event with {len(records)} records.")
    for record in records:
        ses_data = record.get('ses', {})
        if not ses_data:
            logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}")
            continue
        mail = ses_data.get('mail', {})
        receipt = ses_data.get('receipt', {})
        recipients = receipt.get('recipients', []) or mail.get('destination', [])
        if not recipients:
            logger.warning("No recipients in event - skipping")
            continue
        # Extract the domain from the first recipient
        first_recipient = recipients[0]
        domain = first_recipient.split('@')[-1].lower()
        if not domain:
            logger.error("Could not extract domain from recipient")
            continue
        # Log the important metadata
        msg_id = mail.get('messageId', 'unknown')
        source = mail.get('source', 'unknown')
        logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
        logger.info(f"  From: {source}")
        logger.info(f"  To: {recipients}")
        # Serialize the SES JSON as a string
        ses_json_string = json.dumps(ses_data)
        # Log and check the payload size (safeguard)
        payload_size = len(ses_json_string.encode('utf-8'))
        logger.info(f"  Metadata Payload Size: {payload_size} bytes")
        if payload_size > 200000:  # arbitrary limit below the 256 KB SQS cap
            raise ValueError("Payload too large for SQS")
        # Fake SNS payload
        fake_sns_payload = {
            "Type": "Notification",
            "MessageId": str(uuid.uuid4()),
            "TopicArn": "arn:aws:sns:ses-shim:global-topic",
            "Subject": "Amazon SES Email Receipt Notification",
            "Message": ses_json_string,
            "Timestamp": datetime.utcnow().isoformat() + "Z"
        }
        try:
            # Resolve the queue URL dynamically
            queue_url = get_queue_url(domain)
            # SQS send with retries
            attempt = 0
            while attempt < MAX_RETRIES:
                try:
                    sqs.send_message(
                        QueueUrl=queue_url,
                        MessageBody=json.dumps(fake_sns_payload)
                    )
                    logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}")
                    break
                except ClientError as e:
                    attempt += 1
                    error_code = e.response['Error']['Code']
                    logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}")
                    if attempt == MAX_RETRIES:
                        raise
                    time.sleep(exponential_backoff(attempt))
        except Exception as e:
            logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True)
            raise e
    return {'status': 'ok'}
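
Because the shim wraps the raw SES event in an SNS-style envelope, a worker that already understands SNS notifications can consume these messages unchanged. A minimal consumer-side sketch, assuming the queue and the field names used above (poll_once is a hypothetical helper, not part of this commit):

import json
import boto3

sqs = boto3.client('sqs', region_name='us-east-2')

def poll_once(queue_url: str):
    resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1,
                               WaitTimeSeconds=10)
    for msg in resp.get('Messages', []):
        envelope = json.loads(msg['Body'])          # the fake SNS notification
        ses_data = json.loads(envelope['Message'])  # the embedded SES event
        mail = ses_data.get('mail', {})
        print(mail.get('messageId'), mail.get('source'))
        sqs.delete_message(QueueUrl=queue_url,
                           ReceiptHandle=msg['ReceiptHandle'])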

View File

@@ -35,12 +35,15 @@ SMTP_PASS = os.environ.get('SMTP_PASS')
shutdown_requested = False
# DynamoDB resources for bounce lookup and rules
try:
    dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
    msg_table = dynamo.Table('ses-outbound-messages')
    rules_table = dynamo.Table('email-rules')  # New: for OOO/forwards
except Exception as e:
    log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
    msg_table = None
    rules_table = None

def get_bucket_name(domain):
    """Convention: domain.tld -> domain-tld-emails"""
@@ -511,7 +514,66 @@ def process_message(message_body: dict, receive_count: int) -> bool:
log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
from_addr_final = from_addr
# 5. SMTP VERSAND (Loop über Recipients)
# 5. OOO & FORWARD LOGIC (neu, vor SMTP-Versand)
if rules_table and not is_ses_bounce_or_autoreply(parsed): # Vermeide Loops bei Bounces/Auto-Replies
for recipient in recipients:
try:
rule = rules_table.get_item(Key={'email_address': recipient}).get('Item', {})
# OOO handling
if rule.get('ooo_active', False):
ooo_msg = rule.get('ooo_message', 'Default OOO message.')
content_type = rule.get('ooo_content_type', 'text') # Default: text
sender = parsed.get('From') # Original-Sender
reply_subject = f"Out of Office: {subject}"
original_body = str(parsed.get_payload(decode=True)) # Original für Quote
if content_type == 'html':
reply_body = {'Html': {'Data': f"<p>{ooo_msg}</p><br><blockquote>Original Message:<br>Subject: {parsed.get('Subject')}<br>From: {sender}<br><br>{original_body}</blockquote>"}}
else:
reply_body = {'Text': {'Data': f"{ooo_msg}\n\nOriginal Message:\nSubject: {parsed.get('Subject')}\nFrom: {sender}\n\n{original_body}"}}
ses.send_email(
Source=recipient, # Verifizierte eigene Adresse
Destination={'ToAddresses': [sender]},
Message={
'Subject': {'Data': reply_subject},
'Body': reply_body # Dynamisch Text oder Html
},
ReplyToAddresses=[recipient] # Optional: Für Replies
)
log(f"✓ Sent OOO reply to {sender} from {recipient}")
# Forward handling
forwards = rule.get('forwards', [])
if forwards:
original_from = parsed.get('From') # Für Headers
fwd_subject = f"FWD: {subject}"
fwd_body_text = f"Forwarded from: {original_from}\n\n{original_body}"
fwd_body = {'Text': {'Data': fwd_body_text}} # Erweiterbar auf HTML
for forward_to in forwards:
ses.send_email(
Source=recipient, # Verifizierte eigene Adresse
Destination={'ToAddresses': [forward_to]},
Message={
'Subject': {'Data': fwd_subject},
'Body': fwd_body
},
ReplyToAddresses=[original_from] # Original-Sender für Replies
)
log(f"✓ Forwarded to {forward_to} from {recipient} (original: {original_from})")
except boto3.exceptions.ClientError as e:
if 'MessageRejected' in str(e):
log(f"⚠ SES rejected: {e}. Check verification.", 'ERROR')
else:
log(f"⚠ Rules error for {recipient}: {e}", 'WARNING')
except Exception as e:
log(f"⚠ General error: {e}", 'WARNING')
    # 6. SMTP SEND (loop over recipients)
    log(f"📤 Sending to {len(recipients)} recipient(s)...")
    successful = []
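
The bounce rewrite and the worker both read the 'ses-outbound-messages' table, which implies the send path records every outbound message. That path is not part of this commit; a minimal sketch of such a record, using only the field names the lookup code reads (record_outbound_send is a hypothetical helper):

import boto3

dynamo = boto3.resource('dynamodb', region_name='us-east-2')
msg_table = dynamo.Table('ses-outbound-messages')

def record_outbound_send(ses_message_id: str, source: str, destinations: list):
    # Store the bare SES MessageId (without '@amazonses.com') so that
    # extract_original_message_id() can match it when a bounce comes back.
    msg_table.put_item(Item={
        'MessageId': ses_message_id,
        'source': source,
        'destinations': destinations,
    })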