Compare commits
No commits in common. "f37208251282f5be25e74629a6435bcece0a5205" and "1b33990d868d33e92fd520b729e94bd12130b66b" have entirely different histories.
f372082512
...
1b33990d86
|
|
@ -0,0 +1,20 @@
|
||||||
|
# Worker bauen
|
||||||
|
docker-compose build
|
||||||
|
|
||||||
|
# Worker starten
|
||||||
|
docker-compose up -d
|
||||||
|
|
||||||
|
# Logs ansehen
|
||||||
|
docker-compose logs -f
|
||||||
|
|
||||||
|
# Logs nur für eine Domain
|
||||||
|
docker-compose logs -f worker-andreasknuth
|
||||||
|
|
||||||
|
# Status prüfen
|
||||||
|
docker-compose ps
|
||||||
|
|
||||||
|
# Worker neu starten
|
||||||
|
docker-compose restart
|
||||||
|
|
||||||
|
# Worker stoppen
|
||||||
|
docker-compose down
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": [
|
||||||
|
"s3:GetObject",
|
||||||
|
"s3:HeadObject",
|
||||||
|
"s3:ListBucket",
|
||||||
|
"s3:CopyObject"
|
||||||
|
],
|
||||||
|
"Resource": [
|
||||||
|
"arn:aws:s3:::*-emails",
|
||||||
|
"arn:aws:s3:::*-emails/*"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": [
|
||||||
|
"sqs:SendMessage",
|
||||||
|
"sqs:GetQueueUrl"
|
||||||
|
],
|
||||||
|
"Resource": "arn:aws:sqs:eu-central-1:123456789:*-queue"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": [
|
||||||
|
"logs:CreateLogGroup",
|
||||||
|
"logs:CreateLogStream",
|
||||||
|
"logs:PutLogEvents"
|
||||||
|
],
|
||||||
|
"Resource": "arn:aws:logs:::*"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
#!/bin/bash
# setup_email_domain.sh - Wrapper script that runs the three setup scripts
# (S3 bucket, SES configuration, IAM user / SMTP credentials) in order.
#
# Usage: ./setup_email_domain.sh domain.de [region]
#
# Fix: abort on the first failing step (and on unset variables / failed
# pipe stages) so later scripts never run against half-created
# infrastructure.  The original script ploughed on after errors.
set -euo pipefail

# Require a domain argument (use ${1:-} so the check works under `set -u`)
if [ -z "${1:-}" ]; then
    echo "Fehler: Keine Domain angegeben."
    echo "Verwendung: ./setup_email_domain.sh domain.de [region]"
    exit 1
fi

DOMAIN_NAME=$1
AWS_REGION=${2:-"us-east-2"}

# Export for the child scripts
export DOMAIN_NAME
export AWS_REGION

echo "=== AWS E-Mail-Infrastruktur für $DOMAIN_NAME einrichten ==="
echo "AWS-Region: $AWS_REGION"
echo

# Run the setup scripts in sequence
echo "1. S3-Bucket erstellen..."
./awss3.sh
echo

echo "2. SES-Konfiguration einrichten..."
# Bucket naming convention: dots in the domain become dashes, "-emails" suffix
export S3_BUCKET_NAME=$(echo "$DOMAIN_NAME" | tr '.' '-' | awk '{print $0 "-emails"}')
./awsses.sh
echo

echo "3. IAM-Benutzer und SMTP-Zugangsdaten erstellen..."
./awsiam.sh
echo

