Apply changes to meet customer specification #27

Merged
foefl merged 14 commits from main_adapted into main 2026-01-07 09:30:57 +00:00
17 changed files with 3956 additions and 2372 deletions

1
.gitignore vendored
View File

@ -6,6 +6,7 @@ reports/
# credentials # credentials
CREDENTIALS* CREDENTIALS*
*.pth *.pth
*.pdf
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -14,7 +14,9 @@ Die im Rahmen des Projekts erstellte Anwendung erlaubt es, Bilder von Sensoren,
Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte. Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte.
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten zwei Einträge mit jeweils ``0`` oder ``1``, ein Ergebnis der Anomaliedetektion für jeden Sensor. Bei einem Ergebnis von ``0`` wurde keine Anomalie festgestellt, bei ``1`` hingegen schon. Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten vier Einträge, für jeden Sensor jeweils der ermittelte Anomalie-Score und die Entscheidung, ob eine Anomalie vorliegt (``1``) oder nicht (``0``).
Die Reihenfolge aller Werte in der CSV-Datei orientiert sich an der Spezifikation, die durch EKF-Diagnostics bereitgestellt wurde.
Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt. Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt.
@ -29,14 +31,14 @@ Die Nutzung erfolgt über ein CLI, das über ein Python-Skript abgerufen werden
Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden: Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden:
```pwsh ```pwsh
cd ekf-sensor-anomalies-deployment\python cd dopt_ekf_sensor-anomalies_v{AKTUELLE-VERSION}\python
.\python.exe .\cli.py --help .\python.exe .\cli.py --help
``` ```
Dieser Befehl gibt den folgenden Hilfetext aus: Dieser Befehl gibt den folgenden Hilfetext aus:
``` ```
usage: cli.py [-h] img_path calib_value_x calib_value_y usage: cli.py [-h] [-t ANOMALY_THRESHOLD] img_path calib_value_x calib_value_y
simple CLI tool to analyse single sensor images for anomalies simple CLI tool to analyse single sensor images for anomalies
@ -47,6 +49,8 @@ positional arguments:
options: options:
-h, --help show this help message and exit -h, --help show this help message and exit
-t ANOMALY_THRESHOLD, --anomaly_threshold ANOMALY_THRESHOLD
optional anomaly threshold to set, default: 0.14, type: float
``` ```
Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter: Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter:
@ -55,7 +59,9 @@ Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese b
- ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl - ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
- ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl - ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
Alle Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben. Diese Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
Optional kann für die Pipeline auch der Schwellwert für die Anomaliedetektion definiert werden. Dieser Wert bestimmt, ab wann eine Unregelmäßigkeit in den Bildern tatsächlich als Anomlie gewertet wird, und muss als Gleitkommazahl bereitgestellt werden. Die Übergabe erfolgt über den Parameter ``-t`` bzw. ``--anomaly_threshold``. Der Standardwert liegt bei 0,14.
### Fehlermeldungen ### Fehlermeldungen

14
cli.py
View File

@ -5,7 +5,7 @@ from typing import cast
from dopt_basics import cli from dopt_basics import cli
from dopt_sensor_anomalies import _interface from dopt_sensor_anomalies import _interface, constants
@dc.dataclass() @dc.dataclass()
@ -13,6 +13,7 @@ class CliArgs:
img_path: str img_path: str
calib_value_x: float calib_value_x: float
calib_value_y: float calib_value_y: float
anomaly_threshold: float
def main() -> int: def main() -> int:
@ -34,6 +35,16 @@ def main() -> int:
help="calibration value in pixels per mcm for y axis, type: float", help="calibration value in pixels per mcm for y axis, type: float",
type=float, type=float,
) )
parser.add_argument(
"-t",
"--anomaly_threshold",
help=(
f"optional anomaly threshold to set, default: "
f"{constants.ANOMALY_THRESHOLD_DEFAULT}, type: float"
),
default=constants.ANOMALY_THRESHOLD_DEFAULT,
type=float,
)
args = cast(CliArgs, parser.parse_args()) args = cast(CliArgs, parser.parse_args())
with cli.LoadingAnimation( with cli.LoadingAnimation(
@ -44,6 +55,7 @@ def main() -> int:
args.img_path, args.img_path,
args.calib_value_x, args.calib_value_x,
args.calib_value_y, args.calib_value_y,
args.anomaly_threshold,
) )
if status != 0: if status != 0:
loader.stop(interrupt=True) loader.stop(interrupt=True)

