generated from dopt-python/py311
Cython Integration and Test Case Enhancements #1
33
pdm.lock
generated
33
pdm.lock
generated
@ -5,7 +5,7 @@
|
|||||||
groups = ["default", "dev", "lint", "nb", "tests"]
|
groups = ["default", "dev", "lint", "nb", "tests"]
|
||||||
strategy = ["inherit_metadata"]
|
strategy = ["inherit_metadata"]
|
||||||
lock_version = "4.5.0"
|
lock_version = "4.5.0"
|
||||||
content_hash = "sha256:fdc7b9b5d89abe575d708e809c823d506cd743ed354efd14bbe662a23e7b9fd9"
|
content_hash = "sha256:3b6355e97f9ec4986d016609fce5a358357a894972810f07bcedd274117446d2"
|
||||||
|
|
||||||
[[metadata.targets]]
|
[[metadata.targets]]
|
||||||
requires_python = ">=3.11,<3.14"
|
requires_python = ">=3.11,<3.14"
|
||||||
@ -379,6 +379,24 @@ files = [
|
|||||||
{file = "bracex-2.6.tar.gz", hash = "sha256:98f1347cd77e22ee8d967a30ad4e310b233f7754dbf31ff3fceb76145ba47dc7"},
|
{file = "bracex-2.6.tar.gz", hash = "sha256:98f1347cd77e22ee8d967a30ad4e310b233f7754dbf31ff3fceb76145ba47dc7"},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "build"
|
||||||
|
version = "1.3.0"
|
||||||
|
requires_python = ">=3.9"
|
||||||
|
summary = "A simple, correct Python build frontend"
|
||||||
|
groups = ["dev"]
|
||||||
|
dependencies = [
|
||||||
|
"colorama; os_name == \"nt\"",
|
||||||
|
"importlib-metadata>=4.6; python_full_version < \"3.10.2\"",
|
||||||
|
"packaging>=19.1",
|
||||||
|
"pyproject-hooks",
|
||||||
|
"tomli>=1.1.0; python_version < \"3.11\"",
|
||||||
|
]
|
||||||
|
files = [
|
||||||
|
{file = "build-1.3.0-py3-none-any.whl", hash = "sha256:7145f0b5061ba90a1500d60bd1b13ca0a8a4cebdd0cc16ed8adf1c0e739f43b4"},
|
||||||
|
{file = "build-1.3.0.tar.gz", hash = "sha256:698edd0ea270bde950f53aed21f3a0135672206f3911e0176261a31e0e07b397"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bump-my-version"
|
name = "bump-my-version"
|
||||||
version = "1.2.4"
|
version = "1.2.4"
|
||||||
@ -527,7 +545,7 @@ version = "0.4.6"
|
|||||||
requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
|
requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
|
||||||
summary = "Cross-platform colored terminal text."
|
summary = "Cross-platform colored terminal text."
|
||||||
groups = ["default", "dev", "nb", "tests"]
|
groups = ["default", "dev", "nb", "tests"]
|
||||||
marker = "sys_platform == \"win32\" or platform_system == \"Windows\""
|
marker = "sys_platform == \"win32\" or os_name == \"nt\" or platform_system == \"Windows\""
|
||||||
files = [
|
files = [
|
||||||
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
|
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
|
||||||
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
|
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
|
||||||
@ -3077,6 +3095,17 @@ files = [
|
|||||||
{file = "pyparsing-3.2.5.tar.gz", hash = "sha256:2df8d5b7b2802ef88e8d016a2eb9c7aeaa923529cd251ed0fe4608275d4105b6"},
|
{file = "pyparsing-3.2.5.tar.gz", hash = "sha256:2df8d5b7b2802ef88e8d016a2eb9c7aeaa923529cd251ed0fe4608275d4105b6"},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pyproject-hooks"
|
||||||
|
version = "1.2.0"
|
||||||
|
requires_python = ">=3.7"
|
||||||
|
summary = "Wrappers to call pyproject.toml-based build backend hooks."
|
||||||
|
groups = ["dev"]
|
||||||
|
files = [
|
||||||
|
{file = "pyproject_hooks-1.2.0-py3-none-any.whl", hash = "sha256:9e5c6bfa8dcc30091c74b0cf803c81fdd29d94f01992a7707bc97babb1141913"},
|
||||||
|
{file = "pyproject_hooks-1.2.0.tar.gz", hash = "sha256:1e859bd5c40fae9448642dd871adf459e5e2084186e8d2c2a79a824c970da1f8"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pytest"
|
name = "pytest"
|
||||||
version = "8.4.2"
|
version = "8.4.2"
|
||||||
|
|||||||
45
pdm_build.py
Normal file
45
pdm_build.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import re
|
||||||
|
import zipfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from Cython.Build import cythonize
|
||||||
|
|
||||||
|
ext_modules = cythonize(["src/dopt_sensor_anomalies/detection.py"])
|
||||||
|
|
||||||
|
|
||||||
|
def pdm_build_initialize(context):
|
||||||
|
context.ensure_build_dir()
|
||||||
|
|
||||||
|
|
||||||
|
def pdm_build_update_setup_kwargs(context, setup_kwargs):
|
||||||
|
setup_kwargs.update(
|
||||||
|
ext_modules=ext_modules,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def pdm_build_finalize(context, artifact):
|
||||||
|
print(">>>>>> Context: ", context)
|
||||||
|
print(">>>>>> Artifact: ", artifact)
|
||||||
|
pth_artifact = Path(artifact)
|
||||||
|
|
||||||
|
if pth_artifact.suffix == ".whl":
|
||||||
|
delete_source_files_from_wheel(pth_artifact)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_source_files_from_wheel(pth_to_whl: Path):
|
||||||
|
assert pth_to_whl.exists(), "wheel file not existing"
|
||||||
|
tmp_dir = pth_to_whl.parent / "tmp"
|
||||||
|
tmp_dir.mkdir()
|
||||||
|
filename = pth_to_whl.name
|
||||||
|
tmp_whl = tmp_dir / filename
|
||||||
|
pattern = re.compile(r".*\.c$|.*detection.py$|.*\.pyi$")
|
||||||
|
|
||||||
|
with zipfile.ZipFile(pth_to_whl, mode="r") as src:
|
||||||
|
with zipfile.ZipFile(tmp_whl, mode="w") as dst:
|
||||||
|
for filename in src.namelist():
|
||||||
|
if pattern.match(filename) is None:
|
||||||
|
data = src.read(filename)
|
||||||
|
dst.writestr(filename, data)
|
||||||
|
|
||||||
|
tmp_whl.replace(pth_to_whl)
|
||||||
|
tmp_dir.rmdir()
|
||||||
@ -11,9 +11,13 @@ readme = "README.md"
|
|||||||
license = {text = "LicenseRef-Proprietary"}
|
license = {text = "LicenseRef-Proprietary"}
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["pdm-backend"]
|
requires = ["pdm-backend", "Cython", "setuptools"]
|
||||||
build-backend = "pdm.backend"
|
build-backend = "pdm.backend"
|
||||||
|
|
||||||
|
[tool.pdm.build]
|
||||||
|
package-dir = "src"
|
||||||
|
run-setuptools = true
|
||||||
|
|
||||||
|
|
||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
line-length = 94
|
line-length = 94
|
||||||
@ -116,9 +120,6 @@ replace = "version = \"{new_version}\""
|
|||||||
[tool.pdm]
|
[tool.pdm]
|
||||||
distribution = true
|
distribution = true
|
||||||
|
|
||||||
[tool.pdm.build]
|
|
||||||
package-dir = "src"
|
|
||||||
|
|
||||||
[tool.pdm.resolution]
|
[tool.pdm.resolution]
|
||||||
respect-source-order = true
|
respect-source-order = true
|
||||||
|
|
||||||
@ -145,6 +146,7 @@ dev = [
|
|||||||
"nox>=2025.2.9",
|
"nox>=2025.2.9",
|
||||||
"cython>=3.1.4",
|
"cython>=3.1.4",
|
||||||
"setuptools>=80.9.0",
|
"setuptools>=80.9.0",
|
||||||
|
"build>=1.3.0",
|
||||||
]
|
]
|
||||||
nb = [
|
nb = [
|
||||||
"jupyterlab>=4.3.5",
|
"jupyterlab>=4.3.5",
|
||||||
|
|||||||
20
setup.py
Normal file
20
setup.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from Cython.Build import cythonize
|
||||||
|
from Cython.Compiler import Options
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
Options.docstrings = False
|
||||||
|
Options.embed_pos_in_docstring = False
|
||||||
|
Options.annotate = False
|
||||||
|
Options.fast_fail = True
|
||||||
|
|
||||||
|
ext_modules = cythonize(
|
||||||
|
["src/dopt_sensor_anomalies/detection.py"],
|
||||||
|
compiler_directives={
|
||||||
|
"language_level": 3,
|
||||||
|
"embedsignature": False,
|
||||||
|
"annotation_typing": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
setup(ext_modules=ext_modules)
|
||||||
21220
src/dopt_sensor_anomalies/detection.c
Normal file
21220
src/dopt_sensor_anomalies/detection.c
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,4 +1,5 @@
|
|||||||
import csv
|
import csv
|
||||||
|
import warnings
|
||||||
from os import path
|
from os import path
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Final, cast
|
from typing import Any, Final, cast
|
||||||
@ -24,6 +25,13 @@ from dopt_sensor_anomalies import constants as const
|
|||||||
from dopt_sensor_anomalies import errors
|
from dopt_sensor_anomalies import errors
|
||||||
from dopt_sensor_anomalies import types as t
|
from dopt_sensor_anomalies import types as t
|
||||||
|
|
||||||
|
# Suppress the specific HuggingFace cache symlink warning
|
||||||
|
warnings.filterwarnings(
|
||||||
|
"ignore",
|
||||||
|
message=".*huggingface_hub.*cache-system uses symlinks.*",
|
||||||
|
category=UserWarning,
|
||||||
|
)
|
||||||
|
|
||||||
# input parameters: user-defined
|
# input parameters: user-defined
|
||||||
file_path: Path = Path(r"C:\Users\demon\Documents\EKF\Analyse_fuer_Florian\bild2.bmp")
|
file_path: Path = Path(r"C:\Users\demon\Documents\EKF\Analyse_fuer_Florian\bild2.bmp")
|
||||||
pixels_per_metric_X: float = 0.251
|
pixels_per_metric_X: float = 0.251
|
||||||
@ -35,20 +43,6 @@ def midpoint(
|
|||||||
pt_A: npt.NDArray[np.floating],
|
pt_A: npt.NDArray[np.floating],
|
||||||
pt_B: npt.NDArray[np.floating],
|
pt_B: npt.NDArray[np.floating],
|
||||||
) -> tuple[float, float]:
|
) -> tuple[float, float]:
|
||||||
"""to identify the midpoint of a 2D area
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
pt_A : npt.NDArray[np.floating]
|
|
||||||
tuple of coordinates x, y; shape (2, )
|
|
||||||
pt_B : npt.NDArray[np.floating]
|
|
||||||
tuple of coordinates x, y; shape (2, )
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
tuple[float, float]
|
|
||||||
tuple of midpoint coordinates
|
|
||||||
"""
|
|
||||||
return ((pt_A[0] + pt_B[0]) * 0.5, (pt_A[1] + pt_B[1]) * 0.5)
|
return ((pt_A[0] + pt_B[0]) * 0.5, (pt_A[1] + pt_B[1]) * 0.5)
|
||||||
|
|
||||||
|
|
||||||
@ -57,22 +51,6 @@ def check_box_redundancy(
|
|||||||
box_2: t.Box,
|
box_2: t.Box,
|
||||||
tolerance: float = 5.0,
|
tolerance: float = 5.0,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""to check if bounding box has already been identified and is just a redundant one
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
box_1 : t.Box
|
|
||||||
tuple of box values: ((center_x, center_y), (width, height), angle)
|
|
||||||
box_2 : t.Box
|
|
||||||
tuple of box values: ((center_x, center_y), (width, height), angle)
|
|
||||||
tolerance : float, optional
|
|
||||||
distance threshold for width and height, by default 5.0
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
bool
|
|
||||||
redundancy evaluation
|
|
||||||
"""
|
|
||||||
# unpack the boxes
|
# unpack the boxes
|
||||||
c1, s1, _ = box_1
|
c1, s1, _ = box_1
|
||||||
c2, s2, _ = box_2
|
c2, s2, _ = box_2
|
||||||
@ -93,32 +71,6 @@ def measure_length(
|
|||||||
pixels_per_metric_X: float,
|
pixels_per_metric_X: float,
|
||||||
pixels_per_metric_Y: float,
|
pixels_per_metric_Y: float,
|
||||||
) -> tuple[t.CsvData, t.SensorImages]:
|
) -> tuple[t.CsvData, t.SensorImages]:
|
||||||
"""detect and measure the size of the electrodes
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
file_path : Path
|
|
||||||
path to file to analyse
|
|
||||||
pixels_per_metric_X : float
|
|
||||||
scaling parameter x dimension, Pixels per micrometer in image
|
|
||||||
pixels_per_metric_Y : float
|
|
||||||
scaling parameter y dimension, Pixels per micrometer in image
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
tuple[t.CsvData, t.SensorImages]
|
|
||||||
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints
|
|
||||||
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
|
|
||||||
|
|
||||||
Raises
|
|
||||||
------
|
|
||||||
errors.ImageNotReadError
|
|
||||||
image was not read successfully
|
|
||||||
errors.ContourCalculationError
|
|
||||||
during contour detection there were several possible error causes
|
|
||||||
errors.InvalidElectrodeCount
|
|
||||||
an invalid number of electrodes were detected
|
|
||||||
"""
|
|
||||||
data_csv: list[str | int] = []
|
data_csv: list[str | int] = []
|
||||||
image = cv2.imread(str(file_path))
|
image = cv2.imread(str(file_path))
|
||||||
if image is None:
|
if image is None:
|
||||||
@ -216,7 +168,7 @@ def measure_length(
|
|||||||
num_contours = len(filtered_cnts)
|
num_contours = len(filtered_cnts)
|
||||||
if num_contours != const.NUM_VALID_ELECTRODES:
|
if num_contours != const.NUM_VALID_ELECTRODES:
|
||||||
raise errors.InvalidElectrodeCount(
|
raise errors.InvalidElectrodeCount(
|
||||||
f"Number of counted electroedes does not match the "
|
f"Number of counted electrodes does not match the "
|
||||||
f"expected value: count = {num_contours}, expected = {const.NUM_VALID_ELECTRODES}"
|
f"expected value: count = {num_contours}, expected = {const.NUM_VALID_ELECTRODES}"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -243,38 +195,15 @@ def infer_image(
|
|||||||
image: npt.NDArray[np.uint8],
|
image: npt.NDArray[np.uint8],
|
||||||
model: Patchcore,
|
model: Patchcore,
|
||||||
) -> t.InferenceResult:
|
) -> t.InferenceResult:
|
||||||
"""evaluate one image
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
image : npt.NDArray[np.uint8]
|
|
||||||
represents image to be checked for anomalies
|
|
||||||
model : Patchcore
|
|
||||||
(loaded PyTorch state dictionary): model for anomaly detection
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
t.InferenceResult
|
|
||||||
contains:
|
|
||||||
img (numpy.ndarray)
|
|
||||||
anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies
|
|
||||||
anomaly_score (float): evaluation metric, in [0, 1] with close to 0 being no
|
|
||||||
anomaly detected
|
|
||||||
anomaly_label (bool): anomaly detected (1) or not (0)
|
|
||||||
"""
|
|
||||||
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||||
model.to(torch_device)
|
model.to(torch_device)
|
||||||
|
|
||||||
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # this is optional
|
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||||
pil_image = Image.fromarray(image_rgb)
|
pil_image = Image.fromarray(image_rgb)
|
||||||
pil_image = pil_image.convert("RGB")
|
pil_image = pil_image.convert("RGB")
|
||||||
input_tensor = (
|
image_np = np.array(pil_image).astype(np.float32) / 255.0
|
||||||
to_dtype(to_image(pil_image), torch.float32, scale=True)
|
input_tensor = torch.from_numpy(image_np).permute(2, 0, 1)
|
||||||
if torch.as_tensor # ?? Question: Wie passt diese Funktion hier rein?
|
|
||||||
# ?? Konvertiert, aber wird zur Evaluation der Aussage genutzt (sollte immer wahr sein?)
|
|
||||||
else np.array(pil_image) / 255.0
|
|
||||||
)
|
|
||||||
# ?? Ist das immer ein Torch-Tensor? Falls nicht, müsste die Methode geändert werden
|
|
||||||
input_tensor = input_tensor.unsqueeze(0)
|
input_tensor = input_tensor.unsqueeze(0)
|
||||||
input_tensor = input_tensor.to(torch_device)
|
input_tensor = input_tensor.to(torch_device)
|
||||||
|
|
||||||
@ -305,19 +234,6 @@ def anomaly_detection(
|
|||||||
data_csv: t.CsvData,
|
data_csv: t.CsvData,
|
||||||
sensor_images: t.SensorImages,
|
sensor_images: t.SensorImages,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""load the model, call function for anomaly detection and store the results
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
file_path : Path
|
|
||||||
path to file to analyse
|
|
||||||
detection_models : t.DetectionModels
|
|
||||||
collection of model paths for the left and right sensor
|
|
||||||
data_csv : t.CsvData
|
|
||||||
(list) data to save as CSV according to requirements, contains strings and ints
|
|
||||||
sensor_images : t.SensorImages
|
|
||||||
_description_
|
|
||||||
"""
|
|
||||||
file_stem = file_path.stem
|
file_stem = file_path.stem
|
||||||
folder_path = file_path.parent
|
folder_path = file_path.parent
|
||||||
|
|
||||||
@ -325,12 +241,8 @@ def anomaly_detection(
|
|||||||
model = Patchcore(
|
model = Patchcore(
|
||||||
backbone=const.BACKBONE, layers=const.LAYERS, coreset_sampling_ratio=const.RATIO
|
backbone=const.BACKBONE, layers=const.LAYERS, coreset_sampling_ratio=const.RATIO
|
||||||
)
|
)
|
||||||
# ?? benötigt? Wird nicht genutzt
|
|
||||||
# engine = Engine()
|
|
||||||
|
|
||||||
# preparation for plot
|
# preparation for plot
|
||||||
_, axes = plt.subplots(1, 2, figsize=(12, 6))
|
_, axes = plt.subplots(1, 2, figsize=(12, 6))
|
||||||
|
|
||||||
# loop over left and right sensor
|
# loop over left and right sensor
|
||||||
for i, (side, image) in enumerate(sensor_images.items()):
|
for i, (side, image) in enumerate(sensor_images.items()):
|
||||||
# Ich habe die Modellpfade als Funktionsparameter hinzugefügt
|
# Ich habe die Modellpfade als Funktionsparameter hinzugefügt
|
||||||
|
|||||||
135
src/dopt_sensor_anomalies/detection.pyi
Normal file
135
src/dopt_sensor_anomalies/detection.pyi
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import numpy.typing as npt
|
||||||
|
from anomalib.models import Patchcore
|
||||||
|
|
||||||
|
from dopt_sensor_anomalies import types as t
|
||||||
|
|
||||||
|
def midpoint(
|
||||||
|
pt_A: npt.NDArray[np.floating],
|
||||||
|
pt_B: npt.NDArray[np.floating],
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""to identify the midpoint of a 2D area
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
pt_A : npt.NDArray[np.floating]
|
||||||
|
tuple of coordinates x, y; shape (2, )
|
||||||
|
pt_B : npt.NDArray[np.floating]
|
||||||
|
tuple of coordinates x, y; shape (2, )
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
tuple[float, float]
|
||||||
|
tuple of midpoint coordinates
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def check_box_redundancy(
|
||||||
|
box_1: t.Box,
|
||||||
|
box_2: t.Box,
|
||||||
|
tolerance: float = 5.0,
|
||||||
|
) -> bool:
|
||||||
|
"""to check if bounding box has already been identified and is just a redundant one
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
box_1 : t.Box
|
||||||
|
tuple of box values: ((center_x, center_y), (width, height), angle)
|
||||||
|
box_2 : t.Box
|
||||||
|
tuple of box values: ((center_x, center_y), (width, height), angle)
|
||||||
|
tolerance : float, optional
|
||||||
|
distance threshold for width and height, by default 5.0
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
bool
|
||||||
|
redundancy evaluation
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def measure_length(
|
||||||
|
file_path: Path,
|
||||||
|
pixels_per_metric_X: float,
|
||||||
|
pixels_per_metric_Y: float,
|
||||||
|
) -> tuple[t.CsvData, t.SensorImages]:
|
||||||
|
"""detect and measure the size of the electrodes
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
file_path : Path
|
||||||
|
path to file to analyse
|
||||||
|
pixels_per_metric_X : float
|
||||||
|
scaling parameter x dimension, Pixels per micrometer in image
|
||||||
|
pixels_per_metric_Y : float
|
||||||
|
scaling parameter y dimension, Pixels per micrometer in image
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
tuple[t.CsvData, t.SensorImages]
|
||||||
|
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints
|
||||||
|
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
errors.ImageNotReadError
|
||||||
|
image was not read successfully
|
||||||
|
errors.ContourCalculationError
|
||||||
|
during contour detection there were several possible error causes
|
||||||
|
errors.InvalidElectrodeCount
|
||||||
|
an invalid number of electrodes were detected
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def infer_image(
|
||||||
|
image: npt.NDArray[np.uint8],
|
||||||
|
model: Patchcore,
|
||||||
|
) -> t.InferenceResult:
|
||||||
|
"""evaluate one image
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
image : npt.NDArray[np.uint8]
|
||||||
|
represents image to be checked for anomalies
|
||||||
|
model : Patchcore
|
||||||
|
(loaded PyTorch state dictionary): model for anomaly detection
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
t.InferenceResult
|
||||||
|
contains:
|
||||||
|
img (numpy.ndarray)
|
||||||
|
anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies
|
||||||
|
anomaly_score (float): evaluation metric, in [0, 1] with close to 0 being no
|
||||||
|
anomaly detected
|
||||||
|
anomaly_label (bool): anomaly detected (1) or not (0)
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def anomaly_detection(
|
||||||
|
file_path: Path,
|
||||||
|
detection_models: t.DetectionModels,
|
||||||
|
data_csv: t.CsvData,
|
||||||
|
sensor_images: t.SensorImages,
|
||||||
|
) -> None:
|
||||||
|
"""load the model, call function for anomaly detection and store the results
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
file_path : Path
|
||||||
|
path to file to analyse
|
||||||
|
detection_models : t.DetectionModels
|
||||||
|
collection of model paths for the left and right sensor
|
||||||
|
data_csv : t.CsvData
|
||||||
|
(list) data to save as CSV according to requirements, contains strings and ints
|
||||||
|
sensor_images : t.SensorImages
|
||||||
|
_description_
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
def pipeline(
|
||||||
|
user_file_path: str,
|
||||||
|
pixels_per_metric_X: float,
|
||||||
|
pixels_per_metric_Y: float,
|
||||||
|
) -> None: ...
|
||||||
BIN
tests/_img/failures/window_15_fail_electrode.bmp
Normal file
BIN
tests/_img/failures/window_15_fail_electrode.bmp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.7 MiB |
BIN
tests/_img/failures/window_19_fail_model.bmp
Normal file
BIN
tests/_img/failures/window_19_fail_model.bmp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.7 MiB |
BIN
tests/_models/patchcore_model_left_hand_side.pth
Normal file
BIN
tests/_models/patchcore_model_left_hand_side.pth
Normal file
Binary file not shown.
BIN
tests/_models/patchcore_model_right_hand_side.pth
Normal file
BIN
tests/_models/patchcore_model_right_hand_side.pth
Normal file
Binary file not shown.
1
tests/_results/window_19_fail_model.csv
Normal file
1
tests/_results/window_19_fail_model.csv
Normal file
@ -0,0 +1 @@
|
|||||||
|
1177,318;804,803;947509,0;876,575;808,853;709020,9;952,191;804,781;766305,3;944,223;792,829;748607,2;838,797;804,902;675148,9;1203,187;792,829;953921,4;0;0
|
||||||
|
BIN
tests/_results/window_19_fail_model_Heatmap.png
Normal file
BIN
tests/_results/window_19_fail_model_Heatmap.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 88 KiB |
62
tests/conftest.py
Normal file
62
tests/conftest.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from dopt_sensor_anomalies.constants import MODEL_FOLDER_NAME
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session", autouse=True)
|
||||||
|
def setup_temp_dir(tmp_path_factory):
|
||||||
|
tmp_dir = tmp_path_factory.mktemp("root")
|
||||||
|
folder_structure = "lib/folder"
|
||||||
|
pth = tmp_dir / folder_structure
|
||||||
|
pth.mkdir(parents=True, exist_ok=True)
|
||||||
|
# models
|
||||||
|
pth_models = tmp_dir / MODEL_FOLDER_NAME
|
||||||
|
pth_models.mkdir(parents=True, exist_ok=True)
|
||||||
|
_root_imgs = (Path(__file__).parent / "_models").glob("*.pth")
|
||||||
|
for model in _root_imgs:
|
||||||
|
dst = pth_models / model.name
|
||||||
|
shutil.copy(model, dst)
|
||||||
|
# images
|
||||||
|
pth_img = tmp_dir / "images"
|
||||||
|
pth_img.mkdir(parents=True, exist_ok=True)
|
||||||
|
_root_imgs = (Path(__file__).parent / "_img").glob("**/*.bmp")
|
||||||
|
for img in _root_imgs:
|
||||||
|
dst = pth_img / img.name
|
||||||
|
shutil.copy(img, dst)
|
||||||
|
|
||||||
|
with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
|
||||||
|
yield tmp_dir
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session", autouse=True)
|
||||||
|
def results_folder(setup_temp_dir) -> Path:
|
||||||
|
if os.getenv("DOPT_WRITE_RESULTS", False):
|
||||||
|
results_base = Path(__file__).parent
|
||||||
|
else:
|
||||||
|
results_base = setup_temp_dir
|
||||||
|
results = results_base / "_results"
|
||||||
|
if not results.exists():
|
||||||
|
results.mkdir()
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def path_img_with_failure_ElectrodeCount(setup_temp_dir) -> Path:
|
||||||
|
filename = "window_15_fail_electrode.bmp"
|
||||||
|
pth_img = setup_temp_dir / f"images/{filename}"
|
||||||
|
assert pth_img.exists(), "failure image not existing"
|
||||||
|
return pth_img
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def path_img_with_failure_TrainedModel(setup_temp_dir) -> Path:
|
||||||
|
filename = "window_19_fail_model.bmp"
|
||||||
|
pth_img = setup_temp_dir / f"images/{filename}"
|
||||||
|
assert pth_img.exists(), "failure image not existing"
|
||||||
|
return pth_img
|
||||||
@ -1,10 +1,14 @@
|
|||||||
|
import shutil
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
import dopt_sensor_anomalies._find_paths
|
||||||
import dopt_sensor_anomalies.detection as detect
|
import dopt_sensor_anomalies.detection as detect
|
||||||
import dopt_sensor_anomalies.types as t
|
import dopt_sensor_anomalies.types as t
|
||||||
|
from dopt_sensor_anomalies import constants
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
@ -84,3 +88,40 @@ def test_measure_length(single_img_path):
|
|||||||
assert 235 < img_right.shape[0] < 260
|
assert 235 < img_right.shape[0] < 260
|
||||||
assert 910 < img_right.shape[1] < 960
|
assert 910 < img_right.shape[1] < 960
|
||||||
assert img_right.shape[2] == 3
|
assert img_right.shape[2] == 3
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.new
|
||||||
|
@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "lib")
|
||||||
|
def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
|
||||||
|
pixels_per_metric_X: float = 0.251
|
||||||
|
pixels_per_metric_Y: float = 0.251
|
||||||
|
MODEL_FOLDER = dopt_sensor_anomalies._find_paths.get_model_folder()
|
||||||
|
assert MODEL_FOLDER.exists(), "model folder not existing"
|
||||||
|
DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER)
|
||||||
|
assert DETECTION_MODELS["left"].exists()
|
||||||
|
assert DETECTION_MODELS["right"].exists()
|
||||||
|
data_csv, sensor_images = detect.measure_length(
|
||||||
|
path_img_with_failure_TrainedModel,
|
||||||
|
pixels_per_metric_X,
|
||||||
|
pixels_per_metric_Y,
|
||||||
|
)
|
||||||
|
print(">>>>>>> Data: ", data_csv)
|
||||||
|
# measured sizes
|
||||||
|
assert len(data_csv) == 18
|
||||||
|
assert sensor_images["left"] is not None
|
||||||
|
assert sensor_images["right"] is not None
|
||||||
|
detect.anomaly_detection(
|
||||||
|
file_path=path_img_with_failure_TrainedModel,
|
||||||
|
detection_models=DETECTION_MODELS,
|
||||||
|
data_csv=data_csv,
|
||||||
|
sensor_images=sensor_images,
|
||||||
|
)
|
||||||
|
# check files for existence
|
||||||
|
root_img = path_img_with_failure_TrainedModel.parent
|
||||||
|
file_stem = path_img_with_failure_TrainedModel.stem
|
||||||
|
csv_file = root_img / f"{file_stem}.csv"
|
||||||
|
heatmap_file = root_img / f"{file_stem}{constants.HEATMAP_FILENAME_SUFFIX}.png"
|
||||||
|
assert csv_file.exists()
|
||||||
|
assert heatmap_file.exists()
|
||||||
|
shutil.copy(csv_file, (results_folder / csv_file.name))
|
||||||
|
shutil.copy(heatmap_file, (results_folder / heatmap_file.name))
|
||||||
|
|||||||
@ -5,16 +5,23 @@ import pytest
|
|||||||
|
|
||||||
from dopt_sensor_anomalies import _find_paths
|
from dopt_sensor_anomalies import _find_paths
|
||||||
|
|
||||||
|
# @pytest.fixture(scope="session", autouse=True)
|
||||||
|
# def setup_temp_dir(tmp_path_factory):
|
||||||
|
# tmp_dir = tmp_path_factory.mktemp("root")
|
||||||
|
# folder_structure = "lib/folder"
|
||||||
|
# pth = tmp_dir / folder_structure
|
||||||
|
# pth.mkdir(parents=True, exist_ok=True)
|
||||||
|
# # models
|
||||||
|
# folder_models = "lib/models"
|
||||||
|
# pth_models = tmp_dir / folder_models
|
||||||
|
# pth_models.mkdir(parents=True, exist_ok=True)
|
||||||
|
# _root_models = (Path(__file__).parent / "_models").glob("*.pth")
|
||||||
|
# for model in _root_models:
|
||||||
|
# dst = pth_models / model.name
|
||||||
|
# shutil.copy(model, dst)
|
||||||
|
|
||||||
@pytest.fixture(scope="module", autouse=True)
|
# with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
|
||||||
def setup_temp_dir(tmp_path_factory):
|
# yield
|
||||||
tmp_dir = tmp_path_factory.mktemp("root")
|
|
||||||
folder_structure = "lib/folder"
|
|
||||||
pth = tmp_dir / folder_structure
|
|
||||||
pth.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
with patch("dopt_sensor_anomalies._find_paths.LIB_ROOT_PATH", pth):
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user