email-amazon/email-worker/aws/dynamodb_handler.py

193 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
DynamoDB operations handler
"""
import time
from typing import Optional, Dict, Any, List
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config
class DynamoDBHandler:
    """Handles all DynamoDB operations.

    Wraps three tables whose names come from ``config``: the rules table
    (``config.rules_table``), the messages table (``config.messages_table``)
    and the blocked-senders table (``config.blocked_table``).

    Every lookup is best-effort: when the tables could not be reached at
    construction time (``available`` is False) or a request fails, the
    methods log the problem and return an "empty" value (``None``, ``[]`` or
    a dict of empty lists) instead of raising.
    """

    # BatchGetItem accepts at most 100 keys per request.
    _BATCH_GET_MAX_KEYS = 100
    # How often one chunk is (re-)requested while DynamoDB keeps reporting
    # unprocessed keys, and the base delay of the exponential backoff.
    _BATCH_GET_MAX_ATTEMPTS = 4
    _BATCH_GET_BACKOFF_SECONDS = 0.05

    def __init__(self):
        """Create the boto3 DynamoDB resource and connect to the tables."""
        self.resource = boto3.resource('dynamodb', region_name=config.aws_region)
        self.available = False
        self.rules_table = None
        self.messages_table = None
        self.blocked_table = None
        self._initialize_tables()

    def _initialize_tables(self):
        """Initialize DynamoDB table connections.

        Sets ``self.available`` to True only if all three tables respond;
        any failure is logged as a warning and leaves it False.
        """
        try:
            self.rules_table = self.resource.Table(config.rules_table)
            self.messages_table = self.resource.Table(config.messages_table)
            self.blocked_table = self.resource.Table(config.blocked_table)
            # Test connection: Table objects are created lazily, reading
            # ``table_status`` makes boto3 load the table description and
            # therefore fails here if a table cannot be reached.
            self.rules_table.table_status
            self.messages_table.table_status
            self.blocked_table.table_status
            self.available = True
            log("✓ DynamoDB tables connected successfully")
        except Exception as e:
            log(f"⚠ DynamoDB not fully available: {e}", 'WARNING')
            self.available = False

    def get_email_rules(self, email_address: str) -> Optional[Dict[str, Any]]:
        """
        Get email rules for recipient (OOO, Forwarding)

        Args:
            email_address: Recipient email address

        Returns:
            Rule dictionary or None if not found (also None when DynamoDB is
            unavailable or the lookup fails)
        """
        if not self.available or not self.rules_table:
            return None
        try:
            response = self.rules_table.get_item(Key={'email_address': email_address})
            return response.get('Item')
        except ClientError as e:
            # A missing table/resource is treated as "no rules" without noise.
            if e.response['Error']['Code'] != 'ResourceNotFoundException':
                log(f"⚠ DynamoDB error for {email_address}: {e}", 'ERROR')
            return None
        except Exception as e:
            log(f"⚠ DynamoDB error for {email_address}: {e}", 'WARNING')
            return None

    def get_bounce_info(self, message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
        """
        Get bounce information from DynamoDB with retry logic

        The record is looked up up to ``config.bounce_lookup_retries`` times,
        waiting ``config.bounce_lookup_delay`` seconds between attempts, both
        when the record is not there yet and when the request fails.

        Args:
            message_id: SES Message ID
            worker_name: Worker name for logging

        Returns:
            Bounce info dictionary (keys ``original_source``, ``bounceType``,
            ``bounceSubType``, ``bouncedRecipients``, ``timestamp``) or None
        """
        if not self.available or not self.messages_table:
            return None

        retries = config.bounce_lookup_retries
        for attempt in range(retries):
            last_attempt = attempt >= retries - 1
            try:
                response = self.messages_table.get_item(Key={'MessageId': message_id})
                item = response.get('Item')
                if item:
                    return {
                        'original_source': item.get('original_source', ''),
                        'bounceType': item.get('bounceType', 'Unknown'),
                        'bounceSubType': item.get('bounceSubType', 'Unknown'),
                        'bouncedRecipients': item.get('bouncedRecipients', []),
                        'timestamp': item.get('timestamp', ''),
                    }
                if last_attempt:
                    log(
                        f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts "
                        f"for Message-ID: {message_id}",
                        'WARNING',
                        worker_name
                    )
                    return None
                log(
                    f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s "
                    f"(attempt {attempt + 1}/{config.bounce_lookup_retries})...",
                    'INFO',
                    worker_name
                )
            except Exception as e:
                log(
                    f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}",
                    'ERROR',
                    worker_name
                )
                if last_attempt:
                    return None
            # Only reached when another attempt follows.
            time.sleep(config.bounce_lookup_delay)
        return None

    def get_blocked_patterns(self, email_address: str) -> List[str]:
        """
        Get blocked sender patterns for recipient

        Args:
            email_address: Recipient email address

        Returns:
            List of blocked patterns (may include wildcards); empty when the
            recipient has no entry, DynamoDB is unavailable or the lookup fails
        """
        if not self.available or not self.blocked_table:
            return []
        try:
            response = self.blocked_table.get_item(Key={'email_address': email_address})
            item = response.get('Item', {})
            return item.get('blocked_patterns', [])
        except Exception as e:
            log(f"⚠ Error getting block list for {email_address}: {e}", 'ERROR')
            return []

    def batch_get_blocked_patterns(self, email_addresses: List[str]) -> Dict[str, List[str]]:
        """
        Batch get blocked patterns for multiple recipients (more efficient)

        Addresses are de-duplicated and requested in chunks of at most
        ``_BATCH_GET_MAX_KEYS`` keys, because a single BatchGetItem request
        rejects duplicate keys and more than 100 keys. Keys DynamoDB reports
        as unprocessed are re-requested a bounded number of times.

        Args:
            email_addresses: List of recipient email addresses

        Returns:
            Dictionary mapping email_address -> list of blocked patterns.
            Every requested address is present; addresses without an entry
            (or whose lookup failed) map to an empty list. If a request
            fails, the error is logged and whatever was fetched before the
            failure is kept.
        """
        # Pre-filling also de-duplicates while preserving first-seen order.
        result: Dict[str, List[str]] = {addr: [] for addr in email_addresses}
        if not result or not self.available or not self.blocked_table:
            return result

        unique_addresses = list(result)
        chunk_size = self._BATCH_GET_MAX_KEYS
        try:
            for start in range(0, len(unique_addresses), chunk_size):
                self._fetch_blocked_chunk(
                    unique_addresses[start:start + chunk_size], result
                )
        except Exception as e:
            log(f"⚠ Batch blocklist check error: {e}", 'ERROR')
        return result

    def _fetch_blocked_chunk(self, addresses: List[str], result: Dict[str, List[str]]) -> None:
        """Fetch one chunk (<= 100 unique addresses) and store hits in ``result``."""
        table_name = config.blocked_table
        pending = {table_name: {'Keys': [{'email_address': addr} for addr in addresses]}}
        attempt = 0
        while pending:
            response = self.resource.batch_get_item(RequestItems=pending)
            for item in response.get('Responses', {}).get(table_name, []):
                address = item.get('email_address')
                if address in result:
                    result[address] = item.get('blocked_patterns', [])

            # Throttled or oversized responses hand back the keys that were
            # not served; they have to be requested again.
            pending = response.get('UnprocessedKeys') or {}
            attempt += 1
            if not pending:
                break
            if attempt >= self._BATCH_GET_MAX_ATTEMPTS:
                leftover = len(pending.get(table_name, {}).get('Keys', []))
                log(
                    f"⚠ Batch blocklist check: {leftover} key(s) still unprocessed "
                    f"after {attempt} attempts",
                    'WARNING'
                )
                break
            time.sleep(self._BATCH_GET_BACKOFF_SECONDS * (2 ** (attempt - 1)))