View File

@ -6,7 +6,7 @@ from unittest.mock import patch
from dopt_basics import cli from dopt_basics import cli
from dopt_sensor_anomalies import _interface from dopt_sensor_anomalies import _interface, constants
@dc.dataclass() @dc.dataclass()
@ -14,6 +14,7 @@ class CliArgs:
img_path: str img_path: str
calib_value_x: float calib_value_x: float
calib_value_y: float calib_value_y: float
anomaly_threshold: float
@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "src") @patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "src")
@ -37,6 +38,16 @@ def main() -> int:
help="calibration value in pixels per mcm for y axis, type: float", help="calibration value in pixels per mcm for y axis, type: float",
type=float, type=float,
) )
parser.add_argument(
"-t",
"--anomaly_threshold",
help=(
f"optional anomaly threshold to set, default: "
f"{constants.ANOMALY_THRESHOLD_DEFAULT}, type: float"
),
default=constants.ANOMALY_THRESHOLD_DEFAULT,
type=float,
)
args = cast(CliArgs, parser.parse_args()) args = cast(CliArgs, parser.parse_args())
with cli.LoadingAnimation( with cli.LoadingAnimation(
@ -47,6 +58,7 @@ def main() -> int:
args.img_path, args.img_path,
args.calib_value_x, args.calib_value_x,
args.calib_value_y, args.calib_value_y,
args.anomaly_threshold,
) )
if status != 0: if status != 0:
loader.stop(interrupt=True) loader.stop(interrupt=True)

View File

@ -11,7 +11,9 @@ Die im Rahmen des Projekts erstellte Anwendung erlaubt es, Bilder von Sensoren,
Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte. Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte.
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten zwei Einträge mit jeweils ``0`` oder ``1``, ein Ergebnis der Anomaliedetektion für jeden Sensor. Bei einem Ergebnis von ``0`` wurde keine Anomalie festgestellt, bei ``1`` hingegen schon. Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten vier Einträge, für jeden Sensor jeweils der ermittelte Anomalie-Score und die Entscheidung, ob eine Anomalie vorliegt (``1``) oder nicht (``0``).
Die Reihenfolge aller Werte in der CSV-Datei orientiert sich an der Spezifikation, die durch EKF-Diagnostics bereitgestellt wurde.
Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt. Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt.
@ -26,14 +28,14 @@ Die Nutzung erfolgt über ein CLI, das über ein Python-Skript abgerufen werden
Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden: Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden:
```pwsh ```pwsh
cd ekf-sensor-anomalies-deployment\python cd dopt_ekf_sensor-anomalies_v{AKTUELLE-VERSION}\python
.\python.exe .\cli.py --help .\python.exe .\cli.py --help
``` ```
Dieser Befehl gibt den folgenden Hilfetext aus: Dieser Befehl gibt den folgenden Hilfetext aus:
``` ```
usage: cli.py [-h] img_path calib_value_x calib_value_y usage: cli.py [-h] [-t ANOMALY_THRESHOLD] img_path calib_value_x calib_value_y
simple CLI tool to analyse single sensor images for anomalies simple CLI tool to analyse single sensor images for anomalies
@ -41,11 +43,11 @@ positional arguments:
img_path file path to the image which is to be analysed img_path file path to the image which is to be analysed
calib_value_x calibration value in pixels per mcm for x axis, type: float calib_value_x calibration value in pixels per mcm for x axis, type: float
calib_value_y calibration value in pixels per mcm for y axis, type: float calib_value_y calibration value in pixels per mcm for y axis, type: float
```
\newpage
```
options: options:
-h, --help show this help message and exit -h, --help show this help message and exit
-t ANOMALY_THRESHOLD, --anomaly_threshold ANOMALY_THRESHOLD
optional anomaly threshold to set, default: 0.14, type: float
``` ```
Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter: Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter:
@ -54,7 +56,9 @@ Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese b
- ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl - ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
- ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl - ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
Alle Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben. Diese Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
Optional kann für die Pipeline auch der Schwellwert für die Anomaliedetektion definiert werden. Dieser Wert bestimmt, ab wann eine Unregelmäßigkeit in den Bildern tatsächlich als Anomlie gewertet wird, und muss als Gleitkommazahl bereitgestellt werden. Die Übergabe erfolgt über den Parameter ``-t`` bzw. ``--anomaly_threshold``. Der Standardwert liegt bei 0,14.
### Fehlermeldungen ### Fehlermeldungen

