152 lines
4.3 KiB
TypeScript
152 lines
4.3 KiB
TypeScript
/**
|
|
* Domain queue poller
|
|
*
|
|
* One poller per domain. Runs an async loop that long-polls SQS
|
|
* and dispatches messages to the MessageProcessor.
|
|
*
|
|
* Replaces Python's threading.Thread + threading.Event with
|
|
* a simple async loop + AbortController for cancellation.
|
|
*/
|
|
|
|
import type { SQSHandler } from '../aws/sqs.js';
|
|
import type { MessageProcessor } from './message-processor.js';
|
|
import type { MetricsCollector } from '../metrics.js';
|
|
import { log } from '../logger.js';
|
|
|
|
export interface DomainPollerStats {
|
|
domain: string;
|
|
processed: number;
|
|
errors: number;
|
|
lastActivity: Date | null;
|
|
running: boolean;
|
|
}
|
|
|
|
export class DomainPoller {
|
|
public stats: DomainPollerStats;
|
|
private abort: AbortController;
|
|
private loopPromise: Promise<void> | null = null;
|
|
|
|
constructor(
|
|
private domain: string,
|
|
private queueUrl: string,
|
|
private sqs: SQSHandler,
|
|
private processor: MessageProcessor,
|
|
private metrics: MetricsCollector | null,
|
|
) {
|
|
this.abort = new AbortController();
|
|
this.stats = {
|
|
domain,
|
|
processed: 0,
|
|
errors: 0,
|
|
lastActivity: null,
|
|
running: false,
|
|
};
|
|
}
|
|
|
|
/** Start the polling loop. Returns immediately. */
|
|
start(): void {
|
|
if (this.stats.running) return;
|
|
this.stats.running = true;
|
|
log(`▶ Started poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
|
|
this.loopPromise = this.pollLoop();
|
|
}
|
|
|
|
/** Signal the poller to stop and wait for it to finish. */
|
|
async stop(): Promise<void> {
|
|
if (!this.stats.running) return;
|
|
this.abort.abort();
|
|
if (this.loopPromise) {
|
|
await this.loopPromise;
|
|
}
|
|
this.stats.running = false;
|
|
log(`⏹ Stopped poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Poll loop
|
|
// -----------------------------------------------------------------------
|
|
private async pollLoop(): Promise<void> {
|
|
const workerName = `poller-${this.domain}`;
|
|
|
|
while (!this.abort.signal.aborted) {
|
|
try {
|
|
// Report queue size
|
|
const qSize = await this.sqs.getQueueSize(this.queueUrl);
|
|
this.metrics?.setQueueSize(this.domain, qSize);
|
|
|
|
if (qSize > 0) {
|
|
log(`📊 Queue ${this.domain}: ~${qSize} messages`, 'INFO', workerName);
|
|
}
|
|
|
|
// Long-poll
|
|
const messages = await this.sqs.receiveMessages(this.queueUrl);
|
|
|
|
if (this.abort.signal.aborted) break;
|
|
|
|
if (messages.length === 0) continue;
|
|
|
|
log(
|
|
`📬 Received ${messages.length} message(s) for ${this.domain}`,
|
|
'INFO',
|
|
workerName,
|
|
);
|
|
|
|
// Process each message
|
|
for (const msg of messages) {
|
|
if (this.abort.signal.aborted) break;
|
|
|
|
const receiveCount = parseInt(
|
|
msg.Attributes?.ApproximateReceiveCount ?? '1',
|
|
10,
|
|
);
|
|
|
|
this.metrics?.incrementInFlight();
|
|
const start = Date.now();
|
|
|
|
try {
|
|
const shouldDelete = await this.processor.processMessage(
|
|
this.domain,
|
|
msg,
|
|
receiveCount,
|
|
);
|
|
|
|
if (shouldDelete && msg.ReceiptHandle) {
|
|
await this.sqs.deleteMessage(this.queueUrl, msg.ReceiptHandle);
|
|
}
|
|
|
|
this.stats.processed++;
|
|
this.stats.lastActivity = new Date();
|
|
|
|
const elapsed = ((Date.now() - start) / 1000).toFixed(2);
|
|
this.metrics?.observeProcessingTime(this.domain, parseFloat(elapsed));
|
|
} catch (err: any) {
|
|
this.stats.errors++;
|
|
log(
|
|
`❌ Error processing message: ${err.message ?? err}`,
|
|
'ERROR',
|
|
workerName,
|
|
);
|
|
} finally {
|
|
this.metrics?.decrementInFlight();
|
|
}
|
|
}
|
|
} catch (err: any) {
|
|
if (this.abort.signal.aborted) break;
|
|
this.stats.errors++;
|
|
log(
|
|
`❌ Polling error for ${this.domain}: ${err.message ?? err}`,
|
|
'ERROR',
|
|
workerName,
|
|
);
|
|
// Back off on repeated errors
|
|
await sleep(5000);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
function sleep(ms: number): Promise<void> {
|
|
return new Promise((r) => setTimeout(r, ms));
|
|
}
|