email-amazon/worker.py

411 lines
14 KiB
Python

import os
import sys
import boto3
import smtplib
import json
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime
# AWS configuration: region plus the module-wide S3 and SQS clients
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
# Worker configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN') # e.g. 'andreasknuth.de'
# NOTE: if WORKER_DOMAIN is unset, this default evaluates to 'worker-None'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
# Worker settings; main_loop passes these to sqs.receive_message as
# WaitTimeSeconds, MaxNumberOfMessages and VisibilityTimeout respectively
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
# SMTP configuration (simple, because each worker serves only one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
# Only the exact string 'true' (case-insensitive) enables STARTTLS
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
# Optional credentials; send_email only attempts a login when both are set
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# Graceful shutdown: flag checked by main_loop between polls and between messages
shutdown_requested = False


def signal_handler(signum, frame):
    """Record a shutdown request.

    Sets the module-level ``shutdown_requested`` flag and prints a notice;
    it does not stop anything itself.

    Args:
        signum: Number of the received signal (included in the notice).
        frame: Current stack frame supplied by the signal machinery; unused.
    """
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


# Route both SIGTERM and SIGINT through the handler above
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def log(message: str, level: str = 'INFO'):
    """Print one log line prefixed with local time, level and worker name.

    Args:
        message: Text to emit.
        level: Severity label placed in the second bracket (default 'INFO').
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    line = f"[{stamp}] [{level}] [{WORKER_NAME}] {message}"
    # Flush immediately so the line is not held back in a buffer
    print(line, flush=True)
def domain_to_queue_name(domain: str) -> str:
    """Build the SQS queue name for a domain.

    Every dot becomes a hyphen and '-queue' is appended,
    e.g. 'example.com' -> 'example-com-queue'.
    """
    labels = domain.split('.')
    return f"{'-'.join(labels)}-queue"
def get_queue_url() -> str:
    """Look up the SQS queue URL for the configured worker domain.

    The queue name is derived from WORKER_DOMAIN via domain_to_queue_name().

    Returns:
        The 'QueueUrl' value from the SQS get_queue_url response.

    Raises:
        Exception: If the lookup fails for any reason. The underlying error
            is attached as ``__cause__`` so the original traceback survives.
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        return sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}") from e
def mark_as_processed(bucket: str, key: str):
    """Mark the S3 object as successfully delivered.

    Reads the object's current user metadata, records the delivery status,
    drops the in-flight bookkeeping keys, and writes the result back by
    copying the object onto itself with replaced metadata.

    Best effort: any failure is logged as a warning and not raised.

    Args:
        bucket: Name of the S3 bucket holding the e-mail.
        key: Object key of the e-mail.
    """
    try:
        current = s3.head_object(Bucket=bucket, Key=key).get('Metadata', {}) or {}
        current.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': WORKER_NAME,
            'status': 'delivered',
        })
        # These keys only describe an in-flight message
        for stale_key in ('processing_started', 'queued_at'):
            current.pop(stale_key, None)
        same_object = {'Bucket': bucket, 'Key': key}
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource=same_object,
            Metadata=current,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')
