#!/usr/bin/env python3
"""Print a console status report for SQS queues whose names end in '-queue'.

For every matching queue the script shows the approximate number of waiting
and in-flight messages and, when a companion queue named '<queue>-dlq' can be
read, the approximate number of messages sitting in that dead-letter queue.
"""
from datetime import datetime

import boto3

# Region is pinned to us-east-2.
sqs = boto3.client('sqs', region_name='us-east-2')


def get_all_queues():
    """Return (name, url) pairs for all queues whose name ends in '-queue'.

    The name is taken from the last path segment of the queue URL. Queues
    with any other suffix (for example '-dlq') are skipped.
    """
    queues = []
    paginator = sqs.get_paginator('list_queues')
    for page in paginator.paginate():
        # 'QueueUrls' is read with a default so a page without it is skipped.
        for url in page.get('QueueUrls', []):
            name = url.split('/')[-1]
            if name.endswith('-queue'):
                queues.append((name, url))
    return queues


def _get_dlq_count(dlq_name):
    """Return the approximate message count of *dlq_name*, or None.

    None means the queue URL or its attributes could not be fetched for any
    reason (queue missing, no access, other API error). The lookup is
    best-effort, so the error is deliberately not propagated.
    """
    try:
        dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']
        dlq_attrs = sqs.get_queue_attributes(
            QueueUrl=dlq_url,
            AttributeNames=['ApproximateNumberOfMessages'],
        )['Attributes']
        return int(dlq_attrs.get('ApproximateNumberOfMessages', 0))
    except Exception:
        # Not a bare ``except:`` so KeyboardInterrupt/SystemExit still
        # propagate instead of being reported as a missing DLQ.
        return None


def _pick_status(available, dlq_count):
    """Return the status icon for a queue.

    A backlog of more than 50 waiting messages takes precedence over a
    non-empty DLQ; otherwise the queue is reported as healthy.
    """
    if available > 50:
        return "🔥"  # Backlog in the main queue
    if dlq_count is not None and dlq_count > 0:
        return "⚠️ "  # DLQ is not empty
    return "✅"


def _report_queue(name, url):
    """Fetch the stats for one queue and print its section of the report."""
    dlq_name = name + '-dlq'

    # Main queue stats - request only the two attributes that are used.
    attrs = sqs.get_queue_attributes(
        QueueUrl=url,
        AttributeNames=[
            'ApproximateNumberOfMessages',
            'ApproximateNumberOfMessagesNotVisible',
        ],
    )['Attributes']

    dlq_count = _get_dlq_count(dlq_name)

    available = int(attrs.get('ApproximateNumberOfMessages', 0))
    flight = int(attrs.get('ApproximateNumberOfMessagesNotVisible', 0))

    status = _pick_status(available, dlq_count)

    print(f"{status} Queue: {name}")
    print(f" Pending: {available:<5} (Waiting for worker)")
    print(f" Processing: {flight:<5} (Currently in worker)")

    if dlq_count is None:
        print(" DLQ: Not found / No access")
    elif dlq_count > 0:
        # ANSI escape 91 renders the count in red.
        print(f" DLQ Errors: \033[91m{dlq_count:<5}\033[0m (In {dlq_name})")
    else:
        print(f" DLQ Errors: {dlq_count:<5} (In {dlq_name})")

    print("-" * 30)


def main():
    """Print the monitoring report for every '*-queue' queue, sorted by name."""
    print(f"\n{'='*70}")
    print(f"Email Queue Monitoring (us-east-2) - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*70}\n")

    queues = get_all_queues()

    if not queues:
        print("No queues found matching '*-queue'. Check your region or permissions.")
        return

    # Sort by name for nicer output.
    queues.sort(key=lambda x: x[0])

    for name, url in queues:
        try:
            _report_queue(name, url)
        except Exception as e:
            # One failing queue must not abort the report for the others.
            print(f"❌ Error checking {name}: {e}")


if __name__ == '__main__':
    main()