Compare commits

...

13 Commits

Author SHA1 Message Date
Andreas Knuth d331bd13b5 no buffer 2026-03-11 19:38:02 -05:00
Andreas Knuth 610b01eee7 whitelist helper 2026-03-11 19:26:32 -05:00
Andreas Knuth c2d4903bc9 ENABLE_FAIL2BAN 0 2026-03-11 09:38:00 -05:00
Andreas Knuth 613aa30493 logs 2026-03-08 16:15:41 -05:00
Andreas Knuth 29f360ece8 logger console + file 2026-03-08 16:09:30 -05:00
Andreas Knuth 62221e8121 fix 2026-03-08 14:54:33 -05:00
Andreas Knuth 74c4f5801e Prometheus, Grafana, blackbox_exporter 2026-03-08 14:50:43 -05:00
Andreas Knuth 90b120957d add missing import 2026-03-07 17:07:50 -06:00
Andreas Knuth 99ab2a07d8 send mail even if if parsing fails 2026-03-07 17:06:03 -06:00
Andreas Knuth d9a91c13ed printstats 2026-03-07 16:41:51 -06:00
Andreas Knuth 1d53f2d357 pino 2026-03-07 15:34:15 -06:00
Andreas Knuth 9586869c0c neue Ports 2026-03-07 15:26:56 -06:00
Andreas Knuth d1426afec5 new structure 2026-03-07 15:16:14 -06:00
24 changed files with 3452 additions and 48 deletions

View File

@ -24,3 +24,8 @@ COPY sieve-schedule /etc/sieve-schedule
# 5. Supervisor Konfiguration kopieren # 5. Supervisor Konfiguration kopieren
COPY sieve-supervisor.conf /etc/supervisor/conf.d/sieve-sync.conf COPY sieve-supervisor.conf /etc/supervisor/conf.d/sieve-sync.conf
# 6. Copy the dynamic whitelist script and its supervisor config
COPY dynamic_whitelist.py /scripts/dynamic_whitelist.py
RUN chmod +x /scripts/dynamic_whitelist.py
COPY whitelist-supervisor.conf /etc/supervisor/conf.d/dynamic-whitelist.conf

87
DMS/dynamic_whitelist.py Normal file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Dynamic fail2ban whitelist monitor.

