add full pipeline including tests

Florian Förster 2025-10-23 10:25:30 +02:00
parent bdf6456111
commit a9f9ffc5e7
4 changed files with 1834 additions and 1686 deletions

File diff suppressed because it is too large.

View File

@@ -1,24 +1,20 @@
 import csv
 import warnings
-from os import path
 from pathlib import Path
 from typing import Any, Final, cast

-# Image.MAX_IMAGE_PIXELS = None
 import cv2
 import imutils
 import matplotlib.pyplot as plt
 import numpy as np
 import numpy.typing as npt
 import torch
-# from anomalib.engine import Engine
 from anomalib.models import Patchcore
+from dopt_basics import result_pattern
 from imutils import contours, perspective
 from pandas import DataFrame
 from PIL import Image
 from scipy.spatial import distance as dist
-from torchvision.transforms.v2.functional import to_dtype, to_image

 import dopt_sensor_anomalies._find_paths
 from dopt_sensor_anomalies import constants as const
@@ -67,14 +63,14 @@ def check_box_redundancy(


 # ** main function
 def measure_length(
-    file_path: Path,
+    img_path: Path,
     pixels_per_metric_X: float,
     pixels_per_metric_Y: float,
 ) -> tuple[t.CsvData, t.SensorImages]:
     data_csv: list[str | int] = []
-    image = cv2.imread(str(file_path))
+    image = cv2.imread(str(img_path))
     if image is None:
-        raise errors.ImageNotReadError(f"Image could not be read from: >{file_path}<")
+        raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<")
     cropped = image[500:1500, 100 : image.shape[1] - 100]
     orig = cropped.copy()
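
For context on this hunk: measure_length follows the common pixels-per-metric approach (rotated bounding boxes per contour, midpoints of opposite edges, Euclidean pixel distances divided by the calibration values). The sketch below only illustrates that general idea; the helper names _midpoint and _box_size_um are hypothetical and are not the module's actual code.

# Illustrative only: the generic pixels-per-metric measurement idea behind
# measure_length. Helper names here are hypothetical, not the module's API.
import cv2
import numpy as np
from imutils import perspective
from scipy.spatial import distance as dist


def _midpoint(pt_a, pt_b) -> tuple[float, float]:
    return ((pt_a[0] + pt_b[0]) * 0.5, (pt_a[1] + pt_b[1]) * 0.5)


def _box_size_um(contour: np.ndarray, ppm_x: float, ppm_y: float) -> tuple[float, float]:
    # rotated bounding box of one contour (e.g. a single electrode)
    box = perspective.order_points(cv2.boxPoints(cv2.minAreaRect(contour)))
    (tl, tr, br, bl) = box
    # pixel distances between the midpoints of opposite edges
    height_px = dist.euclidean(_midpoint(tl, tr), _midpoint(bl, br))
    width_px = dist.euclidean(_midpoint(tl, bl), _midpoint(tr, br))
    # calibration values are pixels per micrometer, so divide to get micrometers
    return width_px / ppm_x, height_px / ppm_y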
@@ -229,13 +225,13 @@ def infer_image(


 # ** main function
 def anomaly_detection(
-    file_path: Path,
+    img_path: Path,
     detection_models: t.DetectionModels,
     data_csv: t.CsvData,
     sensor_images: t.SensorImages,
 ) -> None:
-    file_stem = file_path.stem
-    folder_path = file_path.parent
+    file_stem = img_path.stem
+    folder_path = img_path.parent
     # reconstruct the model and initialize the engine
     model = Patchcore(
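
As the tests further down assert, anomaly_detection is expected to leave a <stem>.csv and a <stem><HEATMAP_FILENAME_SUFFIX>.png next to the analysed image. A rough sketch of that output step follows; only the file naming is taken from the tests in this commit, while the function name and arguments are hypothetical, not the actual implementation.

# Rough sketch of the output side of anomaly_detection, not the actual code.
# Only the file naming is confirmed by the tests; everything else is assumed.
import csv
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

from dopt_sensor_anomalies import constants as const


def _write_outputs(folder_path: Path, file_stem: str, data_csv: list, anomaly_map: np.ndarray) -> None:
    csv_file = folder_path / f"{file_stem}.csv"
    with csv_file.open("w", newline="") as fh:
        csv.writer(fh).writerow(data_csv)  # measured electrode dimensions etc.

    fig, ax = plt.subplots()
    ax.imshow(anomaly_map, cmap="jet")  # anomaly map produced by the Patchcore model
    ax.axis("off")
    fig.savefig(folder_path / f"{file_stem}{const.HEATMAP_FILENAME_SUFFIX}.png")
    plt.close(fig)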
@@ -277,12 +273,13 @@ def anomaly_detection(
     )


+@result_pattern.wrap_result(100)
 def pipeline(
-    user_file_path: str,
+    user_img_path: str,
     pixels_per_metric_X: float,
     pixels_per_metric_Y: float,
 ) -> None:

-    file_path = Path(user_file_path)
+    file_path = Path(user_img_path)
     if not file_path.exists():
         raise FileNotFoundError("The provided path seems not to exist")
@@ -295,7 +292,7 @@ def pipeline(
         file_path, pixels_per_metric_X, pixels_per_metric_Y
     )
     anomaly_detection(
-        file_path=file_path,
+        img_path=file_path,
         detection_models=DETECTION_MODELS,
         data_csv=data_csv,
         sensor_images=sensor_images,
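
The new @result_pattern.wrap_result(100) decorator comes from the internal dopt_basics package, whose API is not shown in this diff. For orientation, a minimal stand-in with the observable behaviour the tests rely on (a status carrying code/message/ExceptionType, and unwrap() re-raising on failure) could look roughly like this; it is explicitly not the dopt_basics implementation.

# Minimal stand-in for the result pattern, NOT the dopt_basics implementation.
# Attribute names (status, code, message, ExceptionType, unwrap) mirror the tests.
import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class _Status:
    code: int
    message: str = ""
    ExceptionType: type[BaseException] | None = None


@dataclass
class _Result:
    status: _Status
    value: Any = None

    def unwrap(self) -> Any:
        if self.status.ExceptionType is not None:
            raise self.status.ExceptionType(self.status.message)
        return self.value


def wrap_result(error_code: int) -> Callable:
    """Return a decorator that converts raised exceptions into a failed result."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> _Result:
            try:
                return _Result(status=_Status(code=0), value=func(*args, **kwargs))
            except Exception as exc:  # noqa: BLE001 - deliberately broad
                return _Result(status=_Status(error_code, str(exc), type(exc)))

        return wrapper

    return decorator

The real library additionally exposes result_pattern.STATUS_HANDLER.SUCCESS, which the tests compare ret.status against; that part is omitted from the sketch.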

View File

@@ -3,6 +3,7 @@ from pathlib import Path

 import numpy as np
 import numpy.typing as npt
 from anomalib.models import Patchcore
+from dopt_basics import result_pattern

 from dopt_sensor_anomalies import types as t
@@ -50,7 +51,7 @@ def check_box_redundancy(
     ...

 def measure_length(
-    file_path: Path,
+    img_path: Path,
     pixels_per_metric_X: float,
     pixels_per_metric_Y: float,
 ) -> tuple[t.CsvData, t.SensorImages]:
@@ -58,7 +59,7 @@ def measure_length(

     Parameters
     ----------
-    file_path : Path
+    img_path : Path
         path to file to analyse
     pixels_per_metric_X : float
         scaling parameter x dimension, Pixels per micrometer in image
@@ -108,7 +109,7 @@ def infer_image(
     ...

 def anomaly_detection(
-    file_path: Path,
+    img_path: Path,
     detection_models: t.DetectionModels,
     data_csv: t.CsvData,
     sensor_images: t.SensorImages,
@@ -117,7 +118,7 @@ def anomaly_detection(

     Parameters
     ----------
-    file_path : Path
+    img_path : Path
         path to file to analyse
     detection_models : t.DetectionModels
         collection of model paths for the left and right sensor
@@ -128,8 +129,26 @@ def anomaly_detection(
     """
     ...

+@result_pattern.wrap_result(100)
 def pipeline(
-    user_file_path: str,
+    user_img_path: str,
     pixels_per_metric_X: float,
     pixels_per_metric_Y: float,
-) -> None: ...
+) -> None:
+    """full pipeline defined by the agreed requirements
+
+    wrapped as result pattern, handle errors on higher abstraction level
+
+    Parameters
+    ----------
+    user_img_path : str
+        file path to the image which is to be analysed
+    pixels_per_metric_X : float
+        calibration value for the x axis to measure the size of the electrodes
+    pixels_per_metric_Y : float
+        calibration value for the y axis to measure the size of the electrodes
+
+    Raises
+    ------
+    FileNotFoundError
+        provided image path was not found
+    """

View File

@@ -1,36 +1,36 @@
 import shutil
-from pathlib import Path
 from unittest.mock import patch

 import numpy as np
 import pytest
+from dopt_basics import result_pattern

 import dopt_sensor_anomalies._find_paths
 import dopt_sensor_anomalies.detection as detect
 import dopt_sensor_anomalies.types as t
-from dopt_sensor_anomalies import constants
+from dopt_sensor_anomalies import constants, errors


-@pytest.fixture(scope="module")
-def img_paths() -> tuple[Path, ...]:
-    img_folder = Path(__file__).parent / "_img"
-    if not img_folder.exists():
-        raise FileNotFoundError("Img path not existing")
-    img_paths = tuple(img_folder.glob("*.bmp"))
-    if not img_paths:
-        raise ValueError("No images found")
-    return img_paths
-
-
-@pytest.fixture(scope="module")
-def single_img_path() -> Path:
-    img_folder = Path(__file__).parent / "_img"
-    if not img_folder.exists():
-        raise FileNotFoundError("Img path not existing")
-    img_paths = tuple(img_folder.glob("*_12.bmp"))
-    if not img_paths:
-        raise ValueError("No images found")
-    return img_paths[0]
+# TODO remove
+# @pytest.fixture(scope="module")
+# def img_paths() -> tuple[Path, ...]:
+#     img_folder = Path(__file__).parent / "_img"
+#     if not img_folder.exists():
+#         raise FileNotFoundError("Img path not existing")
+#     img_paths = tuple(img_folder.glob("*.bmp"))
+#     if not img_paths:
+#         raise ValueError("No images found")
+#     return img_paths
+
+# @pytest.fixture(scope="module")
+# def single_img_path() -> Path:
+#     img_folder = Path(__file__).parent / "_img"
+#     if not img_folder.exists():
+#         raise FileNotFoundError("Img path not existing")
+#     img_paths = tuple(img_folder.glob("*_12.bmp"))
+#     if not img_paths:
+#         raise ValueError("No images found")
+#     return img_paths[0]


 def test_midpoint():
@@ -90,7 +90,6 @@ def test_measure_length(single_img_path):
     assert img_right.shape[2] == 3


-@pytest.mark.new
 @patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
 def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
     pixels_per_metric_X: float = 0.251
@@ -111,7 +110,7 @@ def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
    assert sensor_images["left"] is not None
    assert sensor_images["right"] is not None
    detect.anomaly_detection(
-        file_path=path_img_with_failure_TrainedModel,
+        img_path=path_img_with_failure_TrainedModel,
        detection_models=DETECTION_MODELS,
        data_csv=data_csv,
        sensor_images=sensor_images,
@@ -125,3 +124,57 @@ def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
     assert heatmap_file.exists()
     shutil.copy(csv_file, (results_folder / csv_file.name))
     shutil.copy(heatmap_file, (results_folder / heatmap_file.name))
+
+
+@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
+def test_full_pipeline_wrapped_FailImagePath(setup_temp_dir):
+    img_path = str(setup_temp_dir / "not-existing.bmp")
+    MESSAGE = "The provided path seems not to exist"
+    pixels_per_metric_X: float = 0.251
+    pixels_per_metric_Y: float = 0.251
+
+    ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
+
+    assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
+    assert ret.status.ExceptionType is FileNotFoundError
+    assert ret.status.message == MESSAGE
+    with pytest.raises(FileNotFoundError, match=MESSAGE):
+        _ = ret.unwrap()
+
+
+@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
+def test_full_pipeline_wrapped_FailElectrodeCount(path_img_with_failure_ElectrodeCount):
+    img_path = str(path_img_with_failure_ElectrodeCount)
+    MESSAGE = "Number of counted electrodes does not match the"
+    pixels_per_metric_X: float = 0.251
+    pixels_per_metric_Y: float = 0.251
+
+    ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
+
+    assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
+    assert ret.status.ExceptionType is errors.InvalidElectrodeCount
+    assert MESSAGE in ret.status.message
+    with pytest.raises(errors.InvalidElectrodeCount, match=MESSAGE):
+        _ = ret.unwrap()
+
+
+@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
+def test_full_pipeline_wrapped_Success(results_folder, path_img_with_failure_TrainedModel):
+    img_path = str(path_img_with_failure_TrainedModel)
+    pixels_per_metric_X: float = 0.251
+    pixels_per_metric_Y: float = 0.251
+
+    ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
+
+    assert ret.status == result_pattern.STATUS_HANDLER.SUCCESS
+    assert ret.status.code == 0
+    assert ret.status.ExceptionType is None
+
+    # check files for existence
+    root_img = path_img_with_failure_TrainedModel.parent
+    file_stem = path_img_with_failure_TrainedModel.stem
+    csv_file = root_img / f"{file_stem}.csv"
+    heatmap_file = root_img / f"{file_stem}{constants.HEATMAP_FILENAME_SUFFIX}.png"
+    assert csv_file.exists()
+    assert heatmap_file.exists()
+    shutil.copy(csv_file, (results_folder / csv_file.name))
+    shutil.copy(heatmap_file, (results_folder / heatmap_file.name))
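
The new tests rely on fixtures (setup_temp_dir, results_folder, path_img_with_failure_ElectrodeCount, path_img_with_failure_TrainedModel) that are not part of this diff and are presumably provided by a conftest.py. One plausible shape for the two generic ones is sketched below; everything here is an assumption, not the project's actual fixtures.

# Hypothetical conftest.py sketch; the project's real fixtures are not shown in
# this diff, and the directory names below are placeholders.
from pathlib import Path

import pytest


@pytest.fixture(scope="module")
def setup_temp_dir(tmp_path_factory: pytest.TempPathFactory) -> Path:
    # empty scratch directory, e.g. to build a path to a non-existing image
    return tmp_path_factory.mktemp("pipeline_tmp")


@pytest.fixture(scope="module")
def results_folder(tmp_path_factory: pytest.TempPathFactory) -> Path:
    # collects the CSV/heatmap artefacts copied at the end of the pipeline tests
    return tmp_path_factory.mktemp("results")

The image-path fixtures would point at bundled .bmp test data in the same spirit as the commented-out single_img_path fixture above.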