
Architecture Documentation

📐 System Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         AWS Cloud Services                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐                   │
│  │   SQS    │────▶│    S3    │     │   SES    │                   │
│  │  Queues  │     │ Buckets  │     │ Sending  │                   │
│  └──────────┘     └──────────┘     └──────────┘                   │
│       │                 │                 │                         │
│       │                 │                 │                         │
│  ┌────▼─────────────────▼─────────────────▼───────────────┐       │
│  │              DynamoDB Tables                            │       │
│  │  • email-rules (OOO, Forwarding)                       │       │
│  │  • ses-outbound-messages (Bounce Tracking)              │       │
│  │  • email-blocked-senders (Blocklist)                    │       │
│  └─────────────────────────────────────────────────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              │ Polling & Processing
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Unified Email Worker                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────┐      │
│  │              Main Thread (unified_worker.py)            │      │
│  │  • Coordination                                         │      │
│  │  • Status Monitoring                                    │      │
│  │  • Signal Handling                                      │      │
│  └────────────┬────────────────────────────────────────────┘      │
│               │                                                     │
│               ├──▶ Domain Poller Thread 1 (example.com)           │
│               ├──▶ Domain Poller Thread 2 (another.com)           │
│               ├──▶ Domain Poller Thread 3 (...)                   │
│               ├──▶ Health Server Thread (port 8080)               │
│               └──▶ Metrics Server Thread (port 8000)              │
│                                                                     │
│  ┌──────────────────────────────────────────────────────┐         │
│  │         SMTP Connection Pool                         │         │
│  │  • Connection Reuse                                  │         │
│  │  • Health Checks                                     │         │
│  │  • Auto-reconnect                                    │         │
│  └──────────────────────────────────────────────────────┘         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              │ SMTP/LMTP Delivery
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      Mail Server (Docker Mailserver)                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Port 25   (SMTP - from pool)                                      │
│  Port 2525 (SMTP - internal delivery, bypasses transport_maps)     │
│  Port 24   (LMTP - direct to Dovecot, bypasses Postfix)            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

🔄 Message Flow

1. Email Reception

1. SES receives the email
2. SES stores the raw message in the S3 bucket (domain-emails/)
3. SES publishes an SNS notification
4. SNS delivers the notification to the domain's SQS queue (domain-queue)
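The SQS message body is the SNS envelope, and its `Message` field holds the SES notification as a JSON-encoded string, so the poller has to decode twice. A minimal sketch, assuming raw message delivery is disabled on the SNS subscription and that the receipt rule's S3 action fills in `receipt.action.bucketName` and `objectKey`; `extract_ses_notification` and the example values are hypothetical:

```python
import json


def extract_ses_notification(sqs_body: str) -> dict:
    """Unwrap the SNS envelope and pull out what the poller needs (sketch)."""
    envelope = json.loads(sqs_body)        # SNS envelope: {"Type": "Notification", "Message": "...", ...}
    ses = json.loads(envelope["Message"])  # the SES notification is itself a JSON string
    action = ses["receipt"]["action"]      # S3 action: bucketName / objectKey
    return {
        "message_id": ses["mail"]["messageId"],
        "source": ses["mail"]["source"],
        "recipients": ses["receipt"]["recipients"],
        "bucket": action["bucketName"],
        "key": action["objectKey"],
    }


# Trimmed example payload; real notifications carry many more fields
ses_payload = {
    "notificationType": "Received",
    "mail": {"messageId": "abc123", "source": "alice@sender.org"},
    "receipt": {
        "recipients": ["bob@example.com"],
        "action": {"type": "S3", "bucketName": "domain-emails", "objectKey": "abc123"},
    },
}
sqs_body = json.dumps({"Type": "Notification", "Message": json.dumps(ses_payload)})
info = extract_ses_notification(sqs_body)
```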

2. Worker Processing

┌─────────────────────────────────────────────────────────────┐
│ Domain Poller (domain_poller.py)                            │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 1. Poll SQS Queue (20s long poll)                           │
│    • Receive up to 10 messages                              │
│    • Extract SES notification from SNS wrapper              │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Download from S3 (s3_handler.py)                         │
│    • Get raw email bytes                                    │
│    • Handle retry if not found yet                          │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 3. Parse Email (parser.py)                                  │
│    • Parse MIME structure                                   │
│    • Extract headers, body, attachments                     │
│    • Check for loop prevention marker                       │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Bounce Detection (bounce_handler.py)                     │
│    • Check if from mailer-daemon@amazonses.com              │
│    • Lookup original sender in DynamoDB                     │
│    • Rewrite From/Reply-To headers                          │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Blocklist Check (blocklist.py)                           │
│    • Batch lookup blocked patterns for all recipients       │
│    • Check sender against wildcard patterns                 │
│    • Mark blocked recipients                                │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 6. Process Rules for Each Recipient (rules_processor.py)    │
│    ├─▶ Auto-Reply (OOO)                                     │
│    │   • Check if ooo_active = true                         │
│    │   • Don't reply to auto-submitted messages             │
│    │   • Create reply with original message quoted          │
│    │   • Send via SES (external) or Port 2525 (internal)    │
│    │                                                         │
│    └─▶ Forwarding                                           │
│        • Get forward addresses from rule                    │
│        • Create forward with FWD: prefix                    │
│        • Preserve attachments                               │
│        • Send via SES (external) or Port 2525 (internal)    │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 7. SMTP Delivery (delivery.py)                              │
│    • Get connection from pool                               │
│    • Send to each recipient (not blocked)                   │
│    • Track success/permanent/temporary failures             │
│    • Return connection to pool                              │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 8. Update S3 Metadata (s3_handler.py)                       │
│    ├─▶ All Blocked: mark_as_blocked()                       │
│    │                + delete_blocked_email()                │
│    ├─▶ Some Success: mark_as_processed()                    │
│    └─▶ All Invalid: mark_as_all_invalid()                   │
└─────────────────────────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│ 9. Delete from Queue                                        │
│    • Success or permanent failure → delete                  │
│    • Temporary failure → keep in queue (retry)              │
└─────────────────────────────────────────────────────────────┘
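Steps 8 and 9 amount to one decision over the per-recipient results. A sketch of that decision, assuming every recipient ends in exactly one of four states and that any temporary failure sends the whole message back to the queue; `Result` and `decide_outcome` are hypothetical names, and the worker's actual precedence rules may differ:

```python
from __future__ import annotations

from enum import Enum


class Result(Enum):
    DELIVERED = "delivered"
    BLOCKED = "blocked"
    PERMANENT = "permanent"   # e.g. 5xx: mailbox does not exist
    TEMPORARY = "temporary"   # e.g. 4xx or a dropped connection


def decide_outcome(results: dict[str, Result]) -> tuple[tuple[str, ...], bool]:
    """Map per-recipient results to (S3 handler calls, delete_from_queue) (sketch)."""
    states = set(results.values())
    if Result.TEMPORARY in states:
        return (), False                     # leave in SQS; redelivered after the visibility timeout
    if states == {Result.BLOCKED}:
        return ("mark_as_blocked", "delete_blocked_email"), True
    if Result.DELIVERED in states:
        return ("mark_as_processed",), True
    return ("mark_as_all_invalid",), True    # only permanent failures (and blocks) remain


outcome = decide_outcome({"bob@example.com": Result.DELIVERED, "bad@example.com": Result.PERMANENT})
```

Retrying the whole message on a temporary failure keeps the logic simple, but recipients that already succeeded would receive it again on the next attempt unless they are recorded and skipped.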

🧩 Component Details

AWS Handlers (aws/)

s3_handler.py

  • Purpose: All S3 operations
  • Key Methods:
    • get_email(): Download with retry logic
    • mark_as_processed(): Update metadata on success
    • mark_as_all_invalid(): Update metadata on permanent failure
    • mark_as_blocked(): Set metadata before deletion
    • delete_blocked_email(): Delete after marking
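S3 cannot edit an object's metadata in place; metadata is changed by copying the object onto itself with `MetadataDirective='REPLACE'`. A sketch of the request a method like `mark_as_processed()` might send, assuming existing metadata (from `head_object()["Metadata"]`) is merged rather than discarded; `build_metadata_update` is a hypothetical helper and the keys mirror the Audit Trail example further down:

```python
import time


def build_metadata_update(bucket: str, key: str, current: dict, updates: dict) -> dict:
    """Build kwargs for boto3's S3 client copy_object() that rewrite user metadata (sketch)."""
    merged = {**current, **updates}                     # keep old keys, add/override new ones
    merged = {k: str(v) for k, v in merged.items()}     # S3 user metadata values are strings
    return {
        "Bucket": bucket,
        "Key": key,
        "CopySource": {"Bucket": bucket, "Key": key},   # copy the object onto itself
        "Metadata": merged,
        "MetadataDirective": "REPLACE",                 # a self-copy that changes nothing else is rejected
    }


kwargs = build_metadata_update(
    "domain-emails",
    "abc123",
    current={"processed": "false"},
    updates={
        "processed": "true",
        "processed_at": int(time.time()),
        "processed_by": "worker-example.com",
        "status": "delivered",
    },
)
# With a real client: boto3.client("s3").copy_object(**kwargs)
```

With `REPLACE`, system metadata such as `ContentType` is replaced as well, so it should be passed explicitly if it matters.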

sqs_handler.py

  • Purpose: Queue operations
  • Key Methods:
    • get_queue_url(): Resolve domain to queue
    • receive_messages(): Long poll with attributes
    • delete_message(): Remove after processing
    • get_queue_size(): For metrics
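`receive_messages()` and `delete_message()` correspond to boto3's `receive_message` and `delete_message` calls. A sketch with the values from step 1 of the flow (20 s long poll, up to 10 messages, both the SQS maximums); the wrapper names are hypothetical, and a stub client replaces `boto3.client("sqs")` so it runs offline:

```python
from __future__ import annotations


def receive_batch(sqs_client, queue_url: str) -> list[dict]:
    """Long-poll one batch of messages (sketch)."""
    resp = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,         # SQS maximum per call
        WaitTimeSeconds=20,             # long poll; 20 s is the SQS maximum
        MessageAttributeNames=["All"],
    )
    return resp.get("Messages", [])     # the key is absent when nothing arrived


def delete_batch(sqs_client, queue_url: str, messages: list[dict]) -> None:
    for msg in messages:
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])


class StubSQS:
    """Offline stand-in for boto3.client("sqs")."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.calls = []

    def receive_message(self, **kwargs):
        self.calls.append(kwargs)
        return {"Messages": list(self.messages)} if self.messages else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.messages = [m for m in self.messages if m["ReceiptHandle"] != ReceiptHandle]


sqs = StubSQS([{"Body": "{}", "ReceiptHandle": "rh-1"}])
queue_url = "https://sqs.example.invalid/queue"  # placeholder URL
batch = receive_batch(sqs, queue_url)
delete_batch(sqs, queue_url, batch)
```

In the real flow, deletion happens only after processing, and only on success or permanent failure (step 9).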

ses_handler.py

  • Purpose: Send emails via SES
  • Key Methods:
    • send_raw_email(): Send raw MIME message

dynamodb_handler.py

  • Purpose: All DynamoDB operations
  • Key Methods:
    • get_email_rules(): OOO and forwarding rules
    • get_bounce_info(): Bounce lookup with retry
    • get_blocked_patterns(): Single recipient
    • batch_get_blocked_patterns(): Multiple recipients in a single batch request (preferred)
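A batch lookup like `batch_get_blocked_patterns()` would typically use DynamoDB `BatchGetItem`, which takes at most 100 keys per request, rejects duplicate keys, and may hand back throttled keys under `UnprocessedKeys`. A sketch against a boto3 DynamoDB resource, stubbed so it runs offline; the key attribute `email_address` and the list attribute `blocked_patterns` are hypothetical, as the `email-blocked-senders` schema is not documented here:

```python
from __future__ import annotations

import time

TABLE = "email-blocked-senders"


def batch_get_blocked_patterns(dynamodb, recipients: list[str]) -> dict[str, list[str]]:
    """Fetch blocked patterns for many recipients via BatchGetItem (sketch)."""
    result = {r: [] for r in recipients}
    unique = list(dict.fromkeys(recipients))              # duplicate keys are rejected
    for start in range(0, len(unique), 100):              # at most 100 keys per request
        chunk = unique[start:start + 100]
        request = {TABLE: {"Keys": [{"email_address": r} for r in chunk]}}
        delay = 0.05
        while request:
            resp = dynamodb.batch_get_item(RequestItems=request)
            for item in resp["Responses"].get(TABLE, []):
                result[item["email_address"]] = list(item.get("blocked_patterns", []))
            request = resp.get("UnprocessedKeys") or {}   # throttled keys come back here
            if request:
                time.sleep(delay)
                delay = min(delay * 2, 1.0)               # simple exponential backoff
    return result


class StubDynamo:
    """Offline stand-in for boto3.resource("dynamodb"); throttles one key on the first call."""

    def __init__(self, data):
        self.data = data
        self.calls = 0

    def batch_get_item(self, RequestItems):
        self.calls += 1
        keys = RequestItems[TABLE]["Keys"]
        served, unprocessed = (keys[:-1], keys[-1:]) if self.calls == 1 and len(keys) > 1 else (keys, [])
        items = [{"email_address": k["email_address"], "blocked_patterns": self.data[k["email_address"]]}
                 for k in served if k["email_address"] in self.data]
        resp = {"Responses": {TABLE: items}, "UnprocessedKeys": {}}
        if unprocessed:
            resp["UnprocessedKeys"] = {TABLE: {"Keys": unprocessed}}
        return resp


db = StubDynamo({"bob@example.com": ["*@spam.com"], "carol@example.com": ["noreply@*.com"]})
patterns = batch_get_blocked_patterns(db, ["bob@example.com", "carol@example.com", "dave@example.com"])
```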

Email Processors (email_processing/)

parser.py

  • Purpose: Email parsing utilities
  • Key Methods:
    • parse_bytes(): Parse raw email
    • extract_body_parts(): Get text/html bodies
    • is_processed_by_worker(): Loop detection
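The three helpers map onto the standard library's `email` package. A sketch assuming `email.policy.default` (which produces `EmailMessage` objects with `get_body()` and `get_content()`) and the `X-SES-Worker-Processed` marker from Loop Prevention; the function bodies are illustrative, not taken from parser.py:

```python
from __future__ import annotations

from email import policy
from email.message import EmailMessage
from email.parser import BytesParser


def parse_bytes(raw: bytes) -> EmailMessage:
    # policy.default returns EmailMessage objects (needed for get_body/get_content)
    return BytesParser(policy=policy.default).parsebytes(raw)


def extract_body_parts(msg: EmailMessage) -> tuple[str | None, str | None]:
    """Return (text, html); either is None if that part is missing (sketch)."""
    text_part = msg.get_body(preferencelist=("plain",))
    html_part = msg.get_body(preferencelist=("html",))
    text = text_part.get_content() if text_part is not None else None
    html = html_part.get_content() if html_part is not None else None
    return text, html


def is_processed_by_worker(msg: EmailMessage) -> bool:
    # Header name taken from the Loop Prevention section of this document
    return msg.get("X-SES-Worker-Processed") is not None


raw = (
    b"From: alice@sender.org\r\n"
    b"To: bob@example.com\r\n"
    b"Subject: Hi\r\n"
    b"X-SES-Worker-Processed: true\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"Hello Bob\r\n"
)
msg = parse_bytes(raw)
text, html = extract_body_parts(msg)
```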

bounce_handler.py

  • Purpose: Bounce detection and rewriting
  • Key Methods:
    • is_ses_bounce_notification(): Detect MAILER-DAEMON
    • apply_bounce_logic(): Rewrite headers

blocklist.py

  • Purpose: Sender blocking with wildcards
  • Key Methods:
    • is_sender_blocked(): Single check
    • batch_check_blocked_senders(): Batch check (preferred!)
  • Wildcard Support: Uses fnmatch for patterns like *@spam.com
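A sketch of the batch check using `fnmatch.fnmatchcase` on lower-cased strings. Plain `fnmatch.fnmatch` normalizes case with `os.path.normcase`, which makes it case-insensitive on Windows but case-sensitive on Linux, so lower-casing both sides explicitly gives the same result everywhere. The signature (a sender plus a recipient→patterns dict, as a batch DynamoDB lookup would return) is an assumption about `batch_check_blocked_senders()`:

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def batch_check_blocked_senders(sender: str, patterns_by_recipient: dict[str, list[str]]) -> dict[str, bool]:
    """Map each recipient to True if the sender matches any of its patterns (sketch)."""
    sender = sender.strip().lower()
    return {
        recipient: any(fnmatchcase(sender, p.strip().lower()) for p in patterns)
        for recipient, patterns in patterns_by_recipient.items()
    }


blocked = batch_check_blocked_senders(
    "Promo@Spam.com",
    {
        "bob@example.com": ["*@spam.com"],
        "carol@example.com": ["noreply@*.com"],
        "dave@example.com": [],
    },
)
```

In fnmatch syntax `?` and `[...]` are wildcards too, so a pattern containing those characters literally would match more than intended.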

rules_processor.py

  • Purpose: OOO and forwarding logic
  • Key Methods:
    • process_rules_for_recipient(): Main entry point
    • _handle_ooo(): Auto-reply logic
    • _handle_forwards(): Forwarding logic
    • _create_ooo_reply(): Build OOO message
    • _create_forward_message(): Build forward with attachments
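The auto-reply guard and reply construction might look like this with the standard library's `email` package. It follows RFC 3834: no automatic response to a message whose `Auto-Submitted` header is anything other than `no`, and the reply carries `Auto-Submitted: auto-replied` plus `In-Reply-To`/`References`. The function names, the `Re:` subject prefix, and stamping the reply with `X-SES-Worker-Processed` are assumptions, not confirmed behavior of `_create_ooo_reply()`:

```python
from __future__ import annotations

from email import policy
from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import make_msgid


def should_auto_reply(original: EmailMessage) -> bool:
    """RFC 3834: skip automatically generated mail and the worker's own output (sketch)."""
    auto_submitted = str(original.get("Auto-Submitted", "no")).strip().lower()
    return auto_submitted == "no" and original.get("X-SES-Worker-Processed") is None


def create_ooo_reply(original: EmailMessage, recipient: str, ooo_text: str) -> EmailMessage:
    reply = EmailMessage()
    reply["From"] = recipient
    reply["To"] = str(original.get("Reply-To") or original["From"])
    reply["Subject"] = "Re: " + str(original.get("Subject", ""))   # prefix is an assumption
    reply["Message-ID"] = make_msgid(domain=recipient.rpartition("@")[2])
    original_id = original.get("Message-ID")
    if original_id:
        reply["In-Reply-To"] = str(original_id)
        refs = original.get("References")
        reply["References"] = f"{refs} {original_id}" if refs else str(original_id)
    reply["Auto-Submitted"] = "auto-replied"      # RFC 3834 marker for automatic replies
    reply["X-SES-Worker-Processed"] = "true"      # assumed loop-prevention stamp
    body = original.get_body(preferencelist=("plain",))
    quoted = ""
    if body is not None:
        quoted = "\n".join("> " + line for line in body.get_content().splitlines())
    reply.set_content(f"{ooo_text}\n\n{quoted}" if quoted else ooo_text)
    return reply


raw = (
    b"From: alice@sender.org\r\n"
    b"To: bob@example.com\r\n"
    b"Subject: Lunch?\r\n"
    b"Message-ID: <m1@sender.org>\r\n"
    b"\r\n"
    b"Are you free?\r\n"
)
orig = BytesParser(policy=policy.default).parsebytes(raw)
reply = create_ooo_reply(orig, "bob@example.com", "I am out of the office until Monday.")
```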

SMTP Components (smtp/)

pool.py

  • Purpose: Connection pooling
  • Features:
    • Lazy initialization
    • Health checks (NOOP)
    • Auto-reconnect on stale connections
    • Thread-safe queue
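A minimal sketch of the pool features listed above, built on `queue.Queue` for thread safety; the class name, the `factory` parameter, and the default pool size are assumptions. `smtplib.SMTP.noop()` returns a `(code, message)` tuple and raises `SMTPServerDisconnected` on a dead connection, which is what the health check relies on. A fake connection stands in for `smtplib.SMTP` so the demo runs without a mail server:

```python
import queue
import smtplib


class SMTPPool:
    """Lazily created, health-checked, thread-safe SMTP connections (sketch)."""

    def __init__(self, factory, max_size: int = 5):
        self._factory = factory                      # e.g. lambda: smtplib.SMTP(host, 25)
        self._idle = queue.Queue(maxsize=max_size)   # Queue does its own locking

    def get_connection(self):
        try:
            conn = self._idle.get_nowait()           # reuse an idle connection if there is one
        except queue.Empty:
            return self._factory()                   # lazy initialization
        try:
            if conn.noop()[0] == 250:                # health check
                return conn
        except OSError:                              # includes smtplib.SMTPException subclasses
            pass
        self._close_quietly(conn)                    # stale: replace it
        return self._factory()

    def return_connection(self, conn) -> None:
        try:
            self._idle.put_nowait(conn)
        except queue.Full:
            self._close_quietly(conn)                # pool is full; drop the extra connection

    @staticmethod
    def _close_quietly(conn) -> None:
        try:
            conn.quit()
        except OSError:
            pass


class FakeConn:
    """Offline stand-in for smtplib.SMTP."""
    created = 0

    def __init__(self):
        FakeConn.created += 1
        self.alive = True

    def noop(self):
        if not self.alive:
            raise smtplib.SMTPServerDisconnected("Server not connected")
        return (250, b"2.0.0 Ok")

    def quit(self):
        self.alive = False


pool = SMTPPool(FakeConn, max_size=2)
c1 = pool.get_connection()    # created lazily
pool.return_connection(c1)
c2 = pool.get_connection()    # passes NOOP, reused
c2.alive = False              # simulate the server dropping the connection
pool.return_connection(c2)
c3 = pool.get_connection()    # fails NOOP, replaced by a new connection
```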

delivery.py

  • Purpose: Actual email delivery
  • Features:
    • SMTP or LMTP support
    • Retry logic for connection errors
    • Permanent vs temporary failure detection
    • Connection pool integration
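Permanent and temporary failures usually follow the SMTP reply classes of RFC 5321: 4xx is transient, 5xx is permanent. `smtplib.SMTP.sendmail()` returns a dict of the refused recipients when at least one was accepted, and raises `SMTPRecipientsRefused` when all were refused. A sketch of per-recipient classification on that basis; `deliver` and `classify_code` are hypothetical names, and the connection-retry loop described under Error Handling Strategy is left out:

```python
from __future__ import annotations

import smtplib


def classify_code(code: int) -> str:
    """RFC 5321 reply classes: 2xx ok, 4xx transient, 5xx permanent."""
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "temporary"
    return "permanent"


def deliver(conn, sender: str, recipients: list[str], raw: bytes) -> dict[str, str]:
    """Send once and label each recipient success/temporary/permanent (sketch)."""
    try:
        refused = conn.sendmail(sender, recipients, raw)  # dict of refused recipients only
    except smtplib.SMTPRecipientsRefused as exc:
        refused = exc.recipients                          # raised when every recipient is refused
    except smtplib.SMTPServerDisconnected:
        return {r: "temporary" for r in recipients}       # connection problem: retry later
    except smtplib.SMTPResponseException as exc:          # sender refused, DATA rejected, ...
        return {r: classify_code(exc.smtp_code) for r in recipients}
    return {r: classify_code(refused[r][0]) if r in refused else "success" for r in recipients}


class FakeConn:
    """Offline stand-in that refuses the recipients listed in `refused`."""

    def __init__(self, refused):
        self.refused = refused

    def sendmail(self, sender, recipients, raw):
        if set(self.refused) == set(recipients):
            raise smtplib.SMTPRecipientsRefused(self.refused)
        return self.refused


partial = deliver(FakeConn({"bad@example.com": (550, b"5.1.1 User unknown")}),
                  "alice@sender.org", ["bob@example.com", "bad@example.com"], b"...")
all_refused = deliver(FakeConn({"bob@example.com": (451, b"4.3.0 Try again later")}),
                      "alice@sender.org", ["bob@example.com"], b"...")
```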

Monitoring (metrics/)

prometheus.py

  • Purpose: Metrics collection
  • Metrics:
    • Counters: processed, bounces, autoreplies, forwards, blocked
    • Gauges: in_flight, queue_size
    • Histograms: processing_time

🔐 Security Features

1. Domain Validation

Each domain poller only processes messages addressed to its own domain:

if recipient_domain.lower() != domain.lower():
    log("Security: Ignored message for wrong domain")
    return True  # Delete from queue

2. Loop Prevention

Detects already-processed emails:

if parsed.get('X-SES-Worker-Processed'):
    log("Loop prevention: Already processed")
    skip_rules = True

3. Blocklist Wildcards

Supports flexible patterns:

blocked_patterns = [
    "*@spam.com",           # Any user at spam.com
    "noreply@*.com",        # noreply at any .com
    "newsletter@example.*"  # newsletter at any example TLD
]

4. Internal vs External Routing

Prevents SES loops for internal forwards:

if is_internal_address(forward_to):
    # Direct SMTP to port 2525 (bypasses transport_maps)
    send_internal_email(...)
else:
    # Send via SES
    ses.send_raw_email(...)
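A possible shape for `is_internal_address()`, assuming "internal" means the address's domain is one of the domains this worker polls; `MANAGED_DOMAINS` and `route_for` are hypothetical and exist only to show the decision:

```python
from email.utils import parseaddr

MANAGED_DOMAINS = {"example.com", "another.com"}  # hypothetical; would come from worker config


def is_internal_address(address: str) -> bool:
    """True if the address belongs to a domain this worker serves (sketch)."""
    addr = parseaddr(address)[1]                  # accepts "Bob <bob@example.com>" too
    return addr.rpartition("@")[2].lower() in MANAGED_DOMAINS


def route_for(address: str) -> str:
    # Internal targets go straight to the mail server on port 2525; everything else via SES
    return "smtp:2525" if is_internal_address(address) else "ses"
```

Exact matching treats subdomains such as `sub.example.com` as external; whether the worker does the same is not specified here.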

📊 Data Flow Diagrams

Bounce Rewriting Flow

SES Bounce → Worker → DynamoDB Lookup → Header Rewrite → Delivery
             ↓
         Message-ID
             ↓
    ses-outbound-messages
         {MessageId: "abc",
          original_source: "real@sender.com",
          bouncedRecipients: ["failed@domain.com"]}
             ↓
    Rewrite From: mailer-daemon@amazonses.com
                → failed@domain.com
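The rewrite step might look like this, assuming the `ses-outbound-messages` item has already been fetched by Message-ID and has the shape shown above. `is_ses_bounce_notification` and `apply_bounce_logic` are the names listed under Component Details, but their bodies here, including using the first bounced recipient and setting `Reply-To` to the same address, are illustrative assumptions:

```python
from __future__ import annotations

from email import policy
from email.message import EmailMessage
from email.parser import BytesParser

SES_MAILER_DAEMON = "mailer-daemon@amazonses.com"


def is_ses_bounce_notification(msg: EmailMessage) -> bool:
    return SES_MAILER_DAEMON in str(msg.get("From", "")).lower()


def apply_bounce_logic(msg: EmailMessage, item: dict | None) -> bool:
    """Replace the SES daemon address with the recipient that bounced (sketch)."""
    if not item or not item.get("bouncedRecipients"):
        return False                                  # no tracking record: leave as-is
    failed = item["bouncedRecipients"][0]
    msg.replace_header("From", failed)
    if "Reply-To" in msg:
        msg.replace_header("Reply-To", failed)
    else:
        msg["Reply-To"] = failed
    return True


raw = (
    b"From: MAILER-DAEMON@amazonses.com\r\n"
    b"To: real@sender.com\r\n"
    b"Subject: Delivery Status Notification (Failure)\r\n"
    b"\r\n"
    b"An error occurred while trying to deliver the mail.\r\n"
)
bounce = BytesParser(policy=policy.default).parsebytes(raw)
item = {"MessageId": "abc", "original_source": "real@sender.com",
        "bouncedRecipients": ["failed@domain.com"]}
rewritten = is_ses_bounce_notification(bounce) and apply_bounce_logic(bounce, item)
```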

Blocklist Check Flow

Incoming Email → Batch DynamoDB Call → Pattern Matching → Decision
   ↓                     ↓                    ↓              ↓
sender@spam.com    Get patterns for      fnmatch()      Block/Allow
                   all recipients      "*@spam.com"
                                         matches!

Performance Optimizations

1. Batch DynamoDB Calls

# ❌ Old way: N calls for N recipients
for recipient in recipients:
    patterns = dynamodb.get_blocked_patterns(recipient)

# ✅ New way: 1 call for N recipients
patterns_by_recipient = dynamodb.batch_get_blocked_patterns(recipients)

2. Connection Pooling

# ❌ Old way: New connection per email
conn = smtplib.SMTP(host, port)
conn.sendmail(...)
conn.quit()

# ✅ New way: Reuse connections
conn = pool.get_connection()  # Reuses existing
conn.sendmail(...)
pool.return_connection(conn)  # Returns to pool

3. Parallel Domain Processing

Domain 1 Thread ──▶ Process 10 emails/poll
Domain 2 Thread ──▶ Process 10 emails/poll
Domain 3 Thread ──▶ Process 10 emails/poll
                    (All in parallel!)
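One way to structure the per-domain threads: each runs its own poll loop and checks a shared `threading.Event` that the main thread sets from its signal handler. `run_poller`, `start_pollers`, and `poll_once` are hypothetical names. Threads fit here because the work is mostly network I/O (SQS long polls, S3 downloads, SMTP), during which CPython releases the GIL:

```python
from __future__ import annotations

import threading
import time


def run_poller(domain: str, poll_once, stop: threading.Event) -> None:
    """Repeat one receive/process/delete cycle until shutdown is requested (sketch)."""
    while not stop.is_set():
        poll_once(domain)


def start_pollers(domains, poll_once) -> tuple[threading.Event, list[threading.Thread]]:
    stop = threading.Event()
    threads = [
        threading.Thread(target=run_poller, args=(d, poll_once, stop),
                         name=f"poller-{d}", daemon=True)
        for d in domains
    ]
    for t in threads:
        t.start()
    return stop, threads


seen = set()
seen_lock = threading.Lock()


def fake_poll(domain: str) -> None:
    # Stand-in for a 20 s SQS long poll plus processing
    with seen_lock:
        seen.add(domain)
    time.sleep(0.01)


stop_event, workers = start_pollers(["example.com", "another.com"], fake_poll)
time.sleep(0.2)
stop_event.set()              # a SIGTERM/SIGINT handler in the main thread would do this
for t in workers:
    t.join(timeout=1)
```

Because the stop flag is only checked between iterations, shutdown can take up to one long-poll interval (20 s) plus whatever processing is in progress.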

🔄 Error Handling Strategy

Retry Logic

  • Temporary Errors: Leave the message in the queue; SQS redelivers it once its visibility timeout expires
  • Permanent Errors: Mark in S3, delete from queue
  • S3 Not Found: Retry up to 5 times, in case the object is not yet readable when the notification arrives
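The S3 retry might be sketched as a loop with exponential backoff. With boto3, a missing object raises a `ClientError` whose `response['Error']['Code']` is `NoSuchKey` (from `get_object`) or `404` (from `head_object`); the sketch reads that code via `getattr` so it runs without botocore. The helper names and delays are assumptions:

```python
import time


def is_not_found(exc: Exception) -> bool:
    code = getattr(exc, "response", {}).get("Error", {}).get("Code")
    return code in ("NoSuchKey", "404")


def get_email_with_retry(fetch, attempts: int = 5, base_delay: float = 0.5) -> bytes:
    """Call fetch() until it succeeds; retry only 'not found' errors (sketch)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            # e.g. fetch = lambda: s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            return fetch()
        except Exception as exc:
            if not is_not_found(exc) or attempt == attempts - 1:
                raise                                  # other error, or out of attempts
            time.sleep(base_delay * 2 ** attempt)      # 0.5 s, 1 s, 2 s, 4 s by default


class FakeNotFound(Exception):
    """Mimics the shape of botocore's ClientError for a missing key."""
    response = {"Error": {"Code": "NoSuchKey"}}


calls = {"n": 0}


def flaky_fetch() -> bytes:
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeNotFound()
    return b"raw email bytes"


data = get_email_with_retry(flaky_fetch, base_delay=0.01)
```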

Connection Failures

for attempt in range(max_retries):
    try:
        conn.sendmail(...)
        return True
    except smtplib.SMTPServerDisconnected:
        log("Connection lost, retrying...")
        time.sleep(0.3)
        # A disconnected SMTP object cannot send again; get a fresh one first
        conn = pool.get_connection()

Audit Trail

All actions are recorded in the email's S3 object metadata, for example:

{
  "processed": "true",
  "processed_at": "1706000000",
  "processed_by": "worker-example.com",
  "status": "delivered",
  "invalid_inboxes": "baduser@example.com",
  "blocked_sender": "spam@bad.com"
}