#!/usr/bin/env python3
"""
Postfix Content Filter for Internal Email Processing

Handles forwarding and auto-reply for local deliveries.

Invocation: the envelope sender and recipients are passed as arguments and the
raw message is read from stdin, one message per process:

    content_filter.py <sender> <recipient1> [recipient2] ...

The original message is re-injected over SMTP to REINJECT_HOST:REINJECT_PORT
for normal delivery; forwarded copies and auto-replies are sent through the
same listener. Exit status 75 (EX_TEMPFAIL) asks Postfix to retry later.

Version: 2.0 (Optimized)
"""

import contextlib
import json
import logging
import os
import smtplib
import sys
from datetime import datetime, timedelta
from email import message_from_binary_file
from email.errors import HeaderParseError
from email.header import decode_header, make_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid, parseaddr
from html.parser import HTMLParser
from io import BytesIO

try:
    import fcntl  # POSIX-only; serializes updates of the throttle state file
except ImportError:  # pragma: no cover - non-POSIX platforms
    fcntl = None

EX_TEMPFAIL = 75  # sysexits.h: temporary failure, Postfix will retry

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
LOG_FILE = os.environ.get('CONTENT_FILTER_LOG', '/var/log/mail/content_filter.log')


def _build_log_handlers(log_file):
    """Return the log handlers: *log_file* (when it can be opened) plus stderr.

    An unopenable log file must not crash the filter at import time: an
    uncaught exception would end the process with status 1 instead of the
    EX_TEMPFAIL status this script uses for retryable failures.
    """
    handlers = []
    try:
        handlers.append(logging.FileHandler(log_file))
    except OSError as exc:
        sys.stderr.write(f"content_filter: cannot open log file {log_file}: {exc}\n")
    handlers.append(logging.StreamHandler(sys.stderr))
    return handlers


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_build_log_handlers(LOG_FILE),
)


def _env_int(name, default):
    """Read integer env var *name*; fall back to *default* if unset, empty or malformed."""
    raw = os.environ.get(name, '').strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        logging.warning(f"Invalid integer for {name}={raw!r}; using default {default}")
        return default


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# AWS Configuration
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
DYNAMODB_RULES_TABLE = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')

# SMTP Configuration
REINJECT_HOST = os.environ.get('REINJECT_HOST', 'localhost')
REINJECT_PORT = _env_int('REINJECT_PORT', 10026)
SMTP_TIMEOUT = 30  # seconds, for every SMTP connection this script opens

# Cache Configuration
CACHE_TTL_MINUTES = _env_int('CACHE_TTL_MINUTES', 5)
CACHE_TTL = timedelta(minutes=CACHE_TTL_MINUTES)

# Auto-reply throttling (prevent sending multiple auto-replies to same sender).
# Key: (recipient, sender) lower-cased, Value: datetime of the last auto-reply.
# NOTE: this script handles a single message per process, so this dict alone
# only deduplicates within one invocation. Set AUTOREPLY_STATE_FILE to a
# writable path to persist the throttle across messages.
AUTOREPLY_SENT = {}
AUTOREPLY_THROTTLE = timedelta(hours=24)  # Only one auto-reply per sender per day
AUTOREPLY_STATE_FILE = os.environ.get('AUTOREPLY_STATE_FILE', '').strip()
_autoreply_state_loaded = False

# Sender-address substrings that indicate automated mail (never auto-reply).
AUTOREPLY_BLOCKED_SENDER_PATTERNS = (
    'mailer-daemon',
    'postmaster',
    'noreply',
    'no-reply',
    'donotreply',
    'do-not-reply',
    'bounce',
    'amazonses.com',
    'notification',
)

# Cache for DynamoDB rules: {email_address: {'rules': item, 'time': datetime}}.
# Like AUTOREPLY_SENT it only lives for one process (one message).
RULES_CACHE = {}

# Initialize boto3 (import guarded so a missing/broken SDK disables rules
# instead of crashing the filter).
DYNAMODB_AVAILABLE = False
rules_table = None
try:
    import boto3

    dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
    rules_table = dynamodb.Table(DYNAMODB_RULES_TABLE)
    # Test connection: reading table_status lazily loads the table metadata
    # from DynamoDB, so this costs one extra request per invocation.
    rules_table.table_status
    DYNAMODB_AVAILABLE = True
    logging.info("✓ DynamoDB connection initialized")
