/** * Domain queue poller * * One poller per domain. Runs an async loop that long-polls SQS * and dispatches messages to the MessageProcessor. * * Replaces Python's threading.Thread + threading.Event with * a simple async loop + AbortController for cancellation. */ import type { SQSHandler } from '../aws/sqs.js'; import type { MessageProcessor } from './message-processor.js'; import type { MetricsCollector } from '../metrics.js'; import { log } from '../logger.js'; export interface DomainPollerStats { domain: string; processed: number; errors: number; lastActivity: Date | null; running: boolean; } export class DomainPoller { public stats: DomainPollerStats; private abort: AbortController; private loopPromise: Promise | null = null; constructor( private domain: string, private queueUrl: string, private sqs: SQSHandler, private processor: MessageProcessor, private metrics: MetricsCollector | null, ) { this.abort = new AbortController(); this.stats = { domain, processed: 0, errors: 0, lastActivity: null, running: false, }; } /** Start the polling loop. Returns immediately. */ start(): void { if (this.stats.running) return; this.stats.running = true; log(`▶ Started poller for ${this.domain}`, 'INFO', `poller-${this.domain}`); this.loopPromise = this.pollLoop(); } /** Signal the poller to stop and wait for it to finish. */ async stop(): Promise { if (!this.stats.running) return; this.abort.abort(); if (this.loopPromise) { await this.loopPromise; } this.stats.running = false; log(`⏹ Stopped poller for ${this.domain}`, 'INFO', `poller-${this.domain}`); } // ----------------------------------------------------------------------- // Poll loop // ----------------------------------------------------------------------- private async pollLoop(): Promise { const workerName = `poller-${this.domain}`; while (!this.abort.signal.aborted) { try { // Report queue size const qSize = await this.sqs.getQueueSize(this.queueUrl); this.metrics?.setQueueSize(this.domain, qSize); if (qSize > 0) { log(`📊 Queue ${this.domain}: ~${qSize} messages`, 'INFO', workerName); } // Long-poll const messages = await this.sqs.receiveMessages(this.queueUrl); if (this.abort.signal.aborted) break; if (messages.length === 0) continue; log( `📬 Received ${messages.length} message(s) for ${this.domain}`, 'INFO', workerName, ); // Process each message for (const msg of messages) { if (this.abort.signal.aborted) break; const receiveCount = parseInt( msg.Attributes?.ApproximateReceiveCount ?? '1', 10, ); this.metrics?.incrementInFlight(); const start = Date.now(); try { const shouldDelete = await this.processor.processMessage( this.domain, msg, receiveCount, ); if (shouldDelete && msg.ReceiptHandle) { await this.sqs.deleteMessage(this.queueUrl, msg.ReceiptHandle); } this.stats.processed++; this.stats.lastActivity = new Date(); const elapsed = ((Date.now() - start) / 1000).toFixed(2); this.metrics?.observeProcessingTime(this.domain, parseFloat(elapsed)); } catch (err: any) { this.stats.errors++; log( `❌ Error processing message: ${err.message ?? err}`, 'ERROR', workerName, ); } finally { this.metrics?.decrementInFlight(); } } } catch (err: any) { if (this.abort.signal.aborted) break; this.stats.errors++; log( `❌ Polling error for ${this.domain}: ${err.message ?? err}`, 'ERROR', workerName, ); // Back off on repeated errors await sleep(5000); } } } } // --------------------------------------------------------------------------- function sleep(ms: number): Promise { return new Promise((r) => setTimeout(r, ms)); }