# Architecture Documentation

## 📐 System Overview

```
┌─────────────────────────────────────────────────────────────────────┐
│                         AWS Cloud Services                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐                     │
│  │   SQS    │────▶│    S3    │     │   SES    │                     │
│  │  Queues  │     │ Buckets  │     │ Sending  │                     │
│  └──────────┘     └──────────┘     └──────────┘                     │
│       │                │                │                           │
│       │                │                │                           │
│  ┌────▼────────────────▼────────────────▼──────────────────┐        │
│  │                     DynamoDB Tables                     │        │
│  │  • email-rules (OOO, Forwarding)                        │        │
│  │  • ses-outbound-messages (Bounce Tracking)              │        │
│  │  • email-blocked-senders (Blocklist)                    │        │
│  └─────────────────────────────────────────────────────────┘        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   │ Polling & Processing
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                        Unified Email Worker                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────┐        │
│  │  Main Thread (unified_worker.py)                        │        │
│  │  • Coordination                                         │        │
│  │  • Status Monitoring                                    │        │
│  │  • Signal Handling                                      │        │
│  └────────────┬────────────────────────────────────────────┘        │
│               │                                                     │
│               ├──▶ Domain Poller Thread 1 (example.com)             │
│               ├──▶ Domain Poller Thread 2 (another.com)             │
│               ├──▶ Domain Poller Thread 3 (...)                     │
│               ├──▶ Health Server Thread (port 8080)                 │
│               └──▶ Metrics Server Thread (port 8000)                │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────┐        │
│  │  SMTP Connection Pool                                   │        │
│  │  • Connection Reuse                                     │        │
│  │  • Health Checks                                        │        │
│  │  • Auto-reconnect                                       │        │
│  └─────────────────────────────────────────────────────────┘        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                                   │ SMTP/LMTP Delivery
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                   Mail Server (Docker Mailserver)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Port 25   (SMTP - from pool)                                       │
│  Port 2525 (SMTP - internal delivery, bypasses transport_maps)      │
│  Port 24   (LMTP - direct to Dovecot, bypasses Postfix)             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

## 🔄 Message Flow

### 1. Email Reception

```
1. SES receives email
2. SES stores in S3 bucket (domain-emails/)
3. SES publishes SNS notification
4. SNS enqueues message to SQS (domain-queue)
```

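Because SNS sits between SES and SQS, each SQS message body is an SNS envelope whose `Message` field is itself a JSON string containing the SES notification. A minimal sketch of unwrapping it, assuming the SES receipt rule uses an S3 action (so `receipt.action` carries `bucketName` and `objectKey`) and that SNS raw message delivery is off; with raw delivery on, the body would already be the SES notification, which the fallback branch handles. The helper names `extract_ses_notification()` and `s3_location()` are hypothetical, not necessarily what `domain_poller.py` uses.

```python
from __future__ import annotations

import json
from typing import Any


def extract_ses_notification(sqs_body: str) -> dict[str, Any]:
    """Unwrap an SQS body carrying an SNS envelope around an SES notification (sketch)."""
    envelope = json.loads(sqs_body)
    if "Type" in envelope and "Message" in envelope:
        # SNS envelope: the SES notification is JSON-encoded inside "Message".
        return json.loads(envelope["Message"])
    # Fallback for SNS raw message delivery: the body is the notification itself.
    return envelope


def s3_location(notification: dict[str, Any]) -> tuple[str, str]:
    """Return (bucket, key) from an SES notification produced by an S3 receipt action."""
    action = notification["receipt"]["action"]
    if action.get("type") != "S3":
        raise ValueError(f"unexpected receipt action type: {action.get('type')!r}")
    return action["bucketName"], action["objectKey"]


if __name__ == "__main__":
    # Example values only.
    ses = {"receipt": {"action": {"type": "S3", "bucketName": "domain-emails", "objectKey": "abc123"}}}
    body = json.dumps({"Type": "Notification", "Message": json.dumps(ses)})
    print(s3_location(extract_ses_notification(body)))  # ('domain-emails', 'abc123')
```

Parsing the envelope and the inner notification as two separate steps keeps the poller working if raw delivery is toggled on the subscription later.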
### 2. Worker Processing

```
┌─────────────────────────────────────────────────────────────┐
│ Domain Poller (domain_poller.py)                            │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 1. Poll SQS Queue (20s long poll)                           │
│    • Receive up to 10 messages                              │
│    • Extract SES notification from SNS wrapper              │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Download from S3 (s3_handler.py)                         │
│    • Get raw email bytes                                    │
│    • Retry if object not found yet                          │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 3. Parse Email (parser.py)                                  │
│    • Parse MIME structure                                   │
│    • Extract headers, body, attachments                     │
│    • Check for loop prevention marker                       │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Bounce Detection (bounce_handler.py)                     │
│    • Check if from mailer-daemon@amazonses.com              │
│    • Look up original sender in DynamoDB                    │
│    • Rewrite From/Reply-To headers                          │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Blocklist Check (blocklist.py)                           │
│    • Batch lookup blocked patterns for all recipients       │
│    • Check sender against wildcard patterns                 │
│    • Mark blocked recipients                                │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 6. Process Rules for Each Recipient (rules_processor.py)    │
│    ├─▶ Auto-Reply (OOO)                                     │
│    │   • Check if ooo_active = true                         │
│    │   • Don't reply to auto-submitted messages             │
│    │   • Create reply with original message quoted          │
│    │   • Send via SES (external) or Port 2525 (internal)    │
│    │                                                        │
│    └─▶ Forwarding                                           │
│        • Get forward addresses from rule                    │
│        • Create forward with FWD: prefix                    │
│        • Preserve attachments                               │
│        • Send via SES (external) or Port 2525 (internal)    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 7. SMTP Delivery (delivery.py)                              │
│    • Get connection from pool                               │
│    • Send to each non-blocked recipient                     │
│    • Track success/permanent/temporary failures             │
│    • Return connection to pool                              │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 8. Update S3 Metadata (s3_handler.py)                       │
│    ├─▶ All Blocked: mark_as_blocked() + delete()            │
│    ├─▶ Some Success: mark_as_processed()                    │
│    └─▶ All Invalid: mark_as_all_invalid()                   │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│ 9. Delete from Queue                                        │
│    • Success or permanent failure → delete                  │
│    • Temporary failure → keep in queue (retry)              │
└─────────────────────────────────────────────────────────────┘
```

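Steps 8 and 9 boil down to a decision over the per-recipient delivery outcomes. The sketch below is one plausible reading of the diagram, not the worker's actual code: the `Outcome` enum and `decide()` are hypothetical, the returned strings are the `s3_handler.py` method names listed under Component Details, and the treatment of mixed results (any successful delivery means "processed, do not retry") is an assumption made to avoid duplicate deliveries.

```python
from __future__ import annotations

from enum import Enum


class Outcome(Enum):
    DELIVERED = "delivered"
    BLOCKED = "blocked"
    PERMANENT = "permanent"  # e.g. 5xx: mailbox does not exist
    TEMPORARY = "temporary"  # e.g. 4xx or lost connection


def decide(outcomes: dict[str, Outcome]) -> tuple[tuple[str, ...], bool]:
    """Map per-recipient outcomes to (s3_handler methods to call, delete_from_queue)."""
    values = set(outcomes.values())
    if not values:
        return (), True  # assumption: nothing to deliver, drop the message
    if values == {Outcome.BLOCKED}:
        return ("mark_as_blocked", "delete_blocked_email"), True
    if Outcome.DELIVERED in values:
        # Assumption: once any recipient has the mail, do not retry (avoids duplicates).
        return ("mark_as_processed",), True
    if Outcome.TEMPORARY in values:
        return (), False  # stays in SQS; redelivered after the visibility timeout
    # Only permanent failures remain (possibly alongside blocked recipients).
    return ("mark_as_all_invalid",), True
```

Keeping this as a pure function of the outcomes makes the queue/S3 policy easy to unit-test separately from the AWS calls.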
## 🧩 Component Details

### AWS Handlers (`aws/`)

#### `s3_handler.py`

- **Purpose**: All S3 operations
- **Key Methods**:
  - `get_email()`: Download with retry logic
  - `mark_as_processed()`: Update metadata on success
  - `mark_as_all_invalid()`: Update metadata on permanent failure
  - `mark_as_blocked()`: Set metadata before deletion
  - `delete_blocked_email()`: Delete after marking

#### `sqs_handler.py`

- **Purpose**: Queue operations
- **Key Methods**:
  - `get_queue_url()`: Resolve domain to queue
  - `receive_messages()`: Long poll with attributes
  - `delete_message()`: Remove after processing
  - `get_queue_size()`: Approximate queue depth for the `queue_size` metric

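The four SQS methods map closely onto boto3 SQS client calls. A minimal sketch, assuming a boto3-style client is injected (anything exposing `get_queue_url`, `receive_message`, `delete_message` and `get_queue_attributes`, e.g. `boto3.client("sqs")`). The `SqsHandler` class and the `example-com-queue` naming rule are hypothetical; the rule only reflects that SQS queue names may not contain dots.

```python
from __future__ import annotations

from typing import Any


class SqsHandler:
    """Thin wrapper over a boto3-style SQS client (illustrative sketch)."""

    def __init__(self, client: Any) -> None:
        self.client = client  # e.g. boto3.client("sqs")

    def get_queue_url(self, domain: str) -> str:
        # Hypothetical naming: example.com -> example-com-queue (no dots allowed in SQS names).
        name = domain.replace(".", "-") + "-queue"
        return self.client.get_queue_url(QueueName=name)["QueueUrl"]

    def receive_messages(self, queue_url: str) -> list[dict]:
        # Long poll: wait up to 20 s and return at most 10 messages (the SQS maximum).
        resp = self.client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
            MessageAttributeNames=["All"],
        )
        return resp.get("Messages", [])  # the key is absent when nothing arrived

    def delete_message(self, queue_url: str, receipt_handle: str) -> None:
        self.client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

    def get_queue_size(self, queue_url: str) -> int:
        resp = self.client.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
        )
        return int(resp["Attributes"]["ApproximateNumberOfMessages"])  # returned as a string
```

Injecting the client instead of creating it inside the class keeps the handler testable with a stub.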
#### `ses_handler.py`

- **Purpose**: Send emails via SES
- **Key Methods**:
  - `send_raw_email()`: Send raw MIME message

#### `dynamodb_handler.py`

- **Purpose**: All DynamoDB operations
- **Key Methods**:
  - `get_email_rules()`: OOO and forwarding rules
  - `get_bounce_info()`: Bounce lookup with retry
  - `get_blocked_patterns()`: Blocked patterns for a single recipient
  - `batch_get_blocked_patterns()`: Blocked patterns for many recipients in one batch call (efficient!)

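`batch_get_blocked_patterns()` maps naturally onto DynamoDB `BatchGetItem`, which accepts at most 100 keys per request, rejects duplicate keys, and may return some keys under `UnprocessedKeys` that must be re-requested. A sketch under assumptions: a boto3-style low-level client is injected, and the `email-blocked-senders` table is keyed by a string attribute `email_address` with patterns stored in a string-set attribute `blocked_patterns`; both attribute names are hypothetical.

```python
from __future__ import annotations

import time
from typing import Any

TABLE = "email-blocked-senders"
MAX_KEYS_PER_REQUEST = 100  # BatchGetItem limit


def batch_get_blocked_patterns(client: Any, recipients: list[str]) -> dict[str, list[str]]:
    """Return {recipient: [patterns]} with as few BatchGetItem calls as possible.

    Hypothetical schema: key "email_address" (S), patterns in "blocked_patterns" (SS).
    Recipients without an item map to an empty list.
    """
    unique = list(dict.fromkeys(recipients))  # duplicate keys are rejected by DynamoDB
    result: dict[str, list[str]] = {r: [] for r in unique}
    for start in range(0, len(unique), MAX_KEYS_PER_REQUEST):
        chunk = unique[start:start + MAX_KEYS_PER_REQUEST]
        request = {TABLE: {"Keys": [{"email_address": {"S": r}} for r in chunk]}}
        delay = 0.05
        while request:
            resp = client.batch_get_item(RequestItems=request)
            for item in resp.get("Responses", {}).get(TABLE, []):
                addr = item["email_address"]["S"]
                result[addr] = sorted(item.get("blocked_patterns", {}).get("SS", []))
            # Re-request whatever DynamoDB did not process (e.g. throttling), with backoff.
            request = resp.get("UnprocessedKeys") or {}
            if request:
                time.sleep(delay)
                delay = min(delay * 2, 1.0)
    return result
```

For a typical message with a handful of recipients this is a single request instead of one `GetItem` per recipient.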
### Email Processors (`email_processing/`)

#### `parser.py`

- **Purpose**: Email parsing utilities
- **Key Methods**:
  - `parse_bytes()`: Parse raw email
  - `extract_body_parts()`: Get text/html bodies
  - `is_processed_by_worker()`: Loop detection

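These helpers can lean on the standard library's `email` package: parsing with `policy.default` yields `EmailMessage` objects that expose `get_body()` and `iter_attachments()`. A minimal sketch, assuming the loop marker is the `X-SES-Worker-Processed` header shown under Loop Prevention; the function bodies (and the extra `list_attachments()` helper) are illustrative, not the module's actual code.

```python
from __future__ import annotations

from email import policy
from email.message import EmailMessage
from email.parser import BytesParser


def parse_bytes(raw: bytes) -> EmailMessage:
    # policy.default returns EmailMessage objects instead of the legacy Message class.
    return BytesParser(policy=policy.default).parsebytes(raw)


def extract_body_parts(msg: EmailMessage) -> tuple[str, str]:
    """Return (text, html); either is "" when that body type is missing."""
    text_part = msg.get_body(preferencelist=("plain",))
    html_part = msg.get_body(preferencelist=("html",))
    text = text_part.get_content() if text_part is not None else ""
    html = html_part.get_content() if html_part is not None else ""
    return text, html


def list_attachments(msg: EmailMessage) -> list[str]:
    # iter_attachments() skips the body candidates and yields the remaining parts.
    return [part.get_filename() or "(unnamed)" for part in msg.iter_attachments()]


def is_processed_by_worker(msg: EmailMessage) -> bool:
    # Assumption: the worker stamps this header on every message it sends.
    return msg.get("X-SES-Worker-Processed") is not None
```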
#### `bounce_handler.py`

- **Purpose**: Bounce detection and rewriting
- **Key Methods**:
  - `is_ses_bounce_notification()`: Detect bounces sent by `mailer-daemon@amazonses.com`
  - `apply_bounce_logic()`: Rewrite From/Reply-To headers using the DynamoDB lookup

#### `blocklist.py`

- **Purpose**: Sender blocking with wildcards
- **Key Methods**:
  - `is_sender_blocked()`: Single check
  - `batch_check_blocked_senders()`: Batch check (preferred!)
- **Wildcard Support**: Uses `fnmatch` for patterns like `*@spam.com`

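A minimal sketch of the wildcard matching, assuming senders and patterns are compared case-insensitively (an assumption; the source only says `fnmatch` is used). It calls `fnmatch.fnmatchcase` on lowercased strings because plain `fnmatch.fnmatch` applies `os.path.normcase`, which makes it case-sensitive on POSIX but not on Windows. Note that `*` also matches dots, so `noreply@*.com` matches subdomains too. The signatures are hypothetical.

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def is_sender_blocked(sender: str, patterns: list[str]) -> bool:
    sender = sender.strip().lower()
    return any(fnmatchcase(sender, p.strip().lower()) for p in patterns)


def batch_check_blocked_senders(
    sender: str, patterns_by_recipient: dict[str, list[str]]
) -> dict[str, bool]:
    """Return {recipient: blocked?} for one sender across all recipients.

    patterns_by_recipient is assumed to come from a single batch DynamoDB lookup.
    """
    return {
        recipient: is_sender_blocked(sender, patterns)
        for recipient, patterns in patterns_by_recipient.items()
    }


if __name__ == "__main__":
    patterns = {"alice@example.com": ["*@spam.com"], "bob@example.com": ["noreply@*.com"]}
    print(batch_check_blocked_senders("Offers@Spam.com", patterns))
    # {'alice@example.com': True, 'bob@example.com': False}
```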
#### `rules_processor.py`

- **Purpose**: OOO and forwarding logic
- **Key Methods**:
  - `process_rules_for_recipient()`: Main entry point
  - `_handle_ooo()`: Auto-reply logic
  - `_handle_forwards()`: Forwarding logic
  - `_create_ooo_reply()`: Build OOO message
  - `_create_forward_message()`: Build forward with attachments

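"Don't reply to auto-submitted messages" can follow RFC 3834, which says automatic responders should not reply to a message whose `Auto-Submitted` header has any value other than `no`. A sketch of such a guard: `should_send_ooo()` is a hypothetical name rather than the actual `_handle_ooo()` logic, the `ooo_active` flag comes from the rule as described above, and the extra checks (`Precedence: bulk/list/junk`, `List-Id`, mailer-daemon/postmaster or null senders, the `X-SES-Worker-Processed` marker) are common conventions added here as assumptions.

```python
from __future__ import annotations

from email.message import Message
from email.utils import parseaddr

BULK_PRECEDENCE = {"bulk", "list", "junk"}


def should_send_ooo(msg: Message, rule: dict) -> bool:
    """Decide whether an out-of-office reply is appropriate (hypothetical sketch)."""
    if not rule.get("ooo_active"):
        return False
    # RFC 3834: never answer mail whose Auto-Submitted value is anything but "no".
    auto = (msg.get("Auto-Submitted") or "no").split(";")[0].strip().lower()
    if auto != "no":
        return False
    # Conventions (assumptions): skip bulk mail and mailing lists.
    if (msg.get("Precedence") or "").strip().lower() in BULK_PRECEDENCE:
        return False
    if msg.get("List-Id") is not None:
        return False
    # Null sender ("<>") or daemon addresses never get a reply.
    _, sender = parseaddr(msg.get("Return-Path") or msg.get("From") or "")
    local = sender.split("@", 1)[0].lower()
    if not sender or local in {"mailer-daemon", "postmaster"}:
        return False
    # Never answer mail this worker generated itself.
    return msg.get("X-SES-Worker-Processed") is None
```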
### SMTP Components (`smtp/`)

#### `pool.py`

- **Purpose**: Connection pooling
- **Features**:
  - Lazy initialization
  - Health checks (NOOP)
  - Auto-reconnect on stale connections
  - Thread-safe queue

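A minimal sketch of those four features: connections are created lazily, checked with `NOOP` on checkout, replaced when stale, and kept in a thread-safe `queue.Queue`. The connection factory is injected (for example `lambda: smtplib.SMTP(host, 25)`); the method names mirror the `pool.get_connection()` / `pool.return_connection()` usage under Performance Optimizations, but the internals are assumptions.

```python
from __future__ import annotations

import queue
import smtplib
from typing import Callable


class SMTPConnectionPool:
    """Thread-safe pool of SMTP connections (illustrative sketch)."""

    def __init__(self, factory: Callable[[], smtplib.SMTP], max_idle: int = 5) -> None:
        self._factory = factory  # e.g. lambda: smtplib.SMTP("mailserver", 25)
        self._idle: queue.Queue = queue.Queue(maxsize=max_idle)  # caps idle connections only

    @staticmethod
    def _healthy(conn: smtplib.SMTP) -> bool:
        try:
            code, _ = conn.noop()
            return code == 250
        except (smtplib.SMTPException, OSError):
            return False

    @staticmethod
    def _close(conn: smtplib.SMTP) -> None:
        try:
            conn.quit()
        except (smtplib.SMTPException, OSError):
            conn.close()

    def get_connection(self) -> smtplib.SMTP:
        while True:
            try:
                conn = self._idle.get_nowait()
            except queue.Empty:
                return self._factory()  # lazy: only connect when nothing idle is available
            if self._healthy(conn):
                return conn
            self._close(conn)  # stale: discard it and try the next idle one

    def return_connection(self, conn: smtplib.SMTP) -> None:
        try:
            self._idle.put_nowait(conn)
        except queue.Full:
            self._close(conn)
```

Health-checking on checkout rather than in a background thread keeps the pool simple, at the cost of one extra round trip per message.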
#### `delivery.py`

- **Purpose**: Actual email delivery
- **Features**:
  - SMTP or LMTP support
  - Retry logic for connection errors
  - Permanent vs temporary failure detection
  - Connection pool integration

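Permanent versus temporary failure detection can rest on SMTP reply classes: 4xx replies are transient and 5xx replies are permanent (RFC 5321). `smtplib.SMTP.sendmail()` raises `SMTPRecipientsRefused` when every recipient is refused and otherwise returns a dict of the refused ones, each mapped to `(code, message)`. A sketch built on those semantics; `deliver()` and its return shape are hypothetical, and treating a dropped connection as temporary is an assumption.

```python
from __future__ import annotations

import smtplib


def classify(code: int) -> str:
    """Map an SMTP reply code to an outcome (RFC 5321 reply classes)."""
    if 200 <= code < 300:
        return "delivered"
    if 400 <= code < 500:
        return "temporary"
    return "permanent"  # 5xx (and anything unexpected)


def deliver(conn: smtplib.SMTP, sender: str, recipients: list[str], raw: bytes) -> dict[str, str]:
    """Send one message; return {recipient: "delivered" | "temporary" | "permanent"}."""
    try:
        refused = conn.sendmail(sender, recipients, raw)
    except smtplib.SMTPRecipientsRefused as exc:
        refused = exc.recipients  # every recipient was refused
    except (smtplib.SMTPSenderRefused, smtplib.SMTPDataError) as exc:
        outcome = classify(exc.smtp_code)  # applies to the whole message
        return {r: outcome for r in recipients}
    except (smtplib.SMTPServerDisconnected, OSError):
        return {r: "temporary" for r in recipients}  # retry later
    return {r: classify(refused[r][0]) if r in refused else "delivered" for r in recipients}
```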
### Monitoring (`metrics/`)

#### `prometheus.py`

- **Purpose**: Metrics collection
- **Metrics**:
  - Counters: processed, bounces, autoreplies, forwards, blocked
  - Gauges: in_flight, queue_size
  - Histograms: processing_time

## 🔐 Security Features

### 1. Domain Validation

Each domain poller only processes messages addressed to its own domain; anything else is logged and deleted from the queue:

```python
# Excerpt from the per-message handler; returning True tells the poller to delete the SQS message
if recipient_domain.lower() != domain.lower():
    log("Security: Ignored message for wrong domain")
    return True  # Delete from queue
```

### 2. Loop Prevention

Detects emails the worker has already processed and skips rule processing (auto-replies and forwards) for them:

```python
if parsed.get('X-SES-Worker-Processed'):
    log("Loop prevention: Already processed")
    skip_rules = True  # no OOO replies or forwards for this message
```

### 3. Blocklist Wildcards

Supports flexible patterns:

```python
blocked_patterns = [
    "*@spam.com",            # Any user at spam.com
    "noreply@*.com",         # noreply at any .com domain (`*` also matches subdomains)
    "newsletter@example.*",  # newsletter at any example TLD
]
```

### 4. Internal vs External Routing

Prevents SES loops for internal forwards:

```python
if is_internal_address(forward_to):
    # Direct SMTP to port 2525 (bypasses transport_maps)
    send_internal_email(...)
else:
    # Send via SES
    ses.send_raw_email(...)
```

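A minimal sketch of that routing decision, assuming `is_internal_address()` means "the address belongs to a domain this mail server hosts" and that the internal path is a plain SMTP session to port 2525. The `INTERNAL_DOMAINS` set, the `mailserver` host name and `route_and_send()` are hypothetical; the SES call uses the boto3 SES client's `send_raw_email(Source=..., Destinations=..., RawMessage={"Data": ...})` shape.

```python
from __future__ import annotations

import smtplib
from typing import Any, Callable

INTERNAL_DOMAINS = {"example.com", "another.com"}  # hypothetical: domains hosted locally


def is_internal_address(address: str) -> bool:
    return address.rsplit("@", 1)[-1].lower() in INTERNAL_DOMAINS


def route_and_send(
    ses_client: Any,
    sender: str,
    forward_to: str,
    raw: bytes,
    smtp_factory: Callable[[], smtplib.SMTP] = lambda: smtplib.SMTP("mailserver", 2525),
) -> str:
    """Send raw MIME bytes internally (port 2525) or via SES; return the route taken."""
    if is_internal_address(forward_to):
        # Port 2525 bypasses transport_maps, so the mail is not relayed back out through SES.
        with smtp_factory() as smtp:
            smtp.sendmail(sender, [forward_to], raw)
        return "internal"
    ses_client.send_raw_email(Source=sender, Destinations=[forward_to], RawMessage={"Data": raw})
    return "ses"
```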
## 📊 Data Flow Diagrams

### Bounce Rewriting Flow

```
SES Bounce → Worker → DynamoDB Lookup → Header Rewrite → Delivery
                             ↓
                        Message-ID
                             ↓
                   ses-outbound-messages
                   {MessageId: "abc",
                    original_source: "real@sender.com",
                    bouncedRecipients: ["failed@domain.com"]}
                             ↓
                   Rewrite From: mailer-daemon@amazonses.com
                                 → failed@domain.com
```

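The header rewrite can be sketched with the standard `email` package. This assumes the DynamoDB record has already been fetched and is passed in as a plain dict shaped like the `ses-outbound-messages` item in the diagram, and that both `From` and `Reply-To` are set to the first bounced recipient; `rewrite_bounce()` is a hypothetical helper, not the actual `apply_bounce_logic()`.

```python
from __future__ import annotations

from email.message import Message
from email.utils import parseaddr

SES_DAEMON = "mailer-daemon@amazonses.com"


def is_ses_bounce_notification(msg: Message) -> bool:
    _, addr = parseaddr(msg.get("From", ""))
    return addr.lower() == SES_DAEMON


def rewrite_bounce(msg: Message, record: dict | None) -> bool:
    """Rewrite From/Reply-To of an SES bounce from its outbound-message record.

    Assumed record shape: {"MessageId": ..., "original_source": ..., "bouncedRecipients": [...]}.
    Returns True if the headers were changed.
    """
    if not is_ses_bounce_notification(msg) or not record:
        return False
    bounced = record.get("bouncedRecipients") or []
    if not bounced:
        return False
    del msg["From"]  # removes every occurrence; no error if absent
    msg["From"] = bounced[0]
    del msg["Reply-To"]
    msg["Reply-To"] = bounced[0]
    return True
```

Deleting before setting matters because `msg[name] = value` appends a header rather than replacing an existing one.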
### Blocklist Check Flow

```
Incoming Email → Batch DynamoDB Call → Pattern Matching → Decision
       ↓                  ↓                   ↓              ↓
sender@spam.com   Get patterns for        fnmatch()      Block/Allow
                   all recipients       "*@spam.com"
                                          matches!
```

## ⚡ Performance Optimizations

### 1. Batch DynamoDB Calls

```python
# ❌ Old way: N calls for N recipients
patterns_by_recipient = {}
for recipient in recipients:
    patterns_by_recipient[recipient] = dynamodb.get_blocked_patterns(recipient)

# ✅ New way: 1 batch call for N recipients (BatchGetItem accepts up to 100 keys per request)
patterns_by_recipient = dynamodb.batch_get_blocked_patterns(recipients)
```

### 2. Connection Pooling

```python
import smtplib

# ❌ Old way: new connection (TCP + SMTP handshake) per email
conn = smtplib.SMTP(host, port)
conn.sendmail(from_addr, to_addrs, raw_message)
conn.quit()

# ✅ New way: reuse connections
conn = pool.get_connection()  # Reuses an idle, health-checked connection if available
try:
    conn.sendmail(from_addr, to_addrs, raw_message)
finally:
    pool.return_connection(conn)  # Returns to pool, even if sending failed
```

### 3. Parallel Domain Processing

```
Domain 1 Thread ──▶ Process up to 10 emails/poll
Domain 2 Thread ──▶ Process up to 10 emails/poll
Domain 3 Thread ──▶ Process up to 10 emails/poll
               (All in parallel!)
```

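One way to run the per-domain threads with a clean shutdown is a shared `threading.Event` that a signal handler sets and every poller loop checks. A sketch under assumptions: `poll_once(domain)` stands in for one SQS long-poll-and-process cycle, and `start_pollers()` and `install_signal_handlers()` are hypothetical names, not functions from `unified_worker.py`.

```python
from __future__ import annotations

import signal
import threading
from typing import Callable


def start_pollers(
    domains: list[str], poll_once: Callable[[str], None], stop: threading.Event
) -> list[threading.Thread]:
    """Start one daemon thread per domain; each loops until `stop` is set."""

    def run(domain: str) -> None:
        while not stop.is_set():
            try:
                poll_once(domain)  # e.g. one 20 s SQS long poll plus processing
            except Exception as exc:  # keep one bad cycle from killing the thread
                print(f"[{domain}] poll failed: {exc!r}")
                stop.wait(1.0)  # brief back-off before polling again

    threads = [
        threading.Thread(target=run, args=(d,), name=f"poller-{d}", daemon=True)
        for d in domains
    ]
    for t in threads:
        t.start()
    return threads


def install_signal_handlers(stop: threading.Event) -> None:
    """Must be called from the main thread (Python only allows signal.signal there)."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, lambda signum, frame: stop.set())
```

Because each long poll can block for up to 20 seconds, a shutdown signal takes effect once the current poll returns; the main thread can `join()` the pollers with a timeout slightly above that.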
## 🔄 Error Handling Strategy

### Retry Logic

- **Temporary Errors**: Leave the message in the queue; it becomes visible again after the visibility timeout and is retried
- **Permanent Errors**: Mark in S3, delete from queue
- **S3 Not Found**: Retry up to 5 times (the object may not be readable yet when the notification is processed)

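The "S3 Not Found" rule can be a small loop around the download. In this sketch the `fetch` callable and the `NotFound` exception are hypothetical stand-ins (with boto3, a missing key surfaces from `get_object` as the client's `exceptions.NoSuchKey`), the 5-attempt limit comes from the list above, and the exponential backoff delays are assumptions.

```python
from __future__ import annotations

import time
from typing import Callable


class NotFound(Exception):
    """Hypothetical stand-in for S3's NoSuchKey error."""


def get_email_with_retry(
    fetch: Callable[[], bytes],
    attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Call fetch() until it succeeds or it has raised NotFound `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except NotFound:
            if attempt == attempts:
                raise  # give up; the caller decides whether this is temporary
            sleep(base_delay * 2 ** (attempt - 1))  # 0.5 s, 1 s, 2 s, 4 s
    raise RuntimeError("unreachable")  # only here to satisfy type checkers
```

Passing `sleep` in as a parameter lets tests run instantly while production code uses `time.sleep`.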
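### Connection Failures

```python
import time
from smtplib import SMTPServerDisconnected


def send_with_retry(pool, from_addr, to_addrs, raw_message, max_retries):
    for attempt in range(max_retries):
        conn = pool.get_connection()  # health-checked or freshly opened connection
        try:
            conn.sendmail(from_addr, to_addrs, raw_message)
            pool.return_connection(conn)
            return True
        except SMTPServerDisconnected:
            log(f"Connection lost (attempt {attempt + 1}/{max_retries}), retrying...")
            time.sleep(0.3)  # dead connection is dropped, not returned to the pool
    return False  # all attempts failed: treat as a temporary failure
```
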
### Audit Trail

Each outcome is recorded in the S3 object's metadata (all values are strings), for example:

```json
{
  "processed": "true",
  "processed_at": "1706000000",
  "processed_by": "worker-example.com",
  "status": "delivered",
  "invalid_inboxes": "baduser@example.com",
  "blocked_sender": "spam@bad.com"
}
```

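S3 does not allow editing an object's metadata in place; the usual approach is to copy the object onto itself with `MetadataDirective="REPLACE"`, passing the complete new metadata (user metadata values must be strings). A sketch of a `mark_as_processed()`-style helper under those rules, assuming a boto3-style S3 client is injected. The field names follow the JSON above, while the signature, the merge with existing metadata, and the comma separator for `invalid_inboxes` are assumptions.

```python
from __future__ import annotations

import time
from typing import Any


def mark_as_processed(
    s3: Any, bucket: str, key: str, worker: str, invalid: list[str] | None = None
) -> dict[str, str]:
    """Merge audit fields into the object's user metadata via copy-in-place (sketch)."""
    head = s3.head_object(Bucket=bucket, Key=key)
    metadata = dict(head.get("Metadata", {}))  # keep existing user metadata
    metadata.update(
        processed="true",
        processed_at=str(int(time.time())),  # epoch seconds, stored as a string
        processed_by=worker,
        status="delivered",
    )
    if invalid:
        metadata["invalid_inboxes"] = ",".join(invalid)  # separator is an assumption
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        Metadata=metadata,
        MetadataDirective="REPLACE",
        # REPLACE also resets system metadata, so carry the content type over explicitly.
        ContentType=head.get("ContentType", "application/octet-stream"),
    )
    return metadata
```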