Unified Email Worker (Modular Version)

Multi-domain email processing worker for AWS SES/S3/SQS with bounce handling, auto-replies, forwarding, and sender blocking.

🏗️ Architecture

email-worker/
├── config.py                 # Configuration management
├── logger.py                 # Structured logging
├── aws/                      # AWS service handlers
│   ├── s3_handler.py        # S3 operations (download, metadata)
│   ├── sqs_handler.py       # SQS polling
│   ├── ses_handler.py       # SES email sending
│   └── dynamodb_handler.py  # DynamoDB (rules, bounces, blocklist)
├── email_processing/         # Email processing
│   ├── parser.py            # Email parsing utilities
│   ├── bounce_handler.py    # Bounce detection & rewriting
│   ├── rules_processor.py   # OOO & forwarding logic
│   └── blocklist.py         # Sender blocking with wildcards
├── smtp/                     # SMTP delivery
│   ├── pool.py              # Connection pooling
│   └── delivery.py          # SMTP/LMTP delivery with retry
├── metrics/                  # Monitoring
│   └── prometheus.py        # Prometheus metrics
├── worker.py                # Message processing logic
├── domain_poller.py         # Domain queue poller
├── unified_worker.py        # Main worker coordinator
├── health_server.py         # Health check HTTP server
└── main.py                  # Entry point

Features

  • Multi-Domain Processing: Parallel processing of multiple domains via thread pool
  • Bounce Detection: Automatic SES bounce notification rewriting
  • Auto-Reply/OOO: Out-of-office automatic replies
  • Email Forwarding: Rule-based forwarding to internal/external addresses
  • Sender Blocking: Wildcard-based sender blocklist per recipient
  • SMTP Connection Pooling: Efficient reuse of connections
  • LMTP Support: Direct delivery to Dovecot (bypasses Postfix transport_maps)
  • Prometheus Metrics: Comprehensive monitoring
  • Health Checks: HTTP health endpoint for container orchestration
  • Graceful Shutdown: Proper cleanup on SIGTERM/SIGINT
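
As a rough illustration of the graceful-shutdown item above, a common pattern is to turn SIGTERM/SIGINT into a shared stop flag that the polling loops check between batches. This is a minimal sketch and not the worker's actual code; `stop_event`, `handle_shutdown`, and `install_signal_handlers` are hypothetical names.

```python
import signal
import threading

# Hypothetical shared flag; polling loops would check it between batches.
stop_event = threading.Event()


def handle_shutdown(signum, frame):
    """Signal handler: request a stop instead of exiting immediately."""
    stop_event.set()


def install_signal_handlers():
    """Register the handler for SIGTERM and SIGINT (call from the main thread)."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, handle_shutdown)
```

With this shape, in-flight messages can finish before connections are closed, because nothing is interrupted mid-batch.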

🔧 Configuration

All configuration is supplied via environment variables:

AWS Settings

AWS_REGION=us-east-2

Domains

# Option 1: Comma-separated list
DOMAINS=example.com,another.com

# Option 2: File with one domain per line
DOMAINS_FILE=/etc/email-worker/domains.txt
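
The two options above could be resolved along these lines. This is a sketch under assumptions: the README does not say which variable wins when both are set (here `DOMAINS` does), nor whether the file may contain `#` comment lines (here they are skipped). `load_domains` is a hypothetical helper name.

```python
import os


def load_domains(env=None):
    """Return the configured domains, lowercased and de-duplicated.

    Assumption: DOMAINS takes precedence over DOMAINS_FILE when both are set.
    """
    env = os.environ if env is None else env
    raw = env.get("DOMAINS", "").strip()
    if raw:
        candidates = raw.split(",")
    elif env.get("DOMAINS_FILE"):
        with open(env["DOMAINS_FILE"], encoding="utf-8") as fh:
            candidates = fh.read().splitlines()
    else:
        candidates = []

    domains = []
    for item in candidates:
        name = item.strip().lower()
        # Skip blank entries and (assumed) '#' comment lines.
        if name and not name.startswith("#") and name not in domains:
            domains.append(name)
    return domains
```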

Worker Settings

WORKER_THREADS=10
POLL_INTERVAL=20              # SQS long polling (seconds)
MAX_MESSAGES=10               # Max messages per poll
VISIBILITY_TIMEOUT=300        # Message visibility timeout (seconds)
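
To show how `WORKER_THREADS` might relate to per-domain polling, here is a minimal sketch that runs one polling loop per domain in a thread pool. It is an illustration only: `run_pollers` and `poll_once` are hypothetical names, and the real worker's loop may be structured differently.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def run_pollers(domains, poll_once, stop_event: threading.Event, max_workers=None):
    """Run one polling loop per domain until stop_event is set.

    poll_once(domain) is a hypothetical callable that receives and handles
    one batch of messages for that domain. Returns batches handled per domain.
    """
    if max_workers is None:
        max_workers = int(os.environ.get("WORKER_THREADS", "10"))

    def loop(domain):
        batches = 0
        while not stop_event.is_set():
            poll_once(domain)
            batches += 1
        return batches

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {domain: pool.submit(loop, domain) for domain in domains}
        return {domain: fut.result() for domain, fut in futures.items()}
```

One consequence of this design: each loop occupies a thread for its whole lifetime, so if there are more domains than threads, the extra domains are not polled until a thread frees up.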

SMTP Delivery

SMTP_HOST=localhost
SMTP_PORT=25
SMTP_USE_TLS=false
SMTP_USER=
SMTP_PASS=
SMTP_POOL_SIZE=5
INTERNAL_SMTP_PORT=2525       # Port for internal delivery (bypasses transport_maps)
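
For readers unfamiliar with connection pooling, the idea behind `SMTP_POOL_SIZE` can be sketched as below. This is not the code in `smtp/pool.py`; `ConnectionPool` and `factory` are hypothetical, and a production pool would also check that a reused connection is still alive before handing it out.

```python
import queue


class ConnectionPool:
    """Tiny pool that keeps up to `size` idle connections for reuse.

    `factory` is a hypothetical zero-argument callable that opens a new
    connection (for example, a configured smtplib.SMTP instance).
    """

    def __init__(self, factory, size=5):
        self._factory = factory
        self._idle = queue.Queue(maxsize=size)

    def acquire(self):
        """Reuse an idle connection if one exists, otherwise open a new one."""
        try:
            return self._idle.get_nowait()
        except queue.Empty:
            return self._factory()

    def release(self, conn):
        """Return a connection to the pool; close it if the pool is full."""
        try:
            self._idle.put_nowait(conn)
        except queue.Full:
            conn.close()
```

Note that this sketch limits idle connections only; it does not cap how many connections are open at once.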

LMTP (Direct Dovecot Delivery)

LMTP_ENABLED=false            # Set to 'true' to use LMTP
LMTP_HOST=localhost
LMTP_PORT=24
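
A minimal sketch of how these settings could select the delivery client. Python's standard library provides `smtplib.LMTP`, a subclass of `smtplib.SMTP`, so both paths can share the same sending code. `delivery_target` is a hypothetical helper, and treating `LMTP_ENABLED` as case-insensitive is an assumption.

```python
import smtplib


def delivery_target(env):
    """Pick the client class and address for local delivery.

    Returns (client_class, host, port) without opening a connection.
    Assumption: LMTP_ENABLED is compared case-insensitively against "true".
    """
    if env.get("LMTP_ENABLED", "false").strip().lower() == "true":
        host = env.get("LMTP_HOST", "localhost")
        port = int(env.get("LMTP_PORT", "24"))
        return smtplib.LMTP, host, port
    host = env.get("SMTP_HOST", "localhost")
    port = int(env.get("SMTP_PORT", "25"))
    return smtplib.SMTP, host, port
```

Calling `client_class(host, port)` then opens the connection, after which `sendmail` is used in the same way for either protocol.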

DynamoDB Tables

DYNAMODB_RULES_TABLE=email-rules
DYNAMODB_MESSAGES_TABLE=ses-outbound-messages
DYNAMODB_BLOCKED_TABLE=email-blocked-senders

Bounce Handling

BOUNCE_LOOKUP_RETRIES=3
BOUNCE_LOOKUP_DELAY=1.0
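
These two settings suggest a retry loop around the bounce-record lookup, presumably because the record may not have been written yet when the notification arrives. The sketch below assumes a fixed delay between attempts (the README does not say whether a backoff is used); `lookup_with_retries` and `fetch` are hypothetical names.

```python
import time


def lookup_with_retries(fetch, message_id, retries=3, delay=1.0, sleep=time.sleep):
    """Call fetch(message_id) up to `retries` times, pausing `delay` seconds
    between attempts. Returns the record, or None if it never appears.

    fetch is a hypothetical callable returning a dict or None, for example
    a thin wrapper around a DynamoDB get_item call.
    """
    for attempt in range(1, retries + 1):
        record = fetch(message_id)
        if record is not None:
            return record
        if attempt < retries:
            sleep(delay)  # no pause after the final attempt
    return None
```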

Monitoring

METRICS_PORT=8000             # Prometheus metrics
HEALTH_PORT=8080              # Health check endpoint

📊 DynamoDB Schemas

email-rules

{
  "email_address": "user@example.com",    // Partition Key
  "ooo_active": true,
  "ooo_message": "I am currently out of office...",
  "ooo_content_type": "text",             // "text" or "html"
  "forwards": ["other@example.com", "external@gmail.com"]
}
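
To make the OOO fields concrete, here is a sketch of how a rule item could be turned into a reply message. It is not taken from `rules_processor.py`: `build_auto_reply` is a hypothetical helper, the subject prefix is an assumption, and the `Auto-Submitted` header is the standard RFC 3834 marker for automatic responses rather than something the README confirms the worker sets.

```python
from email.message import EmailMessage


def build_auto_reply(rule, original_sender, original_subject):
    """Build an out-of-office reply from an email-rules item.

    Returns None when OOO is not active for the address.
    """
    # Only a real boolean True enables OOO; the string "true" does not.
    if rule.get("ooo_active") is not True:
        return None

    reply = EmailMessage()
    reply["From"] = rule["email_address"]
    reply["To"] = original_sender
    reply["Subject"] = f"Auto-Reply: {original_subject}"  # assumed prefix
    # RFC 3834 header marking the message as an automatic response.
    reply["Auto-Submitted"] = "auto-replied"

    subtype = "html" if rule.get("ooo_content_type") == "html" else "plain"
    reply.set_content(rule.get("ooo_message", ""), subtype=subtype)
    return reply
```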

ses-outbound-messages

{
  "MessageId": "abc123...",               // Partition Key (SES Message-ID)
  "original_source": "sender@example.com",
  "recipients": ["recipient@other.com"],
  "timestamp": "2025-01-01T12:00:00Z",
  "bounceType": "Permanent",
  "bounceSubType": "General",
  "bouncedRecipients": ["recipient@other.com"]
}

email-blocked-senders

{
  "email_address": "user@example.com",    // Partition Key
  "blocked_patterns": [
    "spam@*.com",                         // Wildcard support
    "noreply@badsite.com",
    "*@malicious.org"
  ]
}
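
The wildcard patterns above can be matched with shell-style globbing. A minimal sketch using the standard library's `fnmatch` follows; whether `blocklist.py` uses `fnmatch` is an assumption, and `is_blocked` is a hypothetical name. This version lowercases both sides, and note that `fnmatch` also gives `?` and `[...]` special meaning.

```python
from fnmatch import fnmatchcase


def is_blocked(sender, patterns):
    """Return True if the sender address matches any blocklist pattern."""
    address = sender.strip().lower()
    return any(fnmatchcase(address, pattern.lower()) for pattern in patterns)
```

For example, `is_blocked("anyone@malicious.org", ["*@malicious.org"])` is true, while an address at any other domain does not match that pattern.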

🚀 Usage

Installation

cd email-worker
pip install -r requirements.txt

Run

python3 main.py

Docker

FROM python:3.11-slim

WORKDIR /app
COPY . /app

RUN pip install --no-cache-dir -r requirements.txt

CMD ["python3", "main.py"]

📈 Metrics

Available at http://localhost:8000/metrics:

  • emails_processed_total{domain, status} - Total emails processed
  • emails_in_flight - Currently processing emails
  • email_processing_seconds{domain} - Processing time histogram
  • queue_messages_available{domain} - Queue size gauge
  • bounces_processed_total{domain, type} - Bounce notifications
  • autoreplies_sent_total{domain} - Auto-replies sent
  • forwards_sent_total{domain} - Forwards sent
  • blocked_senders_total{domain} - Blocked emails

🏥 Health Checks

Available at http://localhost:8080/health:

{
  "status": "healthy",
  "domains": 2,
  "domain_list": ["example.com", "another.com"],
  "dynamodb": true,
  "features": {
    "bounce_rewriting": true,
    "auto_reply": true,
    "forwarding": true,
    "blocklist": true,
    "lmtp": false
  },
  "timestamp": "2025-01-22T10:00:00.000000"
}
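
A response of this shape can be served with the standard library alone. The sketch below is illustrative and not the contents of `health_server.py`; `build_health_payload` and `make_handler` are hypothetical names, and the feature flags are passed in by the caller because the README does not say how they are derived.

```python
import json
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer


def build_health_payload(domains, dynamodb_ok, features):
    """Assemble a payload shaped like the example above."""
    return {
        "status": "healthy",
        "domains": len(domains),
        "domain_list": list(domains),
        "dynamodb": dynamodb_ok,
        "features": dict(features),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def make_handler(get_payload):
    """Create a request handler class that serves get_payload() at /health."""

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/health":
                self.send_error(404)
                return
            body = json.dumps(get_payload()).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # keep probe traffic out of the logs

    return HealthHandler
```

To serve it, pass the handler to `HTTPServer(("", 8080), make_handler(...))` and call `serve_forever()` in a background thread.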

🔍 Key Improvements in Modular Version

1. Fixed Critical Bugs

  • Fixed signal.SIGINT typo (was signalIGINT)
  • S3 metadata is now written before the object is deleted (audit trail)
  • Blocklist lookups are batched into a single DynamoDB call (performance)
  • S3 delete failures are now handled

2. Better Architecture

  • Separation of Concerns: Each component has a single responsibility
  • Testability: Easy to unit test individual components
  • Maintainability: Changes isolated to specific modules
  • Extensibility: Easy to add new features

3. Performance

  • Batch Blocklist Checks: One DynamoDB call for all recipients
  • Connection Pooling: Reusable SMTP connections
  • Efficient Metrics: Optional Prometheus integration

4. Reliability

  • Proper Error Handling: Each component handles its own errors
  • Graceful Degradation: Keeps working when DynamoDB is unavailable
  • Audit Trail: All actions are logged to S3 metadata

🔐 Security Features

  1. Domain Validation: Workers only process their assigned domains
  2. Loop Prevention: Detects and skips already-processed emails
  3. Blocklist Support: Wildcard-based sender blocking
  4. Internal vs External: Separate handling prevents loops

📝 Example Usage

Enable OOO for user

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('email-rules')

table.put_item(Item={
    'email_address': 'john@example.com',
    'ooo_active': True,
    'ooo_message': 'I am out of office until Feb 1st.',
    'ooo_content_type': 'html'
})

Block spam senders

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('email-blocked-senders')

table.put_item(Item={
    'email_address': 'john@example.com',
    'blocked_patterns': [
        '*@spam.com',
        'noreply@*.marketing.com',
        'newsletter@*'
    ]
})

Forward emails

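import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('email-rules')

# put_item replaces the whole item, so existing OOO settings for this
# address would be overwritten.
table.put_item(Item={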
    'email_address': 'support@example.com',
    'forwards': [
        'john@example.com',
        'jane@example.com',
        'external@gmail.com'
    ]
})
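
Because `put_item` replaces the entire item, an address that already has OOO settings may be better served by `update_item`, which changes only the named attribute. The helper below is a hedged sketch (the name `build_forward_update` is hypothetical); it only builds the arguments, so it can be inspected without AWS access.

```python
def build_forward_update(email_address, forwards):
    """Build keyword arguments for Table.update_item that set only the
    `forwards` attribute and leave other attributes on the item untouched."""
    return {
        "Key": {"email_address": email_address},
        # A name placeholder avoids any clash with DynamoDB reserved words.
        "UpdateExpression": "SET #fw = :f",
        "ExpressionAttributeNames": {"#fw": "forwards"},
        "ExpressionAttributeValues": {":f": list(forwards)},
    }


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   table = boto3.resource("dynamodb").Table("email-rules")
#   table.update_item(**build_forward_update(
#       "support@example.com", ["john@example.com", "jane@example.com"]))
```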

🐛 Troubleshooting

Worker not processing emails

  1. Check queue URLs: curl http://localhost:8080/domains
  2. Check logs for SQS errors
  3. Verify IAM permissions for SQS/S3/SES/DynamoDB

Bounces not rewritten

  1. Check DynamoDB table name: DYNAMODB_MESSAGES_TABLE
  2. Verify Lambda function is writing bounce records
  3. Check logs for DynamoDB lookup errors

Auto-replies not sent

  1. Verify the DynamoDB rules table is accessible
  2. Check ooo_active is true (boolean, not string)
  3. Review logs for SES send errors

Blocked emails still delivered

  1. Verify blocklist table exists and is accessible
  2. Check wildcard patterns are lowercase
  3. Review logs for blocklist check errors

📄 License

MIT License - See LICENSE file for details