# vocab-backend/ocr_server.py
#
# 445 lines
# 17 KiB
# Python

from flask import Flask, request, jsonify, send_file
from paddleocr import PaddleOCR
import base64
from PIL import Image
from io import BytesIO
import traceback
import numpy as np
import cv2
import logging
import os
import uuid
import datetime
import shutil
from functools import wraps
# Logging setup: this module uses its own logger with a dedicated console
# handler instead of configuring the root logger via logging.basicConfig().
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Console handler: emits every record from DEBUG upwards with a timestamp.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# Remove any handlers already attached to this logger, then attach ours.
# File logging is disabled: no file handler is created, so none is added here
# (adding the undefined `file_handler` raised NameError at import time).
# To enable it, create a logging.FileHandler('debug.log'), give it the same
# level and formatter, and attach it with logger.addHandler().
logger.handlers = []
logger.addHandler(console_handler)

# Keep records from being forwarded to the root logger.
logger.propagate = False
# Flask application object; the API routes in this module are registered on it.
app = Flask(__name__)
def get_dir_name():
    """Return a fresh directory name of the form ``YYYYmmdd_HHMMSS_<8 hex chars>``.

    The name joins the current time (``datetime.datetime.now()``) with the
    first eight characters of a random UUID4.
    """
    now = datetime.datetime.now()
    random_suffix = uuid.uuid4().hex[:8]
    return "_".join((now.strftime('%Y%m%d_%H%M%S'), random_suffix))
def create_debug_directory(dir_name):
    """Create the debug-image directory ``images/<dir_name>`` and return its path.

    Args:
        dir_name: Name of the sub-directory to create below ``images``.

    Returns:
        The path of the newly created directory, relative to the current
        working directory.

    Raises:
        FileExistsError: If ``images/<dir_name>`` already exists.
    """
    base_dir = 'images'
    full_path = os.path.join(base_dir, dir_name)
    # exist_ok avoids the check-then-create race when two requests try to
    # create the shared base directory at the same time.
    os.makedirs(base_dir, exist_ok=True)
    # No exist_ok here: an already existing target directory is still an error.
    os.makedirs(full_path)
    return full_path
def preprocess_image(image, debug_dir):
    """Prepare an image for OCR and store a thumbnail of the result.

    The input is converted to grayscale, contrast-enhanced with CLAHE and
    denoised with non-local means. A WebP thumbnail (at most 256x256) of the
    denoised image is written to ``<debug_dir>/thumbnail.webp``.

    Args:
        image: Image array; it is converted with ``cv2.COLOR_RGB2GRAY``, so
            RGB channel order is expected.
        debug_dir: Directory that receives ``thumbnail.webp``.

    Returns:
        The denoised grayscale image.

    Raises:
        Exception: Any error raised while processing is logged and re-raised.
    """
    try:
        grayscale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Local contrast enhancement (CLAHE) with a raised clip limit.
        contrast_enhanced = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)).apply(grayscale)

        # Non-local-means denoising; h=15 is a comparatively strong setting.
        denoise_params = dict(h=15, templateWindowSize=7, searchWindowSize=21)
        cleaned = cv2.fastNlMeansDenoising(contrast_enhanced, **denoise_params)

        # Store a small WebP preview of the denoised image.
        preview = Image.fromarray(cv2.cvtColor(cleaned, cv2.COLOR_GRAY2RGB))
        preview.thumbnail((256, 256))
        preview.save(os.path.join(debug_dir, 'thumbnail.webp'), 'WEBP', quality=85)
    except Exception as e:
        logger.error(f"Preprocessing error: {str(e)}")
        raise
    return cleaned