Binary file not shown.

View File

@ -1,6 +1,6 @@
[project] [project]
name = "dopt-sensor-anomalies" name = "dopt-sensor-anomalies"
version = "0.1.5" version = "0.2.0dev0"
description = "anomaly detection for sensor images for quality assurance processes" description = "anomaly detection for sensor images for quality assurance processes"
authors = [ authors = [
{name = "d-opt GmbH (resp.: Florian Foerster)", email = "f.foerster@d-opt.com"}, {name = "d-opt GmbH (resp.: Florian Foerster)", email = "f.foerster@d-opt.com"},
@ -77,7 +77,7 @@ directory = "reports/coverage"
[tool.bumpversion] [tool.bumpversion]
current_version = "0.1.5" current_version = "0.2.0dev0"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@ -1 +1,115 @@
pdm build -d build/ $DEPLOYMENT_PATH = 'B:\deployments\EKF-Diagnostics'
$ENV_PATH = 'B:\deployments\EKF-Diagnostics\dopt_ekf_sensor-anomalies'
$PY_PATH = Join-Path -Path $ENV_PATH -ChildPath 'python'
$SRC_PATH = (Get-Location).Path
Write-Output "Build Pipeline for d-opt EKF-Diagnostics sensor anomaly detection"
Write-Output "Building package..."
.\scripts\publish.ps1
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were build errors"
Exit
}
Write-Output "Built package successfully"
# manual
Write-Output "Generate manual..."
.\scripts\cvt_manual.ps1
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there errors while generating the README file"
Exit
}
Write-Output "Generated manual file successfully"
Write-Output "Copying manual file..."
$readme_src_path = Join-Path -Path $SRC_PATH -ChildPath 'docs\manual.pdf'
$readme_dest_path = Join-Path -Path $ENV_PATH -ChildPath 'manual'
Copy-Item -Path $readme_src_path -Destination $readme_dest_path -Force
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while copying the manual file"
Exit
}
Write-Output "Copied manual file successfully"
Write-Output "Go into env directory..."
Set-Location $ENV_PATH
Write-Output "Install package into environment..."
pycage venv add -p -i http://localhost:8001/simple/ dopt-sensor-anomalies
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while installing the package into the environment"
Exit
}
Write-Output "Successfully installed package"
# copy CLI file
Write-Output "Copying CLI script file..."
$cli_path = Join-Path -Path $SRC_PATH -ChildPath 'cli.py'
Copy-Item -Path $cli_path -Destination $PY_PATH -Force
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while copying the CLI script file"
Exit
}
Write-Output "Copied CLI script file successfully"
# copy model files
Write-Output "Copying model weight files..."
$model_src_path = Join-Path -Path $SRC_PATH -ChildPath 'tests\_models\'
$model_dest_path = Join-Path -Path $ENV_PATH -ChildPath 'models'
Copy-Item -Path "$model_src_path\*.pth" -Destination $model_dest_path -Force
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while copying the model weight files"
Exit
}
Write-Output "Copied model weight files successfully"
# env preparation
Write-Output "Preparing environment with cleanup and pre-compilation..."
pycage clean dist-info
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while deleting the distribution info files"
Exit
}
pycage compile -q -d -f
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors during the pre-compilation process"
Exit
}
Write-Output "Successfully prepared environment with cleanup and pre-compilation"
Write-Output "Get version string..."
$pyproject = Get-Content $SRC_PATH\pyproject.toml -Raw
if ($pyproject -match '\[project\].*?\n[\s\w\n=""-_{}]*\[[\w-]*\]') {
$projectBlock = $matches[0]
if ($projectBlock -match 'version\s*=\s*"([^"]+)"') {
$version = $matches[1]
Write-Output "The version string is: $version"
}
else {
Write-Output "[PWSH] Exiting script because the version string was not found"
Exit
}
}
else {
Write-Output "[PWSH] Exiting script because the version string was not found"
Exit
}
Write-Output "Packaging whole standalone environment in a ZIP file"
$dest_path = Join-Path -Path $DEPLOYMENT_PATH -ChildPath "dopt_ekf_sensor-anomalies_v$version.zip"
$compress = @{
Path = "$ENV_PATH\**"
CompressionLevel = "Optimal"
DestinationPath = $dest_path
}
Compress-Archive @compress -Force
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors during the archive compression operation"
Exit
}
Write-Output "Successfully compressed archive. Saved under: >$dest_path<"
Write-Output "Go back to source directory..."
Set-Location $SRC_PATH

