import { db } from '@/app/db/drizzle';
import { domains, emails } from '@/app/db/schema';
import { getS3Client, getBody } from '@/app/lib/utils';
import { simpleParser } from 'mailparser';
import {
  ListBucketsCommand,
  ListObjectsV2Command,
  GetObjectCommand,
  HeadObjectCommand,
  S3Client,
} from '@aws-sdk/client-s3';
import { and, eq, sql, inArray } from 'drizzle-orm';
import { Readable } from 'stream';
import pLimit from 'p-limit';
import pRetry from 'p-retry';

// Tunable constants
const CONCURRENT_S3_OPERATIONS = 10;
const BATCH_INSERT_SIZE = 100;
const CONCURRENT_EMAIL_PARSING = 5;
/** Maximum number of keys sent in a single `IN (...)` delete statement. */
const DELETE_CHUNK_SIZE = 1000;
/** Suffix that marks an S3 bucket as a per-domain mail bucket. */
const BUCKET_SUFFIX = '-emails';

/** Matches a `for <addr>` / `for addr` clause inside a Received header. */
const ENVELOPE_FOR_PATTERN = /\bfor\s+<?[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}>?/gi;
/** Extracts the bare address out of a clause matched by ENVELOPE_FOR_PATTERN. */
const EMAIL_ADDRESS_PATTERN = /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/;

/** Minimal structural view of a parsed address header (one or more mailboxes). */
interface AddressLike {
  value?: ReadonlyArray<{ address?: string }>;
}

/** Minimal structural view of a parsed message: only header lookup is needed. */
interface HeaderCarrier {
  headers: { get(name: string): unknown };
}

/**
 * Converts a date-like input into a `Date`, rejecting unusable values.
 *
 * @param dateInput - A `Date`, a date string, or a nullish value.
 * @returns The parsed date, or `null` when the input is empty, unparsable,
 *          or its (local) year lies outside 1970..2100.
 */
function parseDate(dateInput: string | Date | undefined | null): Date | null {
  if (!dateInput) return null;

  const date = dateInput instanceof Date ? dateInput : new Date(dateInput);
  if (Number.isNaN(date.getTime())) return null;

  const year = date.getFullYear();
  if (year < 1970 || year > 2100) return null;

  return date;
}

/**
 * Collects the lower-cased mailbox addresses from a parsed address header.
 *
 * @param addressObj - A single address object, an array of them, or nullish.
 * @returns All non-empty addresses in encounter order (duplicates are kept).
 */
function extractAddresses(addressObj: AddressLike | AddressLike[] | null | undefined): string[] {
  if (!addressObj) return [];

  const groups = Array.isArray(addressObj) ? addressObj : [addressObj];
  const addresses: string[] = [];
  for (const group of groups) {
    // An entry without `value` contributes nothing instead of throwing.
    for (const entry of group.value ?? []) {
      const address = entry.address?.toLowerCase();
      if (address) addresses.push(address);
    }
  }
  return addresses;
}

/**
 * Extracts envelope recipients from the `for ...` clauses of the Received headers.
 *
 * @param parsed - Parsed message exposing a `headers.get(name)` lookup.
 * @returns De-duplicated, lower-cased recipient addresses; empty when no
 *          Received header carries a `for` clause.
 */
function extractEnvelopeRecipients(parsed: HeaderCarrier): string[] {
  const receivedHeaders = parsed.headers.get('received');
  if (!receivedHeaders) return [];

  const receivedList: unknown[] = Array.isArray(receivedHeaders) ? receivedHeaders : [receivedHeaders];
  const recipients = new Set<string>();

  for (const received of receivedList) {
    const receivedStr = typeof received === 'string' ? received : String(received);
    // `match` with a global pattern always scans from the start of the string,
    // so sharing the module-level regex between calls is safe.
    const forClauses = receivedStr.match(ENVELOPE_FOR_PATTERN) ?? [];
    for (const clause of forClauses) {
      const address = EMAIL_ADDRESS_PATTERN.exec(clause)?.[0];
      if (address) recipients.add(address.toLowerCase());
    }
  }

  return [...recipients];
}

