// Source: email-amazon/email-worker-nodejs/domain-poller.ts
// (repository listing at capture time: 152 lines, 4.3 KiB, TypeScript)

/**
* Domain queue poller
*
* One poller per domain. Runs an async loop that long-polls SQS
* and dispatches messages to the MessageProcessor.
*
* Replaces Python's threading.Thread + threading.Event with
* a simple async loop + AbortController for cancellation.
*/
import type { SQSHandler } from '../aws/sqs.js';
import type { MessageProcessor } from './message-processor.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
/**
 * Snapshot of a single DomainPoller's counters and state.
 *
 * The poller exposes one instance of this object publicly and mutates it in
 * place while it runs.
 */
export interface DomainPollerStats {
/** Domain this poller serves, as passed to the poller's constructor. */
domain: string;
/** Number of messages whose processing step completed without throwing. */
processed: number;
/** Number of failures seen so far: per-message processing errors plus polling errors. */
errors: number;
/** Time the most recent message finished processing; null until that first happens. */
lastActivity: Date | null;
/** True from start() until stop() has finished. */
running: boolean;
}
/**
 * Polls one domain's SQS queue and hands every received message to the
 * MessageProcessor.
 *
 * Lifecycle: `start()` launches the async poll loop and returns immediately;
 * `stop()` signals cancellation and resolves once the loop has exited.
 * A stopped poller can be started again.
 */
export class DomainPoller {
  /** How long to pause after a failed poll before trying again. */
  private static readonly ERROR_BACKOFF_MS = 5000;

  /** Live counters and state; mutated in place by the poll loop. */
  public stats: DomainPollerStats;
  private abort: AbortController;
  private loopPromise: Promise<void> | null = null;

  /**
   * @param domain    Domain name; used for log prefixes, metrics labels and stats.
   * @param queueUrl  URL of the queue to poll for this domain.
   * @param sqs       Queue client used to size, receive from and delete from the queue.
   * @param processor Handler invoked once per received message.
   * @param metrics   Optional metrics sink; pass null to disable metric reporting.
   */
  constructor(
    private domain: string,
    private queueUrl: string,
    private sqs: SQSHandler,
    private processor: MessageProcessor,
    private metrics: MetricsCollector | null,
  ) {
    this.abort = new AbortController();
    this.stats = {
      domain,
      processed: 0,
      errors: 0,
      lastActivity: null,
      running: false,
    };
  }

  /** Start the polling loop. Returns immediately. No-op if already running. */
  start(): void {
    if (this.stats.running) return;
    // An AbortSignal stays aborted forever. Without a fresh controller, a
    // poller restarted after stop() would report `running: true` while its
    // loop exits on the very first check.
    if (this.abort.signal.aborted) {
      this.abort = new AbortController();
    }
    this.stats.running = true;
    log(`▶ Started poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
    this.loopPromise = this.pollLoop(this.abort.signal);
  }

  /**
   * Signal the poller to stop and wait for it to finish.
   *
   * The signal is not forwarded to the queue client, so a receive call that is
   * already in flight is allowed to return before the loop exits.
   */
  async stop(): Promise<void> {
    if (!this.stats.running) return;
    this.abort.abort();
    try {
      if (this.loopPromise) {
        await this.loopPromise;
      }
    } finally {
      // Reset state even if the loop rejected, so the poller never stays
      // stuck reporting `running: true`.
      this.stats.running = false;
      this.loopPromise = null;
    }
    log(`⏹ Stopped poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
  }

  // -----------------------------------------------------------------------
  // Poll loop
  // -----------------------------------------------------------------------

  /**
   * Main loop: report queue size, receive a batch, process each message in
   * order, and delete the ones the processor says are done.
   *
   * @param signal Cancellation signal for this run of the loop. It is captured
   *               once so that replacing the controller on restart cannot
   *               affect a loop that is still winding down.
   */
  private async pollLoop(signal: AbortSignal): Promise<void> {
    const workerName = `poller-${this.domain}`;

    while (!signal.aborted) {
      try {
        // Report queue size
        const qSize = await this.sqs.getQueueSize(this.queueUrl);
        this.metrics?.setQueueSize(this.domain, qSize);
        if (qSize > 0) {
          log(`📊 Queue ${this.domain}: ~${qSize} messages`, 'INFO', workerName);
        }

        // Long-poll
        const messages = await this.sqs.receiveMessages(this.queueUrl);
        if (signal.aborted) break;
        if (messages.length === 0) continue;

        log(
          `📬 Received ${messages.length} message(s) for ${this.domain}`,
          'INFO',
          workerName,
        );

        // Process each message
        for (const msg of messages) {
          // Messages skipped here are neither processed nor deleted.
          if (signal.aborted) break;

          // A missing or malformed attribute is treated as a first delivery
          // rather than handing NaN to the processor.
          const parsedCount = parseInt(
            msg.Attributes?.ApproximateReceiveCount ?? '1',
            10,
          );
          const receiveCount = Number.isNaN(parsedCount) ? 1 : parsedCount;

          this.metrics?.incrementInFlight();
          const start = Date.now();
          try {
            const shouldDelete = await this.processor.processMessage(
              this.domain,
              msg,
              receiveCount,
            );
            if (shouldDelete && msg.ReceiptHandle) {
              await this.sqs.deleteMessage(this.queueUrl, msg.ReceiptHandle);
            }
            this.stats.processed++;
            this.stats.lastActivity = new Date();
            // Seconds, rounded to two decimals.
            const elapsed = ((Date.now() - start) / 1000).toFixed(2);
            this.metrics?.observeProcessingTime(this.domain, parseFloat(elapsed));
          } catch (err: unknown) {
            // One bad message must not abort the rest of the batch.
            this.stats.errors++;
            log(
              `❌ Error processing message: ${DomainPoller.describeError(err)}`,
              'ERROR',
              workerName,
            );
          } finally {
            this.metrics?.decrementInFlight();
          }
        }
      } catch (err: unknown) {
        // Errors raised while shutting down are expected; exit quietly.
        if (signal.aborted) break;
        this.stats.errors++;
        log(
          `❌ Polling error for ${this.domain}: ${DomainPoller.describeError(err)}`,
          'ERROR',
          workerName,
        );
        // Back off on repeated errors, but wake up at once if stop() is called.
        await this.backoff(DomainPoller.ERROR_BACKOFF_MS, signal);
      }
    }
  }

  /**
   * Wait for `ms` milliseconds or until `signal` aborts, whichever is first.
   */
  private async backoff(ms: number, signal: AbortSignal): Promise<void> {
    if (signal.aborted) return;

    let wake: () => void = () => {};
    // The executor runs synchronously, so `wake` is bound before it is registered.
    const aborted = new Promise<void>((resolve) => {
      wake = () => resolve();
    });
    signal.addEventListener('abort', wake, { once: true });
    try {
      await Promise.race([sleep(ms), aborted]);
    } finally {
      signal.removeEventListener('abort', wake);
    }
  }

  /**
   * Turn any thrown value into a loggable string without throwing itself
   * (a thrown `null` or `undefined` has no `.message` to read).
   */
  private static describeError(err: unknown): string {
    if (err instanceof Error) return err.message;
    if (typeof err === 'object' && err !== null) {
      const message = (err as { message?: unknown }).message;
      if (message != null) return String(message);
    }
    return String(err);
  }
}
// ---------------------------------------------------------------------------
/**
 * Promise-based delay.
 *
 * @param ms Delay in milliseconds, passed straight to `setTimeout`.
 * @returns A promise that resolves with no value once the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}