"""Sensor anomaly analysis: electrode measurement and Patchcore-based anomaly detection."""

import csv
import warnings
from pathlib import Path
from typing import Any, Final, cast

import cv2
import imutils
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import torch
from anomalib.models import Patchcore
from dopt_basics import result_pattern
from imutils import contours, perspective
from pandas import DataFrame
from PIL import Image
from scipy.spatial import distance as dist

import dopt_sensor_anomalies._find_paths
from dopt_sensor_anomalies import constants as const
from dopt_sensor_anomalies import errors
from dopt_sensor_anomalies import types as t

# Suppress the specific HuggingFace cache symlink warning
warnings.filterwarnings(
    "ignore",
    message=".*huggingface_hub.*cache-system uses symlinks.*",
    category=UserWarning,
)


def midpoint(
    pt_A: npt.NDArray[np.floating],
    pt_B: npt.NDArray[np.floating],
) -> tuple[float, float]:
    """Return the midpoint between two 2D points."""
    return ((pt_A[0] + pt_B[0]) * 0.5, (pt_A[1] + pt_B[1]) * 0.5)


def check_box_redundancy(
    box_1: t.Box,
    box_2: t.Box,
    tolerance: float = 5.0,
) -> bool:
    """Return True if two rotated boxes describe (almost) the same region.

    Boxes count as redundant when both their centers and their sorted side
    lengths differ by less than ``tolerance`` pixels.
    """
    c1, s1, _ = box_1
    c2, s2, _ = box_2
    s1 = sorted(s1)
    s2 = sorted(s2)
    center_dist = cast(float, np.linalg.norm(np.array(c1) - np.array(c2)))
    size_diff = cast(float, np.linalg.norm(np.array(s1) - np.array(s2)))
    return bool(center_dist < tolerance and size_diff < tolerance)


def measure_length(
    img_path: Path,
    pixels_per_metric_X: float,
    pixels_per_metric_Y: float,
) -> tuple[t.CsvData, t.SensorImages]:
    """Measure the electrode dimensions and return CSV data plus left/right sensor crops."""
    data_csv: list[str | int] = []

    image = cv2.imread(str(img_path))
    if image is None:
        raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<")

    # Crop to the region of interest: fixed vertical band, trimmed left/right margins.
    cropped = image[500:1500, 100 : image.shape[1] - 100]
    orig = cropped.copy()

    # Binarize, close small gaps and extract edges for contour detection.
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, const.THRESHOLD_BW, 255, cv2.THRESH_BINARY)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
    edged = cv2.Canny(closed, 50, 100)

    cnts = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    cnts = imutils.grab_contours(cnts)
    if cnts is None:  # pragma: no cover
        raise errors.ContourCalculationError(
            "No contours were found in the provided image. Cannot continue analysis."
        )
    cnts, _ = contours.sort_contours(cnts)

    # Sanity check: after sorting, the contours must run left to right.
    x_coords = [cv2.boundingRect(c)[0] for c in cnts]
    is_sorted = all(x1 <= x2 for x1, x2 in zip(x_coords, x_coords[1:]))
    if not is_sorted:  # pragma: no cover
        raise errors.ContourCalculationError(
            "Contour detection not valid: contours are not "
            "properly sorted from left to right."
        )
    accepted_boxes: list[t.Box] = []
    filtered_cnts: list[Any] = []

    for c in cnts:
        rbox = cast(t.Box, cv2.minAreaRect(c))
        box = cv2.boxPoints(rbox)
        box = np.array(box, dtype=np.int32)
        box = cast(npt.NDArray[np.float32], perspective.order_points(box))
        (tl, tr, br, bl) = box

        # Side lengths in pixels, measured between the midpoints of opposite edges.
        (tltrX, tltrY) = midpoint(tl, tr)
        (blbrX, blbrY) = midpoint(bl, br)
        (tlblX, tlblY) = midpoint(tl, bl)
        (trbrX, trbrY) = midpoint(tr, br)
        dA = dist.euclidean((tltrX, tltrY), (blbrX, blbrY))
        dB = dist.euclidean((tlblX, tlblY), (trbrX, trbrY))

        # Discard small boxes and boxes that duplicate an already accepted one.
        if dA < 100 or dB < 100:
            continue
        is_duplicate = any(
            check_box_redundancy(rbox, existing) for existing in accepted_boxes
        )
        if is_duplicate:
            continue

        accepted_boxes.append(rbox)
        filtered_cnts.append(c)

        # Convert pixel measurements to metric units and store them CSV-ready
        # (decimal comma; the file is written semicolon-separated later on).
        dimA = dA / pixels_per_metric_Y
        dimB = dB / pixels_per_metric_X
        data_csv.extend(
            [
                f"{dimB:.3f}".replace(".", ","),
                f"{dimA:.3f}".replace(".", ","),
                f"{dimA * dimB:.1f}".replace(".", ","),
            ]
        )

    if not filtered_cnts:  # pragma: no cover
        raise errors.ContourCalculationError(
            "Contour detection not valid: no contours recognized"
        )

    num_contours = len(filtered_cnts)
    if num_contours != const.NUM_VALID_ELECTRODES:  # pragma: no cover
        raise errors.InvalidElectrodeCount(
            f"Number of counted electrodes does not match the "
            f"expected value: count = {num_contours}, expected = {const.NUM_VALID_ELECTRODES}"
        )

    # Bounding box around all accepted electrodes, padded by 20 px and clamped to the image.
    x_min = max(min(np.min(c[:, 0, 0]) for c in filtered_cnts) - 20, 0)
    x_max = min(max(np.max(c[:, 0, 0]) for c in filtered_cnts) + 20, orig.shape[1])
    y_min = max(min(np.min(c[:, 0, 1]) for c in filtered_cnts) - 20, 0)
    y_max = min(max(np.max(c[:, 0, 1]) for c in filtered_cnts) + 20, orig.shape[0])

    # Split the sensor between the third and fourth electrode into a left and a right half.
    rightmost_x_third = max(filtered_cnts[2][:, 0, 0])
    leftmost_x_fourth = min(filtered_cnts[3][:, 0, 0])
    x_middle = rightmost_x_third + int((leftmost_x_fourth - rightmost_x_third) / 2.0)

    cropped_sensor_left = orig[y_min:y_max, x_min:x_middle]
    cropped_sensor_right = orig[y_min:y_max, x_middle:x_max]

    return data_csv, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right)


def infer_image(
    image: npt.NDArray[np.uint8],
    model: Patchcore,
) -> t.InferenceResult:
    """Run Patchcore inference on a single BGR image and return map, score and label."""
    torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(torch_device)

    # BGR -> RGB, scale to [0, 1], reorder to CHW and add a batch dimension.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(image_rgb)
    pil_image = pil_image.convert("RGB")
    image_np = np.array(pil_image).astype(np.float32) / 255.0
    input_tensor = torch.from_numpy(image_np).permute(2, 0, 1)
    input_tensor = input_tensor.unsqueeze(0)
    input_tensor = input_tensor.to(torch_device)

    model.eval()
    with torch.no_grad():
        output = model(input_tensor)

    anomaly_score = output.pred_score.item()
    anomaly_label = bool(anomaly_score >= const.ANOMALY_THRESHOLD)

    # Resize the anomaly map back to the input resolution for overlay plotting.
    anomaly_map = output.anomaly_map.squeeze().cpu().numpy()
    img_np = np.array(pil_image)
    anomaly_map_resized = cv2.resize(anomaly_map, (img_np.shape[1], img_np.shape[0]))

    return t.InferenceResult(
        img=img_np,
        anomaly_map_resized=anomaly_map_resized,
        anomaly_score=anomaly_score,
        anomaly_label=anomaly_label,
    )
def anomaly_detection(
    img_path: Path,
    detection_models: t.DetectionModels,
    data_csv: t.CsvData,
    sensor_images: t.SensorImages,
) -> None:
    """Run anomaly detection on both sensor halves, save a heatmap figure and write the CSV."""
    file_stem = img_path.stem
    folder_path = img_path.parent

    model = Patchcore(
        backbone=const.BACKBONE, layers=const.LAYERS, coreset_sampling_ratio=const.RATIO
    )

    _, axes = plt.subplots(1, 2, figsize=(12, 6))
    for i, (side, image) in enumerate(sensor_images.items()):
        image = cast(npt.NDArray[np.uint8], image)

        # Each side (left/right) has its own trained checkpoint.
        checkpoint = torch.load(detection_models[side])
        model.load_state_dict(checkpoint["model_state_dict"])

        result = infer_image(image, model)
        data_csv.extend([int(result.anomaly_label)])

        # Overlay the anomaly heatmap on the sensor crop.
        ax = axes[i]
        ax.axis("off")
        ax.imshow(image, alpha=0.8)
        ax.imshow(result.anomaly_map_resized, cmap="jet", alpha=0.5)

    plt.subplots_adjust(wspace=0, hspace=0)
    plt.savefig(
        (folder_path / f"{file_stem}{const.HEATMAP_FILENAME_SUFFIX}.png"),
        bbox_inches="tight",
        pad_inches=0,
    )
    plt.close()

    df = DataFrame([data_csv])
    df.to_csv(
        (folder_path / f"{file_stem}.csv"),
        mode="w",
        index=False,
        header=False,
        quoting=csv.QUOTE_NONE,
        sep=";",
    )


@result_pattern.wrap_result(100)
def pipeline(
    user_img_path: str,
    pixels_per_metric_X: float,
    pixels_per_metric_Y: float,
) -> None:
    """Full analysis pipeline: measure the electrodes and run anomaly detection on one image."""
    file_path = Path(user_img_path)
    if not file_path.exists():
        raise FileNotFoundError(f"The provided path does not exist: >{user_img_path}<")

    MODEL_FOLDER: Final[Path] = dopt_sensor_anomalies._find_paths.get_model_folder()
    DETECTION_MODELS: Final[t.DetectionModels] = (
        dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER)
    )

    data_csv, sensor_images = measure_length(
        file_path, pixels_per_metric_X, pixels_per_metric_Y
    )
    anomaly_detection(
        img_path=file_path,
        detection_models=DETECTION_MODELS,
        data_csv=data_csv,
        sensor_images=sensor_images,
    )
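
# --- Usage sketch (illustrative, not part of the original module) -------------
# A minimal example of how the pipeline entry point might be invoked from the
# command line. The argument names and the default pixel-per-metric values are
# assumptions for illustration only; the actual return value of ``pipeline``
# depends on ``dopt_basics.result_pattern.wrap_result`` and is not specified here.
if __name__ == "__main__":  # pragma: no cover
    import argparse

    parser = argparse.ArgumentParser(
        description="Run the sensor measurement and anomaly-detection pipeline on one image."
    )
    parser.add_argument("image", help="Path to the sensor image to analyse")
    parser.add_argument(
        "--ppm-x", type=float, default=1.0, help="Pixels per metric along X (assumed default)"
    )
    parser.add_argument(
        "--ppm-y", type=float, default=1.0, help="Pixels per metric along Y (assumed default)"
    )
    args = parser.parse_args()

    # pipeline() is wrapped by result_pattern.wrap_result(100), so errors are
    # expected to surface through the returned result object rather than as
    # raised exceptions; here we simply print whatever it returns.
    result = pipeline(args.image, args.ppm_x, args.ppm_y)
    print(result)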