/**
 * Derives the domain name from a bucket name: strips the trailing `-emails`
 * and turns the last remaining `-` into a `.` (e.g. `example-com-emails` -> `example.com`).
 */
function bucketToDomain(bucket: string): string {
  const base = bucket.endsWith(BUCKET_SUFFIX) ? bucket.slice(0, -BUCKET_SUFFIX.length) : bucket;
  const lastDashIndex = base.lastIndexOf('-');
  if (lastDashIndex === -1) return base;
  return `${base.substring(0, lastDashIndex)}.${base.substring(lastDashIndex + 1)}`;
}

/**
 * Reads the processing-state fields out of an S3 object's user metadata.
 *
 * `processed` is only true when both PROCESSED_META_KEY and PROCESSED_META_VALUE
 * are configured and the metadata entry matches; with unset variables a naive
 * `metadata[undefined] === undefined` comparison would flag every object as processed.
 */
function readProcessingState(metadata: Record<string, string>) {
  const metaKey = process.env.PROCESSED_META_KEY;
  const metaValue = process.env.PROCESSED_META_VALUE;
  const processed = Boolean(metaKey) && metaValue !== undefined && metadata[metaKey as string] === metaValue;

  return {
    processed,
    processedAt: parseDate(metadata['processed_at']),
    processedBy: metadata['processed_by'] || null,
    queuedTo: metadata['queued_to'] || null,
    status: metadata['status'] || null,
  };
}

/**
 * Synchronises every `*-emails` bucket into the database.
 *
 * Buckets are processed with a concurrency of 3. Each bucket is upserted into
 * `domains` and then its objects are synced into `emails`. A failing bucket
 * does not stop the others; once all buckets have been attempted, the first
 * error that occurred is rethrown.
 *
 * @throws The first error raised while listing buckets or syncing a bucket.
 */
export async function syncAllDomains() {
  console.log('Starting optimized syncAllDomains...');
  const startTime = Date.now();

  const s3 = getS3Client();
  const { Buckets } = await pRetry(() => s3.send(new ListBucketsCommand({})), { retries: 3 });
  const domainBuckets = Buckets?.filter(b => b.Name?.endsWith(BUCKET_SUFFIX)) || [];
  console.log(`Found ${domainBuckets.length} domain buckets`);

  const bucketLimit = pLimit(3);
  const failures: unknown[] = [];

  await Promise.all(
    domainBuckets.map(bucketObj =>
      bucketLimit(async () => {
        const bucket = bucketObj.Name;
        if (!bucket) return;

        try {
          const domainName = bucketToDomain(bucket);
          console.log(`Processing bucket: ${bucket} -> Domain: ${domainName}`);

          const [domain] = await db
            .insert(domains)
            .values({ bucket, domain: domainName })
            .onConflictDoUpdate({ target: domains.bucket, set: { domain: domainName } })
            .returning({ id: domains.id });
          if (!domain) {
            throw new Error(`Upsert of domain for bucket ${bucket} returned no row`);
          }

          await syncEmailsForDomainOptimized(domain.id, bucket, s3);
        } catch (error) {
          // Collect instead of rejecting immediately so the remaining buckets
          // are not left running un-awaited in the background.
          console.error(`Error syncing bucket ${bucket}:`, error);
          failures.push(error);
        }
      })
    )
  );

  if (failures.length > 0) {
    throw failures[0];
  }

  const duration = (Date.now() - startTime) / 1000;
  console.log(`syncAllDomains completed in ${duration}s`);
}

/** Lists every object key in `bucket`, following pagination until exhausted. */
async function listAllObjectKeys(s3: S3Client, bucket: string): Promise<string[]> {
  const keys: string[] = [];
  let continuationToken: string | undefined;

  do {
    const response = await pRetry(
      () =>
        s3.send(
          new ListObjectsV2Command({
            Bucket: bucket,
            MaxKeys: 1000,
            ContinuationToken: continuationToken,
          })
        ),
      { retries: 3 }
    );

    for (const obj of response.Contents ?? []) {
      if (obj.Key) keys.push(obj.Key);
    }
    continuationToken = response.NextContinuationToken;
  } while (continuationToken);

  return keys;
}

