"""Flask micro-service exposing a PaddleOCR endpoint plus a debug-image cleanup endpoint.

Every OCR request gets its own timestamped debug directory under ``images/``
holding the original upload (as lossy WebP), a preprocessed thumbnail, and
text dumps of the raw OCR output and summary statistics.
"""

from flask import Flask, request, jsonify, send_file
from paddleocr import PaddleOCR
import base64
from PIL import Image
from io import BytesIO
import traceback
import numpy as np
import cv2
import logging
import os
import uuid
import datetime
import shutil
from functools import wraps

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = Flask(__name__)


def get_dir_name() -> str:
    """Return a unique directory name: ``YYYYmmdd_HHMMSS_<8-char-uuid>``.

    The timestamp keeps directories sortable by creation time; the uuid
    fragment avoids collisions for requests within the same second.
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    unique_id = str(uuid.uuid4())[:8]
    return f"{timestamp}_{unique_id}"


def create_debug_directory(dir_name: str) -> str:
    """Create a per-request directory for debug images and return its path.

    Raises FileExistsError if the directory already exists — that indicates
    a (highly unlikely) name collision and should not be silently reused.
    """
    full_path = os.path.join('images', dir_name)
    # makedirs creates the 'images' parent as needed; the previous separate
    # exists()/makedirs() dance for the parent was redundant and race-prone.
    os.makedirs(full_path)
    return full_path


def preprocess_image(image: np.ndarray, debug_dir: str) -> np.ndarray:
    """Enhance an RGB image for OCR and save a thumbnail of the result.

    Pipeline: grayscale -> CLAHE contrast enhancement -> non-local-means
    denoising. A 256x256-bounded WebP thumbnail of the processed image is
    written to ``debug_dir`` for inspection.

    Returns the denoised single-channel image; re-raises any OpenCV/PIL
    error after logging it.
    """
    try:
        # Grayscale conversion (input is RGB, from PIL).
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Local contrast enhancement; clipLimit=3.0 is deliberately higher
        # than the OpenCV default of 2.0 for stronger enhancement.
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Non-local-means denoising; h=15 trades detail for stronger
        # noise suppression on scanned/photographed documents.
        denoised = cv2.fastNlMeansDenoising(
            enhanced,
            h=15,
            templateWindowSize=7,
            searchWindowSize=21
        )

        # Save a small WebP preview of the preprocessed image.
        denoised_rgb = cv2.cvtColor(denoised, cv2.COLOR_GRAY2RGB)
        thumbnail = Image.fromarray(denoised_rgb)
        thumbnail.thumbnail((256, 256))
        thumbnail_path = os.path.join(debug_dir, 'thumbnail.webp')
        thumbnail.save(thumbnail_path, 'WEBP', quality=85)

        return denoised
    except Exception as e:
        logger.error(f"Preprocessing error: {str(e)}")
        raise


def require_localhost(f):
    """Decorator: reject any request not originating from localhost (403)."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.remote_addr not in ('127.0.0.1', '::1'):
            logger.warning(f"Nicht autorisierte Anfrage von {request.remote_addr}")
            return jsonify({
                'error': 'Zugriff verweigert. Nur localhost ist erlaubt.'
            }), 403
        return f(*args, **kwargs)
    return decorated_function


