add baseline code

This commit is contained in:
Florian Förster 2025-10-08 09:39:48 +02:00
parent eca732ee72
commit b13084d1c8

View File

@ -0,0 +1,403 @@
import csv
from os import path
# Image.MAX_IMAGE_PIXELS = None
import cv2
import matplotlib.pyplot as plt
from anomalib.data import Folder
from anomalib.engine import Engine
from anomalib.metrics import AUROC, F1Score
from anomalib.models import Patchcore
from imutils import contours, grab_contours, is_cv2, perspective
from numpy import all, array, linalg
from numpy import max as npmax
from numpy import min as npmin
from numpy import sum as npsum
from pandas import DataFrame
from PIL import Image
from scipy.spatial import distance as dist
from torch import as_tensor, cuda, device, float32, load, no_grad
from torchvision.transforms.v2.functional import to_dtype, to_image
# input parameters
file_path = r"C:\Users\demon\Documents\EKF\Analyse_fuer_Florian\bild2.bmp"
pixelsPerMetricX = 0.251
pixelsPerMetricY = 0.251
# internal parameters
schwellwert = 63 # threshold to distringuish black (electrodes) and white areas
model_path = [
r"C:\Users\demon\Documents\EKF\Modelle\patchcore_model_links.pth",
r"C:\Users\demon\Documents\EKF\Modelle\patchcore_model_rechts.pth",
] # path to anomaly detection models
backbone = "resnet18" # parameters for AI model
layers = ["layer1", "layer2"]
ratio = 0.05
# measuring
def midpoint(ptA, ptB):
# ----------------------------
# To identify the midpoint of a 2D area
# Input:
# ptA (numpy.ndarray of shape (2, )): tuple of coordinates x, y
# ptB (numpy.ndarray of shape (2, )): tuple of coordinates x, y
# Output (tuple (float, float)):
# tuple of midpoint coordinates
# ----------------------------
return ((ptA[0] + ptB[0]) * 0.5, (ptA[1] + ptB[1]) * 0.5)
def check_box_redundancy(box1, box2, tolerance=5):
# ----------------------------
# To check if bounding box has already been identified and is just a redundant one
# Input:
# box1 (tuple(float, float), (float, float), float)): tuple of box values: ((center_x, center_y), (width, height), angle)
# box2 (tuple(float, float), (float, float), float)): tuple of box values: ((center_x, center_y), (width, height), angle)
# tolerance (float): distance threshold for width and height
# Output (Boole):
# redundancy evaluation
# ----------------------------
# unpack the boxes
(c1, s1, a1) = box1
(c2, s2, a2) = box2
# sort width and height such that (w, h) == (h, w) is treated the same (might have been recognized in different orders)
s1 = sorted(s1)
s2 = sorted(s2)
center_dist = linalg.norm(array(c1) - array(c2))
size_diff = linalg.norm(array(s1) - array(s2))
return center_dist < tolerance and size_diff < tolerance
def measure_length(file_path, pixelsPerMetricX, pixelsPerMetricY):
# ----------------------------
# To identify the midpoint of a 2D area
# Input:
# file_path (string): path to file including file name and extension
# pixelsPerMetricX (float): scaling parameter, Pixels per micrometer in image
# pixelsPerMetricY (float): scaling parameter, Pixels per micrometer in image
# Output:
# data_csv (list): contains measured lengths and areas of electrodes (i.e., column 1 to 18 of csv file)
# image of left sensor
# image of right sensor
# ----------------------------
file = path.basename(file_path)
# extract file name and ending separately
name, endung = path.splitext(file)
# extract folder path
folder_path = path.dirname(file_path)
# for data output
data_csv = []
# read
image = cv2.imread(file_path)
# check if image was read
if image is None:
error = "error: no image read"
return
# crop image
cropped = image[500:1500, 100 : image.shape[1] - 100]
# store original image for later output
orig = cropped.copy()
height, width = cropped.shape[0], cropped.shape[1]
# change colours in the image to black and white
gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
_, binary = cv2.threshold(gray, schwellwert, 255, cv2.THRESH_BINARY)
# perform edge detection, identify rectangular shapes
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
edged = cv2.Canny(closed, 50, 100)
# find contours in the edge map
cnts = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
cnts = grab_contours(cnts)
if cnts is None:
print(f"{file}: offenbar nichts gefunden")
return None, None
# sort the contours from left to right (i.e., use x coordinates)
cnts, _ = contours.sort_contours(cnts)
# bounding_boxes = list(set([cv2.boundingRect(c) for c in cnts]))
# cnts = [c for _, c in sorted(zip(bounding_boxes, cnts), key=lambda b: b[0][0])]
# min_area = 1000 # adjust as needed
# filtered_cnts = [c for c in cnts if cv2.contourArea(c) > min_area]
# check if this sorting was correct (might not be correct if we have overlaps or misfindings)
# get x coordinates of bounding boxes
x_coords = [cv2.boundingRect(c)[0] for c in cnts]
# check if x coordinates are sorted in increasing order
is_sorted = all(x1 <= x2 for x1, x2 in zip(x_coords, x_coords[1:]))
if not is_sorted:
error = (
"contour detection not valid: contours are not properly sorted from left to right"
)
return None, None
# ---------------------------------------- just for internal evaluation ---------------------------------------
output_image = gray.copy()
# ---------------------------------------- just for internal evaluation ---------------------------------------
# to store only electrodes contours and nothing redundant
accepted_boxes = []
filtered_cnts = []
# loop over the contours individually
for c in cnts:
# compute the rotated bounding box of the contour
rbox = cv2.minAreaRect(c)
box = cv2.cv.BoxPoints(rbox) if is_cv2() else cv2.boxPoints(rbox)
box = array(box, dtype="int")
# order the points in the contour in top-left, top-right, bottom-right, and bottom-left
box = perspective.order_points(box)
# unpack the bounding box
(tl, tr, br, bl) = box
# compute the midpoints between the top-left and top-right as well as bottom-left and bottom-right coordinates
(tltrX, tltrY) = midpoint(tl, tr)
(blbrX, blbrY) = midpoint(bl, br)
# compute the midpoints between the top-left and top-right as well as the top-right and bottom-right coordinates
(tlblX, tlblY) = midpoint(tl, bl)
(trbrX, trbrY) = midpoint(tr, br)
# compute the Euclidean distance between the midpoints
dA = dist.euclidean((tltrX, tltrY), (blbrX, blbrY))
dB = dist.euclidean((tlblX, tlblY), (trbrX, trbrY))
# if the contour is not sufficiently large, ignore it
if dA < 100 or dB < 100:
continue
# check for redundancy
is_duplicate = any(
check_box_redundancy(rbox, existing) for existing in accepted_boxes
)
if is_duplicate:
continue
# accept box and contour
accepted_boxes.append(rbox)
filtered_cnts.append(c)
# compute the size of the electrode object
dimA = dA / pixelsPerMetricY # y
dimB = dB / pixelsPerMetricX # x
data_csv.extend(
[
f"{dimB:.3f}".replace(".", ","),
f"{dimA:.3f}".replace(".", ","),
f"{dimA * dimB:.1f}".replace(".", ","),
]
)
# ---------------------------------------- just for internal evaluation ---------------------------------------
count = 1
# loop over the original points and draw everything
cv2.drawContours(output_image, [box.astype("int")], -1, (0, 255, 0), 2)
for x, y in box:
cv2.circle(output_image, (int(x), int(y)), 5, (0, 0, 255), -1)
# draw the midpoints on the image
cv2.circle(output_image, (int(tltrX), int(tltrY)), 5, (255, 0, 0), -1)
cv2.circle(output_image, (int(blbrX), int(blbrY)), 5, (255, 0, 0), -1)
cv2.circle(output_image, (int(tlblX), int(tlblY)), 5, (255, 0, 0), -1)
cv2.circle(output_image, (int(trbrX), int(trbrY)), 5, (255, 0, 0), -1)
# draw lines between the midpoints
cv2.line(
output_image,
(int(tltrX), int(tltrY)),
(int(blbrX), int(blbrY)),
(255, 0, 255),
2,
)
cv2.line(
output_image,
(int(tlblX), int(tlblY)),
(int(trbrX), int(trbrY)),
(255, 0, 255),
2,
)
# draw the object sizes on the image
cv2.putText(
output_image,
"{:.2f}".format(dimA),
(int(tltrX - 100), int(tltrY + 40)),
cv2.FONT_HERSHEY_SIMPLEX,
1.75,
(0, 255, 0),
3,
)
cv2.putText(
output_image,
"{:.2f}".format(dimB),
(int(trbrX - 100), int(trbrY)),
cv2.FONT_HERSHEY_SIMPLEX,
1.75,
(0, 255, 0),
3,
)
# cv2.imwrite(path.join(folder_path, f'{name}_contour_{count}.png'), output_image)
count += 1
cv2.imwrite(path.join(folder_path, f"{name}_all_contours.png"), output_image)
# ---------------------------------------- just for internal evaluation ---------------------------------------
if not filtered_cnts:
error = "contour detection not valid: no contours recognized"
return None, None
# if incorrect number of electrodes has been identified
if len(filtered_cnts) != 6:
print("falsche Anzahl an Elektroden identifiziert", len(filtered_cnts))
data_csv = [-1] * 6
return data_csv, None
else:
# identify left and right sensor areas
x_min = min(npmin(c[:, 0, 0]) for c in filtered_cnts) - 20
x_max = max(npmax(c[:, 0, 0]) for c in filtered_cnts) + 20
y_min = min(npmin(c[:, 0, 1]) for c in filtered_cnts) - 20
y_max = max(npmax(c[:, 0, 1]) for c in filtered_cnts) + 20
rightmost_x_third = max(filtered_cnts[2][:, 0, 0])
leftmost_x_fourth = min(filtered_cnts[3][:, 0, 0])
x_middle = rightmost_x_third + int((leftmost_x_fourth - rightmost_x_third) / 2.0)
# perform further cropping and separation of left and right sensor
cropped_sensor_left = orig[y_min:y_max, x_min:x_middle]
cropped_sensor_right = orig[y_min:y_max, x_middle:x_max]
# ---------------------------------------- just for internal evaluation ---------------------------------------
# save the cropped images for left and right sensor
try:
cv2.imwrite(path.join(folder_path, f"{name}_left.png"), cropped_sensor_left)
cv2.imwrite(path.join(folder_path, f"{name}_right.png"), cropped_sensor_right)
except:
print("not possible")
# ---------------------------------------- just for internal evaluation ---------------------------------------
return data_csv, (cropped_sensor_left, cropped_sensor_right)
# anomaly detection
def infer_image(image, model):
# ----------------------------
# To evaluate the image
# Input:
# image (numpy.ndarray): represents image to be checked for anomalies
# model (serialized PyTorch state dictionary): model for anomaly detection
# Output:
# img_np (numpy.ndarray)
# anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies
# anomaly_score (float): evaluation metric, \in [0, 1] with close to 0 being no anomaly detected
# anomaly_label (bool): anomaly detected (1) or not (0)
# ----------------------------
torch_device = device("cuda" if cuda.is_available() else "cpu")
model.to(torch_device)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # this is optional
pil_image = Image.fromarray(image_rgb)
image = pil_image.convert("RGB")
input_tensor = (
to_dtype(to_image(image), float32, scale=True) if as_tensor else array(image) / 255.0
)
input_tensor = input_tensor.unsqueeze(0)
input_tensor = input_tensor.to(torch_device)
model.eval()
with no_grad():
output = model(input_tensor)
anomaly_score = output.pred_score.item()
anomaly_label = output.pred_label.item()
anomaly_map = output.anomaly_map.squeeze().cpu().numpy()
# resize heatmap to original image size
img_np = array(image)
anomaly_map_resized = cv2.resize(anomaly_map, (img_np.shape[1], img_np.shape[0]))
return img_np, anomaly_map_resized, anomaly_score, anomaly_label
def anomaly_detection(file_path, data_csv, sensor_images):
# ----------------------------
# To load the model, call function for anomaly detection and store the results
# Input:
# file_path (string): path to file including file name and extension
# data_csv (list of floats): results from measuring the electrodes
# Output:
# none
# ----------------------------
file = path.basename(file_path)
# extract file name and ending separately
name, endung = path.splitext(file)
# extract folder path
folder_path = path.dirname(file_path)
# reconstruct the model and initialize the engine
model = Patchcore(backbone=backbone, layers=layers, coreset_sampling_ratio=ratio)
engine = Engine()
# preparation for plot
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
# loop over left and right sensor
for i, image in enumerate(sensor_images):
# load the model
checkpoint = load(model_path[i])
model.load_state_dict(checkpoint["model_state_dict"])
# evaluate image
img_np, anomaly_map_resized, score, label = infer_image(image, model)
# ---------------------------------------- just for internal evaluation ---------------------------------------
print(score)
# ---------------------------------------- just for internal evaluation ---------------------------------------
# add result to data_csv
data_csv.extend([int(label)])
# store heatmap
ax = axes[i]
ax.axis("off")
ax.imshow(image, alpha=0.8)
ax.imshow(anomaly_map_resized, cmap="jet", alpha=0.5)
plt.subplots_adjust(wspace=0, hspace=0)
plt.savefig(
path.join(folder_path, f"{name}_Heatmap.png"), bbox_inches="tight", pad_inches=0
)
plt.close()
# save csv file
df = DataFrame([data_csv])
df.to_csv(
path.join(folder_path, f"{name}.csv"),
mode="w",
index=False,
header=False,
quoting=csv.QUOTE_NONE,
sep=";",
)
return
data_csv, sensors = measure_length(file_path, pixelsPerMetricX, pixelsPerMetricY)
anomaly_detection(file_path, data_csv, sensors)