Cython Integration and Test Case Enhancements #1

Merged
foefl merged 10 commits from test_cython into main 2025-10-22 10:17:38 +00:00
16 changed files with 21590 additions and 116 deletions

33
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:fdc7b9b5d89abe575d708e809c823d506cd743ed354efd14bbe662a23e7b9fd9"
content_hash = "sha256:3b6355e97f9ec4986d016609fce5a358357a894972810f07bcedd274117446d2"
[[metadata.targets]]
requires_python = ">=3.11,<3.14"
@ -379,6 +379,24 @@ files = [
{file = "bracex-2.6.tar.gz", hash = "sha256:98f1347cd77e22ee8d967a30ad4e310b233f7754dbf31ff3fceb76145ba47dc7"},
]
[[package]]
name = "build"
version = "1.3.0"
requires_python = ">=3.9"
summary = "A simple, correct Python build frontend"
groups = ["dev"]
dependencies = [
"colorama; os_name == \"nt\"",
"importlib-metadata>=4.6; python_full_version < \"3.10.2\"",
"packaging>=19.1",
"pyproject-hooks",
"tomli>=1.1.0; python_version < \"3.11\"",
]
files = [
{file = "build-1.3.0-py3-none-any.whl", hash = "sha256:7145f0b5061ba90a1500d60bd1b13ca0a8a4cebdd0cc16ed8adf1c0e739f43b4"},
{file = "build-1.3.0.tar.gz", hash = "sha256:698edd0ea270bde950f53aed21f3a0135672206f3911e0176261a31e0e07b397"},
]
[[package]]
name = "bump-my-version"
version = "1.2.4"
@ -527,7 +545,7 @@ version = "0.4.6"
requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
summary = "Cross-platform colored terminal text."
groups = ["default", "dev", "nb", "tests"]
marker = "sys_platform == \"win32\" or platform_system == \"Windows\""
marker = "sys_platform == \"win32\" or os_name == \"nt\" or platform_system == \"Windows\""
files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
@ -3077,6 +3095,17 @@ files = [
{file = "pyparsing-3.2.5.tar.gz", hash = "sha256:2df8d5b7b2802ef88e8d016a2eb9c7aeaa923529cd251ed0fe4608275d4105b6"},
]
[[package]]
name = "pyproject-hooks"
version = "1.2.0"
requires_python = ">=3.7"
summary = "Wrappers to call pyproject.toml-based build backend hooks."
groups = ["dev"]
files = [
{file = "pyproject_hooks-1.2.0-py3-none-any.whl", hash = "sha256:9e5c6bfa8dcc30091c74b0cf803c81fdd29d94f01992a7707bc97babb1141913"},
{file = "pyproject_hooks-1.2.0.tar.gz", hash = "sha256:1e859bd5c40fae9448642dd871adf459e5e2084186e8d2c2a79a824c970da1f8"},
]
[[package]]
name = "pytest"
version = "8.4.2"

45
pdm_build.py Normal file
View File

@ -0,0 +1,45 @@
import re
import zipfile
from pathlib import Path
from Cython.Build import cythonize
ext_modules = cythonize(["src/dopt_sensor_anomalies/detection.py"])
def pdm_build_initialize(context):
context.ensure_build_dir()
def pdm_build_update_setup_kwargs(context, setup_kwargs):
setup_kwargs.update(
ext_modules=ext_modules,
)
def pdm_build_finalize(context, artifact):
print(">>>>>> Context: ", context)
print(">>>>>> Artifact: ", artifact)
pth_artifact = Path(artifact)
if pth_artifact.suffix == ".whl":
delete_source_files_from_wheel(pth_artifact)
def delete_source_files_from_wheel(pth_to_whl: Path):
assert pth_to_whl.exists(), "wheel file not existing"
tmp_dir = pth_to_whl.parent / "tmp"
tmp_dir.mkdir()
filename = pth_to_whl.name
tmp_whl = tmp_dir / filename
pattern = re.compile(r".*\.c$|.*detection.py$|.*\.pyi$")
with zipfile.ZipFile(pth_to_whl, mode="r") as src:
with zipfile.ZipFile(tmp_whl, mode="w") as dst:
for filename in src.namelist():
if pattern.match(filename) is None:
data = src.read(filename)
dst.writestr(filename, data)
tmp_whl.replace(pth_to_whl)
tmp_dir.rmdir()

View File

@ -11,9 +11,13 @@ readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
[build-system]
requires = ["pdm-backend"]
requires = ["pdm-backend", "Cython", "setuptools"]
build-backend = "pdm.backend"
[tool.pdm.build]
package-dir = "src"
run-setuptools = true
[tool.ruff]
line-length = 94
@ -116,9 +120,6 @@ replace = "version = \"{new_version}\""
[tool.pdm]
distribution = true
[tool.pdm.build]
package-dir = "src"
[tool.pdm.resolution]
respect-source-order = true
@ -145,6 +146,7 @@ dev = [
"nox>=2025.2.9",
"cython>=3.1.4",
"setuptools>=80.9.0",
"build>=1.3.0",
]
nb = [
"jupyterlab>=4.3.5",

20
setup.py Normal file
View File

@ -0,0 +1,20 @@
from Cython.Build import cythonize
from Cython.Compiler import Options
from setuptools import setup
Options.docstrings = False
Options.embed_pos_in_docstring = False
Options.annotate = False
Options.fast_fail = True
ext_modules = cythonize(
["src/dopt_sensor_anomalies/detection.py"],
compiler_directives={
"language_level": 3,
"embedsignature": False,
"annotation_typing": True,
},
)
setup(ext_modules=ext_modules)

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
import csv
import warnings
from os import path
from pathlib import Path
from typing import Any, Final, cast
@ -24,6 +25,13 @@ from dopt_sensor_anomalies import constants as const
from dopt_sensor_anomalies import errors
from dopt_sensor_anomalies import types as t
# Suppress the specific HuggingFace cache symlink warning
warnings.filterwarnings(
"ignore",
message=".*huggingface_hub.*cache-system uses symlinks.*",
category=UserWarning,
)
# input parameters: user-defined
file_path: Path = Path(r"C:\Users\demon\Documents\EKF\Analyse_fuer_Florian\bild2.bmp")
pixels_per_metric_X: float = 0.251
@ -35,20 +43,6 @@ def midpoint(
pt_A: npt.NDArray[np.floating],
pt_B: npt.NDArray[np.floating],
) -> tuple[float, float]:
"""to identify the midpoint of a 2D area
Parameters
----------
pt_A : npt.NDArray[np.floating]
tuple of coordinates x, y; shape (2, )
pt_B : npt.NDArray[np.floating]
tuple of coordinates x, y; shape (2, )
Returns
-------
tuple[float, float]
tuple of midpoint coordinates
"""
return ((pt_A[0] + pt_B[0]) * 0.5, (pt_A[1] + pt_B[1]) * 0.5)
@ -57,22 +51,6 @@ def check_box_redundancy(
box_2: t.Box,
tolerance: float = 5.0,
) -> bool:
"""to check if bounding box has already been identified and is just a redundant one
Parameters
----------
box_1 : t.Box
tuple of box values: ((center_x, center_y), (width, height), angle)
box_2 : t.Box
tuple of box values: ((center_x, center_y), (width, height), angle)
tolerance : float, optional
distance threshold for width and height, by default 5.0
Returns
-------
bool
redundancy evaluation
"""
# unpack the boxes
c1, s1, _ = box_1
c2, s2, _ = box_2
@ -93,32 +71,6 @@ def measure_length(
pixels_per_metric_X: float,
pixels_per_metric_Y: float,
) -> tuple[t.CsvData, t.SensorImages]:
"""detect and measure the size of the electrodes
Parameters
----------
file_path : Path
path to file to analyse
pixels_per_metric_X : float
scaling parameter x dimension, Pixels per micrometer in image
pixels_per_metric_Y : float
scaling parameter y dimension, Pixels per micrometer in image
Returns
-------
tuple[t.CsvData, t.SensorImages]
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
Raises
------
errors.ImageNotReadError
image was not read successfully
errors.ContourCalculationError
during contour detection there were several possible error causes
errors.InvalidElectrodeCount
an invalid number of electrodes were detected
"""
data_csv: list[str | int] = []
image = cv2.imread(str(file_path))
if image is None:
@ -216,7 +168,7 @@ def measure_length(
num_contours = len(filtered_cnts)
if num_contours != const.NUM_VALID_ELECTRODES:
raise errors.InvalidElectrodeCount(
f"Number of counted electroedes does not match the "
f"Number of counted electrodes does not match the "
f"expected value: count = {num_contours}, expected = {const.NUM_VALID_ELECTRODES}"
)
@ -243,38 +195,15 @@ def infer_image(
image: npt.NDArray[np.uint8],
model: Patchcore,
) -> t.InferenceResult:
"""evaluate one image
Parameters
----------
image : npt.NDArray[np.uint8]
represents image to be checked for anomalies
model : Patchcore
(loaded PyTorch state dictionary): model for anomaly detection
Returns
-------
t.InferenceResult
contains:
img (numpy.ndarray)
anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies
anomaly_score (float): evaluation metric, in [0, 1] with close to 0 being no
anomaly detected
anomaly_label (bool): anomaly detected (1) or not (0)
"""
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(torch_device)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # this is optional
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image_rgb)
pil_image = pil_image.convert("RGB")
input_tensor = (
to_dtype(to_image(pil_image), torch.float32, scale=True)
if torch.as_tensor # ?? Question: Wie passt diese Funktion hier rein?
# ?? Konvertiert, aber wird zur Evaluation der Aussage genutzt (sollte immer wahr sein?)
else np.array(pil_image) / 255.0
)
# ?? Ist das immer ein Torch-Tensor? Falls nicht, müsste die Methode geändert werden
image_np = np.array(pil_image).astype(np.float32) / 255.0
input_tensor = torch.from_numpy(image_np).permute(2, 0, 1)
input_tensor = input_tensor.unsqueeze(0)
input_tensor = input_tensor.to(torch_device)
@ -305,19 +234,6 @@ def anomaly_detection(
data_csv: t.CsvData,
sensor_images: t.SensorImages,
) -> None:
"""load the model, call function for anomaly detection and store the results
Parameters
----------
file_path : Path
path to file to analyse
detection_models : t.DetectionModels
collection of model paths for the left and right sensor
data_csv : t.CsvData
(list) data to save as CSV according to requirements, contains strings and ints
sensor_images : t.SensorImages
_description_
"""
file_stem = file_path.stem
folder_path = file_path.parent
@ -325,12 +241,8 @@ def anomaly_detection(
model = Patchcore(
backbone=const.BACKBONE, layers=const.LAYERS, coreset_sampling_ratio=const.RATIO
)
# ?? benötigt? Wird nicht genutzt
# engine = Engine()
# preparation for plot
_, axes = plt.subplots(1, 2, figsize=(12, 6))
# loop over left and right sensor
for i, (side, image) in enumerate(sensor_images.items()):
# Ich habe die Modellpfade als Funktionsparameter hinzugefügt

View File

@ -0,0 +1,135 @@
from pathlib import Path
import numpy as np
import numpy.typing as npt
from anomalib.models import Patchcore
from dopt_sensor_anomalies import types as t
def midpoint(
pt_A: npt.NDArray[np.floating],
pt_B: npt.NDArray[np.floating],
) -> tuple[float, float]:
"""to identify the midpoint of a 2D area
Parameters
----------
pt_A : npt.NDArray[np.floating]
tuple of coordinates x, y; shape (2, )
pt_B : npt.NDArray[np.floating]
tuple of coordinates x, y; shape (2, )
Returns
-------
tuple[float, float]
tuple of midpoint coordinates
"""
...
def check_box_redundancy(
box_1: t.Box,
box_2: t.Box,
tolerance: float = 5.0,
) -> bool:
"""to check if bounding box has already been identified and is just a redundant one
Parameters
----------
box_1 : t.Box
tuple of box values: ((center_x, center_y), (width, height), angle)
box_2 : t.Box
tuple of box values: ((center_x, center_y), (width, height), angle)
tolerance : float, optional
distance threshold for width and height, by default 5.0
Returns
-------
bool
redundancy evaluation
"""
...
def measure_length(
file_path: Path,
pixels_per_metric_X: float,
pixels_per_metric_Y: float,
) -> tuple[t.CsvData, t.SensorImages]:
"""detect and measure the size of the electrodes
Parameters
----------
file_path : Path
path to file to analyse
pixels_per_metric_X : float
scaling parameter x dimension, Pixels per micrometer in image
pixels_per_metric_Y : float
scaling parameter y dimension, Pixels per micrometer in image
Returns
-------
tuple[t.CsvData, t.SensorImages]
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
Raises
------
errors.ImageNotReadError
image was not read successfully
errors.ContourCalculationError
during contour detection there were several possible error causes
errors.InvalidElectrodeCount
an invalid number of electrodes were detected
"""
...
def infer_image(
image: npt.NDArray[np.uint8],
model: Patchcore,
) -> t.InferenceResult:
"""evaluate one image
Parameters
----------
image : npt.NDArray[np.uint8]
represents image to be checked for anomalies
model : Patchcore
(loaded PyTorch state dictionary): model for anomaly detection
Returns
-------
t.InferenceResult
contains:
img (numpy.ndarray)
anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies
anomaly_score (float): evaluation metric, in [0, 1] with close to 0 being no
anomaly detected
anomaly_label (bool): anomaly detected (1) or not (0)
"""
...
def anomaly_detection(
file_path: Path,
detection_models: t.DetectionModels,
data_csv: t.CsvData,
sensor_images: t.SensorImages,
) -> None:
"""load the model, call function for anomaly detection and store the results
Parameters
----------
file_path : Path
path to file to analyse
detection_models : t.DetectionModels
collection of model paths for the left and right sensor
data_csv : t.CsvData
(list) data to save as CSV according to requirements, contains strings and ints
sensor_images : t.SensorImages
_description_
"""
...
def pipeline(
user_file_path: str,
pixels_per_metric_X: float,
pixels_per_metric_Y: float,
) -> None: ...

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 MiB

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
1177,318;804,803;947509,0;876,575;808,853;709020,9;952,191;804,781;766305,3;944,223;792,829;748607,2;838,797;804,902;675148,9;1203,187;792,829;953921,4;0;0
1 1177,318 804,803 947509,0 876,575 808,853 709020,9 952,191 804,781 766305,3 944,223 792,829 748607,2 838,797 804,902 675148,9 1203,187 792,829 953921,4 0 0

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

62
tests/conftest.py Normal file
View File

@ -0,0 +1,62 @@
import os
import shutil
from pathlib import Path
from unittest.mock import patch
import pytest
from dopt_sensor_anomalies.constants import MODEL_FOLDER_NAME
@pytest.fixture(scope="session", autouse=True)
def setup_temp_dir(tmp_path_factory):
tmp_dir = tmp_path_factory.mktemp("root")
folder_structure = "lib/folder"
pth = tmp_dir / folder_structure
pth.mkdir(parents=True, exist_ok=True)
# models
pth_models = tmp_dir / MODEL_FOLDER_NAME
pth_models.mkdir(parents=True, exist_ok=True)
_root_imgs = (Path(__file__).parent / "_models").glob("*.pth")
for model in _root_imgs:
dst = pth_models / model.name
shutil.copy(model, dst)
# images
pth_img = tmp_dir / "images"
pth_img.mkdir(parents=True, exist_ok=True)
_root_imgs = (Path(__file__).parent / "_img").glob("**/*.bmp")
for img in _root_imgs:
dst = pth_img / img.name
shutil.copy(img, dst)
with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
yield tmp_dir
@pytest.fixture(scope="session", autouse=True)
def results_folder(setup_temp_dir) -> Path:
if os.getenv("DOPT_WRITE_RESULTS", False):
results_base = Path(__file__).parent
else:
results_base = setup_temp_dir
results = results_base / "_results"
if not results.exists():
results.mkdir()
return results
@pytest.fixture(scope="session")
def path_img_with_failure_ElectrodeCount(setup_temp_dir) -> Path:
filename = "window_15_fail_electrode.bmp"
pth_img = setup_temp_dir / f"images/{filename}"
assert pth_img.exists(), "failure image not existing"
return pth_img
@pytest.fixture(scope="session")
def path_img_with_failure_TrainedModel(setup_temp_dir) -> Path:
filename = "window_19_fail_model.bmp"
pth_img = setup_temp_dir / f"images/{filename}"
assert pth_img.exists(), "failure image not existing"
return pth_img

View File

@ -1,10 +1,14 @@
import shutil
from pathlib import Path
from unittest.mock import patch
import numpy as np
import pytest
import dopt_sensor_anomalies._find_paths
import dopt_sensor_anomalies.detection as detect
import dopt_sensor_anomalies.types as t
from dopt_sensor_anomalies import constants
@pytest.fixture(scope="module")
@ -84,3 +88,40 @@ def test_measure_length(single_img_path):
assert 235 < img_right.shape[0] < 260
assert 910 < img_right.shape[1] < 960
assert img_right.shape[2] == 3
@pytest.mark.new
@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
pixels_per_metric_X: float = 0.251
pixels_per_metric_Y: float = 0.251
MODEL_FOLDER = dopt_sensor_anomalies._find_paths.get_model_folder()
assert MODEL_FOLDER.exists(), "model folder not existing"
DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER)
assert DETECTION_MODELS["left"].exists()
assert DETECTION_MODELS["right"].exists()
data_csv, sensor_images = detect.measure_length(
path_img_with_failure_TrainedModel,
pixels_per_metric_X,
pixels_per_metric_Y,
)
print(">>>>>>> Data: ", data_csv)
# measured sizes
assert len(data_csv) == 18
assert sensor_images["left"] is not None
assert sensor_images["right"] is not None
detect.anomaly_detection(
file_path=path_img_with_failure_TrainedModel,
detection_models=DETECTION_MODELS,
data_csv=data_csv,
sensor_images=sensor_images,
)
# check files for existence
root_img = path_img_with_failure_TrainedModel.parent
file_stem = path_img_with_failure_TrainedModel.stem
csv_file = root_img / f"{file_stem}.csv"
heatmap_file = root_img / f"{file_stem}{constants.HEATMAP_FILENAME_SUFFIX}.png"
assert csv_file.exists()
assert heatmap_file.exists()
shutil.copy(csv_file, (results_folder / csv_file.name))
shutil.copy(heatmap_file, (results_folder / heatmap_file.name))

View File

@ -5,16 +5,23 @@ import pytest
from dopt_sensor_anomalies import _find_paths
# @pytest.fixture(scope="session", autouse=True)
# def setup_temp_dir(tmp_path_factory):
# tmp_dir = tmp_path_factory.mktemp("root")
# folder_structure = "lib/folder"
# pth = tmp_dir / folder_structure
# pth.mkdir(parents=True, exist_ok=True)
# # models
# folder_models = "lib/models"
# pth_models = tmp_dir / folder_models
# pth_models.mkdir(parents=True, exist_ok=True)
# _root_models = (Path(__file__).parent / "_models").glob("*.pth")
# for model in _root_models:
# dst = pth_models / model.name
# shutil.copy(model, dst)
@pytest.fixture(scope="module", autouse=True)
def setup_temp_dir(tmp_path_factory):
tmp_dir = tmp_path_factory.mktemp("root")
folder_structure = "lib/folder"
pth = tmp_dir / folder_structure
pth.mkdir(parents=True, exist_ok=True)
with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
yield
# with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
# yield
@pytest.fixture()