new worker

This commit is contained in:
Andreas Knuth 2025-11-30 16:41:21 -06:00
parent 442326ac24
commit eba3e0462b
1 changed file with 626 additions and 0 deletions

worker_sns.py Executable file

@@ -0,0 +1,626 @@
import os
import re
import sys
import boto3
import smtplib
import json
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime

# AWS configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)

# ✨ Worker configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

# SMTP configuration (simple, since each worker serves exactly one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# Graceful shutdown
shutdown_requested = False

# DynamoDB resource (new in this worker!)
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')

def get_bucket_name(domain):
    """Convention: domain.tld -> domain-tld-emails"""
    return domain.replace('.', '-') + '-emails'
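
# For illustration, the convention maps each hosted domain to its own mail
# bucket (the second domain is a made-up example):
#   get_bucket_name('andreasknuth.de')  -> 'andreasknuth-de-emails'
#   get_bucket_name('example.co.uk')    -> 'example-co-uk-emails'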

def is_ses_bounce_or_autoreply(parsed):
    """Detects SES bounces and auto-submitted replies"""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
    return is_mailer_daemon or is_auto_replied

def extract_original_message_id(parsed):
    """Extracts the original Message-ID from the reply headers"""
    in_reply_to = (parsed.get('In-Reply-To') or '').strip()
    if in_reply_to:
        msg_id = in_reply_to
        if msg_id.startswith('<') and '>' in msg_id:
            msg_id = msg_id[1:msg_id.find('>')]
        if '@' in msg_id:
            msg_id = msg_id.split('@')[0]
        return msg_id
    # Fallback: first entry of the References header
    refs = (parsed.get('References') or '').strip()
    if refs:
        first_ref = refs.split()[0]
        if first_ref.startswith('<') and '>' in first_ref:
            first_ref = first_ref[1:first_ref.find('>')]
        if '@' in first_ref:
            first_ref = first_ref.split('@')[0]
        return first_ref
    return None
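
# Illustrative walk-through (hypothetical SES Message-ID):
#   In-Reply-To: <0100019378abcd-1234-5678@email.amazonses.com>
#   -> strip angle brackets: '0100019378abcd-1234-5678@email.amazonses.com'
#   -> keep local part:      '0100019378abcd-1234-5678'
# The local part is what the send-time pipeline is assumed to store as the
# DynamoDB 'MessageId' key queried below.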

def apply_bounce_logic(parsed, subject):
    """
    Checks for a bounce, looks up the original message in DynamoDB,
    and rewrites the headers.
    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False
    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")
    original_msg_id = extract_original_message_id(parsed)
    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False
    try:
        # Look up the original message in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')
        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False
        # Hit!
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''
        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")
            # Rewrite headers
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            parsed.replace_header('From', original_recipient)
            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient
            if 'delivery status notification' in subject.lower():
                parsed.replace_header('Subject', f"Delivery Status: {original_recipient}")
            return parsed, True
        return parsed, False
    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")
        return parsed, False
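
# Sketch of the 'ses-outbound-messages' item this lookup assumes (the exact
# schema lives in the sending pipeline, not in this file; addresses are
# placeholders):
#   {
#       'MessageId':    '0100019378abcd-1234-5678',   # partition key
#       'source':       'kontakt@andreasknuth.de',    # original envelope sender
#       'destinations': ['someone@example.com']       # original recipients
#   }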

def signal_handler(signum, frame):
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

def log(message: str, level: str = 'INFO'):
    """Structured logging with timestamp"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)

def domain_to_queue_name(domain: str) -> str:
    """Converts a domain to its SQS queue name"""
    return domain.replace('.', '-') + '-queue'

def get_queue_url() -> str:
    """Resolves the queue URL for the configured domain"""
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")
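
# Same convention as the bucket names, e.g. 'andreasknuth.de' maps to the
# queue 'andreasknuth-de-queue'. get_queue_url then resolves that name to the
# standard SQS URL form https://sqs.<region>.amazonaws.com/<account-id>/<name>.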

def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
    """
    Marks an email as successfully delivered.
    Only called when at least one recipient succeeded.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)
        # Record invalid inboxes, if any
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')

def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
    """
    Marks an email as failed because all recipients are invalid.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'failed'
        metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
        metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING')

def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """
    Marks an email as completely failed.
    Only called when ALL recipients fail.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = error[:500]  # S3 metadata values are size-limited
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')

def is_temporary_smtp_error(error_msg: str) -> bool:
    """
    Checks whether an SMTP error is temporary (retry makes sense).
    4xx codes = temporary, 5xx = permanent.
    """
    error_lower = error_msg.lower()
    # Any standalone 4xx status code counts as temporary: 421 service not
    # available, 450 mailbox unavailable, 451 local error, 452 insufficient
    # storage, and the rest of the 4xx range.
    if re.search(r'\b4\d{2}\b', error_lower):
        return True
    temporary_indicators = [
        'timeout',
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again'
    ]
    return any(indicator in error_lower for indicator in temporary_indicators)

def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Checks whether the error is permanent for this recipient (inbox does not exist).
    550 = mailbox not found, 551 = user not local, 553 = mailbox name invalid.
    """
    permanent_indicators = [
        '550',  # Mailbox unavailable / not found
        '551',  # User not local
        '553',  # Mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user'
    ]
    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)
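
# Example classifications under these heuristics (replies are illustrative):
#   '550 5.1.1 <nobody@example.com>: User unknown'  -> permanent, recipient dropped
#   '421 4.3.2 Service shutting down, try again'    -> not permanent, retried
# Note that send_email below only consults the permanent check; everything
# else is treated as temporary and kept for another delivery attempt.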

def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """
    Sends an email via SMTP to ONE recipient.
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()
            # STARTTLS, if configured
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    log(f"  STARTTLS failed: {e}", 'WARNING')
            # Authentication, if configured
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    log(f"  SMTP auth failed: {e}", 'WARNING')
            # Send the email
            result = smtp.sendmail(from_addr, [recipient], raw_message)
            # Evaluate the result
            if isinstance(result, dict) and result:
                # The recipient was refused
                error = result.get(recipient, 'Unknown refusal')
                is_permanent = is_permanent_recipient_error(str(error))
                log(f"{recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
                return False, str(error), is_permanent
            else:
                # Delivered
                log(f"{recipient}: Delivered", 'SUCCESS')
                return True, None, False
    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f"{recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent
    except Exception as e:
        # Connection errors are always temporary
        log(f"{recipient}: Connection error - {e}", 'ERROR')
        return False, str(e), False
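
# smtplib detail this relies on: SMTP.sendmail() returns a dict mapping each
# refused recipient to its (code, message) tuple and an empty dict when every
# recipient was accepted. With a single recipient, a refusal instead raises
# SMTPRecipientsRefused, which the SMTPException branch above catches.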

# ==========================================
# MAIN FUNCTION: PROCESS MESSAGE
# ==========================================
def process_message(message_body: dict, receive_count: int) -> bool:
    """
    Processes one email from the queue (an SNS-wrapped SES notification).
    Returns: True (success, delete), False (retry, keep)
    """
    try:
        # 1. UNWRAPPING (SNS -> SES)
        # The SQS body is JSON, usually with 'Type': 'Notification' and
        # 'Message': '...JSON string...'
        if 'Message' in message_body and 'Type' in message_body:
            # It is an SNS notification
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            # Fallback: maybe a raw SES event after all (legacy support)
            ses_msg = message_body
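        # For reference, the shape this unwrapping assumes (abridged; field
        # names follow the SES receipt notification format):
        #   {
        #     "Type": "Notification",
        #     "Message": "{\"mail\": {\"messageId\": \"...\", \"source\": \"...\"},
        #                  \"receipt\": {\"recipients\": [\"...\"]}}"
        #   }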
        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')  # This is the S3 key!
        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])
        # S3 key validation
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR')
            return True  # Delete, unusable
        # Domain validation
        # Use the first recipient to check the domain
        if recipients:
            first_recipient = recipients[0]
            domain = first_recipient.split('@')[-1]
            if domain.lower() != WORKER_DOMAIN.lower():
                log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
                return True  # Delete, does not belong here
        else:
            log("⚠ Warning: No recipients in event", 'WARNING')
            return True
        # Derive the bucket name
        bucket = get_bucket_name(WORKER_DOMAIN)
        key = message_id
        log(f"\n{'='*70}")
        log("Processing Email (SNS/SES):")
        log(f"  ID: {key}")
        log(f"  Recipients: {len(recipients)} -> {recipients}")
        log(f"  Bucket: {bucket}")
        # 3. LOAD FROM S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
        except s3.exceptions.NoSuchKey:
            # Race condition: the SNS notification beat the S3 write.
            # Return False so SQS redelivers after the visibility timeout.
            if receive_count < 5:
                log(f"⏳ S3 object not found yet (attempt {receive_count}). Retrying...", 'WARNING')
                return False
            else:
                log("❌ S3 object missing permanently after retries.", 'ERROR')
                return True  # Delete
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            return False  # Retry
        # 4. PARSING & BOUNCE LOGIC
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')
            # This is where the magic happens: rewrite bounce headers
            parsed, modified = apply_bounce_logic(parsed, subject)
            if modified:
                log("  ✨ Bounce detected & headers rewritten via DynamoDB")
                # Continue with the modified bytes
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')  # New sender for the SMTP envelope
            else:
                from_addr_final = from_addr  # Original envelope sender
        except Exception as e:
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
            from_addr_final = from_addr
        # 5. SMTP DELIVERY (loop over recipients)
        log(f"📤 Sending to {len(recipients)} recipient(s)...")
        successful = []
        failed_permanent = []
        failed_temporary = []
        for recipient in recipients:
            # Use raw_bytes (possibly modified).
            # IMPORTANT: 'from_addr_final' is the envelope sender (for bounces
            # that is the original recipient, otherwise the SES sender)
            success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
            if success:
                successful.append(recipient)
            elif is_perm:
                failed_permanent.append(recipient)
            else:
                failed_temporary.append(recipient)
        # 6. RESULT & CLEANUP
        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")
        if len(successful) > 0:
            # At least one went through -> success
            mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
            log("✅ Success. Deleted from queue.")
            return True
        elif len(failed_permanent) == len(recipients):
            # All failed permanently (user unknown) -> delete
            mark_as_all_invalid(bucket, key, failed_permanent)
            log("🛑 All recipients invalid. Deleted from queue.")
            return True
        else:
            # Temporary failures -> retry
            log("🔄 Temporary failures. Keeping in queue.")
            return False
    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
        traceback.print_exc()
        return False  # Retry (unless it keeps crashing)
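
# Retry semantics: returning False leaves the message in the queue, and SQS
# redelivers it once the visibility timeout (VISIBILITY_TIMEOUT, default 300s)
# expires. The "attempt X/3" log in the main loop assumes the queue's redrive
# policy moves a message to a dead-letter queue after roughly three receives;
# that policy lives in the queue configuration, not in this file.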

def main_loop():
    """Main loop: polls the SQS queue and processes messages"""
    # Resolve the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)
    log(f"\n{'='*70}")
    log("🚀 Email Worker started")
    log(f"{'='*70}")
    log(f"  Worker Name: {WORKER_NAME}")
    log(f"  Domain: {WORKER_DOMAIN}")
    log(f"  Queue: {queue_url}")
    log(f"  Region: {AWS_REGION}")
    log(f"  SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f"  Poll interval: {POLL_INTERVAL}s")
    log(f"  Max messages per poll: {MAX_MESSAGES}")
    log(f"  Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")
    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()
    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )
            # Reset the error counter after a successful poll
            consecutive_errors = 0
            if 'Messages' not in response:
                # No messages
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue
            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()
            # Process the messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break
                receipt_handle = msg['ReceiptHandle']
                # Read the receive count
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))
                # Sent timestamp (to compute time spent in the queue)
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")
                try:
                    message_body = json.loads(msg['Body'])
                    # Process the email
                    success = process_message(message_body, receive_count)
                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete invalid messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # The message stays in the queue for retry
        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break
        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()
            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break
            # Brief pause after errors
            time.sleep(5)
    log(f"\n{'='*70}")
    log("👋 Worker shutting down")
    log(f"  Messages processed: {messages_processed}")
    log(f"{'='*70}\n")

if __name__ == '__main__':
    # Validation
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)
    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)