110 lines
4.0 KiB
Python
110 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Domain queue poller
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
import traceback
|
|
|
|
from logger import log
|
|
from aws import SQSHandler
|
|
from worker import MessageProcessor
|
|
from metrics.prometheus import MetricsCollector
|
|
|
|
|
|
class DomainPoller:
    """Polls SQS queue for a single domain.

    One instance is bound to one ``(domain, queue_url)`` pair. :meth:`poll`
    loops until ``stop_event`` is set, hands every received message to the
    message processor, and deletes the message from the queue once the
    processor reports success.
    """

    # Seconds to pause after a polling-level failure before trying again.
    ERROR_BACKOFF_SECONDS = 5

    def __init__(
        self,
        domain: str,
        queue_url: str,
        message_processor: MessageProcessor,
        sqs: SQSHandler,
        metrics: MetricsCollector,
        stop_event: threading.Event,
        stats_dict: dict,
        stats_lock: threading.Lock
    ):
        """Store the collaborators and reset the processed-message counter.

        Args:
            domain: Domain this poller serves; also used as the metrics
                label, the key written into ``stats_dict`` and the suffix
                of the worker name passed to ``log``.
            queue_url: Queue URL handed to every ``sqs`` call.
            message_processor: Object whose
                ``process_message(domain, message, receive_count)`` returns
                a truthy value when the message was handled.
            sqs: Queue client providing ``receive_messages``,
                ``get_queue_size`` and ``delete_message``.
            metrics: Metrics sink; a falsy value disables all metric calls.
            stop_event: Set by the owner to make :meth:`poll` return.
            stats_dict: Shared mapping updated with
                ``domain -> messages processed``.
            stats_lock: Lock held while writing to ``stats_dict``.
        """
        self.domain = domain
        self.queue_url = queue_url
        self.processor = message_processor
        self.sqs = sqs
        self.metrics = metrics
        self.stop_event = stop_event
        self.stats_dict = stats_dict
        self.stats_lock = stats_lock
        self.worker_name = f"worker-{domain}"
        self.messages_processed = 0

    def poll(self):
        """Main polling loop.

        Runs until ``stop_event`` is set. Any exception that escapes a loop
        iteration is logged and followed by a backoff, so a transient
        failure never terminates the poller.
        """
        log(f"🚀 Starting poller for {self.domain}", 'INFO', self.worker_name)

        while not self.stop_event.is_set():
            try:
                # Receive messages from queue
                # NOTE(review): the empty-batch path below loops straight
                # back here, so this presumably blocks (long poll) - confirm.
                messages = self.sqs.receive_messages(self.queue_url)

                self._report_queue_size()

                if not messages:
                    continue

                log(f"✉ Received {len(messages)} message(s)", 'INFO', self.worker_name)

                for message in messages:
                    # Stop between messages so shutdown does not have to
                    # wait for the remainder of the batch.
                    if self.stop_event.is_set():
                        break
                    self._handle_message(message)

            except Exception as e:
                log(f"✗ Error polling: {e}", 'ERROR', self.worker_name)
                # Event.wait() returns as soon as the event is set, so the
                # backoff no longer delays shutdown the way time.sleep() did.
                self.stop_event.wait(self.ERROR_BACKOFF_SECONDS)

        log(f"👋 Stopped (processed: {self.messages_processed})", 'INFO', self.worker_name)

    def _report_queue_size(self):
        """Publish the current queue size metric; failures are logged only."""
        if not self.metrics:
            return
        try:
            queue_size = self.sqs.get_queue_size(self.queue_url)
            self.metrics.set_queue_size(self.domain, queue_size)
        except Exception as e:
            # A metrics hiccup must not discard the batch just received.
            log(f"✗ Failed to update queue size metric: {e}", 'WARNING', self.worker_name)

    def _handle_message(self, message):
        """Process one message and delete it from the queue on success."""
        receipt_handle = message['ReceiptHandle']
        receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))

        if self.metrics:
            self.metrics.increment_in_flight()
        # Monotonic clock: elapsed time stays correct across wall-clock jumps.
        started = time.monotonic()

        try:
            success = self.processor.process_message(self.domain, message, receive_count)

            if success:
                self.sqs.delete_message(self.queue_url, receipt_handle)
                self.messages_processed += 1

                # Update shared stats
                with self.stats_lock:
                    self.stats_dict[self.domain] = self.messages_processed
            else:
                # The message is deliberately left on the queue.
                # NOTE(review): the "/3" presumably mirrors a maximum receive
                # count configured elsewhere - confirm.
                log(
                    f"⚠ Retry queued (attempt {receive_count}/3)",
                    'WARNING',
                    self.worker_name
                )

        except json.JSONDecodeError as e:
            log(f"✗ Invalid message format: {e}", 'ERROR', self.worker_name)
            self._discard_invalid_message(receipt_handle)

        except Exception as e:
            log(f"✗ Error processing message: {e}", 'ERROR', self.worker_name)
            traceback.print_exc()

        finally:
            if self.metrics:
                self.metrics.decrement_in_flight()
                self.metrics.observe_processing_time(
                    self.domain,
                    time.monotonic() - started
                )

    def _discard_invalid_message(self, receipt_handle):
        """Delete a message that failed JSON decoding; log if that fails."""
        try:
            self.sqs.delete_message(self.queue_url, receipt_handle)
        except Exception as e:
            # Keep one failed delete from aborting the rest of the batch.
            log(f"✗ Failed to delete invalid message: {e}", 'ERROR', self.worker_name)
|