except Exception as e:
    logging.error(f"✗ DynamoDB initialization failed: {e}")
    logging.warning("Auto-reply and forwarding will be DISABLED")


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _header_str(msg, name):
    """Return header *name* of *msg* as a plain str ('' when absent).

    Parsed headers may come back as email.header.Header objects rather than
    str, so str() is applied before any string method is used.
    """
    value = msg.get(name)
    return '' if value is None else str(value)


def _decode_header_value(value):
    """Decode RFC 2047 encoded-words in *value* and collapse folding whitespace.

    Collapsing whitespace removes embedded CR/LF so the result is safe to
    reuse inside a newly generated header.
    """
    if value is None:
        return ''
    try:
        text = str(make_header(decode_header(str(value))))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        text = str(value)
    return ' '.join(text.split())


class _HTMLTextExtractor(HTMLParser):
    """Collect the visible text of an HTML fragment, one line per block element."""

    _SKIP_TAGS = frozenset({'script', 'style'})
    _BLOCK_TAGS = frozenset(
        {'p', 'div', 'li', 'tr', 'blockquote', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}
    )

    def __init__(self):
        super().__init__(convert_charrefs=True)
        self._parts = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIP_TAGS:
            self._skip_depth += 1
        elif tag == 'br':
            self._parts.append('\n')

    def handle_endtag(self, tag):
        if tag in self._SKIP_TAGS:
            self._skip_depth = max(0, self._skip_depth - 1)
        elif tag in self._BLOCK_TAGS:
            self._parts.append('\n')

    def handle_data(self, data):
        if not self._skip_depth:
            self._parts.append(data)

    def get_text(self):
        """Return the collected text with runs of whitespace normalized."""
        kept = []
        for raw_line in ''.join(self._parts).splitlines():
            line = ' '.join(raw_line.split())
            # Keep at most one blank line in a row
            if line or (kept and kept[-1]):
                kept.append(line)
        return '\n'.join(kept).strip()


def _html_to_text(html_text):
    """Best-effort plain-text rendering of *html_text* ('' if it cannot be parsed)."""
    parser = _HTMLTextExtractor()
    try:
        parser.feed(html_text)
        parser.close()
    except (TypeError, ValueError, AssertionError):
        return ''
    return parser.get_text()


def _read_autoreply_state_file():
    """Return {(recipient, sender): datetime} stored in AUTOREPLY_STATE_FILE.

    Missing, unreadable or malformed files yield {} (malformed entries are
    skipped) so the throttle degrades to per-process behavior.
    """
    try:
        with open(AUTOREPLY_STATE_FILE, encoding='utf-8') as fh:
            raw = json.load(fh)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        logging.warning(f"Could not read auto-reply throttle state: {exc}")
        return {}
    state = {}
    if not isinstance(raw, list):
        return state
    for entry in raw:
        try:
            rcpt, sndr, ts = entry
            state[(str(rcpt), str(sndr))] = datetime.fromtimestamp(float(ts))
        except (TypeError, ValueError, OverflowError, OSError):
            continue
    return state


def _load_autoreply_state():
    """Merge the persisted throttle state into AUTOREPLY_SENT (once per process)."""
    global _autoreply_state_loaded
    if _autoreply_state_loaded or not AUTOREPLY_STATE_FILE:
        return
    _autoreply_state_loaded = True
    for key, when in _read_autoreply_state_file().items():
        if key not in AUTOREPLY_SENT or AUTOREPLY_SENT[key] < when:
            AUTOREPLY_SENT[key] = when


def _save_autoreply_state():
    """Merge AUTOREPLY_SENT into AUTOREPLY_STATE_FILE (no-op when unset).

    Expired entries are pruned. Failures are logged, never raised: losing the
    state only risks an extra auto-reply.
    """
    if not AUTOREPLY_STATE_FILE:
        return
    tmp_path = f"{AUTOREPLY_STATE_FILE}.{os.getpid()}.tmp"
    try:
        with open(f"{AUTOREPLY_STATE_FILE}.lock", 'a') as lock_fh:
            if fcntl is not None:
                # Serialize read-merge-write against concurrent filter processes;
                # the lock is released when lock_fh is closed.
                fcntl.flock(lock_fh, fcntl.LOCK_EX)
            merged = _read_autoreply_state_file()
            for key, when in AUTOREPLY_SENT.items():
                if key not in merged or merged[key] < when:
                    merged[key] = when
            cutoff = datetime.now() - AUTOREPLY_THROTTLE
            entries = [
                [rcpt, sndr, when.timestamp()]
                for (rcpt, sndr), when in merged.items()
                if when > cutoff
            ]
            with open(tmp_path, 'w', encoding='utf-8') as out:
                json.dump(entries, out)
            # Atomic rename so unlocked readers never see a half-written file
            os.replace(tmp_path, AUTOREPLY_STATE_FILE)
    except OSError as exc:
        logging.warning(f"Could not persist auto-reply throttle state: {exc}")
        # Best-effort cleanup of a leftover temp file
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)


# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def extract_domain(email_addr):
    """
    Extract the lower-cased domain from an email address, handling various formats.

    Examples:
        "user@example.com"         -> "example.com"
        "Name <user@example.com>"  -> "example.com"
        "invalid"                  -> ""
    """
    if not email_addr or '@' not in email_addr:
        return ''

    # parseaddr handles the "Name <user@example.com>" format
    _, addr = parseaddr(email_addr)

    # The domain follows the *last* '@' (a quoted local part may contain '@')
    _, at, domain = addr.rpartition('@')
    if not at:
        return ''
    return domain.strip().lower()


def get_email_rules(email_address):
    """
    Fetch forwarding and auto-reply rules for *email_address* from DynamoDB.

    Results, including "no rules", are cached in RULES_CACHE for CACHE_TTL.
    DynamoDB errors are logged and yield {} without being cached.

    Returns:
        dict: the DynamoDB item (this script reads 'forwards', 'ooo_active',
        'ooo_message' and 'ooo_content_type' from it), or {} when DynamoDB is
        unavailable, the item is absent, or the lookup failed.
    """
    if not DYNAMODB_AVAILABLE:
        return {}

    now = datetime.now()
    # Key the cache exactly like the DynamoDB lookup: string keys are
    # case-sensitive there, so a case-folded cache key could return another
    # address's rules.
    cache_key = email_address

    cached_entry = RULES_CACHE.get(cache_key)
    if cached_entry and now - cached_entry['time'] < CACHE_TTL:
        logging.debug(f"Cache hit for {email_address}")
        return cached_entry['rules']

    try:
        response = rules_table.get_item(Key={'email_address': email_address})
        item = response.get('Item') or {}

        if item:
            forwards_count = len(item.get('forwards') or [])
            ooo_active = item.get('ooo_active', False)
            logging.info(f"Rules for {email_address}: forwards={forwards_count}, ooo={ooo_active}")
        else:
            logging.debug(f"No rules found for {email_address}")

        RULES_CACHE[cache_key] = {'rules': item, 'time': now}
        return item
    except Exception as e:
        logging.error(f"DynamoDB error for {email_address}: {e}")
        return {}


