docker/email_api/email_api/app.py

360 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import gzip
import hmac
import logging
import os
import smtplib
import sys
import time
import zlib
from email.parser import BytesParser
from email.policy import default
from email.utils import getaddresses

import boto3
import requests
from flask import Flask, request, jsonify
# Fail fast on interpreters older than the version this service requires.
if sys.version_info < (3, 12):
    raise RuntimeError("Python 3.12 oder höher erforderlich")

# --- Logging with timestamp ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

# python-dotenv is optional: when it is installed, load a .env file BEFORE the
# environment variables are read below; otherwise `load_dotenv` stays None.
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None
else:
    load_dotenv()

app = Flask(__name__)

# SMTP endpoint that validated messages are handed to.
SMTP_HOST = "localhost"
SMTP_PORT = 25

# Secret compared against the `Authorization: Bearer <token>` request header.
# It is optional at import time, so make a missing value visible in the logs
# instead of letting the service start with API_TOKEN silently set to None.
API_TOKEN = os.environ.get('API_TOKEN')
if not API_TOKEN:
    logger.warning(
        "API_TOKEN is not set; the Bearer-token check has no secret to compare against"
    )

AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')

# Mailcow credentials and API base URL are mandatory: a missing variable
# raises KeyError at import time.
API_KEY = os.environ['MAILCOW_API_KEY']
MAILCOW_API = os.environ['MAILCOW_API']

s3_client = boto3.client('s3', region_name=AWS_REGION)
def domain_exists(domain):
    """
    Check via the Mailcow endpoint /get/domain/all whether `domain` is known.

    Args:
        domain: Domain name to look for; compared case-insensitively against
            the 'domain_name' field of every returned entry.

    Returns:
        True if any entry matches, otherwise False (including an empty result).

    Raises:
        requests.RequestException: on network errors, the 5 s timeout, or a
            non-2xx status (via raise_for_status).
        ValueError: if the response body is a non-empty JSON value that is not
            an array, so a malformed answer fails with a clear message instead
            of an AttributeError deep inside the comparison.
    """
    url = f"{MAILCOW_API}/get/domain/all"
    headers = {'X-API-Key': API_KEY}
    resp = requests.get(url, headers=headers, timeout=5)
    resp.raise_for_status()
    domains = resp.json()

    # Nothing returned ([], {} or null) means there is nothing to match.
    if not domains:
        return False
    if not isinstance(domains, list):
        raise ValueError(
            f"Unexpected response from {url}: expected a JSON array, "
            f"got {type(domains).__name__}"
        )

    wanted = domain.lower()
    # `or ''` tolerates entries whose 'domain_name' is present but null.
    return any(
        isinstance(d, dict) and (d.get('domain_name') or '').lower() == wanted
        for d in domains
    )
def inbox_exists(domain, local_part):
    """
    Return True if `domain` is known AND `local_part`@`domain` has a mailbox.

    Args:
        domain: Domain name; checked first through domain_exists().
        local_part: Part of the address before the '@'; compared
            case-insensitively against each entry's 'local_part' field.

    Returns:
        False if the domain is unknown or no mailbox entry matches, else True.

    Raises:
        requests.RequestException: on network errors, the 5 s timeout, or a
            non-2xx status from either lookup.
        ValueError: if the mailbox listing is a non-empty JSON value that is
            not an array.
    """
    # 1) Domain check
    if not domain_exists(domain):
        logger.info("Domain '%s' unknown skip mailbox lookup", domain)
        return False

    # 2) Only fetch the mailbox listing for a known domain
    url = f"{MAILCOW_API}/get/mailbox/all/{domain}"
    headers = {'X-API-Key': API_KEY}
    resp = requests.get(url, headers=headers, timeout=5)
    resp.raise_for_status()
    mailboxes = resp.json()

    # Nothing returned ([], {} or null) means the domain has no mailboxes.
    if not mailboxes:
        return False
    if not isinstance(mailboxes, list):
        raise ValueError(
            f"Unexpected response from {url}: expected a JSON array, "
            f"got {type(mailboxes).__name__}"
        )

    wanted = local_part.lower()
    # `or ''` tolerates entries whose 'local_part' is present but null.
    return any(
        isinstance(m, dict) and (m.get('local_part') or '').lower() == wanted
        for m in mailboxes
    )
def mark_email_as_processed(bucket, key, status, processor='rest-api'):
    """
    Stamp an S3 object with a processing record in its metadata.

    The object is copied onto itself with MetadataDirective='REPLACE', passing
    exactly three metadata entries: 'processed' (the given status),
    'processed_timestamp' (current Unix time in whole seconds, as a string)
    and 'processor'.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object to mark.
        status: Value stored under 'processed'.
        processor: Value stored under 'processor'.

    Returns:
        True if the copy succeeded; False if any exception occurred (the
        error is logged, never raised).
    """
    same_object = {'Bucket': bucket, 'Key': key}
    try:
        processing_record = {
            'processed': status,
            'processed_timestamp': str(int(time.time())),
            'processor': processor,
        }
        s3_client.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource=same_object,
            Metadata=processing_record,
            MetadataDirective='REPLACE',
        )
    except Exception as e:
        logger.error(f"Fehler beim Markieren {bucket}/{key}: {e}")
        return False
    else:
        return True
