email-amazon/email-worker-nodejs/sqs.ts

100 lines
2.8 KiB
TypeScript

/**
* SQS operations handler
*
* Responsibilities:
* - Resolve queue URL for a domain
* - Long-poll for messages
* - Delete processed messages
* - Report approximate queue size
*/
import {
SQSClient,
GetQueueUrlCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
GetQueueAttributesCommand,
type Message,
} from '@aws-sdk/client-sqs';
import { config, domainToQueueName } from '../config.js';
import { log } from '../logger.js';
export class SQSHandler {
private client: SQSClient;
constructor() {
this.client = new SQSClient({ region: config.awsRegion });
}
/** Resolve queue URL for a domain. Returns null if queue does not exist. */
async getQueueUrl(domain: string): Promise<string | null> {
const queueName = domainToQueueName(domain);
try {
const resp = await this.client.send(
new GetQueueUrlCommand({ QueueName: queueName }),
);
return resp.QueueUrl ?? null;
} catch (err: any) {
if (err.name === 'QueueDoesNotExist' ||
err.Code === 'AWS.SimpleQueueService.NonExistentQueue') {
log(`Queue not found for domain: ${domain}`, 'WARNING');
} else {
log(`Error getting queue URL for ${domain}: ${err.message ?? err}`, 'ERROR');
}
return null;
}
}
/** Long-poll for messages (uses configured poll interval as wait time). */
async receiveMessages(queueUrl: string): Promise<Message[]> {
try {
const resp = await this.client.send(
new ReceiveMessageCommand({
QueueUrl: queueUrl,
MaxNumberOfMessages: config.maxMessages,
WaitTimeSeconds: config.pollInterval,
VisibilityTimeout: config.visibilityTimeout,
MessageSystemAttributeNames: ['ApproximateReceiveCount', 'SentTimestamp'],
}),
);
return resp.Messages ?? [];
} catch (err: any) {
log(`Error receiving messages: ${err.message ?? err}`, 'ERROR');
return [];
}
}
/** Delete a message from the queue after successful processing. */
async deleteMessage(queueUrl: string, receiptHandle: string): Promise<void> {
try {
await this.client.send(
new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: receiptHandle,
}),
);
} catch (err: any) {
log(`Error deleting message: ${err.message ?? err}`, 'ERROR');
throw err;
}
}
/** Approximate number of messages in the queue. Returns 0 on error. */
async getQueueSize(queueUrl: string): Promise<number> {
try {
const resp = await this.client.send(
new GetQueueAttributesCommand({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessages'],
}),
);
return parseInt(
resp.Attributes?.ApproximateNumberOfMessages ?? '0',
10,
);
} catch {
return 0;
}
}
}