@app.route('/api/ocr', methods=['POST'])
def ocr_endpoint():
    """Accept a base64-encoded image, run OCR, and return detected text.

    Request JSON: ``{"image": "<base64>"}``.
    Success response: ``{"status": "success", "results": [...]}`` where each
    result has ``box``, ``text``, ``confidence`` and ``name`` (the debug
    directory id). Validation failures return 400; processing failures 500.
    """
    debug_dir = None
    dir_name = None
    try:
        # Validate the request body up front so malformed input yields a
        # 400 instead of falling through to the generic 500 handler.
        data = request.get_json(silent=True)
        if not data or 'image' not in data:
            return jsonify({
                'error': "Ungültige Anfrage: JSON mit 'image'-Feld erwartet."
            }), 400

        # Create the per-request debug directory.
        dir_name = get_dir_name()
        debug_dir = create_debug_directory(dir_name)

        image_data = base64.b64decode(data['image'])

        # Persist the original upload as (lossy) WebP.
        original_image = Image.open(BytesIO(image_data)).convert('RGB')
        webp_path = os.path.join(debug_dir, 'original.webp')
        original_image.save(webp_path, 'WEBP', quality=50)

        # Reload the WebP so OCR sees exactly the stored (recompressed)
        # image, not the pristine upload.
        with open(webp_path, 'rb') as f:
            webp_image = Image.open(BytesIO(f.read())).convert('RGB')

        # Preprocessing (grayscale/CLAHE/denoise).
        processed_image = preprocess_image(np.array(webp_image), debug_dir)

        # NOTE(review): the OCR engine is constructed per request, which
        # reloads the models every time. It cannot simply be hoisted to
        # module level because det_limit_side_len depends on the image;
        # consider caching engines keyed on that value if latency matters.
        ocr = PaddleOCR(
            use_angle_cls=True,
            lang='en',
            det_model_dir='en_PP-OCRv3_det',
            rec_model_dir='en_PP-OCRv3_rec',
            det_limit_side_len=processed_image.shape[0] * 2,
            use_dilation=True,
            det_db_score_mode='fast'
        )

        try:
            result = ocr.ocr(processed_image, rec=True, cls=True)

            # Dump the raw OCR result for debugging.
            with open(os.path.join(debug_dir, 'ocr_results.txt'), 'w',
                      encoding='utf-8') as f:
                f.write(f"Raw OCR result:\n{result}\n\n")

            if not result:
                logger.warning("No results returned from OCR")
                return jsonify({
                    'warning': 'No text detected',
                    'debug_dir': debug_dir
                }), 200

            if not result[0]:
                logger.warning("Empty results list from OCR")
                return jsonify({
                    'warning': 'Empty results list',
                    'debug_dir': debug_dir
                }), 200

            # Flatten PaddleOCR's [box, (text, confidence)] items into
            # JSON-friendly dicts; skip (and log) malformed entries.
            extracted_results = []
            for idx, item in enumerate(result[0]):
                try:
                    box = item[0]
                    text = item[1][0] if item[1] else ''
                    confidence = float(item[1][1]) if item[1] and len(item[1]) > 1 else 0.0
                    extracted_results.append({
                        'box': box,
                        'text': text,
                        'confidence': confidence,
                        'name': dir_name
                    })
                except Exception as proc_err:
                    logger.error(f"Error processing result {idx}: {str(proc_err)}")

            # Write summary statistics alongside the raw dump.
            with open(os.path.join(debug_dir, 'statistics.txt'), 'w',
                      encoding='utf-8') as f:
                f.write(f"Total results: {len(extracted_results)}\n")
                if extracted_results:
                    avg_confidence = np.mean([r['confidence'] for r in extracted_results])
                    f.write(f"Average confidence: {avg_confidence}\n")
                f.write("\nDetailed results:\n")
                # 'res' (not 'result') to avoid shadowing the OCR result above.
                for idx, res in enumerate(extracted_results):
                    f.write(f"Result {idx+1}:\n")
                    f.write(f"Text: {res['text']}\n")
                    f.write(f"Confidence: {res['confidence']}\n")
                    f.write(f"Name: {dir_name}\n")
                    f.write(f"Box coordinates: {res['box']}\n\n")

            return jsonify({
                'status': 'success',
                'results': extracted_results,
            })

        except Exception as ocr_err:
            logger.error(f"OCR processing error: {str(ocr_err)}")
            logger.error(traceback.format_exc())
            return jsonify({
                'error': 'OCR processing failed',
                'details': str(ocr_err),
                'debug_dir': debug_dir
            }), 500

    except Exception as e:
        logger.error(f"Fehler: {str(e)}")
        return jsonify({
            'error': 'Verarbeitungsfehler',
            'details': str(e),
            'debug_dir': dir_name if debug_dir else None
        }), 500


@app.route('/api/cleanup', methods=['POST'])
@require_localhost
def cleanup_endpoint():
    """Delete debug directories under ``images/`` not listed in ``usedIds``.

    Request JSON: ``{"usedIds": [...], "dryrun": true|false}``. With
    ``dryrun`` (the default) nothing is deleted; the response only lists
    which directories would be removed. Localhost-only (see decorator).
    """
    try:
        data = request.get_json()
        used_ids = data.get('usedIds', [])
        dryrun = data.get('dryrun', True)  # default: dry run — never delete by accident

        # Input validation.
        if not isinstance(used_ids, list):
            logger.error("Invalid data format: 'usedIds' should be a list")
            return jsonify({
                'error': "'usedIds' muss eine Liste von IDs sein."
            }), 400

        if not isinstance(dryrun, bool):
            logger.error("Invalid data format: 'dryrun' should be a boolean")
            return jsonify({
                'error': "'dryrun' muss ein boolescher Wert sein (true oder false)."
            }), 400

        base_dir = 'images'
        if not os.path.exists(base_dir):
            logger.info(f"Das Basisverzeichnis '{base_dir}' existiert nicht. Nichts zu bereinigen.")
            return jsonify({
                'status': 'success',
                'message': f"Das Basisverzeichnis '{base_dir}' existiert nicht.",
                'deletedIds': [],
                'dryrun': dryrun
            }), 200

        # Everything on disk that is not referenced by the client is a candidate.
        all_dirs = [d for d in os.listdir(base_dir)
                    if os.path.isdir(os.path.join(base_dir, d))]
        dirs_to_delete = [d for d in all_dirs if d not in used_ids]

        if dryrun:
            logger.info("Dry-Run aktiviert. Keine Verzeichnisse werden gelöscht.")
            return jsonify({
                'status': 'dryrun',
                'message': 'Die folgenden Verzeichnisse würden gelöscht werden.',
                'dirsToDelete': dirs_to_delete,
                'dryrun': dryrun
            }), 200

        # Real deletion: collect successes and failures independently so
        # one bad directory does not abort the whole sweep.
        deleted_ids = []
        failed_deletions = []
        for dir_name in dirs_to_delete:
            dir_path = os.path.join(base_dir, dir_name)
            try:
                shutil.rmtree(dir_path)
                deleted_ids.append(dir_name)
                logger.info(f"Verzeichnis '{dir_name}' erfolgreich gelöscht.")
            except Exception as delete_err:
                logger.error(f"Fehler beim Löschen des Verzeichnisses '{dir_name}': {str(delete_err)}")
                failed_deletions.append({
                    'dir': dir_name,
                    'error': str(delete_err)
                })

        response = {
            'status': 'success',
            'deletedIds': deleted_ids,
            'dryrun': dryrun
        }
        if failed_deletions:
            response['failedDeletions'] = failed_deletions
        return jsonify(response), 200

    except Exception as e:
        logger.error(f"Cleanup-Fehler: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'Bereinigungsfehler',
            'details': str(e)
        }), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)