From a70ae78a93a0b25ed64ec3fbc5132a4b62639def Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Sat, 7 Mar 2026 11:56:54 -0600 Subject: [PATCH] Patch for blocklist --- email-worker/email_processing/blocklist.py | 42 ++++++++++++---------- email-worker/worker.py | 29 ++++++++++----- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/email-worker/email_processing/blocklist.py b/email-worker/email_processing/blocklist.py index b29bffd..a5dea0c 100644 --- a/email-worker/email_processing/blocklist.py +++ b/email-worker/email_processing/blocklist.py @@ -18,9 +18,9 @@ class BlocklistChecker: self.dynamodb = dynamodb def is_sender_blocked( - self, - recipient: str, - sender: str, + self, + recipient: str, + sender: str, worker_name: str ) -> bool: """ @@ -54,17 +54,17 @@ class BlocklistChecker: return False def batch_check_blocked_senders( - self, - recipients: List[str], - sender: str, - worker_name: str + self, + recipients: List[str], + senders: List[str], # <-- Geändert: Erwartet nun eine Liste + worker_name: str ) -> Dict[str, bool]: """ - Batch check if sender is blocked for multiple recipients (more efficient) + Batch check if ANY of the senders are blocked for multiple recipients (more efficient) Args: recipients: List of recipient email addresses - sender: Sender email address + senders: List of sender email addresses (Envelope & Header) worker_name: Worker name for logging Returns: @@ -73,7 +73,8 @@ class BlocklistChecker: # Get all blocked patterns in one batch call patterns_by_recipient = self.dynamodb.batch_get_blocked_patterns(recipients) - sender_clean = parseaddr(sender)[1].lower() + # Alle übergebenen Adressen bereinigen + senders_clean = [parseaddr(s)[1].lower() for s in senders if s] result = {} for recipient in recipients: @@ -81,15 +82,18 @@ class BlocklistChecker: is_blocked = False for pattern in patterns: - if fnmatch.fnmatch(sender_clean, pattern.lower()): - log( - f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' " - f"for inbox {recipient}", - 'WARNING', - worker_name - ) - is_blocked = True - break + for sender_clean in senders_clean: + if fnmatch.fnmatch(sender_clean, pattern.lower()): + log( + f"⛔ BLOCKED: Sender {sender_clean} matches pattern '{pattern}' " + f"for inbox {recipient}", + 'WARNING', + worker_name + ) + is_blocked = True + break # Bricht die Senders-Schleife ab + if is_blocked: + break # Bricht die Pattern-Schleife ab result[recipient] = is_blocked diff --git a/email-worker/worker.py b/email-worker/worker.py index afa5885..29e27a5 100644 --- a/email-worker/worker.py +++ b/email-worker/worker.py @@ -4,12 +4,9 @@ Email message processing worker """ import json -import time import traceback -from typing import List, Tuple from logger import log -from config import config, domain_to_bucket_name from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler from email_processing import EmailParser, BounceHandler, RulesProcessor, BlocklistChecker from smtp.delivery import EmailDelivery @@ -17,6 +14,7 @@ from metrics.prometheus import MetricsCollector from email.parser import BytesParser # War wahrscheinlich schon da, prüfen from email.policy import compat32 # <--- NEU: Hinzufügen + class MessageProcessor: """Processes individual email messages""" @@ -105,7 +103,6 @@ class MessageProcessor: log("⚠ Warning: No recipients in event", 'WARNING', worker_name) return True - bucket = domain_to_bucket_name(domain) key = message_id # Compact single-line log for email processing @@ -123,7 +120,7 @@ class MessageProcessor: skip_rules = self.parser.is_processed_by_worker(temp_parsed) if skip_rules: - log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name) + log("🔄 Loop prevention: Already processed by worker", 'INFO', worker_name) # 5. PARSING & BOUNCE LOGIC try: @@ -146,7 +143,7 @@ class MessageProcessor: log(f" 🔧 Sanitized malformed Message-ID via Legacy Mode: {clean_id}", 'INFO', worker_name) if self.metrics: - self.metrics.increment_bounce(domain, 'sanitized_header') + self.metrics.increment_bounce(domain, 'sanitized_header') except Exception as e_sanitize: # Sollte nicht passieren, aber wir wollen hier nicht abbrechen @@ -163,7 +160,7 @@ class MessageProcessor: # Klammern entfernen, aber spitze Klammern behalten clean_id = current_msg_id.replace('[', '').replace(']', '') parsed.replace_header('Message-ID', clean_id) - log(f" 🔧 Sanitized malformed Message-ID", 'INFO', worker_name) + log(" 🔧 Sanitized malformed Message-ID", 'INFO', worker_name) # --- FIX END --- subject = parsed.get('Subject', '(no subject)') @@ -203,9 +200,25 @@ class MessageProcessor: skip_rules = False # 6. BLOCKLIST CHECK (Batch for efficiency) + senders_to_check = [] + + # 1. Die Envelope-Adresse (aus dem SES Event / Return-Path) + if from_addr: + senders_to_check.append(from_addr) + + # 2. Die echte Header-Adresse (aus der MIME-E-Mail geparst) + header_from = parsed.get('From') + if header_from and header_from not in senders_to_check: + senders_to_check.append(header_from) + + # 3. Falls die Bounce-Logik die Adresse umgeschrieben hat + if from_addr_final and from_addr_final not in senders_to_check: + senders_to_check.append(from_addr_final) + + # Prüfe nun alle extrahierten Adressen gegen die Datenbank blocked_by_recipient = self.blocklist.batch_check_blocked_senders( recipients, - from_addr_final, + senders_to_check, # <-- Übergabe der Liste worker_name )