@app.route('/stats/<domain>', methods=['GET'])
def stats_domain(domain):
    """
    Report processing statistics for the S3 bucket that belongs to `domain`.

    The bucket name is derived as '<domain with dots replaced by dashes>-emails'.
    Every object is counted in 'total_messages'; objects whose 'processed'
    metadata is 'true', 'unknownDomain' or 'unknownUser' are tallied per
    status, and for the two failure statuses the message is downloaded and its
    From/To addresses are listed under 'details'. Any other status is ignored.

    Returns (JSON body, HTTP status):
        401 - Authorization header does not match 'Bearer <API_TOKEN>', or
              API_TOKEN is not configured.
        200 - the statistics document.

    Errors while listing the bucket or reading object metadata are not caught
    here and propagate to Flask.
    """
    # Auth: reject outright when no token is configured (otherwise the literal
    # header "Bearer None" would match) and compare in constant time.
    auth = request.headers.get('Authorization') or ''
    expected = f'Bearer {API_TOKEN}'
    if not API_TOKEN or not hmac.compare_digest(
        auth.encode('utf-8'), expected.encode('utf-8')
    ):
        return jsonify({'error': 'Unauthorized'}), 401

    bucket = domain.replace('.', '-') + '-emails'
    paginator = s3_client.get_paginator('list_objects_v2')

    total = 0
    counts = {
        'true': 0,
        'unknownDomain': 0,
        'unknownUser': 0
    }
    details = {
        'unknownDomain': [],
        'unknownUser': []
    }

    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            key = obj['Key']
            total += 1

            head = s3_client.head_object(Bucket=bucket, Key=key)
            status = head.get('Metadata', {}).get('processed', 'none')

            # Other statuses are counted in the total only.
            if status not in counts:
                continue
            counts[status] += 1

            # Only the failure statuses carry per-message details.
            if status not in details:
                continue

            # Download and parse inside the try so that one unreadable or
            # malformed object does not abort the whole report.
            try:
                body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
                msg = BytesParser(policy=default).parsebytes(body)
                from_parsed = getaddresses(msg.get_all('from', []))
                from_addr = from_parsed[0][1] if from_parsed else None
                to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
            except Exception as e:
                logger.error(f"Fehler beim Parsen {bucket}/{key}: {e}")
                from_addr = None
                to_addrs = []

            details[status].append({
                'key': key,
                'from': from_addr,
                'to': to_addrs
            })

    result = {
        'domain': domain,
        'total_messages': total,
        'successful': counts['true'],
        'wrong_domain': counts['unknownDomain'],
        'unknown_user': counts['unknownUser'],
        'details': details
    }
    logger.info(f"Stats for {domain}: {result}")
    return jsonify(result), 200
