Compare commits
13 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
d331bd13b5 | |
|
|
610b01eee7 | |
|
|
c2d4903bc9 | |
|
|
613aa30493 | |
|
|
29f360ece8 | |
|
|
62221e8121 | |
|
|
74c4f5801e | |
|
|
90b120957d | |
|
|
99ab2a07d8 | |
|
|
d9a91c13ed | |
|
|
1d53f2d357 | |
|
|
9586869c0c | |
|
|
d1426afec5 |
|
|
@ -24,3 +24,8 @@ COPY sieve-schedule /etc/sieve-schedule
|
|||
|
||||
# 5. Supervisor Konfiguration kopieren
|
||||
COPY sieve-supervisor.conf /etc/supervisor/conf.d/sieve-sync.conf
|
||||
|
||||
# 6. Dynamic Whitelist Script und Supervisor-Config kopieren
|
||||
COPY dynamic_whitelist.py /scripts/dynamic_whitelist.py
|
||||
RUN chmod +x /scripts/dynamic_whitelist.py
|
||||
COPY whitelist-supervisor.conf /etc/supervisor/conf.d/dynamic-whitelist.conf
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/env python3
"""Dynamic fail2ban whitelist daemon.

Tails the mail log for successful Dovecot IMAP/POP3 logins and adds the
client IPs to the fail2ban ignore lists; a background cron thread removes
them again after 24 hours of inactivity.
"""
import os
import re
import sys
import time
import subprocess
import threading
from datetime import datetime

try:
    from croniter import croniter
except ImportError:
    print("Bitte 'croniter' via pip installieren!")
    # Fix: use sys.exit() — the exit() builtin is only guaranteed in
    # interactive sessions (it is injected by the site module).
    sys.exit(1)

# Mail log that is followed for successful login events.
LOG_FILE = '/var/log/mail/mail.log'
# How long a successfully logged-in IP stays whitelisted (24 hours).
WHITELIST_DURATION_SEC = 24 * 60 * 60
# Cron expression for the cleanup job (every full hour).
CRON_SCHEDULE = "0 * * * *"

# Maps whitelisted IP -> last-seen UNIX timestamp. Shared between the
# log-follower (main thread) and the cleanup thread; the simple dict
# operations used here are safe under CPython, but add a lock if the
# bookkeeping ever grows beyond single get/set/del operations.
active_ips = {}

# Matches successful Dovecot IMAP/POP3 logins and captures the remote IP.
LOGIN_REGEX = re.compile(r"dovecot: (?:imap|pop3)-login: Login: user=<[^>]+>.*rip=([0-9]{1,3}(?:\.[0-9]{1,3}){3}),")
# Private/local networks (Docker, RFC1918, loopback) are never whitelisted.
# Fix: the previous pattern ignored all of 172.0.0.0/8; RFC1918 only
# reserves 172.16.0.0/12, so limit the match to 172.16. - 172.31.
IGNORE_REGEX = re.compile(r"^(172\.(?:1[6-9]|2[0-9]|3[01])\.|10\.|192\.168\.|127\.)")
|
||||
|
||||
def run_command(cmd):
    """Run an external command given as a single string, discarding output.

    Failures are printed instead of raised so that one failing
    fail2ban-client call never kills the monitor or cleanup loop.

    Security fix: the original used ``shell=True`` on an f-string-built
    command; splitting with :func:`shlex.split` and running without a shell
    removes any chance of shell metacharacter injection.
    """
    import shlex  # local import keeps this fix self-contained

    try:
        subprocess.run(
            shlex.split(cmd),
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as e:
        print(f"Fehler bei: {cmd} - {e}")
|
||||
|
||||
def cleanup_job():
    """Hourly cron thread that evicts expired IPs from the whitelist.

    Sleeps until the next CRON_SCHEDULE tick, then removes every IP whose
    last successful login is older than WHITELIST_DURATION_SEC from both
    fail2ban jails and from ``active_ips``. Runs forever; intended to be
    started as a daemon thread.
    """
    # Fix: renamed the iterator from 'iter' — it shadowed the builtin.
    schedule = croniter(CRON_SCHEDULE, datetime.now())
    while True:
        next_run = schedule.get_next(datetime)
        sleep_seconds = (next_run - datetime.now()).total_seconds()

        if sleep_seconds > 0:
            time.sleep(sleep_seconds)

        print(f"[{datetime.now()}] Starte stündlichen Whitelist-Cleanup...")
        now = time.time()
        # Snapshot the expired keys first: deleting from the dict while
        # iterating it directly would raise RuntimeError.
        expired_ips = [ip for ip, timestamp in active_ips.items()
                       if now - timestamp > WHITELIST_DURATION_SEC]

        for ip in expired_ips:
            print(f"[{datetime.now()}] Whitelist für {ip} abgelaufen. Entferne...")
            run_command(f"fail2ban-client set dovecot delignoreip {ip}")
            run_command(f"fail2ban-client set postfix delignoreip {ip}")
            del active_ips[ip]
|
||||
|
||||
def follow_log():
    """Tail the mail log and whitelist IPs with successful logins.

    Delegates the tailing to the system ``tail -F`` binary, which
    transparently follows the file across log rotations.
    """
    print(f"[{datetime.now()}] Dynamic Whitelist Monitor gestartet...")

    # Block until the log file exists (e.g. right after a container start).
    while not os.path.exists(LOG_FILE):
        time.sleep(2)

    tail = subprocess.Popen(
        ['tail', '-F', LOG_FILE],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )

    for raw_line in tail.stdout:
        hit = LOGIN_REGEX.search(raw_line)
        if hit is None:
            continue

        ip = hit.group(1)
        if IGNORE_REGEX.match(ip):
            # Private/local address — never touch the fail2ban ignore list.
            continue

        seen_at = time.time()

        if ip not in active_ips:
            # First login from this address: register it with both jails.
            print(f"[{datetime.now()}] Neuer erfolgreicher Login von {ip}. Setze auf Whitelist...")
            run_command(f"fail2ban-client set dovecot addignoreip {ip}")
            run_command(f"fail2ban-client set postfix addignoreip {ip}")

        # Refresh the last-seen timestamp on every successful login.
        active_ips[ip] = seen_at
|
||||
|
||||
if __name__ == '__main__':
    # Give fail2ban a moment to come up after a container (re)start.
    time.sleep(15)

    # Start the hourly cron-style cleanup in a background daemon thread
    # so it dies together with the main process.
    threading.Thread(target=cleanup_job, daemon=True).start()

    # Run the log-follower loop forever in the main thread.
    follow_log()
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
; Supervisor program that keeps the dynamic fail2ban whitelist daemon alive.
[program:dynamic-whitelist]
; -u: unbuffered stdout, so log lines reach the log files immediately.
command=/usr/bin/python3 -u /scripts/dynamic_whitelist.py
autostart=true
autorestart=true
stderr_logfile=/var/log/supervisor/dynamic-whitelist.err.log
stdout_logfile=/var/log/supervisor/dynamic-whitelist.out.log
|
||||
|
|
@ -8,10 +8,10 @@ services:
|
|||
env_file: .env
|
||||
volumes:
|
||||
- ./domains.txt:/etc/email-worker/domains.txt:ro
|
||||
- worker-logs:/var/log/email-worker
|
||||
- ./logs:/var/log/email-worker
|
||||
ports:
|
||||
- "8000:8000" # Prometheus metrics
|
||||
- "8080:8080" # Health check
|
||||
- "9000:8000" # Prometheus metrics (Host:Container)
|
||||
- "9090:8080" # Health check (Host:Container)
|
||||
# Connect to DMS on the host or Docker network
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -26,15 +26,15 @@ export class BlocklistChecker {
|
|||
* Batch-check whether a sender is blocked for each recipient.
|
||||
* Uses a single batch DynamoDB call for efficiency.
|
||||
*/
|
||||
async batchCheckBlockedSenders(
|
||||
async batchCheckBlockedSenders(
|
||||
recipients: string[],
|
||||
sender: string,
|
||||
senders: string[], // <-- Geändert zu Array
|
||||
workerName: string,
|
||||
): Promise<Record<string, boolean>> {
|
||||
const patternsByRecipient =
|
||||
await this.dynamodb.batchGetBlockedPatterns(recipients);
|
||||
const patternsByRecipient = await this.dynamodb.batchGetBlockedPatterns(recipients);
|
||||
|
||||
const senderClean = extractAddress(sender);
|
||||
// Alle übergebenen Adressen bereinigen
|
||||
const sendersClean = senders.map(s => extractAddress(s)).filter(Boolean);
|
||||
const result: Record<string, boolean> = {};
|
||||
|
||||
for (const recipient of recipients) {
|
||||
|
|
@ -42,21 +42,21 @@ export class BlocklistChecker {
|
|||
let isBlocked = false;
|
||||
|
||||
for (const pattern of patterns) {
|
||||
if (picomatch.isMatch(senderClean, pattern.toLowerCase())) {
|
||||
log(
|
||||
`⛔ BLOCKED: Sender ${senderClean} matches pattern '${pattern}' ` +
|
||||
`for inbox ${recipient}`,
|
||||
'WARNING',
|
||||
workerName,
|
||||
);
|
||||
isBlocked = true;
|
||||
break;
|
||||
for (const senderClean of sendersClean) {
|
||||
if (picomatch.isMatch(senderClean, pattern.toLowerCase())) {
|
||||
log(
|
||||
`⛔ BLOCKED: Sender ${senderClean} matches pattern '${pattern}' for inbox ${recipient}`,
|
||||
'WARNING',
|
||||
workerName,
|
||||
);
|
||||
isBlocked = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (isBlocked) break;
|
||||
}
|
||||
|
||||
result[recipient] = isBlocked;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -9,13 +9,13 @@
|
|||
|
||||
import { createTransport } from 'nodemailer';
|
||||
import type { ParsedMail } from 'mailparser';
|
||||
import type { DynamoDBHandler, EmailRule } from '../aws/dynamodb.js';
|
||||
import type { SESHandler } from '../aws/ses.js';
|
||||
import { extractBodyParts } from './parser.js';
|
||||
import { config, isInternalAddress } from '../config.js';
|
||||
import { log } from '../logger.js';
|
||||
// Wir nutzen MailComposer direkt für das Erstellen der Raw Bytes
|
||||
import MailComposer from 'nodemailer/lib/mail-composer/index.js';
|
||||
import { DynamoDBHandler, EmailRule } from '../aws/dynamodb.js';
|
||||
import { config, isInternalAddress } from '../config.js';
|
||||
|
||||
export type MetricsCallback = (action: 'autoreply' | 'forward', domain: string) => void;
|
||||
|
||||
|
|
@ -103,12 +103,28 @@ function ensureFileStream(): WriteStream | null {
|
|||
// ---------------------------------------------------------------------------
|
||||
const logger = pino({
|
||||
level: 'info',
|
||||
formatters: {
|
||||
level(label) {
|
||||
return { level: label };
|
||||
},
|
||||
},
|
||||
timestamp: pino.stdTimeFunctions.isoTime,
|
||||
transport: {
|
||||
targets: [
|
||||
{
|
||||
// 1. Schicke bunte Logs in die Konsole (für docker compose logs -f)
|
||||
target: 'pino-pretty',
|
||||
options: {
|
||||
colorize: true,
|
||||
translateTime: 'SYS:yyyy-mm-dd HH:MM:ss',
|
||||
ignore: 'pid,hostname',
|
||||
singleLine: true
|
||||
}
|
||||
},
|
||||
{
|
||||
// 2. Schreibe gleichzeitig alles unformatiert in die Datei
|
||||
target: 'pino/file',
|
||||
options: {
|
||||
destination: '/var/log/email-worker/worker.log',
|
||||
mkdir: true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -13,7 +13,7 @@ import { config, loadDomains } from './config.js';
|
|||
import { log } from './logger.js';
|
||||
import { startMetricsServer, type MetricsCollector } from './metrics.js';
|
||||
import { startHealthServer } from './health.js';
|
||||
import { UnifiedWorker } from './worker/index.js';
|
||||
import { UnifiedWorker } from './worker/unified-worker.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Banner
|
||||
|
|
@ -8,8 +8,9 @@
|
|||
*/
|
||||
|
||||
import { createTransport, type Transporter } from 'nodemailer';
|
||||
import { config } from '../config.js';
|
||||
|
||||
import { log } from '../logger.js';
|
||||
import { config } from '../config.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Permanent error detection
|
||||
|
|
@ -9,9 +9,9 @@
|
|||
*/
|
||||
|
||||
import type { SQSHandler } from '../aws/sqs.js';
|
||||
import type { MessageProcessor } from './message-processor.js';
|
||||
import type { MetricsCollector } from '../metrics.js';
|
||||
import { log } from '../logger.js';
|
||||
import { MessageProcessor } from './message-processor.js';
|
||||
|
||||
export interface DomainPollerStats {
|
||||
domain: string;
|
||||
|
|
@ -19,15 +19,13 @@ import type { SESHandler } from '../aws/ses.js';
|
|||
import type { DynamoDBHandler } from '../aws/dynamodb.js';
|
||||
import type { EmailDelivery } from '../smtp/delivery.js';
|
||||
import type { MetricsCollector } from '../metrics.js';
|
||||
import {
|
||||
parseEmail,
|
||||
isProcessedByWorker,
|
||||
BounceHandler,
|
||||
RulesProcessor,
|
||||
BlocklistChecker,
|
||||
} from '../email/index.js';
|
||||
import { domainToBucketName } from '../config.js';
|
||||
import type { ParsedMail } from 'mailparser';
|
||||
|
||||
import { log } from '../logger.js';
|
||||
import { BlocklistChecker } from '../email/blocklist.js';
|
||||
import { BounceHandler } from '../email/bounce-handler.js';
|
||||
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
|
||||
import { RulesProcessor } from '../email/rules-processor.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Processor
|
||||
|
|
@ -138,6 +136,7 @@ export class MessageProcessor {
|
|||
let finalRawBytes = rawBytes;
|
||||
let fromAddrFinal = fromAddr;
|
||||
let isBounce = false;
|
||||
let parsedFinal: ParsedMail | null = null; // <-- Hier deklarieren
|
||||
|
||||
try {
|
||||
const parsed = await parseEmail(rawBytes);
|
||||
|
|
@ -150,7 +149,6 @@ export class MessageProcessor {
|
|||
subject,
|
||||
workerName,
|
||||
);
|
||||
|
||||
isBounce = bounceResult.isBounce;
|
||||
finalRawBytes = bounceResult.rawBytes;
|
||||
|
||||
|
|
@ -168,23 +166,31 @@ export class MessageProcessor {
|
|||
}
|
||||
|
||||
// Re-parse after modifications for rules processing
|
||||
var parsedFinal = await parseEmail(finalRawBytes);
|
||||
parsedFinal = await parseEmail(finalRawBytes);
|
||||
} catch (err: any) {
|
||||
log(
|
||||
`⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original.`,
|
||||
`⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original RAW mail without rules.`,
|
||||
'WARNING',
|
||||
workerName,
|
||||
);
|
||||
log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName);
|
||||
fromAddrFinal = fromAddr;
|
||||
isBounce = false;
|
||||
var parsedFinal = await parseEmail(rawBytes);
|
||||
parsedFinal = null; // <-- GANZ WICHTIG: Kein erneuter Parse-Versuch!
|
||||
}
|
||||
|
||||
// 6. BLOCKLIST CHECK
|
||||
const sendersToCheck: string[] = [];
|
||||
if (fromAddrFinal) sendersToCheck.push(fromAddrFinal);
|
||||
|
||||
const headerFrom = parsedFinal?.from?.text;
|
||||
if (headerFrom && !sendersToCheck.includes(headerFrom)) {
|
||||
sendersToCheck.push(headerFrom);
|
||||
}
|
||||
|
||||
const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders(
|
||||
recipients,
|
||||
fromAddrFinal,
|
||||
sendersToCheck, // <-- Array übergeben
|
||||
workerName,
|
||||
);
|
||||
|
||||
|
|
@ -210,7 +216,7 @@ export class MessageProcessor {
|
|||
}
|
||||
|
||||
// Process rules (OOO, Forwarding) — not for bounces or already forwarded
|
||||
if (!isBounce && !skipRules) {
|
||||
if (!isBounce && !skipRules && parsedFinal !== null) {
|
||||
const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => {
|
||||
if (action === 'autoreply') this.metrics?.incrementAutoreply(dom);
|
||||
else if (action === 'forward') this.metrics?.incrementForward(dom);
|
||||
|
|
@ -8,17 +8,22 @@
|
|||
* - Graceful shutdown
|
||||
*/
|
||||
|
||||
import { S3Handler, SQSHandler, SESHandler, DynamoDBHandler } from '../aws/index.js';
|
||||
import { EmailDelivery } from '../smtp/index.js';
|
||||
import { DynamoDBHandler } from '../aws/dynamodb';
|
||||
import { S3Handler} from '../aws/s3.js';
|
||||
import { SQSHandler} from '../aws/sqs.js'
|
||||
import { SESHandler } from '../aws/ses';
|
||||
import { EmailDelivery } from '../smtp/delivery.js';
|
||||
import { MessageProcessor } from './message-processor.js';
|
||||
import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
|
||||
import type { MetricsCollector } from '../metrics.js';
|
||||
import { log } from '../logger.js';
|
||||
|
||||
|
||||
export class UnifiedWorker {
|
||||
private pollers: DomainPoller[] = [];
|
||||
private processor: MessageProcessor;
|
||||
private sqs: SQSHandler;
|
||||
private statusInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(
|
||||
private domains: string[],
|
||||
|
|
@ -74,10 +79,16 @@ export class UnifiedWorker {
|
|||
this.pollers.map((p) => p.stats.domain).join(', '),
|
||||
'SUCCESS',
|
||||
);
|
||||
|
||||
// Starte den 5-Minuten-Status-Report
|
||||
this.statusInterval = setInterval(() => {
|
||||
this.printStatus();
|
||||
}, 5 * 60 * 1000);
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
log('🛑 Stopping all domain pollers...');
|
||||
if (this.statusInterval) clearInterval(this.statusInterval); // <-- Neue Zeile
|
||||
await Promise.all(this.pollers.map((p) => p.stop()));
|
||||
log('✅ All pollers stopped.');
|
||||
}
|
||||
|
|
@ -99,4 +110,25 @@ export class UnifiedWorker {
|
|||
|
||||
return { totalProcessed, totalErrors, domains };
|
||||
}
|
||||
|
||||
private printStatus(): void {
|
||||
const stats = this.getStats();
|
||||
// Zähle aktive Poller
|
||||
const activePollers = this.pollers.filter((p) => p.stats.running).length;
|
||||
const totalPollers = this.pollers.length;
|
||||
|
||||
// Formatiere die Domain-Statistiken (z.B. hotshpotshga:1)
|
||||
const domainStats = stats.domains
|
||||
.map((d) => {
|
||||
const shortName = d.domain.split('.')[0].substring(0, 12);
|
||||
return `${shortName}:${d.processed}`;
|
||||
})
|
||||
.join(' | ');
|
||||
|
||||
log(
|
||||
`📊 Status: ${activePollers}/${totalPollers} active, total:${stats.totalProcessed} | ${domainStats}`,
|
||||
'INFO',
|
||||
'unified-worker'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
# Monitoring stack: Prometheus (scraping), Grafana (dashboards) and the
# blackbox exporter (TCP probes against the mail server ports).
services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    ports:
      - "9091:9090"
    extra_hosts:
      - "host.docker.internal:host-gateway" # so Prometheus can reach the worker on the host

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    volumes:
      - grafana_data:/var/lib/grafana
    ports:
      - "4000:3000"
    depends_on:
      - prometheus

  blackbox_exporter:
    image: prom/blackbox-exporter:latest
    container_name: blackbox_exporter
    restart: unless-stopped
    ports:
      - "9115:9115"
    extra_hosts: # needed so the exporter can probe services on the Docker host
      - "host.docker.internal:host-gateway"

volumes:
  prometheus_data:
  grafana_data:
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
global:
  scrape_interval: 15s

scrape_configs:
  # 1. Scrape the Node.js email worker's Prometheus metrics endpoint.
  - job_name: 'email-worker'
    static_configs:
      - targets: ['host.docker.internal:9000']

  # 2. Port monitoring of the mail server (IMAPS 993 & POP3S 995)
  #    via the blackbox exporter.
  - job_name: 'mailserver_ports'
    metrics_path: /probe
    params:
      module: [tcp_connect] # only checks whether the TCP port is open
    static_configs:
      - targets:
          - host.docker.internal:993 # IMAPS
          - host.docker.internal:995 # POP3S
    relabel_configs:
      # Standard blackbox-exporter relabeling: move each target into the
      # ?target= query parameter, keep it as the instance label, and point
      # the scrape address at the exporter itself.
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: blackbox_exporter:9115 # the exporter performs the probe
|
||||
Loading…
Reference in New Issue