def require_localhost(f):
    """Decorator that only lets requests from the local machine reach the view.

    Requests whose ``request.remote_addr`` is neither ``127.0.0.1`` nor ``::1``
    are logged and answered with a JSON error and HTTP status 403; all other
    requests are passed to the wrapped function unchanged.
    """
    loopback_addresses = ('127.0.0.1', '::1')

    @wraps(f)
    def decorated_function(*args, **kwargs):
        client_address = request.remote_addr
        if client_address in loopback_addresses:
            return f(*args, **kwargs)

        logger.warning(f"Nicht autorisierte Anfrage von {client_address}")
        denial = jsonify({
            'error': 'Zugriff verweigert. Nur localhost ist erlaubt.'
        })
        return denial, 403

    return decorated_function
def _box_bounds(box):
    """Return ``(min_x, min_y, max_x, max_y)`` over the corner points of ``box``."""
    xs = [point[0] for point in box]
    ys = [point[1] for point in box]
    return min(xs), min(ys), max(xs), max(ys)


def merge_text_blocks(ocr_results, y_threshold=100, x_column_threshold=50):
    """Merge OCR-detected lines into text blocks, column by column.

    Lines are first grouped into columns by the x-coordinate of their left
    edge, then, inside each column, consecutive lines (top to bottom) are
    merged while the vertical gap between them stays within ``y_threshold``.

    Args:
        ocr_results: List of OCR results, where each item is
            ``[box, (text, confidence)]`` and ``box`` is a sequence of
            ``[x, y]`` corner points.
        y_threshold: Maximum vertical distance (in pixels) between the bottom
            of the current block and the top of the next line for the line to
            be merged into the block.
        x_column_threshold: Maximum horizontal distance (in pixels) between the
            left edges of two neighbouring boxes (in left-edge order) for them
            to belong to the same column.

    Returns:
        List of ``[box, (text, confidence)]`` blocks, ordered column by column
        (left to right) and top to bottom inside a column. A block built from
        several lines has an axis-aligned bounding box around all of them, the
        line texts joined by single spaces, and the arithmetic mean of the
        line confidences. A block consisting of a single line keeps that
        line's box, text and confidence unchanged.
    """
    if not ocr_results:
        logger.debug("No OCR results to process.")
        return []

    # Step 1: sort by left edge and split into columns wherever the left edge
    # jumps by more than x_column_threshold relative to the previous box.
    sorted_by_x = sorted(ocr_results, key=lambda item: _box_bounds(item[0])[0])
    columns = [[sorted_by_x[0]]]
    for item in sorted_by_x[1:]:
        prev_left_x = _box_bounds(columns[-1][-1][0])[0]
        curr_left_x = _box_bounds(item[0])[0]
        if abs(curr_left_x - prev_left_x) <= x_column_threshold:
            columns[-1].append(item)
        else:
            columns.append([item])

    # Step 2: debug output of the detected columns.
    logger.debug("Found %d columns:", len(columns))
    for col_idx, column in enumerate(columns, start=1):
        logger.debug("Column %d:", col_idx)
        for box_idx, item in enumerate(column, start=1):
            left_x, top_y, _, _ = _box_bounds(item[0])
            logger.debug(
                " Box %d: Text='%s', Confidence=%.2f, LeftX=%.2f, TopY=%.2f, Box=%s",
                box_idx, item[1][0], item[1][1], left_x, top_y, item[0],
            )

    # Step 3: inside each column, walk top to bottom and build blocks.
    final_results = []
    for col_idx, column in enumerate(columns, start=1):
        logger.debug("\nProcessing Column %d for merging into blocks:", col_idx)
        sorted_column = sorted(column, key=lambda item: _box_bounds(item[0])[1])

        first = sorted_column[0]
        block_box = first[0]
        block_text = first[1][0]
        # All line confidences of the current block are kept so that the block
        # confidence is a true mean; a pairwise running average would weight
        # later lines far more heavily than earlier ones.
        block_confidences = [first[1][1]]

        for i, item in enumerate(sorted_column[1:], start=1):
            curr_box = item[0]
            curr_text = item[1][0]
            curr_confidence = item[1][1]

            prev_min_x, prev_min_y, prev_max_x, prev_bottom_y = _box_bounds(block_box)
            curr_min_x, curr_top_y, curr_max_x, curr_max_y = _box_bounds(curr_box)
            # Gap between the bottom of the block so far and the top of this line.
            y_distance = curr_top_y - prev_bottom_y

            logger.debug("Comparing boxes %d and %d in Column %d:", i - 1, i, col_idx)
            logger.debug(" Previous text: %s", block_text)
            logger.debug(" Current text: %s", curr_text)
            logger.debug(" Vertical distance (y_distance): %.2f", y_distance)
            logger.debug(" y_threshold: %s", y_threshold)

            if y_distance <= y_threshold:
                logger.debug(" Merging boxes into a single block.")
                # Grow the bounding box so that it encloses both boxes.
                min_x = min(prev_min_x, curr_min_x)
                min_y = min(prev_min_y, curr_top_y)
                max_x = max(prev_max_x, curr_max_x)
                max_y = max(prev_bottom_y, curr_max_y)
                block_box = [[min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]]
                block_text += " " + curr_text
                block_confidences.append(curr_confidence)
            else:
                logger.debug(" Not merging boxes—starting a new block.")
                final_results.append([
                    block_box,
                    (block_text, _mean_confidence(block_confidences))
                ])
                block_box = curr_box
                block_text = curr_text
                block_confidences = [curr_confidence]

        # Flush the last block of the column.
        final_results.append([
            block_box,
            (block_text, _mean_confidence(block_confidences))
        ])

    logger.debug("\nFinal merged blocks:")
    for idx, item in enumerate(final_results, start=1):
        logger.debug(
            "Block %d: Text='%s', Confidence=%.2f, Box=%s",
            idx, item[1][0], item[1][1], item[0],
        )
    return final_results