@app.route('/process/<domain>', methods=['POST'])
def process_email(domain):
    """
    Forward one e-mail from the JSON payload to the existing mailboxes of `domain`.

    JSON body keys read by this handler:
        email_content: base64-encoded raw message; gzip-compressed before
            encoding when `compressed` is truthy.
        compressed: optional flag, defaults to False.
        request_id: optional identifier, used only for log correlation.

    Returns (JSON body, HTTP status):
        401 - Authorization header does not match 'Bearer <API_TOKEN>', or
              API_TOKEN is not configured.
        400 - payload missing or not a JSON object, `email_content` missing or
              not decodable, or the message has no To/Cc/Bcc addresses.
        404 - none of the recipients is an existing mailbox of `domain`.
        200 - message handed to SMTP; 'forwarded_to' lists the recipients used.

    Errors from the Mailcow lookups or from the SMTP hand-off are not caught
    here and propagate to Flask.
    """
    # Auth: reject outright when no token is configured (otherwise the literal
    # header "Bearer None" would match) and compare in constant time.
    auth = request.headers.get('Authorization') or ''
    expected = f'Bearer {API_TOKEN}'
    if not API_TOKEN or not hmac.compare_digest(
        auth.encode('utf-8'), expected.encode('utf-8')
    ):
        return jsonify({'error': 'Unauthorized'}), 401

    data = request.get_json()
    # A JSON array/string/number would crash on `.get` below.
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Invalid payload'}), 400

    request_id = data.get('request_id', 'no-request-id')
    # Log the payload with the (potentially huge) content replaced by its length.
    payload_summary = {
        k: (len(v) if k == 'email_content' else v)
        for k, v in data.items()
        if k != 'email_content' or isinstance(v, (str, bytes))
    }
    logger.info(
        f"[{request_id}] INCOMING POST /process/{domain}: "
        f"payload_summary={payload_summary}"
    )

    # 1) Decode and parse the e-mail; bad input is a client error, not a 500.
    content = data.get('email_content')
    compressed = data.get('compressed', False)
    if not content or not isinstance(content, (str, bytes)):
        return jsonify({'error': 'Missing email_content'}), 400
    try:
        raw = base64.b64decode(content)
        email_bytes = gzip.decompress(raw) if compressed else raw
    except (ValueError, OSError, EOFError, zlib.error) as e:
        # ValueError covers binascii.Error (bad base64); OSError covers
        # gzip.BadGzipFile; EOFError/zlib.error cover truncated/corrupt streams.
        logger.warning("[%s] Undecodable email_content: %s", request_id, e)
        return jsonify({'error': 'Invalid email_content'}), 400

    msg = BytesParser(policy=default).parsebytes(email_bytes)
    from_parsed = getaddresses(msg.get_all('from', []))
    # Envelope sender: first From address, or a synthetic one if there is none.
    from_addr = from_parsed[0][1] if from_parsed else f'lambda@{domain}'

    recipients = []
    for hdr in ('to', 'cc', 'bcc'):
        recipients += [addr for _n, addr in getaddresses(msg.get_all(hdr, []))]
    if not recipients:
        return jsonify({'error': 'No recipients'}), 400

    # 2) Filter: only mailboxes of the requested domain that actually exist.
    target_domain = domain.lower()
    inbox_cache = {}  # local part (lowercased) -> lookup result, per request
    valid_recipients = []
    for addr in recipients:
        try:
            local, dom = addr.split('@', 1)
        except ValueError:
            # No '@' in the address: nothing to look up.
            continue
        if dom.lower() != target_domain:
            # Different domain: skip.
            continue
        cache_key = local.lower()
        if cache_key not in inbox_cache:
            # Each lookup costs HTTP round-trips; do it once per mailbox even
            # if the address appears in several headers.
            inbox_cache[cache_key] = inbox_exists(domain, local)
        if inbox_cache[cache_key]:
            valid_recipients.append(addr)
        else:
            logger.info(f"Skipping non-existent inbox: {addr}")

    if not valid_recipients:
        logger.info(f"[{request_id}] Keine gültigen Inboxes für {domain} skip.")
        return jsonify({'message': 'No valid inboxes skipped'}), 404

    # 3) Send to the filtered addresses.
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
        smtp.sendmail(from_addr, valid_recipients, email_bytes)

    return jsonify({
        'message': 'Email forwarded',
        'forwarded_to': valid_recipients
    }), 200
