Unified Email Worker (Modular Version)
Multi-domain email processing worker for AWS SES/S3/SQS with bounce handling, auto-replies, forwarding, and sender blocking.
🏗️ Architecture
email-worker/
├── config.py # Configuration management
├── logger.py # Structured logging
├── aws/ # AWS service handlers
│ ├── s3_handler.py # S3 operations (download, metadata)
│ ├── sqs_handler.py # SQS polling
│ ├── ses_handler.py # SES email sending
│ └── dynamodb_handler.py # DynamoDB (rules, bounces, blocklist)
├── email_processing/ # Email processing
│ ├── parser.py # Email parsing utilities
│ ├── bounce_handler.py # Bounce detection & rewriting
│ ├── rules_processor.py # OOO & forwarding logic
│ └── blocklist.py # Sender blocking with wildcards
├── smtp/ # SMTP delivery
│ ├── pool.py # Connection pooling
│ └── delivery.py # SMTP/LMTP delivery with retry
├── metrics/ # Monitoring
│ └── prometheus.py # Prometheus metrics
├── worker.py # Message processing logic
├── domain_poller.py # Domain queue poller
├── unified_worker.py # Main worker coordinator
├── health_server.py # Health check HTTP server
└── main.py # Entry point
✨ Features
- ✅ Multi-Domain Processing: Parallel processing of multiple domains via thread pool
- ✅ Bounce Detection: Automatic SES bounce notification rewriting
- ✅ Auto-Reply/OOO: Out-of-office automatic replies
- ✅ Email Forwarding: Rule-based forwarding to internal/external addresses
- ✅ Sender Blocking: Wildcard-based sender blocklist per recipient
- ✅ SMTP Connection Pooling: Efficient reuse of connections
- ✅ LMTP Support: Direct delivery to Dovecot (bypasses Postfix transport_maps)
- ✅ Prometheus Metrics: Comprehensive monitoring
- ✅ Health Checks: HTTP health endpoint for container orchestration
- ✅ Graceful Shutdown: Proper cleanup on SIGTERM/SIGINT
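As a rough illustration of how per-domain polling on a thread pool can combine with signal-driven shutdown, here is a minimal sketch. The names `run_pollers`, `install_signal_handlers`, and `poll_once` are hypothetical and are not taken from the worker's source; error handling is omitted.

```python
import signal
import threading
from concurrent.futures import ThreadPoolExecutor


def run_pollers(domains, poll_once, stop_event, max_workers=10):
    """Run one polling loop per domain until stop_event is set.

    Assumption: max_workers >= len(domains). Otherwise the extra domains
    only start once an earlier loop has finished.
    """
    def loop(domain):
        polls = 0
        while not stop_event.is_set():
            poll_once(domain)
            polls += 1
        return domain, polls

    # Leaving the 'with' block waits for every loop to return.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(loop, domains))


def install_signal_handlers(stop_event):
    """Set stop_event on SIGTERM/SIGINT. Must be called from the main thread."""
    def handler(signum, frame):
        stop_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, handler)
    return handler
```

Using a shared `threading.Event` lets each loop finish its current poll before exiting, rather than being interrupted mid-message.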
🔧 Configuration
All configuration via environment variables:
AWS Settings
AWS_REGION=us-east-2
Domains
# Option 1: Comma-separated list
DOMAINS=example.com,another.com
# Option 2: File with one domain per line
DOMAINS_FILE=/etc/email-worker/domains.txt
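The two options could be resolved along these lines. This is a sketch only: `load_domains` is a hypothetical helper, and the precedence (DOMAINS before DOMAINS_FILE), the lowercasing, and the `#` comment handling are assumptions rather than documented behavior.

```python
import os


def load_domains(env=None):
    """Return a de-duplicated, lowercased list of domains."""
    env = os.environ if env is None else env
    raw = env.get("DOMAINS", "").strip()
    if raw:
        # Assumption: DOMAINS wins when both variables are set.
        candidates = raw.split(",")
    else:
        path = env.get("DOMAINS_FILE", "").strip()
        if not path:
            return []
        with open(path, encoding="utf-8") as handle:
            candidates = handle.read().splitlines()

    domains = []
    for item in candidates:
        domain = item.strip().lower()
        # Skip blank entries and '#' comment lines (comment support is assumed).
        if domain and not domain.startswith("#") and domain not in domains:
            domains.append(domain)
    return domains
```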
Worker Settings
WORKER_THREADS=10
POLL_INTERVAL=20 # SQS long polling (seconds)
MAX_MESSAGES=10 # Max messages per poll
VISIBILITY_TIMEOUT=300 # Message visibility timeout (seconds)
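These settings map naturally onto the parameters of the SQS `receive_message` call. The sketch below assumes that mapping; `receive_kwargs` and `poll_once` are hypothetical helpers, and `sqs_client` is expected to be a boto3 SQS client. SQS itself caps long polling at 20 seconds and a batch at 10 messages, so the values are clamped.

```python
import os


def receive_kwargs(queue_url, env=None):
    """Translate the worker settings into receive_message arguments."""
    env = os.environ if env is None else env
    wait = int(env.get("POLL_INTERVAL", "20"))
    batch = int(env.get("MAX_MESSAGES", "10"))
    visibility = int(env.get("VISIBILITY_TIMEOUT", "300"))
    return {
        "QueueUrl": queue_url,
        "WaitTimeSeconds": max(0, min(wait, 20)),        # SQS allows 0-20
        "MaxNumberOfMessages": max(1, min(batch, 10)),   # SQS allows 1-10
        "VisibilityTimeout": visibility,
    }


def poll_once(sqs_client, queue_url, env=None):
    """Perform one long poll and return the (possibly empty) message list."""
    response = sqs_client.receive_message(**receive_kwargs(queue_url, env))
    return response.get("Messages", [])
```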
SMTP Delivery
SMTP_HOST=localhost
SMTP_PORT=25
SMTP_USE_TLS=false
SMTP_USER=
SMTP_PASS=
SMTP_POOL_SIZE=5
INTERNAL_SMTP_PORT=2525 # Port for internal delivery (bypasses transport_maps)
LMTP (Direct Dovecot Delivery)
LMTP_ENABLED=false # Set to 'true' to use LMTP
LMTP_HOST=localhost
LMTP_PORT=24
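One way to picture the SMTP/LMTP switch is a small selector over the standard library's `smtplib` classes. This is a sketch under assumptions: `select_transport` and `open_connection` are hypothetical names, and the real worker adds pooling and retry on top of delivery.

```python
import smtplib


def select_transport(env):
    """Return (client class, host, port) based on LMTP_ENABLED."""
    if env.get("LMTP_ENABLED", "false").strip().lower() == "true":
        return (
            smtplib.LMTP,
            env.get("LMTP_HOST", "localhost"),
            int(env.get("LMTP_PORT", "24")),
        )
    return (
        smtplib.SMTP,
        env.get("SMTP_HOST", "localhost"),
        int(env.get("SMTP_PORT", "25")),
    )


def open_connection(env):
    """Connect using the selected transport (opens a network connection)."""
    client_class, host, port = select_transport(env)
    return client_class(host, port)
```

Both classes expose the same `sendmail` interface, so the delivery code after this point does not need to know which transport was chosen.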
DynamoDB Tables
DYNAMODB_RULES_TABLE=email-rules
DYNAMODB_MESSAGES_TABLE=ses-outbound-messages
DYNAMODB_BLOCKED_TABLE=email-blocked-senders
Bounce Handling
BOUNCE_LOOKUP_RETRIES=3
BOUNCE_LOOKUP_DELAY=1.0
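The README does not spell out how these two values interact. A plausible reading is that the bounce record may not yet be written when the notification arrives, so the lookup is retried with a fixed delay. The sketch below assumes that reading; `lookup_with_retry` and its `lookup` callback are hypothetical.

```python
import time


def lookup_with_retry(lookup, message_id, retries=3, delay=1.0, sleep=time.sleep):
    """Call lookup(message_id) up to `retries` times, pausing between attempts.

    `lookup` should return the record, or None if it is not there yet.
    """
    for attempt in range(1, retries + 1):
        record = lookup(message_id)
        if record is not None:
            return record
        if attempt < retries:
            sleep(delay)  # no pause after the final attempt
    return None
```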
Monitoring
METRICS_PORT=8000 # Prometheus metrics
HEALTH_PORT=8080 # Health check endpoint
📊 DynamoDB Schemas
email-rules
{
"email_address": "user@example.com", // Partition Key
"ooo_active": true,
"ooo_message": "I am currently out of office...",
"ooo_content_type": "text", // "text" or "html"
"forwards": ["other@example.com", "external@gmail.com"]
}
ses-outbound-messages
{
"MessageId": "abc123...", // Partition Key (SES Message-ID)
"original_source": "sender@example.com",
"recipients": ["recipient@other.com"],
"timestamp": "2025-01-01T12:00:00Z",
"bounceType": "Permanent",
"bounceSubType": "General",
"bouncedRecipients": ["recipient@other.com"]
}
email-blocked-senders
{
"email_address": "user@example.com", // Partition Key
"blocked_patterns": [
"spam@*.com", // Wildcard support
"noreply@badsite.com",
"*@malicious.org"
]
}
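Wildcard patterns like these can be evaluated with the standard library's `fnmatch` module. This is only a sketch of the idea: `is_blocked` is a hypothetical helper, and the worker's own matcher may differ (note that `fnmatch` also gives `?` and `[...]` special meaning).

```python
from fnmatch import fnmatchcase


def is_blocked(sender, patterns):
    """Return True if the sender address matches any blocklist pattern."""
    address = sender.strip().lower()
    # Lowercase both sides so matching is case-insensitive on every platform.
    return any(fnmatchcase(address, pattern.strip().lower()) for pattern in patterns)
```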
🚀 Usage
Installation
cd email-worker
pip install -r requirements.txt
Run
python3 main.py
Docker
FROM python:3.11-slim
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python3", "main.py"]
📈 Metrics
Available at http://localhost:8000/metrics:
- emails_processed_total{domain, status}: Total emails processed
- emails_in_flight: Currently processing emails
- email_processing_seconds{domain}: Processing time histogram
- queue_messages_available{domain}: Queue size gauge
- bounces_processed_total{domain, type}: Bounce notifications
- autoreplies_sent_total{domain}: Auto-replies sent
- forwards_sent_total{domain}: Forwards sent
- blocked_senders_total{domain}: Blocked emails
🏥 Health Checks
Available at http://localhost:8080/health:
{
"status": "healthy",
"domains": 2,
"domain_list": ["example.com", "another.com"],
"dynamodb": true,
"features": {
"bounce_rewriting": true,
"auto_reply": true,
"forwarding": true,
"blocklist": true,
"lmtp": false
},
"timestamp": "2025-01-22T10:00:00.000000"
}
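A monitoring script could interpret this payload roughly as follows. The function `evaluate_health` is hypothetical, and which fields count as failures is a policy assumption. In particular, since the worker is described as degrading gracefully without DynamoDB, some deployments may prefer to treat `"dynamodb": false` as a warning rather than an error.

```python
import json


def evaluate_health(body):
    """Return a list of problems found in a /health response body."""
    payload = json.loads(body)
    problems = []
    if payload.get("status") != "healthy":
        problems.append("status is %r" % payload.get("status"))
    if not payload.get("dynamodb", False):
        problems.append("dynamodb unavailable")
    if payload.get("domains", 0) < 1:
        problems.append("no domains configured")
    return problems
```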
🔍 Key Improvements in Modular Version
1. Fixed Critical Bugs
- ✅ Fixed signal.SIGINT typo (was signalIGINT)
- ✅ Proper S3 metadata before deletion (audit trail)
- ✅ Batch DynamoDB calls for blocklist (performance)
- ✅ Error handling for S3 delete failures
2. Better Architecture
- Separation of Concerns: Each component has single responsibility
- Testability: Easy to unit test individual components
- Maintainability: Changes isolated to specific modules
- Extensibility: Easy to add new features
3. Performance
- Batch Blocklist Checks: One DynamoDB call for all recipients
- Connection Pooling: Reusable SMTP connections
- Efficient Metrics: Optional Prometheus integration
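The batch blocklist check can be pictured with DynamoDB's `batch_get_item`, which accepts up to 100 keys per request. This is a sketch, not the worker's code: `fetch_blocklists` is a hypothetical helper, `dynamodb` is assumed to be a boto3 DynamoDB resource, and a production version would add backoff when keys come back unprocessed.

```python
def fetch_blocklists(dynamodb, recipients, table_name="email-blocked-senders"):
    """Return {email_address: blocked_patterns} for all recipients."""
    # Lowercase first, then de-duplicate: duplicate keys are rejected by DynamoDB.
    unique = list(dict.fromkeys(r.strip().lower() for r in recipients))
    keys = [{"email_address": address} for address in unique]

    patterns = {}
    for start in range(0, len(keys), 100):  # 100 keys is the per-call limit
        request = {table_name: {"Keys": keys[start:start + 100]}}
        while request:
            response = dynamodb.batch_get_item(RequestItems=request)
            for item in response.get("Responses", {}).get(table_name, []):
                patterns[item["email_address"]] = item.get("blocked_patterns", [])
            # Retry whatever DynamoDB could not process in this round.
            request = response.get("UnprocessedKeys") or None
    return patterns
```

Recipients without a blocklist item are simply absent from the result, so callers can treat a missing key as "nothing blocked".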
4. Reliability
- Proper Error Handling: Each component handles its own errors
- Graceful Degradation: Works even if DynamoDB is unavailable
- Audit Trail: All actions logged to S3 metadata
🔐 Security Features
- Domain Validation: Workers only process their assigned domains
- Loop Prevention: Detects and skips already-processed emails
- Blocklist Support: Wildcard-based sender blocking
- Internal vs External: Separate handling prevents loops
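Loop prevention of this kind is commonly done with a marker header. The sketch below shows that general technique using the standard `email` package. The header name `X-Email-Worker-Processed` and both helper functions are hypothetical; the README does not say how the worker actually marks processed mail.

```python
from email import message_from_string

# Hypothetical header name, chosen for illustration only.
PROCESSED_HEADER = "X-Email-Worker-Processed"


def already_processed(raw_message):
    """Return True if the message already carries the marker header."""
    return message_from_string(raw_message)[PROCESSED_HEADER] is not None


def mark_processed(raw_message, worker_id="worker-1"):
    """Return the message text with the marker header added (at most once)."""
    message = message_from_string(raw_message)
    if message[PROCESSED_HEADER] is None:
        message[PROCESSED_HEADER] = worker_id
    return message.as_string()
```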
📝 Example Usage
Enable OOO for user
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('email-rules')
table.put_item(Item={
'email_address': 'john@example.com',
'ooo_active': True,
'ooo_message': 'I am out of office until Feb 1st.',
'ooo_content_type': 'html'
})
Block spam senders
# Reuses the boto3 import and dynamodb resource from the OOO example
table = dynamodb.Table('email-blocked-senders')
table.put_item(Item={
'email_address': 'john@example.com',
'blocked_patterns': [
'*@spam.com',
'noreply@*.marketing.com',
'newsletter@*'
]
})
Forward emails
# Note: put_item replaces the whole item, including any existing OOO fields
table = dynamodb.Table('email-rules')
table.put_item(Item={
'email_address': 'support@example.com',
'forwards': [
'john@example.com',
'jane@example.com',
'external@gmail.com'
]
})
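Because `put_item` overwrites the entire item, adding forwards this way would drop any OOO settings already stored for the same address. A minimal sketch of a non-destructive alternative uses `update_item`; the helper `set_forwards` is hypothetical, and `table` is assumed to be a boto3 `Table` resource for email-rules.

```python
def set_forwards(table, address, forwards):
    """Set only the 'forwards' attribute, leaving other attributes untouched."""
    return table.update_item(
        Key={"email_address": address},
        # An attribute-name placeholder avoids any clash with reserved words.
        UpdateExpression="SET #fw = :targets",
        ExpressionAttributeNames={"#fw": "forwards"},
        ExpressionAttributeValues={":targets": list(forwards)},
    )
```

For example, `set_forwards(table, 'support@example.com', ['john@example.com'])` would change the forwarding list while keeping `ooo_active` and `ooo_message` as they were.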
🐛 Troubleshooting
Worker not processing emails
- Check queue URLs: curl http://localhost:8080/domains
- Check logs for SQS errors
- Verify IAM permissions for SQS/S3/SES/DynamoDB
Bounces not rewritten
- Check DynamoDB table name: DYNAMODB_MESSAGES_TABLE
- Verify Lambda function is writing bounce records
- Check logs for DynamoDB lookup errors
Auto-replies not sent
- Verify the DynamoDB rules table is accessible
- Check ooo_active is true (boolean, not string)
- Review logs for SES send errors
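The boolean-versus-string point matters because any non-empty string, including "false", is truthy in Python. The following sketch shows a strict check; `is_ooo_active` is a hypothetical helper, and whether the worker compares this strictly is an assumption.

```python
def is_ooo_active(rule):
    """Return True only when ooo_active is the boolean True."""
    # Strings such as "true" or "false" are rejected rather than coerced.
    return rule.get("ooo_active") is True
```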
Blocked emails still delivered
- Verify blocklist table exists and is accessible
- Check wildcard patterns are lowercase
- Review logs for blocklist check errors
📄 License
MIT License - See LICENSE file for details