generated from dopt-python/py311
Apply changes to meet customer specification #27
1
.gitignore
vendored
1
.gitignore
vendored
@ -6,6 +6,7 @@ reports/
|
||||
# credentials
|
||||
CREDENTIALS*
|
||||
*.pth
|
||||
*.pdf
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
|
||||
14
README.md
14
README.md
@ -14,7 +14,9 @@ Die im Rahmen des Projekts erstellte Anwendung erlaubt es, Bilder von Sensoren,
|
||||
|
||||
Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte.
|
||||
|
||||
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten zwei Einträge mit jeweils ``0`` oder ``1``, ein Ergebnis der Anomaliedetektion für jeden Sensor. Bei einem Ergebnis von ``0`` wurde keine Anomalie festgestellt, bei ``1`` hingegen schon.
|
||||
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten vier Einträge, für jeden Sensor jeweils der ermittelte Anomalie-Score und die Entscheidung, ob eine Anomalie vorliegt (``1``) oder nicht (``0``).
|
||||
|
||||
Die Reihenfolge aller Werte in der CSV-Datei orientiert sich an der Spezifikation, die durch EKF-Diagnostics bereitgestellt wurde.
|
||||
|
||||
Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt.
|
||||
|
||||
@ -29,14 +31,14 @@ Die Nutzung erfolgt über ein CLI, das über ein Python-Skript abgerufen werden
|
||||
Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden:
|
||||
|
||||
```pwsh
|
||||
cd ekf-sensor-anomalies-deployment\python
|
||||
cd dopt_ekf_sensor-anomalies_v{AKTUELLE-VERSION}\python
|
||||
.\python.exe .\cli.py --help
|
||||
```
|
||||
|
||||
Dieser Befehl gibt den folgenden Hilfetext aus:
|
||||
|
||||
```
|
||||
usage: cli.py [-h] img_path calib_value_x calib_value_y
|
||||
usage: cli.py [-h] [-t ANOMALY_THRESHOLD] img_path calib_value_x calib_value_y
|
||||
|
||||
simple CLI tool to analyse single sensor images for anomalies
|
||||
|
||||
@ -47,6 +49,8 @@ positional arguments:
|
||||
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-t ANOMALY_THRESHOLD, --anomaly_threshold ANOMALY_THRESHOLD
|
||||
optional anomaly threshold to set, default: 0.14, type: float
|
||||
```
|
||||
|
||||
Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter:
|
||||
@ -55,7 +59,9 @@ Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese b
|
||||
- ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
|
||||
- ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
|
||||
|
||||
Alle Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
|
||||
Diese Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
|
||||
|
||||
Optional kann für die Pipeline auch der Schwellwert für die Anomaliedetektion definiert werden. Dieser Wert bestimmt, ab wann eine Unregelmäßigkeit in den Bildern tatsächlich als Anomlie gewertet wird, und muss als Gleitkommazahl bereitgestellt werden. Die Übergabe erfolgt über den Parameter ``-t`` bzw. ``--anomaly_threshold``. Der Standardwert liegt bei 0,14.
|
||||
|
||||
### Fehlermeldungen
|
||||
|
||||
|
||||
14
cli.py
14
cli.py
@ -5,7 +5,7 @@ from typing import cast
|
||||
|
||||
from dopt_basics import cli
|
||||
|
||||
from dopt_sensor_anomalies import _interface
|
||||
from dopt_sensor_anomalies import _interface, constants
|
||||
|
||||
|
||||
@dc.dataclass()
|
||||
@ -13,6 +13,7 @@ class CliArgs:
|
||||
img_path: str
|
||||
calib_value_x: float
|
||||
calib_value_y: float
|
||||
anomaly_threshold: float
|
||||
|
||||
|
||||
def main() -> int:
|
||||
@ -34,6 +35,16 @@ def main() -> int:
|
||||
help="calibration value in pixels per mcm for y axis, type: float",
|
||||
type=float,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--anomaly_threshold",
|
||||
help=(
|
||||
f"optional anomaly threshold to set, default: "
|
||||
f"{constants.ANOMALY_THRESHOLD_DEFAULT}, type: float"
|
||||
),
|
||||
default=constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
type=float,
|
||||
)
|
||||
args = cast(CliArgs, parser.parse_args())
|
||||
|
||||
with cli.LoadingAnimation(
|
||||
@ -44,6 +55,7 @@ def main() -> int:
|
||||
args.img_path,
|
||||
args.calib_value_x,
|
||||
args.calib_value_y,
|
||||
args.anomaly_threshold,
|
||||
)
|
||||
if status != 0:
|
||||
loader.stop(interrupt=True)
|
||||
|
||||
@ -6,7 +6,7 @@ from unittest.mock import patch
|
||||
|
||||
from dopt_basics import cli
|
||||
|
||||
from dopt_sensor_anomalies import _interface
|
||||
from dopt_sensor_anomalies import _interface, constants
|
||||
|
||||
|
||||
@dc.dataclass()
|
||||
@ -14,6 +14,7 @@ class CliArgs:
|
||||
img_path: str
|
||||
calib_value_x: float
|
||||
calib_value_y: float
|
||||
anomaly_threshold: float
|
||||
|
||||
|
||||
@patch("dopt_sensor_anomalies._find_paths.STOP_FOLDER_NAME", "src")
|
||||
@ -37,6 +38,16 @@ def main() -> int:
|
||||
help="calibration value in pixels per mcm for y axis, type: float",
|
||||
type=float,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--anomaly_threshold",
|
||||
help=(
|
||||
f"optional anomaly threshold to set, default: "
|
||||
f"{constants.ANOMALY_THRESHOLD_DEFAULT}, type: float"
|
||||
),
|
||||
default=constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
type=float,
|
||||
)
|
||||
args = cast(CliArgs, parser.parse_args())
|
||||
|
||||
with cli.LoadingAnimation(
|
||||
@ -47,6 +58,7 @@ def main() -> int:
|
||||
args.img_path,
|
||||
args.calib_value_x,
|
||||
args.calib_value_y,
|
||||
args.anomaly_threshold,
|
||||
)
|
||||
if status != 0:
|
||||
loader.stop(interrupt=True)
|
||||
|
||||
@ -11,7 +11,9 @@ Die im Rahmen des Projekts erstellte Anwendung erlaubt es, Bilder von Sensoren,
|
||||
|
||||
Ein zu analysierendes Bild enthält zwei Sensoren mit ihrer jeweils definierten Anzahl an Elektroden. Ein solches Bild kann über das bereitgestellte Command-Line-Interface (CLI) analysiert werden. Ist die Analyse erfolgreich, wird eine nach dem Bild benannte CSV-Datei im selben Verzeichnis, in welchem sich das Bild befindet, abgelegt. Diese CSV-Datei enthält die ermittelten Werte für die Abmessungen der Elektroden sowie deren Flächeninhalte. Jeder Sensor besitzt drei Elektroden. Demzufolge ergeben sich 2x3x3 = 18 Kennwerte.
|
||||
|
||||
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten zwei Einträge mit jeweils ``0`` oder ``1``, ein Ergebnis der Anomaliedetektion für jeden Sensor. Bei einem Ergebnis von ``0`` wurde keine Anomalie festgestellt, bei ``1`` hingegen schon.
|
||||
Darüber hinaus wird ermittelt, ob die Sensoren jeweils Anomalien aufweisen. Die CSV-Datei enthält zusätzlich zu den 18 Maß-Kennwerten vier Einträge, für jeden Sensor jeweils der ermittelte Anomalie-Score und die Entscheidung, ob eine Anomalie vorliegt (``1``) oder nicht (``0``).
|
||||
|
||||
Die Reihenfolge aller Werte in der CSV-Datei orientiert sich an der Spezifikation, die durch EKF-Diagnostics bereitgestellt wurde.
|
||||
|
||||
Im Zuge der Analyse wird eine Heatmap für das Bild erzeugt, welche Rückschlüsse auf Anomaliebereiche erlaubt. Die Datei wird analog der CSV-Datei im entsprechenden Ordner der ursprünglichen Bilddatei mit einem Suffix abgelegt.
|
||||
|
||||
@ -26,14 +28,14 @@ Die Nutzung erfolgt über ein CLI, das über ein Python-Skript abgerufen werden
|
||||
Die Funktionalität wird über ein CLI nutzbar gemacht. Hierfür liegt in der bereitgestellten Distribution im Ordner "python" ein Python-Skript ab. Dieses muss durch den ebenfalls in diesem Ordner befindlichen Interpreter "python.exe" aufgerufen werden. Erfolgt der Aufruf mit einem anderen Interpreter, werden die installierten Abhängigkeiten nicht gefunden und das Programm funktioniert nicht. Ausgehend vom Ordner, in dem das entpackte Applikationsverzeichnis abliegt, kann die Applikation folgendermaßen aufgerufen werden:
|
||||
|
||||
```pwsh
|
||||
cd ekf-sensor-anomalies-deployment\python
|
||||
cd dopt_ekf_sensor-anomalies_v{AKTUELLE-VERSION}\python
|
||||
.\python.exe .\cli.py --help
|
||||
```
|
||||
|
||||
Dieser Befehl gibt den folgenden Hilfetext aus:
|
||||
|
||||
```
|
||||
usage: cli.py [-h] img_path calib_value_x calib_value_y
|
||||
usage: cli.py [-h] [-t ANOMALY_THRESHOLD] img_path calib_value_x calib_value_y
|
||||
|
||||
simple CLI tool to analyse single sensor images for anomalies
|
||||
|
||||
@ -41,11 +43,11 @@ positional arguments:
|
||||
img_path file path to the image which is to be analysed
|
||||
calib_value_x calibration value in pixels per mcm for x axis, type: float
|
||||
calib_value_y calibration value in pixels per mcm for y axis, type: float
|
||||
```
|
||||
\newpage
|
||||
```
|
||||
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-t ANOMALY_THRESHOLD, --anomaly_threshold ANOMALY_THRESHOLD
|
||||
optional anomaly threshold to set, default: 0.14, type: float
|
||||
```
|
||||
|
||||
Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese benötigt die folgenden Parameter:
|
||||
@ -54,7 +56,9 @@ Das CLI besteht entsprechend der obigen Beschreibung aus einer Funktion. Diese b
|
||||
- ``calib_value_x``: den Kalibrierwert in x-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
|
||||
- ``calib_value_y``: den Kalibrierwert in y-Richtung zur Ermittlung der Abmessungen, angegeben in Pixel/µm als Gleitkommazahl
|
||||
|
||||
Alle Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
|
||||
Diese Parameter sind obligatorisch und müssen bereitgestellt werden. Die Analyse ist stets für nur ein Bild zur selben Zeit durchführbar. Eine Übergabe mehrerer Dateien ist nicht möglich. Ausgaben, die nicht auf Fehler zurückzuführen sind, werden standardmäßig über ``STDOUT`` ausgegeben.
|
||||
|
||||
Optional kann für die Pipeline auch der Schwellwert für die Anomaliedetektion definiert werden. Dieser Wert bestimmt, ab wann eine Unregelmäßigkeit in den Bildern tatsächlich als Anomlie gewertet wird, und muss als Gleitkommazahl bereitgestellt werden. Die Übergabe erfolgt über den Parameter ``-t`` bzw. ``--anomaly_threshold``. Der Standardwert liegt bei 0,14.
|
||||
|
||||
### Fehlermeldungen
|
||||
|
||||
|
||||
BIN
docs/manual.pdf
BIN
docs/manual.pdf
Binary file not shown.
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "dopt-sensor-anomalies"
|
||||
version = "0.1.5"
|
||||
version = "0.2.0dev0"
|
||||
description = "anomaly detection for sensor images for quality assurance processes"
|
||||
authors = [
|
||||
{name = "d-opt GmbH (resp.: Florian Foerster)", email = "f.foerster@d-opt.com"},
|
||||
@ -77,7 +77,7 @@ directory = "reports/coverage"
|
||||
|
||||
|
||||
[tool.bumpversion]
|
||||
current_version = "0.1.5"
|
||||
current_version = "0.2.0dev0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@ -1 +1,115 @@
|
||||
pdm build -d build/
|
||||
$DEPLOYMENT_PATH = 'B:\deployments\EKF-Diagnostics'
|
||||
$ENV_PATH = 'B:\deployments\EKF-Diagnostics\dopt_ekf_sensor-anomalies'
|
||||
$PY_PATH = Join-Path -Path $ENV_PATH -ChildPath 'python'
|
||||
$SRC_PATH = (Get-Location).Path
|
||||
|
||||
Write-Output "Build Pipeline for d-opt EKF-Diagnostics sensor anomaly detection"
|
||||
Write-Output "Building package..."
|
||||
.\scripts\publish.ps1
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were build errors"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Built package successfully"
|
||||
|
||||
# manual
|
||||
Write-Output "Generate manual..."
|
||||
.\scripts\cvt_manual.ps1
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there errors while generating the README file"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Generated manual file successfully"
|
||||
Write-Output "Copying manual file..."
|
||||
$readme_src_path = Join-Path -Path $SRC_PATH -ChildPath 'docs\manual.pdf'
|
||||
$readme_dest_path = Join-Path -Path $ENV_PATH -ChildPath 'manual'
|
||||
Copy-Item -Path $readme_src_path -Destination $readme_dest_path -Force
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors while copying the manual file"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Copied manual file successfully"
|
||||
|
||||
|
||||
Write-Output "Go into env directory..."
|
||||
Set-Location $ENV_PATH
|
||||
Write-Output "Install package into environment..."
|
||||
pycage venv add -p -i http://localhost:8001/simple/ dopt-sensor-anomalies
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors while installing the package into the environment"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Successfully installed package"
|
||||
|
||||
# copy CLI file
|
||||
Write-Output "Copying CLI script file..."
|
||||
$cli_path = Join-Path -Path $SRC_PATH -ChildPath 'cli.py'
|
||||
Copy-Item -Path $cli_path -Destination $PY_PATH -Force
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors while copying the CLI script file"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Copied CLI script file successfully"
|
||||
|
||||
# copy model files
|
||||
Write-Output "Copying model weight files..."
|
||||
$model_src_path = Join-Path -Path $SRC_PATH -ChildPath 'tests\_models\'
|
||||
$model_dest_path = Join-Path -Path $ENV_PATH -ChildPath 'models'
|
||||
Copy-Item -Path "$model_src_path\*.pth" -Destination $model_dest_path -Force
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors while copying the model weight files"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Copied model weight files successfully"
|
||||
|
||||
|
||||
# env preparation
|
||||
Write-Output "Preparing environment with cleanup and pre-compilation..."
|
||||
pycage clean dist-info
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors while deleting the distribution info files"
|
||||
Exit
|
||||
}
|
||||
pycage compile -q -d -f
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors during the pre-compilation process"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Successfully prepared environment with cleanup and pre-compilation"
|
||||
|
||||
Write-Output "Get version string..."
|
||||
$pyproject = Get-Content $SRC_PATH\pyproject.toml -Raw
|
||||
|
||||
if ($pyproject -match '\[project\].*?\n[\s\w\n=""-_{}]*\[[\w-]*\]') {
|
||||
$projectBlock = $matches[0]
|
||||
if ($projectBlock -match 'version\s*=\s*"([^"]+)"') {
|
||||
$version = $matches[1]
|
||||
Write-Output "The version string is: $version"
|
||||
}
|
||||
else {
|
||||
Write-Output "[PWSH] Exiting script because the version string was not found"
|
||||
Exit
|
||||
}
|
||||
}
|
||||
else {
|
||||
Write-Output "[PWSH] Exiting script because the version string was not found"
|
||||
Exit
|
||||
}
|
||||
|
||||
Write-Output "Packaging whole standalone environment in a ZIP file"
|
||||
$dest_path = Join-Path -Path $DEPLOYMENT_PATH -ChildPath "dopt_ekf_sensor-anomalies_v$version.zip"
|
||||
$compress = @{
|
||||
Path = "$ENV_PATH\**"
|
||||
CompressionLevel = "Optimal"
|
||||
DestinationPath = $dest_path
|
||||
}
|
||||
Compress-Archive @compress -Force
|
||||
|
||||
if ($? -eq $false){
|
||||
Write-Output "[PWSH] Exiting script because there were errors during the archive compression operation"
|
||||
Exit
|
||||
}
|
||||
Write-Output "Successfully compressed archive. Saved under: >$dest_path<"
|
||||
|
||||
Write-Output "Go back to source directory..."
|
||||
Set-Location $SRC_PATH
|
||||
|
||||
1
scripts/build_pdm.ps1
Normal file
1
scripts/build_pdm.ps1
Normal file
@ -0,0 +1 @@
|
||||
pdm build -d build/
|
||||
@ -30,11 +30,13 @@ def sensor_anomalies_detection(
|
||||
user_img_path: str,
|
||||
pixels_per_metric_X: float,
|
||||
pixels_per_metric_Y: float,
|
||||
anomaly_threshold: float,
|
||||
) -> int:
|
||||
res = detection.pipeline(
|
||||
user_img_path=user_img_path,
|
||||
pixels_per_metric_X=pixels_per_metric_X,
|
||||
pixels_per_metric_Y=pixels_per_metric_Y,
|
||||
anomaly_threshold=anomaly_threshold,
|
||||
)
|
||||
if res.status.code != 0:
|
||||
_print_error_state(res.status, out_stream=sys.stderr)
|
||||
|
||||
@ -5,11 +5,13 @@ LIB_ROOT_PATH: Final[Path] = Path(__file__).parent
|
||||
STOP_FOLDER_NAME: Final[str] = "python"
|
||||
MODEL_FOLDER_NAME: Final[str] = "models"
|
||||
|
||||
EXPORT_DATA_SORTING: Final[tuple[str, ...]] = ("sensor_sizes", "right", "left")
|
||||
|
||||
THRESHOLD_BW: Final[int] = 63
|
||||
BACKBONE: Final[str] = "wide_resnet50_2"
|
||||
LAYERS: Final[tuple[str, ...]] = ("layer1", "layer2", "layer3")
|
||||
RATIO: Final[float] = 0.01
|
||||
ANOMALY_THRESHOLD: Final[float] = 0.14
|
||||
ANOMALY_THRESHOLD_DEFAULT: Final[float] = 0.14
|
||||
|
||||
NUM_VALID_ELECTRODES: Final[int] = 6
|
||||
HEATMAP_FILENAME_SUFFIX: Final[str] = "_Heatmap"
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -11,6 +11,7 @@ import numpy.typing as npt
|
||||
import torch
|
||||
from anomalib.models import Patchcore
|
||||
from dopt_basics import result_pattern
|
||||
from dopt_basics.datastructures import flatten
|
||||
from imutils import contours, perspective
|
||||
from pandas import DataFrame
|
||||
from PIL import Image
|
||||
@ -56,8 +57,8 @@ def measure_length(
|
||||
img_path: Path,
|
||||
pixels_per_metric_X: float,
|
||||
pixels_per_metric_Y: float,
|
||||
) -> tuple[t.CsvData, t.SensorImages]:
|
||||
data_csv: list[str | int] = []
|
||||
) -> tuple[t.ExportData, t.SensorImages]:
|
||||
sensor_sizes: list[tuple[str, ...]] = []
|
||||
image = cv2.imread(str(img_path))
|
||||
if image is None:
|
||||
raise errors.ImageNotReadError(f"Image could not be read from: >{img_path}<")
|
||||
@ -97,15 +98,16 @@ def measure_length(
|
||||
box = cast(npt.NDArray[np.float32], perspective.order_points(box))
|
||||
|
||||
(tl, tr, br, bl) = box
|
||||
(tltrX, tltrY) = midpoint(tl, tr)
|
||||
(blbrX, blbrY) = midpoint(bl, br)
|
||||
(tlblX, tlblY) = midpoint(tl, bl)
|
||||
(trbrX, trbrY) = midpoint(tr, br)
|
||||
|
||||
dA = dist.euclidean((tltrX, tltrY), (blbrX, blbrY))
|
||||
dB = dist.euclidean((tlblX, tlblY), (trbrX, trbrY))
|
||||
widthA = np.linalg.norm(br - bl)
|
||||
widthB = np.linalg.norm(tr - tl)
|
||||
max_width = int(max(widthA, widthB))
|
||||
|
||||
if dA < 100 or dB < 100:
|
||||
heightA = np.linalg.norm(tr - br)
|
||||
heightB = np.linalg.norm(tl - bl)
|
||||
max_height = int(max(heightA, heightB))
|
||||
|
||||
if max_width < 100 or max_height < 100:
|
||||
continue
|
||||
|
||||
is_duplicate = any(
|
||||
@ -117,15 +119,38 @@ def measure_length(
|
||||
accepted_boxes.append(rbox)
|
||||
filtered_cnts.append(c)
|
||||
|
||||
dimA = dA / pixels_per_metric_Y
|
||||
dimB = dB / pixels_per_metric_X
|
||||
dimA = max_width / pixels_per_metric_Y
|
||||
dimB = max_height / pixels_per_metric_X
|
||||
|
||||
data_csv.extend(
|
||||
offset = 20
|
||||
|
||||
dst = np.array(
|
||||
[
|
||||
f"{dimB:.3f}".replace(".", ","),
|
||||
[offset, offset],
|
||||
[max_width - 1 + offset, offset],
|
||||
[max_width - 1 + offset, max_height - 1 + offset],
|
||||
[offset, max_height - 1 + offset],
|
||||
],
|
||||
dtype="float32",
|
||||
)
|
||||
|
||||
M = cv2.getPerspectiveTransform(box, dst)
|
||||
warped = cv2.warpPerspective(
|
||||
orig, M, (max_width + 2 * offset, max_height + 2 * offset)
|
||||
)
|
||||
|
||||
gray_warped = cv2.cvtColor(warped, cv2.COLOR_BGR2GRAY)
|
||||
_, binary_warped = cv2.threshold(gray_warped, 80, 255, cv2.THRESH_BINARY)
|
||||
pixel_count = np.sum(binary_warped == 0)
|
||||
|
||||
sensor_sizes.append(
|
||||
(
|
||||
f"{dimA:.3f}".replace(".", ","),
|
||||
f"{dimA * dimB:.1f}".replace(".", ","),
|
||||
]
|
||||
f"{dimB:.3f}".replace(".", ","),
|
||||
f"{pixel_count / pixels_per_metric_X / pixels_per_metric_Y:.1f}".replace(
|
||||
".", ","
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
if not filtered_cnts: # pragma: no cover
|
||||
@ -152,12 +177,19 @@ def measure_length(
|
||||
cropped_sensor_left = orig[y_min:y_max, x_min:x_middle]
|
||||
cropped_sensor_right = orig[y_min:y_max, x_middle:x_max]
|
||||
|
||||
return data_csv, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right)
|
||||
sensor_sizes_sorted = cast(
|
||||
tuple[str, ...],
|
||||
tuple(flatten(reversed(sensor_sizes))), # type: ignore
|
||||
)
|
||||
export_data: t.ExportData = t.ExportData(sensor_sizes=sensor_sizes_sorted)
|
||||
|
||||
return export_data, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right)
|
||||
|
||||
|
||||
def infer_image(
|
||||
image: npt.NDArray[np.uint8],
|
||||
model: Patchcore,
|
||||
anomaly_threshold: float,
|
||||
) -> t.InferenceResult:
|
||||
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
model.to(torch_device)
|
||||
@ -176,7 +208,7 @@ def infer_image(
|
||||
output = model(input_tensor)
|
||||
|
||||
anomaly_score = output.pred_score.item()
|
||||
anomaly_label = bool(1 if anomaly_score >= const.ANOMALY_THRESHOLD else 0)
|
||||
anomaly_label = bool(1 if anomaly_score >= anomaly_threshold else 0)
|
||||
anomaly_map = output.anomaly_map.squeeze().cpu().numpy()
|
||||
|
||||
img_np = np.array(pil_image)
|
||||
@ -193,8 +225,9 @@ def infer_image(
|
||||
def anomaly_detection(
|
||||
img_path: Path,
|
||||
detection_models: t.DetectionModels,
|
||||
data_csv: t.CsvData,
|
||||
export_data: t.ExportData,
|
||||
sensor_images: t.SensorImages,
|
||||
anomaly_threshold: float,
|
||||
) -> None:
|
||||
file_stem = img_path.stem
|
||||
folder_path = img_path.parent
|
||||
@ -209,8 +242,11 @@ def anomaly_detection(
|
||||
checkpoint = torch.load(detection_models[side])
|
||||
model.load_state_dict(checkpoint["model_state_dict"])
|
||||
|
||||
result = infer_image(image, model)
|
||||
data_csv.extend([int(result.anomaly_label)])
|
||||
result = infer_image(image, model, anomaly_threshold)
|
||||
export_data[side] = (
|
||||
f"{result.anomaly_score:.1f}".replace(".", ","),
|
||||
str(int(result.anomaly_label)),
|
||||
)
|
||||
|
||||
ax = axes[i]
|
||||
ax.axis("off")
|
||||
@ -225,7 +261,12 @@ def anomaly_detection(
|
||||
)
|
||||
plt.close()
|
||||
|
||||
df = DataFrame([data_csv])
|
||||
csv_data_sorted: tuple[tuple[str, ...]] = tuple(
|
||||
export_data[key] for key in const.EXPORT_DATA_SORTING
|
||||
)
|
||||
csv_data: tuple[str, ...] = tuple(flatten(csv_data_sorted))
|
||||
|
||||
df = DataFrame([csv_data])
|
||||
df.to_csv(
|
||||
(folder_path / f"{file_stem}.csv"),
|
||||
mode="w",
|
||||
@ -241,6 +282,7 @@ def pipeline(
|
||||
user_img_path: str,
|
||||
pixels_per_metric_X: float,
|
||||
pixels_per_metric_Y: float,
|
||||
anomaly_threshold: float,
|
||||
) -> None:
|
||||
file_path = Path(user_img_path)
|
||||
if not file_path.exists():
|
||||
@ -257,6 +299,7 @@ def pipeline(
|
||||
anomaly_detection(
|
||||
img_path=file_path,
|
||||
detection_models=DETECTION_MODELS,
|
||||
data_csv=data_csv,
|
||||
export_data=data_csv,
|
||||
sensor_images=sensor_images,
|
||||
anomaly_threshold=anomaly_threshold,
|
||||
)
|
||||
|
||||
@ -54,7 +54,7 @@ def measure_length(
|
||||
img_path: Path,
|
||||
pixels_per_metric_X: float,
|
||||
pixels_per_metric_Y: float,
|
||||
) -> tuple[t.CsvData, t.SensorImages]:
|
||||
) -> tuple[t.ExportData, t.SensorImages]:
|
||||
"""detect and measure the size of the electrodes
|
||||
|
||||
Parameters
|
||||
@ -68,8 +68,10 @@ def measure_length(
|
||||
|
||||
Returns
|
||||
-------
|
||||
tuple[t.CsvData, t.SensorImages]
|
||||
t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints
|
||||
tuple[t.ExportData, t.SensorImages]
|
||||
t.ExportData: (TypedDict) data to save as CSV according to requirements, contains
|
||||
strings for sensor sizes and anomaly detection results for the left and right hand
|
||||
sensor respectively
|
||||
t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor
|
||||
|
||||
Raises
|
||||
@ -86,6 +88,7 @@ def measure_length(
|
||||
def infer_image(
|
||||
image: npt.NDArray[np.uint8],
|
||||
model: Patchcore,
|
||||
anomaly_threshold: float,
|
||||
) -> t.InferenceResult:
|
||||
"""evaluate one image
|
||||
|
||||
@ -111,8 +114,9 @@ def infer_image(
|
||||
def anomaly_detection(
|
||||
img_path: Path,
|
||||
detection_models: t.DetectionModels,
|
||||
data_csv: t.CsvData,
|
||||
export_data: t.ExportData,
|
||||
sensor_images: t.SensorImages,
|
||||
anomaly_threshold: float,
|
||||
) -> None:
|
||||
"""load the model, call function for anomaly detection and store the results
|
||||
|
||||
@ -122,8 +126,9 @@ def anomaly_detection(
|
||||
path to file to analyse
|
||||
detection_models : t.DetectionModels
|
||||
collection of model paths for the left and right sensor
|
||||
data_csv : t.CsvData
|
||||
(list) data to save as CSV according to requirements, contains strings and ints
|
||||
export_data: t.ExportData,
|
||||
(TypedDict) data to save as CSV according to requirements, contains strings for sensor
|
||||
sizes and anomaly detection results for the left and right hand sensor respectively
|
||||
sensor_images : t.SensorImages
|
||||
_description_
|
||||
"""
|
||||
@ -134,6 +139,7 @@ def pipeline(
|
||||
user_img_path: str,
|
||||
pixels_per_metric_X: float,
|
||||
pixels_per_metric_Y: float,
|
||||
anomaly_threshold: float,
|
||||
) -> None:
|
||||
"""full pipeline defined by the agreed requirements
|
||||
wrapped as result pattern, handle errors on higher abstraction level
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import dataclasses as dc
|
||||
from pathlib import Path
|
||||
from typing import TypeAlias, TypedDict
|
||||
from typing import NotRequired, TypeAlias, TypedDict
|
||||
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
@ -17,6 +17,12 @@ class InferenceResult:
|
||||
anomaly_label: bool
|
||||
|
||||
|
||||
class ExportData(TypedDict):
|
||||
sensor_sizes: tuple[str, ...]
|
||||
left: NotRequired[tuple[str, str]]
|
||||
right: NotRequired[tuple[str, str]]
|
||||
|
||||
|
||||
class SensorImages(TypedDict):
|
||||
left: npt.NDArray
|
||||
right: npt.NDArray
|
||||
|
||||
@ -67,9 +67,9 @@ def test_measure_length(single_img_path):
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
)
|
||||
assert len(data) == 18
|
||||
assert isinstance(data[0], str)
|
||||
assert float(data[0].replace(",", ".")) == pytest.approx(1266.932)
|
||||
assert len(data["sensor_sizes"]) == 18
|
||||
assert isinstance(data["sensor_sizes"][0], str)
|
||||
assert float(data["sensor_sizes"][0].replace(",", ".")) == pytest.approx(1207.171)
|
||||
img_left = imgs["left"]
|
||||
assert 235 < img_left.shape[0] < 260
|
||||
assert 910 < img_left.shape[1] < 960
|
||||
@ -89,21 +89,22 @@ def test_isolated_pipeline(results_folder, path_img_with_failure_TrainedModel):
|
||||
DETECTION_MODELS = dopt_sensor_anomalies._find_paths.get_detection_models(MODEL_FOLDER)
|
||||
assert DETECTION_MODELS["left"].exists()
|
||||
assert DETECTION_MODELS["right"].exists()
|
||||
data_csv, sensor_images = detect.measure_length(
|
||||
export_data, sensor_images = detect.measure_length(
|
||||
path_img_with_failure_TrainedModel,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
)
|
||||
|
||||
# measured sizes
|
||||
assert len(data_csv) == 18
|
||||
assert len(export_data["sensor_sizes"]) == 18
|
||||
assert sensor_images["left"] is not None
|
||||
assert sensor_images["right"] is not None
|
||||
detect.anomaly_detection(
|
||||
img_path=path_img_with_failure_TrainedModel,
|
||||
detection_models=DETECTION_MODELS,
|
||||
data_csv=data_csv,
|
||||
export_data=export_data,
|
||||
sensor_images=sensor_images,
|
||||
anomaly_threshold=constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
# check files for existence
|
||||
root_img = path_img_with_failure_TrainedModel.parent
|
||||
@ -124,7 +125,12 @@ def test_full_pipeline_wrapped_FailImagePath(setup_temp_dir):
|
||||
pixels_per_metric_X: float = 0.251
|
||||
pixels_per_metric_Y: float = 0.251
|
||||
|
||||
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
|
||||
ret = detect.pipeline(
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
|
||||
assert ret.status.ExceptionType is FileNotFoundError
|
||||
assert ret.status.message == MESSAGE
|
||||
@ -140,7 +146,12 @@ def test_full_pipeline_wrapped_FailElectrodeCount(path_img_with_failure_Electrod
|
||||
pixels_per_metric_X: float = 0.251
|
||||
pixels_per_metric_Y: float = 0.251
|
||||
|
||||
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
|
||||
ret = detect.pipeline(
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
assert ret.status != result_pattern.STATUS_HANDLER.SUCCESS
|
||||
assert ret.status.ExceptionType is errors.InvalidElectrodeCount
|
||||
assert MESSAGE in ret.status.message
|
||||
@ -164,7 +175,12 @@ def test_full_pipeline_wrapped_Success(results_folder, path_img_with_failure_Tra
|
||||
pixels_per_metric_X: float = 0.251
|
||||
pixels_per_metric_Y: float = 0.251
|
||||
|
||||
ret = detect.pipeline(img_path, pixels_per_metric_X, pixels_per_metric_Y)
|
||||
ret = detect.pipeline(
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
assert ret.status == result_pattern.STATUS_HANDLER.SUCCESS
|
||||
assert ret.status.code == 0
|
||||
assert ret.status.ExceptionType is None
|
||||
|
||||
@ -54,7 +54,10 @@ def test_sensor_anomalies_detection_FailImagePath(setup_temp_dir):
|
||||
|
||||
with patch("sys.stderr", new_callable=StringIO) as mock_err:
|
||||
ret = _interface.sensor_anomalies_detection(
|
||||
img_path, pixels_per_metric_X, pixels_per_metric_Y
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
captured = mock_err.getvalue()
|
||||
assert ret != 0
|
||||
@ -72,7 +75,10 @@ def test_sensor_anomalies_detection_FailElectrodeCount(path_img_with_failure_Ele
|
||||
|
||||
with patch("sys.stderr", new_callable=StringIO) as mock_err:
|
||||
ret = _interface.sensor_anomalies_detection(
|
||||
img_path, pixels_per_metric_X, pixels_per_metric_Y
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
captured = mock_err.getvalue()
|
||||
assert ret != 0
|
||||
@ -99,7 +105,10 @@ def test_sensor_anomalies_detection_Success(
|
||||
pixels_per_metric_Y: float = 0.251
|
||||
|
||||
ret = _interface.sensor_anomalies_detection(
|
||||
img_path, pixels_per_metric_X, pixels_per_metric_Y
|
||||
img_path,
|
||||
pixels_per_metric_X,
|
||||
pixels_per_metric_Y,
|
||||
constants.ANOMALY_THRESHOLD_DEFAULT,
|
||||
)
|
||||
|
||||
assert ret == 0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user