docker/ses-lambda-nodejs/index.js

import { S3Client, GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
import { simpleParser } from 'mailparser';
import { gzipSync } from 'zlib';
import { Base64 } from 'js-base64';
import { createLogger, format, transports } from 'winston';
import { config } from 'dotenv';
// Load environment variables
config();
// Logger setup
const logger = createLogger({
  level: 'info',
  format: format.combine(
    format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
    format.printf(({ timestamp, level, message }) => `${timestamp} ${level.toUpperCase()} ${message}`)
  ),
  transports: [new transports.Console()]
});
// Environment variables
const API_BASE_URL = process.env.API_BASE_URL;
const API_TOKEN = process.env.API_TOKEN;
const MAX_EMAIL_SIZE = parseInt(process.env.MAX_EMAIL_SIZE || '10485760', 10);
const AWS_REGION = process.env.AWS_REGION || 'us-east-1';
// Log environment variables (omit sensitive values like API_TOKEN)
logger.info(`Environment: API_BASE_URL=${API_BASE_URL}, AWS_REGION=${AWS_REGION}, MAX_EMAIL_SIZE=${MAX_EMAIL_SIZE}`);
// Validate environment variables
if (!API_BASE_URL || !API_TOKEN) {
  logger.error('Missing required environment variables: API_BASE_URL or API_TOKEN');
  throw new Error('Missing required environment variables');
}
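// For local runs, dotenv reads these from a .env file. The values below are
// placeholders, not real endpoints or credentials:
//   API_BASE_URL=https://api.example.com
//   API_TOKEN=<secret>
//   MAX_EMAIL_SIZE=10485760
//   AWS_REGION=us-east-1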
// S3 client
const s3Client = new S3Client({ region: AWS_REGION });
// Utility to convert stream to buffer
async function streamToBuffer(stream) {
  try {
    const chunks = [];
    for await (const chunk of stream) {
      chunks.push(chunk);
    }
    return Buffer.concat(chunks);
  } catch (error) {
    throw new Error(`Failed to convert stream to buffer: ${error.message}`);
  }
}
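// Note: with AWS SDK v3 on Node 18+, the GetObject Body also exposes
// transformToByteArray(), so an equivalent (assuming that runtime) would be
// `Buffer.from(await Body.transformToByteArray())`.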
// Utility to call the REST API
async function callApiOnce(payload, domain, requestId) {
  const url = `${API_BASE_URL}/process/${domain}`;
  logger.info(
    `[${requestId}] Preparing POST to ${url}: ` +
    `domain=${domain}, key=${payload.s3_key}, bucket=${payload.s3_bucket}, ` +
    `orig_size=${payload.original_size}, comp_size=${payload.compressed_size}`
  );
  try {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_TOKEN}`,
        'Content-Type': 'application/json',
        'X-Request-ID': requestId
      },
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(25000)
    });
    const responseBody = await response.text();
    logger.info(`[${requestId}] API response: status=${response.status}, body=${responseBody}`);
    if (response.ok) {
      logger.info(`[${requestId}] API call successful`);
      return true;
    } else {
      logger.error(`[${requestId}] API returned ${response.status}: ${responseBody}`);
      return false;
    }
  } catch (error) {
    logger.error(`[${requestId}] API call failed: ${error.message}`);
    return false;
  }
}
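// The name callApiOnce suggests a single attempt. A minimal retry-wrapper
// sketch; the helper name, attempt count, and backoff values are illustrative
// assumptions, not part of the original design:
async function callApiWithRetry(payload, domain, requestId, attempts = 3) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    if (await callApiOnce(payload, domain, requestId)) return true;
    if (attempt < attempts) {
      const delayMs = 1000 * 2 ** (attempt - 1); // exponential backoff: 1s, 2s, 4s, ...
      logger.info(`[${requestId}] API attempt ${attempt} failed, retrying in ${delayMs}ms`);
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
  }
  return false;
}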
// Lambda handler
export const handler = async (event, context) => {
  const reqId = context.awsRequestId;
  logger.info(`[${reqId}] Starting Lambda execution`);
  logger.info(`[${reqId}] Event: ${JSON.stringify(event)}`);
  try {
    // S3 put notifications normally carry a single record
    const rec = event.Records[0].s3;
    const bucket = rec.bucket.name;
    // S3 event keys are URL-encoded, with spaces encoded as '+'
    const key = decodeURIComponent(rec.object.key.replace(/\+/g, ' '));
    logger.info(`[${reqId}] Processing ${bucket}/${key}`);
    // Check email size before downloading the body
    logger.info(`[${reqId}] Fetching object metadata for ${bucket}/${key}`);
    const headCommand = new HeadObjectCommand({ Bucket: bucket, Key: key });
    const head = await s3Client.send(headCommand);
    const size = head.ContentLength;
    logger.info(`[${reqId}] Object size: ${size} bytes`);
    if (size > MAX_EMAIL_SIZE) {
      logger.warn(`[${reqId}] Email too large: ${size} bytes (max: ${MAX_EMAIL_SIZE})`);
      return { statusCode: 413, body: JSON.stringify({ error: 'Email too large' }) };
    }
    // Load email content
    logger.info(`[${reqId}] Fetching object content from ${bucket}/${key}`);
    const getObjectCommand = new GetObjectCommand({ Bucket: bucket, Key: key });
    const { Body } = await s3Client.send(getObjectCommand);
    logger.info(`[${reqId}] Object content retrieved, converting to buffer`);
    const body = await streamToBuffer(Body);
    logger.info(`[${reqId}] Buffer size: ${body.length} bytes`);
    // Parse and log from/to (parse failures are non-fatal; the raw email is still forwarded)
    let fromAddr = '';
    let toAddrs = [];
    try {
      logger.info(`[${reqId}] Parsing email content`);
      const parsed = await simpleParser(body);
      fromAddr = parsed.from?.value?.[0]?.address || '';
      toAddrs = [
        ...(parsed.to?.value || []),
        ...(parsed.cc?.value || []),
        ...(parsed.bcc?.value || [])
      ].map(addr => addr.address).filter(Boolean);
      logger.info(`[${reqId}] Parsed email: from=${fromAddr}, to=${toAddrs.join(', ')}`);
    } catch (error) {
      logger.error(`[${reqId}] Error parsing email: ${error.message}`);
    }
    // Compress and build payload
    logger.info(`[${reqId}] Compressing email content`);
    const compressed = gzipSync(body);
    const payload = {
      s3_bucket: bucket,
      s3_key: key,
      // Derive the domain from the bucket name, e.g. "example-com-emails" -> "example.com"
      domain: bucket.replace(/-/g, '.').replace('.emails', ''),
      // fromUint8Array base64-encodes the raw gzip bytes without a lossy
      // string round-trip
      email_content: Base64.fromUint8Array(compressed),
      compressed: true,
      etag: head.ETag.replace(/"/g, ''),
      request_id: reqId,
      original_size: body.length,
      compressed_size: compressed.length
    };
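    // Shape of the JSON the API receives (values are illustrative):
    // {
    //   "s3_bucket": "example-com-emails",
    //   "s3_key": "incoming/abc123",
    //   "domain": "example.com",
    //   "email_content": "<base64 of gzipped RFC 822 message>",
    //   "compressed": true,
    //   ...
    // }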
    logger.info(`[${reqId}] Payload prepared: domain=${payload.domain}, compressed_size=${payload.compressed_size}`);
    // Send to REST API
    logger.info(`[${reqId}] Sending payload to REST API`);
    const success = await callApiOnce(payload, payload.domain, reqId);
    // Log result; a failed API call is logged but not rethrown, so the S3
    // trigger will not retry the event
    if (success) {
      logger.info(`[${reqId}] Email processed successfully`);
    } else {
      logger.warn(`[${reqId}] Email processing failed; failure handling is delegated to the REST API`);
    }
    logger.info(`[${reqId}] Lambda execution completed`);
    return { statusCode: 200, body: JSON.stringify({ message: 'Done' }) };
  } catch (error) {
    logger.error(`[${reqId}] Error processing event: ${error.message}, stack: ${error.stack}`);
    return { statusCode: 500, body: JSON.stringify({ error: 'Internal server error' }) };
  }
};
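
// A minimal local smoke test; the bucket name, key, and request id are
// hypothetical placeholders:
//
//   const event = {
//     Records: [{
//       s3: {
//         bucket: { name: 'example-com-emails' },
//         object: { key: 'incoming/abc123' }
//       }
//     }]
//   };
//   await handler(event, { awsRequestId: 'local-test' });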