/** Deletes this domain's DB rows whose S3 object no longer exists (best effort, logged). */
async function deleteOrphanedEmails(domainId: number, orphanedKeys: string[]): Promise<void> {
  if (orphanedKeys.length === 0) {
    console.log('No orphaned emails found');
    return;
  }

  console.log(`Deleting ${orphanedKeys.length} orphaned email(s) from DB: ${orphanedKeys.slice(0, 5).join(', ')}${orphanedKeys.length > 5 ? '...' : ''}`);
  try {
    // Chunked so a large orphan set cannot exceed the driver's parameter limit.
    for (let i = 0; i < orphanedKeys.length; i += DELETE_CHUNK_SIZE) {
      const chunk = orphanedKeys.slice(i, i + DELETE_CHUNK_SIZE);
      await db
        .delete(emails)
        .where(and(eq(emails.domainId, domainId), inArray(emails.s3Key, chunk)));
    }
    console.log(`Successfully deleted ${orphanedKeys.length} orphaned emails`);
  } catch (error) {
    console.error('Error deleting orphaned emails:', error);
  }
}

/** Refreshes the processing-state columns of already-known emails from S3 object metadata. */
async function refreshExistingEmails(
  domainId: number,
  bucket: string,
  s3: S3Client,
  keys: string[]
): Promise<void> {
  if (keys.length === 0) return;

  const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
  await Promise.all(
    keys.map(key =>
      updateLimit(async () => {
        try {
          const head = await pRetry(
            () => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })),
            { retries: 2 }
          );

          await db
            .update(emails)
            .set(readProcessingState(head.Metadata || {}))
            .where(and(eq(emails.domainId, domainId), eq(emails.s3Key, key)));
        } catch (error) {
          console.error(`Error updating ${key}:`, error);
        }
      })
    )
  );
}

/**
 * Downloads and parses one raw email and builds the row to insert.
 *
 * @returns The row, or `null` when download or parsing failed (the error is logged).
 */
async function loadEmailRecord(domainId: number, bucket: string, s3: S3Client, key: string) {
  try {
    // GetObject already returns the user metadata and LastModified, so no
    // separate HeadObject request is needed.
    const objectResponse = await pRetry(
      () => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })),
      { retries: 2 }
    );

    const raw = await getBody(objectResponse.Body as Readable);
    const parsed = await simpleParser(raw, {
      skipHtmlToText: true,
      skipTextContent: false,
      skipImageLinks: true,
    });

    // Prefer the envelope recipients (Received "for" clauses); fall back to
    // the To/Cc/Bcc headers only when none are present.
    let to: string[] = [];
    let cc: string[] = [];
    let bcc: string[] = [];

    const envelopeRecipients = extractEnvelopeRecipients(parsed);
    if (envelopeRecipients.length > 0) {
      to = envelopeRecipients;
      console.log(`[${key}] Using ${to.length} envelope recipient(s): ${to.join(', ')}`);
    } else {
      to = extractAddresses(parsed.to);
      cc = extractAddresses(parsed.cc);
      bcc = extractAddresses(parsed.bcc);
      console.log(`[${key}] No envelope recipients found, using header recipients`);
    }

    // Date precedence: Date header -> S3 LastModified -> now.
    let emailDate = parseDate(parsed.date) ?? parseDate(objectResponse.LastModified);
    if (!emailDate) {
      console.warn(`No valid date found for ${key}, using current date`);
      emailDate = new Date();
    }

    return {
      domainId,
      s3Key: key,
      from: parsed.from?.value[0]?.address,
      to,
      cc,
      bcc,
      subject: parsed.subject,
      // `parsed.html` may be `false`, so `||` (not `??`) is required here.
      html: parsed.html || parsed.textAsHtml,
      raw: raw.toString('utf-8'),
      date: emailDate,
      ...readProcessingState(objectResponse.Metadata || {}),
    };
  } catch (error) {
    console.error(`Error processing ${key}:`, error);
    return null;
  }
}

