/stats/<domain>
This commit is contained in:
parent
4388f6efc2
commit
34393b0807
|
|
@ -7,8 +7,6 @@ import logging
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import boto3
|
import boto3
|
||||||
from pathlib import Path
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from email.parser import BytesParser
|
from email.parser import BytesParser
|
||||||
from email.policy import default
|
from email.policy import default
|
||||||
from email.utils import getaddresses
|
from email.utils import getaddresses
|
||||||
|
|
@ -16,40 +14,114 @@ from email.utils import getaddresses
|
||||||
if sys.version_info < (3, 12):
|
if sys.version_info < (3, 12):
|
||||||
raise RuntimeError("Python 3.12 oder höher erforderlich")
|
raise RuntimeError("Python 3.12 oder höher erforderlich")
|
||||||
|
|
||||||
load_dotenv()
|
# --- Logging mit Timestamp ---
|
||||||
app = Flask(__name__)
|
logging.basicConfig(
|
||||||
logging.basicConfig(level=logging.INFO)
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s %(levelname)s %(message)s",
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
load_dotenv = None
|
||||||
|
try:
|
||||||
|
from dotenv import load_dotenv as _ld
|
||||||
|
load_dotenv = _ld
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if load_dotenv:
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
SMTP_HOST = "localhost"
|
SMTP_HOST = "localhost"
|
||||||
SMTP_PORT = 25
|
SMTP_PORT = 25
|
||||||
API_TOKEN = os.environ.get('API_TOKEN')
|
API_TOKEN = os.environ.get('API_TOKEN')
|
||||||
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
|
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')
|
||||||
s3_client = boto3.client('s3', region_name=AWS_REGION)
|
s3_client = boto3.client('s3', region_name=AWS_REGION)
|
||||||
|
|
||||||
processed_requests = {}
|
def mark_email_as_processed(bucket, key, status, processor='rest-api'):
|
||||||
|
"""Setzt processed-Metadaten auf einen beliebigen Status."""
|
||||||
def load_domains_config():
|
|
||||||
return {"andreasknuth.de": {"bucket": "andreasknuth-de-emails"}}
|
|
||||||
|
|
||||||
def mark_email_as_processed(bucket, key):
|
|
||||||
try:
|
try:
|
||||||
s3_client.copy_object(
|
s3_client.copy_object(
|
||||||
Bucket=bucket,
|
Bucket=bucket,
|
||||||
Key=key,
|
Key=key,
|
||||||
CopySource={'Bucket': bucket, 'Key': key},
|
CopySource={'Bucket': bucket, 'Key': key},
|
||||||
Metadata={
|
Metadata={
|
||||||
'processed': 'true',
|
'processed': status,
|
||||||
'processed_timestamp': str(int(time.time())),
|
'processed_timestamp': str(int(time.time())),
|
||||||
'processor': 'rest-api'
|
'processor': processor
|
||||||
},
|
},
|
||||||
MetadataDirective='REPLACE'
|
MetadataDirective='REPLACE'
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Fehler beim Markieren: {e}")
|
logger.error(f"Fehler beim Markieren {bucket}/{key}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@app.route('/stats/<domain>', methods=['GET'])
|
||||||
|
def stats_domain(domain):
|
||||||
|
# Auth
|
||||||
|
auth = request.headers.get('Authorization')
|
||||||
|
if auth != f'Bearer {API_TOKEN}':
|
||||||
|
return jsonify({'error': 'Unauthorized'}), 401
|
||||||
|
|
||||||
|
bucket = domain.replace('.', '-') + '-emails'
|
||||||
|
paginator = s3_client.get_paginator('list_objects_v2')
|
||||||
|
|
||||||
|
total = 0
|
||||||
|
counts = {
|
||||||
|
'true': 0,
|
||||||
|
'unknownDomain': 0,
|
||||||
|
'unknownUser': 0
|
||||||
|
}
|
||||||
|
details = {
|
||||||
|
'unknownDomain': [],
|
||||||
|
'unknownUser': []
|
||||||
|
}
|
||||||
|
|
||||||
|
for page in paginator.paginate(Bucket=bucket):
|
||||||
|
for obj in page.get('Contents', []):
|
||||||
|
key = obj['Key']
|
||||||
|
total += 1
|
||||||
|
|
||||||
|
head = s3_client.head_object(Bucket=bucket, Key=key)
|
||||||
|
meta = head.get('Metadata', {})
|
||||||
|
status = meta.get('processed', 'none')
|
||||||
|
if status in counts:
|
||||||
|
counts[status] += 1
|
||||||
|
else:
|
||||||
|
# wir ignorieren andere Status
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Für unknownDomain und unknownUser zusätzlich E-Mail parsen
|
||||||
|
if status in ('unknownDomain', 'unknownUser'):
|
||||||
|
body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
|
||||||
|
try:
|
||||||
|
msg = BytesParser(policy=default).parsebytes(body)
|
||||||
|
from_addr = getaddresses(msg.get_all('from', []))[0][1] if msg.get_all('from') else None
|
||||||
|
to_addrs = [addr for _n, addr in getaddresses(msg.get_all('to', []))]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Fehler beim Parsen {bucket}/{key}: {e}")
|
||||||
|
from_addr = None
|
||||||
|
to_addrs = []
|
||||||
|
details[status].append({
|
||||||
|
'key': key,
|
||||||
|
'from': from_addr,
|
||||||
|
'to': to_addrs
|
||||||
|
})
|
||||||
|
|
||||||
|
result = {
|
||||||
|
'domain': domain,
|
||||||
|
'total_messages': total,
|
||||||
|
'successful': counts['true'],
|
||||||
|
'wrong_domain': counts['unknownDomain'],
|
||||||
|
'unknown_user': counts['unknownUser'],
|
||||||
|
'details': details
|
||||||
|
}
|
||||||
|
logger.info(f"Stats for {domain}: {result}")
|
||||||
|
return jsonify(result), 200
|
||||||
|
|
||||||
@app.route('/process/<domain>', methods=['POST'])
|
@app.route('/process/<domain>', methods=['POST'])
|
||||||
def process_email(domain):
|
def process_email(domain):
|
||||||
auth = request.headers.get('Authorization')
|
auth = request.headers.get('Authorization')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue