from flask import Flask, request, jsonify, send_file
from paddleocr import PaddleOCR
import base64
from PIL import Image
from io import BytesIO
import traceback
import numpy as np
import cv2
import logging
import os
import uuid
import datetime
import shutil
from functools import wraps

# logging.basicConfig(
#     level=logging.DEBUG,
#     format='%(asctime)s - %(levelname)s - %(message)s',
#     handlers=[
#         logging.FileHandler('debug.log'),  # write logs to a file
#         logging.StreamHandler()            # show logs on the console
#     ]
# )
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# File handler
file_handler = logging.FileHandler('debug.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# Attach handlers
logger.handlers = []  # remove any previously attached handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.propagate = False  # keep records from propagating to the root logger


app = Flask(__name__)


def get_dir_name():
    """Return a unique directory name built from a timestamp and a short UUID."""
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    unique_id = str(uuid.uuid4())[:8]
    return f"{timestamp}_{unique_id}"


def create_debug_directory(dir_name):
    """Create a unique directory for debug images and return its path."""
    base_dir = 'images'
    full_path = os.path.join(base_dir, dir_name)

    # os.makedirs creates intermediate directories, so 'images' is created on demand
    os.makedirs(full_path)
    return full_path


def preprocess_image(image, debug_dir):
    """Preprocess an RGB image for OCR and save a thumbnail for debugging."""
    try:
        # Convert to grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Contrast enhancement with CLAHE (raised clip limit)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Denoising with tuned parameters
        denoised = cv2.fastNlMeansDenoising(
            enhanced,
            h=15,  # stronger noise reduction
            templateWindowSize=7,
            searchWindowSize=21
        )

        # Save a thumbnail as WebP
        denoised_rgb = cv2.cvtColor(denoised, cv2.COLOR_GRAY2RGB)
        thumbnail = Image.fromarray(denoised_rgb)
        thumbnail.thumbnail((256, 256))
        thumbnail_path = os.path.join(debug_dir, 'thumbnail.webp')
        thumbnail.save(thumbnail_path, 'WEBP', quality=85)

        return denoised

    except Exception as e:
        logger.error(f"Preprocessing error: {str(e)}")
        raise
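
# A minimal usage sketch for preprocess_image, assuming a file 'sample.png'
# exists in the working directory (the filename is illustrative):
#
#   img = np.array(Image.open('sample.png').convert('RGB'))
#   processed = preprocess_image(img, create_debug_directory(get_dir_name()))
#   print(processed.shape)  # 2-D grayscale array ready for OCR

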
def require_localhost(f):
    """Decorator that rejects requests not originating from localhost."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.remote_addr not in ('127.0.0.1', '::1'):
            logger.warning(f"Unauthorized request from {request.remote_addr}")
            return jsonify({
                'error': 'Access denied. Only localhost is allowed.'
            }), 403
        return f(*args, **kwargs)
    return decorated_function


def merge_text_blocks(ocr_results, y_threshold=100, x_column_threshold=50):
    """
    Merge OCR-detected lines into text blocks by first grouping them into columns
    and then merging vertically adjacent lines within each column.

    Args:
        ocr_results: List of OCR results, where each item is [box, (text, confidence)]
        y_threshold: Maximum vertical distance (in pixels) between lines to consider
            them part of the same block
        x_column_threshold: Maximum horizontal distance (in pixels) between left edges
            to consider boxes part of the same column

    Returns:
        List of merged text blocks with updated boxes, text, and confidence
    """
    if not ocr_results:
        logger.debug("No OCR results to process.")
        return []

    # Step 1: assign boxes to columns.
    # Sort boxes by the x-coordinate of their left edge to identify columns.
    sorted_by_x = sorted(ocr_results, key=lambda x: min([point[0] for point in x[0]]))

    columns = []
    current_column = [sorted_by_x[0]]

    for i in range(1, len(sorted_by_x)):
        prev_box = current_column[-1][0]
        curr_box = sorted_by_x[i][0]

        # Horizontal distance between the left edges
        prev_left_x = min([point[0] for point in prev_box])
        curr_left_x = min([point[0] for point in curr_box])
        x_distance = abs(curr_left_x - prev_left_x)

        if x_distance <= x_column_threshold:
            # Box belongs to the current column
            current_column.append(sorted_by_x[i])
        else:
            # Start a new column
            columns.append(current_column)
            current_column = [sorted_by_x[i]]

    # Append the last column
    if current_column:
        columns.append(current_column)

    # Step 2: debug output of the columns
    logger.debug(f"Found {len(columns)} columns:")
    for col_idx, column in enumerate(columns):
        logger.debug(f"Column {col_idx + 1}:")
        for box_idx, item in enumerate(column):
            box = item[0]
            text = item[1][0]
            confidence = item[1][1]
            left_x = min([point[0] for point in box])
            top_y = min([point[1] for point in box])
            logger.debug(f"  Box {box_idx + 1}: Text='{text}', Confidence={confidence:.2f}, LeftX={left_x:.2f}, TopY={top_y:.2f}, Box={box}")

    # Step 3: within each column, sort boxes by y-coordinate and build blocks
    final_results = []
    for col_idx, column in enumerate(columns):
        logger.debug(f"\nProcessing Column {col_idx + 1} for merging into blocks:")

        # Sort the column's boxes by y-coordinate (top edge)
        sorted_column = sorted(column, key=lambda x: min([point[1] for point in x[0]]))

        # Merge boxes within the column based on their vertical distance
        current_block = {
            'box': sorted_column[0][0],
            'text': sorted_column[0][1][0],
            'confidence': sorted_column[0][1][1]
        }

        for i in range(1, len(sorted_column)):
            prev_box = current_block['box']
            curr_box = sorted_column[i][0]
            curr_text = sorted_column[i][1][0]
            curr_confidence = sorted_column[i][1][1]

            # Vertical distance between the bottom edge of the previous box
            # and the top edge of the current box
            prev_bottom_y = max([point[1] for point in prev_box])
            curr_top_y = min([point[1] for point in curr_box])
            y_distance = curr_top_y - prev_bottom_y

            # Log the vertical distance
            logger.debug(f"Comparing boxes {i-1} and {i} in Column {col_idx + 1}:")
            logger.debug(f"  Previous text: {current_block['text']}")
            logger.debug(f"  Current text: {curr_text}")
            logger.debug(f"  Vertical distance (y_distance): {y_distance:.2f}")
            logger.debug(f"  y_threshold: {y_threshold}")

            # Merge if the vertical distance is small enough
            if y_distance <= y_threshold:
                logger.debug("  Merging boxes into a single block.")
                # Expand the bounding box to enclose both boxes
                # (both boxes are lists of [x, y] points, so '+' concatenates)
                all_points = prev_box + curr_box
                min_x = min([point[0] for point in all_points])
                min_y = min([point[1] for point in all_points])
                max_x = max([point[0] for point in all_points])
                max_y = max([point[1] for point in all_points])
                current_block['box'] = [[min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]]

                # Join the text with a space
                current_block['text'] += " " + curr_text

                # Update the confidence (running average; weights later boxes more heavily)
                current_block['confidence'] = (current_block['confidence'] + curr_confidence) / 2
            else:
                logger.debug("  Not merging boxes; starting a new block.")
                # If not merging, flush the current block to the results
                final_results.append([
                    current_block['box'],
                    (current_block['text'], current_block['confidence'])
                ])
                current_block = {
                    'box': curr_box,
                    'text': curr_text,
                    'confidence': curr_confidence
                }

        # Append the column's last block
        final_results.append([
            current_block['box'],
            (current_block['text'], current_block['confidence'])
        ])

    # Debug output of the final blocks
    logger.debug("\nFinal merged blocks:")
    for idx, item in enumerate(final_results):
        box = item[0]
        text = item[1][0]
        confidence = item[1][1]
        logger.debug(f"Block {idx + 1}: Text='{text}', Confidence={confidence:.2f}, Box={box}")

    return final_results
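

# A minimal, hand-made example of the input/output shape merge_text_blocks
# expects (the values are illustrative, not real OCR output):
#
#   lines = [
#       [[[10, 10], [200, 10], [200, 40], [10, 40]], ("Invoice", 0.98)],
#       [[[10, 50], [200, 50], [200, 80], [10, 80]], ("No. 42", 0.95)],
#   ]
#   blocks = merge_text_blocks(lines, y_threshold=15, x_column_threshold=50)
#   # -> one block spanning both lines:
#   # [[[10, 10], [200, 10], [200, 80], [10, 80]], ("Invoice No. 42", 0.965)]

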
# OCR endpoint; uses merge_text_blocks to combine detected lines into blocks
@app.route('/api/ocr', methods=['POST'])
def ocr_endpoint():
    dir_name = None
    debug_dir = None
    try:
        # Create the debug directory
        dir_name = get_dir_name()
        debug_dir = create_debug_directory(dir_name)

        # Decode the uploaded image
        data = request.get_json()
        image_data = base64.b64decode(data['image'])

        # Save the original image as WebP
        original_image = Image.open(BytesIO(image_data)).convert('RGB')
        webp_path = os.path.join(debug_dir, 'original.webp')
        original_image.save(webp_path, 'WEBP', quality=50)

        # Reload the WebP image for processing
        with open(webp_path, 'rb') as f:
            webp_image = Image.open(BytesIO(f.read())).convert('RGB')

        # Preprocessing
        processed_image = preprocess_image(np.array(webp_image), debug_dir)

        # OCR with tuned configuration
        # ocr = PaddleOCR(
        #     use_angle_cls=True,
        #     lang='en',
        #     det_model_dir='en_PP-OCRv3_det',
        #     rec_model_dir='en_PP-OCRv3_rec',
        #     det_limit_side_len=processed_image.shape[0] * 2,
        #     use_dilation=True,
        #     det_db_score_mode='fast',
        #     det_db_box_thresh=0.3,  # adjusted parameter
        #     det_db_unclip_ratio=2.5  # adjusted parameter
        # )
        # NOTE: the model is instantiated per request, which is expensive;
        # creating it once at module level would avoid reloading the weights.
        ocr = PaddleOCR(
            use_angle_cls=True,
            lang='en',
            det_model_dir='en_PP-OCRv3_det',
            rec_model_dir='en_PP-OCRv3_rec',
            det_limit_side_len=processed_image.shape[0] * 2,
            use_dilation=True,
            det_db_score_mode='fast'
        )

        # Run OCR
        try:
            result = ocr.ocr(processed_image, rec=True, cls=True)

            # Save debug information to a file
            with open(os.path.join(debug_dir, 'ocr_results.txt'), 'w') as f:
                f.write(f"Raw OCR result:\n{result}\n\n")

            if not result:
                logger.warning("No results returned from OCR")
                return jsonify({
                    'warning': 'No text detected',
                    'debug_dir': debug_dir
                }), 200

            if not result[0]:
                logger.warning("Empty results list from OCR")
                return jsonify({
                    'warning': 'Empty results list',
                    'debug_dir': debug_dir
                }), 200

            # Merge text blocks
            merged_results = merge_text_blocks(result[0], y_threshold=15, x_column_threshold=50)

            # Process the results
            extracted_results = []
            for idx, item in enumerate(merged_results):
                try:
                    box = item[0]
                    text = item[1][0] if item[1] else ''
                    confidence = float(item[1][1]) if item[1] and len(item[1]) > 1 else 0.0

                    extracted_results.append({
                        'box': box,
                        'text': text,
                        'confidence': confidence,
                        'name': dir_name
                    })
                except Exception as proc_err:
                    logger.error(f"Error processing result {idx}: {str(proc_err)}")

            # Write statistics to a debug file
            with open(os.path.join(debug_dir, 'statistics.txt'), 'w') as f:
                f.write(f"Total results: {len(extracted_results)}\n")
                if extracted_results:
                    avg_confidence = np.mean([r['confidence'] for r in extracted_results])
                    f.write(f"Average confidence: {avg_confidence}\n")
                f.write("\nDetailed results:\n")
                for idx, result in enumerate(extracted_results):
                    f.write(f"Result {idx+1}:\n")
                    f.write(f"Text: {result['text']}\n")
                    f.write(f"Confidence: {result['confidence']}\n")
                    f.write(f"Name: {dir_name}\n")
                    f.write(f"Box coordinates: {result['box']}\n\n")

            return jsonify({
                'status': 'success',
                'results': extracted_results,
            })

        except Exception as ocr_err:
            logger.error(f"OCR processing error: {str(ocr_err)}")
            logger.error(traceback.format_exc())
            return jsonify({
                'error': 'OCR processing failed',
                'details': str(ocr_err),
                'debug_dir': debug_dir
            }), 500

    except Exception as e:
        logger.error(f"Error: {str(e)}")
        return jsonify({
            'error': 'Processing error',
            'details': str(e),
            'debug_dir': dir_name if debug_dir else None
        }), 500
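

# A hedged client-side sketch for exercising /api/ocr. It assumes the server
# runs locally on port 5000 and that 'receipt.png' exists (both illustrative);
# the 'requests' package is not a dependency of this service:
#
#   import base64, requests
#   with open('receipt.png', 'rb') as fh:
#       payload = {'image': base64.b64encode(fh.read()).decode('ascii')}
#   resp = requests.post('http://127.0.0.1:5000/api/ocr', json=payload)
#   print(resp.json())

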
@app.route('/api/cleanup', methods=['POST'])
@require_localhost
def cleanup_endpoint():
    try:
        data = request.get_json()
        used_ids = data.get('usedIds', [])
        dryrun = data.get('dryrun', True)  # defaults to True

        # Validate the input data
        if not isinstance(used_ids, list):
            logger.error("Invalid data format: 'usedIds' should be a list")
            return jsonify({
                'error': "'usedIds' must be a list of IDs."
            }), 400

        if not isinstance(dryrun, bool):
            logger.error("Invalid data format: 'dryrun' should be a boolean")
            return jsonify({
                'error': "'dryrun' must be a boolean value (true or false)."
            }), 400

        base_dir = 'images'
        if not os.path.exists(base_dir):
            logger.info(f"Base directory '{base_dir}' does not exist. Nothing to clean up.")
            return jsonify({
                'status': 'success',
                'message': f"Base directory '{base_dir}' does not exist.",
                'deletedIds': [],
                'dryrun': dryrun
            }), 200

        all_dirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
        dirs_to_delete = [d for d in all_dirs if d not in used_ids]

        if dryrun:
            logger.info("Dry run enabled. No directories will be deleted.")
            return jsonify({
                'status': 'dryrun',
                'message': 'The following directories would be deleted.',
                'dirsToDelete': dirs_to_delete,
                'dryrun': dryrun
            }), 200
        else:
            deleted_ids = []
            failed_deletions = []
            for dir_name in dirs_to_delete:
                dir_path = os.path.join(base_dir, dir_name)
                try:
                    shutil.rmtree(dir_path)
                    deleted_ids.append(dir_name)
                    logger.info(f"Directory '{dir_name}' deleted successfully.")
                except Exception as delete_err:
                    logger.error(f"Failed to delete directory '{dir_name}': {str(delete_err)}")
                    failed_deletions.append({
                        'dir': dir_name,
                        'error': str(delete_err)
                    })

            response = {
                'status': 'success',
                'deletedIds': deleted_ids,
                'dryrun': dryrun
            }
            if failed_deletions:
                response['failedDeletions'] = failed_deletions

            return jsonify(response), 200

    except Exception as e:
        logger.error(f"Cleanup error: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'Cleanup error',
            'details': str(e)
        }), 500
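

# A hedged sketch of a cleanup call (must come from localhost because of the
# @require_localhost guard). With dryrun=true the endpoint only reports what it
# would delete; the ID below is illustrative, matching get_dir_name's format:
#
#   import requests
#   resp = requests.post('http://127.0.0.1:5000/api/cleanup',
#                        json={'usedIds': ['20240101_120000_ab12cd34'], 'dryrun': True})
#   print(resp.json())

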
if __name__ == '__main__':
    # NOTE: binds to all interfaces with debug mode on; only /api/cleanup is
    # restricted to localhost, so tighten this before exposing the service.
    app.run(host='0.0.0.0', port=5000, debug=True)