commit dbfb260b173bde2530a48a303bd71605af7ac2e0
Author: Andreas Knuth
Date:   Sun Oct 12 17:43:15 2025 -0500

    init

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2eea525
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/Deployment Commands b/Deployment Commands
new file mode 100644
index 0000000..52e6858
--- /dev/null
+++ b/Deployment Commands
@@ -0,0 +1,20 @@
+# Build the workers
+docker-compose build
+
+# Start the workers
+docker-compose up -d
+
+# View logs
+docker-compose logs -f
+
+# Logs for a single domain only
+docker-compose logs -f worker-andreasknuth
+
+# Check status
+docker-compose ps
+
+# Restart the workers
+docker-compose restart
+
+# Stop the workers
+docker-compose down
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..62fda1c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,26 @@
+FROM python:3.11-slim
+
+# Metadata
+LABEL maintainer="your-email@example.com"
+LABEL description="Domain-specific email worker for SMTP delivery"
+
+# Non-root user for security
+RUN useradd -m -u 1000 worker && \
+    mkdir -p /app && \
+    chown -R worker:worker /app
+
+# Install boto3
+RUN pip install --no-cache-dir boto3
+
+# Worker code
+COPY --chown=worker:worker worker.py /app/worker.py
+
+WORKDIR /app
+USER worker
+
+# Healthcheck
+HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
+    CMD pgrep -f worker.py || exit 1
+
+# Start the worker with unbuffered output
+CMD ["python", "-u", "worker.py"]
\ No newline at end of file
diff --git a/Lambda IAM Policy b/Lambda IAM Policy
new file mode 100644
index 0000000..5ec15a3
--- /dev/null
+++ b/Lambda IAM Policy
@@ -0,0 +1,34 @@
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Effect": "Allow",
+            "Action": [
+                "s3:GetObject",
+                "s3:PutObject",
+                "s3:ListBucket"
+            ],
+            "Resource": [
+                "arn:aws:s3:::*-emails",
+                "arn:aws:s3:::*-emails/*"
+            ]
+        },
+        {
+            "Effect": "Allow",
+            "Action": [
+                "sqs:SendMessage",
+                "sqs:GetQueueUrl"
+            ],
+            "Resource": "arn:aws:sqs:us-east-2:123456789:*-queue"
+        },
+        {
+            "Effect": "Allow",
+            "Action": [
+                "logs:CreateLogGroup",
+                "logs:CreateLogStream",
+                "logs:PutLogEvents"
+            ],
+            "Resource": "arn:aws:logs:*:*:*"
+        }
+    ]
+}
diff --git a/TESTS b/TESTS
new file mode 100644
index 0000000..ff9d340
--- /dev/null
+++ b/TESTS
@@ -0,0 +1,13 @@
+# Via the AWS SES CLI
+aws ses send-email \
+  --from "sender@example.com" \
+  --destination "ToAddresses=test@andreasknuth.de" \
+  --message "Subject={Data=Test,Charset=utf-8},Body={Text={Data=Test message,Charset=utf-8}}" \
+  --region us-east-2
+
+# Mail to multiple domains
+aws ses send-email \
+  --from "sender@example.com" \
+  --destination "ToAddresses=test@andreasknuth.de,test@bizmatch.net" \
+  --message "Subject={Data=Multi-Domain Test,Charset=utf-8},Body={Text={Data=Testing multiple domains,Charset=utf-8}}" \
+  --region us-east-2
\ No newline at end of file
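The TESTS file drives everything through the AWS CLI; the same smoke test can be scripted with boto3. A minimal sketch, using the same placeholder addresses as TESTS (the file name is hypothetical and not part of this commit):

#!/usr/bin/env python3
# send-test-mail.py (sketch, not part of this commit)
import boto3

ses = boto3.client('ses', region_name='us-east-2')

response = ses.send_email(
    Source='sender@example.com',
    Destination={'ToAddresses': ['test@andreasknuth.de']},
    Message={
        'Subject': {'Data': 'Test', 'Charset': 'utf-8'},
        'Body': {'Text': {'Data': 'Test message', 'Charset': 'utf-8'}},
    },
)
print('SES MessageId:', response['MessageId'])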
diff --git a/check-dlq.py b/check-dlq.py
new file mode 100644
index 0000000..f54c817
--- /dev/null
+++ b/check-dlq.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+# check-dlq.py
+"""
+Checks the dead letter queues and lists failed emails
+"""
+
+import boto3
+import json
+from datetime import datetime
+
+sqs = boto3.client('sqs', region_name='us-east-2')
+s3 = boto3.client('s3', region_name='us-east-2')
+
+DOMAINS = ['andreasknuth.de', 'bizmatch.net']
+
+def check_dlq_for_domain(domain):
+    """Checks the DLQ for a single domain"""
+    dlq_name = domain.replace('.', '-') + '-queue-dlq'
+
+    try:
+        dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']
+    except Exception:
+        print(f"⚠️  DLQ not found for {domain}")
+        return
+
+    # DLQ stats
+    attrs = sqs.get_queue_attributes(
+        QueueUrl=dlq_url,
+        AttributeNames=['ApproximateNumberOfMessages']
+    )['Attributes']
+
+    count = int(attrs.get('ApproximateNumberOfMessages', 0))
+
+    if count == 0:
+        print(f"✅ {domain}: No failed messages")
+        return
+
+    print(f"\n{'='*70}")
+    print(f"⚠️  {domain}: {count} failed message(s)")
+    print(f"{'='*70}\n")
+
+    # Fetch messages
+    response = sqs.receive_message(
+        QueueUrl=dlq_url,
+        MaxNumberOfMessages=10,
+        WaitTimeSeconds=0,
+        AttributeNames=['All']
+    )
+
+    messages = response.get('Messages', [])
+
+    for i, msg in enumerate(messages, 1):
+        try:
+            body = json.loads(msg['Body'])
+
+            print(f"{i}. Failed Message:")
+            print(f"   MessageId: {body.get('message_id', 'unknown')}")
+            print(f"   From:      {body.get('from', 'unknown')}")
+            print(f"   To:        {body.get('recipient', 'unknown')}")
+            print(f"   Subject:   {body.get('subject', 'unknown')}")
+
+            # S3 metadata holds the error details
+            bucket = body.get('bucket')
+            key = body.get('key')
+
+            if bucket and key:
+                try:
+                    head = s3.head_object(Bucket=bucket, Key=key)
+                    metadata = head.get('Metadata', {})
+
+                    error = metadata.get('error', 'Unknown error')
+                    failed_at = metadata.get('failed_at', 'unknown')
+
+                    if failed_at != 'unknown':
+                        failed_dt = datetime.fromtimestamp(int(failed_at))
+                        print(f"   Failed at: {failed_dt}")
+
+                    print(f"   Error:     {error}")
+
+                except Exception:
+                    print(f"   (Could not retrieve error details)")
+
+            print()
+
+        except Exception as e:
+            print(f"   Error parsing message: {e}\n")
+
+
+def main():
+    print(f"\n{'='*70}")
+    print(f"Dead Letter Queue Check - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    print(f"{'='*70}")
+
+    for domain in DOMAINS:
+        check_dlq_for_domain(domain)
+
+    print(f"\n{'='*70}")
+    print("Options:")
+    print("  - Fix SMTP server issues")
+    print("  - Re-queue: python requeue-dlq.py <domain>")
+    print("  - Delete:   python purge-dlq.py <domain>")
+    print(f"{'='*70}\n")
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
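check-dlq.py's options text refers to purge-dlq.py, which is not part of this commit. A minimal sketch of what such a script might look like, mirroring the confirmation prompt in requeue-dlq.py:

#!/usr/bin/env python3
# purge-dlq.py (sketch: referenced by check-dlq.py but not included in this commit)
import sys
import boto3

sqs = boto3.client('sqs', region_name='us-east-2')

def purge_dlq(domain):
    """Deletes ALL messages from the domain's DLQ (irreversible)."""
    dlq_name = domain.replace('.', '-') + '-queue-dlq'
    dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']

    confirm = input(f"Really purge {dlq_name}? (yes/no): ")
    if confirm.lower() != 'yes':
        print("Cancelled.")
        return

    # PurgeQueue may take up to 60 seconds and can only run once per minute
    sqs.purge_queue(QueueUrl=dlq_url)
    print(f"✅ Purge requested for {dlq_name}")

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python purge-dlq.py <domain>")
        sys.exit(1)
    purge_dlq(sys.argv[1])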
+echo "Creating Main Queue: $QUEUE_NAME" +QUEUE_URL=$(aws sqs create-queue \ + --queue-name "${QUEUE_NAME}" \ + --region "${AWS_REGION}" \ + --attributes "{ + \"VisibilityTimeout\": \"300\", + \"MessageRetentionPeriod\": \"86400\", + \"ReceiveMessageWaitTimeSeconds\": \"20\", + \"RedrivePolicy\": \"{\\\"deadLetterTargetArn\\\":\\\"${DLQ_ARN}\\\",\\\"maxReceiveCount\\\":\\\"3\\\"}\" + }" \ + --query 'QueueUrl' \ + --output text 2>/dev/null || aws sqs get-queue-url --queue-name "${QUEUE_NAME}" --region "${AWS_REGION}" --query 'QueueUrl' --output text) + +echo " ✓ Queue URL: ${QUEUE_URL}" +echo "" +echo "========================================" +echo "✅ Queue created successfully!" +echo "========================================" +echo "" +echo "Configuration:" +echo " Domain: $DOMAIN" +echo " Queue: $QUEUE_NAME" +echo " Queue URL: $QUEUE_URL" +echo " DLQ: $DLQ_NAME" +echo " Region: $AWS_REGION" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..427540b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,89 @@ +version: '3.8' + +services: + # Worker für andreasknuth.de + worker-andreasknuth: + build: . + container_name: worker-andreasknuth-de + restart: unless-stopped + network_mode: host # Zugriff auf lokales Netzwerk für Postfix + environment: + # AWS Credentials + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-central-1 + + # Worker Identity + - WORKER_NAME=worker-andreasknuth + - WORKER_DOMAIN=andreasknuth.de + + # SQS Queue (domain-spezifisch!) + - SQS_QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/123456789/andreasknuth-de-queue + + # Worker Settings + - POLL_INTERVAL=20 + - MAX_MESSAGES=10 + - VISIBILITY_TIMEOUT=300 + + # SMTP Configuration + - SMTP_HOST=192.168.1.10 + - SMTP_PORT=25 + - SMTP_USE_TLS=false + # Optional: SMTP Auth + # - SMTP_USER=username + # - SMTP_PASS=password + + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "5" + + healthcheck: + test: ["CMD", "pgrep", "-f", "worker.py"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + + # Worker für bizmatch.net (auf demselben Server!) + worker-bizmatch: + build: . + container_name: worker-bizmatch-net + restart: unless-stopped + network_mode: host + environment: + # AWS Credentials (gleich wie oben) + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-central-1 + + # Worker Identity (unterschiedlich!) + - WORKER_NAME=worker-bizmatch + - WORKER_DOMAIN=bizmatch.net + + # SQS Queue (unterschiedlich!) + - SQS_QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/123456789/bizmatch-net-queue + + # Worker Settings + - POLL_INTERVAL=20 + - MAX_MESSAGES=10 + - VISIBILITY_TIMEOUT=300 + + # SMTP Configuration (gleicher Server!) 
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..427540b
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,89 @@
+version: '3.8'
+
+services:
+  # Worker for andreasknuth.de
+  worker-andreasknuth:
+    build: .
+    container_name: worker-andreasknuth-de
+    restart: unless-stopped
+    network_mode: host  # access to the local network for Postfix
+    environment:
+      # AWS credentials
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_REGION=us-east-2
+
+      # Worker identity
+      - WORKER_NAME=worker-andreasknuth
+      - WORKER_DOMAIN=andreasknuth.de
+
+      # SQS queue (domain-specific!)
+      - SQS_QUEUE_URL=https://sqs.us-east-2.amazonaws.com/123456789/andreasknuth-de-queue
+
+      # Worker settings
+      - POLL_INTERVAL=20
+      - MAX_MESSAGES=10
+      - VISIBILITY_TIMEOUT=300
+
+      # SMTP configuration
+      - SMTP_HOST=192.168.1.10
+      - SMTP_PORT=25
+      - SMTP_USE_TLS=false
+      # Optional: SMTP auth
+      # - SMTP_USER=username
+      # - SMTP_PASS=password
+
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "5"
+
+    healthcheck:
+      test: ["CMD", "pgrep", "-f", "worker.py"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 10s
+
+  # Worker for bizmatch.net (on the same server!)
+  worker-bizmatch:
+    build: .
+    container_name: worker-bizmatch-net
+    restart: unless-stopped
+    network_mode: host
+    environment:
+      # AWS credentials (same as above)
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_REGION=us-east-2
+
+      # Worker identity (different!)
+      - WORKER_NAME=worker-bizmatch
+      - WORKER_DOMAIN=bizmatch.net
+
+      # SQS queue (different!)
+      - SQS_QUEUE_URL=https://sqs.us-east-2.amazonaws.com/123456789/bizmatch-net-queue
+
+      # Worker settings
+      - POLL_INTERVAL=20
+      - MAX_MESSAGES=10
+      - VISIBILITY_TIMEOUT=300
+
+      # SMTP configuration (same server!)
+      - SMTP_HOST=192.168.1.10
+      - SMTP_PORT=25
+      - SMTP_USE_TLS=false
+
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "5"
+
+    healthcheck:
+      test: ["CMD", "pgrep", "-f", "worker.py"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 10s
\ No newline at end of file
diff --git a/get-queue-urls.sh b/get-queue-urls.sh
new file mode 100644
index 0000000..76d33b5
--- /dev/null
+++ b/get-queue-urls.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+# get-queue-urls.sh
+
+aws sqs list-queues --region us-east-2 --query 'QueueUrls[?contains(@, `-queue`) && !contains(@, `-dlq`)]' --output table
\ No newline at end of file
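docker-compose.yml reads the AWS credentials from the environment, typically via the gitignored .env file next to it; a sketch with placeholder values:

# .env (sketch: gitignored, values are placeholders)
AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx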
print(f"✓ Processing lock set") + return True + + except Exception as e: + print(f"⚠ Error setting processing lock: {e}") + return True # Bei Fehler trotzdem verarbeiten (besser als Mail verlieren) + + +def mark_as_queued(bucket: str, key: str, queue_name: str): + """Markiert E-Mail als in Queue eingereiht""" + try: + head = s3.head_object(Bucket=bucket, Key=key) + metadata = head.get('Metadata', {}) or {} + + metadata['queued_at'] = str(int(time.time())) + metadata['queued_to'] = queue_name + metadata['status'] = 'queued' + metadata.pop('processing_started', None) # Lock entfernen + + s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={'Bucket': bucket, 'Key': key}, + Metadata=metadata, + MetadataDirective='REPLACE' + ) + + print(f"✓ Marked as queued to {queue_name}") + + except Exception as e: + print(f"⚠ Failed to mark as queued: {e}") + + +def send_to_queue(queue_url: str, bucket: str, key: str, + from_addr: str, recipient: str, domain: str, + subject: str, message_id: str): + """ + Sendet E-Mail-Job in domain-spezifische SQS Queue + """ + + # Queue Name aus URL extrahieren (für Logging) + queue_name = queue_url.split('/')[-1] + + message = { + 'bucket': bucket, + 'key': key, + 'from': from_addr, + 'recipient': recipient, # Nur 1 Empfänger + 'domain': domain, + 'subject': subject, + 'message_id': message_id, + 'timestamp': int(time.time()) + } + + try: + response = sqs.send_message( + QueueUrl=queue_url, + MessageBody=json.dumps(message, ensure_ascii=False), + MessageAttributes={ + 'domain': { + 'StringValue': domain, + 'DataType': 'String' + }, + 'bucket': { + 'StringValue': bucket, + 'DataType': 'String' + }, + 'recipient': { + 'StringValue': recipient, + 'DataType': 'String' + }, + 'message_id': { + 'StringValue': message_id, + 'DataType': 'String' + } + } + ) + + sqs_message_id = response['MessageId'] + print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}") + + # Als queued markieren + mark_as_queued(bucket, key, queue_name) + + return sqs_message_id + + except Exception as e: + print(f"✗ Failed to queue message: {e}") + raise + + +def lambda_handler(event, context): + """ + Lambda Handler für SES Events + WICHTIG: SES ruft Lambda einmal PRO Empfänger auf! 
+
+
+def lambda_handler(event, context):
+    """
+    Lambda handler for SES events.
+    IMPORTANT: SES invokes the Lambda once PER recipient!
+    Each event carries exactly 1 recipient in receipt.recipients
+    """
+
+    print(f"{'='*70}")
+    print(f"Lambda invoked: {context.request_id}")
+    print(f"Region: {AWS_REGION}")
+    print(f"{'='*70}")
+
+    # Parse the SES event
+    try:
+        record = event['Records'][0]
+        ses = record['ses']
+    except (KeyError, IndexError) as e:
+        print(f"✗ Invalid event structure: {e}")
+        return {
+            'statusCode': 400,
+            'body': json.dumps({'error': 'Invalid SES event'})
+        }
+
+    mail = ses['mail']
+    receipt = ses['receipt']
+
+    message_id = mail['messageId']
+    source = mail['source']
+    timestamp = mail.get('timestamp', '')
+
+    # ✨ IMPORTANT: receipt.recipients contains ONLY the recipient for THIS event
+    # (do NOT use mail.destination - that holds all original recipients)
+    recipients = receipt.get('recipients', [])
+
+    if not recipients or len(recipients) != 1:
+        print(f"✗ Unexpected recipients count: {len(recipients)}")
+        return {
+            'statusCode': 400,
+            'body': json.dumps({
+                'error': 'Expected exactly 1 recipient',
+                'found': len(recipients)
+            })
+        }
+
+    # SES guarantees: exactly 1 recipient per event
+    recipient = recipients[0]
+    domain = recipient.split('@')[1]
+    bucket = domain_to_bucket(domain)
+
+    print(f"\n📧 Email Event:")
+    print(f"   MessageId: {message_id}")
+    print(f"   From:      {source}")
+    print(f"   To:        {recipient}")
+    print(f"   Domain:    {domain}")
+    print(f"   Bucket:    {bucket}")
+    print(f"   Timestamp: {timestamp}")
+
+    # Find the queue for the domain
+    try:
+        queue_url = get_queue_url_for_domain(domain)
+        queue_name = queue_url.split('/')[-1]
+        print(f"   Queue:     {queue_name}")
+    except Exception as e:
+        print(f"\n✗ ERROR: {e}")
+        return {
+            'statusCode': 500,
+            'body': json.dumps({
+                'error': 'queue_not_configured',
+                'domain': domain,
+                'recipient': recipient,
+                'message': str(e)
+            })
+        }
+
+    # Locate the S3 object
+    try:
+        print(f"\n📦 Searching S3...")
+        response = s3.list_objects_v2(
+            Bucket=bucket,
+            Prefix=message_id,
+            MaxKeys=1
+        )
+
+        if 'Contents' not in response or not response['Contents']:
+            raise Exception(f"No S3 object found for message {message_id}")
+
+        key = response['Contents'][0]['Key']
+        size = response['Contents'][0]['Size']
+        print(f"   Found: s3://{bucket}/{key}")
+        print(f"   Size:  {size:,} bytes ({size/1024:.1f} KB)")
+
+    except Exception as e:
+        print(f"\n✗ S3 ERROR: {e}")
+        return {
+            'statusCode': 404,
+            'body': json.dumps({
+                'error': 's3_object_not_found',
+                'message_id': message_id,
+                'bucket': bucket,
+                'details': str(e)
+            })
+        }
+
+    # Duplicate check
+    print(f"\n🔍 Checking for duplicates...")
+    if is_already_processed(bucket, key):
+        print(f"   Already processed, skipping")
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'already_processed',
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
+
+    # Acquire the processing lock
+    print(f"\n🔒 Setting processing lock...")
+    if not set_processing_lock(bucket, key):
+        print(f"   Already being processed by another instance")
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'already_processing',
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
+
+    # Load the email to extract the subject (optional, for better logging)
+    subject = '(unknown)'
+    try:
+        print(f"\n📖 Reading email for metadata...")
+        obj = s3.get_object(Bucket=bucket, Key=key)
+        raw_bytes = obj['Body'].read()
+
+        # Parse headers only (faster)
+        parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
+        subject = parsed.get('subject', '(no subject)')
+
+        print(f"   Subject: {subject}")
+
+    except Exception as e:
+        print(f"   ⚠ Could not parse email (continuing): {e}")
+
+    # Enqueue into the domain-specific queue
+    try:
+        print(f"\n📤 Queuing to {queue_name}...")
+
+        sqs_message_id = send_to_queue(
+            queue_url=queue_url,
+            bucket=bucket,
+            key=key,
+            from_addr=source,
+            recipient=recipient,  # exactly 1 recipient
+            domain=domain,
+            subject=subject,
+            message_id=message_id
+        )
+
+        print(f"\n{'='*70}")
+        print(f"✅ SUCCESS - Email queued for delivery")
+        print(f"{'='*70}\n")
+
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'status': 'queued',
+                'message_id': message_id,
+                'sqs_message_id': sqs_message_id,
+                'queue': queue_name,
+                'domain': domain,
+                'recipient': recipient,
+                'subject': subject
+            })
+        }
+
+    except Exception as e:
+        print(f"\n{'='*70}")
+        print(f"✗ FAILED TO QUEUE")
+        print(f"{'='*70}")
+        print(f"Error: {e}")
+
+        return {
+            'statusCode': 500,
+            'body': json.dumps({
+                'error': 'failed_to_queue',
+                'message': str(e),
+                'message_id': message_id,
+                'recipient': recipient
+            })
+        }
\ No newline at end of file
diff --git a/monitor-queues.py b/monitor-queues.py
new file mode 100644
index 0000000..5224f25
--- /dev/null
+++ b/monitor-queues.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+# monitor-queues.py
+"""
+Monitors all email queues and prints statistics
+"""
+
+import boto3
+from datetime import datetime
+
+sqs = boto3.client('sqs', region_name='us-east-2')
+
+DOMAINS = ['andreasknuth.de', 'bizmatch.net']
+
+def get_queue_stats(domain):
+    """Returns queue statistics for a domain"""
+    queue_name = domain.replace('.', '-') + '-queue'
+    dlq_name = queue_name + '-dlq'
+
+    try:
+        # Main queue URL
+        queue_url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
+
+        # Queue attributes
+        attrs = sqs.get_queue_attributes(
+            QueueUrl=queue_url,
+            AttributeNames=['All']
+        )['Attributes']
+
+        # DLQ URL
+        dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']
+
+        # DLQ attributes
+        dlq_attrs = sqs.get_queue_attributes(
+            QueueUrl=dlq_url,
+            AttributeNames=['ApproximateNumberOfMessages']
+        )['Attributes']
+
+        return {
+            'domain': domain,
+            'queue': {
+                'available': int(attrs.get('ApproximateNumberOfMessages', 0)),
+                'in_flight': int(attrs.get('ApproximateNumberOfMessagesNotVisible', 0)),
+                # Note: the age of the oldest message (ApproximateAgeOfOldestMessage)
+                # is a CloudWatch metric, not a queue attribute, so it is not
+                # available via get_queue_attributes
+                'delayed': int(attrs.get('ApproximateNumberOfMessagesDelayed', 0))
+            },
+            'dlq': {
+                'count': int(dlq_attrs.get('ApproximateNumberOfMessages', 0))
+            }
+        }
+    except Exception as e:
+        return {
+            'domain': domain,
+            'error': str(e)
+        }
+
+
+def main():
+    print(f"\n{'='*70}")
+    print(f"Email Queue Monitoring - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    print(f"{'='*70}\n")
+
+    total_available = 0
+    total_in_flight = 0
+    total_dlq = 0
+
+    for domain in DOMAINS:
+        stats = get_queue_stats(domain)
+
+        if 'error' in stats:
+            print(f"❌ {domain}: {stats['error']}")
+            continue
+
+        queue = stats['queue']
+        dlq = stats['dlq']
+
+        total_available += queue['available']
+        total_in_flight += queue['in_flight']
+        total_dlq += dlq['count']
+
+        status = "✅" if dlq['count'] == 0 else "⚠️"
+
+        print(f"{status} {domain}")
+        print(f"   Available: {queue['available']:>5} messages")
+        print(f"   In Flight: {queue['in_flight']:>5} messages")
+        print(f"   Delayed:   {queue['delayed']:>5} messages")
+        print(f"   DLQ:       {dlq['count']:>5} messages")
+
+        if dlq['count'] > 0:
+            print(f"   ⚠️  WARNING: {dlq['count']} failed message(s) in DLQ!")
+
+        print()
+
+    print(f"{'='*70}")
+    print(f"TOTALS:")
+    print(f"   Available: {total_available} messages")
+    print(f"   In Flight: {total_in_flight} messages")
+    print(f"   Failed:    {total_dlq} messages")
+    print(f"{'='*70}\n")
+
+    if total_dlq > 0:
+        print(f"⚠️  Action required: {total_dlq} message(s) in Dead Letter Queues!")
+        print(f"   Run: python check-dlq.py to investigate\n")
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
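The DLQ counters that monitor-queues.py polls can also drive a CloudWatch alarm for unattended operation. A sketch: the SNS topic ARN is a placeholder and the topic must exist before the alarm can notify anyone:

#!/usr/bin/env python3
# create-dlq-alarms.py (sketch: the SNS topic ARN is a placeholder)
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-2')

SNS_TOPIC_ARN = 'arn:aws:sns:us-east-2:123456789:email-alerts'  # placeholder

for domain in ['andreasknuth.de', 'bizmatch.net']:
    dlq_name = domain.replace('.', '-') + '-queue-dlq'
    cloudwatch.put_metric_alarm(
        AlarmName=f'{dlq_name}-not-empty',
        Namespace='AWS/SQS',
        MetricName='ApproximateNumberOfMessagesVisible',
        Dimensions=[{'Name': 'QueueName', 'Value': dlq_name}],
        Statistic='Maximum',
        Period=300,
        EvaluationPeriods=1,
        Threshold=0,
        ComparisonOperator='GreaterThanThreshold',
        AlarmActions=[SNS_TOPIC_ARN],
    )
    print(f'✓ Alarm created for {dlq_name}')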
diff --git a/requeue-dlq.py b/requeue-dlq.py
new file mode 100644
index 0000000..487504c
--- /dev/null
+++ b/requeue-dlq.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+# requeue-dlq.py
+"""
+Moves messages from the DLQ back into the main queue
+"""
+
+import sys
+import boto3
+
+sqs = boto3.client('sqs', region_name='us-east-2')
+
+def requeue_dlq(domain, max_messages=10):
+    """Moves messages from the DLQ back into the main queue"""
+
+    queue_name = domain.replace('.', '-') + '-queue'
+    dlq_name = queue_name + '-dlq'
+
+    try:
+        queue_url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
+        dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']
+    except Exception as e:
+        print(f"❌ Error: {e}")
+        return
+
+    print(f"Re-queuing up to {max_messages} messages from DLQ to main queue...")
+    print(f"Domain: {domain}")
+    print(f"From:   {dlq_name}")
+    print(f"To:     {queue_name}\n")
+
+    confirm = input("Continue? (yes/no): ")
+    if confirm.lower() != 'yes':
+        print("Cancelled.")
+        return
+
+    # Fetch messages from the DLQ
+    response = sqs.receive_message(
+        QueueUrl=dlq_url,
+        MaxNumberOfMessages=max_messages,
+        WaitTimeSeconds=0
+    )
+
+    messages = response.get('Messages', [])
+
+    if not messages:
+        print("No messages in DLQ.")
+        return
+
+    print(f"\nRe-queuing {len(messages)} message(s)...\n")
+
+    for msg in messages:
+        # Write to the main queue
+        sqs.send_message(
+            QueueUrl=queue_url,
+            MessageBody=msg['Body']
+        )
+
+        # Delete from the DLQ
+        sqs.delete_message(
+            QueueUrl=dlq_url,
+            ReceiptHandle=msg['ReceiptHandle']
+        )
+
+        print(f"  ✓ Re-queued message {msg['MessageId']}")
+
+    print(f"\n✅ Done! {len(messages)} message(s) re-queued to {queue_name}")
+
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print("Usage: python requeue-dlq.py <domain>")
+        print("Example: python requeue-dlq.py andreasknuth.de")
+        sys.exit(1)
+
+    domain = sys.argv[1]
+    requeue_dlq(domain)
\ No newline at end of file
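As an alternative to the receive/send/delete loop in requeue-dlq.py, SQS offers a server-side DLQ redrive via StartMessageMoveTask, which moves all messages without handling them client-side; a sketch (file name hypothetical):

#!/usr/bin/env python3
# redrive-dlq.py (sketch: uses the SQS-native DLQ redrive API)
import sys
import boto3

sqs = boto3.client('sqs', region_name='us-east-2')

def redrive(domain):
    dlq_name = domain.replace('.', '-') + '-queue-dlq'
    dlq_url = sqs.get_queue_url(QueueName=dlq_name)['QueueUrl']
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url, AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']

    # Without DestinationArn, messages return to their original source queue
    task = sqs.start_message_move_task(SourceArn=dlq_arn)
    print(f"✓ Redrive task started: {task['TaskHandle']}")

if __name__ == '__main__':
    redrive(sys.argv[1])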
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..38c1806
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,394 @@
+# worker.py
+import os
+import sys
+import boto3
+import smtplib
+import json
+import time
+import traceback
+import signal
+from datetime import datetime
+
+# AWS clients
+AWS_REGION = os.environ.get('AWS_REGION', 'us-east-2')
+s3 = boto3.client('s3', region_name=AWS_REGION)
+sqs = boto3.client('sqs', region_name=AWS_REGION)
+
+# ✨ Worker configuration (domain-specific)
+WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN')  # e.g. 'andreasknuth.de'
+WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
+QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
+
+# Worker settings
+POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
+MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
+VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
+
+# SMTP configuration (simple, since each worker serves exactly 1 domain)
+SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
+SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
+SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
+SMTP_USER = os.environ.get('SMTP_USER')
+SMTP_PASS = os.environ.get('SMTP_PASS')
+
+# Graceful shutdown
+shutdown_requested = False
+
+
+def signal_handler(signum, frame):
+    global shutdown_requested
+    print(f"\n⚠ Shutdown signal received (signal {signum})")
+    shutdown_requested = True
+
+
+signal.signal(signal.SIGTERM, signal_handler)
+signal.signal(signal.SIGINT, signal_handler)
+
+
+def log(message: str, level: str = 'INFO'):
+    """Structured logging with timestamp"""
+    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
+
+
+def mark_as_processed(bucket: str, key: str):
+    """Marks the email as successfully delivered"""
+    try:
+        head = s3.head_object(Bucket=bucket, Key=key)
+        metadata = head.get('Metadata', {}) or {}
+
+        metadata['processed'] = 'true'
+        metadata['processed_at'] = str(int(time.time()))
+        metadata['processed_by'] = WORKER_NAME
+        metadata['status'] = 'delivered'
+        metadata.pop('processing_started', None)
+        metadata.pop('queued_at', None)
+
+        s3.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata=metadata,
+            MetadataDirective='REPLACE'
+        )
+
+        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
+
+    except Exception as e:
+        log(f"Failed to mark as processed: {e}", 'WARNING')
+
+
+def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
+    """Marks the email as failed"""
+    try:
+        head = s3.head_object(Bucket=bucket, Key=key)
+        metadata = head.get('Metadata', {}) or {}
+
+        metadata['status'] = 'failed'
+        metadata['failed_at'] = str(int(time.time()))
+        metadata['failed_by'] = WORKER_NAME
+        metadata['error'] = error[:500]  # S3 metadata size limit
+        metadata['retry_count'] = str(receive_count)
+        metadata.pop('processing_started', None)
+
+        s3.copy_object(
+            Bucket=bucket,
+            Key=key,
+            CopySource={'Bucket': bucket, 'Key': key},
+            Metadata=metadata,
+            MetadataDirective='REPLACE'
+        )
+
+        log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
+
+    except Exception as e:
+        log(f"Failed to mark as failed: {e}", 'WARNING')
+
+
+def is_temporary_smtp_error(error_msg: str) -> bool:
+    """
+    Checks whether an SMTP error is temporary (retry makes sense).
+    4xx codes = temporary, 5xx = permanent
+    """
+    # Deliberately no generic '4' match here: a bare '4' would also match
+    # permanent 5xx replies such as '554'
+    temporary_indicators = [
+        '421',  # Service not available
+        '450',  # Mailbox unavailable
+        '451',  # Local error
+        '452',  # Insufficient storage
+        'timeout',
+        'connection refused',
+        'connection reset',
+        'network unreachable',
+        'temporarily',
+        'try again'
+    ]
+
+    error_lower = error_msg.lower()
+    return any(indicator in error_lower for indicator in temporary_indicators)
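+
+# A quick illustration of the classification (hypothetical error strings):
+#   is_temporary_smtp_error("451 4.3.0 Temporary local problem")  -> True  (retry)
+#   is_temporary_smtp_error("connection refused")                 -> True  (retry)
+#   is_temporary_smtp_error("554 5.7.1 Relay access denied")      -> False (fail -> DLQ)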
+
+
+def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
+    """
+    Sends the email via SMTP.
+    Returns: (success: bool, error: str or None)
+    """
+
+    log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
+
+    try:
+        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
+            smtp.ehlo()
+
+            # STARTTLS if configured
+            if SMTP_USE_TLS:
+                try:
+                    smtp.starttls()
+                    smtp.ehlo()
+                    log("✓ STARTTLS enabled")
+                except Exception as e:
+                    log(f"STARTTLS failed: {e}", 'WARNING')
+
+            # Authentication if configured
+            if SMTP_USER and SMTP_PASS:
+                try:
+                    smtp.login(SMTP_USER, SMTP_PASS)
+                    log("✓ SMTP authenticated")
+                except Exception as e:
+                    log(f"SMTP auth failed: {e}", 'WARNING')
+
+            # Send the email
+            result = smtp.sendmail(from_addr, [recipient], raw_message)
+
+            # Evaluate the result: sendmail returns a dict of refused
+            # recipients; an empty dict means everyone was accepted
+            if isinstance(result, dict) and result:
+                # The recipient was refused
+                error = result.get(recipient, 'Unknown refusal')
+                log(f"✗ Recipient refused: {error}", 'ERROR')
+                return False, str(error)
+            else:
+                # Delivered
+                log(f"✓ Email delivered to {recipient}", 'SUCCESS')
+                return True, None
+
+    except smtplib.SMTPException as e:
+        log(f"✗ SMTP error: {e}", 'ERROR')
+        return False, str(e)
+
+    except Exception as e:
+        log(f"✗ Connection error: {e}", 'ERROR')
+        return False, str(e)
Deleting...", 'ERROR') + return True # Message löschen (gehört nicht hierher) + + # E-Mail aus S3 laden + try: + response = s3.get_object(Bucket=bucket, Key=key) + raw_bytes = response['Body'].read() + log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)") + except s3.exceptions.NoSuchKey: + log(f"✗ S3 object not found (may have been deleted)", 'ERROR') + return True # Nicht retryable - Message löschen + except Exception as e: + log(f"✗ Failed to load from S3: {e}", 'ERROR') + return False # Könnte temporär sein - retry + + # E-Mail senden + success, error = send_email(from_addr, recipient, raw_bytes) + + if success: + # Erfolgreich zugestellt + mark_as_processed(bucket, key) + + log(f"{'='*70}") + log(f"✅ Email delivered successfully", 'SUCCESS') + log(f"{'='*70}\n") + + return True # Message löschen + + else: + # Fehler aufgetreten + error_msg = error or "Unknown SMTP error" + + # Prüfe ob temporärer Fehler (Retry sinnvoll) + if receive_count < 3 and is_temporary_smtp_error(error_msg): + log(f"⚠ Temporary error detected, will retry", 'WARNING') + log(f"{'='*70}\n") + return False # Message NICHT löschen → Retry + else: + # Permanenter Fehler oder max retries erreicht + mark_as_failed(bucket, key, error_msg, receive_count) + + log(f"{'='*70}") + log(f"✗ Email delivery failed permanently", 'ERROR') + log(f" Error: {error_msg}") + log(f"{'='*70}\n") + + return False # Nach 3 Versuchen → automatisch DLQ + + +def main_loop(): + """Hauptschleife: Pollt SQS Queue und verarbeitet Nachrichten""" + + log(f"\n{'='*70}") + log(f"🚀 Email Worker started") + log(f"{'='*70}") + log(f" Worker Name: {WORKER_NAME}") + log(f" Domain: {WORKER_DOMAIN}") + log(f" Queue: {QUEUE_URL}") + log(f" Region: {AWS_REGION}") + log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})") + log(f" Poll interval: {POLL_INTERVAL}s") + log(f" Max messages per poll: {MAX_MESSAGES}") + log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s") + log(f"{'='*70}\n") + + consecutive_errors = 0 + max_consecutive_errors = 10 + messages_processed = 0 + last_activity = time.time() + + while not shutdown_requested: + try: + # Messages aus Queue holen (Long Polling) + response = sqs.receive_message( + QueueUrl=QUEUE_URL, + MaxNumberOfMessages=MAX_MESSAGES, + WaitTimeSeconds=POLL_INTERVAL, + VisibilityTimeout=VISIBILITY_TIMEOUT, + AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'], + MessageAttributeNames=['All'] + ) + + # Reset error counter bei erfolgreicher Abfrage + consecutive_errors = 0 + + if 'Messages' not in response: + # Keine Nachrichten + if time.time() - last_activity > 60: + log(f"Waiting for messages... 
+
+
+def main_loop():
+    """Main loop: polls the SQS queue and processes messages"""
+
+    log(f"\n{'='*70}")
+    log(f"🚀 Email Worker started")
+    log(f"{'='*70}")
+    log(f"  Worker Name:   {WORKER_NAME}")
+    log(f"  Domain:        {WORKER_DOMAIN}")
+    log(f"  Queue:         {QUEUE_URL}")
+    log(f"  Region:        {AWS_REGION}")
+    log(f"  SMTP:          {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
+    log(f"  Poll interval: {POLL_INTERVAL}s")
+    log(f"  Max messages per poll: {MAX_MESSAGES}")
+    log(f"  Visibility timeout: {VISIBILITY_TIMEOUT}s")
+    log(f"{'='*70}\n")
+
+    consecutive_errors = 0
+    max_consecutive_errors = 10
+    messages_processed = 0
+    last_activity = time.time()
+
+    while not shutdown_requested:
+        try:
+            # Fetch messages from the queue (long polling)
+            response = sqs.receive_message(
+                QueueUrl=QUEUE_URL,
+                MaxNumberOfMessages=MAX_MESSAGES,
+                WaitTimeSeconds=POLL_INTERVAL,
+                VisibilityTimeout=VISIBILITY_TIMEOUT,
+                AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
+                MessageAttributeNames=['All']
+            )
+
+            # Reset the error counter after a successful poll
+            consecutive_errors = 0
+
+            if 'Messages' not in response:
+                # No messages
+                if time.time() - last_activity > 60:
+                    log(f"Waiting for messages... (processed: {messages_processed})")
+                    last_activity = time.time()
+                continue
+
+            message_count = len(response['Messages'])
+            log(f"\n✉ Received {message_count} message(s) from queue")
+            last_activity = time.time()
+
+            # Process the messages
+            for msg in response['Messages']:
+                if shutdown_requested:
+                    log("Shutdown requested, stopping processing")
+                    break
+
+                receipt_handle = msg['ReceiptHandle']
+
+                # Read the receive count
+                receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))
+
+                # Sent timestamp (to compute time spent in the queue)
+                sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
+                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
+
+                if queue_time > 0:
+                    log(f"Message was in queue for {queue_time}s")
+
+                try:
+                    message_body = json.loads(msg['Body'])
+
+                    # Process the email
+                    success = process_message(message_body, receive_count)
+
+                    if success:
+                        # Delete the message from the queue
+                        sqs.delete_message(
+                            QueueUrl=QUEUE_URL,
+                            ReceiptHandle=receipt_handle
+                        )
+                        log("✓ Message deleted from queue")
+                        messages_processed += 1
+                    else:
+                        # On failure the message stays in the queue
+                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
+
+                except json.JSONDecodeError as e:
+                    log(f"✗ Invalid message format: {e}", 'ERROR')
+                    # Delete malformed messages (not retryable)
+                    sqs.delete_message(
+                        QueueUrl=QUEUE_URL,
+                        ReceiptHandle=receipt_handle
+                    )
+
+                except Exception as e:
+                    log(f"✗ Error processing message: {e}", 'ERROR')
+                    traceback.print_exc()
+                    # The message stays in the queue for a retry
+
+        except KeyboardInterrupt:
+            log("\n⚠ Keyboard interrupt received")
+            break
+
+        except Exception as e:
+            consecutive_errors += 1
+            log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
+            traceback.print_exc()
+
+            if consecutive_errors >= max_consecutive_errors:
+                log("Too many consecutive errors, shutting down", 'ERROR')
+                break
+
+            # Short pause after errors
+            time.sleep(5)
+
+    log(f"\n{'='*70}")
+    log(f"👋 Worker shutting down")
+    log(f"  Messages processed: {messages_processed}")
+    log(f"{'='*70}\n")
+
+
+if __name__ == '__main__':
+    # Validation
+    if not WORKER_DOMAIN:
+        log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
+        sys.exit(1)
+
+    if not QUEUE_URL:
+        log("ERROR: SQS_QUEUE_URL not set!", 'ERROR')
+        sys.exit(1)
+
+    try:
+        main_loop()
+    except Exception as e:
+        log(f"Fatal error: {e}", 'ERROR')
+        traceback.print_exc()
+        sys.exit(1)
\ No newline at end of file
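To exercise a worker without going through SES and the Lambda, a message matching the schema produced by lambda_function.send_to_queue() can be injected directly. A sketch: bucket and key must point at an existing raw email object in S3, and the file name is hypothetical:

#!/usr/bin/env python3
# inject-test-message.py (sketch: bucket/key must reference a real raw email in S3)
import json
import time
import boto3

sqs = boto3.client('sqs', region_name='us-east-2')

queue_url = sqs.get_queue_url(QueueName='andreasknuth-de-queue')['QueueUrl']

# Same shape as the body produced by lambda_function.send_to_queue()
message = {
    'bucket': 'andreasknuth-de-emails',
    'key': 'some-existing-object-key',  # placeholder
    'from': 'sender@example.com',
    'recipient': 'test@andreasknuth.de',
    'domain': 'andreasknuth.de',
    'subject': 'Worker smoke test',
    'message_id': 'manual-test',
    'timestamp': int(time.time()),
}

sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
print('✓ Test message sent; watch the worker logs')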