echo "=== Setup abgeschlossen ==="
echo "Alle Schritte wurden abgeschlossen. Bitte überprüfen Sie die Ausgaben der einzelnen Skripte."
echo "Vergessen Sie nicht, die benötigten DNS-Einträge für Ihre Domain zu setzen, um die SES-Verifizierung abzuschließen."
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
FROM python:3.11-slim
|
|
||||||
|
|
||||||
LABEL maintainer="andreas@knuth.dev"
|
|
||||||
LABEL description="Unified multi-domain email worker"
|
|
||||||
|
|
||||||
# System packages
|
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
|
||||||
curl \
|
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
|
||||||
|
|
||||||
# Non-root user
|
|
||||||
RUN useradd -m -u 1000 worker && \
|
|
||||||
mkdir -p /app /var/log/email-worker /etc/email-worker && \
|
|
||||||
chown -R worker:worker /app /var/log/email-worker /etc/email-worker
|
|
||||||
|
|
||||||
# Python dependencies
|
|
||||||
COPY requirements.txt /app/
|
|
||||||
RUN pip install --no-cache-dir -r /app/requirements.txt
|
|
||||||
|
|
||||||
# Worker code
|
|
||||||
COPY --chown=worker:worker unified_worker.py /app/
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
USER worker
|
|
||||||
|
|
||||||
# Health check
|
|
||||||
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
|
|
||||||
CMD curl -f http://localhost:8080/health || exit 1
|
|
||||||
|
|
||||||
# Unbuffered output
|
|
||||||
ENV PYTHONUNBUFFERED=1
|
|
||||||
|
|
||||||
CMD ["python", "unified_worker.py"]
|
|
||||||
|
|
@ -1,68 +0,0 @@
|
||||||
version: "3.8"
|
|
||||||
|
|
||||||
# Unified Email Worker - verarbeitet alle Domains mit einem Container
|
|
||||||
|
|
||||||
services:
|
|
||||||
unified-worker:
|
|
||||||
build:
|
|
||||||
context: .
|
|
||||||
dockerfile: Dockerfile
|
|
||||||
container_name: unified-email-worker
|
|
||||||
restart: unless-stopped
|
|
||||||
network_mode: host # Für lokalen SMTP-Zugriff
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
# Domain-Liste (eine Domain pro Zeile)
|
|
||||||
- ./domains.txt:/etc/email-worker/domains.txt:ro
|
|
||||||
# Logs
|
|
||||||
- ./logs:/var/log/email-worker
|
|
||||||
|
|
||||||
environment:
|
|
||||||
# AWS Credentials
|
|
||||||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
|
|
||||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
|
|
||||||
- AWS_REGION=us-east-2
|
|
||||||
|
|
||||||
# Alternative: Domains direkt als Liste
|
|
||||||
# - DOMAINS=andreasknuth.de,bayarea-cc.com,bizmatch.net
|
|
||||||
|
|
||||||
# Worker Settings
|
|
||||||
- WORKER_THREADS=${WORKER_THREADS:-10}
|
|
||||||
- POLL_INTERVAL=${POLL_INTERVAL:-20}
|
|
||||||
- MAX_MESSAGES=${MAX_MESSAGES:-10}
|
|
||||||
- VISIBILITY_TIMEOUT=${VISIBILITY_TIMEOUT:-300}
|
|
||||||
|
|
||||||
# SMTP (lokal zum DMS)
|
|
||||||
- SMTP_HOST=${SMTP_HOST:-localhost}
|
|
||||||
- SMTP_PORT=${SMTP_PORT:-25}
|
|
||||||
- SMTP_POOL_SIZE=${SMTP_POOL_SIZE:-5}
|
|
||||||
|
|
||||||
# Monitoring
|
|
||||||
- METRICS_PORT=8000
|
|
||||||
- HEALTH_PORT=8080
|
|
||||||
|
|
||||||
ports:
|
|
||||||
# Prometheus Metrics
|
|
||||||
- "8000:8000"
|
|
||||||
# Health Check
|
|
||||||
- "8080:8080"
|
|
||||||
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
|
|
||||||
interval: 30s
|
|
||||||
timeout: 10s
|
|
||||||
retries: 3
|
|
||||||
start_period: 10s
|
|
||||||
|
|
||||||
logging:
|
|
||||||
driver: "json-file"
|
|
||||||
options:
|
|
||||||
max-size: "50m"
|
|
||||||
max-file: "10"
|
|
||||||
|
|
||||||
deploy:
|
|
||||||
resources:
|
|
||||||
limits:
|
|
||||||
memory: 512M
|
|
||||||
reservations:
|
|
||||||
memory: 256M
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
# domains.txt - Liste aller zu verarbeitenden Domains
|
|
||||||
# Eine Domain pro Zeile
|
|
||||||
# Zeilen mit # werden ignoriert
|
|
||||||
|
|
||||||
# Test Domain
|
|
||||||
andreasknuth.de
|
|
||||||
|
|
||||||
# Produktiv Domains (später hinzufügen)
|
|
||||||
# annavillesda.org
|
|
||||||
# bayarea-cc.com
|
|
||||||
# bizmatch.net
|
|
||||||
# hotshpotshgallery.com
|
|
||||||
# qrmaster.net
|
|
||||||
# ruehrgedoens.de
|
|
||||||
|
|
@ -1,2 +0,0 @@
|
||||||
boto3>=1.34.0
|
|
||||||
prometheus-client>=0.19.0
|
|
||||||
|
|
@ -1,998 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
unified_worker.py - Multi-Domain Email Worker (Full Featured)
|
|
||||||
|
|
||||||
Features:
|
|
||||||
- Multi-Domain parallel processing via Thread Pool
|
|
||||||
- Bounce Detection & Rewriting (SES MAILER-DAEMON handling)
|
|
||||||
- Auto-Reply / Out-of-Office (from DynamoDB email-rules)
|
|
||||||
- Email Forwarding (from DynamoDB email-rules)
|
|
||||||
- SMTP Connection Pooling
|
|
||||||
- Prometheus Metrics
|
|
||||||
- Graceful Shutdown
|
|
||||||
|
|
||||||
DynamoDB Tables:
|
|
||||||
- ses-outbound-messages: Tracking für Bounce-Korrelation
|
|
||||||
- email-rules: Forwards und Auto-Reply Regeln
|
|
||||||
|
|
||||||
Schema email-rules:
|
|
||||||
{
|
|
||||||
"email": "user@domain.com", # Partition Key
|
|
||||||
"rule_type": "forward|autoreply", # Sort Key
|
|
||||||
"enabled": true,
|
|
||||||
"target": "other@example.com", # Für forwards
|
|
||||||
"subject": "Out of Office", # Für autoreply
|
|
||||||
"body": "I am currently...", # Für autoreply
|
|
||||||
"start_date": "2025-01-01", # Optional: Zeitraum
|
|
||||||
"end_date": "2025-01-15" # Optional: Zeitraum
|
|
||||||
}
|
|
||||||
|
|
||||||
Schema ses-outbound-messages:
|
|
||||||
{
|
|
||||||
"MessageId": "abc123...", # Partition Key (SES Message-ID)
|
|
||||||
"original_source": "sender@domain.com",
|
|
||||||
"recipients": ["recipient@other.com"],
|
|
||||||
"timestamp": "2025-01-01T12:00:00Z",
|
|
||||||
"bounceType": "Permanent", # Nach Bounce-Notification
|
|
||||||
"bounceSubType": "General",
|
|
||||||
"bouncedRecipients": ["recipient@other.com"]
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
import signal
|
|
||||||
import logging
|
|
||||||
import threading
|
|
||||||
import smtplib
|
|
||||||
import hashlib
|
|
||||||
import re
|
|
||||||
from queue import Queue, Empty
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from typing import List, Dict, Optional, Tuple, Any
|
|
||||||
from datetime import datetime, date
|
|
||||||
from email.parser import BytesParser
|
|
||||||
from email.policy import SMTP as SMTPPolicy
|
|
||||||
from email.mime.text import MIMEText
|
|
||||||
from email.mime.multipart import MIMEMultipart
|
|
||||||
from email.utils import formataddr, parseaddr, formatdate
|
|
||||||
import copy
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
from botocore.exceptions import ClientError
|
|
||||||
from boto3.dynamodb.conditions import Key
|
|
||||||
|
|
||||||
# Optional: Prometheus Metrics
|
|
||||||
try:
|
|
||||||
from prometheus_client import start_http_server, Counter, Gauge, Histogram
|
|
||||||
PROMETHEUS_ENABLED = True
|
|
||||||
except ImportError:
|
|
||||||
PROMETHEUS_ENABLED = False
|
|
||||||
logging.warning("prometheus_client not installed - metrics disabled")
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# CONFIGURATION
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
@dataclass
class Config:
    """Worker configuration, sourced from environment variables.

    NOTE: every default below is evaluated once, when this class body is
    executed at import time - changing the environment afterwards has no
    effect on an already-imported module.
    """
    # AWS
    aws_region: str = os.environ.get('AWS_REGION', 'us-east-2')

    # Domains to process: DOMAINS is a comma-separated list, DOMAINS_FILE a
    # one-domain-per-line file; load_domains() merges both sources.
    domains_list: str = os.environ.get('DOMAINS', '')
    domains_file: str = os.environ.get('DOMAINS_FILE', '/etc/email-worker/domains.txt')

    # Worker Settings (SQS polling / thread pool sizing)
    worker_threads: int = int(os.environ.get('WORKER_THREADS', '10'))
    poll_interval: int = int(os.environ.get('POLL_INTERVAL', '20'))
    max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
    visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))

    # SMTP (local delivery target, used by SMTPPool)
    smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
    smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
    smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
    smtp_user: str = os.environ.get('SMTP_USER', '')
    smtp_pass: str = os.environ.get('SMTP_PASS', '')
    smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))

    # DynamoDB Tables (rules = forwards/auto-replies, messages = bounce correlation)
    rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
    messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')

    # Bounce Handling (DynamoDB lookup retry behaviour)
    bounce_lookup_retries: int = int(os.environ.get('BOUNCE_LOOKUP_RETRIES', '3'))
    bounce_lookup_delay: float = float(os.environ.get('BOUNCE_LOOKUP_DELAY', '1.0'))

    # Auto-Reply
    autoreply_from_name: str = os.environ.get('AUTOREPLY_FROM_NAME', 'Auto-Reply')
    autoreply_cooldown_hours: int = int(os.environ.get('AUTOREPLY_COOLDOWN_HOURS', '24'))

    # Monitoring (Prometheus metrics / health endpoint ports)
    metrics_port: int = int(os.environ.get('METRICS_PORT', '8000'))
    health_port: int = int(os.environ.get('HEALTH_PORT', '8080'))
