email-amazon/worker.py

520 lines
18 KiB
Python

import json
import os
import re
import signal
import smtplib
import sys
import time
import traceback
from datetime import datetime
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy

import boto3
# AWS configuration: one fixed region; the S3 and SQS clients are created once
# at module level and used by the functions below.
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
# ✨ Worker configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN') # e.g. 'andreasknuth.de'
# Falls back to 'worker-<domain>'; if WORKER_DOMAIN is unset this becomes 'worker-None'.
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
# Worker settings, passed to sqs.receive_message as WaitTimeSeconds,
# MaxNumberOfMessages and VisibilityTimeout respectively.
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
# SMTP configuration (simple, because each worker serves exactly one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
# True only when the variable equals 'true' (case-insensitive).
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
# Login is attempted only when both user and password are set.
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# Graceful shutdown: set to True by signal_handler and checked by main_loop.
shutdown_requested = False
def signal_handler(signum, frame):
    """Request a graceful shutdown when a termination signal arrives.

    Sets the module-level ``shutdown_requested`` flag and prints a notice.

    Args:
        signum: Number of the received signal.
        frame: Interrupted stack frame (unused; part of the handler signature).
    """
    global shutdown_requested
    # Record the request first so a failing notice can never lose it.
    shutdown_requested = True
    try:
        print(f"\n⚠ Shutdown signal received (signal {signum})")
    except (RuntimeError, OSError, ValueError):
        # The notice is best-effort: printing from a signal handler can fail
        # (re-entered buffered stream, closed stdout, broken pipe), and letting
        # that propagate would surface as an unrelated error in the main loop.
        pass