type EmailRecord = NonNullable<Awaited<ReturnType<typeof loadEmailRecord>>>;

/** Inserts a batch in one statement; on failure retries row by row to isolate the bad record. */
async function insertEmailBatch(records: EmailRecord[]): Promise<void> {
  if (records.length === 0) return;

  try {
    await db.insert(emails).values(records);
    console.log(`Inserted ${records.length} emails`);
  } catch (dbError) {
    console.error('Database insert error:', dbError);
    console.log('Trying individual inserts to identify problematic email...');

    for (const record of records) {
      try {
        await db.insert(emails).values([record]);
        console.log(`Successfully inserted: ${record.s3Key}`);
      } catch (singleError) {
        console.error(`Failed to insert ${record.s3Key}:`, singleError);
        console.error('Problematic email data:', JSON.stringify({
          s3Key: record.s3Key,
          date: record.date,
          processedAt: record.processedAt,
        }));
      }
    }
  }
}

/**
 * Brings the `emails` rows of one domain in line with the contents of its bucket.
 *
 * Steps: list all object keys, delete rows whose object is gone, refresh the
 * processing state of rows that still exist, then download, parse and insert
 * objects that are not in the DB yet (in batches of BATCH_INSERT_SIZE).
 *
 * @param domainId - Primary key of the domain row the emails belong to.
 * @param bucket - Name of the S3 bucket holding the raw emails.
 * @param s3 - S3 client used for all requests.
 */
async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3: S3Client) {
  console.log(`Starting optimized sync for bucket: ${bucket}`);
  const startTime = Date.now();

  const allS3Keys = await listAllObjectKeys(s3, bucket);
  console.log(`Found ${allS3Keys.length} objects in bucket`);

  // NOTE(review): an empty bucket returns before orphan cleanup, so rows of a
  // fully emptied bucket stay in the DB. Kept as-is; confirm this is intended.
  if (allS3Keys.length === 0) return;

  // Load ALL existing emails of this domain from the DB (not only those still in S3).
  const allDbEmails = await db
    .select({ s3Key: emails.s3Key, processed: emails.processed })
    .from(emails)
    .where(eq(emails.domainId, domainId));
  console.log(`Found ${allDbEmails.length} existing emails in DB for this domain`);

  const dbKeys = new Set(allDbEmails.map(row => row.s3Key));
  const s3Keys = new Set(allS3Keys);

  const toInsert = allS3Keys.filter(key => !dbKeys.has(key));
  const toUpdate = allS3Keys.filter(key => dbKeys.has(key));
  // DB rows whose object no longer exists in S3.
  const toDelete = allDbEmails.map(row => row.s3Key).filter(key => !s3Keys.has(key));

  console.log(`To insert: ${toInsert.length}, To update: ${toUpdate.length}, To delete: ${toDelete.length}`);

  await deleteOrphanedEmails(domainId, toDelete);
  await refreshExistingEmails(domainId, bucket, s3, toUpdate);

  if (toInsert.length > 0) {
    console.log(`Processing ${toInsert.length} new emails...`);
    const totalBatches = Math.ceil(toInsert.length / BATCH_INSERT_SIZE);
    const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);

    for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
      const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
      console.log(`Processing batch ${Math.floor(i / BATCH_INSERT_SIZE) + 1}/${totalBatches}`);

      const loaded = await Promise.all(
        batch.map(key => parseLimit(() => loadEmailRecord(domainId, bucket, s3, key)))
      );
      const records = loaded.filter((record): record is EmailRecord => record !== null);

      await insertEmailBatch(records);
    }
  }

  const duration = (Date.now() - startTime) / 1000;
  console.log(`Sync for ${bucket} completed in ${duration}s`);
}