|
|
||||||
|
|
||||||
config = Config()
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# LOGGING
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO,
|
|
||||||
format='%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s',
|
|
||||||
datefmt='%Y-%m-%d %H:%M:%S'
|
|
||||||
)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# METRICS
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
if PROMETHEUS_ENABLED:
|
|
||||||
emails_processed = Counter('emails_processed_total', 'Total emails processed', ['domain', 'status'])
|
|
||||||
emails_in_flight = Gauge('emails_in_flight', 'Emails currently being processed')
|
|
||||||
processing_time = Histogram('email_processing_seconds', 'Time to process email', ['domain'])
|
|
||||||
queue_size = Gauge('queue_messages_available', 'Messages in queue', ['domain'])
|
|
||||||
bounces_processed = Counter('bounces_processed_total', 'Bounce notifications processed', ['domain', 'type'])
|
|
||||||
autoreplies_sent = Counter('autoreplies_sent_total', 'Auto-replies sent', ['domain'])
|
|
||||||
forwards_sent = Counter('forwards_sent_total', 'Forwards sent', ['domain'])
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# AWS CLIENTS
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
sqs = boto3.client('sqs', region_name=config.aws_region)
|
|
||||||
s3 = boto3.client('s3', region_name=config.aws_region)
|
|
||||||
ses = boto3.client('ses', region_name=config.aws_region)
|
|
||||||
|
|
||||||
# DynamoDB
|
|
||||||
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
|
|
||||||
try:
|
|
||||||
rules_table = dynamodb.Table(config.rules_table)
|
|
||||||
messages_table = dynamodb.Table(config.messages_table)
|
|
||||||
# Test connection
|
|
||||||
rules_table.table_status
|
|
||||||
messages_table.table_status
|
|
||||||
DYNAMODB_AVAILABLE = True
|
|
||||||
logger.info(f"DynamoDB connected: {config.rules_table}, {config.messages_table}")
|
|
||||||
except Exception as e:
|
|
||||||
DYNAMODB_AVAILABLE = False
|
|
||||||
rules_table = None
|
|
||||||
messages_table = None
|
|
||||||
logger.warning(f"DynamoDB not available: {e}")
|
|
||||||
|
|
||||||
# Auto-Reply Cooldown Cache (in-memory, thread-safe)
|
|
||||||
autoreply_cache: Dict[str, datetime] = {}
|
|
||||||
autoreply_cache_lock = threading.Lock()
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# SMTP CONNECTION POOL
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
class SMTPPool:
    """Thread-safe SMTP connection pool.

    Connections are pre-created by initialize() and recycled via
    get_connection()/return_connection().  A pooled connection that fails
    its NOOP liveness probe is closed and transparently replaced, so
    callers always receive either a working connection or None.

    Fixes vs. original: the four bare ``except:`` clauses (which also
    swallowed KeyboardInterrupt/SystemExit) are narrowed to
    ``except Exception``, and a dead connection is now explicitly closed
    before being replaced instead of being leaked.
    """

    def __init__(self, host: str, port: int, pool_size: int = 5):
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()
        self._initialized = False

    def _create_connection(self) -> Optional[smtplib.SMTP]:
        """Create a new SMTP connection (TLS/login per global config); None on failure."""
        try:
            conn = smtplib.SMTP(self.host, self.port, timeout=30)
            conn.ehlo()
            if config.smtp_use_tls:
                conn.starttls()
                conn.ehlo()  # re-identify after STARTTLS as required by RFC 3207
            if config.smtp_user and config.smtp_pass:
                conn.login(config.smtp_user, config.smtp_pass)
            return conn
        except Exception as e:
            logger.error(f"Failed to create SMTP connection: {e}")
            return None

    def initialize(self):
        """Pre-create up to pool_size connections (idempotent)."""
        if self._initialized:
            return
        for _ in range(self.pool_size):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)
        self._initialized = True
        logger.info(f"SMTP pool initialized with {self._pool.qsize()} connections")

    def get_connection(self, timeout: float = 5.0) -> Optional[smtplib.SMTP]:
        """Borrow a live connection; falls back to creating a fresh one."""
        try:
            conn = self._pool.get(timeout=timeout)
        except Empty:
            # Pool exhausted - create an overflow connection rather than block.
            return self._create_connection()
        try:
            conn.noop()  # liveness probe
            return conn
        except Exception:
            # Stale connection: close it best-effort, then replace it.
            try:
                conn.close()
            except Exception:
                pass
            return self._create_connection()

    def return_connection(self, conn: smtplib.SMTP):
        """Return a borrowed connection; quit it if the pool is already full."""
        if conn is None:
            return
        try:
            self._pool.put_nowait(conn)
        except Exception:
            # Pool full (e.g. an overflow connection) - close instead of leaking.
            try:
                conn.quit()
            except Exception:
                pass

    def close_all(self):
        """Drain the pool, quitting every pooled connection."""
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                conn.quit()
            except Exception:
                pass
