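/**
 * Syncs emails stored in per-domain S3 buckets (named '<domain>-emails')
 * into the database: upserts one row per domain, inserts newly discovered
 * messages, and refreshes processing state from S3 object metadata.
 */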
import { db } from '@/app/db/drizzle';
import { domains, emails } from '@/app/db/schema';
import { getS3Client, getBody } from '@/app/lib/utils';
import { simpleParser } from 'mailparser';
import { ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { eq, inArray } from 'drizzle-orm';
import { Readable } from 'stream';
import pLimit from 'p-limit';
import pRetry from 'p-retry';

// Configurable constants
const CONCURRENT_S3_OPERATIONS = 10;
const BATCH_INSERT_SIZE = 100;
const CONCURRENT_EMAIL_PARSING = 5;

// Global helper for safe date conversion
function parseDate(dateInput: string | Date | undefined | null): Date | null {
  if (!dateInput) return null;

  try {
    const date = dateInput instanceof Date ? dateInput : new Date(dateInput);
    if (isNaN(date.getTime())) {
      return null;
    }
    // Reject implausible years so bad headers cannot produce out-of-range timestamps
    const year = date.getFullYear();
    if (year < 1970 || year > 2100) {
      return null;
    }
    return date;
  } catch (error) {
    console.error('Error parsing date:', dateInput, error);
    return null;
  }
}
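
// Examples:
//   parseDate('2024-05-01T12:00:00Z')  -> Date
//   parseDate('not a date')            -> null (NaN timestamp)
//   parseDate(new Date('0001-01-01'))  -> null (year outside 1970..2100)
//   parseDate(undefined)               -> null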

// Helper to extract addresses from header fields
function extractAddresses(addressObj: any): string[] {
  if (!addressObj) return [];
  if (Array.isArray(addressObj)) {
    return addressObj.flatMap(t => t.value?.map((v: any) => v.address?.toLowerCase() || '') || []).filter(Boolean);
  }
  return addressObj.value?.map((v: any) => v.address?.toLowerCase() || '').filter(Boolean) || [];
}
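
// Example: given a mailparser address object like
//   { value: [{ address: 'User@Example.com', name: 'User' }], text: '...' }
// extractAddresses returns ['user@example.com']; arrays of such objects
// (as mailparser produces for multi-valued headers) are flattened.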

// Extract envelope recipients from Received headers. The To/Cc/Bcc headers
// can omit the actual delivery address (e.g. for BCC or catch-all routing),
// but the receiving MTA usually records it in a "for <...>" clause.
function extractEnvelopeRecipients(parsed: any): string[] {
  const envelopeRecipients: string[] = [];
  const receivedHeaders = parsed.headers.get('received');

  if (receivedHeaders) {
    const receivedArray = Array.isArray(receivedHeaders) ? receivedHeaders : [receivedHeaders];

    for (const received of receivedArray) {
      const receivedStr = typeof received === 'string' ? received : received.toString();
      // Regex for "for <email@domain>" or "for email@domain"
      const forMatches = receivedStr.match(/\bfor\s+<?([a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})>?/gi);
      if (forMatches) {
        forMatches.forEach(match => {
          const emailMatch = match.match(/([a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})/i);
          if (emailMatch && emailMatch[1]) {
            envelopeRecipients.push(emailMatch[1].toLowerCase());
          }
        });
      }
    }
  }

  // Deduplicate
  return [...new Set(envelopeRecipients)];
}
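
// Example: a header such as (addresses hypothetical)
//   Received: from mta.example.net (mta.example.net [203.0.113.5])
//     by mx.example.com with ESMTP id abc123
//     for <alice@example.com>; Mon, 1 Jan 2024 00:00:00 +0000
// yields ['alice@example.com']; only the "for <...>" clause is consulted.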

export async function syncAllDomains() {
  console.log('Starting optimized syncAllDomains...');
  const startTime = Date.now();

  const s3 = getS3Client();
  const { Buckets } = await pRetry(() => s3.send(new ListBucketsCommand({})), { retries: 3 });
  const domainBuckets = Buckets?.filter(b => b.Name?.endsWith('-emails')) || [];
  console.log(`Found ${domainBuckets.length} domain buckets`);

  // Process at most three buckets concurrently
  const bucketLimit = pLimit(3);

  await Promise.all(
    domainBuckets.map(bucketObj =>
      bucketLimit(async () => {
        const bucket = bucketObj.Name!;

        // Domain conversion: strip the '-emails' suffix (anchored, so only the
        // suffix is removed), then turn only the last remaining '-' into '.'
        // (e.g. 'example-com-emails' -> 'example.com')
        let domainName = bucket.replace(/-emails$/, '');
        const lastDashIndex = domainName.lastIndexOf('-');
        if (lastDashIndex !== -1) {
          domainName = domainName.substring(0, lastDashIndex) + '.' + domainName.substring(lastDashIndex + 1);
        }

        console.log(`Processing bucket: ${bucket} -> Domain: ${domainName}`);

        // Upsert the domain row, keyed on the bucket name
        const [domain] = await db
          .insert(domains)
          .values({ bucket, domain: domainName })
          .onConflictDoUpdate({
            target: domains.bucket,
            set: { domain: domainName }
          })
          .returning({ id: domains.id });

        await syncEmailsForDomainOptimized(domain.id, bucket, s3);
      })
    )
  );

  const duration = (Date.now() - startTime) / 1000;
  console.log(`syncAllDomains completed in ${duration}s`);
}
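
// A minimal usage sketch (hypothetical caller — this module only exports
// syncAllDomains; the app presumably triggers it from a cron job or route):
//
//   export async function GET() {
//     await syncAllDomains();
//     return Response.json({ ok: true });
//   }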

// Sync a single bucket: list all keys, refresh metadata-derived fields for
// known emails, and download/parse/insert the new ones in batches.
async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3: S3Client) {
  console.log(`Starting optimized sync for bucket: ${bucket}`);
  const startTime = Date.now();

  // Collect every object key in the bucket, following pagination
  const allS3Keys: string[] = [];
  let continuationToken: string | undefined;

  do {
    const response = await pRetry(
      () => s3.send(new ListObjectsV2Command({
        Bucket: bucket,
        MaxKeys: 1000,
        ContinuationToken: continuationToken
      })),
      { retries: 3 }
    );

    allS3Keys.push(...(response.Contents?.map(obj => obj.Key!).filter(Boolean) || []));
    continuationToken = response.NextContinuationToken;
  } while (continuationToken);

  console.log(`Found ${allS3Keys.length} objects in bucket`);

  if (allS3Keys.length === 0) return;

  // Load the keys we already know about so the work splits into insert/update sets
  const existingEmails = await db
    .select({
      s3Key: emails.s3Key,
      processed: emails.processed
    })
    .from(emails)
    .where(inArray(emails.s3Key, allS3Keys));

  const existingKeysMap = new Map(
    existingEmails.map(e => [e.s3Key, e.processed])
  );

  console.log(`Found ${existingEmails.length} existing emails in DB`);

  const toInsert: string[] = [];
  const toUpdate: string[] = [];

  for (const key of allS3Keys) {
    if (!existingKeysMap.has(key)) {
      toInsert.push(key);
    } else {
      toUpdate.push(key);
    }
  }

  console.log(`To insert: ${toInsert.length}, To update: ${toUpdate.length}`);

  // For known emails only the S3 object metadata can have changed, so a
  // HeadObject per key is enough — no need to re-download the body
  if (toUpdate.length > 0) {
    const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
    const updatePromises = toUpdate.map(key =>
      updateLimit(async () => {
        try {
          const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
          const metadata = head.Metadata || {};

          const processed = metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
          const processedAt = parseDate(metadata['processed_at']);
          const processedBy = metadata['processed_by'] || null;
          const queuedTo = metadata['queued_to'] || null;
          const status = metadata['status'] || null;

          await db
            .update(emails)
            .set({ processed, processedAt, processedBy, queuedTo, status })
            .where(eq(emails.s3Key, key));
        } catch (error) {
          console.error(`Error updating ${key}:`, error);
        }
      })
    );

    await Promise.all(updatePromises);
  }

  if (toInsert.length > 0) {
    console.log(`Processing ${toInsert.length} new emails...`);

    for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
      const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
      console.log(`Processing batch ${Math.floor(i / BATCH_INSERT_SIZE) + 1}/${Math.ceil(toInsert.length / BATCH_INSERT_SIZE)}`);

      const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);
      const emailDataPromises = batch.map(key =>
        parseLimit(async () => {
          try {
            const [getObjResponse, headResponse] = await Promise.all([
              pRetry(() => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 }),
              pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 })
            ]);

            const raw = await getBody(getObjResponse.Body as Readable);
            const parsed = await simpleParser(raw, {
              skipHtmlToText: true,
              skipTextContent: false,
              skipImageLinks: true
            });

            const metadata = headResponse.Metadata || {};

            // Prefer envelope recipients from the Received headers; fall back
            // to the To/Cc/Bcc headers when none can be extracted
            let to: string[] = [];
            let cc: string[] = [];
            let bcc: string[] = [];

            const envelopeRecipients = extractEnvelopeRecipients(parsed);

            if (envelopeRecipients.length > 0) {
              to = envelopeRecipients;
              console.log(`[${key}] Using ${to.length} envelope recipient(s): ${to.join(', ')}`);
            } else {
              to = extractAddresses(parsed.to);
              cc = extractAddresses(parsed.cc);
              bcc = extractAddresses(parsed.bcc);
              console.log(`[${key}] No envelope recipients found, using header recipients`);
            }

            // Date fallback chain: Date header -> S3 LastModified -> now
            let emailDate: Date | null = null;

            if (parsed.date) {
              emailDate = parseDate(parsed.date);
            }

            if (!emailDate && headResponse.LastModified) {
              emailDate = parseDate(headResponse.LastModified);
            }

            if (!emailDate) {
              console.warn(`No valid date found for ${key}, using current date`);
              emailDate = new Date();
            }

            return {
              domainId,
              s3Key: key,
              from: parsed.from?.value[0]?.address,
              to,
              cc,
              bcc,
              subject: parsed.subject,
              html: parsed.html || parsed.textAsHtml,
              raw: raw.toString('utf-8'),
              processed: metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
              date: emailDate,
              processedAt: parseDate(metadata['processed_at']),
              processedBy: metadata['processed_by'] || null,
              queuedTo: metadata['queued_to'] || null,
              status: metadata['status'] || null,
            };
          } catch (error) {
            console.error(`Error processing ${key}:`, error);
            return null;
          }
        })
      );

      // Type-guard filter: drops the nulls from failed parses and narrows the
      // element type, so no non-null assertions are needed below
      const emailData = (await Promise.all(emailDataPromises))
        .filter((e): e is NonNullable<typeof e> => e !== null);

      if (emailData.length > 0) {
        const validatedData = emailData.map(email => ({
          ...email,
          date: email.date || new Date(),
          processedAt: email.processedAt || null,
        }));

        try {
          await db.insert(emails).values(validatedData);
          console.log(`Inserted ${validatedData.length} emails`);
        } catch (dbError) {
          console.error('Database insert error:', dbError);
          console.log('Trying individual inserts to identify problematic email...');
          // Fall back to row-by-row inserts so one bad record does not
          // discard the whole batch
          for (const email of validatedData) {
            try {
              await db.insert(emails).values([email]);
              console.log(`Successfully inserted: ${email.s3Key}`);
            } catch (singleError) {
              console.error(`Failed to insert ${email.s3Key}:`, singleError);
              console.error('Problematic email data:', JSON.stringify({
                s3Key: email.s3Key,
                date: email.date,
                processedAt: email.processedAt,
              }));
            }
          }
        }
      }
    }
  }

  const duration = (Date.now() - startTime) / 1000;
  console.log(`Sync for ${bucket} completed in ${duration}s`);
}