diff --git a/app/api/email/route.ts b/app/api/email/route.ts
index b3404f2..9f6990d 100644
--- a/app/api/email/route.ts
+++ b/app/api/email/route.ts
@@ -1,12 +1,8 @@
 import { NextRequest, NextResponse } from 'next/server';
 import { db } from '@/app/db/drizzle';
 import { emails } from '@/app/db/schema';
-import { authenticate, getBody } from '@/app/lib/utils';
+import { authenticate } from '@/app/lib/utils';
 import { eq } from 'drizzle-orm';
-import { CopyObjectCommand, GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
-import { getS3Client } from '@/app/lib/utils';
-import nodemailer from 'nodemailer';
-import { Readable } from 'stream';
 
 export async function GET(req: NextRequest) {
   if (!authenticate(req)) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
@@ -26,67 +22,9 @@ export async function GET(req: NextRequest) {
     html: email.html,
     raw: email.raw,
     processed: email.processed ? 'true' : 'false',
+    processedAt: email.processedAt?.toISOString() || null,
+    processedBy: email.processedBy,
+    queuedTo: email.queuedTo,
+    status: email.status,
   });
-}
-
-// PUT: Update processed in S3 and DB
-export async function PUT(req: NextRequest) {
-  if (!authenticate(req)) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
-
-  const { bucket, key, processed } = await req.json();
-  if (!bucket || !key) return NextResponse.json({ error: 'Missing params' }, { status: 400 });
-
-  const s3 = getS3Client();
-  const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
-  const newMeta = { ...head.Metadata, [process.env.PROCESSED_META_KEY!]: processed };
-  await s3.send(new CopyObjectCommand({
-    Bucket: bucket,
-    Key: key,
-    CopySource: `${bucket}/${key}`,
-    Metadata: newMeta,
-    MetadataDirective: 'REPLACE'
-  }));
-
-  await db.update(emails).set({ processed: processed === 'true' }).where(eq(emails.s3Key, key));
-
-  return NextResponse.json({ success: true });
-}
-
-// POST: Resend, update in S3 and DB
-export async function POST(req: NextRequest) {
-  if (!authenticate(req)) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
-
-  const { bucket, key } = await req.json();
-  if (!bucket || !key) return NextResponse.json({ error: 'Missing params' }, { status: 400 });
-
-  const s3 = getS3Client();
-  const { Body } = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
-  const raw = await getBody(Body as Readable);
-
-  const transporter = nodemailer.createTransport({
-    host: process.env.SMTP_HOST,
-    port: Number(process.env.SMTP_PORT),
-    secure: false,
-    auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS },
-    tls: { rejectUnauthorized: false }
-  });
-
-  try {
-    await transporter.sendMail({ raw });
-    // Update S3 metadata
-    const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
-    const newMeta = { ...head.Metadata, [process.env.PROCESSED_META_KEY!]: process.env.PROCESSED_META_VALUE! };
-    await s3.send(new CopyObjectCommand({
-      Bucket: bucket,
-      Key: key,
-      CopySource: `${bucket}/${key}`,
-      Metadata: newMeta,
-      MetadataDirective: 'REPLACE'
-    }));
-    // Update DB
-    await db.update(emails).set({ processed: true }).where(eq(emails.s3Key, key));
-    return NextResponse.json({ message: 'Resent successfully' });
-  } catch (error) {
-    return NextResponse.json({ error: (error as Error).message }, { status: 500 });
-  }
 }
\ No newline at end of file
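
Note on app/api/email/route.ts: with the PUT and POST handlers removed, this route is read-only; processed flags and resends are now managed outside the app, and sync.ts (further down) imports that state back from S3 object metadata. A minimal consumer sketch, assuming the handler reads bucket/key from the query string as the detail page does, and assuming a bearer-style Authorization header (neither is confirmed by this diff):

// Hypothetical client for the now read-only endpoint.
async function fetchEmail(bucket: string, key: string, token: string) {
  const res = await fetch(
    `/api/email?bucket=${encodeURIComponent(bucket)}&key=${encodeURIComponent(key)}`,
    { headers: { Authorization: `Bearer ${token}` } }
  );
  if (!res.ok) throw new Error(`GET /api/email failed: ${res.status}`);
  // The response now also carries processedAt (ISO string or null),
  // processedBy, queuedTo and status alongside the existing fields.
  return res.json();
}
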
diff --git a/app/api/emails/route.ts b/app/api/emails/route.ts
index 3e1d123..fcada8d 100644
--- a/app/api/emails/route.ts
+++ b/app/api/emails/route.ts
@@ -20,7 +20,20 @@ export async function GET(req: NextRequest) {
     subject: emails.subject,
     date: emails.date,
     processed: emails.processed,
+    processedAt: emails.processedAt,
+    processedBy: emails.processedBy,
+    queuedTo: emails.queuedTo,
+    status: emails.status,
   }).from(emails).where(sql`${mailbox} = ANY(${emails.to}) AND ${emails.domainId} = ${domain.id}`);
 
-  return NextResponse.json(emailList.map(e => ({ key: e.key, subject: e.subject, date: e.date?.toISOString(), processed: e.processed ? 'true' : 'false' })));
+  return NextResponse.json(emailList.map(e => ({
+    key: e.key,
+    subject: e.subject,
+    date: e.date?.toISOString(),
+    processed: e.processed ? 'true' : 'false',
+    processedAt: e.processedAt?.toISOString() || null,
+    processedBy: e.processedBy,
+    queuedTo: e.queuedTo,
+    status: e.status,
+  })));
 }
\ No newline at end of file
diff --git a/app/api/mailboxes/route.ts b/app/api/mailboxes/route.ts
index c4dbc4f..c6bb703 100644
--- a/app/api/mailboxes/route.ts
+++ b/app/api/mailboxes/route.ts
@@ -2,7 +2,7 @@ import { NextRequest, NextResponse } from 'next/server';
 import { db } from '@/app/db/drizzle';
 import { domains, emails } from '@/app/db/schema';
 import { authenticate } from '@/app/lib/utils';
-import { eq, sql } from 'drizzle-orm';
+import { eq } from 'drizzle-orm';
 
 export async function GET(req: NextRequest) {
   if (!authenticate(req)) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
@@ -14,9 +14,24 @@ export async function GET(req: NextRequest) {
   const [domain] = await db.select().from(domains).where(eq(domains.bucket, bucket));
   if (!domain) return NextResponse.json({ error: 'Domain not found' }, { status: 404 });
 
+  // Fetch all recipient addresses from the "to" fields for this domain
   const mailboxData = await db.select({ to: emails.to }).from(emails).where(eq(emails.domainId, domain.id));
+
+  // Derive the domain from the bucket name (e.g. "example-com-emails" -> "example.com")
+  const domainName = bucket.replace('-emails', '').replace(/-/g, '.');
+
   const uniqueMailboxes = new Set();
-  mailboxData.forEach(em => em.to?.forEach(r => uniqueMailboxes.add(r.toLowerCase())));
+
+  // Keep only addresses that actually belong to the current domain
+  mailboxData.forEach(em => {
+    em.to?.forEach(recipient => {
+      const recipientLower = recipient.toLowerCase();
+      // Check whether the address belongs to this domain
+      if (recipientLower.endsWith(`@${domainName}`)) {
+        uniqueMailboxes.add(recipientLower);
+      }
+    });
+  });
 
-  return NextResponse.json(Array.from(uniqueMailboxes));
+  return NextResponse.json(Array.from(uniqueMailboxes).sort());
 }
\ No newline at end of file
diff --git a/app/api/resend-domain/route.ts b/app/api/resend-domain/route.ts
deleted file mode 100644
index b5247f6..0000000
--- a/app/api/resend-domain/route.ts
+++ /dev/null
@@ -1,30 +0,0 @@
-import { NextRequest, NextResponse } from 'next/server';
-import { db } from '@/app/db/drizzle';
-import { domains, emails } from '@/app/db/schema';
-import { authenticate } from '@/app/lib/utils';
-import { eq } from 'drizzle-orm';
-
-export async function POST(req: NextRequest) {
-  if (!authenticate(req)) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
-
-  const { bucket } = await req.json();
-  if (!bucket) return NextResponse.json({ error: 'Missing bucket' }, { status: 400 });
-
-  const [domain] = await db.select().from(domains).where(eq(domains.bucket, bucket));
-  if (!domain) return NextResponse.json({ error: 'Domain not found' }, { status: 404 });
-
-  const unprocessed = await db.select({ s3Key: emails.s3Key }).from(emails).where(eq(emails.processed, false));
-
-  let count = 0;
-  for (const em of unprocessed) {
-    // Call POST /api/email internally for resend (updates DB/S3)
-    await fetch(`${req.headers.get('origin')}/api/email`, {
-      method: 'POST',
-      headers: { 'Content-Type': 'application/json', Authorization: req.headers.get('Authorization')! },
-      body: JSON.stringify({ bucket, key: em.s3Key }),
-    });
-    count++;
-  }
-
-  return NextResponse.json({ message: `Resent ${count} emails` });
-}
\ No newline at end of file
diff --git a/app/db/schema.ts b/app/db/schema.ts
index 7063359..5789318 100644
--- a/app/db/schema.ts
+++ b/app/db/schema.ts
@@ -19,4 +19,9 @@ export const emails = pgTable('emails', {
   raw: text('raw'),
   processed: boolean('processed').default(false),
   date: timestamp('date'),
+  // New metadata columns
+  processedAt: timestamp('processed_at'),
+  processedBy: text('processed_by'),
+  queuedTo: text('queued_to'),
+  status: text('status'),
 });
\ No newline at end of file
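
The four new columns are all nullable, so existing rows need no backfill; only a drizzle-kit migration adding the columns has to ship with this change. A small query sketch against the extended schema (the "pending" filter is illustrative, not part of this PR):

import { db } from '@/app/db/drizzle';
import { emails } from '@/app/db/schema';
import { and, eq, isNull } from 'drizzle-orm';

// Emails still flagged unprocessed that carry no processing timestamp yet.
async function findPendingEmails() {
  return db
    .select({ key: emails.s3Key, queuedTo: emails.queuedTo, status: emails.status })
    .from(emails)
    .where(and(eq(emails.processed, false), isNull(emails.processedAt)));
}
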
diff --git a/app/email/page.tsx b/app/email/page.tsx
index 83e4afe..00d3fe7 100644
--- a/app/email/page.tsx
+++ b/app/email/page.tsx
@@ -8,8 +8,19 @@ export default function EmailDetail() {
   const searchParams = useSearchParams();
   const bucket = searchParams.get('bucket');
   const key = searchParams.get('key');
-  const mailbox = searchParams.get('mailbox'); // For the breadcrumb
-  const [email, setEmail] = useState({ subject: '', from: '', to: '', html: '', raw: '', processed: '' });
+  const mailbox = searchParams.get('mailbox');
+  const [email, setEmail] = useState({
+    subject: '',
+    from: '',
+    to: '',
+    html: '',
+    raw: '',
+    processed: '',
+    processedAt: null as string | null,
+    processedBy: null as string | null,
+    queuedTo: null as string | null,
+    status: null as string | null,
+  });
   const [viewMode, setViewMode] = useState('html');
   const [error, setError] = useState(null);
   const [loading, setLoading] = useState(true);
@@ -42,6 +53,20 @@ export default function EmailDetail() {
   if (loading) return <div>Loading...</div>;
   if (error) return <div>{error}</div>;
 
+  const formatDate = (dateStr: string | null) => {
+    if (!dateStr) return 'N/A';
+    const date = new Date(dateStr);
+    return date.toLocaleString('en-US', {
+      year: 'numeric',
+      month: '2-digit',
+      day: '2-digit',
+      hour: '2-digit',
+      minute: '2-digit',
+      second: '2-digit',
+      hour12: false
+    });
+  };
+
   return (
     <div>
       <h1>{email.subject}</h1>
-      <p>From: {email.from}</p>
-      <p>To: {email.to}</p>
-      <p>S3 Key: {key}</p>
-      <p>Processed: {email.processed}</p>
+
+      <div>
+        <div>
+          <p>From: {email.from}</p>
+          <p>To: {email.to}</p>
+          <p>S3 Key: {key}</p>
+        </div>
+
+        <div>
+          <p>
+            Processed:{' '}
+            <span>{email.processed === 'true' ? 'Yes' : 'No'}</span>
+          </p>
+          <p>Processed At: {formatDate(email.processedAt)}</p>
+          <p>Processed By: {email.processedBy || 'N/A'}</p>
+          <p>Queued To: {email.queuedTo || 'N/A'}</p>
+          <p>
+            Status:{' '}
+            {email.status ? <span>{email.status}</span> : 'N/A'}
+          </p>
+        </div>
+      </div>
-
-      {message && <div>{message}</div>}
-
-      <table>
-        <thead>
-          <tr>
-            <th>Subject</th>
-            <th>Date</th>
-            <th>S3 Key</th>
-            <th>Processed</th>
-            <th>Actions</th>
-          </tr>
-        </thead>
-        <tbody>
-          {emails.map((e: Email) => (
-            <tr key={e.key}>
-              <td>{e.subject}</td>
-              <td>{formatDate(e.date)}</td>
-              <td>{e.key}</td>
-              <td>
-                <input
-                  type="checkbox"
-                  checked={e.processed === 'true'}
-                  onChange={() => handleUpdateProcessed(e.key, e.processed !== 'true')}
-                  className="h-4 w-4 text-blue-600 focus:ring-blue-500 border-gray-300 rounded"
-                />
-              </td>
-              <td>
-                <a href={`/email?bucket=${bucket}&key=${e.key}`}>View</a>
-              </td>
-            </tr>
-          ))}
-        </tbody>
-      </table>
-    </div>
-  );
 }
\ No newline at end of file
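
The sync.ts changes below keep the existing p-limit/p-retry concurrency pattern. Reduced to its skeleton (keys and fetchOne are placeholders, not code from this repo):

import pLimit from 'p-limit';
import pRetry from 'p-retry';

// Cap in-flight operations and retry each flaky call a few times.
async function processAll(keys: string[], fetchOne: (key: string) => Promise<void>) {
  const limit = pLimit(10); // at most 10 concurrent operations
  await Promise.all(
    keys.map(key => limit(() => pRetry(() => fetchOne(key), { retries: 3 })))
  );
}
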
diff --git a/app/lib/sync.ts b/app/lib/sync.ts
index 2d6e272..22931ed 100644
--- a/app/lib/sync.ts
+++ b/app/lib/sync.ts
@@ -5,13 +5,13 @@ import { simpleParser } from 'mailparser';
 import { ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3';
 import { eq, sql, inArray } from 'drizzle-orm';
 import { Readable } from 'stream';
-import pLimit from 'p-limit'; // For parallel processing with a concurrency limit
+import pLimit from 'p-limit';
 import pRetry from 'p-retry';
 
 // Configurable constants
-const CONCURRENT_S3_OPERATIONS = 10; // Parallel S3 operations
-const BATCH_INSERT_SIZE = 100; // Batch size for DB inserts
-const CONCURRENT_EMAIL_PARSING = 5; // Parallel e-mail parsers
+const CONCURRENT_S3_OPERATIONS = 10;
+const BATCH_INSERT_SIZE = 100;
+const CONCURRENT_EMAIL_PARSING = 5;
 
 export async function syncAllDomains() {
   console.log('Starting optimized syncAllDomains...');
@@ -22,8 +22,7 @@ export async function syncAllDomains() {
   const domainBuckets = Buckets?.filter(b => b.Name?.endsWith('-emails')) || [];
   console.log(`Found ${domainBuckets.length} domain buckets`);
 
-  // Process the buckets in parallel
-  const bucketLimit = pLimit(3); // At most 3 buckets in parallel
+  const bucketLimit = pLimit(3);
 
   await Promise.all(
     domainBuckets.map(bucketObj =>
@@ -32,7 +31,6 @@ export async function syncAllDomains() {
         const domainName = bucket.replace('-emails', '').replace(/-/g, '.');
         console.log(`Processing bucket: ${bucket}`);
 
-        // Upsert the domain
         const [domain] = await db
           .insert(domains)
           .values({ bucket, domain: domainName })
@@ -63,7 +61,7 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
     const response = await pRetry(
       () => s3.send(new ListObjectsV2Command({
         Bucket: bucket,
-        MaxKeys: 1000, // Maximum per request
+        MaxKeys: 1000,
         ContinuationToken: continuationToken
       })),
       { retries: 3 }
@@ -94,37 +92,45 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
   // 3. Determine what needs to be done
   const toInsert: string[] = [];
-  const toCheckProcessed: string[] = [];
+  const toUpdate: string[] = [];
 
   for (const key of allS3Keys) {
     if (!existingKeysMap.has(key)) {
       toInsert.push(key);
     } else {
-      toCheckProcessed.push(key);
+      toUpdate.push(key);
     }
   }
 
-  console.log(`To insert: ${toInsert.length}, To check: ${toCheckProcessed.length}`);
+  console.log(`To insert: ${toInsert.length}, To update: ${toUpdate.length}`);
 
-  // 4. Apply updates in parallel (processed status)
-  if (toCheckProcessed.length > 0) {
+  // 4. Apply updates in parallel (metadata)
+  if (toUpdate.length > 0) {
     const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
-    const updatePromises = toCheckProcessed.map(key =>
+    const updatePromises = toUpdate.map(key =>
       updateLimit(async () => {
         try {
           const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
-          const processed = head.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
-          const currentProcessed = existingKeysMap.get(key);
+          const metadata = head.Metadata || {};
 
-          if (currentProcessed !== processed) {
-            await db
-              .update(emails)
-              .set({ processed })
-              .where(eq(emails.s3Key, key));
-            console.log(`Updated processed status for ${key}`);
-          }
+          const processed = metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
+          const processedAt = metadata['processed_at'] ? new Date(metadata['processed_at']) : null;
+          const processedBy = metadata['processed_by'] || null;
+          const queuedTo = metadata['queued_to'] || null;
+          const status = metadata['status'] || null;
+
+          await db
+            .update(emails)
+            .set({
+              processed,
+              processedAt,
+              processedBy,
+              queuedTo,
+              status
+            })
+            .where(eq(emails.s3Key, key));
         } catch (error) {
-          console.error(`Error checking ${key}:`, error);
+          console.error(`Error updating ${key}:`, error);
         }
       })
     );
@@ -136,17 +142,14 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
   if (toInsert.length > 0) {
     console.log(`Processing ${toInsert.length} new emails...`);
 
-    // Process in batches
     for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
       const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
       console.log(`Processing batch ${Math.floor(i/BATCH_INSERT_SIZE) + 1}/${Math.ceil(toInsert.length/BATCH_INSERT_SIZE)}`);
 
-      // Fetch and parse in parallel
       const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);
       const emailDataPromises = batch.map(key =>
         parseLimit(async () => {
           try {
-            // Fetch the object and its metadata in parallel
             const [getObjResponse, headResponse] = await Promise.all([
               pRetry(() => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 }),
               pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 })
@@ -154,11 +157,12 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
             const raw = await getBody(getObjResponse.Body as Readable);
 
             const parsed = await simpleParser(raw, {
-              skipHtmlToText: true, // Faster when the text version is not needed
+              skipHtmlToText: true,
               skipTextContent: false,
              skipImageLinks: true
            });
 
+            const metadata = headResponse.Metadata || {};
             const to = extractAddresses(parsed.to);
             const cc = extractAddresses(parsed.cc);
             const bcc = extractAddresses(parsed.bcc);
@@ -173,8 +177,13 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
               subject: parsed.subject,
               html: parsed.html || parsed.textAsHtml,
               raw: raw.toString('utf-8'),
-              processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
+              processed: metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
               date: parsed.date || headResponse.LastModified,
+              // New metadata fields
+              processedAt: metadata['processed_at'] ? new Date(metadata['processed_at']) : null,
+              processedBy: metadata['processed_by'] || null,
+              queuedTo: metadata['queued_to'] || null,
+              status: metadata['status'] || null,
             };
           } catch (error) {
             console.error(`Error processing ${key}:`, error);
@@ -185,7 +194,6 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
       const emailData = (await Promise.all(emailDataPromises)).filter(Boolean);
 
-      // Batch insert
       if (emailData.length > 0) {
         await db.insert(emails).values(emailData);
         console.log(`Inserted ${emailData.length} emails`);
@@ -197,93 +205,10 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
   console.log(`Sync for ${bucket} completed in ${duration}s`);
 }
 
-// Helper function for extracting addresses
 function extractAddresses(addressObj: any): string[] {
   if (!addressObj) return [];
   if (Array.isArray(addressObj)) {
     return addressObj.flatMap(t => t.value.map((v: any) => v.address?.toLowerCase() || '')).filter(Boolean);
   }
   return addressObj.value?.map((v: any) => v.address?.toLowerCase() || '').filter(Boolean) || [];
-}
-
-// Optimized variant with stream processing for very large buckets
-export async function syncEmailsForDomainStreaming(domainId: number, bucket: string, s3: S3Client) {
-  console.log(`Starting streaming sync for bucket: ${bucket}`);
-
-  // Use S3 Select for large data sets (where supported);
-  // this considerably reduces the amount of data transferred
-
-  const existingKeys = new Set(
-    (await db
-      .select({ s3Key: emails.s3Key })
-      .from(emails)
-      .where(eq(emails.domainId, domainId))
-    ).map(e => e.s3Key)
-  );
-
-  const processQueue: any[] = [];
-  const QUEUE_SIZE = 50;
-
-  // Stream-based processing
-  let continuationToken: string | undefined;
-
-  do {
-    const response = await s3.send(new ListObjectsV2Command({
-      Bucket: bucket,
-      MaxKeys: 100,
-      ContinuationToken: continuationToken
-    }));
-
-    const newKeys = response.Contents?.filter(obj =>
-      obj.Key && !existingKeys.has(obj.Key)
-    ) || [];
-
-    // Process in parallel while further keys are being fetched
-    if (newKeys.length > 0) {
-      const batch = newKeys.slice(0, QUEUE_SIZE);
-      // Process the batch asynchronously (without await)
-      processBatchAsync(batch, bucket, domainId, s3);
-    }
-
-    continuationToken = response.NextContinuationToken;
-  } while (continuationToken);
-}
-
-async function processBatchAsync(batch: any[], bucket: string, domainId: number, s3: S3Client) {
-  // Asynchronous batch processing without blocking the main thread
-  const emailData = await Promise.all(
-    batch.map(async (obj) => {
-      try {
-        const [getObjResponse, headResponse] = await Promise.all([
-          s3.send(new GetObjectCommand({ Bucket: bucket, Key: obj.Key })),
-          s3.send(new HeadObjectCommand({ Bucket: bucket, Key: obj.Key }))
-        ]);
-
-        const raw = await getBody(getObjResponse.Body as Readable);
-        const parsed = await simpleParser(raw);
-
-        return {
-          domainId,
-          s3Key: obj.Key,
-          from: parsed.from?.value[0]?.address,
-          to: extractAddresses(parsed.to),
-          cc: extractAddresses(parsed.cc),
-          bcc: extractAddresses(parsed.bcc),
-          subject: parsed.subject,
-          html: parsed.html || parsed.textAsHtml,
-          raw: raw.toString('utf-8'),
-          processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
-          date: parsed.date || obj.LastModified,
-        };
-      } catch (error) {
-        console.error(`Error processing ${obj.Key}:`, error);
-        return null;
-      }
-    })
-  );
-
-  const validData = emailData.filter(Boolean);
-  if (validData.length > 0) {
-    await db.insert(emails).values(validData);
-  }
 }
\ No newline at end of file
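
For reference: sync.ts now reads the processed_at, processed_by, queued_to and status metadata keys (plus the PROCESSED_META_KEY pair), so whatever system processes the mail must stamp them onto the S3 objects. A hedged writer-side sketch using the same HeadObject/CopyObject REPLACE pattern the removed PUT handler used; the function name and the 'delivered' status value are illustrative:

import { S3Client, HeadObjectCommand, CopyObjectCommand } from '@aws-sdk/client-s3';

async function markProcessed(s3: S3Client, bucket: string, key: string, worker: string) {
  const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
  await s3.send(new CopyObjectCommand({
    Bucket: bucket,
    Key: key,
    CopySource: `${bucket}/${key}`,
    MetadataDirective: 'REPLACE',
    Metadata: {
      ...head.Metadata,
      [process.env.PROCESSED_META_KEY!]: process.env.PROCESSED_META_VALUE!,
      processed_at: new Date().toISOString(), // read back by sync.ts as a Date
      processed_by: worker,
      status: 'delivered', // sync.ts stores whatever string it finds here
      // queued_to would be stamped the same way when applicable
    },
  }));
}
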