Tails the mail log for successful Dovecot IMAP/POP3 logins and
temporarily adds the client IPs to the fail2ban ignore lists.
A cron-scheduled background thread removes expired entries.
"""
import os
import re
import subprocess
import sys
import threading
import time
from datetime import datetime

try:
    from croniter import croniter
except ImportError:
    print("Bitte 'croniter' via pip installieren!")
    # sys.exit() instead of the site-injected exit(): guaranteed to exist
    # when the script runs non-interactively (e.g. under supervisord).
    sys.exit(1)

# Mail log produced by the mail stack (docker-mailserver layout).
LOG_FILE = '/var/log/mail/mail.log'
WHITELIST_DURATION_SEC = 24 * 60 * 60  # 24 hours
CRON_SCHEDULE = "0 * * * *"  # every hour

# ip -> unix timestamp of the last successful login.
# NOTE(review): shared between the tail thread and the cleanup thread
# without a lock; individual dict operations are atomic under CPython,
# but confirm this is acceptable if the script is ever ported.
active_ips = {}

# Regex for successful Dovecot IMAP/POP3 logins.
LOGIN_REGEX = re.compile(r"dovecot: (?:imap|pop3)-login: Login: user=<[^>]+>.*rip=([0-9]{1,3}(?:\.[0-9]{1,3}){3}),")
# Ignore private networks (Docker/local).
IGNORE_REGEX = re.compile(r"^(172\.|10\.|192\.168\.|127\.)")
def run_command(cmd):
    """Run *cmd* through the shell, discarding all output.

    Failures are printed but never propagated: a failing
    ``fail2ban-client`` invocation must not kill the monitor loop.

    NOTE(review): shell=True is used with interpolated IPs; this is only
    safe because every caller passes an IP that already matched a strict
    dotted-quad regex. Prefer a list argv if commands ever get dynamic.
    """
    try:
        subprocess.run(cmd, shell=True, check=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # Narrowed from bare `except Exception`: only the errors subprocess
    # actually raises here (non-zero exit, exec failure).
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Fehler bei: {cmd} - {e}")
def cleanup_job():
    """Cron-thread for the hourly cleanup of expired whitelisted IPs.

    Sleeps until the next CRON_SCHEDULE tick, then removes every IP
    whose last successful login is older than WHITELIST_DURATION_SEC
    from both fail2ban jails and from the in-memory table.
    """
    # Renamed from `iter`, which shadowed the builtin of the same name.
    schedule = croniter(CRON_SCHEDULE, datetime.now())
    while True:
        next_run = schedule.get_next(datetime)
        sleep_seconds = (next_run - datetime.now()).total_seconds()
        if sleep_seconds > 0:
            time.sleep(sleep_seconds)
        print(f"[{datetime.now()}] Starte stündlichen Whitelist-Cleanup...")
        now = time.time()
        # Snapshot first so the dict is never mutated while iterating it.
        expired_ips = [ip for ip, timestamp in active_ips.items()
                       if now - timestamp > WHITELIST_DURATION_SEC]
        for ip in expired_ips:
            print(f"[{datetime.now()}] Whitelist für {ip} abgelaufen. Entferne...")
            run_command(f"fail2ban-client set dovecot delignoreip {ip}")
            run_command(f"fail2ban-client set postfix delignoreip {ip}")
            del active_ips[ip]
def follow_log():
    """Tail the mail log and whitelist IPs that log in successfully.

    Delegates to the system ``tail -F`` binary because it transparently
    handles log rotation.
    """
    print(f"[{datetime.now()}] Dynamic Whitelist Monitor gestartet...")

    # Block until the log file exists (container start-up race).
    while not os.path.exists(LOG_FILE):
        time.sleep(2)

    tail = subprocess.Popen(
        ['tail', '-F', LOG_FILE],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    for entry in tail.stdout:
        hit = LOGIN_REGEX.search(entry)
        if not hit:
            continue
        ip = hit.group(1)
        # Skip private/Docker-internal source addresses.
        if IGNORE_REGEX.match(ip):
            continue
        now = time.time()
        # First sighting of this IP: add it to both fail2ban jails.
        if ip not in active_ips:
            print(f"[{datetime.now()}] Neuer erfolgreicher Login von {ip}. Setze auf Whitelist...")
            run_command(f"fail2ban-client set dovecot addignoreip {ip}")
            run_command(f"fail2ban-client set postfix addignoreip {ip}")
        # Refresh the last-seen timestamp on every login.
        active_ips[ip] = now
if __name__ == '__main__':
    # Give fail2ban a moment to finish starting after a container boot.
    time.sleep(15)
    # Hourly cleanup runs in the background; daemon=True ties its
    # lifetime to the main process.
    cleanup_thread = threading.Thread(target=cleanup_job, daemon=True)
    cleanup_thread.start()
    # The log monitor runs forever in the foreground.
    follow_log()

View File

@ -0,0 +1,6 @@
; Supervisor program: runs the dynamic fail2ban whitelist monitor.
; -u keeps Python's stdout/stderr unbuffered so log lines appear in the
; supervisor log files immediately instead of on buffer flush.
[program:dynamic-whitelist]
command=/usr/bin/python3 -u /scripts/dynamic_whitelist.py
autostart=true
autorestart=true
stderr_logfile=/var/log/supervisor/dynamic-whitelist.err.log
stdout_logfile=/var/log/supervisor/dynamic-whitelist.out.log

View File

@ -8,10 +8,10 @@ services:
env_file: .env env_file: .env
volumes: volumes:
- ./domains.txt:/etc/email-worker/domains.txt:ro - ./domains.txt:/etc/email-worker/domains.txt:ro
- worker-logs:/var/log/email-worker - ./logs:/var/log/email-worker
ports: ports:
- "8000:8000" # Prometheus metrics - "9000:8000" # Prometheus metrics (Host:Container)
- "8080:8080" # Health check - "9090:8080" # Health check (Host:Container)
# Connect to DMS on the host or Docker network # Connect to DMS on the host or Docker network
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"

3190
email-worker-nodejs/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -26,15 +26,15 @@ export class BlocklistChecker {
* Batch-check whether a sender is blocked for each recipient. * Batch-check whether a sender is blocked for each recipient.
* Uses a single batch DynamoDB call for efficiency. * Uses a single batch DynamoDB call for efficiency.
*/ */
async batchCheckBlockedSenders( async batchCheckBlockedSenders(
recipients: string[], recipients: string[],
sender: string, senders: string[], // <-- Geändert zu Array
workerName: string, workerName: string,
): Promise<Record<string, boolean>> { ): Promise<Record<string, boolean>> {
const patternsByRecipient = const patternsByRecipient = await this.dynamodb.batchGetBlockedPatterns(recipients);
await this.dynamodb.batchGetBlockedPatterns(recipients);
const senderClean = extractAddress(sender); // Alle übergebenen Adressen bereinigen
const sendersClean = senders.map(s => extractAddress(s)).filter(Boolean);
const result: Record<string, boolean> = {}; const result: Record<string, boolean> = {};
for (const recipient of recipients) { for (const recipient of recipients) {
@ -42,21 +42,21 @@ export class BlocklistChecker {
let isBlocked = false; let isBlocked = false;
for (const pattern of patterns) { for (const pattern of patterns) {
if (picomatch.isMatch(senderClean, pattern.toLowerCase())) { for (const senderClean of sendersClean) {
log( if (picomatch.isMatch(senderClean, pattern.toLowerCase())) {
`⛔ BLOCKED: Sender ${senderClean} matches pattern '${pattern}' ` + log(
`for inbox ${recipient}`, `⛔ BLOCKED: Sender ${senderClean} matches pattern '${pattern}' for inbox ${recipient}`,
'WARNING', 'WARNING',
workerName, workerName,
); );
isBlocked = true; isBlocked = true;
break; break;
}
} }
if (isBlocked) break;
} }
result[recipient] = isBlocked; result[recipient] = isBlocked;
} }
return result; return result;
} }
} }

View File

@ -9,13 +9,13 @@
import { createTransport } from 'nodemailer'; import { createTransport } from 'nodemailer';
import type { ParsedMail } from 'mailparser'; import type { ParsedMail } from 'mailparser';
import type { DynamoDBHandler, EmailRule } from '../aws/dynamodb.js';
import type { SESHandler } from '../aws/ses.js'; import type { SESHandler } from '../aws/ses.js';
import { extractBodyParts } from './parser.js'; import { extractBodyParts } from './parser.js';
import { config, isInternalAddress } from '../config.js';
import { log } from '../logger.js'; import { log } from '../logger.js';
// Wir nutzen MailComposer direkt für das Erstellen der Raw Bytes // Wir nutzen MailComposer direkt für das Erstellen der Raw Bytes
import MailComposer from 'nodemailer/lib/mail-composer/index.js'; import MailComposer from 'nodemailer/lib/mail-composer/index.js';
import { DynamoDBHandler, EmailRule } from '../aws/dynamodb.js';
import { config, isInternalAddress } from '../config.js';
export type MetricsCallback = (action: 'autoreply' | 'forward', domain: string) => void; export type MetricsCallback = (action: 'autoreply' | 'forward', domain: string) => void;

View File

@ -103,12 +103,28 @@ function ensureFileStream(): WriteStream | null {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const logger = pino({ const logger = pino({
level: 'info', level: 'info',
formatters: { transport: {
level(label) { targets: [
return { level: label }; {
}, // 1. Schicke bunte Logs in die Konsole (für docker compose logs -f)
}, target: 'pino-pretty',
timestamp: pino.stdTimeFunctions.isoTime, options: {
colorize: true,
translateTime: 'SYS:yyyy-mm-dd HH:MM:ss',
ignore: 'pid,hostname',
singleLine: true
}
},
{
// 2. Schreibe gleichzeitig alles unformatiert in die Datei
target: 'pino/file',
options: {
destination: '/var/log/email-worker/worker.log',
mkdir: true
}
}
]
}
}); });
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -13,7 +13,7 @@ import { config, loadDomains } from './config.js';
import { log } from './logger.js'; import { log } from './logger.js';
import { startMetricsServer, type MetricsCollector } from './metrics.js'; import { startMetricsServer, type MetricsCollector } from './metrics.js';
import { startHealthServer } from './health.js'; import { startHealthServer } from './health.js';
import { UnifiedWorker } from './worker/index.js'; import { UnifiedWorker } from './worker/unified-worker.js';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Banner // Banner

View File

@ -8,8 +8,9 @@
*/ */
import { createTransport, type Transporter } from 'nodemailer'; import { createTransport, type Transporter } from 'nodemailer';
import { config } from '../config.js';
import { log } from '../logger.js'; import { log } from '../logger.js';
import { config } from '../config.js';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Permanent error detection // Permanent error detection

View File

@ -9,9 +9,9 @@
*/ */
import type { SQSHandler } from '../aws/sqs.js'; import type { SQSHandler } from '../aws/sqs.js';
import type { MessageProcessor } from './message-processor.js';
import type { MetricsCollector } from '../metrics.js'; import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js'; import { log } from '../logger.js';
import { MessageProcessor } from './message-processor.js';
export interface DomainPollerStats { export interface DomainPollerStats {
domain: string; domain: string;

View File

@ -19,15 +19,13 @@ import type { SESHandler } from '../aws/ses.js';
import type { DynamoDBHandler } from '../aws/dynamodb.js'; import type { DynamoDBHandler } from '../aws/dynamodb.js';
import type { EmailDelivery } from '../smtp/delivery.js'; import type { EmailDelivery } from '../smtp/delivery.js';
import type { MetricsCollector } from '../metrics.js'; import type { MetricsCollector } from '../metrics.js';
import { import type { ParsedMail } from 'mailparser';
parseEmail,
isProcessedByWorker,
BounceHandler,
RulesProcessor,
BlocklistChecker,
} from '../email/index.js';
import { domainToBucketName } from '../config.js';
import { log } from '../logger.js'; import { log } from '../logger.js';
import { BlocklistChecker } from '../email/blocklist.js';
import { BounceHandler } from '../email/bounce-handler.js';
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
import { RulesProcessor } from '../email/rules-processor.js';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Processor // Processor
@ -138,6 +136,7 @@ export class MessageProcessor {
let finalRawBytes = rawBytes; let finalRawBytes = rawBytes;
let fromAddrFinal = fromAddr; let fromAddrFinal = fromAddr;
let isBounce = false; let isBounce = false;
let parsedFinal: ParsedMail | null = null; // <-- Hier deklarieren
try { try {
const parsed = await parseEmail(rawBytes); const parsed = await parseEmail(rawBytes);
@ -150,7 +149,6 @@ export class MessageProcessor {
subject, subject,
workerName, workerName,
); );
isBounce = bounceResult.isBounce; isBounce = bounceResult.isBounce;
finalRawBytes = bounceResult.rawBytes; finalRawBytes = bounceResult.rawBytes;
@ -168,23 +166,31 @@ export class MessageProcessor {
} }
// Re-parse after modifications for rules processing // Re-parse after modifications for rules processing
var parsedFinal = await parseEmail(finalRawBytes); parsedFinal = await parseEmail(finalRawBytes);
} catch (err: any) { } catch (err: any) {
log( log(
`⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original.`, `⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original RAW mail without rules.`,
'WARNING', 'WARNING',
workerName, workerName,
); );
log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName); log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName);
fromAddrFinal = fromAddr; fromAddrFinal = fromAddr;
isBounce = false; isBounce = false;
var parsedFinal = await parseEmail(rawBytes); parsedFinal = null; // <-- GANZ WICHTIG: Kein erneuter Parse-Versuch!
} }
// 6. BLOCKLIST CHECK // 6. BLOCKLIST CHECK
const sendersToCheck: string[] = [];
if (fromAddrFinal) sendersToCheck.push(fromAddrFinal);
const headerFrom = parsedFinal?.from?.text;
if (headerFrom && !sendersToCheck.includes(headerFrom)) {
sendersToCheck.push(headerFrom);
}
const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders( const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders(
recipients, recipients,
fromAddrFinal, sendersToCheck, // <-- Array übergeben
workerName, workerName,
); );
@ -210,7 +216,7 @@ export class MessageProcessor {
} }
// Process rules (OOO, Forwarding) — not for bounces or already forwarded // Process rules (OOO, Forwarding) — not for bounces or already forwarded
if (!isBounce && !skipRules) { if (!isBounce && !skipRules && parsedFinal !== null) {
const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => { const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => {
if (action === 'autoreply') this.metrics?.incrementAutoreply(dom); if (action === 'autoreply') this.metrics?.incrementAutoreply(dom);
else if (action === 'forward') this.metrics?.incrementForward(dom); else if (action === 'forward') this.metrics?.incrementForward(dom);

View File

@ -8,17 +8,22 @@
* - Graceful shutdown * - Graceful shutdown
*/ */
import { S3Handler, SQSHandler, SESHandler, DynamoDBHandler } from '../aws/index.js'; import { DynamoDBHandler } from '../aws/dynamodb';
import { EmailDelivery } from '../smtp/index.js'; import { S3Handler} from '../aws/s3.js';
import { SQSHandler} from '../aws/sqs.js'
import { SESHandler } from '../aws/ses';
import { EmailDelivery } from '../smtp/delivery.js';
import { MessageProcessor } from './message-processor.js'; import { MessageProcessor } from './message-processor.js';
import { DomainPoller, type DomainPollerStats } from './domain-poller.js'; import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
import type { MetricsCollector } from '../metrics.js'; import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js'; import { log } from '../logger.js';
export class UnifiedWorker { export class UnifiedWorker {
private pollers: DomainPoller[] = []; private pollers: DomainPoller[] = [];
private processor: MessageProcessor; private processor: MessageProcessor;
private sqs: SQSHandler; private sqs: SQSHandler;
private statusInterval: NodeJS.Timeout | null = null;
constructor( constructor(
private domains: string[], private domains: string[],
@ -74,10 +79,16 @@ export class UnifiedWorker {
this.pollers.map((p) => p.stats.domain).join(', '), this.pollers.map((p) => p.stats.domain).join(', '),
'SUCCESS', 'SUCCESS',
); );
// Starte den 5-Minuten-Status-Report
this.statusInterval = setInterval(() => {
this.printStatus();
}, 5 * 60 * 1000);
} }
async stop(): Promise<void> { async stop(): Promise<void> {
log('🛑 Stopping all domain pollers...'); log('🛑 Stopping all domain pollers...');
if (this.statusInterval) clearInterval(this.statusInterval); // <-- Neue Zeile
await Promise.all(this.pollers.map((p) => p.stop())); await Promise.all(this.pollers.map((p) => p.stop()));
log('✅ All pollers stopped.'); log('✅ All pollers stopped.');
} }
@ -99,4 +110,25 @@ export class UnifiedWorker {
return { totalProcessed, totalErrors, domains }; return { totalProcessed, totalErrors, domains };
} }
private printStatus(): void {
const stats = this.getStats();
// Zähle aktive Poller
const activePollers = this.pollers.filter((p) => p.stats.running).length;
const totalPollers = this.pollers.length;
// Formatiere die Domain-Statistiken (z.B. hotshpotshga:1)
const domainStats = stats.domains
.map((d) => {
const shortName = d.domain.split('.')[0].substring(0, 12);
return `${shortName}:${d.processed}`;
})
.join(' | ');
log(
`📊 Status: ${activePollers}/${totalPollers} active, total:${stats.totalProcessed} | ${domainStats}`,
'INFO',
'unified-worker'
);
}
} }

View File

@ -0,0 +1,36 @@
services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    ports:
      - "9091:9090"
    extra_hosts:
      - "host.docker.internal:host-gateway" # so Prometheus can reach the worker on the host

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    volumes:
      - grafana_data:/var/lib/grafana
    ports:
      - "4000:3000"
    depends_on:
      - prometheus

  blackbox_exporter:
    image: prom/blackbox-exporter:latest
    container_name: blackbox_exporter
    restart: unless-stopped
    ports:
      - "9115:9115"
    extra_hosts:
      - "host.docker.internal:host-gateway" # lets the exporter probe ports on the host

volumes:
  prometheus_data:
  grafana_data:

25
monitoring/prometheus.yml Normal file
View File

@ -0,0 +1,25 @@
global:
  scrape_interval: 15s

scrape_configs:
  # 1. Scrape the Node.js email worker's metrics endpoint
  - job_name: 'email-worker'
    static_configs:
      - targets: ['host.docker.internal:9000']

  # 2. Port monitoring of the mail server (IMAP 993 & POP3 995)
  - job_name: 'mailserver_ports'
    metrics_path: /probe
    params:
      module: [tcp_connect] # only checks whether the TCP port is open
    static_configs:
      - targets:
          - host.docker.internal:993 # IMAPS
          - host.docker.internal:995 # POP3S
    relabel_configs:
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: blackbox_exporter:9115 # the exporter performs the actual check