email-amazon/unified-worker/email-worker/smtp/delivery.py

188 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
SMTP/LMTP email delivery with retry logic
"""
import time
import smtplib
from typing import Tuple, Optional
from logger import log
from config import config
from smtp.pool import SMTPPool
class EmailDelivery:
"""Handles email delivery via SMTP or LMTP"""
def __init__(self, smtp_pool: SMTPPool):
self.smtp_pool = smtp_pool
@staticmethod
def is_permanent_recipient_error(error_msg: str) -> bool:
"""Check if error is permanent for this recipient (inbox doesn't exist)"""
permanent_indicators = [
'550', # Mailbox unavailable / not found
'551', # User not local
'553', # Mailbox name not allowed / invalid
'mailbox not found',
'user unknown',
'no such user',
'recipient rejected',
'does not exist',
'invalid recipient',
'unknown user'
]
error_lower = error_msg.lower()
return any(indicator in error_lower for indicator in permanent_indicators)
def send_to_recipient(
self,
from_addr: str,
recipient: str,
raw_message: bytes,
worker_name: str,
max_retries: int = 2
) -> Tuple[bool, Optional[str], bool]:
"""
Send email via SMTP/LMTP to ONE recipient
If LMTP is enabled, delivers directly to Dovecot (bypasses transport_maps).
With retry logic for connection errors.
Args:
from_addr: From address
recipient: Recipient address
raw_message: Raw MIME message bytes
worker_name: Worker name for logging
max_retries: Maximum retry attempts
Returns:
Tuple of (success: bool, error: str or None, is_permanent: bool)
"""
last_error = None
use_lmtp = config.lmtp_enabled
for attempt in range(max_retries + 1):
conn = None
try:
if use_lmtp:
# LMTP connection directly to Dovecot (bypasses Postfix/transport_maps)
conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30)
conn.ehlo()
else:
# Normal SMTP connection from pool
conn = self.smtp_pool.get_connection()
if not conn:
last_error = "Could not get SMTP connection"
log(
f"{recipient}: No SMTP connection "
f"(attempt {attempt + 1}/{max_retries + 1})",
'WARNING',
worker_name
)
time.sleep(0.5)
continue
result = conn.sendmail(from_addr, [recipient], raw_message)
# Success
if use_lmtp:
conn.quit()
else:
self.smtp_pool.return_connection(conn)
if isinstance(result, dict) and result:
error = str(result.get(recipient, 'Unknown refusal'))
is_permanent = self.is_permanent_recipient_error(error)
log(
f"{recipient}: {error} ({'permanent' if is_permanent else 'temporary'})",
'ERROR',
worker_name
)
return False, error, is_permanent
else:
delivery_method = "LMTP" if use_lmtp else "SMTP"
log(f"{recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name)
return True, None, False
except smtplib.SMTPServerDisconnected as e:
# Connection was closed - Retry with new connection
log(
f"{recipient}: Connection lost, retrying... "
f"(attempt {attempt + 1}/{max_retries + 1})",
'WARNING',
worker_name
)
last_error = str(e)
if conn:
try:
conn.quit()
except:
pass
time.sleep(0.3)
continue
except smtplib.SMTPRecipientsRefused as e:
if conn and not use_lmtp:
self.smtp_pool.return_connection(conn)
elif conn:
try:
conn.quit()
except:
pass
error_msg = str(e)
is_permanent = self.is_permanent_recipient_error(error_msg)
log(f"{recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
return False, error_msg, is_permanent
except smtplib.SMTPException as e:
error_msg = str(e)
# On connection errors: Retry
if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
log(
f"{recipient}: Connection error, retrying... "
f"(attempt {attempt + 1}/{max_retries + 1})",
'WARNING',
worker_name
)
last_error = error_msg
if conn:
try:
conn.quit()
except:
pass
time.sleep(0.3)
continue
if conn and not use_lmtp:
self.smtp_pool.return_connection(conn)
elif conn:
try:
conn.quit()
except:
pass
is_permanent = self.is_permanent_recipient_error(error_msg)
log(f"{recipient}: Error - {error_msg}", 'ERROR', worker_name)
return False, error_msg, is_permanent
except Exception as e:
# Unknown error
if conn:
try:
conn.quit()
except:
pass
log(f"{recipient}: Unexpected error - {e}", 'ERROR', worker_name)
return False, str(e), False
# All retries failed
log(
f"{recipient}: All retries failed - {last_error}",
'ERROR',
worker_name
)
return False, last_error or "Connection failed after retries", False