email-amazon/email-worker/domain_poller.py

110 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
Domain queue poller
"""
import json
import time
import threading
import traceback
from logger import log
from aws import SQSHandler
from worker import MessageProcessor
from metrics.prometheus import MetricsCollector
class DomainPoller:
    """Polls the SQS queue of a single domain and processes its messages.

    One instance is bound to one ``(domain, queue_url)`` pair. ``poll()`` is
    a blocking loop that runs until ``stop_event`` is set, so it is meant to
    be the target of a dedicated thread. The number of successfully processed
    messages is mirrored into ``stats_dict[domain]`` under ``stats_lock``.
    """

    # Seconds to wait after a polling-level failure before trying again.
    _ERROR_BACKOFF_SECONDS = 5

    def __init__(
        self,
        domain: str,
        queue_url: str,
        message_processor: MessageProcessor,
        sqs: SQSHandler,
        metrics: MetricsCollector,
        stop_event: threading.Event,
        stats_dict: dict,
        stats_lock: threading.Lock
    ):
        """Store the collaborators used by the polling loop.

        Args:
            domain: Domain this poller is responsible for; also used as the
                key in ``stats_dict`` and as the metrics label.
            queue_url: URL of the queue to receive from and delete on.
            message_processor: Object whose ``process_message(domain,
                message, receive_count)`` handles a single message.
            sqs: Queue client providing ``receive_messages``,
                ``get_queue_size`` and ``delete_message``.
            metrics: Metrics sink; any falsy value disables all metric calls.
            stop_event: Event that, once set, ends the polling loop.
            stats_dict: Shared mapping updated with this domain's processed
                message count.
            stats_lock: Lock held while ``stats_dict`` is written.
        """
        self.domain = domain
        self.queue_url = queue_url
        self.processor = message_processor
        self.sqs = sqs
        self.metrics = metrics
        self.stop_event = stop_event
        self.stats_dict = stats_dict
        self.stats_lock = stats_lock
        self.worker_name = f"worker-{domain}"
        self.messages_processed = 0

    def poll(self) -> None:
        """Run the main polling loop until ``stop_event`` is set.

        Each iteration receives a batch, refreshes the queue-size metric and
        handles the messages one by one. Any exception escaping an iteration
        is logged and followed by a back-off wait; the loop itself never
        raises.
        """
        log(f"🚀 Starting poller for {self.domain}", 'INFO', self.worker_name)

        while not self.stop_event.is_set():
            try:
                # Receive messages from queue
                messages = self.sqs.receive_messages(self.queue_url)

                # Update queue size metric (best effort, see helper).
                self._update_queue_size()

                if not messages:
                    continue

                log(f"✉ Received {len(messages)} message(s)", 'INFO', self.worker_name)

                for message in messages:
                    # Stop promptly; unhandled messages are simply not
                    # deleted here.
                    if self.stop_event.is_set():
                        break
                    self._handle_message(message)

            except Exception as e:
                log(f"✗ Error polling: {e}", 'ERROR', self.worker_name)
                # Wait on the event rather than time.sleep() so a shutdown
                # request interrupts the back-off instead of being delayed.
                self.stop_event.wait(self._ERROR_BACKOFF_SECONDS)

        log(f"👋 Stopped (processed: {self.messages_processed})", 'INFO', self.worker_name)

    def _update_queue_size(self) -> None:
        """Publish the current queue size metric; never raises.

        A failure here must not abort the iteration: the batch returned by
        ``receive_messages`` has already been fetched and still needs to be
        processed, so metric errors are only logged.
        """
        if not self.metrics:
            return
        try:
            queue_size = self.sqs.get_queue_size(self.queue_url)
            self.metrics.set_queue_size(self.domain, queue_size)
        except Exception as e:
            log(f"⚠ Could not update queue size metric: {e}", 'WARNING', self.worker_name)

    def _handle_message(self, message: dict) -> None:
        """Process one received message and delete it when appropriate.

        The message is deleted after a truthy result from the processor, or
        when processing raises ``json.JSONDecodeError`` (the payload cannot
        be parsed, so it is discarded). On a falsy result or any other
        exception the message is left on the queue.

        Args:
            message: Message mapping; must contain ``'ReceiptHandle'`` and
                may carry ``Attributes.ApproximateReceiveCount``.
        """
        receipt_handle = message['ReceiptHandle']
        # Defaults to 1 when the attribute was not returned with the message.
        receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))

        if self.metrics:
            self.metrics.increment_in_flight()
        start_time = time.time()

        try:
            success = self.processor.process_message(self.domain, message, receive_count)
            if success:
                self.sqs.delete_message(self.queue_url, receipt_handle)
                self.messages_processed += 1
                # Update shared stats
                with self.stats_lock:
                    self.stats_dict[self.domain] = self.messages_processed
            else:
                # NOTE(review): the "/3" is hard-coded in the message;
                # presumably it mirrors a retry limit configured elsewhere.
                log(
                    f"⚠ Retry queued (attempt {receive_count}/3)",
                    'WARNING',
                    self.worker_name
                )
        except json.JSONDecodeError as e:
            log(f"✗ Invalid message format: {e}", 'ERROR', self.worker_name)
            self.sqs.delete_message(self.queue_url, receipt_handle)
        except Exception as e:
            log(f"✗ Error processing message: {e}", 'ERROR', self.worker_name)
            traceback.print_exc()
        finally:
            # Always balance the in-flight gauge and record the duration,
            # whatever the outcome.
            if self.metrics:
                self.metrics.decrement_in_flight()
                self.metrics.observe_processing_time(
                    self.domain,
                    time.time() - start_time
                )