import { ConnectionOptions, Queue, Worker } from 'bullmq'; import Redis from 'ioredis'; import nodemailer from 'nodemailer'; import db from '../db'; // Redis connection (reuse from main scheduler) const redisConnection = new Redis(process.env.REDIS_URL || 'redis://localhost:6380', { maxRetriesPerRequest: null, }); const queueConnection = redisConnection as unknown as ConnectionOptions; // Digest queue export const digestQueue = new Queue('change-digests', { connection: queueConnection, defaultJobOptions: { removeOnComplete: 10, removeOnFail: 10, }, }); // Email transporter (same config as alerter) const transporter = nodemailer.createTransport({ host: process.env.SMTP_HOST, port: parseInt(process.env.SMTP_PORT || '587'), secure: false, auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS, }, }); const APP_URL = process.env.APP_URL || 'http://localhost:3000'; const EMAIL_FROM = process.env.EMAIL_FROM || 'noreply@websitemonitor.com'; interface DigestChange { monitorId: string; monitorName: string; monitorUrl: string; changePercentage: number; changedAt: Date; importanceScore: number; } interface DigestUser { id: string; email: string; digestInterval: 'daily' | 'weekly' | 'none'; lastDigestAt: Date | null; } /** * Get users who need a digest email */ async function getUsersForDigest(interval: 'daily' | 'weekly'): Promise { const cutoffHours = interval === 'daily' ? 24 : 168; // 24h or 7 days const result = await db.query( `SELECT id, email, COALESCE(notification_preferences->>'digestInterval', 'none') as "digestInterval", last_digest_at as "lastDigestAt" FROM users WHERE COALESCE(notification_preferences->>'digestInterval', 'none') = $1 AND (last_digest_at IS NULL OR last_digest_at < NOW() - INTERVAL '${cutoffHours} hours')`, [interval] ); return result.rows; } /** * Get changes for a user since their last digest */ async function getChangesForUser(userId: string, since: Date): Promise { const result = await db.query( `SELECT m.id as "monitorId", m.name as "monitorName", m.url as "monitorUrl", s.change_percentage as "changePercentage", s.checked_at as "changedAt", COALESCE(s.importance_score, 50) as "importanceScore" FROM monitors m JOIN snapshots s ON s.monitor_id = m.id WHERE m.user_id = $1 AND s.has_changes = true AND s.checked_at > $2 ORDER BY s.importance_score DESC, s.checked_at DESC LIMIT 50`, [userId, since] ); return result.rows; } /** * Generate HTML for the digest email */ function generateDigestHtml(changes: DigestChange[], interval: string): string { const periodText = interval === 'daily' ? 'today' : 'this week'; if (changes.length === 0) { return `

📊 Your Change Digest

No changes detected ${periodText}. All quiet on your monitors!

Visit your dashboard to manage your monitors.

`; } // Group by importance const highImportance = changes.filter(c => c.importanceScore >= 70); const mediumImportance = changes.filter(c => c.importanceScore >= 40 && c.importanceScore < 70); const lowImportance = changes.filter(c => c.importanceScore < 40); let html = `

📊 Your Change Digest

Here's what changed ${periodText}:

${changes.length} changes detected across your monitors
`; if (highImportance.length > 0) { html += `

🔴 High Priority (${highImportance.length})

${generateChangesList(highImportance)} `; } if (mediumImportance.length > 0) { html += `

🟡 Medium Priority (${mediumImportance.length})

${generateChangesList(mediumImportance)} `; } if (lowImportance.length > 0) { html += `

🟢 Low Priority (${lowImportance.length})

${generateChangesList(lowImportance)} `; } html += `

Manage digest settings | View all monitors

`; return html; } function generateChangesList(changes: DigestChange[]): string { return ` ${changes.map(c => ` `).join('')}
${c.monitorName}
${c.monitorUrl}
${c.changePercentage.toFixed(1)}% changed
`; } /** * Send digest email to user */ async function sendDigestEmail(user: DigestUser, changes: DigestChange[]): Promise { const subject = changes.length > 0 ? `📊 ${changes.length} change${changes.length > 1 ? 's' : ''} detected on your monitors` : '📊 Your monitor digest - All quiet!'; const html = generateDigestHtml(changes, user.digestInterval); await transporter.sendMail({ from: EMAIL_FROM, to: user.email, subject, html, }); // Update last digest timestamp await db.query( 'UPDATE users SET last_digest_at = NOW() WHERE id = $1', [user.id] ); console.log(`[Digest] Sent ${user.digestInterval} digest to ${user.email} with ${changes.length} changes`); } /** * Process all pending digests */ export async function processDigests(interval: 'daily' | 'weekly'): Promise { console.log(`[Digest] Processing ${interval} digests...`); const users = await getUsersForDigest(interval); console.log(`[Digest] Found ${users.length} users for ${interval} digest`); for (const user of users) { try { const since = user.lastDigestAt || new Date(Date.now() - (interval === 'daily' ? 24 : 168) * 60 * 60 * 1000); const changes = await getChangesForUser(user.id, since); await sendDigestEmail(user, changes); } catch (error) { console.error(`[Digest] Error sending digest to ${user.email}:`, error); } } } /** * Schedule digest jobs (call on server start) */ export async function scheduleDigestJobs(): Promise { // Daily digest at 9 AM await digestQueue.add( 'daily-digest', { interval: 'daily' }, { jobId: 'daily-digest', repeat: { pattern: '0 9 * * *', // Every day at 9 AM }, } ); // Weekly digest on Mondays at 9 AM await digestQueue.add( 'weekly-digest', { interval: 'weekly' }, { jobId: 'weekly-digest', repeat: { pattern: '0 9 * * 1', // Every Monday at 9 AM }, } ); console.log('[Digest] Scheduled daily and weekly digest jobs'); } /** * Start digest worker */ export function startDigestWorker(): Worker { const worker = new Worker( 'change-digests', async (job) => { const { interval } = job.data; await processDigests(interval); }, { connection: queueConnection, concurrency: 1, } ); worker.on('completed', (job) => { console.log(`[Digest] Job ${job.id} completed`); }); worker.on('failed', (job, err) => { console.error(`[Digest] Job ${job?.id} failed:`, err.message); }); console.log('[Digest] Worker started'); return worker; }