|
|
||||||
|
|
||||||
smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# HELPER FUNCTIONS
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def domain_to_queue_name(domain: str) -> str:
    """Map a mail domain to its SQS queue name: dots become dashes, '-queue' suffix."""
    slug = '-'.join(domain.split('.'))
    return f"{slug}-queue"
|
|
||||||
|
|
||||||
def domain_to_bucket_name(domain: str) -> str:
    """Map a mail domain to its S3 bucket name: dots become dashes, '-emails' suffix."""
    slug = '-'.join(domain.split('.'))
    return f"{slug}-emails"
|
|
||||||
|
|
||||||
def get_queue_url(domain: str) -> Optional[str]:
    """Resolve the SQS queue URL for *domain*.

    Returns None when the queue does not exist or the lookup fails;
    both cases are logged.
    """
    name = domain_to_queue_name(domain)
    try:
        return sqs.get_queue_url(QueueName=name)['QueueUrl']
    except ClientError as e:
        missing = e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue'
        if missing:
            logger.warning(f"Queue not found for domain: {domain}")
        else:
            logger.error(f"Error getting queue URL for {domain}: {e}")
        return None
|
|
||||||
|
|
||||||
def load_domains() -> List[str]:
    """Load the list of domains this worker should process.

    Sources, merged in order:
      1. the DOMAINS env var (comma-separated),
      2. the domains file (one domain per line, '#'-prefixed lines ignored).

    Returns a de-duplicated list preserving first-seen order.  The original
    used ``list(set(domains))``, which made the ordering vary from run to
    run; ``dict.fromkeys`` keeps deduplication while staying deterministic.
    """
    domains: List[str] = []

    if config.domains_list:
        domains.extend(d.strip() for d in config.domains_list.split(',') if d.strip())

    if os.path.exists(config.domains_file):
        with open(config.domains_file, 'r') as f:
            for line in f:
                domain = line.strip()
                if domain and not domain.startswith('#'):
                    domains.append(domain)

    # Deterministic de-duplication (dict preserves insertion order).
    domains = list(dict.fromkeys(domains))
    logger.info(f"Loaded {len(domains)} domains")
    return domains
|
|
||||||
|
|
||||||
def extract_email_address(header_value: str) -> str:
    """Extract the bare, lower-cased address from a header like 'Name <email@domain.com>'.

    Falls back to lower-casing the raw header text when parseaddr cannot
    find an address part; an empty/None input yields ''.
    """
    if not header_value:
        return ''
    _, addr = parseaddr(header_value)
    return (addr or header_value).lower()
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# BOUNCE HANDLING
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def is_ses_bounce_notification(parsed_email) -> bool:
    """True if the message's From header identifies an Amazon SES MAILER-DAEMON bounce."""
    sender = (parsed_email.get('From') or '').lower()
    return all(marker in sender for marker in ('mailer-daemon@', 'amazonses.com'))
|
|
||||||
|
|
||||||
def get_bounce_info_from_dynamodb(message_id: str) -> Optional[Dict]:
    """
    Look up original sender info from DynamoDB for bounce correlation.
    The message_id here is from the bounce notification, we need to find
    the original outbound message.

    Retries config.bounce_lookup_retries times with a
    config.bounce_lookup_delay pause, because the bounce-notification
    writer may not have persisted the record yet when the bounce email
    arrives.  Returns None if DynamoDB is unavailable or no record is
    found after all attempts.
    """
    if not DYNAMODB_AVAILABLE or not messages_table:
        return None

    for attempt in range(config.bounce_lookup_retries):
        try:
            # The Message-ID may arrive in different formats; normalize by
            # stripping the optional <...> brackets and any @domain suffix
            # so it matches the stored SES message id.
            clean_id = message_id.strip('<>').split('@')[0]

            response = messages_table.get_item(Key={'MessageId': clean_id})
            item = response.get('Item')

            if item:
                logger.info(f"Found bounce info for {clean_id}")
                # Project only the fields the bounce rewriter needs, with
                # safe defaults for anything missing from the record.
                return {
                    'original_source': item.get('original_source', ''),
                    'recipients': item.get('recipients', []),
                    'bounceType': item.get('bounceType', 'Unknown'),
                    'bounceSubType': item.get('bounceSubType', 'Unknown'),
                    'bouncedRecipients': item.get('bouncedRecipients', []),
                    'timestamp': item.get('timestamp', '')
                }

            if attempt < config.bounce_lookup_retries - 1:
                logger.debug(f"Bounce record not found, retry {attempt + 1}...")
                time.sleep(config.bounce_lookup_delay)

        except Exception as e:
            # Transient DynamoDB errors are retried like a missing record.
            logger.error(f"DynamoDB error looking up bounce: {e}")
            if attempt < config.bounce_lookup_retries - 1:
                time.sleep(config.bounce_lookup_delay)

    return None
