from flask import Flask, request, jsonify
from paddleocr import PaddleOCR
import base64
from PIL import Image
from io import BytesIO
import traceback
import numpy as np
import cv2
import logging
import os
import uuid
import datetime
import shutil
from functools import wraps

# logging.basicConfig(
#     level=logging.DEBUG,
#     format='%(asctime)s - %(levelname)s - %(message)s',
#     handlers=[
#         logging.FileHandler('debug.log'),  # write logs to a file
#         logging.StreamHandler()            # show logs on the console
#     ]
# )

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# File handler (currently disabled)
# file_handler = logging.FileHandler('debug.log')
# file_handler.setLevel(logging.DEBUG)
# file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# Attach handlers
logger.handlers = []  # remove any existing handlers
logger.addHandler(console_handler)
logger.propagate = False  # keep records from bubbling up to the root logger

app = Flask(__name__)


def get_dir_name():
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    unique_id = str(uuid.uuid4())[:8]
    return f"{timestamp}_{unique_id}"


def create_debug_directory(dir_name):
    """Create a unique directory for debug images."""
    base_dir = 'images'
    full_path = os.path.join(base_dir, dir_name)
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    os.makedirs(full_path)
    return full_path


def preprocess_image(image, debug_dir):
    """Image preprocessing with optional optimizations."""
    try:
        # Convert to grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Contrast enhancement with CLAHE (raised clip limit)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Noise reduction with tuned parameters
        denoised = cv2.fastNlMeansDenoising(
            enhanced,
            h=15,  # stronger noise reduction
            templateWindowSize=7,
            searchWindowSize=21
        )

        # Save a thumbnail as WebP
        denoised_rgb = cv2.cvtColor(denoised, cv2.COLOR_GRAY2RGB)
        thumbnail = Image.fromarray(denoised_rgb)
        thumbnail.thumbnail((256, 256))
        thumbnail_path = os.path.join(debug_dir, 'thumbnail.webp')
        thumbnail.save(thumbnail_path, 'WEBP', quality=85)

        return denoised
    except Exception as e:
        logger.error(f"Preprocessing error: {str(e)}")
        raise


def require_localhost(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.remote_addr not in ('127.0.0.1', '::1'):
            logger.warning(f"Unauthorized request from {request.remote_addr}")
            return jsonify({
                'error': 'Access denied. Only localhost is allowed.'
            }), 403
        return f(*args, **kwargs)
    return decorated_function
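
# --- Sketch: exercising preprocess_image in isolation ------------------------
# A minimal, commented-out example of running the preprocessing step above on
# a local file. 'sample.png' is a hypothetical file name; cv2.imread returns
# BGR, so convert to RGB first because preprocess_image assumes RGB input.
#
#   img_bgr = cv2.imread('sample.png')
#   img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
#   debug_dir = create_debug_directory(get_dir_name())
#   processed = preprocess_image(img_rgb, debug_dir)
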
def merge_text_blocks(ocr_results, y_threshold=100, x_column_threshold=50):
    """
    Merge OCR-detected lines into text blocks by first grouping them into
    columns and then merging within each column.

    Args:
        ocr_results: List of OCR results, where each item is [box, (text, confidence)]
        y_threshold: Maximum vertical distance (in pixels) between lines to
            consider them part of the same block
        x_column_threshold: Maximum horizontal distance (in pixels) between
            left edges to consider boxes part of the same column

    Returns:
        List of merged text blocks with updated boxes, text, and confidence
    """
    if not ocr_results:
        logger.debug("No OCR results to process.")
        return []

    # Step 1: assign boxes to columns.
    # Sort boxes by the x coordinate of their left edge to identify columns.
    sorted_by_x = sorted(ocr_results, key=lambda x: min(point[0] for point in x[0]))
    columns = []
    current_column = [sorted_by_x[0]]

    for i in range(1, len(sorted_by_x)):
        prev_box = current_column[-1][0]
        curr_box = sorted_by_x[i][0]

        # Horizontal distance between the left edges
        prev_left_x = min(point[0] for point in prev_box)
        curr_left_x = min(point[0] for point in curr_box)
        x_distance = abs(curr_left_x - prev_left_x)

        if x_distance <= x_column_threshold:
            # Box belongs to the current column
            current_column.append(sorted_by_x[i])
        else:
            # Start a new column
            columns.append(current_column)
            current_column = [sorted_by_x[i]]

    # Add the last column
    if current_column:
        columns.append(current_column)

    # Step 2: debug output of the columns
    logger.debug(f"Found {len(columns)} columns:")
    for col_idx, column in enumerate(columns):
        logger.debug(f"Column {col_idx + 1}:")
        for box_idx, item in enumerate(column):
            box = item[0]
            text = item[1][0]
            confidence = item[1][1]
            left_x = min(point[0] for point in box)
            top_y = min(point[1] for point in box)
            logger.debug(
                f"  Box {box_idx + 1}: Text='{text}', Confidence={confidence:.2f}, "
                f"LeftX={left_x:.2f}, TopY={top_y:.2f}, Box={box}"
            )

    # Step 3: within each column, sort boxes by y coordinate and build blocks
    final_results = []
    for col_idx, column in enumerate(columns):
        logger.debug(f"\nProcessing Column {col_idx + 1} for merging into blocks:")

        # Sort boxes in the column by the y coordinate of the top edge
        sorted_column = sorted(column, key=lambda x: min(point[1] for point in x[0]))

        # Merge boxes within the column based on their vertical distance
        current_block = {
            'box': sorted_column[0][0],
            'text': sorted_column[0][1][0],
            'confidence': sorted_column[0][1][1]
        }

        for i in range(1, len(sorted_column)):
            prev_box = current_block['box']
            curr_box = sorted_column[i][0]
            curr_text = sorted_column[i][1][0]
            curr_confidence = sorted_column[i][1][1]

            # Vertical distance between the bottom edge of the previous box
            # and the top edge of the current box
            prev_bottom_y = max(point[1] for point in prev_box)
            curr_top_y = min(point[1] for point in curr_box)
            y_distance = curr_top_y - prev_bottom_y

            logger.debug(f"Comparing boxes {i-1} and {i} in Column {col_idx + 1}:")
            logger.debug(f"  Previous text: {current_block['text']}")
            logger.debug(f"  Current text: {curr_text}")
            logger.debug(f"  Vertical distance (y_distance): {y_distance:.2f}")
            logger.debug(f"  y_threshold: {y_threshold}")

            # Merge if the vertical distance is small enough
            if y_distance <= y_threshold:
                logger.debug("  Merging boxes into a single block.")
                # Expand the bounding box to cover both boxes
                all_points = prev_box + curr_box
                min_x = min(point[0] for point in all_points)
                min_y = min(point[1] for point in all_points)
                max_x = max(point[0] for point in all_points)
                max_y = max(point[1] for point in all_points)
                current_block['box'] = [
                    [min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]
                ]
                # Join the text with a space
                current_block['text'] += " " + curr_text
                # Update the confidence (e.g. running average)
                current_block['confidence'] = (current_block['confidence'] + curr_confidence) / 2
            else:
                logger.debug("  Not merging boxes, starting a new block.")
                # No merge: flush the current block to the results
                final_results.append([
                    current_block['box'],
                    (current_block['text'], current_block['confidence'])
                ])
                current_block = {
                    'box': curr_box,
                    'text': curr_text,
                    'confidence': curr_confidence
                }

        # Add the column's last block
        final_results.append([
            current_block['box'],
            (current_block['text'], current_block['confidence'])
        ])

    # Debug output of the final blocks
    logger.debug("\nFinal merged blocks:")
    for idx, item in enumerate(final_results):
        box = item[0]
        text = item[1][0]
        confidence = item[1][1]
        logger.debug(f"Block {idx + 1}: Text='{text}', Confidence={confidence:.2f}, Box={box}")

    return final_results
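
# --- Sketch: merge_text_blocks on two stacked lines ---------------------------
# Hypothetical, commented-out input illustrating the expected data shape: each
# item is [box, (text, confidence)], as in PaddleOCR's result[0], with box
# points given as [x, y] corners.
#
#   lines = [
#       [[[10, 10], [200, 10], [200, 40], [10, 40]], ("Hello", 0.99)],
#       [[[10, 45], [200, 45], [200, 75], [10, 75]], ("world", 0.97)],
#   ]
#   merged = merge_text_blocks(lines, y_threshold=15, x_column_threshold=50)
#   # Both boxes share a left edge (same column) and are 5 px apart
#   # vertically, so they merge into a single block:
#   # [[[10, 10], [200, 10], [200, 75], [10, 75]], ("Hello world", 0.98)]
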
@app.route('/api/ocr', methods=['POST'])
def ocr_endpoint():
    debug_dir = None
    try:
        # Create the debug directory
        dir_name = get_dir_name()
        debug_dir = create_debug_directory(dir_name)

        # Decode the image
        data = request.get_json()
        image_data = base64.b64decode(data['image'])

        # Save the original image as WebP
        original_image = Image.open(BytesIO(image_data)).convert('RGB')
        webp_path = os.path.join(debug_dir, 'original.webp')
        original_image.save(webp_path, 'WEBP', quality=50)

        # Reload the WebP image for processing
        with open(webp_path, 'rb') as f:
            webp_image = Image.open(BytesIO(f.read())).convert('RGB')

        # Preprocessing
        processed_image = preprocess_image(np.array(webp_image), debug_dir)

        # OCR with tuned configuration
        # ocr = PaddleOCR(
        #     use_angle_cls=True,
        #     lang='en',
        #     det_model_dir='en_PP-OCRv3_det',
        #     rec_model_dir='en_PP-OCRv3_rec',
        #     det_limit_side_len=processed_image.shape[0] * 2,
        #     use_dilation=True,
        #     det_db_score_mode='fast',
        #     det_db_box_thresh=0.3,   # adjusted parameter
        #     det_db_unclip_ratio=2.5  # adjusted parameter
        # )
        ocr = PaddleOCR(
            use_angle_cls=True,
            lang='en',
            det_model_dir='en_PP-OCRv3_det',
            rec_model_dir='en_PP-OCRv3_rec',
            det_limit_side_len=processed_image.shape[0] * 2,
            use_dilation=True,
            det_db_score_mode='fast'
        )

        # Run OCR
        try:
            result = ocr.ocr(processed_image, rec=True, cls=True)

            # Save debug information to a file
            with open(os.path.join(debug_dir, 'ocr_results.txt'), 'w') as f:
                f.write(f"Raw OCR result:\n{result}\n\n")

            if not result:
                logger.warning("No results returned from OCR")
                return jsonify({
                    'warning': 'No text detected',
                    'debug_dir': debug_dir
                }), 200

            if not result[0]:
                logger.warning("Empty results list from OCR")
                return jsonify({
                    'warning': 'Empty results list',
                    'debug_dir': debug_dir
                }), 200

            # Merge detected lines into text blocks
            merged_results = merge_text_blocks(result[0], y_threshold=15, x_column_threshold=50)

            # Process the results
            extracted_results = []
            for idx, item in enumerate(merged_results):
                try:
                    box = item[0]
                    text = item[1][0] if item[1] else ''
                    confidence = float(item[1][1]) if item[1] and len(item[1]) > 1 else 0.0

                    extracted_results.append({
                        'box': box,
                        'text': text,
                        'confidence': confidence,
                        'name': dir_name
                    })
                except Exception as proc_err:
                    logger.error(f"Error processing result {idx}: {str(proc_err)}")

            # Save statistics to a debug file
            with open(os.path.join(debug_dir, 'statistics.txt'), 'w') as f:
                f.write(f"Total results: {len(extracted_results)}\n")
                if extracted_results:
                    avg_confidence = np.mean([r['confidence'] for r in extracted_results])
                    f.write(f"Average confidence: {avg_confidence}\n")
                f.write("\nDetailed results:\n")
                for idx, res in enumerate(extracted_results):
                    f.write(f"Result {idx+1}:\n")
                    f.write(f"Text: {res['text']}\n")
                    f.write(f"Confidence: {res['confidence']}\n")
                    f.write(f"Name: {dir_name}\n")
                    f.write(f"Box coordinates: {res['box']}\n\n")

            return jsonify({
                'status': 'success',
                'results': extracted_results,
            })

        except Exception as ocr_err:
            logger.error(f"OCR processing error: {str(ocr_err)}")
            logger.error(traceback.format_exc())
            return jsonify({
                'error': 'OCR processing failed',
                'details': str(ocr_err),
                'debug_dir': debug_dir
            }), 500

    except Exception as e:
        logger.error(f"Error: {str(e)}")
        return jsonify({
            'error': 'Processing error',
            'details': str(e),
            'debug_dir': dir_name if debug_dir else None
        }), 500
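
# --- Sketch: calling /api/ocr with curl ---------------------------------------
# A commented-out example request, assuming the server runs on localhost:5000
# and GNU coreutils base64; 'sample.png' is a hypothetical file name.
#
#   base64 -w0 sample.png > img.b64
#   curl -X POST http://127.0.0.1:5000/api/ocr \
#        -H 'Content-Type: application/json' \
#        -d "{\"image\": \"$(cat img.b64)\"}"
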
@app.route('/api/cleanup', methods=['POST'])
@require_localhost
def cleanup_endpoint():
    try:
        data = request.get_json()
        used_ids = data.get('usedIds', [])
        dryrun = data.get('dryrun', True)  # default: True

        # Validate the input data
        if not isinstance(used_ids, list):
            logger.error("Invalid data format: 'usedIds' should be a list")
            return jsonify({
                'error': "'usedIds' must be a list of IDs."
            }), 400

        if not isinstance(dryrun, bool):
            logger.error("Invalid data format: 'dryrun' should be a boolean")
            return jsonify({
                'error': "'dryrun' must be a boolean value (true or false)."
            }), 400

        base_dir = 'images'
        if not os.path.exists(base_dir):
            logger.info(f"Base directory '{base_dir}' does not exist. Nothing to clean up.")
            return jsonify({
                'status': 'success',
                'message': f"Base directory '{base_dir}' does not exist.",
                'deletedIds': [],
                'dryrun': dryrun
            }), 200

        all_dirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
        dirs_to_delete = [d for d in all_dirs if d not in used_ids]

        if dryrun:
            logger.info("Dry run enabled. No directories will be deleted.")
            return jsonify({
                'status': 'dryrun',
                'message': 'The following directories would be deleted.',
                'dirsToDelete': dirs_to_delete,
                'dryrun': dryrun
            }), 200
        else:
            deleted_ids = []
            failed_deletions = []
            for dir_name in dirs_to_delete:
                dir_path = os.path.join(base_dir, dir_name)
                try:
                    shutil.rmtree(dir_path)
                    deleted_ids.append(dir_name)
                    logger.info(f"Directory '{dir_name}' deleted successfully.")
                except Exception as delete_err:
                    logger.error(f"Error deleting directory '{dir_name}': {str(delete_err)}")
                    failed_deletions.append({
                        'dir': dir_name,
                        'error': str(delete_err)
                    })

            response = {
                'status': 'success',
                'deletedIds': deleted_ids,
                'dryrun': dryrun
            }
            if failed_deletions:
                response['failedDeletions'] = failed_deletions
            return jsonify(response), 200

    except Exception as e:
        logger.error(f"Cleanup error: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'Cleanup error',
            'details': str(e)
        }), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
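
# --- Sketch: dry-run cleanup from localhost -----------------------------------
# A commented-out client example, assuming the 'requests' package is installed
# and the call originates from 127.0.0.1 (the endpoint is localhost-only).
# With dryrun=True it only reports which image directories would be deleted.
#
#   import requests
#   resp = requests.post('http://127.0.0.1:5000/api/cleanup',
#                        json={'usedIds': [], 'dryrun': True})
#   print(resp.json())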