193 lines
6.8 KiB
Python
193 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
DynamoDB operations handler
|
|
"""
|
|
|
|
import time
|
|
from typing import Optional, Dict, Any, List
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
from logger import log
|
|
from config import config
|
|
|
|
|
|
class DynamoDBHandler:
|
|
"""Handles all DynamoDB operations"""
|
|
|
|
def __init__(self):
|
|
self.resource = boto3.resource('dynamodb', region_name=config.aws_region)
|
|
self.available = False
|
|
self.rules_table = None
|
|
self.messages_table = None
|
|
self.blocked_table = None
|
|
|
|
self._initialize_tables()
|
|
|
|
def _initialize_tables(self):
|
|
"""Initialize DynamoDB table connections"""
|
|
try:
|
|
self.rules_table = self.resource.Table(config.rules_table)
|
|
self.messages_table = self.resource.Table(config.messages_table)
|
|
self.blocked_table = self.resource.Table(config.blocked_table)
|
|
|
|
# Test connection
|
|
self.rules_table.table_status
|
|
self.messages_table.table_status
|
|
self.blocked_table.table_status
|
|
|
|
self.available = True
|
|
log("✓ DynamoDB tables connected successfully")
|
|
|
|
except Exception as e:
|
|
log(f"⚠ DynamoDB not fully available: {e}", 'WARNING')
|
|
self.available = False
|
|
|
|
def get_email_rules(self, email_address: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get email rules for recipient (OOO, Forwarding)
|
|
|
|
Args:
|
|
email_address: Recipient email address
|
|
|
|
Returns:
|
|
Rule dictionary or None if not found
|
|
"""
|
|
if not self.available or not self.rules_table:
|
|
return None
|
|
|
|
try:
|
|
response = self.rules_table.get_item(Key={'email_address': email_address})
|
|
return response.get('Item')
|
|
|
|
except ClientError as e:
|
|
if e.response['Error']['Code'] != 'ResourceNotFoundException':
|
|
log(f"⚠ DynamoDB error for {email_address}: {e}", 'ERROR')
|
|
return None
|
|
|
|
except Exception as e:
|
|
log(f"⚠ DynamoDB error for {email_address}: {e}", 'WARNING')
|
|
return None
|
|
|
|
def get_bounce_info(self, message_id: str, worker_name: str = 'unified') -> Optional[Dict]:
|
|
"""
|
|
Get bounce information from DynamoDB with retry logic
|
|
|
|
Args:
|
|
message_id: SES Message ID
|
|
worker_name: Worker name for logging
|
|
|
|
Returns:
|
|
Bounce info dictionary or None
|
|
"""
|
|
if not self.available or not self.messages_table:
|
|
return None
|
|
|
|
for attempt in range(config.bounce_lookup_retries):
|
|
try:
|
|
response = self.messages_table.get_item(Key={'MessageId': message_id})
|
|
item = response.get('Item')
|
|
|
|
if item:
|
|
return {
|
|
'original_source': item.get('original_source', ''),
|
|
'bounceType': item.get('bounceType', 'Unknown'),
|
|
'bounceSubType': item.get('bounceSubType', 'Unknown'),
|
|
'bouncedRecipients': item.get('bouncedRecipients', []),
|
|
'timestamp': item.get('timestamp', '')
|
|
}
|
|
|
|
if attempt < config.bounce_lookup_retries - 1:
|
|
log(
|
|
f" Bounce record not found yet, retrying in {config.bounce_lookup_delay}s "
|
|
f"(attempt {attempt + 1}/{config.bounce_lookup_retries})...",
|
|
'INFO',
|
|
worker_name
|
|
)
|
|
time.sleep(config.bounce_lookup_delay)
|
|
else:
|
|
log(
|
|
f"⚠ No bounce record found after {config.bounce_lookup_retries} attempts "
|
|
f"for Message-ID: {message_id}",
|
|
'WARNING',
|
|
worker_name
|
|
)
|
|
return None
|
|
|
|
except Exception as e:
|
|
log(
|
|
f"⚠ DynamoDB Error (attempt {attempt + 1}/{config.bounce_lookup_retries}): {e}",
|
|
'ERROR',
|
|
worker_name
|
|
)
|
|
if attempt < config.bounce_lookup_retries - 1:
|
|
time.sleep(config.bounce_lookup_delay)
|
|
else:
|
|
return None
|
|
|
|
return None
|
|
|
|
def get_blocked_patterns(self, email_address: str) -> List[str]:
|
|
"""
|
|
Get blocked sender patterns for recipient
|
|
|
|
Args:
|
|
email_address: Recipient email address
|
|
|
|
Returns:
|
|
List of blocked patterns (may include wildcards)
|
|
"""
|
|
if not self.available or not self.blocked_table:
|
|
return []
|
|
|
|
try:
|
|
response = self.blocked_table.get_item(Key={'email_address': email_address})
|
|
item = response.get('Item', {})
|
|
return item.get('blocked_patterns', [])
|
|
|
|
except Exception as e:
|
|
log(f"⚠ Error getting block list for {email_address}: {e}", 'ERROR')
|
|
return []
|
|
|
|
def batch_get_blocked_patterns(self, email_addresses: List[str]) -> Dict[str, List[str]]:
|
|
"""
|
|
Batch get blocked patterns for multiple recipients (more efficient)
|
|
|
|
Args:
|
|
email_addresses: List of recipient email addresses
|
|
|
|
Returns:
|
|
Dictionary mapping email_address -> list of blocked patterns
|
|
"""
|
|
if not self.available or not self.blocked_table:
|
|
return {addr: [] for addr in email_addresses}
|
|
|
|
try:
|
|
# DynamoDB BatchGetItem
|
|
keys = [{'email_address': addr} for addr in email_addresses]
|
|
response = self.resource.batch_get_item(
|
|
RequestItems={
|
|
config.blocked_table: {'Keys': keys}
|
|
}
|
|
)
|
|
|
|
items = response.get('Responses', {}).get(config.blocked_table, [])
|
|
|
|
# Build result dictionary
|
|
result = {}
|
|
for email_address in email_addresses:
|
|
matching_item = next(
|
|
(item for item in items if item['email_address'] == email_address),
|
|
None
|
|
)
|
|
if matching_item:
|
|
result[email_address] = matching_item.get('blocked_patterns', [])
|
|
else:
|
|
result[email_address] = []
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
log(f"⚠ Batch blocklist check error: {e}", 'ERROR')
|
|
return {addr: [] for addr in email_addresses}
|