def _mean_confidence(confidences):
    """Return the arithmetic mean of ``confidences``; a single value is returned unchanged."""
    if len(confidences) == 1:
        return confidences[0]
    return sum(confidences) / len(confidences)
# OCR endpoint: runs PaddleOCR on the uploaded image and merges the detected lines with merge_text_blocks.
@app.route('/api/ocr', methods=['POST'])
def ocr_endpoint():
    """Run OCR on a base64-encoded image and return the merged text blocks.

    Expects a JSON object with an ``image`` key holding the base64-encoded
    image. The image is re-encoded as WebP, preprocessed, passed to PaddleOCR,
    and the detected lines are merged with ``merge_text_blocks``. The files
    ``original.webp``, ``thumbnail.webp``, ``ocr_results.txt`` and
    ``statistics.txt`` are written to a per-request directory below ``images``.

    Returns:
        A Flask JSON response:
        * 400 with ``error`` if the body is not a JSON object with a string
          ``image`` or the string is not valid base64 (no directory is
          created in that case);
        * 200 with ``warning`` and ``debug_dir`` if OCR detected nothing;
        * 200 with ``status: success`` and ``results`` (each entry has
          ``box``, ``text``, ``confidence`` and ``name``) on success;
        * 500 with ``error``, ``details`` and ``debug_dir`` on any other failure.
    """
    debug_dir = None
    dir_name = None
    try:
        # Validate the payload first so that bad requests are answered with
        # 400 and do not leave an empty debug directory behind.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not isinstance(data.get('image'), str):
            logger.error("Invalid data format: 'image' should be a base64 string")
            return jsonify({
                'error': "'image' muss ein Base64-kodierter String sein."
            }), 400
        try:
            image_data = base64.b64decode(data['image'])
        except ValueError as decode_err:  # binascii.Error is a ValueError
            logger.error(f"Invalid base64 image data: {str(decode_err)}")
            return jsonify({
                'error': "'image' enthält keine gültigen Base64-Daten.",
                'details': str(decode_err)
            }), 400

        # Create the per-request directory.
        dir_name = get_dir_name()
        debug_dir = create_debug_directory(dir_name)

        # Store the original image as WebP.
        original_image = Image.open(BytesIO(image_data)).convert('RGB')
        webp_path = os.path.join(debug_dir, 'original.webp')
        original_image.save(webp_path, 'WEBP', quality=50)

        # Reload the stored WebP so that OCR runs on exactly the saved image.
        with open(webp_path, 'rb') as f:
            webp_image = Image.open(BytesIO(f.read())).convert('RGB')

        processed_image = preprocess_image(np.array(webp_image), debug_dir)

        # NOTE(review): a new PaddleOCR instance is built for every request
        # because det_limit_side_len depends on the image height; this is
        # presumably expensive - consider caching instances if it shows up
        # in profiles.
        ocr = PaddleOCR(
            use_angle_cls=True,
            lang='en',
            det_model_dir='en_PP-OCRv3_det',
            rec_model_dir='en_PP-OCRv3_rec',
            det_limit_side_len=processed_image.shape[0] * 2,
            use_dilation=True,
            det_db_score_mode='fast'
        )

        try:
            result = ocr.ocr(processed_image, rec=True, cls=True)

            # Explicit UTF-8: OCR text may contain characters the platform's
            # default encoding cannot represent.
            with open(os.path.join(debug_dir, 'ocr_results.txt'), 'w', encoding='utf-8') as f:
                f.write(f"Raw OCR result:\n{result}\n\n")

            if not result:
                logger.warning("No results returned from OCR")
                return jsonify({
                    'warning': 'No text detected',
                    'debug_dir': debug_dir
                }), 200
            if not result[0]:
                logger.warning("Empty results list from OCR")
                return jsonify({
                    'warning': 'Empty results list',
                    'debug_dir': debug_dir
                }), 200

            merged_results = merge_text_blocks(result[0], y_threshold=15, x_column_threshold=50)

            # Convert the merged blocks into JSON-friendly dictionaries; a
            # malformed entry is logged and skipped instead of failing the request.
            extracted_results = []
            for idx, item in enumerate(merged_results):
                try:
                    box = item[0]
                    text = item[1][0] if item[1] else ''
                    confidence = float(item[1][1]) if item[1] and len(item[1]) > 1 else 0.0
                    extracted_results.append({
                        'box': box,
                        'text': text,
                        'confidence': confidence,
                        'name': dir_name
                    })
                except Exception as proc_err:
                    logger.error(f"Error processing result {idx}: {str(proc_err)}")

            # Write summary statistics next to the images.
            with open(os.path.join(debug_dir, 'statistics.txt'), 'w', encoding='utf-8') as f:
                f.write(f"Total results: {len(extracted_results)}\n")
                if extracted_results:
                    avg_confidence = np.mean([r['confidence'] for r in extracted_results])
                    f.write(f"Average confidence: {avg_confidence}\n")
                f.write("\nDetailed results:\n")
                for idx, entry in enumerate(extracted_results, start=1):
                    f.write(f"Result {idx}:\n")
                    f.write(f"Text: {entry['text']}\n")
                    f.write(f"Confidence: {entry['confidence']}\n")
                    f.write(f"Name: {dir_name}\n")
                    f.write(f"Box coordinates: {entry['box']}\n\n")

            return jsonify({
                'status': 'success',
                'results': extracted_results,
            })
        except Exception as ocr_err:
            logger.error(f"OCR processing error: {str(ocr_err)}")
            logger.error(traceback.format_exc())
            return jsonify({
                'error': 'OCR processing failed',
                'details': str(ocr_err),
                'debug_dir': debug_dir
            }), 500
    except Exception as e:
        logger.error(f"Fehler: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'Verarbeitungsfehler',
            'details': str(e),
            'debug_dir': dir_name if debug_dir else None
        }), 500