1
scripts/build_pdm.ps1 Normal file
View File

@ -0,0 +1 @@
pdm build -d build/

View File

@ -30,11 +30,13 @@ def sensor_anomalies_detection(
user_img_path: str, user_img_path: str,
pixels_per_metric_X: float, pixels_per_metric_X: float,
pixels_per_metric_Y: float, pixels_per_metric_Y: float,
anomaly_threshold: float,
) -> int: ) -> int:
res = detection.pipeline( res = detection.pipeline(
user_img_path=user_img_path, user_img_path=user_img_path,
pixels_per_metric_X=pixels_per_metric_X, pixels_per_metric_X=pixels_per_metric_X,
pixels_per_metric_Y=pixels_per_metric_Y, pixels_per_metric_Y=pixels_per_metric_Y,
anomaly_threshold=anomaly_threshold,
) )
if res.status.code != 0: if res.status.code != 0:
_print_error_state(res.status, out_stream=sys.stderr) _print_error_state(res.status, out_stream=sys.stderr)

View File

@ -5,11 +5,13 @@ LIB_ROOT_PATH: Final[Path] = Path(__file__).parent
STOP_FOLDER_NAME: Final[str] = "python" STOP_FOLDER_NAME: Final[str] = "python"
MODEL_FOLDER_NAME: Final[str] = "models" MODEL_FOLDER_NAME: Final[str] = "models"
EXPORT_DATA_SORTING: Final[tuple[str, ...]] = ("sensor_sizes", "right", "left")
THRESHOLD_BW: Final[int] = 63 THRESHOLD_BW: Final[int] = 63
BACKBONE: Final[str] = "wide_resnet50_2" BACKBONE: Final[str] = "wide_resnet50_2"
LAYERS: Final[tuple[str, ...]] = ("layer1", "layer2", "layer3") LAYERS: Final[tuple[str, ...]] = ("layer1", "layer2", "layer3")
RATIO: Final[float] = 0.01 RATIO: Final[float] = 0.01
ANOMALY_THRESHOLD: Final[float] = 0.14 ANOMALY_THRESHOLD_DEFAULT: Final[float] = 0.14
NUM_VALID_ELECTRODES: Final[int] = 6 NUM_VALID_ELECTRODES: Final[int] = 6
HEATMAP_FILENAME_SUFFIX: Final[str] = "_Heatmap" HEATMAP_FILENAME_SUFFIX: Final[str] = "_Heatmap"

File diff suppressed because one or more lines are too long

View File

