"""
Domain-specific email worker: polls an SQS queue for SES receipt notifications,
loads the raw message from S3, rewrites bounce headers via a DynamoDB lookup,
applies out-of-office/forwarding rules via SES, and delivers the mail via SMTP.
"""

import os
import sys
import re
import json
import time
import signal
import smtplib
import traceback

import boto3
from botocore.exceptions import ClientError  # New: correct import for SES exceptions
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime

# AWS Configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
ses = boto3.client('ses', region_name=AWS_REGION)  # New: for OOO replies / forwards

# ✨ Worker Configuration (domain-specific)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')

# Worker Settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

# SMTP Configuration (simple, since each worker serves exactly one domain)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')

# Graceful shutdown
shutdown_requested = False


def log(message: str, level: str = 'INFO'):
    """Structured logging with timestamp"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)


# DynamoDB resources for bounce lookup and OOO/forward rules
try:
    dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
    msg_table = dynamo.Table('ses-outbound-messages')
    rules_table = dynamo.Table('email-rules')  # New: for OOO/forwards
except Exception as e:
    log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
    msg_table = None
    rules_table = None


def get_bucket_name(domain):
    """Convention: domain.tld -> domain-tld-emails"""
    return domain.replace('.', '-') + '-emails'


def is_ses_bounce_or_autoreply(parsed):
    """Detects SES bounces and auto-replies"""
    from_h = (parsed.get('From') or '').lower()
    auto_sub = (parsed.get('Auto-Submitted') or '').lower()
    is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
    is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
    return is_mailer_daemon or is_auto_replied


def extract_original_message_id(parsed):
    """
    Extracts the original SES Message-ID from an email.
    SES format: 010f[hex12]-[hex8]-[hex4]-[hex4]-[hex4]-[hex12]-000000
    """
    # SES Message-ID pattern (always ends with -000000)
    ses_pattern = re.compile(
        r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000'
    )

    # 1. Try the standard headers (In-Reply-To, References)
    for header in ['In-Reply-To', 'References']:
        value = (parsed.get(header) or '').strip()
        if value:
            match = ses_pattern.search(value)
            if match:
                log(f" Found Message-ID in {header}: {match.group(0)}")
                return match.group(0)

    # 2. Check the Message-ID header (sometimes it carries the original ID)
    msg_id_header = (parsed.get('Message-ID') or '').strip()
    if msg_id_header:
        match = ses_pattern.search(msg_id_header)
        if match:
            # Only useful if it is not the ID of this bounce message itself
            # (those often start with 010f... as well)
            pass  # Skipped for now

    # 3. Search the complete email body (including ALL attachments/parts).
    #    This also catches attached messages, text attachments, etc.
    try:
        body_text = ''

        # Collect the complete body as a string
        if parsed.is_multipart():
            for part in parsed.walk():
                content_type = part.get_content_type()
                # Search ALL parts (except binary data such as images):
                # text parts, HTML, attached messages, and application/* parts
                if content_type.startswith('text/') or \
                   content_type == 'message/rfc822' or \
                   content_type.startswith('application/'):
                    try:
                        payload = part.get_payload(decode=True)
                        if payload:
                            # Try UTF-8 first, fall back to Latin-1
                            try:
                                body_text += payload.decode('utf-8', errors='ignore')
                            except Exception:
                                try:
                                    body_text += payload.decode('latin-1', errors='ignore')
                                except Exception:
                                    # Last resort: decode with errors ignored
                                    body_text += str(payload, errors='ignore')
                    except Exception:
                        # If decoding fails, fall back to the string payload
                        payload = part.get_payload()
                        if isinstance(payload, str):
                            body_text += payload
        else:
            # Non-multipart message
            payload = parsed.get_payload(decode=True)
            if payload:
                try:
                    body_text = payload.decode('utf-8', errors='ignore')
                except Exception:
                    body_text = payload.decode('latin-1', errors='ignore')

        # Search for all SES Message-IDs in the body
        matches = ses_pattern.findall(body_text)
        if matches:
            # Take the FIRST ID found (usually the original one);
            # the last one is often the bounce message itself
            log(f" Found {len(matches)} SES Message-ID(s) in body, using first: {matches[0]}")
            return matches[0]
    except Exception as e:
        log(f" Warning: Could not search body for Message-ID: {e}", 'WARNING')

    return None


def apply_bounce_logic(parsed, subject):
    """
    Checks for a bounce, looks it up in DynamoDB and rewrites headers.
    Returns: (parsed_email_object, was_modified_bool)
    """
    if not is_ses_bounce_or_autoreply(parsed):
        return parsed, False

    log("🔍 Detected auto-response/bounce. Checking DynamoDB...")

    if not msg_table:
        log("⚠ DynamoDB not available, skipping bounce lookup", 'WARNING')
        return parsed, False

    original_msg_id = extract_original_message_id(parsed)
    if not original_msg_id:
        log("⚠ Could not extract original Message-ID")
        return parsed, False

    try:
        # Lookup in DynamoDB
        result = msg_table.get_item(Key={'MessageId': original_msg_id})
        item = result.get('Item')
        if not item:
            log(f"⚠ No DynamoDB record found for {original_msg_id}")
            return parsed, False

        # Match found!
        orig_source = item.get('source', '')
        orig_destinations = item.get('destinations', [])
        original_recipient = orig_destinations[0] if orig_destinations else ''

        if original_recipient:
            log(f"✓ Found original sender: {orig_source} -> intended for {original_recipient}")
            # Rewrite headers
            parsed['X-Original-SES-From'] = parsed.get('From', '')
            parsed.replace_header('From', original_recipient)
            if not parsed.get('Reply-To'):
                parsed['Reply-To'] = original_recipient
            if 'delivery status notification' in subject.lower():
                parsed.replace_header('Subject', f"Delivery Status: {original_recipient}")
            return parsed, True

    except Exception as e:
        log(f"⚠ DynamoDB Error: {e}")

    return parsed, False


def signal_handler(signum, frame):
    global shutdown_requested
    print(f"\n⚠ Shutdown signal received (signal {signum})")
    shutdown_requested = True


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


def domain_to_queue_name(domain: str) -> str:
    """Converts a domain to an SQS queue name"""
    return domain.replace('.', '-') + '-queue'


def get_queue_url() -> str:
    """Determines the queue URL for the configured domain"""
    queue_name = domain_to_queue_name(WORKER_DOMAIN)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except Exception as e:
        raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")


def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
    """
    Marks an email as successfully delivered.
    Only called when at least one recipient succeeded.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'delivered'
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        # Record invalid inboxes if any
        if invalid_inboxes:
            metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
            log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as processed: {e}", 'WARNING')


def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
    """
    Marks an email as failed because all recipients are invalid.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = WORKER_NAME
        metadata['status'] = 'failed'
        metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
        metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
    except Exception as e:
        log(f"Failed to mark as all invalid: {e}", 'WARNING')


def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
    """
    Marks an email as completely failed.
    Only called when ALL recipients fail.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata', {}) or {}
        metadata['status'] = 'failed'
        metadata['failed_at'] = str(int(time.time()))
        metadata['failed_by'] = WORKER_NAME
        metadata['error'] = error[:500]  # S3 metadata limit
        metadata['retry_count'] = str(receive_count)
        metadata.pop('processing_started', None)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )
        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
    except Exception as e:
        log(f"Failed to mark as failed: {e}", 'WARNING')


def is_temporary_smtp_error(error_msg: str) -> bool:
    """
    Checks whether an SMTP error is temporary (retry makes sense).
    4xx codes = temporary, 5xx = permanent.
    """
    temporary_indicators = [
        '421',  # Service not available
        '450',  # Mailbox unavailable
        '451',  # Local error
        '452',  # Insufficient storage
        '4',    # Generic 4xx (broad match)
        'timeout',
        'connection refused',
        'connection reset',
        'network unreachable',
        'temporarily',
        'try again'
    ]
    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in temporary_indicators)


def is_permanent_recipient_error(error_msg: str) -> bool:
    """
    Checks whether the error is permanent for this recipient (mailbox does not exist).
    550 = Mailbox not found, 551 = User not local, 553 = Mailbox name invalid.
    """
    permanent_indicators = [
        '550',  # Mailbox unavailable / not found
        '551',  # User not local
        '553',  # Mailbox name not allowed / invalid
        'mailbox not found',
        'user unknown',
        'no such user',
        'recipient rejected',
        'does not exist',
        'invalid recipient',
        'unknown user'
    ]
    error_lower = error_msg.lower()
    return any(indicator in error_lower for indicator in permanent_indicators)


def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
    """
    Sends the email via SMTP to ONE recipient.
    Returns: (success: bool, error: str or None, is_permanent: bool)
    """
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
            smtp.ehlo()

            # STARTTLS if configured
            if SMTP_USE_TLS:
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception as e:
                    log(f" STARTTLS failed: {e}", 'WARNING')

            # Authentication if configured
            if SMTP_USER and SMTP_PASS:
                try:
                    smtp.login(SMTP_USER, SMTP_PASS)
                except Exception as e:
                    log(f" SMTP auth failed: {e}", 'WARNING')

            # Send the email
            result = smtp.sendmail(from_addr, [recipient], raw_message)

            # Evaluate the result
            if isinstance(result, dict) and result:
                # Recipient was refused
                error = result.get(recipient, 'Unknown refusal')
                is_permanent = is_permanent_recipient_error(str(error))
                log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
                return False, str(error), is_permanent
            else:
                # Success
                log(f" ✓ {recipient}: Delivered", 'SUCCESS')
                return True, None, False

    except smtplib.SMTPException as e:
        error_msg = str(e)
        is_permanent = is_permanent_recipient_error(error_msg)
        log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR')
        return False, error_msg, is_permanent
    except Exception as e:
        # Connection errors are always temporary
        log(f" ✗ {recipient}: Connection error - {e}", 'ERROR')
        return False, str(e), False


# ==========================================
# MAIN FUNCTION: PROCESS MESSAGE
# ==========================================

def process_message(message_body: dict, receive_count: int) -> bool:
    """
    Processes one email from the queue (SNS-wrapped SES notification).
    Returns: True (success/delete), False (retry/keep).
    """
    try:
        # 1. UNPACKING (SNS -> SES)
        # The SQS body is JSON; usually it contains 'Type': 'Notification' and 'Message': '...JSON string...'
        if 'Message' in message_body and 'Type' in message_body:
            # It is an SNS notification
            sns_content = message_body['Message']
            if isinstance(sns_content, str):
                ses_msg = json.loads(sns_content)
            else:
                ses_msg = sns_content
        else:
            # Fallback: maybe it is a raw SES event (legacy support)
            ses_msg = message_body

        # 2. EXTRACT DATA
        mail = ses_msg.get('mail', {})
        receipt = ses_msg.get('receipt', {})
        message_id = mail.get('messageId')  # This is the S3 key!

        # FIX: ignore the Amazon SES setup notification
        if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
            log("ℹ️ Received Amazon SES Setup Notification. Ignoring.", 'INFO')
            return True  # Treat as success (delete), nothing to do

        from_addr = mail.get('source')
        recipients = receipt.get('recipients', [])

        # S3 key validation
        if not message_id:
            log("❌ Error: No messageId in event payload", 'ERROR')
            return True  # Delete, unusable

        # Domain validation
        # Use the first recipient to check the domain
        if recipients:
            first_recipient = recipients[0]
            domain = first_recipient.split('@')[1]
            if domain.lower() != WORKER_DOMAIN.lower():
                log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
                return True  # Delete, does not belong here
        else:
            log("⚠ Warning: No recipients in event", 'WARNING')
            return True

        # Derive the bucket name
        bucket = get_bucket_name(WORKER_DOMAIN)
        key = message_id

        log(f"\n{'='*70}")
        log(f"Processing Email (SNS/SES):")
        log(f" ID: {key}")
        log(f" Recipients: {len(recipients)} -> {recipients}")
        log(f" Bucket: {bucket}")

        # 3. LOAD FROM S3
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            raw_bytes = response['Body'].read()
            log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
        except s3.exceptions.NoSuchKey:
            # Race condition: SNS was faster than S3.
            # Return False so SQS retries after the visibility timeout.
            if receive_count < 5:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
                return False
            else:
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
                return True  # Delete
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            return False  # Retry

        # 4. PARSING & BOUNCE LOGIC
        try:
            parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
            subject = parsed.get('Subject', '(no subject)')

            # This is where the magic happens: rewrite bounce headers
            parsed, modified = apply_bounce_logic(parsed, subject)
            if modified:
                log(" ✨ Bounce detected & headers rewritten via DynamoDB")
                # Continue with the modified bytes
                raw_bytes = parsed.as_bytes()
                from_addr_final = parsed.get('From')  # New envelope sender for SMTP
            else:
                from_addr_final = from_addr  # Original envelope sender
        except Exception as e:
            log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
            parsed = None                # Keep later steps from using an undefined name
            subject = '(no subject)'
            from_addr_final = from_addr

        # 5. OOO & FORWARD LOGIC (new, runs before SMTP delivery)
        if rules_table and parsed is not None and not is_ses_bounce_or_autoreply(parsed):
            # Avoid loops on bounces/auto-replies
            # Body of the original message, used for quoting in OOO replies and forwards
            # (get_payload(decode=True) returns bytes, or None for multipart messages)
            payload_bytes = parsed.get_payload(decode=True)
            original_body = payload_bytes.decode('utf-8', errors='ignore') if isinstance(payload_bytes, bytes) else ''

            for recipient in recipients:
                try:
                    rule = rules_table.get_item(Key={'email_address': recipient}).get('Item', {})

                    # OOO handling
                    if rule.get('ooo_active', False):
                        ooo_msg = rule.get('ooo_message', 'Default OOO message.')
                        content_type = rule.get('ooo_content_type', 'text')  # Default: text
                        sender = parsed.get('From')  # Original sender
                        reply_subject = f"Out of Office: {subject}"

                        if content_type == 'html':
                            reply_body = {'Html': {'Data': f"""

{ooo_msg}


Original Message:
Subject: {parsed.get('Subject')}
From: {sender}

{original_body}
"""}}
                        else:
                            reply_body = {'Text': {'Data': f"{ooo_msg}\n\nOriginal Message:\nSubject: {parsed.get('Subject')}\nFrom: {sender}\n\n{original_body}"}}

                        ses.send_email(
                            Source=recipient,  # Verified own address
                            Destination={'ToAddresses': [sender]},
                            Message={
                                'Subject': {'Data': reply_subject},
                                'Body': reply_body  # Text or Html, chosen dynamically
                            },
                            ReplyToAddresses=[recipient]  # Optional: for replies
                        )
                        log(f"✓ Sent OOO reply to {sender} from {recipient}")

                    # Forward handling
                    forwards = rule.get('forwards', [])
                    if forwards:
                        original_from = parsed.get('From')  # For headers
                        fwd_subject = f"FWD: {subject}"
                        fwd_body_text = f"Forwarded from: {original_from}\n\n{original_body}"
                        fwd_body = {'Text': {'Data': fwd_body_text}}  # Could be extended to HTML

                        for forward_to in forwards:
                            ses.send_email(
                                Source=recipient,  # Verified own address
                                Destination={'ToAddresses': [forward_to]},
                                Message={
                                    'Subject': {'Data': fwd_subject},
                                    'Body': fwd_body
                                },
                                ReplyToAddresses=[original_from]  # Original sender for replies
                            )
                            log(f"✓ Forwarded to {forward_to} from {recipient} (original: {original_from})")

                except ClientError as e:  # Fix: correct exception type for SES errors
                    error_code = e.response['Error']['Code']
                    if error_code == 'MessageRejected':
                        log(f"⚠ SES rejected send for {recipient}: {e}. Check verification or quotas.", 'ERROR')
                    elif error_code == 'AccessDenied':
                        log(f"⚠ SES AccessDenied for {recipient}: {e}. Check IAM policy.", 'ERROR')
                    else:
                        log(f"⚠ SES error for {recipient}: {e}", 'ERROR')
                except Exception as e:
                    log(f"⚠ General error for {recipient}: {e}", 'WARNING')

        # 6. SMTP DELIVERY (loop over recipients)
        log(f"📤 Sending to {len(recipients)} recipient(s)...")
        successful = []
        failed_permanent = []
        failed_temporary = []

        for recipient in recipients:
            # Use raw_bytes (possibly modified by the bounce logic).
            # IMPORTANT: the envelope sender is 'from_addr_final'
            # (for bounces this is the original recipient, otherwise the SES sender).
            success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
            if success:
                successful.append(recipient)
            elif is_perm:
                failed_permanent.append(recipient)
            else:
                failed_temporary.append(recipient)

        # 7. RESULT & CLEANUP
        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")

        if len(successful) > 0:
            # At least one recipient went through -> success
            mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
            log(f"✅ Success. Deleted from queue.")
            return True
        elif len(failed_permanent) == len(recipients):
            # All failed permanently (user unknown) -> delete
            mark_as_all_invalid(bucket, key, failed_permanent)
            log(f"🛑 All recipients invalid. Deleted from queue.")
            return True
        else:
            # Temporary failures -> retry
            log(f"🔄 Temporary failures. Keeping in queue.")
            return False

    except Exception as e:
        log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
        traceback.print_exc()
        return False  # Retry (unless it keeps crashing)


def main_loop():
    """Main loop: polls the SQS queue and processes messages"""
    # Determine the queue URL
    try:
        queue_url = get_queue_url()
    except Exception as e:
        log(f"FATAL: {e}", 'ERROR')
        sys.exit(1)

    log(f"\n{'='*70}")
    log(f"🚀 Email Worker started")
    log(f"{'='*70}")
    log(f" Worker Name: {WORKER_NAME}")
    log(f" Domain: {WORKER_DOMAIN}")
    log(f" Queue: {queue_url}")
    log(f" Region: {AWS_REGION}")
    log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
    log(f" Poll interval: {POLL_INTERVAL}s")
    log(f" Max messages per poll: {MAX_MESSAGES}")
    log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
    log(f"{'='*70}\n")

    consecutive_errors = 0
    max_consecutive_errors = 10
    messages_processed = 0
    last_activity = time.time()

    while not shutdown_requested:
        try:
            # Fetch messages from the queue (long polling)
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MAX_MESSAGES,
                WaitTimeSeconds=POLL_INTERVAL,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
                MessageAttributeNames=['All']
            )

            # Reset the error counter after a successful poll
            consecutive_errors = 0

            if 'Messages' not in response:
                # No messages
                if time.time() - last_activity > 60:
                    log(f"Waiting for messages... (processed: {messages_processed})")
                    last_activity = time.time()
                continue

            message_count = len(response['Messages'])
            log(f"\n✉ Received {message_count} message(s) from queue")
            last_activity = time.time()

            # Process messages
            for msg in response['Messages']:
                if shutdown_requested:
                    log("Shutdown requested, stopping processing")
                    break

                receipt_handle = msg['ReceiptHandle']

                # Read the receive count
                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))

                # Sent timestamp (to compute time spent in the queue)
                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
                if queue_time > 0:
                    log(f"Message was in queue for {queue_time}s")

                try:
                    message_body = json.loads(msg['Body'])

                    # Process the email
                    success = process_message(message_body, receive_count)

                    if success:
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=receipt_handle
                        )
                        log("✓ Message deleted from queue")
                        messages_processed += 1
                    else:
                        # On failure the message stays in the queue
                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")

                except json.JSONDecodeError as e:
                    log(f"✗ Invalid message format: {e}", 'ERROR')
                    # Delete invalid messages (not retryable)
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                except Exception as e:
                    log(f"✗ Error processing message: {e}", 'ERROR')
                    traceback.print_exc()
                    # Message stays in the queue for retry

        except KeyboardInterrupt:
            log("\n⚠ Keyboard interrupt received")
            break
        except Exception as e:
            consecutive_errors += 1
            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
            traceback.print_exc()
            if consecutive_errors >= max_consecutive_errors:
                log("Too many consecutive errors, shutting down", 'ERROR')
                break
            # Short pause after errors
            time.sleep(5)

    log(f"\n{'='*70}")
    log(f"👋 Worker shutting down")
    log(f" Messages processed: {messages_processed}")
    log(f"{'='*70}\n")


if __name__ == '__main__':
    # Validation
    if not WORKER_DOMAIN:
        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
        sys.exit(1)

    try:
        main_loop()
    except Exception as e:
        log(f"Fatal error: {e}", 'ERROR')
        traceback.print_exc()
        sys.exit(1)
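
# ---------------------------------------------------------------------------
# Illustrative usage sketch (assumption: not part of any real deployment).
# The worker reads its configuration from the environment variables defined
# at the top of this file; only WORKER_DOMAIN is required, everything else
# falls back to the defaults above. Domain and file name below are placeholders.
#
#   export WORKER_DOMAIN=example.com   # selects 'example-com-queue' and 'example-com-emails' by convention
#   export SMTP_HOST=localhost SMTP_PORT=25 SMTP_USE_TLS=false
#   export POLL_INTERVAL=20 MAX_MESSAGES=10 VISIBILITY_TIMEOUT=300
#   python ses_email_worker.py         # hypothetical script name
# ---------------------------------------------------------------------------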