411 lines
14 KiB
Python
411 lines
14 KiB
Python
import os
|
|
import sys
|
|
import boto3
|
|
import smtplib
|
|
import json
|
|
import time
|
|
import traceback
|
|
import signal
|
|
from email.parser import BytesParser
|
|
from email.policy import SMTP as SMTPPolicy
|
|
from datetime import datetime
|
|
|
|
# --- AWS clients ------------------------------------------------------------
# Created once at import time and shared by every function in this module.
AWS_REGION = "us-east-2"
s3 = boto3.client("s3", region_name=AWS_REGION)
sqs = boto3.client("sqs", region_name=AWS_REGION)

# --- Worker identity (domain-specific) --------------------------------------
WORKER_DOMAIN = os.getenv("WORKER_DOMAIN")  # e.g. 'andreasknuth.de'
# Falls back to a name derived from the domain when WORKER_NAME is not set.
WORKER_NAME = os.getenv("WORKER_NAME", f"worker-{WORKER_DOMAIN}")

# --- Worker settings (all overridable through the environment) --------------
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "20"))
MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "10"))
VISIBILITY_TIMEOUT = int(os.getenv("VISIBILITY_TIMEOUT", "300"))

# --- SMTP configuration (simple, because each worker serves one domain) -----
SMTP_HOST = os.getenv("SMTP_HOST", "localhost")
SMTP_PORT = int(os.getenv("SMTP_PORT", "25"))
# Only the literal string "true" (any letter case) enables STARTTLS.
SMTP_USE_TLS = os.getenv("SMTP_USE_TLS", "false").lower() == "true"
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
|
|
|
|
# Graceful shutdown: signal handlers only flip this flag; code that wants to
# stop cleanly has to check it.
shutdown_requested = False


def signal_handler(signum, frame):
    """Note that a termination signal arrived.

    Prints a short notice and sets the module-level ``shutdown_requested``
    flag. The handler does not terminate the process itself.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame active when the signal arrived (unused).
    """
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


# Route Ctrl+C (SIGINT) and SIGTERM through the same handler.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
|
|
def log(message: str, level: str = 'INFO'):
    """Print one log line to stdout and flush it immediately.

    The line has the form
    ``[YYYY-MM-DD HH:MM:SS] [level] [WORKER_NAME] message`` where the
    timestamp comes from ``datetime.now()``.

    Args:
        message: Text to log.
        level: Free-form severity label placed in the second bracket.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
|
|
|
|
|
|
def domain_to_queue_name(domain: str) -> str:
    """Derive the SQS queue name that belongs to a domain.

    Every dot in the domain becomes a hyphen and ``-queue`` is appended,
    e.g. ``example.com`` -> ``example-com-queue``.
    """
    labels = domain.split('.')
    return '-'.join(labels) + '-queue'
|
|
|
|
|
|
def get_queue_url() -> str:
    """Resolve the SQS queue URL for the configured worker domain.

    The queue name is derived from ``WORKER_DOMAIN`` via
    ``domain_to_queue_name`` and looked up with ``sqs.get_queue_url``.

    Returns:
        The ``QueueUrl`` field of the SQS response.

    Raises:
        Exception: If the lookup fails for any reason. The underlying error
            is attached as ``__cause__`` so the traceback shows it as the
            direct cause rather than as an unrelated secondary failure.
    """
    queue_name = domain_to_queue_name(WORKER_DOMAIN)

    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}") from e
|
|
|
|
|
|
def mark_as_processed(bucket: str, key: str):
    """Flag the S3 object of an email as successfully delivered.

    Reads the object's current user metadata, adds the delivery markers,
    removes the in-flight markers and writes the result back by copying the
    object onto itself with ``MetadataDirective='REPLACE'``.

    This is best effort: any exception is logged as a warning and swallowed,
    so a metadata failure never propagates to the caller.

    Args:
        bucket: Name of the S3 bucket holding the email.
        key: Object key of the email inside the bucket.
    """
    try:
        current = s3.head_object(Bucket=bucket, Key=key)
        metadata = current.get('Metadata', {}) or {}

        metadata.update({
            'processed': 'true',
            'processed_at': str(int(time.time())),
            'processed_by': WORKER_NAME,
            'status': 'delivered',
        })
        # These markers only describe an email that is still in flight.
        for stale_key in ('processing_started', 'queued_at'):
            metadata.pop(stale_key, None)

        # Copying the object onto itself is how its metadata gets rewritten.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE',
        )

        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')

    except Exception as exc:
        log(f"Failed to mark as processed: {exc}", 'WARNING')
|
|
|
|
|
|
def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """Flag the S3 object of an email as failed and record why.

    Reads the object's current user metadata, adds the failure markers and
    writes the result back by copying the object onto itself with
    ``MetadataDirective='REPLACE'``.

    The error text is sanitized before it is stored: S3 user metadata is
    transported in HTTP headers, so a non-ASCII or multi-line value makes
    the ``copy_object`` request fail and the failure marker would be lost.
    Non-ASCII characters are replaced with ``?``, whitespace runs (including
    newlines) collapse to a single space, and the result is cut to 500
    characters.

    This is best effort: any exception is logged as a warning and swallowed.

    Args:
        bucket: Name of the S3 bucket holding the email.
        key: Object key of the email inside the bucket.
        error: Human-readable description of the delivery failure.
        receive_count: Number of times the queue message has been received;
            stored as ``retry_count``.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}

        # Header-safe version of the error: ASCII only, single line, bounded.
        ascii_error = str(error or '').encode('ascii', 'replace').decode('ascii')
        safe_error = ' '.join(ascii_error.split())[:500]

        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = safe_error
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        # Copying the object onto itself is how its metadata gets rewritten.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )

        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')

    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')
