import { S3Client, GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
import { simpleParser } from 'mailparser';
import { gzipSync } from 'zlib';
// NOTE: Base64 is intentionally kept imported but is no longer used for the
// payload encoding below (see the comment where `email_content` is built).
import { Base64 } from 'js-base64';
import { createLogger, format, transports } from 'winston';
import { config } from 'dotenv';

// Load environment variables
config();

// Logger setup
const logger = createLogger({
  level: 'info',
  format: format.combine(
    format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
    format.printf(({ timestamp, level, message }) => `${timestamp} ${level.toUpperCase()} ${message}`)
  ),
  transports: [new transports.Console()]
});

// Environment variables
const API_BASE_URL = process.env.API_BASE_URL;
const API_TOKEN = process.env.API_TOKEN;
const DEFAULT_MAX_EMAIL_SIZE = 10485760;
const parsedMaxEmailSize = Number.parseInt(process.env.MAX_EMAIL_SIZE || String(DEFAULT_MAX_EMAIL_SIZE), 10);
// A non-numeric MAX_EMAIL_SIZE would parse to NaN, and `size > NaN` is always
// false, which would silently disable the size limit. Fall back to the default.
const MAX_EMAIL_SIZE = Number.isNaN(parsedMaxEmailSize) ? DEFAULT_MAX_EMAIL_SIZE : parsedMaxEmailSize;
if (Number.isNaN(parsedMaxEmailSize)) {
  logger.warn(`Invalid MAX_EMAIL_SIZE value, falling back to ${DEFAULT_MAX_EMAIL_SIZE}`);
}
const AWS_REGION = process.env.AWS_REGION || 'us-east-1';

// Log environment variables (omit sensitive values like API_TOKEN)
logger.info(`Environment: API_BASE_URL=${API_BASE_URL}, AWS_REGION=${AWS_REGION}, MAX_EMAIL_SIZE=${MAX_EMAIL_SIZE}`);

// Validate environment variables
if (!API_BASE_URL || !API_TOKEN) {
  logger.error('Missing required environment variables: API_BASE_URL or API_TOKEN');
  throw new Error('Missing required environment variables');
}

// S3 client
const s3Client = new S3Client({ region: AWS_REGION });

/**
 * Drain an async-iterable stream of chunks into a single Buffer.
 *
 * @param {AsyncIterable<Uint8Array>} stream - Stream to read until exhaustion.
 * @returns {Promise<Buffer>} All chunks concatenated in arrival order.
 * @throws {Error} If iteration or concatenation fails; the original error is
 *   attached as `cause`.
 */
async function streamToBuffer(stream) {
  try {
    const chunks = [];
    for await (const chunk of stream) {
      chunks.push(chunk);
    }
    return Buffer.concat(chunks);
  } catch (error) {
    throw new Error(`Failed to convert stream to buffer: ${error.message}`, { cause: error });
  }
}

/**
 * POST the payload to `${API_BASE_URL}/process/${domain}` exactly once.
 *
 * Never throws: network errors, timeouts (25 s) and non-2xx responses are
 * logged and reported as `false`.
 *
 * @param {object} payload - JSON-serializable request body.
 * @param {string} domain - Path segment appended after `/process/`.
 * @param {string} requestId - Sent as `X-Request-ID` and used as log prefix.
 * @returns {Promise<boolean>} `true` when the response status is 2xx.
 */
async function callApiOnce(payload, domain, requestId) {
  const url = `${API_BASE_URL}/process/${domain}`;
  logger.info(
    `[${requestId}] Preparing POST to ${url}: ` +
    `domain=${domain}, key=${payload.s3_key}, bucket=${payload.s3_bucket}, ` +
    `orig_size=${payload.original_size}, comp_size=${payload.compressed_size}`
  );

  try {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_TOKEN}`,
        'Content-Type': 'application/json',
        'X-Request-ID': requestId
      },
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(25000)
    });
    const responseBody = await response.text();
    logger.info(`[${requestId}] API response: status=${response.status}, body=${responseBody}`);

    if (response.ok) {
      logger.info(`[${requestId}] API call successful`);
      return true;
    }
    logger.error(`[${requestId}] API returned ${response.status}: ${responseBody}`);
    return false;
  } catch (error) {
    logger.error(`[${requestId}] API call failed: ${error.message}`);
    return false;
  }
}

/**
 * Flatten parsed address headers into a list of plain e-mail addresses.
 *
 * Each argument may be undefined, a single parsed address object (with a
 * `value` array), or an array of such objects (mailparser may return an array
 * when a header appears more than once). Entries without an `address` are
 * dropped.
 *
 * @param {...(object|object[]|undefined)} fields - Parsed header values.
 * @returns {string[]} Addresses in header order.
 */
function collectAddresses(...fields) {
  return fields
    .flatMap((field) => (Array.isArray(field) ? field : [field]))
    .flatMap((addressObject) => addressObject?.value ?? [])
    .map((entry) => entry.address)
    .filter(Boolean);
}

/**
 * Lambda handler: reads the e-mail referenced by the first S3 record of the
 * event, gzips it and forwards it (base64-encoded) to the REST API.
 *
 * @param {object} event - S3 notification event; only `Records[0]` is used.
 * @param {object} context - Lambda context; `awsRequestId` is used for logs.
 * @returns {Promise<{statusCode: number, body: string}>} 413 when the object
 *   exceeds MAX_EMAIL_SIZE, 500 on an unexpected error, otherwise 200 (also
 *   when the REST API call itself reported failure).
 */
export const handler = async (event, context) => {
  const reqId = context.awsRequestId;
  logger.info(`[${reqId}] Starting Lambda execution`);
  logger.info(`[${reqId}] Event: ${JSON.stringify(event)}`);

  try {
    const rec = event.Records[0].s3;
    const bucket = rec.bucket.name;
    // S3 event keys are URL-encoded with '+' standing in for spaces.
    const key = decodeURIComponent(rec.object.key.replace(/\+/g, ' '));
    logger.info(`[${reqId}] Processing ${bucket}/${key}`);

    // Check email size
    logger.info(`[${reqId}] Fetching object metadata for ${bucket}/${key}`);
    const headCommand = new HeadObjectCommand({ Bucket: bucket, Key: key });
    const head = await s3Client.send(headCommand);
    const size = head.ContentLength;
    logger.info(`[${reqId}] Object size: ${size} bytes`);

    if (size > MAX_EMAIL_SIZE) {
      // winston's default (npm) levels expose `warn`; `logger.warning` does
      // not exist there and would throw, turning this 413 into a 500.
      logger.warn(`[${reqId}] Email too large: ${size} bytes (max: ${MAX_EMAIL_SIZE})`);
      return { statusCode: 413, body: JSON.stringify({ error: 'Email too large' }) };
    }

    // Load email content
    logger.info(`[${reqId}] Fetching object content from ${bucket}/${key}`);
    const getObjectCommand = new GetObjectCommand({ Bucket: bucket, Key: key });
    const { Body } = await s3Client.send(getObjectCommand);
    logger.info(`[${reqId}] Object content retrieved, converting to buffer`);
    const body = await streamToBuffer(Body);
    logger.info(`[${reqId}] Buffer size: ${body.length} bytes`);

    // Parse and log from/to (best effort: a parse failure is logged, not fatal)
    let fromAddr = '';
    let toAddrs = [];
    try {
      logger.info(`[${reqId}] Parsing email content`);
      const parser = await simpleParser(body);
      fromAddr = parser.from?.value?.[0]?.address || '';
      toAddrs = collectAddresses(parser.to, parser.cc, parser.bcc);
      logger.info(`[${reqId}] Parsed email: from=${fromAddr}, to=${toAddrs}`);
    } catch (error) {
      logger.error(`[${reqId}] Error parsing email: ${error.message}`);
    }

    // Compress and build payload
    logger.info(`[${reqId}] Compressing email content`);
    const compressed = gzipSync(body);
    const payload = {
      s3_bucket: bucket,
      s3_key: key,
      domain: bucket.replace(/-/g, '.').replace('.emails', ''),
      // Encode the raw gzip bytes directly. Going through a 'binary' string and
      // a text-oriented base64 encoder UTF-8-expands every byte >= 0x80, which
      // corrupts the compressed stream.
      email_content: compressed.toString('base64'),
      compressed: true,
      etag: (head.ETag ?? '').replace(/"/g, ''),
      request_id: reqId,
      original_size: body.length,
      compressed_size: compressed.length
    };
    logger.info(`[${reqId}] Payload prepared: domain=${payload.domain}, compressed_size=${payload.compressed_size}`);

    // Send to REST API
    logger.info(`[${reqId}] Sending payload to REST API`);
    const success = await callApiOnce(payload, payload.domain, reqId);

    // Log result
    if (success) {
      logger.info(`[${reqId}] Email processed successfully`);
    } else {
      logger.info(`[${reqId}] Email processing failed, status handled by REST API`);
    }

    logger.info(`[${reqId}] Lambda execution completed`);
    return { statusCode: 200, body: JSON.stringify({ message: 'Done' }) };
  } catch (error) {
    logger.error(`[${reqId}] Error processing event: ${error.message}, stack: ${error.stack}`);
    return { statusCode: 500, body: JSON.stringify({ error: 'Internal server error' }) };
  }
};