@app.route('/api/cleanup', methods=['POST'])
@require_localhost
def cleanup_endpoint():
    """Delete directories below ``images`` that are not listed as still in use.

    Expects a JSON object with the optional keys ``usedIds`` (list of
    directory names to keep, default ``[]``) and ``dryrun`` (boolean, default
    ``True``). With ``dryrun`` set, nothing is deleted and the candidates are
    only reported.

    Returns:
        A Flask JSON response:
        * 400 with ``error`` if the body is not a JSON object, ``usedIds`` is
          not a list or ``dryrun`` is not a boolean;
        * 200 with ``status: dryrun`` and ``dirsToDelete`` for a dry run;
        * 200 with ``status: success`` and ``deletedIds`` (plus
          ``failedDeletions`` if some directories could not be removed);
        * 500 with ``error`` and ``details`` on an unexpected failure.
    """
    try:
        # silent=True yields None instead of raising for a missing or
        # malformed JSON body, so it can be rejected with 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            logger.error("Invalid data format: request body should be a JSON object")
            return jsonify({
                'error': "Der Request-Body muss ein JSON-Objekt sein."
            }), 400

        used_ids = data.get('usedIds', [])
        dryrun = data.get('dryrun', True)  # default: True

        # Validate the input data.
        if not isinstance(used_ids, list):
            logger.error("Invalid data format: 'usedIds' should be a list")
            return jsonify({
                'error': "'usedIds' muss eine Liste von IDs sein."
            }), 400
        if not isinstance(dryrun, bool):
            logger.error("Invalid data format: 'dryrun' should be a boolean")
            return jsonify({
                'error': "'dryrun' muss ein boolescher Wert sein (true oder false)."
            }), 400

        base_dir = 'images'
        if not os.path.exists(base_dir):
            logger.info(f"Das Basisverzeichnis '{base_dir}' existiert nicht. Nichts zu bereinigen.")
            return jsonify({
                'status': 'success',
                'message': f"Das Basisverzeichnis '{base_dir}' existiert nicht.",
                'deletedIds': [],
                'dryrun': dryrun
            }), 200

        # Directory names are strings, so only string IDs can ever match;
        # a set gives constant-time lookups and ignores unhashable entries.
        used_names = {used_id for used_id in used_ids if isinstance(used_id, str)}
        dirs_to_delete = [
            d for d in os.listdir(base_dir)
            if os.path.isdir(os.path.join(base_dir, d)) and d not in used_names
        ]

        if dryrun:
            logger.info("Dry-Run aktiviert. Keine Verzeichnisse werden gelöscht.")
            return jsonify({
                'status': 'dryrun',
                'message': 'Die folgenden Verzeichnisse würden gelöscht werden.',
                'dirsToDelete': dirs_to_delete,
                'dryrun': dryrun
            }), 200

        # Best effort: a directory that cannot be removed is reported in the
        # response instead of aborting the whole cleanup.
        deleted_ids = []
        failed_deletions = []
        for dir_name in dirs_to_delete:
            dir_path = os.path.join(base_dir, dir_name)
            try:
                shutil.rmtree(dir_path)
                deleted_ids.append(dir_name)
                logger.info(f"Verzeichnis '{dir_name}' erfolgreich gelöscht.")
            except Exception as delete_err:
                logger.error(f"Fehler beim Löschen des Verzeichnisses '{dir_name}': {str(delete_err)}")
                failed_deletions.append({
                    'dir': dir_name,
                    'error': str(delete_err)
                })

        response = {
            'status': 'success',
            'deletedIds': deleted_ids,
            'dryrun': dryrun
        }
        if failed_deletions:
            response['failedDeletions'] = failed_deletions
        return jsonify(response), 200
    except Exception as e:
        logger.error(f"Cleanup-Fehler: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'Bereinigungsfehler',
            'details': str(e)
        }), 500
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger (which
    # lets a client execute arbitrary Python code) must not be on by default.
    # Opt in explicitly for local development, e.g. FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)