|
|
||||||
|
|
||||||
def rewrite_bounce_headers(parsed_email, bounce_info: Dict) -> Tuple[Any, bool]:
    """Rewrite an SES bounce so the reader sees the bounced address as sender.

    SES delivers bounces FROM mailer-daemon@amazonses.com; this replaces the
    From header with the first bounced recipient and records the originals
    in X-* diagnostic headers.

    Returns: (modified_email, was_modified)
    """
    if not bounce_info:
        return parsed_email, False

    bounced = bounce_info.get('bouncedRecipients', [])
    if not bounced:
        return parsed_email, False

    # Entries may be plain strings or SES dicts with an 'emailAddress' key.
    first = bounced[0]
    replacement_from = first if isinstance(first, str) else first.get('emailAddress', '')
    if not replacement_from:
        return parsed_email, False

    logger.info(f"Rewriting bounce: FROM {parsed_email.get('From')} -> {replacement_from}")

    # Diagnostic headers - the original From must be captured before it is replaced.
    parsed_email['X-Original-SES-From'] = parsed_email.get('From', '')
    btype = bounce_info.get('bounceType', 'Unknown')
    bsub = bounce_info.get('bounceSubType', 'Unknown')
    parsed_email['X-Bounce-Type'] = f"{btype}/{bsub}"
    parsed_email['X-Original-Recipient'] = bounce_info.get('original_source', '')

    # Replace From (delete first: email.message assignment appends headers).
    if 'From' in parsed_email:
        del parsed_email['From']
    parsed_email['From'] = replacement_from

    # Make replies go to the bounced address unless Reply-To already exists.
    if not parsed_email.get('Reply-To'):
        parsed_email['Reply-To'] = replacement_from

    # Replace an empty or generic "delivery status" subject with a clearer one.
    current_subject = parsed_email.get('Subject', '')
    if not current_subject or 'delivery status' in current_subject.lower():
        if 'Subject' in parsed_email:
            del parsed_email['Subject']
        parsed_email['Subject'] = f"Undeliverable: Message to {replacement_from}"

    return parsed_email, True
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# EMAIL RULES (FORWARDS & AUTO-REPLY)
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
@dataclass
class EmailRule:
    """A single forwarding or auto-reply rule loaded from the DynamoDB rules table."""
    email: str
    rule_type: str  # 'forward' or 'autoreply'
    enabled: bool = True
    target: str = ''  # forward destination
    subject: str = ''  # auto-reply subject
    body: str = ''  # auto-reply body
    start_date: Optional[date] = None
    end_date: Optional[date] = None

    def is_active(self) -> bool:
        """Whether the rule applies today: enabled flag plus the optional date window."""
        if not self.enabled:
            return False

        today = date.today()
        before_window = self.start_date is not None and today < self.start_date
        after_window = self.end_date is not None and today > self.end_date
        return not (before_window or after_window)
|
|
||||||
|
|
||||||
def get_rules_for_email(email_address: str) -> List[EmailRule]:
    """Fetch all forward/auto-reply rules for an address from DynamoDB.

    Addresses are matched case-insensitively (the table stores them
    lower-cased).  Returns [] when DynamoDB is unavailable or the query
    fails; a malformed date on a rule leaves that side of its validity
    window unbounded instead of aborting rule loading.
    """
    if not DYNAMODB_AVAILABLE or not rules_table:
        return []

    email_lower = email_address.lower()
    rules = []

    try:
        response = rules_table.query(
            KeyConditionExpression=Key('email').eq(email_lower)
        )

        for item in response.get('Items', []):
            rule = EmailRule(
                email=item.get('email', ''),
                rule_type=item.get('rule_type', ''),
                enabled=item.get('enabled', True),
                target=item.get('target', ''),
                subject=item.get('subject', ''),
                body=item.get('body', ''),
            )

            # Parse the optional YYYY-MM-DD validity window.  The original
            # used bare `except: pass`, which also swallowed
            # KeyboardInterrupt/SystemExit and hid bad data silently;
            # narrow the catch and log the offending value instead.
            if item.get('start_date'):
                try:
                    rule.start_date = datetime.strptime(item['start_date'], '%Y-%m-%d').date()
                except (ValueError, TypeError):
                    logger.warning(f"Invalid start_date in rule for {email_lower}: {item['start_date']!r}")
            if item.get('end_date'):
                try:
                    rule.end_date = datetime.strptime(item['end_date'], '%Y-%m-%d').date()
                except (ValueError, TypeError):
                    logger.warning(f"Invalid end_date in rule for {email_lower}: {item['end_date']!r}")

            rules.append(rule)

        logger.debug(f"Found {len(rules)} rules for {email_address}")
        return rules

    except Exception as e:
        logger.error(f"Error fetching rules for {email_address}: {e}")
        return []
|
|
||||||
|
|
||||||
def should_send_autoreply(recipient: str, sender: str) -> bool:
    """
    Check if we should send an auto-reply.
    Implements cooldown to avoid reply loops.

    recipient: local address whose auto-reply rule matched.
    sender:    original sender who would receive the reply.

    Returns True at most once per (recipient, sender) pair within
    config.autoreply_cooldown_hours.  The cooldown cache is in-memory
    only, so it resets when the worker restarts.
    """
    # Don't reply to automated messages - substring match on the sender
    # address catches bounce handlers, no-reply senders and notifications.
    sender_lower = sender.lower()
    if any(x in sender_lower for x in ['noreply', 'no-reply', 'mailer-daemon', 'postmaster', 'bounce', 'notification']):
        return False

    # Check cooldown cache (lock guards the shared dict across worker threads)
    cache_key = f"{recipient}:{sender}"

    with autoreply_cache_lock:
        last_reply = autoreply_cache.get(cache_key)
        if last_reply:
            hours_since = (datetime.now() - last_reply).total_seconds() / 3600
            if hours_since < config.autoreply_cooldown_hours:
                logger.debug(f"Auto-reply cooldown active for {cache_key}")
                return False

        # Update cache
        # NOTE(review): the timestamp is recorded before the reply is actually
        # sent (see send_autoreply), so a failed send still consumes the
        # cooldown slot for this pair - confirm this is intended.
        autoreply_cache[cache_key] = datetime.now()

    return True
