#!/usr/bin/env python3
"""
S3 operations handler
"""

import time
from typing import Dict, List, Optional

import boto3
from botocore.exceptions import ClientError

from logger import log
from config import config, domain_to_bucket_name


class S3Handler:
    """Handles all S3 operations for stored emails (one bucket per domain)."""

    # Number of queue receives after which a still-missing S3 object is
    # treated as a permanent failure instead of a retryable race.
    MAX_MISSING_RECEIVES = 5

    # User-metadata keys describing in-flight processing; they are dropped
    # whenever a final status is written.
    _TRANSIENT_METADATA_KEYS = ('processing_started', 'queued_at')

    # System headers that copy_object does NOT carry over when
    # MetadataDirective='REPLACE'. They are read from head_object and sent
    # back explicitly so a metadata rewrite does not reset them.
    _PRESERVED_HEADERS = (
        'ContentType',
        'ContentEncoding',
        'ContentDisposition',
        'ContentLanguage',
        'CacheControl',
    )

    def __init__(self):
        self.client = boto3.client('s3', region_name=config.aws_region)

    def get_email(self, domain: str, message_id: str, receive_count: int) -> Optional[bytes]:
        """
        Download email from S3

        Args:
            domain: Email domain (mapped to a bucket via domain_to_bucket_name)
            message_id: SES Message ID, used as the object key
            receive_count: Number of times this message was received from queue

        Returns:
            Raw email bytes, or None if the object does not exist yet and
            receive_count is still below MAX_MISSING_RECEIVES (caller retries).

        Raises:
            ClientError: the object is still missing once receive_count reaches
                MAX_MISSING_RECEIVES, or any other S3 client error.
            Exception: any other download failure (logged, then re-raised).
        """
        bucket = domain_to_bucket_name(domain)
        try:
            response = self.client.get_object(Bucket=bucket, Key=message_id)
            return response['Body'].read()
        except ClientError as e:
            # The modeled NoSuchKey exception is a ClientError subclass, so this
            # single handler covers both it and a bare 'NoSuchKey' error code.
            if self._is_missing_key(e):
                if self._should_retry_missing(receive_count):
                    return None
                raise
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            raise
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            raise

    def _is_missing_key(self, error: ClientError) -> bool:
        """Return True if *error* reports that the requested key does not exist."""
        if isinstance(error, self.client.exceptions.NoSuchKey):
            return True
        return error.response.get('Error', {}).get('Code') == 'NoSuchKey'

    def _should_retry_missing(self, receive_count: int) -> bool:
        """Log a missing-object event and return True while it is still retryable."""
        if receive_count < self.MAX_MISSING_RECEIVES:
            log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
            return True
        log("❌ S3 Object missing permanently after retries.", 'ERROR')
        return False

    def _replace_metadata(self, bucket: str, key: str, updates: Dict[str, str]) -> None:
        """
        Rewrite an object's user metadata in place via a self-copy.

        Existing user metadata is kept, transient processing keys are removed,
        and *updates* is merged on top. Headers listed in _PRESERVED_HEADERS are
        re-sent so that MetadataDirective='REPLACE' does not reset them.

        NOTE: S3 caps user-defined metadata at 2 KB in total, so very long
        recipient lists in *updates* will make copy_object fail.

        Raises:
            Whatever head_object / copy_object raise; nothing is caught here.
        """
        head = self.client.head_object(Bucket=bucket, Key=key)
        metadata = dict(head.get('Metadata') or {})
        for transient_key in self._TRANSIENT_METADATA_KEYS:
            metadata.pop(transient_key, None)
        metadata.update(updates)

        copy_args = {
            'Bucket': bucket,
            'Key': key,
            'CopySource': {'Bucket': bucket, 'Key': key},
            'Metadata': metadata,
            'MetadataDirective': 'REPLACE',
        }
        for header in self._PRESERVED_HEADERS:
            value = head.get(header)
            if value:
                copy_args[header] = value
        self.client.copy_object(**copy_args)

    def mark_as_processed(
        self,
        domain: str,
        message_id: str,
        worker_name: str,
        invalid_inboxes: Optional[List[str]] = None
    ) -> None:
        """
        Mark email as successfully delivered

        Args:
            domain: Email domain (selects the bucket)
            message_id: Object key of the stored email
            worker_name: Recorded as 'processed_by' and used for log context
            invalid_inboxes: Optional recipients that could not be delivered;
                stored comma-joined as 'invalid_inboxes' when non-empty

        Failures are best-effort: they are logged as a warning and not raised.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            updates = {
                'processed': 'true',
                'processed_at': str(int(time.time())),
                'processed_by': worker_name,
                'status': 'delivered',
            }
            if invalid_inboxes:
                updates['invalid_inboxes'] = ','.join(invalid_inboxes)
                log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)
            self._replace_metadata(bucket, message_id, updates)
        except Exception as e:
            log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)

    def mark_as_all_invalid(
        self,
        domain: str,
        message_id: str,
        invalid_inboxes: List[str],
        worker_name: str
    ) -> None:
        """
        Mark email as failed because all recipients are invalid

        Args:
            domain: Email domain (selects the bucket)
            message_id: Object key of the stored email
            invalid_inboxes: Recipients that do not exist; stored comma-joined
            worker_name: Recorded as 'processed_by' and used for log context

        Failures are best-effort: they are logged as a warning and not raised.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            self._replace_metadata(bucket, message_id, {
                'processed': 'true',
                'processed_at': str(int(time.time())),
                'processed_by': worker_name,
                'status': 'failed',
                'error': 'All recipients are invalid (mailboxes do not exist)',
                'invalid_inboxes': ','.join(invalid_inboxes),
            })
        except Exception as e:
            log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)

    def mark_as_blocked(
        self,
        domain: str,
        message_id: str,
        blocked_recipients: List[str],
        sender: str,
        worker_name: str
    ) -> None:
        """
        Mark email as blocked by sender blacklist

        This sets metadata BEFORE deletion for audit trail.
        NOTE(review): after delete_blocked_email() runs, this metadata only
        survives if bucket versioning keeps the prior version — confirm.

        Args:
            domain: Email domain (selects the bucket)
            message_id: Object key of the stored email
            blocked_recipients: Recipients that blocked the sender; stored comma-joined
            sender: Blocked sender address, stored as 'blocked_sender'
            worker_name: Recorded as 'processed_by' and used for log context

        Raises:
            Any S3 error, after logging it, so callers can skip the deletion.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            self._replace_metadata(bucket, message_id, {
                'processed': 'true',
                'processed_at': str(int(time.time())),
                'processed_by': worker_name,
                'status': 'blocked',
                'blocked_recipients': ','.join(blocked_recipients),
                'blocked_sender': sender,
            })
            log("✓ Marked as blocked in S3 metadata", 'INFO', worker_name)
        except Exception as e:
            log(f"⚠ Failed to mark as blocked: {e}", 'ERROR', worker_name)
            raise

    def delete_blocked_email(
        self,
        domain: str,
        message_id: str,
        worker_name: str
    ) -> None:
        """
        Delete email after marking as blocked

        Only call this after mark_as_blocked() succeeded.

        Raises:
            Any S3 error from delete_object, after logging it.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            self.client.delete_object(Bucket=bucket, Key=message_id)
            log("🗑 Deleted blocked email from S3", 'SUCCESS', worker_name)
        except Exception as e:
            log(f"⚠ Failed to delete blocked email: {e}", 'ERROR', worker_name)
            raise