143 lines
4.2 KiB
Python
143 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Prometheus metrics collection
|
|
"""
|
|
|
|
from typing import Optional
|
|
|
|
from logger import log
|
|
|
|
# Try to import Prometheus client
|
|
try:
|
|
from prometheus_client import start_http_server, Counter, Gauge, Histogram
|
|
PROMETHEUS_ENABLED = True
|
|
except ImportError:
|
|
PROMETHEUS_ENABLED = False
|
|
|
|
|
|
class MetricsCollector:
|
|
"""Collects and exposes Prometheus metrics"""
|
|
|
|
def __init__(self):
|
|
self.enabled = PROMETHEUS_ENABLED
|
|
|
|
if self.enabled:
|
|
# Email processing metrics
|
|
self.emails_processed = Counter(
|
|
'emails_processed_total',
|
|
'Total emails processed',
|
|
['domain', 'status']
|
|
)
|
|
|
|
self.emails_in_flight = Gauge(
|
|
'emails_in_flight',
|
|
'Emails currently being processed'
|
|
)
|
|
|
|
self.processing_time = Histogram(
|
|
'email_processing_seconds',
|
|
'Time to process email',
|
|
['domain']
|
|
)
|
|
|
|
self.queue_size = Gauge(
|
|
'queue_messages_available',
|
|
'Messages in queue',
|
|
['domain']
|
|
)
|
|
|
|
# Bounce metrics
|
|
self.bounces_processed = Counter(
|
|
'bounces_processed_total',
|
|
'Bounce notifications processed',
|
|
['domain', 'type']
|
|
)
|
|
|
|
# Rules metrics
|
|
self.autoreplies_sent = Counter(
|
|
'autoreplies_sent_total',
|
|
'Auto-replies sent',
|
|
['domain']
|
|
)
|
|
|
|
self.forwards_sent = Counter(
|
|
'forwards_sent_total',
|
|
'Forwards sent',
|
|
['domain']
|
|
)
|
|
|
|
# Blocklist metrics
|
|
self.blocked_senders = Counter(
|
|
'blocked_senders_total',
|
|
'Emails blocked by blacklist',
|
|
['domain']
|
|
)
|
|
|
|
def increment_processed(self, domain: str, status: str):
|
|
"""Increment processed email counter"""
|
|
if self.enabled:
|
|
self.emails_processed.labels(domain=domain, status=status).inc()
|
|
|
|
def increment_in_flight(self):
|
|
"""Increment in-flight email gauge"""
|
|
if self.enabled:
|
|
self.emails_in_flight.inc()
|
|
|
|
def decrement_in_flight(self):
|
|
"""Decrement in-flight email gauge"""
|
|
if self.enabled:
|
|
self.emails_in_flight.dec()
|
|
|
|
def observe_processing_time(self, domain: str, seconds: float):
|
|
"""Record processing time"""
|
|
if self.enabled:
|
|
self.processing_time.labels(domain=domain).observe(seconds)
|
|
|
|
def set_queue_size(self, domain: str, size: int):
|
|
"""Set queue size"""
|
|
if self.enabled:
|
|
self.queue_size.labels(domain=domain).set(size)
|
|
|
|
def increment_bounce(self, domain: str, bounce_type: str):
|
|
"""Increment bounce counter"""
|
|
if self.enabled:
|
|
self.bounces_processed.labels(domain=domain, type=bounce_type).inc()
|
|
|
|
def increment_autoreply(self, domain: str):
|
|
"""Increment autoreply counter"""
|
|
if self.enabled:
|
|
self.autoreplies_sent.labels(domain=domain).inc()
|
|
|
|
def increment_forward(self, domain: str):
|
|
"""Increment forward counter"""
|
|
if self.enabled:
|
|
self.forwards_sent.labels(domain=domain).inc()
|
|
|
|
def increment_blocked(self, domain: str):
|
|
"""Increment blocked sender counter"""
|
|
if self.enabled:
|
|
self.blocked_senders.labels(domain=domain).inc()
|
|
|
|
|
|
def start_metrics_server(port: int) -> Optional[MetricsCollector]:
|
|
"""
|
|
Start Prometheus metrics HTTP server
|
|
|
|
Args:
|
|
port: Port to listen on
|
|
|
|
Returns:
|
|
MetricsCollector instance or None if Prometheus not available
|
|
"""
|
|
if not PROMETHEUS_ENABLED:
|
|
log("⚠ Prometheus client not installed, metrics disabled", 'WARNING')
|
|
return None
|
|
|
|
try:
|
|
start_http_server(port)
|
|
log(f"Prometheus metrics on port {port}")
|
|
return MetricsCollector()
|
|
except Exception as e:
|
|
log(f"Failed to start metrics server: {e}", 'ERROR')
|
|
return None
|