email-amazon/DMS/content_filter.py

367 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Postfix Content Filter for Internal Email Processing
Handles forwarding and auto-reply for local deliveries
Version: 2.0 (Optimized)
"""
import os
import sys
import smtplib
import logging
from email import message_from_binary_file
from email.mime.text import MIMEText
from email.utils import parseaddr, formatdate, make_msgid
from datetime import datetime, timedelta
from io import BytesIO
# Setup logging
# The log file location can be overridden through the environment; the default
# is the path this filter has always written to.
LOG_FILE = os.environ.get('CONTENT_FILTER_LOG_FILE', '/var/log/mail/content_filter.log')

_log_handlers = [logging.StreamHandler(sys.stderr)]
_log_file_error = None
try:
    _log_handlers.insert(0, logging.FileHandler(LOG_FILE))
except OSError as e:
    # A missing or unwritable log file must not abort the filter at import
    # time (the process would die before the mail is re-injected), so keep
    # going with stderr logging only and report the problem below.
    _log_file_error = e

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
if _log_file_error is not None:
    logging.warning(f"Cannot open log file {LOG_FILE}: {_log_file_error}; logging to stderr only")

# AWS Configuration
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
DYNAMODB_RULES_TABLE = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')

# SMTP Configuration (where processed mail is handed back for delivery)
REINJECT_HOST = os.environ.get('REINJECT_HOST', 'localhost')
REINJECT_PORT = int(os.environ.get('REINJECT_PORT', '10026'))

# Cache Configuration
CACHE_TTL_MINUTES = int(os.environ.get('CACHE_TTL_MINUTES', '5'))
CACHE_TTL = timedelta(minutes=CACHE_TTL_MINUTES)

# Auto-reply throttling (prevent sending multiple auto-replies to same sender)
# Key: (recipient, sender), Value: last_sent_timestamp
# NOTE(review): this dict lives in process memory only; if the filter is
# started once per message the throttle never sees earlier replies - confirm.
AUTOREPLY_SENT = {}
AUTOREPLY_THROTTLE = timedelta(hours=24)  # Only one auto-reply per sender per day

# Cache for DynamoDB rules
# Key: email address, Value: {'rules': <item dict>, 'time': <datetime of fetch>}
RULES_CACHE = {}

# Initialize boto3 inside try/except so that a missing package, missing
# credentials or an unreachable table only disables the rule features instead
# of crashing the filter.
DYNAMODB_AVAILABLE = False
try:
    import boto3
    dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
    rules_table = dynamodb.Table(DYNAMODB_RULES_TABLE)
    # Test connection: reading the attribute is used purely as a probe, the
    # value itself is discarded.
    rules_table.table_status
    DYNAMODB_AVAILABLE = True
    logging.info("✓ DynamoDB connection initialized")
except Exception as e:
    logging.error(f"✗ DynamoDB initialization failed: {e}")
    logging.warning("Auto-reply and forwarding will be DISABLED")
def extract_domain(email_addr):
    """
    Extract the lower-cased domain from an email address.

    Accepts a bare address or a "Name <address>" header value.

    Examples:
        "user@example.com" -> "example.com"
        "Name <user@example.com>" -> "example.com"
        "invalid" -> ""

    Returns '' for empty, non-string or unparseable input.
    """
    if not isinstance(email_addr, str) or '@' not in email_addr:
        return ''
    # parseaddr handles "Name <email@domain.com>" format
    _, addr = parseaddr(email_addr)
    # Split on the LAST '@': a quoted local part may legally contain '@',
    # in which case taking the piece after the first '@' is wrong.
    _, at_sign, domain = addr.rpartition('@')
    if not at_sign:
        return ''
    return domain.strip().lower()
def get_email_rules(email_address):
    """
    Fetch forwarding and auto-reply rules from DynamoDB with caching.

    Args:
        email_address: Address whose rule item is looked up (used verbatim as
            the 'email_address' key of the rules table).

    Returns:
        The rule item as a dict, or {} when DynamoDB is unavailable, the
        address has no item, or the lookup fails. Successful lookups
        (including "no item") are cached for CACHE_TTL; failures are not.
    """
    if not DYNAMODB_AVAILABLE:
        return {}
    now = datetime.now()
    # Cache under exactly the key that is sent to DynamoDB. A lower-cased
    # cache key combined with a verbatim table key let the (possibly empty)
    # result for one spelling be served for a differently-cased spelling.
    # NOTE(review): if the table stores addresses lower-cased, normalise the
    # table key as well - confirm against whatever writes the table.
    cache_key = email_address
    # Check cache
    cached_entry = RULES_CACHE.get(cache_key)
    if cached_entry is not None and now - cached_entry['time'] < CACHE_TTL:
        logging.debug(f"Cache hit for {email_address}")
        return cached_entry['rules']
    # Fetch from DynamoDB
    try:
        response = rules_table.get_item(Key={'email_address': email_address})
        item = response.get('Item', {})
        if item:
            forwards_count = len(item.get('forwards', []))
            ooo_active = item.get('ooo_active', False)
            logging.info(f"Rules for {email_address}: forwards={forwards_count}, ooo={ooo_active}")
        else:
            logging.debug(f"No rules found for {email_address}")
        # Update cache
        RULES_CACHE[cache_key] = {'rules': item, 'time': now}
        return item
    except Exception as e:
        # Best effort: a rules lookup problem must not block mail delivery.
        logging.error(f"DynamoDB error for {email_address}: {e}")
        return {}
def should_send_autoreply(original_msg, sender_addr, recipient_addr):
    """
    Check if we should send auto-reply to this sender.

    Args:
        original_msg: The incoming message; only its .get(header) lookup is used.
        sender_addr: Bare address the reply would go to.
        recipient_addr: Address the auto-reply is sent on behalf of.

    Returns:
        (bool, str) - (should_send, reason_if_not); the reason is '' when
        the reply should be sent.
    """
    sender_lower = sender_addr.lower()

    # Don't reply to automated senders
    blocked_patterns = (
        'mailer-daemon',
        'postmaster',
        'noreply',
        'no-reply',
        'donotreply',
        'do-not-reply',
        'bounce',
        'amazonses.com',
        'notification',
    )
    for pattern in blocked_patterns:
        if pattern in sender_lower:
            return (False, f"automated sender pattern: {pattern}")

    # Check for auto-submitted header to prevent loops (RFC 3834): any value
    # other than "no" marks the message as automatically generated. Optional
    # parameters after ';' are ignored for the comparison.
    auto_submitted = str(original_msg.get('Auto-Submitted') or '').strip()
    if auto_submitted:
        keyword = auto_submitted.split(';', 1)[0].strip().lower()
        if keyword != 'no':
            return (False, f"Auto-Submitted header: {auto_submitted}")

    # Check precedence header (mailing lists, bulk mail, other auto-responders)
    precedence = str(original_msg.get('Precedence') or '').strip().lower()
    if precedence in ('bulk', 'list', 'junk', 'auto_reply'):
        return (False, f"Precedence: {precedence}")

    # Check List-* headers (mailing lists)
    if original_msg.get('List-Id') or original_msg.get('List-Unsubscribe'):
        return (False, "mailing list headers detected")

    # Throttle: Only send one auto-reply per sender per 24 hours
    throttle_key = (recipient_addr.lower(), sender_lower)
    last_sent = AUTOREPLY_SENT.get(throttle_key)
    if last_sent is not None:
        elapsed = datetime.now() - last_sent
        if elapsed < AUTOREPLY_THROTTLE:
            # Report how long ago the previous reply went out (the message
            # says "ago", so it must not be the time remaining).
            hours_ago = max(0, int(elapsed.total_seconds() // 3600))
            return (False, f"throttled (sent {hours_ago}h ago)")

    return (True, "")
def _html_to_plain_text(html_body):
    """Return a rough plain-text rendering of *html_body* with the markup removed."""
    from html.parser import HTMLParser

    block_tags = {'br', 'p', 'div', 'li', 'tr', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}
    hidden_tags = {'script', 'style'}

    class _TextExtractor(HTMLParser):
        def __init__(self):
            super().__init__(convert_charrefs=True)
            self.chunks = []
            self.hidden_depth = 0

        def handle_starttag(self, tag, attrs):
            if tag in hidden_tags:
                self.hidden_depth += 1
            elif tag in block_tags:
                self.chunks.append('\n')

        def handle_endtag(self, tag):
            if tag in hidden_tags:
                self.hidden_depth = max(0, self.hidden_depth - 1)
            elif tag in block_tags:
                self.chunks.append('\n')

        def handle_data(self, data):
            if not self.hidden_depth:
                self.chunks.append(data)

    extractor = _TextExtractor()
    extractor.feed(html_body)
    extractor.close()
    stripped_lines = (line.strip() for line in ''.join(extractor.chunks).splitlines())
    text = '\n'.join(line for line in stripped_lines if line)
    # Never hand back an empty text/plain part; fall back to the raw input.
    return text or html_body


def send_autoreply(original_msg, recipient_rules, recipient_addr):
    """
    Send auto-reply if enabled and appropriate.

    Args:
        original_msg: The incoming message (headers are read via .get()).
        recipient_rules: Rule dict; reads 'ooo_active', 'ooo_message' and
            'ooo_content_type' ('html' produces a multipart/alternative reply).
        recipient_addr: Address the reply is sent from.

    Returns:
        None. SMTP failures are logged, never raised. On success the
        (recipient, sender) pair is stamped in AUTOREPLY_SENT for throttling.
    """
    if not recipient_rules.get('ooo_active'):
        return

    sender = original_msg.get('From')
    if not sender:
        logging.warning("No sender address, skipping auto-reply")
        return

    # Extract email from "Name <email>" format
    _, sender_addr = parseaddr(str(sender))
    if not sender_addr or '@' not in sender_addr:
        logging.warning(f"Invalid sender address: {sender}, skipping auto-reply")
        return

    # Check if we should send auto-reply
    should_send, reason = should_send_autoreply(original_msg, sender_addr, recipient_addr)
    if not should_send:
        logging.info(f"Skipping auto-reply to {sender_addr}: {reason}")
        return

    # Collapse folded-header whitespace so no line break ends up inside the
    # Subject / References values of the reply.
    subject = ' '.join(str(original_msg.get('Subject', 'No Subject')).split())
    message_id = ' '.join(str(original_msg.get('Message-ID') or '').split())

    # Get auto-reply message
    ooo_message = recipient_rules.get('ooo_message', 'I am currently unavailable.')
    content_type = recipient_rules.get('ooo_content_type', 'text')

    # Create auto-reply
    if content_type == 'html':
        from email.mime.multipart import MIMEMultipart
        reply = MIMEMultipart('alternative')
        # The text/plain alternative must be readable text, not raw markup.
        reply.attach(MIMEText(_html_to_plain_text(ooo_message), 'plain', 'utf-8'))
        reply.attach(MIMEText(ooo_message, 'html', 'utf-8'))
    else:
        reply = MIMEText(ooo_message, 'plain', 'utf-8')

    reply['From'] = recipient_addr
    reply['To'] = sender_addr
    reply['Subject'] = f"Automatic Reply: {subject}"
    reply['Date'] = formatdate(localtime=True)
    reply['Message-ID'] = make_msgid()
    reply['Auto-Submitted'] = 'auto-replied'  # RFC 3834
    reply['Precedence'] = 'bulk'
    reply['X-Auto-Response-Suppress'] = 'All'  # Microsoft Exchange
    if message_id:
        reply['In-Reply-To'] = message_id
        # Keep the existing thread chain and append the parent's id.
        prior_refs = ' '.join(str(original_msg.get('References') or '').split())
        reply['References'] = f"{prior_refs} {message_id}" if prior_refs else message_id

    # Send via local SMTP
    try:
        with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=30) as smtp:
            smtp.send_message(reply)
        # Update throttle timestamp
        throttle_key = (recipient_addr.lower(), sender_addr.lower())
        AUTOREPLY_SENT[throttle_key] = datetime.now()
        logging.info(f"✓ Sent auto-reply from {recipient_addr} to {sender_addr}")
    except Exception as e:
        # Best effort: an auto-reply failure must not affect mail delivery.
        logging.error(f"✗ Auto-reply failed: {e}")
def send_forwards(original_msg_bytes, recipient_rules, recipient_addr, sender_addr):
    """
    Forward email to configured addresses.

    Args:
        original_msg_bytes: Raw bytes of the message to forward.
        recipient_rules: Rule dict; its 'forwards' entry lists target addresses.
        recipient_addr: Address the mail was originally sent to (recorded in
            the X-Forwarded-For / X-Original-To headers).
        sender_addr: Original envelope sender; reused as envelope sender of
            each forward, falling back to recipient_addr when empty.

    Returns:
        None. Each target is attempted independently; failures are logged,
        never raised.
    """
    forwards = recipient_rules.get('forwards', [])
    if not forwards:
        return
    if isinstance(forwards, str):
        # A single address stored as a bare string must not be iterated
        # character by character.
        forwards = [forwards]

    # The forwarded copy is identical for every target, so build it once.
    try:
        msg = message_from_binary_file(BytesIO(original_msg_bytes))
        # Add forwarding headers
        msg['X-Forwarded-For'] = recipient_addr
        msg['X-Original-To'] = recipient_addr
        msg['X-Forwarded-By'] = 'content_filter.py'
        forward_bytes = msg.as_bytes()
    except Exception as e:
        logging.error(f"✗ Could not prepare forward for {recipient_addr}: {e}")
        return

    # Preserve original sender in envelope
    # (This way replies go to original sender, not to recipient)
    envelope_sender = sender_addr if sender_addr else recipient_addr

    for forward_addr in forwards:
        # Validate forward address
        if not isinstance(forward_addr, str) or '@' not in forward_addr:
            logging.warning(f"Invalid forward address: {forward_addr}, skipping")
            continue
        try:
            # Send via local SMTP
            with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=30) as smtp:
                smtp.sendmail(
                    from_addr=envelope_sender,
                    to_addrs=[forward_addr],
                    msg=forward_bytes,
                )
            logging.info(f"✓ Forwarded from {recipient_addr} to {forward_addr}")
        except Exception as e:
            # One failing target must not stop the remaining forwards.
            logging.error(f"✗ Forward to {forward_addr} failed: {e}")
def is_internal_mail(sender, recipient):
    """
    Tell whether sender and recipient belong to the same mail domain.

    Acts as an extra safety net next to the transport_maps filtering.
    Returns False when either address yields no domain.
    """
    domains = [extract_domain(address) for address in (sender, recipient)]
    if not all(domains):
        return False
    return domains[0] == domains[1]
def main():
    """
    Main content filter logic.

    Reads the sender and recipients from sys.argv and the message from stdin,
    applies auto-reply / forwarding rules for internal recipients, then
    re-injects the unmodified message for normal delivery.

    Exit codes:
        0  - message re-injected for every recipient
        1  - wrong command line
        75 - EX_TEMPFAIL (unreadable input, re-injection failed or a
             recipient was refused), so that the MTA keeps the message
    """
    if len(sys.argv) < 3:
        logging.error("Usage: content_filter.py <sender> <recipient1> [recipient2] ...")
        sys.exit(1)

    sender = sys.argv[1]
    recipients = sys.argv[2:]
    logging.info(f"Processing email from {sender} to {', '.join(recipients)}")

    # Read email from stdin
    try:
        msg_bytes = sys.stdin.buffer.read()
        if not msg_bytes:
            logging.error("No email data received on stdin")
            sys.exit(75)  # EX_TEMPFAIL
        msg = message_from_binary_file(BytesIO(msg_bytes))
    except Exception as e:
        logging.error(f"Failed to read email: {e}")
        sys.exit(75)  # EX_TEMPFAIL

    # Process each recipient
    # NOTE(review): rules run before re-injection, so a retried delivery
    # repeats forwards/auto-replies - confirm this ordering is intended.
    processed_count = 0
    for recipient in recipients:
        try:
            # Safety check: Only process internal mail
            # (transport_maps should already filter, but defense-in-depth)
            if not is_internal_mail(sender, recipient):
                logging.debug(f"Skipping external mail: {sender} -> {recipient}")
                continue
            # Fetch rules from DynamoDB
            rules = get_email_rules(recipient)
            if not rules:
                logging.debug(f"No rules for {recipient}")
                continue
            processed_count += 1
            # Send auto-reply if configured
            if rules.get('ooo_active'):
                send_autoreply(msg, rules, recipient)
            # Send forwards if configured
            if rules.get('forwards'):
                send_forwards(msg_bytes, rules, recipient, sender)
        except Exception as e:
            # Rule processing is best effort; delivery below must still happen.
            logging.exception(f"Error processing rules for {recipient}: {e}")

    if processed_count > 0:
        logging.info(f"Processed rules for {processed_count}/{len(recipients)} recipients")

    # Re-inject original email for normal delivery
    try:
        with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=30) as smtp:
            # sendmail() only raises when ALL recipients are refused; partial
            # refusals come back as {recipient: (code, response)}.
            refused = smtp.sendmail(sender, recipients, msg_bytes)
    except Exception as e:
        logging.exception(f"✗ Delivery failed: {e}")
        sys.exit(75)  # EX_TEMPFAIL - Postfix will retry

    if refused:
        for rcpt, (code, response) in refused.items():
            if isinstance(response, bytes):
                response = response.decode('utf-8', 'replace')
            logging.error(f"✗ Recipient refused on re-injection: {rcpt} ({code} {response})")
        # Reporting success here would silently lose the mail for the refused
        # recipients. Temp-failing keeps the message queued; the trade-off is
        # that recipients accepted above may receive a duplicate on retry.
        sys.exit(75)  # EX_TEMPFAIL

    logging.info(f"✓ Delivered to {', '.join(recipients)}")
    sys.exit(0)
# Script entry point: run the filter and translate anything unexpected into
# an exit status (1 on Ctrl-C, 75 on any other uncaught error).
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logging.info("Interrupted by user")
        raise SystemExit(1)
    except Exception as fatal:
        from traceback import format_exc
        logging.error(f"Fatal error: {fatal}")
        logging.error(format_exc())
        raise SystemExit(75)