email-amazon/email-worker/aws/s3_handler.py

194 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
S3 operations handler
"""
import time
from typing import Optional, List
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config, domain_to_bucket_name
class S3Handler:
    """Handles all S3 operations for stored emails.

    Every email lives in a per-domain bucket (resolved through
    ``domain_to_bucket_name``) under a key equal to its SES message ID.
    Processing state is tracked in the object's user metadata, which is
    rewritten in place with a self-copy.
    """

    # Queue receive count from which a missing object is treated as
    # permanently gone instead of "not written yet".
    MAX_MISSING_RETRIES = 5

    def __init__(self):
        self.client = boto3.client('s3', region_name=config.aws_region)

    def get_email(self, domain: str, message_id: str, receive_count: int) -> Optional[bytes]:
        """
        Download email from S3

        Args:
            domain: Email domain
            message_id: SES Message ID
            receive_count: Number of times this message was received from queue

        Returns:
            Raw email bytes, or None when the object does not exist yet and
            ``receive_count`` is still below ``MAX_MISSING_RETRIES``

        Raises:
            ClientError: the object is still missing once the retry budget is
                used up, or S3 reported any other error
            Exception: any other failure, re-raised after being logged
        """
        bucket = domain_to_bucket_name(domain)
        try:
            response = self.client.get_object(Bucket=bucket, Key=message_id)
            return response['Body'].read()
        except ClientError as e:
            # The modeled ``client.exceptions.NoSuchKey`` is a ClientError
            # subclass carrying this same error code, so a single handler
            # covers both ways the error can surface.
            if e.response.get('Error', {}).get('Code') != 'NoSuchKey':
                log(f"❌ S3 Download Error: {e}", 'ERROR')
                raise
            if receive_count < self.MAX_MISSING_RETRIES:
                log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
                return None
            log("❌ S3 Object missing permanently after retries.", 'ERROR')
            raise
        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            raise

    @staticmethod
    def _finalize_metadata(
        metadata: Optional[dict],
        worker_name: str,
        status: str,
        extra: Optional[dict] = None
    ) -> dict:
        """Return a copy of ``metadata`` stamped with the final processing state.

        Sets ``processed``, ``processed_at`` (Unix seconds as a string),
        ``processed_by`` and ``status``, drops the transient
        ``processing_started`` / ``queued_at`` keys, then applies ``extra``.
        The input mapping is not modified.
        """
        updated = dict(metadata or {})
        updated['processed'] = 'true'
        updated['processed_at'] = str(int(time.time()))
        updated['processed_by'] = worker_name
        updated['status'] = status
        updated.pop('processing_started', None)
        updated.pop('queued_at', None)
        if extra:
            updated.update(extra)
        return updated

    def _rewrite_metadata(
        self,
        bucket: str,
        message_id: str,
        worker_name: str,
        status: str,
        extra: Optional[dict] = None
    ) -> None:
        """Replace the object's user metadata by copying the object onto itself.

        Exceptions from ``head_object`` / ``copy_object`` propagate; each
        caller decides whether to swallow or re-raise them.
        """
        head = self.client.head_object(Bucket=bucket, Key=message_id)
        copy_args = {
            'Bucket': bucket,
            'Key': message_id,
            'CopySource': {'Bucket': bucket, 'Key': message_id},
            'Metadata': self._finalize_metadata(
                head.get('Metadata'), worker_name, status, extra
            ),
            'MetadataDirective': 'REPLACE',
        }
        # With MetadataDirective='REPLACE' the source Content-Type is not
        # carried over, so pass it through explicitly when S3 reports one.
        content_type = head.get('ContentType')
        if content_type:
            copy_args['ContentType'] = content_type
        self.client.copy_object(**copy_args)

    def mark_as_processed(
        self,
        domain: str,
        message_id: str,
        worker_name: str,
        invalid_inboxes: Optional[List[str]] = None
    ):
        """Mark email as successfully delivered

        Best effort: a failure is logged as a warning and not raised.
        When ``invalid_inboxes`` is non-empty it is stored comma-joined under
        the ``invalid_inboxes`` metadata key.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            extra = None
            if invalid_inboxes:
                extra = {'invalid_inboxes': ','.join(invalid_inboxes)}
            self._rewrite_metadata(bucket, message_id, worker_name, 'delivered', extra)
            # Logged only after the copy succeeded, so "recorded" is accurate.
            if invalid_inboxes:
                log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)
        except Exception as e:
            log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)

    def mark_as_all_invalid(
        self,
        domain: str,
        message_id: str,
        invalid_inboxes: List[str],
        worker_name: str
    ):
        """Mark email as failed because all recipients are invalid

        Best effort: a failure is logged as a warning and not raised.
        """
        bucket = domain_to_bucket_name(domain)
        try:
            extra = {
                'error': 'All recipients are invalid (mailboxes do not exist)',
                'invalid_inboxes': ','.join(invalid_inboxes),
            }
            self._rewrite_metadata(bucket, message_id, worker_name, 'failed', extra)
        except Exception as e:
            log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)

    def mark_as_blocked(
        self,
        domain: str,
        message_id: str,
        blocked_recipients: List[str],
        sender: str,
        worker_name: str
    ):
        """
        Mark email as blocked by sender blacklist
        This sets metadata BEFORE deletion for audit trail

        Raises:
            Exception: any failure is logged and re-raised
        """
        bucket = domain_to_bucket_name(domain)
        try:
            extra = {
                'blocked_recipients': ','.join(blocked_recipients),
                'blocked_sender': sender,
            }
            self._rewrite_metadata(bucket, message_id, worker_name, 'blocked', extra)
            log("✓ Marked as blocked in S3 metadata", 'INFO', worker_name)
        except Exception as e:
            log(f"⚠ Failed to mark as blocked: {e}", 'ERROR', worker_name)
            raise

    def delete_blocked_email(
        self,
        domain: str,
        message_id: str,
        worker_name: str
    ):
        """
        Delete email after marking as blocked
        Only call this after mark_as_blocked() succeeded

        Raises:
            Exception: any failure is logged and re-raised
        """
        bucket = domain_to_bucket_name(domain)
        try:
            self.client.delete_object(Bucket=bucket, Key=message_id)
            log("🗑 Deleted blocked email from S3", 'SUCCESS', worker_name)
        except Exception as e:
            log(f"⚠ Failed to delete blocked email: {e}", 'ERROR', worker_name)
            raise