|
|
|
|
|
|
def is_temporary_smtp_error(error_msg: str) -> bool:
    """Decide whether an SMTP/connection error is worth retrying.

    SMTP 4xx reply codes are transient, 5xx are permanent. The message is
    treated as temporary when it contains either

    * a stand-alone three-digit 4xx reply code (``421``, ``450``, ``451``,
      ``452``, ...), or
    * one of a few phrases typical of transient network or server problems.

    The reply code must stand alone: a ``4`` inside another number (``554``),
    inside an enhanced status code (``5.4.1``) or right after a colon (a port
    such as ``host:465``) does not count. Matching the bare digit ``4``
    anywhere in the text would classify almost every permanent failure as
    temporary.

    Args:
        error_msg: Error text, e.g. ``str()`` of an smtplib exception.

    Returns:
        True if a retry looks sensible, False otherwise (including for an
        empty message).
    """
    # Local import: 're' is not among this file's top-level imports.
    import re

    if not error_msg:
        return False

    temporary_phrases = (
        'timeout',
        'timed out',  # text of Python's socket timeout error
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again',
    )

    error_lower = error_msg.lower()

    # A 4xx code not glued to a word, a dotted number or a port separator.
    if re.search(r'(?<![\w.:])4\d{2}(?!\w|\.\d)', error_lower):
        return True

    return any(phrase in error_lower for phrase in temporary_phrases)
|
|
|
|
|
|
def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """Deliver one raw message to a single recipient over SMTP.

    Opens a connection to ``SMTP_HOST:SMTP_PORT`` (30 s timeout), optionally
    upgrades it with STARTTLS and logs in, then hands the message to
    ``sendmail``.

    Args:
        from_addr: Envelope sender address.
        recipient: The single envelope recipient.
        raw_message: The complete message as bytes.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_text)``.
        No exception escapes: SMTP and connection errors are logged and
        reported through the return value.
    """
    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")

    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as server:
            server.ehlo()

            # Optional STARTTLS upgrade.
            # NOTE(review): a failed upgrade is only logged; the message is
            # still sent afterwards on the connection as it is.
            if SMTP_USE_TLS:
                try:
                    server.starttls()
                    server.ehlo()
                    log("✓ STARTTLS enabled")
                except Exception as tls_exc:
                    log(f"STARTTLS failed: {tls_exc}", 'WARNING')

            # Optional authentication; a failed login is likewise only logged.
            if SMTP_USER and SMTP_PASS:
                try:
                    server.login(SMTP_USER, SMTP_PASS)
                    log("✓ SMTP authenticated")
                except Exception as auth_exc:
                    log(f"SMTP auth failed: {auth_exc}", 'WARNING')

            refused = server.sendmail(from_addr, [recipient], raw_message)

            # A non-empty dict means at least one recipient was refused.
            if not (isinstance(refused, dict) and refused):
                log(f"✓ Email delivered to {recipient}", 'SUCCESS')
                return True, None

            reason = refused.get(recipient, 'Unknown refusal')
            log(f"✗ Recipient refused: {reason}", 'ERROR')
            return False, str(reason)

    except smtplib.SMTPException as smtp_exc:
        log(f"✗ SMTP error: {smtp_exc}", 'ERROR')
        return False, str(smtp_exc)

    except Exception as conn_exc:
        log(f"✗ Connection error: {conn_exc}", 'ERROR')
        return False, str(conn_exc)
|
|
|
|
|
|
def process_message(message_body: dict, receive_count: int) -> bool:
    """Process one queued email: load it from S3 and deliver it via SMTP.

    Args:
        message_body: Decoded queue message. Must contain ``bucket``, ``key``,
            ``from``, ``recipient`` and ``domain``; ``subject`` and
            ``message_id`` are optional and only used for logging.
        receive_count: How often this queue message has been received so far.

    Returns:
        True when the caller should delete the queue message (delivered,
        wrong domain, or S3 object missing); False when it should stay in
        the queue (S3 load error, temporary or permanent delivery failure).

    Raises:
        KeyError: If one of the required fields is missing.
    """
    # Required fields, looked up in a fixed order.
    bucket, key, from_addr, recipient, domain = (
        message_body[field]
        for field in ('bucket', 'key', 'from', 'recipient', 'domain')
    )
    subject = message_body.get('subject', '(unknown)')
    message_id = message_body.get('message_id', '(unknown)')

    separator = '=' * 70

    log(f"\n{separator}")
    log(f"Processing email (Attempt #{receive_count}):")
    log(f" MessageId: {message_id}")
    log(f" Domain: {domain}")
    log(f" From: {from_addr}")
    log(f" To: {recipient}")
    log(f" Subject: {subject}")
    log(f" S3: s3://{bucket}/{key}")
    log(separator)

    # Validation: the message must belong to this worker's domain.
    if domain.lower() != WORKER_DOMAIN.lower():
        log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
        log("This message should not be in this queue! Deleting...", 'ERROR')
        return True  # does not belong here -> delete

    # Fetch the raw email from S3.
    try:
        s3_object = s3.get_object(Bucket=bucket, Key=key)
        raw_bytes = s3_object['Body'].read()
        size = len(raw_bytes)
        log(f"✓ Loaded {size:,} bytes ({size/1024:.1f} KB)")
    except s3.exceptions.NoSuchKey:
        log("✗ S3 object not found (may have been deleted)", 'ERROR')
        return True  # not retryable -> delete
    except Exception as load_exc:
        log(f"✗ Failed to load from S3: {load_exc}", 'ERROR')
        return False  # might be transient -> retry

    delivered, error = send_email(from_addr, recipient, raw_bytes)

    if delivered:
        mark_as_processed(bucket, key)

        log(separator)
        log("✅ Email delivered successfully", 'SUCCESS')
        log(f"{separator}\n")
        return True  # delete the queue message

    error_msg = error or "Unknown SMTP error"

    # Early attempts with a temporary error are retried without being
    # recorded as failures.
    if receive_count < 3 and is_temporary_smtp_error(error_msg):
        log("⚠ Temporary error detected, will retry", 'WARNING')
        log(f"{separator}\n")
        return False  # keep the queue message -> retry

    # Permanent error, or the retry budget is used up.
    mark_as_failed(bucket, key, error_msg, receive_count)

    log(separator)
    log("✗ Email delivery failed permanently", 'ERROR')
    log(f" Error: {error_msg}")
    log(f"{separator}\n")

    # NOTE(review): the message is intentionally not deleted here; the
    # original comment says it moves to the DLQ automatically after 3
    # attempts - presumably via the queue's redrive policy, confirm.
    return False
|
|
|
|
|
|
def main_loop():
    """Poll the SQS queue and process incoming messages until shutdown.

    Resolves the queue URL (exiting with status 1 if that fails), logs a
    startup banner, then long-polls the queue. Each received message is
    JSON-decoded and handed to ``process_message``; it is deleted from the
    queue when that returns True or when the body is not valid JSON, and
    left in the queue otherwise.

    The loop ends when ``shutdown_requested`` is set, on KeyboardInterrupt,
    or after 10 consecutive failing iterations (with a 5 s pause after each
    failure). A shutdown summary is logged at the end.
    """
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    rule = '=' * 70

    startup_banner = (
        f"\n{rule}",
        "🚀 Email Worker started",
        rule,
        f" Worker Name: {WORKER_NAME}",
        f" Domain: {WORKER_DOMAIN}",
        f" Queue: {queue_url}",
        f" Region: {AWS_REGION}",
        f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})",
        f" Poll interval: {POLL_INTERVAL}s",
        f" Max messages per poll: {MAX_MESSAGES}",
        f" Visibility timeout: {VISIBILITY_TIMEOUT}s",
        f"{rule}\n",
    )
    for banner_line in startup_banner:
        log(banner_line)

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Long polling: waits up to POLL_INTERVAL seconds for messages.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All'],
            )

            # A successful poll resets the error streak.
            consecutive_errors = 0

            if 'Messages' not in response:
                # Nothing to do; emit a heartbeat at most once a minute.
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            batch = response['Messages']
            log(f"\n✉ Received {len(batch)} message(s) from queue")
            last_activity = time.time()

            for msg in batch:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']
                attributes = msg.get('Attributes', {})

                receive_count = int(attributes.get('ApproximateReceiveCount', 1))

                # SentTimestamp is divided by 1000 before being compared with
                # time.time() to get the time spent in the queue.
                sent_timestamp = int(attributes.get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0

                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    if process_message(message_body, receive_count):
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle,
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # Not deleted, so the message stays in the queue.
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")

                except json.JSONDecodeError as decode_exc:
                    log(f"✗ Invalid message format: {decode_exc}", 'ERROR')
                    # A malformed body will never parse: delete it.
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle,
                    )

                except Exception as msg_exc:
                    # Message is not deleted, so it stays in the queue.
                    log(f"✗ Error processing message: {msg_exc}", 'ERROR')
                    traceback.print_exc()

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break

        except Exception as loop_exc:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {loop_exc}", 'ERROR')
            traceback.print_exc()

            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break

            # Short pause before the next attempt.
            time.sleep(5)

    log(f"\n{rule}")
    log("👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{rule}\n")
|
|
|
|
|
|
if __name__ == '__main__':
    # The worker cannot run without knowing which domain it serves.
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    # Last line of defence: log anything that escapes the main loop, print
    # the traceback and exit with a non-zero status.
    try:
        main_loop()
    except Exception as fatal_exc:
        log(f"Fatal error: {fatal_exc}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)