def should_send_autoreply(original_msg, sender_addr, recipient_addr):
    """
    Decide whether *recipient_addr* may auto-reply to *sender_addr*.

    Refuses automated senders, RFC 3834 Auto-Submitted mail, bulk/list mail,
    and senders already answered within AUTOREPLY_THROTTLE.

    Returns:
        (bool, str): (should_send, reason_if_not)
    """
    sender_lower = sender_addr.lower()

    # Don't reply to automated senders
    for pattern in AUTOREPLY_BLOCKED_SENDER_PATTERNS:
        if pattern in sender_lower:
            return (False, f"automated sender pattern: {pattern}")

    # RFC 3834: never respond when Auto-Submitted has any value other than "no"
    # (ignoring parameters after ';' and trailing comments).
    auto_submitted = _header_str(original_msg, 'Auto-Submitted').strip()
    if auto_submitted:
        keyword = auto_submitted.split(';', 1)[0].split('(', 1)[0].strip().lower()
        if keyword != 'no':
            return (False, f"Auto-Submitted header: {auto_submitted}")

    # Check precedence header (mailing lists, bulk mail)
    precedence = _header_str(original_msg, 'Precedence').strip().lower()
    if precedence in ('bulk', 'list', 'junk'):
        return (False, f"Precedence: {precedence}")

    # Check List-* headers (mailing lists)
    if original_msg.get('List-Id') or original_msg.get('List-Unsubscribe'):
        return (False, "mailing list headers detected")

    # Throttle: only one auto-reply per sender per AUTOREPLY_THROTTLE
    _load_autoreply_state()
    throttle_key = (recipient_addr.lower(), sender_lower)
    last_sent = AUTOREPLY_SENT.get(throttle_key)
    if last_sent is not None:
        elapsed = datetime.now() - last_sent
        if elapsed < AUTOREPLY_THROTTLE:
            # Report how long ago the previous reply went out (clamped at 0
            # in case the clock moved backwards).
            hours_ago = max(0, int(elapsed.total_seconds() // 3600))
            return (False, f"throttled (sent {hours_ago}h ago)")

    return (True, "")


def send_autoreply(original_msg, recipient_rules, recipient_addr):
    """
    Send an out-of-office auto-reply from *recipient_addr* if enabled and appropriate.

    The reply goes to the address in the original From header. On success the
    (recipient, sender) pair is recorded in AUTOREPLY_SENT (and in
    AUTOREPLY_STATE_FILE when configured). SMTP failures are logged, not raised.
    """
    if not recipient_rules.get('ooo_active'):
        return

    sender = original_msg.get('From')
    if not sender:
        logging.warning("No sender address, skipping auto-reply")
        return

    # Extract email from "Name <user@example.com>" format
    _, sender_addr = parseaddr(str(sender))
    if not sender_addr or '@' not in sender_addr:
        logging.warning(f"Invalid sender address: {sender}, skipping auto-reply")
        return

    should_send, reason = should_send_autoreply(original_msg, sender_addr, recipient_addr)
    if not should_send:
        logging.info(f"Skipping auto-reply to {sender_addr}: {reason}")
        return

    # Decode encoded-words and strip folding so the subject is safe to reuse
    subject = _decode_header_value(original_msg.get('Subject')) or 'No Subject'
    message_id = _header_str(original_msg, 'Message-ID').strip()

    ooo_message = recipient_rules.get('ooo_message') or 'I am currently unavailable.'
    content_type = recipient_rules.get('ooo_content_type', 'text')

    if content_type == 'html':
        reply = MIMEMultipart('alternative')
        # The text/plain alternative must not contain raw HTML markup
        plain_text = _html_to_text(ooo_message) or ooo_message
        reply.attach(MIMEText(plain_text, 'plain', 'utf-8'))
        reply.attach(MIMEText(ooo_message, 'html', 'utf-8'))
    else:
        reply = MIMEText(ooo_message, 'plain', 'utf-8')

    reply['From'] = recipient_addr
    reply['To'] = sender_addr
    reply['Subject'] = f"Automatic Reply: {subject}"
    reply['Date'] = formatdate(localtime=True)
    reply['Message-ID'] = make_msgid()
    reply['Auto-Submitted'] = 'auto-replied'  # RFC 3834
    reply['Precedence'] = 'bulk'
    reply['X-Auto-Response-Suppress'] = 'All'  # Microsoft Exchange

    if message_id:
        reply['In-Reply-To'] = message_id
        reply['References'] = message_id

    try:
        with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=SMTP_TIMEOUT) as smtp:
            smtp.send_message(reply)
    except Exception as e:
        logging.error(f"✗ Auto-reply failed: {e}")
        return

    # Record only successful sends so a failed attempt can be retried later
    AUTOREPLY_SENT[(recipient_addr.lower(), sender_addr.lower())] = datetime.now()
    _save_autoreply_state()
    logging.info(f"✓ Sent auto-reply from {recipient_addr} to {sender_addr}")


def send_forwards(original_msg_bytes, recipient_rules, recipient_addr, sender_addr):
    """
    Forward the original message to every address in recipient_rules['forwards'].

    Each destination is attempted independently; failures are logged, not raised.
    """
    forwards = recipient_rules.get('forwards') or []
    if isinstance(forwards, str):
        # A single address stored as a plain string, not a list
        forwards = [forwards]
    if not forwards:
        return

    # The forwarded copy is identical for every destination: build it once.
    try:
        msg = message_from_binary_file(BytesIO(original_msg_bytes))
        msg['X-Forwarded-For'] = recipient_addr
        msg['X-Original-To'] = recipient_addr
        msg['X-Forwarded-By'] = 'content_filter.py'
        forwarded_bytes = msg.as_bytes()
    except Exception as e:
        logging.error(f"✗ Could not prepare forward for {recipient_addr}: {e}")
        return

    # Keep the original envelope sender so bounces go back to the original
    # sender (replies are governed by the unchanged From header).
    envelope_sender = sender_addr if sender_addr else recipient_addr

    for forward_addr in forwards:
        if not isinstance(forward_addr, str) or '@' not in forward_addr:
            logging.warning(f"Invalid forward address: {forward_addr}, skipping")
            continue
        forward_addr = forward_addr.strip()
        try:
            with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=SMTP_TIMEOUT) as smtp:
                smtp.sendmail(
                    from_addr=envelope_sender,
                    to_addrs=[forward_addr],
                    msg=forwarded_bytes,
                )
            logging.info(f"✓ Forwarded from {recipient_addr} to {forward_addr}")
        except Exception as e:
            logging.error(f"✗ Forward to {forward_addr} failed: {e}")