def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """Mark the S3 object as failed and record why.

    Reads the object's current user metadata, stores the failure status,
    time, worker name, error text and retry count, and writes the result
    back by copying the object onto itself with replaced metadata.

    Best effort: any failure is logged as a warning and not raised.

    Args:
        bucket: Name of the S3 bucket holding the e-mail.
        key: Object key of the e-mail.
        error: Human-readable failure reason.
        receive_count: Delivery attempt number, stored as 'retry_count'.
    """
    try:
        # S3 user metadata travels as HTTP headers, so the value must be
        # single-line ASCII; otherwise the copy below is rejected and the
        # failure marker is lost. Collapse whitespace and replace any
        # non-ASCII character with '?'.
        safe_error = ' '.join(error.split())
        safe_error = safe_error.encode('ascii', 'replace').decode('ascii')

        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        # Capped at 500 characters to stay well inside the S3 metadata size limit
        metadata['error'] = safe_error[:500]
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')
def is_temporary_smtp_error(error_msg: str) -> bool:
    """Decide whether an SMTP/connection error is temporary (worth a retry).

    If the message contains a standalone three-digit SMTP reply code, that
    code decides: 4xx is temporary, 5xx is permanent. Digits that are part
    of a longer number, an enhanced status code such as '5.7.1', or an
    e-mail address are ignored. Without a reply code, the message is checked
    for well-known transient network phrases.

    Args:
        error_msg: Error text, typically ``str()`` of an smtplib or socket
            exception or of a ``(code, message)`` refusal tuple.

    Returns:
        True if a retry makes sense, False otherwise (including for an
        empty message).
    """
    import re  # local import: the module header does not import re

    if not error_msg:
        return False

    # A standalone 4xx/5xx token, e.g. "(451, b'...')" or "554 rejected".
    # The lookarounds reject digits inside longer numbers, dotted enhanced
    # codes and address fragments like 'user450@' or '@mail-450.de'.
    reply_code = re.search(r'(?<![\w.@+-])([45])\d{2}(?![\w@])', error_msg)
    if reply_code:
        return reply_code.group(1) == '4'

    transient_phrases = (
        'timeout',
        'timed out',  # text of socket.timeout / TimeoutError
        'connection refused',
        'connection reset',
        'network unreachable',
        'network is unreachable',  # text of OSError ENETUNREACH
        'temporarily',
        'try again',
    )
    error_lower = error_msg.lower()
    return any(phrase in error_lower for phrase in transient_phrases)
def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """Deliver one raw e-mail to a single recipient via SMTP.

    Connects to SMTP_HOST:SMTP_PORT, optionally upgrades the connection
    with STARTTLS, optionally authenticates, and sends the message.

    Args:
        from_addr: Envelope sender address.
        recipient: Envelope recipient address (exactly one).
        raw_message: Complete message as bytes.

    Returns:
        ``(success, error)``: ``(True, None)`` on delivery, otherwise
        ``(False, <error text>)``. No exception is propagated.
    """
    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()
            # STARTTLS if configured
            tls_active = False
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                    tls_active = True
                    log("✓ STARTTLS enabled")
                except Exception as e:
                    log(f"STARTTLS failed: {e}", 'WARNING')
            # Authentication if configured
            if SMTP_USER and SMTP_PASS:
                if SMTP_USE_TLS and not tls_active:
                    # TLS was requested but is not in place: never send the
                    # credentials over the unencrypted connection.
                    log("SMTP auth skipped: TLS requested but not active", 'WARNING')
                else:
                    try:
                        smtp.login(SMTP_USER, SMTP_PASS)
                        log("✓ SMTP authenticated")
                    except Exception as e:
                        log(f"SMTP auth failed: {e}", 'WARNING')
            # Send the e-mail
            result = smtp.sendmail(from_addr, [recipient], raw_message)
            # Evaluate the result: a non-empty dict lists refused recipients
            if isinstance(result, dict) and result:
                error = result.get(recipient, 'Unknown refusal')
                log(f"✗ Recipient refused: {error}", 'ERROR')
                return False, str(error)
            log(f"✓ Email delivered to {recipient}", 'SUCCESS')
            return True, None
    except smtplib.SMTPRecipientsRefused as e:
        # With a single recipient, sendmail() raises this instead of
        # returning a refusal dict; report the server's (code, message).
        error = e.recipients.get(recipient, 'Unknown refusal')
        log(f"✗ Recipient refused: {error}", 'ERROR')
        return False, str(error)
    except smtplib.SMTPException as e:
        log(f"✗ SMTP error: {e}", 'ERROR')
        return False, str(e)
    except Exception as e:
        log(f"✗ Connection error: {e}", 'ERROR')
        return False, str(e)
