email-amazon/email-worker-nodejs/delivery.ts

155 lines
4.5 KiB
TypeScript

/**
* SMTP / email delivery with nodemailer pooled transport
*
* Replaces both Python's SMTPPool and EmailDelivery classes.
* nodemailer handles connection pooling, keepalive, and reconnection natively.
*
* Removed: LMTP delivery path (never used in production).
*/
import { createTransport, type Transporter } from 'nodemailer';
import { config } from '../config.js';
import { log } from '../logger.js';
// ---------------------------------------------------------------------------
// Permanent error detection
// ---------------------------------------------------------------------------
const PERMANENT_INDICATORS = [
'550', '551', '553',
'mailbox not found', 'user unknown', 'no such user',
'recipient rejected', 'does not exist', 'invalid recipient',
'unknown user',
];
function isPermanentRecipientError(errorMsg: string): boolean {
const lower = errorMsg.toLowerCase();
return PERMANENT_INDICATORS.some((ind) => lower.includes(ind));
}
// ---------------------------------------------------------------------------
// Delivery class
// ---------------------------------------------------------------------------
export class EmailDelivery {
private transport: Transporter;
constructor() {
this.transport = createTransport({
host: config.smtpHost,
port: config.smtpPort,
secure: config.smtpUseTls,
pool: true,
maxConnections: config.smtpPoolSize,
maxMessages: Infinity, // reuse connections indefinitely
tls: { rejectUnauthorized: false },
...(config.smtpUser && config.smtpPass
? { auth: { user: config.smtpUser, pass: config.smtpPass } }
: {}),
});
log(
`📡 SMTP pool initialized → ${config.smtpHost}:${config.smtpPort} ` +
`(max ${config.smtpPoolSize} connections)`,
);
}
/**
* Send raw email to ONE recipient via the local DMS.
*
* Returns: [success, errorMessage?, isPermanent]
*/
async sendToRecipient(
fromAddr: string,
recipient: string,
rawMessage: Buffer,
workerName: string,
maxRetries = 2,
): Promise<[boolean, string | null, boolean]> {
let lastError: string | null = null;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
await this.transport.sendMail({
envelope: { from: fromAddr, to: [recipient] },
raw: rawMessage,
});
log(`${recipient}: Delivered (SMTP)`, 'SUCCESS', workerName);
return [true, null, false];
} catch (err: any) {
const errorMsg = err.message ?? String(err);
const responseCode = err.responseCode ?? 0;
// Check for permanent errors (5xx)
if (
responseCode >= 550 ||
isPermanentRecipientError(errorMsg)
) {
log(
`${recipient}: ${errorMsg} (permanent)`,
'ERROR',
workerName,
);
return [false, errorMsg, true];
}
// Connection-level errors → retry
if (
err.code === 'ECONNRESET' ||
err.code === 'ECONNREFUSED' ||
err.code === 'ETIMEDOUT' ||
errorMsg.toLowerCase().includes('disconnect') ||
errorMsg.toLowerCase().includes('closed') ||
errorMsg.toLowerCase().includes('connection')
) {
log(
`${recipient}: Connection error, retrying... ` +
`(attempt ${attempt + 1}/${maxRetries + 1})`,
'WARNING',
workerName,
);
lastError = errorMsg;
await sleep(300);
continue;
}
// Other SMTP errors
const isPerm = isPermanentRecipientError(errorMsg);
log(
`${recipient}: ${errorMsg} (${isPerm ? 'permanent' : 'temporary'})`,
'ERROR',
workerName,
);
return [false, errorMsg, isPerm];
}
}
// All retries exhausted
log(
`${recipient}: All retries failed - ${lastError}`,
'ERROR',
workerName,
);
return [false, lastError ?? 'Connection failed after retries', false];
}
/** Verify the transport is reachable (used during startup). */
async verify(): Promise<boolean> {
try {
await this.transport.verify();
return true;
} catch {
return false;
}
}
/** Close all pooled connections. */
close(): void {
this.transport.close();
}
}
// ---------------------------------------------------------------------------
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}