From 7f9ac1c9e6f8307ef7cbe6ed27f55c59e3e44c87 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Tue, 13 Jan 2026 21:47:54 -0600 Subject: [PATCH] avoid loops --- unified-worker/unified_worker.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py index 1d4cb0d..921a333 100644 --- a/unified-worker/unified_worker.py +++ b/unified-worker/unified_worker.py @@ -468,7 +468,8 @@ def create_ooo_reply(original_parsed, recipient: str, ooo_msg: str, content_type msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['In-Reply-To'] = original_parsed.get('Message-ID', '') msg['References'] = original_parsed.get('Message-ID', '') - msg['Auto-Submitted'] = 'auto-replied' # Verhindert Loops + msg['Auto-Submitted'] = 'auto-replied' # Standard header für Auto-Replies + msg['X-SES-Worker-Processed'] = 'ooo-reply' # Unser Loop-Prevention Header # Body-Teil erstellen body_part = MIMEMultipart('alternative') @@ -515,6 +516,7 @@ def create_forward_message(original_parsed, recipient: str, forward_to: str, ori msg['Date'] = formatdate(localtime=True) msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1]) msg['Reply-To'] = original_from + msg['X-SES-Worker-Processed'] = 'forwarded' # Unser Loop-Prevention Header # Forward-Header als Text text_body, html_body = extract_body_parts(original_parsed) @@ -885,6 +887,25 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool: try: response = s3.get_object(Bucket=bucket, Key=key) raw_bytes = response['Body'].read() + + # LOOP DETECTION: Check if this message was already processed by our worker + # We use a unique header that only our worker sets + temp_parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes) + + # Check for OUR unique loop prevention header + x_worker_processed = temp_parsed.get('X-SES-Worker-Processed', '') + auto_submitted = temp_parsed.get('Auto-Submitted', '') + + # Only skip if OUR header is present (not generic headers that others might set) + is_processed_by_us = bool(x_worker_processed) + is_our_auto_reply = auto_submitted == 'auto-replied' and x_worker_processed + + if is_processed_by_us: + log(f"🔄 Loop prevention: Already processed by worker", 'INFO', worker_name) + skip_rules = True + else: + skip_rules = False + except s3.exceptions.NoSuchKey: if receive_count < 5: log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name) @@ -929,9 +950,10 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool: log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING', worker_name) from_addr_final = from_addr is_bounce = False + skip_rules = False # Default if parsing failed - # 5. OOO & FORWARD LOGIC (vor SMTP-Versand, nicht für Bounces) - if not is_bounce: + # 5. OOO & FORWARD LOGIC (vor SMTP-Versand, nicht für Bounces oder bereits weitergeleitete) + if not is_bounce and not skip_rules: for recipient in recipients: process_rules_for_recipient(recipient, parsed, domain, worker_name) @@ -1282,4 +1304,4 @@ def main(): worker.start() if __name__ == '__main__': - main() \ No newline at end of file + main()