|
|
||||||
|
|
||||||
def send_autoreply(rule: EmailRule, original_from: str, original_subject: str, domain: str) -> bool:
    """Send an auto-reply email via SES.

    rule:             the matched 'autoreply' EmailRule (subject/body may be empty).
    original_from:    sender of the triggering message (reply destination).
    original_subject: subject of the triggering message.
    domain:           processed domain, used only as a metrics label.

    Returns True when the reply was handed to SES; False when suppressed
    by the cooldown / automated-sender filter or when sending failed.
    """
    # Cooldown + loop protection; also records the send timestamp.
    if not should_send_autoreply(rule.email, original_from):
        return False

    try:
        # Build reply message, falling back to generic subject/body.
        reply_subject = rule.subject or f"Re: {original_subject}"
        reply_body = rule.body or "This is an automated reply. I am currently unavailable."

        # Add original subject reference if not in body
        if original_subject and original_subject not in reply_body:
            reply_body += f"\n\n---\nRegarding: {original_subject}"

        msg = MIMEText(reply_body, 'plain', 'utf-8')
        msg['Subject'] = reply_subject
        msg['From'] = formataddr((config.autoreply_from_name, rule.email))
        msg['To'] = original_from
        msg['Date'] = formatdate(localtime=True)
        # RFC 3834 / de-facto headers telling other systems not to
        # auto-respond to this message (loop prevention).
        msg['Auto-Submitted'] = 'auto-replied'
        msg['X-Auto-Response-Suppress'] = 'All'
        msg['Precedence'] = 'auto_reply'

        # Send via SES
        ses.send_raw_email(
            Source=rule.email,
            Destinations=[original_from],
            RawMessage={'Data': msg.as_string()}
        )

        logger.info(f"✉ Auto-reply sent from {rule.email} to {original_from}")

        if PROMETHEUS_ENABLED:
            autoreplies_sent.labels(domain=domain).inc()

        return True

    except Exception as e:
        logger.error(f"Failed to send auto-reply: {e}")
        return False