def process_message(message_body: dict, receive_count: int) -> bool:
    """Process one queued e-mail: load it from S3 and deliver it via SMTP.

    Args:
        message_body: Decoded queue payload. Must contain 'bucket', 'key',
            'from', 'recipient' and 'domain'; 'subject' and 'message_id'
            are optional and only used for logging.
        receive_count: How many times this queue message has been received.

    Returns:
        True if the queue message should be deleted (delivered, wrong
        domain, or S3 object missing); False if it should stay in the
        queue (S3 load error, temporary or permanent delivery failure).
    """
    bucket = message_body['bucket']
    key = message_body['key']
    from_addr = message_body['from']
    recipient = message_body['recipient']  # exactly one recipient
    domain = message_body['domain']
    subject = message_body.get('subject', '(unknown)')
    message_id = message_body.get('message_id', '(unknown)')

    rule = '=' * 70
    log(f"\n{rule}")
    banner_lines = (
        f"Processing email (Attempt #{receive_count}):",
        f" MessageId: {message_id}",
        f" Domain: {domain}",
        f" From: {from_addr}",
        f" To: {recipient}",
        f" Subject: {subject}",
        f" S3: s3://{bucket}/{key}",
    )
    for banner_line in banner_lines:
        log(banner_line)
    log(rule)

    # Validation: the message's domain must match this worker's domain
    if domain.lower() != WORKER_DOMAIN.lower():
        log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
        log("This message should not be in this queue! Deleting...", 'ERROR')
        return True  # delete: the message does not belong here

    # Load the raw e-mail from S3
    try:
        raw_bytes = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        size = len(raw_bytes)
        log(f"✓ Loaded {size:,} bytes ({size/1024:.1f} KB)")
    except s3.exceptions.NoSuchKey:
        log("✗ S3 object not found (may have been deleted)", 'ERROR')
        return True  # not retryable: delete the message
    except Exception as e:
        log(f"✗ Failed to load from S3: {e}", 'ERROR')
        return False  # might be temporary: retry

    # Deliver
    success, error = send_email(from_addr, recipient, raw_bytes)
    if success:
        mark_as_processed(bucket, key)
        log(rule)
        log("✅ Email delivered successfully", 'SUCCESS')
        log(f"{rule}\n")
        return True  # delete the message

    error_msg = error or "Unknown SMTP error"

    # Temporary error with attempts left: keep the message for a retry
    if receive_count < 3 and is_temporary_smtp_error(error_msg):
        log("⚠ Temporary error detected, will retry", 'WARNING')
        log(f"{rule}\n")
        return False

    # Permanent error or retry budget used up
    mark_as_failed(bucket, key, error_msg, receive_count)
    log(rule)
    log("✗ Email delivery failed permanently", 'ERROR')
    log(f" Error: {error_msg}")
    log(f"{rule}\n")
    # Message is not deleted; presumably the queue's redrive policy moves it
    # to a DLQ after 3 receives — TODO confirm against the queue configuration
    return False
def main_loop():
    """Poll the SQS queue and process messages until shutdown.

    Resolves the queue URL via get_queue_url() (exiting the process with
    status 1 if that fails), logs the effective configuration, then
    long-polls the queue in a loop. Each received message body is
    JSON-decoded and passed to process_message(). The message is deleted
    from the queue when process_message() returns True or when the body is
    not valid JSON; otherwise it is left in the queue.

    The loop ends when ``shutdown_requested`` is set, on KeyboardInterrupt,
    or after 10 consecutive errors in the polling loop.
    """
    # Resolve the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)
    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f" Worker Name: {WORKER_NAME}")
    log(f" Domain: {WORKER_DOMAIN}")
    log(f" Queue: {queue_url}")
    log(f" Region: {AWS_REGION}")
    log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f" Poll interval: {POLL_INTERVAL}s")
    log(f" Max messages per poll: {MAX_MESSAGES}")
    log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")
    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    # Time of the last received batch or last idle notice; throttles the idle log
    last_activity = time.time()
    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )
            # Reset the error counter after a successful poll
            consecutive_errors = 0
            if 'Messages' not in response:
                # No messages; log an idle notice at most once every 60 seconds
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue
            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()
            # Process the messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break
                receipt_handle = msg['ReceiptHandle']
                # Read the receive count (defaults to 1 if the attribute is missing)
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))
                # Sent timestamp, divided by 1000 to get seconds, for the time-in-queue calculation
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")
                try:
                    message_body = json.loads(msg['Body'])
                    # Process the e-mail
                    success = process_message(message_body, receive_count)
                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete invalid messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # Message is not deleted, so it stays in the queue for a retry
        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break
        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()
            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break
            # Short pause after an error before polling again
            time.sleep(5)
    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{'='*70}\n")
if __name__ == '__main__':
    # Script entry point: refuse to start without a configured domain,
    # otherwise run the worker and turn any uncaught error into exit code 1.
    if WORKER_DOMAIN:
        try:
            main_loop()
        except Exception as exc:
            log(f"Fatal error: {exc}", 'ERROR')
            traceback.print_exc()
            sys.exit(1)
    else:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)