# Route SIGTERM and SIGINT through signal_handler, which only sets the
# shutdown flag; main_loop checks that flag between polls and between messages.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def log(message: str, level: str = 'INFO'):
    """Print one log line prefixed with local timestamp, level and worker name.

    Args:
        message: Text to emit.
        level: Free-form severity tag placed in the second bracket.
    """
    stamp = format(datetime.now(), '%Y-%m-%d %H:%M:%S')
    # Flush on every line so output is visible immediately.
    print(f"[{stamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
def domain_to_queue_name(domain: str) -> str:
    """Convert a domain to its SQS queue name.

    Every dot becomes a hyphen and ``-queue`` is appended,
    e.g. ``example.com`` -> ``example-com-queue``.
    """
    labels = domain.split('.')
    return f"{'-'.join(labels)}-queue"
def get_queue_url() -> str:
    """Resolve the SQS queue URL for the configured worker domain.

    Returns:
        The ``QueueUrl`` field of the SQS ``get_queue_url`` response.

    Raises:
        Exception: If the lookup fails; the message names the domain and
            the underlying error.
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        queue_url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
    except Exception as lookup_error:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {lookup_error}")
    return queue_url
def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
    """
    Mark the stored email as successfully delivered.

    Called only when at least one recipient accepted the message. The
    object's user metadata is rewritten by copying the object onto itself.
    Any error is logged as a warning and not raised.

    Args:
        bucket: S3 bucket holding the email.
        key: S3 key of the email object.
        invalid_inboxes: Optional list of permanently rejected recipients;
            stored comma-separated under ``invalid_inboxes`` when non-empty.
    """
    try:
        current = s3.head_object(Bucket=bucket, Key=key)
        metadata = current.get('Metadata') or {}
        metadata.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': WORKER_NAME,
            'status': 'delivered',
        })
        # Drop the in-flight markers now that processing is complete.
        for stale_key in ('processing_started', 'queued_at'):
            metadata.pop(stale_key, None)

        # Record invalid inboxes, if there were any.
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')

        location = {'Bucket': bucket, 'Key': key}
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource=location,
            Metadata=metadata,
            MetadataDirective='REPLACE',
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')
def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
    """
    Mark the stored email as failed because every recipient is invalid.

    The object's user metadata is rewritten by copying the object onto
    itself. Any error is logged as a warning and not raised.

    Args:
        bucket: S3 bucket holding the email.
        key: S3 key of the email object.
        invalid_inboxes: Rejected recipient addresses; stored comma-separated.
    """
    try:
        current = s3.head_object(Bucket=bucket, Key=key)
        metadata = current.get('Metadata') or {}
        metadata.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': WORKER_NAME,
            'status': 'failed',
            'error': 'All recipients are invalid (mailboxes do not exist)',
            'invalid_inboxes': ','.join(invalid_inboxes),
        })
        # Drop the in-flight markers now that processing is complete.
        for stale_key in ('processing_started', 'queued_at'):
            metadata.pop(stale_key, None)

        location = {'Bucket': bucket, 'Key': key}
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource=location,
            Metadata=metadata,
            MetadataDirective='REPLACE',
        )
        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING')
def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """
    Mark the stored email as failed.

    The object's user metadata is rewritten by copying the object onto
    itself. Any error is logged as a warning and not raised.

    Args:
        bucket: S3 bucket holding the email.
        key: S3 key of the email object.
        error: Human-readable failure description; stored sanitized and
            truncated to 500 characters.
        receive_count: Delivery attempt number, stored as ``retry_count``.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # S3 user metadata must be ASCII, and it travels as HTTP headers, so
        # line breaks are unsafe too. Sanitize first; otherwise copy_object
        # fails and the failed status is never recorded.
        ascii_error = error.encode('ascii', 'replace').decode('ascii')
        safe_error = ' '.join(ascii_error.split())

        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = safe_error[:500]  # keep metadata small
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')
# A standalone three-digit 4xx reply code: not part of a word, a longer
# number, or a dotted token such as an enhanced status code or IP address.
_TEMPORARY_SMTP_CODE = re.compile(r'(?<![\w.])4\d{2}(?!\w|\.\d)')

# Lower-case phrases that indicate a transient condition.
_TEMPORARY_SMTP_PHRASES = (
    'timeout',
    'timed out',  # text of a socket timeout
    'connection refused',
    'connection reset',
    'network unreachable',
    'temporarily',
    'try again',
)


def is_temporary_smtp_error(error_msg: str) -> bool:
    """
    Check whether an SMTP error is temporary, i.e. a retry makes sense.

    4xx reply codes are temporary, 5xx are permanent. A message counts as
    temporary when it contains a standalone 4xx code (421, 450, 451, 452, ...)
    or one of the known transient phrases. A lone digit 4 inside other text,
    such as an address or an enhanced status code, does not count.

    Args:
        error_msg: Error text, typically ``str()`` of an smtplib exception.

    Returns:
        True if the error looks temporary, otherwise False.
    """
    error_lower = error_msg.lower()
    if _TEMPORARY_SMTP_CODE.search(error_lower):
        return True
    return any(phrase in error_lower for phrase in _TEMPORARY_SMTP_PHRASES)
# A standalone 550 / 551 / 553 reply code: not part of a word, a longer
# number (e.g. a queue id), or a dotted token.
_PERMANENT_RECIPIENT_CODE = re.compile(r'(?<![\w.])55[013](?!\w|\.\d)')

# Lower-case phrases servers use when the mailbox does not exist.
_PERMANENT_RECIPIENT_PHRASES = (
    'mailbox not found',
    'user unknown',
    'no such user',
    'recipient rejected',
    'does not exist',
    'invalid recipient',
    'unknown user',
)


def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Check whether an error is permanent for this recipient (inbox missing).

    550 = mailbox not found, 551 = user not local, 553 = mailbox name invalid.
    The codes must appear as standalone numbers, so digits embedded in a
    longer number do not trigger a match. Known phrases are matched
    case-insensitively.

    Args:
        error_msg: Error text, typically ``str()`` of an smtplib exception
            or of a refusal tuple.

    Returns:
        True if the recipient should be treated as an invalid inbox.
    """
    error_lower = error_msg.lower()
    if _PERMANENT_RECIPIENT_CODE.search(error_lower):
        return True
    return any(phrase in error_lower for phrase in _PERMANENT_RECIPIENT_PHRASES)
def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """
    Send the raw email via SMTP to exactly ONE recipient.

    A new SMTP connection is opened for every call.

    Args:
        from_addr: Envelope sender.
        recipient: Envelope recipient.
        raw_message: Complete message as bytes, passed on unchanged.

    Returns:
        Tuple ``(success, error, is_permanent)``: ``success`` is a bool,
        ``error`` is None on success or the error text, and ``is_permanent``
        tells whether the failure means the inbox is invalid.
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured.
            # NOTE(review): a STARTTLS failure is only logged and the session
            # continues unencrypted; confirm this downgrade is intended.
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as tls_error:
                    log(f" STARTTLS failed: {tls_error}", 'WARNING')

            # Authentication if configured.
            # NOTE(review): a login failure is only logged; sending is still attempted.
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as auth_error:
                    log(f" SMTP auth failed: {auth_error}", 'WARNING')

            refused = smtp.sendmail(from_addr, [recipient], raw_message)

            # An empty (or non-dict) result means nobody was refused.
            if not (isinstance(refused, dict) and refused):
                log(f"{recipient}: Delivered", 'SUCCESS')
                return True, None, False

            # The recipient was refused.
            reason = str(refused.get(recipient, 'Unknown refusal'))
            is_permanent = is_permanent_recipient_error(reason)
            log(f"{recipient}: {reason} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
            return False, reason, is_permanent
    except smtplib.SMTPException as smtp_error:
        error_msg = str(smtp_error)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f"{recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent
    except Exception as conn_error:
        # Connection errors are always treated as temporary.
        log(f"{recipient}: Connection error - {conn_error}", 'ERROR')
        return False, str(conn_error), False
def process_message(message_body: dict, receive_count: int) -> bool:
    """
    Process one email from the queue and deliver it to all its recipients.

    Outcome rules:
      * Wrong domain, missing S3 object, or empty recipient list: the
        message is dropped (returns True).
      * At least one recipient succeeded: marked delivered; permanently
        rejected recipients are recorded as invalid inboxes (returns True).
      * Every recipient permanently rejected: marked failed (returns True).
      * Otherwise (no success, at least one temporary failure): the message
        stays in the queue (returns False). From the third attempt on it is
        also marked failed in S3.

    Args:
        message_body: Decoded queue message with the keys ``bucket``,
            ``key``, ``from``, ``recipients`` and ``domain``; ``subject``
            and ``message_id`` are optional.
        receive_count: How often this queue message has been received.

    Returns:
        True if the queue message should be deleted, False if it should be
        kept for a retry.

    Raises:
        KeyError: If a required key is missing from ``message_body``.
    """
    max_attempts = 3  # attempt number at which the email is recorded as failed

    bucket = message_body['bucket']
    key = message_body['key']
    from_addr = message_body['from']
    recipients = message_body['recipients']  # list of recipient addresses
    domain = message_body['domain']
    subject = message_body.get('subject', '(unknown)')
    message_id = message_body.get('message_id', '(unknown)')

    log(f"\n{'='*70}")
    log(f"Processing email (Attempt #{receive_count}):")
    log(f" MessageId: {message_id}")
    log(f" S3 Key: {key}")
    log(f" Domain: {domain}")
    log(f" From: {from_addr}")
    log(f" Recipients: {len(recipients)}")
    for recipient in recipients:
        log(f" - {recipient}")
    log(f" Subject: {subject}")
    log(f" S3: s3://{bucket}/{key}")
    log(f"{'='*70}")

    # Validation: the message domain must match this worker's domain.
    if domain.lower() != WORKER_DOMAIN.lower():
        log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
        log("This message should not be in this queue! Deleting...", 'ERROR')
        return True  # delete: it does not belong here

    # Without recipients there is nothing to deliver. Record that explicitly
    # rather than reporting "all recipients invalid".
    if not recipients:
        log("✗ Message contains no recipients - nothing to deliver", 'ERROR')
        mark_as_failed(bucket, key, "Queue message contains no recipients", receive_count)
        return True  # not retryable

    # Load the email from S3.
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = response['Body'].read()
        log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
    except s3.exceptions.NoSuchKey:
        log(f"✗ S3 object not found (may have been deleted)", 'ERROR')
        return True  # not retryable
    except Exception as e:
        log(f"✗ Failed to load from S3: {e}", 'ERROR')
        return False  # might be temporary: retry

    # Send to every recipient individually.
    log(f"\n📤 Sending to {len(recipients)} recipient(s)...")
    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    successful = []
    failed_temporary = []
    failed_permanent = []
    for recipient in recipients:
        success, error, is_permanent = send_email(from_addr, recipient, raw_bytes)
        if success:
            successful.append(recipient)
        elif is_permanent:
            failed_permanent.append(recipient)
        else:
            failed_temporary.append(recipient)

    log(f"\n📊 Delivery Results:")
    log(f" ✓ Successful: {len(successful)}/{len(recipients)}")
    log(f" ✗ Failed (temporary): {len(failed_temporary)}")
    log(f" ✗ Failed (permanent): {len(failed_permanent)}")

    # Case 1: at least one recipient succeeded -> delivered.
    if successful:
        invalid_inboxes = failed_permanent if failed_permanent else None
        mark_as_processed(bucket, key, invalid_inboxes)
        log(f"{'='*70}")
        log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS')
        if failed_permanent:
            log(f"{len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING')
        if failed_temporary:
            log(f"{len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING')
        log(f"{'='*70}\n")
        return True

    # Case 2: every recipient failed permanently -> failed, not retryable.
    if len(failed_permanent) == len(recipients):
        mark_as_all_invalid(bucket, key, failed_permanent)
        log(f"{'='*70}")
        log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR')
        log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR')
        log(f"{'='*70}\n")
        return True

    # Case 3: no success and at least one temporary failure, possibly mixed
    # with permanent ones. Retry while attempts remain.
    if receive_count < max_attempts:
        if failed_permanent:
            log(
                f"⚠ {len(failed_temporary)} temporary failure(s) "
                f"(plus {len(failed_permanent)} permanent), will retry",
                'WARNING',
            )
        else:
            log(f"⚠ All failures are temporary, will retry", 'WARNING')
        log(f"{'='*70}\n")
        return False  # keep the message for a retry

    # Attempts exhausted: record the failure with an accurate summary.
    if failed_permanent:
        error_summary = (
            f"Failed after {receive_count} attempts. "
            f"Temporary errors for {len(failed_temporary)} recipient(s); "
            f"invalid inboxes: {', '.join(failed_permanent)}"
        )
    else:
        error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients."
    mark_as_failed(bucket, key, error_summary, receive_count)
    log(f"{'='*70}")
    log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR')
    log(f"{'='*70}\n")
    # Not deleted here. Presumably the queue's redrive policy then moves it
    # to a dead-letter queue; verify the queue configuration.
    return False
def main_loop():
    """Main loop: long-poll the SQS queue and process received messages.

    Exits the process with status 1 if the queue URL cannot be resolved.
    Otherwise runs until ``shutdown_requested`` is set or ten polling errors
    occur in a row. A message is deleted when ``process_message`` returns
    True or its body is not valid JSON; in every other case it stays in the
    queue.
    """
    # Resolve the queue URL once; without it the worker cannot run.
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f" Worker Name: {WORKER_NAME}")
    log(f" Domain: {WORKER_DOMAIN}")
    log(f" Queue: {queue_url}")
    log(f" Region: {AWS_REGION}")
    log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f" Poll interval: {POLL_INTERVAL}s")
    log(f" Max messages per poll: {MAX_MESSAGES}")
    log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    max_consecutive_errors = 10
    consecutive_errors = 0
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling).
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All'],
            )
            # A successful poll resets the error streak.
            consecutive_errors = 0

            if 'Messages' not in response:
                # Nothing received: emit a heartbeat at most once a minute.
                idle_seconds = time.time() - last_activity
                if idle_seconds > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            batch = response['Messages']
            message_count = len(batch)
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            for msg in batch:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']
                attributes = msg.get('Attributes', {})
                # Number of times this message has been received so far.
                receive_count = int(attributes.get('ApproximateReceiveCount', 1))
                # SentTimestamp is divided by 1000 before being compared with time.time().
                sent_timestamp = int(attributes.get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])
                    if not process_message(message_body, receive_count):
                        # On failure the message stays in the queue.
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
                        continue
                    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                    log("✓ Message deleted from queue")
                    messages_processed += 1
                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Malformed messages are not retryable: delete them.
                    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # The message stays in the queue for a retry.
        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break
        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()
            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break
            # Short pause before polling again after an error.
            time.sleep(5)

    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{'='*70}\n")
if __name__ == '__main__':
    # Validation: the worker cannot pick a queue without a configured domain.
    if WORKER_DOMAIN:
        try:
            main_loop()
        except Exception as e:
            # Anything that escapes the main loop is fatal: report it and exit non-zero.
            log(f"Fatal error: {e}", 'ERROR')
            traceback.print_exc()
            sys.exit(1)
    else:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)