diff --git a/worker.py b/worker.py index 0b2d1c5..ee57a74 100644 --- a/worker.py +++ b/worker.py @@ -70,7 +70,7 @@ def get_queue_url() -> str: def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None): """ Markiert E-Mail als erfolgreich zugestellt - Setzt processed=true auch wenn manche Recipients fehlgeschlagen sind + Wird nur aufgerufen wenn mindestens 1 Recipient erfolgreich war """ try: head = s3.head_object(Bucket=bucket, Key=key) @@ -102,6 +102,37 @@ def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None): log(f"Failed to mark as processed: {e}", 'WARNING') +def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list): + """ + Markiert E-Mail als fehlgeschlagen weil alle Recipients ungültig sind + """ + try: + head = s3.head_object(Bucket=bucket, Key=key) + metadata = head.get('Metadata', {}) or {} + + metadata['processed'] = 'true' + metadata['processed_at'] = str(int(time.time())) + metadata['processed_by'] = WORKER_NAME + metadata['status'] = 'failed' + metadata['error'] = 'All recipients are invalid (mailboxes do not exist)' + metadata['invalid_inboxes'] = ','.join(invalid_inboxes) + metadata.pop('processing_started', None) + metadata.pop('queued_at', None) + + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=metadata, + MetadataDirective='REPLACE' + ) + + log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS') + + except Exception as e: + log(f"Failed to mark as all invalid: {e}", 'WARNING') + + def mark_as_failed(bucket: str, key: str, error: str, receive_count: int): """ Markiert E-Mail als komplett fehlgeschlagen @@ -301,8 +332,8 @@ def process_message(message_body: dict, receive_count: int) -> bool: # Entscheidungslogik if len(successful) > 0: - # Mindestens 1 Recipient erfolgreich - # → processed=true setzen, invalid_inboxes tracken + # ✅ Fall 1: Mindestens 1 Recipient erfolgreich + # → status=delivered, invalid_inboxes tracken invalid_inboxes = failed_permanent if failed_permanent else None mark_as_processed(bucket, key, invalid_inboxes) @@ -318,19 +349,20 @@ def process_message(message_body: dict, receive_count: int) -> bool: return True # Message löschen elif len(failed_permanent) == len(recipients): - # ALLE Recipients sind permanent fehlgeschlagen (alle Inboxen ungültig) - # → processed=true setzen mit allen als invalid_inboxes + # ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig) + # → status=failed, invalid_inboxes = ALLE - mark_as_processed(bucket, key, failed_permanent) + mark_as_all_invalid(bucket, key, failed_permanent) log(f"{'='*70}") - log(f"✗ All recipients permanently failed (invalid inboxes)", 'ERROR') + log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR') + log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR') log(f"{'='*70}\n") return True # Message löschen (nicht retryable) else: - # Nur temporäre Fehler, keine erfolgreichen Deliveries + # ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries # → Retry wenn noch Versuche übrig if receive_count < 3: