# Retry configuration
# MAX_RETRIES: number of additional SMTP attempts after the first one.
MAX_RETRIES = int(os.environ.get('MAX_SMTP_RETRIES', '3'))
# Seconds to wait before retry 1, 2, 3...; the last entry is reused for any
# further retries.
RETRY_DELAYS = [1, 5, 15]


def _ascii_metadata_value(text, limit=255):
    """Return *text* flattened to one line, forced to ASCII and truncated.

    Used for values stored as S3 user metadata: whitespace runs (including
    newlines) become a single space, non-ASCII characters are replaced with
    ``?`` and the result is cut to *limit* characters.
    """
    flattened = ' '.join(str(text).split())
    ascii_only = flattened.encode('ascii', 'replace').decode('ascii')
    return ascii_only[:limit]


def update_retry_metadata(bucket: str, key: str, retry_count: int, last_error: str = None):
    """Record retry bookkeeping in the S3 object's user metadata.

    Reads the current metadata with ``head_object``, sets ``retry_count``
    and ``last_retry`` (current Unix time in seconds), optionally sets
    ``last_error``, and writes the result back by copying the object onto
    itself with ``MetadataDirective='REPLACE'``.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object.
        retry_count: New retry counter value to store.
        last_error: Optional description of the most recent failure.

    Returns:
        None. This is best-effort: any exception is logged and swallowed so
        that a metadata problem never aborts mail processing.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        new_meta = dict(head.get('Metadata') or {})
        new_meta['retry_count'] = str(retry_count)
        new_meta['last_retry'] = str(int(time.time()))
        if last_error:
            # The error text comes from the SMTP server / exception messages
            # and may contain non-ASCII characters or line breaks; sanitize
            # it so the metadata write is not rejected because of it.
            new_meta['last_error'] = _ascii_metadata_value(last_error)

        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={'Bucket': bucket, 'Key': key},
            Metadata=new_meta,
            MetadataDirective='REPLACE'
        )
        print(f"Updated retry metadata for {bucket}/{key}: retry_count={retry_count}")
    except Exception as e:
        print(f"Failed to update retry metadata: {e}")


def get_retry_count(bucket: str, key: str) -> int:
    """Return the retry counter stored in the object's S3 metadata.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object.

    Returns:
        The non-negative integer stored under ``retry_count``, or ``0`` when
        the entry is missing, malformed, or the metadata cannot be read.
    """
    try:
        head = s3.head_object(Bucket=bucket, Key=key)
        metadata = head.get('Metadata') or {}
        return max(0, int(metadata.get('retry_count', '0')))
    except Exception as e:
        # Best-effort lookup: fall back to 0, but leave a trace in the logs
        # instead of hiding the problem completely.
        print(f"Could not read retry count for {bucket}/{key} (assuming 0): {e}")
        return 0


def is_temporary_smtp_error(error_code):
    """Tell whether an SMTP error describes a temporary (4xx) failure.

    Args:
        error_code: A ``(code, message)`` pair such as the values of the
            dict returned by ``smtplib.SMTP.sendmail``. The code may be an
            ``int`` or a numeric ``str``/``bytes`` (e.g. ``'451'``).

    Returns:
        ``True`` when the code is in the range 400-499; ``False`` for any
        other code, for non-numeric codes (such as the ``'error'`` and
        ``'connect-error'`` labels used by ``send_email_with_retry``) and
        for values that are not a non-empty tuple/list.
    """
    if not isinstance(error_code, (tuple, list)) or not error_code:
        return False

    code = error_code[0]
    if isinstance(code, bool):
        # bool is a subclass of int but is never a reply code.
        return False
    if isinstance(code, bytes):
        code = code.decode('ascii', 'ignore')
    if isinstance(code, str):
        try:
            code = int(code.strip())
        except ValueError:
            return False
    if isinstance(code, int):
        return 400 <= code < 500
    return False


def _run_smtp_session(smtp_factory, smtp_host, smtp_port, smtp_user, smtp_pass,
                      frm_addr, recipients, raw_message, local_helo, attempt_no):
    """Open one SMTP session and try to hand the message over once.

    Returns a ``(refusals, failure)`` pair:

    * ``(dict, None)`` when the server answered per recipient; the dict maps
      each refused recipient to its ``(code, message)`` reply and is empty
      when every recipient was accepted.
    * ``(None, (label, exc))`` when the attempt failed as a whole; *label*
      is ``'error'`` for a failure raised by ``sendmail`` and
      ``'connect-error'`` for anything raised before that (connect, EHLO).
    """
    outcome = None
    try:
        with smtp_factory(smtp_host, smtp_port, timeout=30,
                          local_hostname=local_helo) as smtp:
            smtp.ehlo()

            # Opportunistic STARTTLS: keep going without it if unsupported.
            try:
                smtp.starttls()
                smtp.ehlo()
            except Exception as e:
                print("STARTTLS not available or failed (continuing):", e)

            # Login is best-effort as well; the server may accept the mail
            # unauthenticated.
            if smtp_user and smtp_pass:
                try:
                    smtp.login(smtp_user, smtp_pass)
                except Exception as e:
                    print("SMTP login failed (continuing):", e)

            try:
                result = smtp.sendmail(frm_addr, recipients, raw_message)
                # A non-dict result is unexpected; treat it as "nobody refused".
                outcome = (dict(result) if isinstance(result, dict) else {}, None)
            except smtplib.SMTPRecipientsRefused as e:
                print(f"SMTPRecipientsRefused on attempt {attempt_no}: {e}")
                outcome = (dict(e.recipients), None)
            except Exception as e:
                print(f"SMTP sendmail error on attempt {attempt_no}: {e}")
                outcome = (None, ('error', e))
    except Exception as e:
        if outcome is None:
            print(f"Error connecting to SMTP host on attempt {attempt_no}: {e}")
            outcome = (None, ('connect-error', e))
        else:
            # The send already produced a result; an error while closing the
            # session must not turn it into a failure (and a duplicate send).
            print("Ignoring error while closing SMTP session:", e)
    return outcome


def send_email_with_retry(smtp_host, smtp_port, smtp_user, smtp_pass,
                          frm_addr, recipients, raw_message, local_helo,
                          max_retries=MAX_RETRIES, retry_delays=None,
                          smtp_factory=None):
    """Send a message over SMTP, retrying temporary failures.

    Each attempt opens a fresh session. Recipients accepted by the server
    are remembered across attempts and are never sent to again; only
    recipients refused with a temporary (4xx) reply are retried. Failures of
    a whole attempt are retried too, unless the server answered with a
    non-temporary SMTP reply code.

    Args:
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port.
        smtp_user: Login name, or a falsy value to skip authentication.
        smtp_pass: Password, or a falsy value to skip authentication.
        frm_addr: Envelope sender address.
        recipients: Envelope recipient addresses (a list, or a single str).
        raw_message: The message as passed on to ``sendmail`` unchanged.
        local_helo: Host name announced in HELO/EHLO.
        max_retries: Number of retries after the first attempt; negative
            values are treated as 0.
        retry_delays: Optional sequence of seconds to sleep before each
            retry (last value reused); defaults to ``RETRY_DELAYS``.
        smtp_factory: Optional callable used instead of ``smtplib.SMTP`` to
            create the session object (for example ``smtplib.SMTP_SSL``).

    Returns:
        ``(delivered, refused)``: *delivered* lists the accepted recipients
        in their original order; *refused* maps every other recipient to
        either the server's ``(code, message)`` reply or
        ``('error' | 'connect-error', text)`` for attempt-level failures.
    """
    if isinstance(recipients, str):
        recipients = [recipients]
    all_recipients = list(recipients)
    delays = list(retry_delays or RETRY_DELAYS) or [0]
    factory = smtp_factory or smtplib.SMTP
    max_retries = max(0, int(max_retries))

    pending = list(all_recipients)  # recipients still to be attempted
    accepted = set()                # recipients some attempt delivered to
    refused = {}                    # final refusals accumulated over attempts

    for attempt in range(max_retries + 1):
        if not pending:
            break
        if attempt > 0:
            delay = delays[min(attempt - 1, len(delays) - 1)]
            print(f"Retry attempt {attempt}/{max_retries} after {delay}s delay...")
            time.sleep(delay)
        retries_left = attempt < max_retries

        refusals, failure = _run_smtp_session(
            factory, smtp_host, smtp_port, smtp_user, smtp_pass,
            frm_addr, pending, raw_message, local_helo, attempt + 1
        )

        if failure is not None:
            label, exc = failure
            # A definite non-4xx SMTP reply will not change on retry.
            permanent = (
                isinstance(exc, smtplib.SMTPResponseException)
                and not is_temporary_smtp_error((exc.smtp_code, exc.smtp_error))
            )
            if retries_left and not permanent:
                continue
            traceback.print_exception(type(exc), exc, exc.__traceback__)
            refused.update({rcpt: (label, str(exc)) for rcpt in pending})
            pending = []
            break

        accepted.update(rcpt for rcpt in pending if rcpt not in refusals)
        temporary = {
            rcpt: err for rcpt, err in refusals.items()
            if is_temporary_smtp_error(err)
        }
        # Permanent refusals are final as soon as they are seen.
        refused.update(
            {rcpt: err for rcpt, err in refusals.items() if rcpt not in temporary}
        )

        if temporary and retries_left:
            print(f"Temporary failures for {list(temporary.keys())}, will retry...")
            pending = list(temporary.keys())  # only retry temporary failures
            continue

        # Out of retries (or nothing left to retry): remaining temporary
        # refusals are reported as refused too.
        refused.update(temporary)
        pending = []
        break

    delivered = [
        rcpt for rcpt in all_recipients
        if rcpt in accepted and rcpt not in refused
    ]
    return delivered, refused
raw_message = raw_bytes - # Determine a sensible local hostname (HELO). Prefer explicit ENV, else use domain of bucket (recipient domain). + # Determine HELO hostname env_local = os.environ.get('SMTP_LOCAL_HOSTNAME') derived_local = bucket_to_domain(bucket) if bucket else None local_helo = env_local or derived_local or 'localhost' print(f"Attempting SMTP send to {len(recipients)} recipients via {SMTP_HOST}:{SMTP_PORT} with local_hostname={local_helo}") start = time.time() - try: - # Pass local_hostname so the server receives a proper FQDN in HELO/EHLO - with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30, local_hostname=local_helo) as smtp: - smtp.ehlo() - # Try STARTTLS; if it fails, continue (server might be implicit TLS port) - try: - smtp.starttls() - smtp.ehlo() - except Exception as e: - print("STARTTLS not available or failed (continuing):", e) - - if SMTP_USER and SMTP_PASS: - try: - smtp.login(SMTP_USER, SMTP_PASS) - except Exception as e: - print("SMTP login failed (continuing):", e) - - try: - # Sende die rohen Bytes direkt - send_result = smtp.sendmail(frm_addr, recipients, raw_message) - # sendmail returns dict of refused recipients - if isinstance(send_result, dict): - refused = send_result - delivered = [r for r in recipients if r not in refused] - else: - # Unexpected but treat all as delivered - delivered = recipients[:] - refused = {} - except smtplib.SMTPRecipientsRefused as e: - print("SMTPRecipientsRefused:", e) - try: - refused = e.recipients - except Exception: - refused = {r: ('550', 'Recipient refused') for r in recipients} - delivered = [r for r in recipients if r not in refused] - except Exception as e: - print("SMTP sendmail error:", e) - traceback.print_exc() # Mehr Details für Debugging - refused = {r: ('error', str(e)) for r in recipients} - delivered = [] - except Exception as e: - print("Error connecting to SMTP host:", e) - traceback.print_exc() # Mehr Details für Debugging - refused = {r: ('connect-error', str(e)) for r in 
recipients} - delivered = [] + + # Send with retry logic + delivered, refused = send_email_with_retry( + SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS, + frm_addr, recipients, raw_message, local_helo, + max_retries=MAX_RETRIES + ) + print(f"SMTP completed in {time.time()-start:.2f}s delivered={delivered} refused={refused}") + + # Update retry count if we had temporary failures + if refused and not delivered: + temp_failures = [r for r, e in refused.items() if is_temporary_smtp_error(e)] + if temp_failures: + update_retry_metadata(bucket, key, retry_count + 1, str(refused)) else: print("No recipients to send to; skipping SMTP.") @@ -211,5 +340,10 @@ def lambda_handler(event, context): return { 'statusCode': 200, - 'body': f"processed={bool(delivered)}, delivered={delivered}, refused_count={len(refused)}" + 'body': json.dumps({ + 'processed': bool(delivered), + 'delivered': delivered, + 'refused_count': len(refused), + 'retry_count': retry_count + }) } \ No newline at end of file