#!/usr/bin/env python3 """ Prometheus metrics collection """ from typing import Optional from logger import log # Try to import Prometheus client try: from prometheus_client import start_http_server, Counter, Gauge, Histogram PROMETHEUS_ENABLED = True except ImportError: PROMETHEUS_ENABLED = False class MetricsCollector: """Collects and exposes Prometheus metrics""" def __init__(self): self.enabled = PROMETHEUS_ENABLED if self.enabled: # Email processing metrics self.emails_processed = Counter( 'emails_processed_total', 'Total emails processed', ['domain', 'status'] ) self.emails_in_flight = Gauge( 'emails_in_flight', 'Emails currently being processed' ) self.processing_time = Histogram( 'email_processing_seconds', 'Time to process email', ['domain'] ) self.queue_size = Gauge( 'queue_messages_available', 'Messages in queue', ['domain'] ) # Bounce metrics self.bounces_processed = Counter( 'bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'] ) # Rules metrics self.autoreplies_sent = Counter( 'autoreplies_sent_total', 'Auto-replies sent', ['domain'] ) self.forwards_sent = Counter( 'forwards_sent_total', 'Forwards sent', ['domain'] ) # Blocklist metrics self.blocked_senders = Counter( 'blocked_senders_total', 'Emails blocked by blacklist', ['domain'] ) def increment_processed(self, domain: str, status: str): """Increment processed email counter""" if self.enabled: self.emails_processed.labels(domain=domain, status=status).inc() def increment_in_flight(self): """Increment in-flight email gauge""" if self.enabled: self.emails_in_flight.inc() def decrement_in_flight(self): """Decrement in-flight email gauge""" if self.enabled: self.emails_in_flight.dec() def observe_processing_time(self, domain: str, seconds: float): """Record processing time""" if self.enabled: self.processing_time.labels(domain=domain).observe(seconds) def set_queue_size(self, domain: str, size: int): """Set queue size""" if self.enabled: self.queue_size.labels(domain=domain).set(size) def increment_bounce(self, domain: str, bounce_type: str): """Increment bounce counter""" if self.enabled: self.bounces_processed.labels(domain=domain, type=bounce_type).inc() def increment_autoreply(self, domain: str): """Increment autoreply counter""" if self.enabled: self.autoreplies_sent.labels(domain=domain).inc() def increment_forward(self, domain: str): """Increment forward counter""" if self.enabled: self.forwards_sent.labels(domain=domain).inc() def increment_blocked(self, domain: str): """Increment blocked sender counter""" if self.enabled: self.blocked_senders.labels(domain=domain).inc() def start_metrics_server(port: int) -> Optional[MetricsCollector]: """ Start Prometheus metrics HTTP server Args: port: Port to listen on Returns: MetricsCollector instance or None if Prometheus not available """ if not PROMETHEUS_ENABLED: log("⚠ Prometheus client not installed, metrics disabled", 'WARNING') return None try: start_http_server(port) log(f"Prometheus metrics on port {port}") return MetricsCollector() except Exception as e: log(f"Failed to start metrics server: {e}", 'ERROR') return None