From d1279e167abb4b6a676487f9e2cfcfbab7bf7531 Mon Sep 17 00:00:00 2001 From: foefl Date: Tue, 6 Jan 2026 10:19:49 +0100 Subject: [PATCH] add output of anomaly score and sorted according to spec, related to #21 and #22 --- src/dopt_sensor_anomalies/constants.py | 2 ++ src/dopt_sensor_anomalies/detection.py | 35 ++++++++++++++++++------- src/dopt_sensor_anomalies/detection.pyi | 15 ++++++----- src/dopt_sensor_anomalies/types.py | 8 +++++- tests/test_detection.py | 12 ++++----- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/dopt_sensor_anomalies/constants.py b/src/dopt_sensor_anomalies/constants.py index 88f8d0b..d7f70b9 100644 --- a/src/dopt_sensor_anomalies/constants.py +++ b/src/dopt_sensor_anomalies/constants.py @@ -5,6 +5,8 @@ LIB_ROOT_PATH: Final[Path] = Path(__file__).parent STOP_FOLDER_NAME: Final[str] = "python" MODEL_FOLDER_NAME: Final[str] = "models" +EXPORT_DATA_SORTING: Final[tuple[str, ...]] = ("sensor_sizes", "right", "left") + THRESHOLD_BW: Final[int] = 63 BACKBONE: Final[str] = "wide_resnet50_2" LAYERS: Final[tuple[str, ...]] = ("layer1", "layer2", "layer3") diff --git a/src/dopt_sensor_anomalies/detection.py b/src/dopt_sensor_anomalies/detection.py index af97907..af82a8b 100644 --- a/src/dopt_sensor_anomalies/detection.py +++ b/src/dopt_sensor_anomalies/detection.py @@ -11,6 +11,7 @@ import numpy.typing as npt import torch from anomalib.models import Patchcore from dopt_basics import result_pattern +from dopt_basics.datastructures import flatten from imutils import contours, perspective from pandas import DataFrame from PIL import Image @@ -56,8 +57,8 @@ def measure_length( img_path: Path, pixels_per_metric_X: float, pixels_per_metric_Y: float, -) -> tuple[t.CsvData, t.SensorImages]: - data_csv: list[str | int] = [] +) -> tuple[t.ExportData, t.SensorImages]: + sensor_sizes: list[tuple[str, ...]] = [] image = cv2.imread(str(img_path)) if image is None: raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<") @@ -142,14 +143,14 @@ def measure_length( _, binary_warped = cv2.threshold(gray_warped, 80, 255, cv2.THRESH_BINARY) pixel_count = np.sum(binary_warped == 0) - data_csv.extend( - [ + sensor_sizes.append( + ( f"{dimB:.3f}".replace(".", ","), f"{dimA:.3f}".replace(".", ","), f"{pixel_count / pixels_per_metric_X / pixels_per_metric_Y:.1f}".replace( ".", "," ), - ] + ) ) if not filtered_cnts: # pragma: no cover @@ -176,7 +177,13 @@ def measure_length( cropped_sensor_left = orig[y_min:y_max, x_min:x_middle] cropped_sensor_right = orig[y_min:y_max, x_middle:x_max] - return data_csv, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right) + sensor_sizes_sorted = cast( + tuple[str, ...], + tuple(flatten(reversed(sensor_sizes))), # type: ignore + ) + export_data: t.ExportData = t.ExportData(sensor_sizes=sensor_sizes_sorted) + + return export_data, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right) def infer_image( @@ -218,7 +225,7 @@ def infer_image( def anomaly_detection( img_path: Path, detection_models: t.DetectionModels, - data_csv: t.CsvData, + export_data: t.ExportData, sensor_images: t.SensorImages, anomaly_threshold: float, ) -> None: @@ -236,7 +243,10 @@ def anomaly_detection( model.load_state_dict(checkpoint["model_state_dict"]) result = infer_image(image, model, anomaly_threshold) - data_csv.extend([int(result.anomaly_label)]) + export_data[side] = ( + f"{result.anomaly_score:.1f}".replace(".", ","), + str(int(result.anomaly_label)), + ) ax = axes[i] ax.axis("off") @@ -251,7 +261,12 @@ def anomaly_detection( ) plt.close() - df = DataFrame([data_csv]) + csv_data_sorted: tuple[tuple[str, ...]] = tuple( + export_data[key] for key in const.EXPORT_DATA_SORTING + ) + csv_data: tuple[str, ...] = tuple(flatten(csv_data_sorted)) + + df = DataFrame([csv_data]) df.to_csv( (folder_path / f"{file_stem}.csv"), mode="w", @@ -284,7 +299,7 @@ def pipeline( anomaly_detection( img_path=file_path, detection_models=DETECTION_MODELS, - data_csv=data_csv, + export_data=data_csv, sensor_images=sensor_images, anomaly_threshold=anomaly_threshold, ) diff --git a/src/dopt_sensor_anomalies/detection.pyi b/src/dopt_sensor_anomalies/detection.pyi index 65b8cb4..9a928f1 100644 --- a/src/dopt_sensor_anomalies/detection.pyi +++ b/src/dopt_sensor_anomalies/detection.pyi @@ -54,7 +54,7 @@ def measure_length( img_path: Path, pixels_per_metric_X: float, pixels_per_metric_Y: float, -) -> tuple[t.CsvData, t.SensorImages]: +) -> tuple[t.ExportData, t.SensorImages]: """detect and measure the size of the electrodes Parameters @@ -68,8 +68,10 @@ def measure_length( Returns ------- - tuple[t.CsvData, t.SensorImages] - t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints + tuple[t.ExportData, t.SensorImages] + t.ExportData: (TypedDict) data to save as CSV according to requirements, contains + strings for sensor sizes and anomaly detection results for the left and right hand + sensor respectively t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor Raises @@ -112,7 +114,7 @@ def infer_image( def anomaly_detection( img_path: Path, detection_models: t.DetectionModels, - data_csv: t.CsvData, + export_data: t.ExportData, sensor_images: t.SensorImages, anomaly_threshold: float, ) -> None: @@ -124,8 +126,9 @@ def anomaly_detection( path to file to analyse detection_models : t.DetectionModels collection of model paths for the left and right sensor - data_csv : t.CsvData - (list) data to save as CSV according to requirements, contains strings and ints + export_data: t.ExportData, + (TypedDict) data to save as CSV according to requirements, contains strings for sensor + sizes and anomaly detection results for the left and right hand sensor respectively sensor_images : t.SensorImages _description_ """ diff --git a/src/dopt_sensor_anomalies/types.py b/src/dopt_sensor_anomalies/types.py index c0cd9cc..7a4e657 100644 --- a/src/dopt_sensor_anomalies/types.py +++ b/src/dopt_sensor_anomalies/types.py @@ -1,6 +1,6 @@ import dataclasses as dc from pathlib import Path -from typing import TypeAlias, TypedDict +from typing import NotRequired, TypeAlias, TypedDict import numpy as np import numpy.typing as npt @@ -17,6 +17,12 @@ class InferenceResult: anomaly_label: bool +class ExportData(TypedDict): + sensor_sizes: tuple[str, ...] + left: NotRequired[tuple[str, str]] + right: NotRequired[tuple[str, str]] + + class SensorImages(TypedDict): left: npt.NDArray right: npt.NDArray diff --git a/tests/test_detection.py b/tests/test_detection.py index 273cf97..b91f8d5 100644 --- a/tests/test_detection.py +++ b/tests/test_detection.py @@ -67,9 +67,9 @@ def test_measure_length(single_img_path): pixels_per_metric_X, pixels_per_metric_Y, ) - assert len(data) == 18 - assert isinstance(data[0], str) - assert float(data[0].replace(",", ".")) == pytest.approx(1266.932) + assert len(data["sensor_sizes"]) == 18 + assert isinstance(data["sensor_sizes"][0], str) + assert float(data["sensor_sizes"][0].replace(",", ".")) == pytest.approx(1266.932) img_left = imgs["left"] assert 235 < img_left.shape[0] < 260 assert 910 < img_left.shape[1] < 960 @@ -89,20 +89,20 @@ def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel): DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER) assert DETECTION_MODELS["left"].exists() assert DETECTION_MODELS["right"].exists() - data_csv, sensor_images = detect.measure_length( + export_data, sensor_images = detect.measure_length( path_img_with_failure_TrainedModel, pixels_per_metric_X, pixels_per_metric_Y, ) # measured sizes - assert len(data_csv) == 18 + assert len(export_data["sensor_sizes"]) == 18 assert sensor_images["left"] is not None assert sensor_images["right"] is not None detect.anomaly_detection( img_path=path_img_with_failure_TrainedModel, detection_models=DETECTION_MODELS, - data_csv=data_csv, + export_data=export_data, sensor_images=sensor_images, anomaly_threshold=constants.ANOMALY_THRESHOLD_DEFAULT, )