|
|
||||||
|
|
||||||
def send_forward(rule: EmailRule, raw_email: bytes, original_from: str, original_subject: str, domain: str) -> bool:
    """Forward an email to the rule's target address via SES.

    rule:             the matched 'forward' EmailRule (rule.target is the destination).
    raw_email:        the complete original message bytes (as stored in S3).
    original_from:    sender of the original message (recorded in a header).
    original_subject: subject of the original message (used for the Fwd: line).
    domain:           processed domain, used only as a metrics label.

    Returns True when the forward was handed to SES, False otherwise.
    """
    if not rule.target:
        return False

    try:
        # Parse original email
        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email)

        # Build the forward as multipart: intro text + original as attachment.
        fwd_msg = MIMEMultipart('mixed')
        fwd_msg['Subject'] = f"Fwd: {original_subject or parsed.get('Subject', '(no subject)')}"
        fwd_msg['From'] = rule.email
        fwd_msg['To'] = rule.target
        fwd_msg['Date'] = formatdate(localtime=True)
        fwd_msg['X-Forwarded-For'] = rule.email
        fwd_msg['X-Original-From'] = original_from

        # Plain-text intro mimicking a mail client's forward preamble.
        forward_header = MIMEText(
            f"---------- Forwarded message ----------\n"
            f"From: {parsed.get('From', 'Unknown')}\n"
            f"Date: {parsed.get('Date', 'Unknown')}\n"
            f"Subject: {parsed.get('Subject', '(no subject)')}\n"
            f"To: {parsed.get('To', rule.email)}\n\n",
            'plain', 'utf-8'
        )
        fwd_msg.attach(forward_header)

        # Attach original message as message/rfc822 (preserves its headers).
        from email.mime.message import MIMEMessage
        fwd_msg.attach(MIMEMessage(parsed))

        # Send via SES
        ses.send_raw_email(
            Source=rule.email,
            Destinations=[rule.target],
            RawMessage={'Data': fwd_msg.as_string()}
        )

        logger.info(f"➡ Forward sent from {rule.email} to {rule.target}")

        if PROMETHEUS_ENABLED:
            forwards_sent.labels(domain=domain).inc()

        return True

    except Exception as e:
        logger.error(f"Failed to forward email: {e}")
        return False
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# MAIN EMAIL PROCESSING
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def process_message(domain: str, message: dict) -> bool:
    """Process a single SQS message with the full feature set.

    Pipeline:
      1. Download the raw email from S3 (location taken from the SES
         receipt action when present, else derived from the message id).
      2. Detect SES bounce notifications and rewrite their headers.
      3. Deliver to local mailboxes through the shared SMTP pool.
      4. Apply per-recipient rules (auto-reply / forwarding) — skipped
         for bounces to avoid mail loops.
      5. Mark the S3 object as processed via replaced metadata.

    Args:
        domain: Domain this message belongs to (selects bucket, metric labels).
        message: Raw SQS message dict containing a JSON 'Body'.

    Returns:
        True when at least one recipient was delivered to. Deleting the
        SQS message on success is the caller's responsibility.
    """
    try:
        # Parse SQS message body
        body = json.loads(message['Body'])

        # Handle SNS wrapper (Lambda shim format)
        if body.get('Type') == 'Notification':
            ses_data = json.loads(body.get('Message', '{}'))
        else:
            ses_data = body.get('ses', body)

        mail = ses_data.get('mail', {})
        receipt = ses_data.get('receipt', {})

        message_id = mail.get('messageId', 'unknown')
        from_addr = mail.get('source', '')
        # Receipt recipients are authoritative; fall back to envelope destination.
        recipients = receipt.get('recipients', []) or mail.get('destination', [])

        logger.info(f"Processing: {message_id} for {domain}")
        logger.info(f" From: {from_addr}")
        logger.info(f" To: {recipients}")

        # ----------------------------------------
        # 1. Download email from S3
        # ----------------------------------------
        bucket = domain_to_bucket_name(domain)
        object_key = message_id

        # Check receipt action for actual S3 location
        actions = receipt.get('action', {})
        if isinstance(actions, dict):
            actions = [actions]
        for action in actions:
            if isinstance(action, dict) and action.get('type') == 'S3':
                bucket = action.get('bucketName', bucket)
                object_key = action.get('objectKey', object_key)
                break

        try:
            s3_response = s3.get_object(Bucket=bucket, Key=object_key)
            raw_email = s3_response['Body'].read()
        except ClientError as e:
            logger.error(f"Failed to download from S3: {bucket}/{object_key} - {e}")
            return False

        # Parse email
        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_email)
        subject = parsed.get('Subject', '')

        # ----------------------------------------
        # 2. Bounce Detection & Rewriting
        # ----------------------------------------
        is_bounce = is_ses_bounce_notification(parsed)
        bounce_rewritten = False

        if is_bounce:
            logger.info("🔄 Detected SES bounce notification")

            # Extract Message-ID from bounce to correlate with original
            bounce_msg_id = parsed.get('Message-ID', '')

            # Also check X-Original-Message-Id or parse from body
            # SES bounces often contain the original Message-ID in headers or body
            original_msg_id = parsed.get('X-Original-Message-Id', '')
            if not original_msg_id:
                # Try to extract from References header
                refs = parsed.get('References', '')
                if refs:
                    original_msg_id = refs.split()[0] if refs else ''

            lookup_id = original_msg_id or bounce_msg_id

            if lookup_id:
                bounce_info = get_bounce_info_from_dynamodb(lookup_id)
                if bounce_info:
                    parsed, bounce_rewritten = rewrite_bounce_headers(parsed, bounce_info)

                    if PROMETHEUS_ENABLED:
                        bounces_processed.labels(
                            domain=domain,
                            type=bounce_info.get('bounceType', 'Unknown')
                        ).inc()

        # ----------------------------------------
        # 3. Deliver to local mailboxes
        # ----------------------------------------
        success_count = 0
        failed_recipients = []
        delivered_recipients = []

        # Get modified raw email if bounce was rewritten
        if bounce_rewritten:
            raw_email = parsed.as_bytes()

        smtp_conn = smtp_pool.get_connection()
        if not smtp_conn:
            logger.error("Could not get SMTP connection")
            return False

        try:
            for recipient in recipients:
                try:
                    smtp_conn.sendmail(from_addr, [recipient], raw_email)
                    success_count += 1
                    delivered_recipients.append(recipient)
                    logger.info(f" ✓ Delivered to {recipient}")

                    if PROMETHEUS_ENABLED:
                        emails_processed.labels(domain=domain, status='success').inc()

                except smtplib.SMTPRecipientsRefused:
                    # Rejected by local MTA — record and keep delivering to the rest.
                    logger.warning(f" ✗ Recipient refused: {recipient}")
                    failed_recipients.append(recipient)
                    if PROMETHEUS_ENABLED:
                        emails_processed.labels(domain=domain, status='recipient_refused').inc()

                except smtplib.SMTPException as e:
                    logger.error(f" ✗ SMTP error for {recipient}: {e}")
                    failed_recipients.append(recipient)

        finally:
            # Always hand the connection back, even on unexpected errors.
            smtp_pool.return_connection(smtp_conn)

        # ----------------------------------------
        # 4. Process Rules (Forwards & Auto-Reply)
        # ----------------------------------------
        if not is_bounce:  # Don't process rules for bounces
            for recipient in delivered_recipients:
                rules = get_rules_for_email(recipient)

                for rule in rules:
                    if not rule.is_active():
                        continue

                    if rule.rule_type == 'autoreply':
                        send_autoreply(rule, from_addr, subject, domain)

                    elif rule.rule_type == 'forward':
                        send_forward(rule, raw_email, from_addr, subject, domain)

        # ----------------------------------------
        # 5. Update S3 metadata
        # ----------------------------------------
        if success_count > 0:
            try:
                metadata = {
                    'processed': 'true',
                    'processed_at': str(int(time.time())),
                    'delivered_to': ','.join(delivered_recipients),
                    'status': 'delivered'
                }
                if failed_recipients:
                    metadata['failed_recipients'] = ','.join(failed_recipients)
                if bounce_rewritten:
                    metadata['bounce_rewritten'] = 'true'

                # Self-copy with MetadataDirective=REPLACE is how S3 metadata
                # is updated in place.
                s3.copy_object(
                    Bucket=bucket,
                    Key=object_key,
                    CopySource={'Bucket': bucket, 'Key': object_key},
                    Metadata=metadata,
                    MetadataDirective='REPLACE'
                )
            except Exception as e:
                # Metadata is advisory; delivery already succeeded.
                logger.warning(f"Failed to update S3 metadata: {e}")

        return success_count > 0

    except Exception as e:
        logger.error(f"Error processing message: {e}", exc_info=True)
        if PROMETHEUS_ENABLED:
            emails_processed.labels(domain=domain, status='error').inc()
        return False
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# DOMAIN POLLER
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
    """Poll a single domain's SQS queue until stop_event is set.

    Long-polls the queue, processes each received message via
    process_message(), and deletes a message only on success so that
    failures become visible again after the visibility timeout.

    Args:
        domain: Domain name (logging and metric labels).
        queue_url: Full SQS queue URL for this domain.
        stop_event: Cooperative shutdown signal shared by all pollers.
    """
    logger.info(f"Starting poller for {domain}")

    while not stop_event.is_set():
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=config.max_messages,
                WaitTimeSeconds=config.poll_interval,
                VisibilityTimeout=config.visibility_timeout
            )

            messages = response.get('Messages', [])

            # Update queue size metric (best effort — must never break the loop)
            if PROMETHEUS_ENABLED:
                try:
                    attrs = sqs.get_queue_attributes(
                        QueueUrl=queue_url,
                        AttributeNames=['ApproximateNumberOfMessages']
                    )
                    queue_size.labels(domain=domain).set(
                        int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
                    )
                except Exception:
                    # Narrowed from a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit; metrics stay best-effort.
                    pass

            for message in messages:
                if stop_event.is_set():
                    break

                if PROMETHEUS_ENABLED:
                    emails_in_flight.inc()
                start_time = time.time()

                try:
                    success = process_message(domain, message)

                    if success:
                        sqs.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=message['ReceiptHandle']
                        )
                    # If not successful, message becomes visible again after timeout

                finally:
                    if PROMETHEUS_ENABLED:
                        emails_in_flight.dec()
                        processing_time.labels(domain=domain).observe(time.time() - start_time)

        except Exception as e:
            # Back off briefly on transient SQS/network errors.
            logger.error(f"Error polling {domain}: {e}")
            time.sleep(5)

    logger.info(f"Stopped poller for {domain}")
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# UNIFIED WORKER
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
class UnifiedWorker:
    """Coordinates one SQS poller thread per configured domain."""

    def __init__(self):
        # Set once in stop(); every poller watches this to wind down.
        self.stop_event = threading.Event()
        self.executor: Optional[ThreadPoolExecutor] = None
        self.domains: List[str] = []
        self.queue_urls: Dict[str, str] = {}

    def setup(self):
        """Load domains, resolve queue URLs and warm the SMTP pool.

        Exits the process when no domains or no resolvable queues exist.
        """
        self.domains = load_domains()

        if not self.domains:
            logger.error("No domains configured!")
            sys.exit(1)

        # Keep only domains whose queue URL could actually be resolved.
        resolved = ((dom, get_queue_url(dom)) for dom in self.domains)
        self.queue_urls = {dom: url for dom, url in resolved if url}

        if not self.queue_urls:
            logger.error("No valid queues found!")
            sys.exit(1)

        # Pre-open SMTP connections before any poller needs one.
        smtp_pool.initialize()

        logger.info(f"Initialized with {len(self.queue_urls)} domains")

    def start(self):
        """Launch one poller per domain and block until they finish."""
        self.executor = ThreadPoolExecutor(
            max_workers=config.worker_threads,
            thread_name_prefix='poller'
        )

        pending = [
            self.executor.submit(poll_domain, dom, url, self.stop_event)
            for dom, url in self.queue_urls.items()
        ]

        logger.info(f"Started {len(pending)} domain pollers")

        try:
            # Surface any poller crash without taking down the others.
            for done in as_completed(pending):
                try:
                    done.result()
                except Exception as e:
                    logger.error(f"Poller error: {e}")
        except KeyboardInterrupt:
            pass

    def stop(self):
        """Stop gracefully: signal pollers, drain the executor, close SMTP."""
        logger.info("Stopping worker...")
        self.stop_event.set()

        if self.executor is not None:
            self.executor.shutdown(wait=True, cancel_futures=False)

        smtp_pool.close_all()
        logger.info("Worker stopped")
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# HEALTH CHECK SERVER
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def start_health_server(worker: UnifiedWorker):
    """Serve a tiny JSON health/status API on config.health_port.

    Routes: '/' and '/health' (status summary), '/domains' (list of
    configured domains), '/metrics-status' (Prometheus availability).
    Runs in a daemon thread so it never blocks shutdown.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler

    class HealthHandler(BaseHTTPRequestHandler):
        def _send_json(self, payload):
            # Shared 200/JSON response path for every route.
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode())

        def do_GET(self):
            if self.path in ('/health', '/'):
                self._send_json({
                    'status': 'healthy',
                    'domains': len(worker.queue_urls),
                    'dynamodb': DYNAMODB_AVAILABLE,
                    'features': {
                        'bounce_rewriting': True,
                        'auto_reply': DYNAMODB_AVAILABLE,
                        'forwarding': DYNAMODB_AVAILABLE
                    },
                    'timestamp': datetime.utcnow().isoformat()
                })

            elif self.path == '/domains':
                self._send_json(list(worker.queue_urls.keys()))

            elif self.path == '/metrics-status':
                self._send_json({
                    'prometheus_enabled': PROMETHEUS_ENABLED,
                    'metrics_port': config.metrics_port if PROMETHEUS_ENABLED else None
                })

            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            # Silence the default per-request stderr logging.
            pass

    srv = HTTPServer(('0.0.0.0', config.health_port), HealthHandler)
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    logger.info(f"Health server on port {config.health_port}")
|
|
||||||
|
|
||||||
# ============================================
|
|
||||||
# ENTRY POINT
|
|
||||||
# ============================================
|
|
||||||
|
|
||||||
def main():
    """Entry point: wire up signals, metrics and health server, then run."""
    worker = UnifiedWorker()

    def signal_handler(signum, frame):
        # SIGTERM/SIGINT: drain pollers, then exit cleanly.
        logger.info(f"Received signal {signum}")
        worker.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, signal_handler)

    worker.setup()

    if PROMETHEUS_ENABLED:
        start_http_server(config.metrics_port)
        logger.info(f"Prometheus metrics on port {config.metrics_port}")

    start_health_server(worker)

    # Startup banner — one logger.info per line, identical text.
    banner = [
        "=" * 60,
        " UNIFIED EMAIL WORKER (Full Featured)",
        "=" * 60,
        f" Domains: {len(worker.queue_urls)}",
        f" Threads: {config.worker_threads}",
        f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}",
        f" SMTP Pool: {config.smtp_pool_size} connections",
        "",
        " Features:",
        " ✓ Bounce Detection & Header Rewriting",
        f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office",
        f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding",
        f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics",
        "=" * 60,
    ]
    for line in banner:
        logger.info(line)

    worker.start()
|
|
||||||
|
|
||||||
# Script entry point: run the worker only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()
|
|
||||||
Loading…
Reference in New Issue