@ -11,6 +11,7 @@ import numpy.typing as npt
import torch import torch
from anomalib.models import Patchcore from anomalib.models import Patchcore
from dopt_basics import result_pattern from dopt_basics import result_pattern
from dopt_basics.datastructures import flatten
from imutils import contours, perspective from imutils import contours, perspective
from pandas import DataFrame from pandas import DataFrame
from PIL import Image from PIL import Image
@ -56,8 +57,8 @@ def measure_length(
img_path: Path, img_path: Path,
pixels_per_metric_X: float, pixels_per_metric_X: float,
pixels_per_metric_Y: float, pixels_per_metric_Y: float,
) -> tuple[t.CsvData, t.SensorImages]: ) -> tuple[t.ExportData, t.SensorImages]:
data_csv: list[str | int] = [] sensor_sizes: list[tuple[str, ...]] = []
image = cv2.imread(str(img_path)) image = cv2.imread(str(img_path))
if image is None: if image is None:
raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<") raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<")
@ -97,15 +98,16 @@ def measure_length(
box = cast(npt.NDArray[np.float32], perspective.order_points(box)) box = cast(npt.NDArray[np.float32], perspective.order_points(box))
(tl, tr, br, bl) = box (tl, tr, br, bl) = box
(tltrX, tltrY) = midpoint(tl, tr)
(blbrX, blbrY) = midpoint(bl, br)
(tlblX, tlblY) = midpoint(tl, bl)
(trbrX, trbrY) = midpoint(tr, br)
dA = dist.euclidean((tltrX, tltrY), (blbrX, blbrY)) widthA = np.linalg.norm(br - bl)
dB = dist.euclidean((tlblX, tlblY), (trbrX, trbrY)) widthB = np.linalg.norm(tr - tl)
max_width = int(max(widthA, widthB))
if dA < 100 or dB < 100: heightA = np.linalg.norm(tr - br)
heightB = np.linalg.norm(tl - bl)
max_height = int(max(heightA, heightB))
if max_width < 100 or max_height < 100:
continue continue
is_duplicate = any( is_duplicate = any(
@ -117,15 +119,38 @@ def measure_length(
accepted_boxes.append(rbox) accepted_boxes.append(rbox)
filtered_cnts.append(c) filtered_cnts.append(c)
dimA = dA / pixels_per_metric_Y dimA = max_width / pixels_per_metric_Y
dimB = dB / pixels_per_metric_X dimB = max_height / pixels_per_metric_X
data_csv.extend( offset = 20
dst = np.array(
[ [
f"{dimB:.3f}".replace(".", ","), [offset, offset],
[max_width - 1 + offset, offset],
[max_width - 1 + offset, max_height - 1 + offset],
[offset, max_height - 1 + offset],
],
dtype="float32",
)
M = cv2.getPerspectiveTransform(box, dst)
warped = cv2.warpPerspective(
orig, M, (max_width + 2 * offset, max_height + 2 * offset)
)
gray_warped = cv2.cvtColor(warped, cv2.COLOR_BGR2GRAY)
_, binary_warped = cv2.threshold(gray_warped, 80, 255, cv2.THRESH_BINARY)
pixel_count = np.sum(binary_warped == 0)
sensor_sizes.append(
(
f"{dimA:.3f}".replace(".", ","), f"{dimA:.3f}".replace(".", ","),
f"{dimA * dimB:.1f}".replace(".", ","), f"{dimB:.3f}".replace(".", ","),
] f"{pixel_count / pixels_per_metric_X / pixels_per_metric_Y:.1f}".replace(
".", ","
),
)
) )
if not filtered_cnts: # pragma: no cover if not filtered_cnts: # pragma: no cover
@ -152,12 +177,19 @@ def measure_length(
cropped_sensor_left = orig[y_min:y_max, x_min:x_middle] cropped_sensor_left = orig[y_min:y_max, x_min:x_middle]
cropped_sensor_right = orig[y_min:y_max, x_middle:x_max] cropped_sensor_right = orig[y_min:y_max, x_middle:x_max]
return data_csv, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right) sensor_sizes_sorted = cast(
tuple[str, ...],
tuple(flatten(reversed(sensor_sizes))), # type: ignore
)
export_data: t.ExportData = t.ExportData(sensor_sizes=sensor_sizes_sorted)
return export_data, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right)
def infer_image( def infer_image(
image: npt.NDArray[np.uint8], image: npt.NDArray[np.uint8],
model: Patchcore, model: Patchcore,
anomaly_threshold: float,
) -> t.InferenceResult: ) -> t.InferenceResult:
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(torch_device) model.to(torch_device)
@ -176,7 +208,7 @@ def infer_image(
output = model(input_tensor) output = model(input_tensor)
anomaly_score = output.pred_score.item() anomaly_score = output.pred_score.item()
anomaly_label = bool(1 if anomaly_score >= const.ANOMALY_THRESHOLD else 0) anomaly_label = bool(1 if anomaly_score >= anomaly_threshold else 0)
anomaly_map = output.anomaly_map.squeeze().cpu().numpy() anomaly_map = output.anomaly_map.squeeze().cpu().numpy()
img_np = np.array(pil_image) img_np = np.array(pil_image)
@ -193,8 +225,9 @@ def infer_image(
def anomaly_detection( def anomaly_detection(
img_path: Path, img_path: Path,
detection_models: t.DetectionModels, detection_models: t.DetectionModels,
data_csv: t.CsvData, export_data: t.ExportData,
sensor_images: t.SensorImages, sensor_images: t.SensorImages,
anomaly_threshold: float,
) -> None: ) -> None:
file_stem = img_path.stem file_stem = img_path.stem
folder_path = img_path.parent folder_path = img_path.parent
@ -209,8 +242,11 @@ def anomaly_detection(
checkpoint = torch.load(detection_models[side]) checkpoint = torch.load(detection_models[side])
model.load_state_dict(checkpoint["model_state_dict"]) model.load_state_dict(checkpoint["model_state_dict"])
result = infer_image(image, model) result = infer_image(image, model, anomaly_threshold)
data_csv.extend([int(result.anomaly_label)]) export_data[side] = (
f"{result.anomaly_score:.1f}".replace(".", ","),
str(int(result.anomaly_label)),
)
ax = axes[i] ax = axes[i]
ax.axis("off") ax.axis("off")
@ -225,7 +261,12 @@ def anomaly_detection(
) )
plt.close() plt.close()
df = DataFrame([data_csv]) csv_data_sorted: tuple[tuple[str, ...]] = tuple(
export_data[key] for key in const.EXPORT_DATA_SORTING
)
csv_data: tuple[str, ...] = tuple(flatten(csv_data_sorted))
df = DataFrame([csv_data])
df.to_csv( df.to_csv(
(folder_path / f"{file_stem}.csv"), (folder_path / f"{file_stem}.csv"),
mode="w", mode="w",
@ -241,6 +282,7 @@ def pipeline(
user_img_path: str, user_img_path: str,
pixels_per_metric_X: float, pixels_per_metric_X: float,
pixels_per_metric_Y: float, pixels_per_metric_Y: float,
anomaly_threshold: float,
) -> None: ) -> None:
file_path = Path(user_img_path) file_path = Path(user_img_path)
if not file_path.exists(): if not file_path.exists():
@ -257,6 +299,7 @@ def pipeline(
anomaly_detection( anomaly_detection(
img_path=file_path, img_path=file_path,
detection_models=DETECTION_MODELS, detection_models=DETECTION_MODELS,
data_csv=data_csv, export_data=data_csv,
sensor_images=sensor_images, sensor_images=sensor_images,
anomaly_threshold=anomaly_threshold,
) )

View File

@ -54,7 +54,7 @@ def measure_length(
img_path: Path, img_path: Path,
pixels_per_metric_X: float, pixels_per_metric_X: float,
pixels_per_metric_Y: float, pixels_per_metric_Y: float,
) -> tuple[t.CsvData, t.SensorImages]: ) -> tuple[t.ExportData, t.SensorImages]:
"""detect and measure the size of the electrodes """detect and measure the size of the electrodes
Parameters Parameters
@ -68,8 +68,10 @@ def measure_length(
Returns Returns
------- -------
tuple[t.CsvData, t.SensorImages] tuple[t.ExportData, t.SensorImages]
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints t.ExportData: (TypedDict) data to save as CSV according to requirements, contains
strings for sensor sizes and anomaly detection results for the left and right hand
sensor respectively
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
Raises Raises
@ -86,6 +88,7 @@ def measure_length(
def infer_image( def infer_image(
image: npt.NDArray[np.uint8], image: npt.NDArray[np.uint8],
model: Patchcore, model: Patchcore,
anomaly_threshold: float,
) -> t.InferenceResult: ) -> t.InferenceResult:
"""evaluate one image """evaluate one image
@ -111,8 +114,9 @@ def infer_image(
def anomaly_detection( def anomaly_detection(
img_path: Path, img_path: Path,
detection_models: t.DetectionModels, detection_models: t.DetectionModels,
data_csv: t.CsvData, export_data: t.ExportData,
sensor_images: t.SensorImages, sensor_images: t.SensorImages,
anomaly_threshold: float,
) -> None: ) -> None:
"""load the model, call function for anomaly detection and store the results """load the model, call function for anomaly detection and store the results
@ -122,8 +126,9 @@ def anomaly_detection(
path to file to analyse path to file to analyse
detection_models : t.DetectionModels detection_models : t.DetectionModels
collection of model paths for the left and right sensor collection of model paths for the left and right sensor
data_csv : t.CsvData export_data: t.ExportData,
(list) data to save as CSV according to requirements, contains strings and ints (TypedDict) data to save as CSV according to requirements, contains strings for sensor
sizes and anomaly detection results for the left and right hand sensor respectively
sensor_images : t.SensorImages sensor_images : t.SensorImages
_description_ _description_
""" """
@ -134,6 +139,7 @@ def pipeline(
user_img_path: str, user_img_path: str,
pixels_per_metric_X: float, pixels_per_metric_X: float,
pixels_per_metric_Y: float, pixels_per_metric_Y: float,
anomaly_threshold: float,
) -> None: ) -> None:
"""full pipeline defined by the agreed requirements """full pipeline defined by the agreed requirements
wrapped as result pattern, handle errors on higher abstraction level wrapped as result pattern, handle errors on higher abstraction level

View File

@ -1,6 +1,6 @@
import dataclasses as dc import dataclasses as dc
from pathlib import Path from pathlib import Path
from typing import TypeAlias, TypedDict from typing import NotRequired, TypeAlias, TypedDict
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
@ -17,6 +17,12 @@ class InferenceResult:
anomaly_label: bool anomaly_label: bool
class ExportData(TypedDict):
sensor_sizes: tuple[str, ...]
left: NotRequired[tuple[str, str]]
right: NotRequired[tuple[str, str]]
class SensorImages(TypedDict): class SensorImages(TypedDict):
left: npt.NDArray left: npt.NDArray
right: npt.NDArray right: npt.NDArray

View File

@ -67,9 +67,9 @@ def test_measure_length(single_img_path):
pixels_per_metric_X, pixels_per_metric_X,
pixels_per_metric_Y, pixels_per_metric_Y,
) )
assert len(data) == 18 assert len(data["sensor_sizes"]) == 18
assert isinstance(data[0], str) assert isinstance(data["sensor_sizes"][0], str)
assert float(data[0].replace(",", ".")) == pytest.approx(1266.932) assert float(data["sensor_sizes"][0].replace(",", ".")) == pytest.approx(1207.171)
img_left = imgs["left"] img_left = imgs["left"]
assert 235 < img_left.shape[0] < 260 assert 235 < img_left.shape[0] < 260
assert 910 < img_left.shape[1] < 960 assert 910 < img_left.shape[1] < 960
@ -89,21 +89,22 @@ def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER) DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER)
assert DETECTION_MODELS["left"].exists() assert DETECTION_MODELS["left"].exists()
assert DETECTION_MODELS["right"].exists() assert DETECTION_MODELS["right"].exists()
data_csv, sensor_images = detect.measure_length( export_data, sensor_images = detect.measure_length(
path_img_with_failure_TrainedModel, path_img_with_failure_TrainedModel,
pixels_per_metric_X, pixels_per_metric_X,
pixels_per_metric_Y, pixels_per_metric_Y,
) )
# measured sizes # measured sizes
assert len(data_csv) == 18 assert len(export_data["sensor_sizes"]) == 18
assert sensor_images["left"] is not None assert sensor_images["left"] is not None
assert sensor_images["right"] is not None assert sensor_images["right"] is not None
detect.anomaly_detection( detect.anomaly_detection(
img_path=path_img_with_failure_TrainedModel, img_path=path_img_with_failure_TrainedModel,
detection_models=DETECTION_MODELS, detection_models=DETECTION_MODELS,
data_csv=data_csv, export_data=export_data,
sensor_images=sensor_images, sensor_images=sensor_images,
anomaly_threshold=constants.ANOMALY_THRESHOLD_DEFAULT,
) )
# check files for existence # check files for existence
root_img = path_img_with_failure_TrainedModel.parent root_img = path_img_with_failure_TrainedModel.parent
@ -124,7 +125,12 @@ def test_full_pipeline_wrapped_FailImagePath(setup_temp_dir):
pixels_per_metric_X: float = 0.251 pixels_per_metric_X: float = 0.251
pixels_per_metric_Y: float = 0.251 pixels_per_metric_Y: float = 0.251
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y) ret = detect.pipeline(
img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
)
assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
assert ret.status.ExceptionType is FileNotFoundError assert ret.status.ExceptionType is FileNotFoundError
assert ret.status.message == MESSAGE assert ret.status.message == MESSAGE
@ -140,7 +146,12 @@ def test_full_pipeline_wrapped_FailElectrodeCount(path_img_with_failure_Electrod
pixels_per_metric_X: float = 0.251 pixels_per_metric_X: float = 0.251
pixels_per_metric_Y: float = 0.251 pixels_per_metric_Y: float = 0.251
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y) ret = detect.pipeline(
img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
)
assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
assert ret.status.ExceptionType is errors.InvalidElectrodeCount assert ret.status.ExceptionType is errors.InvalidElectrodeCount
assert MESSAGE in ret.status.message assert MESSAGE in ret.status.message
@ -164,7 +175,12 @@ def test_full_pipeline_wrapped_Success(results_folder, path_img_with_failure_Tra
pixels_per_metric_X: float = 0.251 pixels_per_metric_X: float = 0.251
pixels_per_metric_Y: float = 0.251 pixels_per_metric_Y: float = 0.251
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y) ret = detect.pipeline(
img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
)
assert ret.status == result_pattern.STATUS_HANDLER.SUCCESS assert ret.status == result_pattern.STATUS_HANDLER.SUCCESS
assert ret.status.code == 0 assert ret.status.code == 0
assert ret.status.ExceptionType is None assert ret.status.ExceptionType is None

View File

@ -54,7 +54,10 @@ def test_sensor_anomalies_detection_FailImagePath(setup_temp_dir):
with patch("sys.stderr", new_callable=StringIO) as mock_err: with patch("sys.stderr", new_callable=StringIO) as mock_err:
ret = _interface.sensor_anomalies_detection( ret = _interface.sensor_anomalies_detection(
img_path, pixels_per_metric_X, pixels_per_metric_Y img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
) )
captured = mock_err.getvalue() captured = mock_err.getvalue()
assert ret != 0 assert ret != 0
@ -72,7 +75,10 @@ def test_sensor_anomalies_detection_FailElectrodeCount(path_img_with_failure_Ele
with patch("sys.stderr", new_callable=StringIO) as mock_err: with patch("sys.stderr", new_callable=StringIO) as mock_err:
ret = _interface.sensor_anomalies_detection( ret = _interface.sensor_anomalies_detection(
img_path, pixels_per_metric_X, pixels_per_metric_Y img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
) )
captured = mock_err.getvalue() captured = mock_err.getvalue()
assert ret != 0 assert ret != 0
@ -99,7 +105,10 @@ def test_sensor_anomalies_detection_Success(
pixels_per_metric_Y: float = 0.251 pixels_per_metric_Y: float = 0.251
ret = _interface.sensor_anomalies_detection( ret = _interface.sensor_anomalies_detection(
img_path, pixels_per_metric_X, pixels_per_metric_Y img_path,
pixels_per_metric_X,
pixels_per_metric_Y,
constants.ANOMALY_THRESHOLD_DEFAULT,
) )
assert ret == 0 assert ret == 0