import { db } from '@/app/db/drizzle';
import { domains, emails } from '@/app/db/schema';
import { getS3Client, getBody } from '@/app/lib/utils';
import { simpleParser, AddressObject } from 'mailparser';
import {
  ListBucketsCommand,
  ListObjectsV2Command,
  GetObjectCommand,
  HeadObjectCommand,
  S3Client,
} from '@aws-sdk/client-s3';
import { eq, inArray } from 'drizzle-orm';
import { Readable } from 'stream';
import pLimit from 'p-limit';
import pRetry from 'p-retry';

// Tunable concurrency/batching constants
const CONCURRENT_S3_OPERATIONS = 10;
const BATCH_INSERT_SIZE = 100;
const CONCURRENT_EMAIL_PARSING = 5;

// Global helper for safe date conversion
function parseDate(dateInput: string | Date | undefined | null): Date | null {
  if (!dateInput) return null;
  try {
    const date = dateInput instanceof Date ? dateInput : new Date(dateInput);
    // Reject invalid dates
    if (isNaN(date.getTime())) {
      return null;
    }
    // Reject dates outside a plausible range (1970-2100)
    const year = date.getFullYear();
    if (year < 1970 || year > 2100) {
      return null;
    }
    return date;
  } catch (error) {
    console.error('Error parsing date:', dateInput, error);
    return null;
  }
}

export async function syncAllDomains() {
  console.log('Starting optimized syncAllDomains...');
  const startTime = Date.now();
  const s3 = getS3Client();

  const { Buckets } = await pRetry(() => s3.send(new ListBucketsCommand({})), { retries: 3 });
  const domainBuckets = Buckets?.filter(b => b.Name?.endsWith('-emails')) || [];
  console.log(`Found ${domainBuckets.length} domain buckets`);

  const bucketLimit = pLimit(3);
  await Promise.all(
    domainBuckets.map(bucketObj =>
      bucketLimit(async () => {
        const bucket = bucketObj.Name!;
        // Domain conversion: turn only the last '-' before the '-emails' suffix into '.'.
        // Example: bayarea-cc-com-emails -> bayarea-cc-com -> bayarea-cc.com
        let domainName = bucket.replace(/-emails$/, ''); // strip the suffix (anchored, so an earlier '-emails' inside the name is left alone)
        const lastDashIndex = domainName.lastIndexOf('-'); // find the last dash
        if (lastDashIndex !== -1) {
          // Replace only the last dash with a dot
          domainName =
            domainName.substring(0, lastDashIndex) + '.' + domainName.substring(lastDashIndex + 1);
        }
        console.log(`Processing bucket: ${bucket} -> Domain: ${domainName}`);

        const [domain] = await db
          .insert(domains)
          .values({ bucket, domain: domainName })
          .onConflictDoUpdate({ target: domains.bucket, set: { domain: domainName } })
          .returning({ id: domains.id });

        await syncEmailsForDomainOptimized(domain.id, bucket, s3);
      })
    )
  );

  const duration = (Date.now() - startTime) / 1000;
  console.log(`syncAllDomains completed in ${duration}s`);
}

async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3: S3Client) {
  console.log(`Starting optimized sync for bucket: ${bucket}`);
  const startTime = Date.now();

  // 1. Collect all S3 keys up front (paginated)
  const allS3Keys: string[] = [];
  let continuationToken: string | undefined;
  do {
    const response = await pRetry(
      () =>
        s3.send(
          new ListObjectsV2Command({
            Bucket: bucket,
            MaxKeys: 1000,
            ContinuationToken: continuationToken,
          })
        ),
      { retries: 3 }
    );
    allS3Keys.push(...(response.Contents?.map(obj => obj.Key!).filter(Boolean) || []));
    continuationToken = response.NextContinuationToken;
  } while (continuationToken);
  console.log(`Found ${allS3Keys.length} objects in bucket`);
  if (allS3Keys.length === 0) return;
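
  // Caveat (a sketch, not part of the original flow): the single `inArray` lookup in
  // step 2 binds one parameter per key, and Postgres caps bind parameters at 65535,
  // so very large buckets can make that query fail. A hedged workaround, assuming a
  // chunk size comfortably under that limit, would be:
  //
  //   const existing = [];
  //   for (let i = 0; i < allS3Keys.length; i += 10_000) {
  //     const chunk = allS3Keys.slice(i, i + 10_000);
  //     existing.push(
  //       ...(await db
  //         .select({ s3Key: emails.s3Key, processed: emails.processed })
  //         .from(emails)
  //         .where(inArray(emails.s3Key, chunk)))
  //     );
  //   }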

  // 2. Fetch all existing emails from the DB in a single query
  const existingEmails = await db
    .select({ s3Key: emails.s3Key, processed: emails.processed })
    .from(emails)
    .where(inArray(emails.s3Key, allS3Keys));
  const existingKeysMap = new Map(existingEmails.map(e => [e.s3Key, e.processed] as const));
  console.log(`Found ${existingEmails.length} existing emails in DB`);

  // 3. Decide what needs inserting vs. updating
  const toInsert: string[] = [];
  const toUpdate: string[] = [];
  for (const key of allS3Keys) {
    if (!existingKeysMap.has(key)) {
      toInsert.push(key);
    } else {
      toUpdate.push(key);
    }
  }
  console.log(`To insert: ${toInsert.length}, To update: ${toUpdate.length}`);

  // 4. Update existing rows in parallel (metadata only)
  if (toUpdate.length > 0) {
    const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
    const updatePromises = toUpdate.map(key =>
      updateLimit(async () => {
        try {
          const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
          const metadata = head.Metadata || {};
          const processed =
            metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
          const processedAt = parseDate(metadata['processed_at']);
          const processedBy = metadata['processed_by'] || null;
          const queuedTo = metadata['queued_to'] || null;
          const status = metadata['status'] || null;

          await db
            .update(emails)
            .set({ processed, processedAt, processedBy, queuedTo, status })
            .where(eq(emails.s3Key, key));
        } catch (error) {
          console.error(`Error updating ${key}:`, error);
        }
      })
    );
    await Promise.all(updatePromises);
  }

  // 5. Batch-insert new emails
  if (toInsert.length > 0) {
    console.log(`Processing ${toInsert.length} new emails...`);
    for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
      const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
      console.log(
        `Processing batch ${Math.floor(i / BATCH_INSERT_SIZE) + 1}/${Math.ceil(toInsert.length / BATCH_INSERT_SIZE)}`
      );

      const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);
      const emailDataPromises = batch.map(key =>
        parseLimit(async () => {
          try {
            const [getObjResponse, headResponse] = await Promise.all([
              pRetry(() => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })), {
                retries: 2,
              }),
              pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), {
                retries: 2,
              }),
            ]);

            const raw = await getBody(getObjResponse.Body as Readable);
            const parsed = await simpleParser(raw, {
              skipHtmlToText: true,
              skipTextContent: false,
              skipImageLinks: true,
            });
            const metadata = headResponse.Metadata || {};

            const to = extractAddresses(parsed.to);
            const cc = extractAddresses(parsed.cc);
            const bcc = extractAddresses(parsed.bcc);

            // Try date sources in order of preference
            let emailDate: Date | null = null;
            // 1. Prefer the parsed Date header
            if (parsed.date) {
              emailDate = parseDate(parsed.date);
            }
            // 2. If that is invalid, fall back to the S3 LastModified timestamp
            if (!emailDate && headResponse.LastModified) {
              emailDate = parseDate(headResponse.LastModified);
            }
            // 3. If both are unusable, fall back to the current time
            if (!emailDate) {
              console.warn(`No valid date found for ${key}, using current date`);
              emailDate = new Date();
            }
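
            // The row below mirrors the metadata mapping used for updates in step 4
            // (processed, processedAt, processedBy, queuedTo, status), so the insert
            // and update paths stay consistent; `raw` keeps the full message source.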
            return {
              domainId,
              s3Key: key,
              from: parsed.from?.value[0]?.address,
              to,
              cc,
              bcc,
              subject: parsed.subject,
              html: parsed.html || parsed.textAsHtml,
              raw: raw.toString('utf-8'),
              processed:
                metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
              date: emailDate,
              processedAt: parseDate(metadata['processed_at']),
              processedBy: metadata['processed_by'] || null,
              queuedTo: metadata['queued_to'] || null,
              status: metadata['status'] || null,
            };
          } catch (error) {
            console.error(`Error processing ${key}:`, error);
            return null;
          }
        })
      );

      // `filter(Boolean)` alone does not narrow away `null` for TypeScript, so use an
      // explicit type guard before handing the rows to drizzle.
      const emailData = (await Promise.all(emailDataPromises)).filter(
        (e): e is NonNullable<typeof e> => e !== null
      );
      if (emailData.length > 0) {
        await db.insert(emails).values(emailData);
        console.log(`Inserted ${emailData.length} emails`);
      }
    }
  }

  const duration = (Date.now() - startTime) / 1000;
  console.log(`Sync for ${bucket} completed in ${duration}s`);
}

// Flatten a mailparser address header (a single AddressObject or an array of them)
// into a flat list of lowercased email addresses, dropping entries without an address.
function extractAddresses(addressObj: AddressObject | AddressObject[] | undefined): string[] {
  if (!addressObj) return [];
  if (Array.isArray(addressObj)) {
    return addressObj
      .flatMap(a => a.value.map(v => v.address?.toLowerCase() || ''))
      .filter(Boolean);
  }
  return addressObj.value?.map(v => v.address?.toLowerCase() || '').filter(Boolean) || [];
}
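
// Usage sketch (an assumption, not part of this module): syncAllDomains() returns
// nothing and is meant to be triggered from a scheduled entry point. A minimal
// example, assuming a Next.js route handler and a hypothetical CRON_SECRET env var:
//
//   // app/api/sync/route.ts (hypothetical path)
//   import { syncAllDomains } from '@/app/lib/sync'; // adjust to this file's actual path
//
//   export async function GET(req: Request) {
//     if (req.headers.get('authorization') !== `Bearer ${process.env.CRON_SECRET}`) {
//       return new Response('Unauthorized', { status: 401 });
//     }
//     await syncAllDomains();
//     return new Response('OK');
//   }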