def is_internal_mail(sender, recipient):
    """
    Return True when *sender* and *recipient* share the same (case-insensitive) domain.

    This is a safety check in addition to transport_maps filtering; addresses
    without a domain are never considered internal.
    """
    sender_domain = extract_domain(sender)
    recipient_domain = extract_domain(recipient)

    if not sender_domain or not recipient_domain:
        return False

    return sender_domain == recipient_domain


def _read_message_bytes():
    """Read the raw message from stdin; exit with EX_TEMPFAIL if it is missing."""
    try:
        msg_bytes = sys.stdin.buffer.read()
    except Exception as e:
        logging.error(f"Failed to read email: {e}")
        sys.exit(EX_TEMPFAIL)
    if not msg_bytes:
        logging.error("No email data received on stdin")
        sys.exit(EX_TEMPFAIL)
    return msg_bytes


def _reinject(sender, recipients, msg_bytes):
    """Re-inject the original message and return the recipients the server accepted.

    Exits with EX_TEMPFAIL (Postfix will retry) when the SMTP transaction fails
    outright. Recipients refused individually are logged; the message is not
    retried for them, since a retry would duplicate delivery to the others.
    """
    try:
        with smtplib.SMTP(REINJECT_HOST, REINJECT_PORT, timeout=SMTP_TIMEOUT) as smtp:
            refused = smtp.sendmail(sender, recipients, msg_bytes)
    except Exception as e:
        logging.exception(f"✗ Delivery failed: {e}")
        sys.exit(EX_TEMPFAIL)
    if refused:
        logging.error(f"✗ Re-injection refused recipients: {refused}")
    return [rcpt for rcpt in recipients if rcpt not in refused]


def _process_rules(sender, recipients, msg_bytes):
    """Apply auto-reply and forwarding rules for each internal recipient (best effort)."""
    try:
        msg = message_from_binary_file(BytesIO(msg_bytes))
    except Exception as e:
        logging.error(f"Failed to parse email, skipping rules: {e}")
        return

    processed_count = 0
    for recipient in recipients:
        try:
            # Safety check: Only process internal mail
            # (transport_maps should already filter, but defense-in-depth)
            if not is_internal_mail(sender, recipient):
                logging.debug(f"Skipping external mail: {sender} -> {recipient}")
                continue

            rules = get_email_rules(recipient)
            if not rules:
                logging.debug(f"No rules for {recipient}")
                continue

            processed_count += 1
            if rules.get('ooo_active'):
                send_autoreply(msg, rules, recipient)
            if rules.get('forwards'):
                send_forwards(msg_bytes, rules, recipient, sender)
        except Exception as e:
            logging.exception(f"Error processing rules for {recipient}: {e}")

    if processed_count > 0:
        logging.info(f"Processed rules for {processed_count}/{len(recipients)} recipients")


def main():
    """Main content filter logic: deliver the message, then apply per-recipient rules."""
    if len(sys.argv) < 3:
        logging.error("Usage: content_filter.py <sender> <recipient1> [recipient2] ...")
        sys.exit(1)

    sender = sys.argv[1]
    recipients = sys.argv[2:]
    logging.info(f"Processing email from {sender} to {', '.join(recipients)}")

    msg_bytes = _read_message_bytes()

    # Deliver first: if delivery fails we exit with EX_TEMPFAIL before any
    # forward or auto-reply is sent, so a Postfix retry cannot duplicate them.
    accepted = _reinject(sender, recipients, msg_bytes)
    if accepted:
        logging.info(f"✓ Delivered to {', '.join(accepted)}")

    _process_rules(sender, accepted, msg_bytes)
    sys.exit(0)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logging.info("Interrupted by user")
        sys.exit(1)
    except Exception as e:
        logging.exception(f"Fatal error: {e}")
        sys.exit(EX_TEMPFAIL)