email-amazon/unified-worker/email-worker/metrics/prometheus.py

143 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""
Prometheus metrics collection
"""
from typing import Optional
from logger import log
# prometheus_client is an optional dependency: record whether it could be
# imported so the rest of the module can degrade to no-ops without it.
try:
    from prometheus_client import start_http_server, Counter, Gauge, Histogram
except ImportError:
    PROMETHEUS_ENABLED = False
else:
    PROMETHEUS_ENABLED = True
class MetricsCollector:
    """Holds the worker's Prometheus metrics and offers guarded update helpers.

    ``enabled`` mirrors the module-level ``PROMETHEUS_ENABLED`` flag. When it
    is false no metric attributes are created and every helper method returns
    without doing anything.
    """

    def __init__(self):
        self.enabled = PROMETHEUS_ENABLED
        if not self.enabled:
            # Client library unavailable: leave the metric attributes unset.
            return

        # -- Email processing --
        self.emails_processed = Counter(
            'emails_processed_total', 'Total emails processed',
            ['domain', 'status'],
        )
        self.emails_in_flight = Gauge(
            'emails_in_flight', 'Emails currently being processed',
        )
        self.processing_time = Histogram(
            'email_processing_seconds', 'Time to process email',
            ['domain'],
        )
        self.queue_size = Gauge(
            'queue_messages_available', 'Messages in queue',
            ['domain'],
        )

        # -- Bounces --
        self.bounces_processed = Counter(
            'bounces_processed_total', 'Bounce notifications processed',
            ['domain', 'type'],
        )

        # -- Rules (auto-replies and forwards) --
        self.autoreplies_sent = Counter(
            'autoreplies_sent_total', 'Auto-replies sent',
            ['domain'],
        )
        self.forwards_sent = Counter(
            'forwards_sent_total', 'Forwards sent',
            ['domain'],
        )

        # -- Blocklist --
        self.blocked_senders = Counter(
            'blocked_senders_total', 'Emails blocked by blacklist',
            ['domain'],
        )

    def increment_processed(self, domain: str, status: str):
        """Add one to the processed-email counter for ``domain``/``status``."""
        if not self.enabled:
            return
        series = self.emails_processed.labels(domain=domain, status=status)
        series.inc()

    def increment_in_flight(self):
        """Raise the in-flight email gauge by one."""
        if not self.enabled:
            return
        self.emails_in_flight.inc()

    def decrement_in_flight(self):
        """Lower the in-flight email gauge by one."""
        if not self.enabled:
            return
        self.emails_in_flight.dec()

    def observe_processing_time(self, domain: str, seconds: float):
        """Feed ``seconds`` into the processing-time histogram for ``domain``."""
        if not self.enabled:
            return
        series = self.processing_time.labels(domain=domain)
        series.observe(seconds)

    def set_queue_size(self, domain: str, size: int):
        """Set the queue-size gauge for ``domain`` to ``size``."""
        if not self.enabled:
            return
        series = self.queue_size.labels(domain=domain)
        series.set(size)

    def increment_bounce(self, domain: str, bounce_type: str):
        """Add one to the bounce counter for ``domain`` and ``bounce_type``."""
        if not self.enabled:
            return
        series = self.bounces_processed.labels(domain=domain, type=bounce_type)
        series.inc()

    def increment_autoreply(self, domain: str):
        """Add one to the auto-reply counter for ``domain``."""
        if not self.enabled:
            return
        series = self.autoreplies_sent.labels(domain=domain)
        series.inc()

    def increment_forward(self, domain: str):
        """Add one to the forward counter for ``domain``."""
        if not self.enabled:
            return
        series = self.forwards_sent.labels(domain=domain)
        series.inc()

    def increment_blocked(self, domain: str):
        """Add one to the blocked-sender counter for ``domain``."""
        if not self.enabled:
            return
        series = self.blocked_senders.labels(domain=domain)
        series.inc()
def start_metrics_server(port: int) -> Optional[MetricsCollector]:
    """
    Expose Prometheus metrics over HTTP and build the collector.

    Args:
        port: Port passed to ``start_http_server``.

    Returns:
        A new MetricsCollector, or None when the Prometheus client is not
        installed or when starting the server / creating the collector
        raised an exception (the failure is logged, not re-raised).
    """
    if PROMETHEUS_ENABLED:
        collector = None
        try:
            start_http_server(port)
            log(f"Prometheus metrics on port {port}")
            collector = MetricsCollector()
        except Exception as e:
            # Best effort: metrics must never take the worker down.
            log(f"Failed to start metrics server: {e}", 'ERROR')
        return collector

    log("⚠ Prometheus client not installed, metrics disabled", 'WARNING')
    return None