@app.route('/retry/<domain>', methods=['GET'])
def retry_domain_emails(domain):
    """
    Re-deliver every stored e-mail of `domain` that is not marked processed='true'.

    The bucket name is derived as '<domain with dots replaced by dashes>-emails'.
    For each object whose 'processed' metadata is not 'true', the message is
    parsed, its To/Cc/Bcc recipients are filtered down to existing mailboxes of
    `domain`, and it is handed to SMTP. The object is then marked:
        'true'          - SMTP accepted the message.
        'unknownDomain' - no recipients at all, none in `domain`, or an SMTP
                          error other than SMTPRecipientsRefused.
        'unknownUser'   - no recipient is an existing mailbox, or SMTP refused
                          all recipients.

    Returns (JSON body, HTTP status):
        401 - Authorization header does not match 'Bearer <API_TOKEN>', or
              API_TOKEN is not configured.
        404 - `domain` is not known to Mailcow.
        200 - {'processed': [keys that were marked], 'failed': [per-key detail]}.
              Keys marked with a failure status appear in both lists; keys that
              raised while being read, parsed or looked up appear in 'failed' only.

    Errors while listing the bucket are not caught here and propagate to Flask.
    """
    # Auth: reject outright when no token is configured (otherwise the literal
    # header "Bearer None" would match) and compare in constant time.
    auth = request.headers.get('Authorization') or ''
    expected = f'Bearer {API_TOKEN}'
    if not API_TOKEN or not hmac.compare_digest(
        auth.encode('utf-8'), expected.encode('utf-8')
    ):
        return jsonify({'error': 'Unauthorized'}), 401

    # 1) Domain check right at the start
    if not domain_exists(domain):
        logger.info(f"Retry aborted: unknown domain '{domain}'")
        return jsonify({'error': f"Unknown domain '{domain}'"}), 404

    bucket = domain.replace('.', '-') + '-emails'
    paginator = s3_client.get_paginator('list_objects_v2')

    # 2) Collect all unprocessed keys
    unprocessed = []
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            head = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
            if head.get('Metadata', {}).get('processed') != 'true':
                unprocessed.append(obj['Key'])

    request_id = f"retry-{domain}-{int(time.time())}"
    logger.info(f"[{request_id}] RETRY for domain={domain}, keys={unprocessed}")

    results = {'processed': [], 'failed': []}
    target_domain = domain.lower()
    # local part (lowercased) -> lookup result. Each inbox_exists() call costs
    # HTTP round-trips, so remember answers across all mails of this request.
    inbox_cache = {}

    for key in unprocessed:
        try:
            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
            msg = BytesParser(policy=default).parsebytes(body)

            from_parsed = getaddresses(msg.get_all('from', []))
            # Envelope sender: first From address, or a synthetic one.
            from_addr = from_parsed[0][1] if from_parsed else f'retry@{domain}'

            # Gather all To/Cc/Bcc addresses
            recipients = []
            for hdr in ('to', 'cc', 'bcc'):
                recipients += [addr for _n, addr in getaddresses(msg.get_all(hdr, []))]

            if not recipients:
                # No addresses at all: mark and move on
                mark_email_as_processed(bucket, key, 'unknownDomain')
                results['processed'].append(key)
                results['failed'].append({
                    'key': key,
                    'status': 'unknownDomain',
                    'reason': 'no recipients'
                })
                continue

            # 3) Domain match: only mails addressed to the requested domain
            domains_in_mail = {addr.split('@')[-1].lower() for addr in recipients if '@' in addr}
            if target_domain not in domains_in_mail:
                mark_email_as_processed(bucket, key, 'unknownDomain')
                results['processed'].append(key)
                results['failed'].append({
                    'key': key,
                    'status': 'unknownDomain',
                    'from': from_addr,
                    'to': recipients
                })
                continue

            # 4) Inbox check: only existing mailboxes are allowed
            valid_recipients = []
            for addr in recipients:
                try:
                    local, dom = addr.split('@', 1)
                except ValueError:
                    # No '@' in the address: nothing to look up.
                    continue
                inbox_ok = False
                if dom.lower() == target_domain:
                    cache_key = local.lower()
                    if cache_key not in inbox_cache:
                        inbox_cache[cache_key] = inbox_exists(domain, local)
                    inbox_ok = inbox_cache[cache_key]
                if inbox_ok:
                    valid_recipients.append(addr)
                else:
                    logger.info(f"Skipping non-existent inbox: {addr}")

            if not valid_recipients:
                mark_email_as_processed(bucket, key, 'unknownUser')
                results['processed'].append(key)
                results['failed'].append({
                    'key': key,
                    'status': 'unknownUser',
                    'from': from_addr,
                    'to': recipients
                })
                continue

            # 5) Deliver to the validated addresses
            try:
                with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
                    # sendmail() returns the recipients that were refused while
                    # at least one other recipient was accepted.
                    partially_refused = smtp.sendmail(from_addr, valid_recipients, body)
                if partially_refused:
                    logger.warning(
                        "[%s] %s/%s: some recipients were refused: %s",
                        request_id, bucket, key, partially_refused
                    )
                mark_email_as_processed(bucket, key, 'true')
                results['processed'].append(key)
            except smtplib.SMTPRecipientsRefused as e:
                # The mail server refused every recipient
                mark_email_as_processed(bucket, key, 'unknownUser')
                refused = {
                    addr: {
                        'code': code,
                        'message': (
                            reply.decode('utf-8', 'ignore')
                            if isinstance(reply, bytes) else str(reply)
                        )
                    }
                    for addr, (code, reply) in e.recipients.items()
                }
                results['processed'].append(key)
                results['failed'].append({
                    'key': key,
                    'status': 'unknownUser',
                    'from': from_addr,
                    'to': valid_recipients,
                    'refused': refused
                })
            except Exception as e:
                # All other SMTP errors are treated as unknownDomain
                mark_email_as_processed(bucket, key, 'unknownDomain')
                results['processed'].append(key)
                results['failed'].append({
                    'key': key,
                    'status': 'unknownDomain',
                    'from': from_addr,
                    'to': valid_recipients,
                    'error': str(e)
                })
        except Exception as e:
            # Parsing, S3 or Mailcow lookup errors: report the key as failed
            # without marking it, and leave a trace in the log.
            logger.exception("[%s] Retry failed for %s/%s", request_id, bucket, key)
            results['failed'].append({'key': key, 'error': str(e)})

    return jsonify(results), 200
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: answer 200 with {'status': 'OK'}; performs no auth check."""
    payload = {'status': 'OK'}
    http_status = 200
    return jsonify(payload), http_status
# Script entry point: when the module is executed directly (not imported),
# start the Flask app's built-in server on all interfaces, port 5000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)