#!/usr/bin/env python3
"""
S3 operations handler
"""

import time
from typing import Optional, List

import boto3
from botocore.exceptions import ClientError

from logger import log
from config import config, domain_to_bucket_name
class S3Handler:
    """Handles all S3 operations for stored emails.

    Each email domain maps to its own bucket (via ``domain_to_bucket_name``);
    objects are keyed by their SES message ID.
    """

    # How many queue receive attempts to tolerate before treating a missing
    # S3 object as permanent (covers SES->S3 propagation delay).
    MAX_MISSING_ATTEMPTS = 5

    def __init__(self):
        self.client = boto3.client('s3', region_name=config.aws_region)

    def get_email(self, domain: str, message_id: str, receive_count: int) -> Optional[bytes]:
        """
        Download email from S3.

        Args:
            domain: Email domain
            message_id: SES Message ID
            receive_count: Number of times this message was received from queue

        Returns:
            Raw email bytes, or None when the object is not visible yet and
            the caller should retry (receive_count below MAX_MISSING_ATTEMPTS).

        Raises:
            ClientError: when the object is missing permanently (after
                retries) or any other S3 error occurs.
        """
        bucket = domain_to_bucket_name(domain)

        try:
            response = self.client.get_object(Bucket=bucket, Key=message_id)
            return response['Body'].read()

        except ClientError as e:
            # NoSuchKey may surface as the modeled exception class
            # (self.client.exceptions.NoSuchKey) or as a generic ClientError;
            # both are ClientError subclasses, so one handler covers both —
            # the original duplicated this check in two except branches.
            if e.response.get('Error', {}).get('Code') == 'NoSuchKey':
                if receive_count < self.MAX_MISSING_ATTEMPTS:
                    # Object may simply not have propagated yet; signal retry.
                    log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
                    return None
                log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
                raise
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            raise

        except Exception as e:
            log(f"❌ S3 Download Error: {e}", 'ERROR')
            raise

    def _stamp_metadata(
        self,
        domain: str,
        message_id: str,
        worker_name: str,
        status: str,
        extra: Optional[dict] = None
    ) -> None:
        """Stamp processing metadata onto an S3 object in place.

        Reads the object's current user metadata, sets the common
        processed/processed_at/processed_by/status fields plus any ``extra``
        entries, drops transient bookkeeping keys, and writes it back via a
        self-copy with MetadataDirective=REPLACE (the only way S3 allows
        metadata to be rewritten without re-uploading the body).

        Raises on any S3 error; callers decide whether that is fatal.
        """
        bucket = domain_to_bucket_name(domain)

        head = self.client.head_object(Bucket=bucket, Key=message_id)
        metadata = head.get('Metadata', {}) or {}

        metadata['processed'] = 'true'
        metadata['processed_at'] = str(int(time.time()))
        metadata['processed_by'] = worker_name
        metadata['status'] = status
        if extra:
            metadata.update(extra)
        # Remove transient fields set earlier in the pipeline.
        metadata.pop('processing_started', None)
        metadata.pop('queued_at', None)

        self.client.copy_object(
            Bucket=bucket,
            Key=message_id,
            CopySource={'Bucket': bucket, 'Key': message_id},
            Metadata=metadata,
            MetadataDirective='REPLACE'
        )

    def mark_as_processed(
        self,
        domain: str,
        message_id: str,
        worker_name: str,
        invalid_inboxes: Optional[List[str]] = None
    ):
        """Mark email as successfully delivered.

        Best-effort: failures are logged as warnings, never raised, so a
        metadata hiccup does not fail an already-delivered message.
        """
        try:
            extra = {}
            if invalid_inboxes:
                extra['invalid_inboxes'] = ','.join(invalid_inboxes)
                log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING', worker_name)

            self._stamp_metadata(domain, message_id, worker_name, 'delivered', extra)

        except Exception as e:
            log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)

    def mark_as_all_invalid(
        self,
        domain: str,
        message_id: str,
        invalid_inboxes: List[str],
        worker_name: str
    ):
        """Mark email as failed because all recipients are invalid.

        Best-effort: failures are logged as warnings, never raised.
        """
        try:
            self._stamp_metadata(
                domain, message_id, worker_name, 'failed',
                {
                    'error': 'All recipients are invalid (mailboxes do not exist)',
                    'invalid_inboxes': ','.join(invalid_inboxes),
                },
            )

        except Exception as e:
            log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)

    def mark_as_blocked(
        self,
        domain: str,
        message_id: str,
        blocked_recipients: List[str],
        sender: str,
        worker_name: str
    ):
        """
        Mark email as blocked by sender blacklist.

        This sets metadata BEFORE deletion for audit trail. Unlike the other
        mark_* methods this re-raises on failure, so the caller never deletes
        an email that was not audited first.
        """
        try:
            self._stamp_metadata(
                domain, message_id, worker_name, 'blocked',
                {
                    'blocked_recipients': ','.join(blocked_recipients),
                    'blocked_sender': sender,
                },
            )

            log(f"✓ Marked as blocked in S3 metadata", 'INFO', worker_name)

        except Exception as e:
            log(f"⚠ Failed to mark as blocked: {e}", 'ERROR', worker_name)
            raise

    def delete_blocked_email(
        self,
        domain: str,
        message_id: str,
        worker_name: str
    ):
        """
        Delete email after marking as blocked.

        Only call this after mark_as_blocked() succeeded. Re-raises on
        failure so the caller knows the object still exists.
        """
        bucket = domain_to_bucket_name(domain)

        try:
            self.client.delete_object(Bucket=bucket, Key=message_id)
            log(f"🗑 Deleted blocked email from S3", 'SUCCESS', worker_name)

        except Exception as e:
            log(f"⚠ Failed to delete blocked email: {e}", 'ERROR', worker_name)
            raise