From b13084d1c89f58fef24c8bbaa7538e0a8e2e7c78 Mon Sep 17 00:00:00 2001 From: foefl Date: Wed, 8 Oct 2025 09:39:48 +0200 Subject: [PATCH] add baseline code --- src/dopt-sensor-anomalies/detection.py | 403 +++++++++++++++++++++++++ 1 file changed, 403 insertions(+) create mode 100644 src/dopt-sensor-anomalies/detection.py diff --git a/src/dopt-sensor-anomalies/detection.py b/src/dopt-sensor-anomalies/detection.py new file mode 100644 index 0000000..51a3e61 --- /dev/null +++ b/src/dopt-sensor-anomalies/detection.py @@ -0,0 +1,403 @@ +import csv +from os import path + +# Image.MAX_IMAGE_PIXELS = None +import cv2 +import matplotlib.pyplot as plt +from anomalib.data import Folder +from anomalib.engine import Engine +from anomalib.metrics import AUROC, F1Score +from anomalib.models import Patchcore +from imutils import contours, grab_contours, is_cv2, perspective +from numpy import all, array, linalg +from numpy import max as npmax +from numpy import min as npmin +from numpy import sum as npsum +from pandas import DataFrame +from PIL import Image +from scipy.spatial import distance as dist +from torch import as_tensor, cuda, device, float32, load, no_grad +from torchvision.transforms.v2.functional import to_dtype, to_image + +# input parameters +file_path = r"C:\Users\demon\Documents\EKF\Analyse_fuer_Florian\bild2.bmp" +pixelsPerMetricX = 0.251 +pixelsPerMetricY = 0.251 + + +# internal parameters +schwellwert = 63 # threshold to distringuish black (electrodes) and white areas +model_path = [ + r"C:\Users\demon\Documents\EKF\Modelle\patchcore_model_links.pth", + r"C:\Users\demon\Documents\EKF\Modelle\patchcore_model_rechts.pth", +] # path to anomaly detection models +backbone = "resnet18" # parameters for AI model +layers = ["layer1", "layer2"] +ratio = 0.05 + + +# measuring +def midpoint(ptA, ptB): + # ---------------------------- + # To identify the midpoint of a 2D area + # Input: + # ptA (numpy.ndarray of shape (2, )): tuple of coordinates x, y + # ptB (numpy.ndarray of shape (2, )): tuple of coordinates x, y + # Output (tuple (float, float)): + # tuple of midpoint coordinates + # ---------------------------- + + return ((ptA[0] + ptB[0]) * 0.5, (ptA[1] + ptB[1]) * 0.5) + + +def check_box_redundancy(box1, box2, tolerance=5): + # ---------------------------- + # To check if bounding box has already been identified and is just a redundant one + # Input: + # box1 (tuple(float, float), (float, float), float)): tuple of box values: ((center_x, center_y), (width, height), angle) + # box2 (tuple(float, float), (float, float), float)): tuple of box values: ((center_x, center_y), (width, height), angle) + # tolerance (float): distance threshold for width and height + # Output (Boole): + # redundancy evaluation + # ---------------------------- + + # unpack the boxes + (c1, s1, a1) = box1 + (c2, s2, a2) = box2 + + # sort width and height such that (w, h) == (h, w) is treated the same (might have been recognized in different orders) + s1 = sorted(s1) + s2 = sorted(s2) + + center_dist = linalg.norm(array(c1) - array(c2)) + size_diff = linalg.norm(array(s1) - array(s2)) + + return center_dist < tolerance and size_diff < tolerance + + +def measure_length(file_path, pixelsPerMetricX, pixelsPerMetricY): + # ---------------------------- + # To identify the midpoint of a 2D area + # Input: + # file_path (string): path to file including file name and extension + # pixelsPerMetricX (float): scaling parameter, Pixels per micrometer in image + # pixelsPerMetricY (float): scaling parameter, Pixels per micrometer in image + # Output: + # data_csv (list): contains measured lengths and areas of electrodes (i.e., column 1 to 18 of csv file) + # image of left sensor + # image of right sensor + # ---------------------------- + + file = path.basename(file_path) + # extract file name and ending separately + name, endung = path.splitext(file) + # extract folder path + folder_path = path.dirname(file_path) + + # for data output + data_csv = [] + + # read + image = cv2.imread(file_path) + + # check if image was read + if image is None: + error = "error: no image read" + return + + # crop image + cropped = image[500:1500, 100 : image.shape[1] - 100] + + # store original image for later output + orig = cropped.copy() + height, width = cropped.shape[0], cropped.shape[1] + + # change colours in the image to black and white + gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY) + _, binary = cv2.threshold(gray, schwellwert, 255, cv2.THRESH_BINARY) + + # perform edge detection, identify rectangular shapes + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)) + closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel) + edged = cv2.Canny(closed, 50, 100) + + # find contours in the edge map + cnts = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + cnts = grab_contours(cnts) + + if cnts is None: + print(f"{file}: offenbar nichts gefunden") + return None, None + + # sort the contours from left to right (i.e., use x coordinates) + cnts, _ = contours.sort_contours(cnts) + # bounding_boxes = list(set([cv2.boundingRect(c) for c in cnts])) + # cnts = [c for _, c in sorted(zip(bounding_boxes, cnts), key=lambda b: b[0][0])] + + # min_area = 1000 # adjust as needed + # filtered_cnts = [c for c in cnts if cv2.contourArea(c) > min_area] + + # check if this sorting was correct (might not be correct if we have overlaps or misfindings) + # get x coordinates of bounding boxes + x_coords = [cv2.boundingRect(c)[0] for c in cnts] + # check if x coordinates are sorted in increasing order + is_sorted = all(x1 <= x2 for x1, x2 in zip(x_coords, x_coords[1:])) + if not is_sorted: + error = ( + "contour detection not valid: contours are not properly sorted from left to right" + ) + return None, None + + # ---------------------------------------- just for internal evaluation --------------------------------------- + output_image = gray.copy() + # ---------------------------------------- just for internal evaluation --------------------------------------- + + # to store only electrodes contours and nothing redundant + accepted_boxes = [] + filtered_cnts = [] + + # loop over the contours individually + for c in cnts: + # compute the rotated bounding box of the contour + rbox = cv2.minAreaRect(c) + box = cv2.cv.BoxPoints(rbox) if is_cv2() else cv2.boxPoints(rbox) + box = array(box, dtype="int") + # order the points in the contour in top-left, top-right, bottom-right, and bottom-left + box = perspective.order_points(box) + + # unpack the bounding box + (tl, tr, br, bl) = box + # compute the midpoints between the top-left and top-right as well as bottom-left and bottom-right coordinates + (tltrX, tltrY) = midpoint(tl, tr) + (blbrX, blbrY) = midpoint(bl, br) + # compute the midpoints between the top-left and top-right as well as the top-right and bottom-right coordinates + (tlblX, tlblY) = midpoint(tl, bl) + (trbrX, trbrY) = midpoint(tr, br) + + # compute the Euclidean distance between the midpoints + dA = dist.euclidean((tltrX, tltrY), (blbrX, blbrY)) + dB = dist.euclidean((tlblX, tlblY), (trbrX, trbrY)) + + # if the contour is not sufficiently large, ignore it + if dA < 100 or dB < 100: + continue + + # check for redundancy + is_duplicate = any( + check_box_redundancy(rbox, existing) for existing in accepted_boxes + ) + if is_duplicate: + continue + + # accept box and contour + accepted_boxes.append(rbox) + filtered_cnts.append(c) + + # compute the size of the electrode object + dimA = dA / pixelsPerMetricY # y + dimB = dB / pixelsPerMetricX # x + + data_csv.extend( + [ + f"{dimB:.3f}".replace(".", ","), + f"{dimA:.3f}".replace(".", ","), + f"{dimA * dimB:.1f}".replace(".", ","), + ] + ) + + # ---------------------------------------- just for internal evaluation --------------------------------------- + count = 1 + # loop over the original points and draw everything + cv2.drawContours(output_image, [box.astype("int")], -1, (0, 255, 0), 2) + for x, y in box: + cv2.circle(output_image, (int(x), int(y)), 5, (0, 0, 255), -1) + # draw the midpoints on the image + cv2.circle(output_image, (int(tltrX), int(tltrY)), 5, (255, 0, 0), -1) + cv2.circle(output_image, (int(blbrX), int(blbrY)), 5, (255, 0, 0), -1) + cv2.circle(output_image, (int(tlblX), int(tlblY)), 5, (255, 0, 0), -1) + cv2.circle(output_image, (int(trbrX), int(trbrY)), 5, (255, 0, 0), -1) + # draw lines between the midpoints + cv2.line( + output_image, + (int(tltrX), int(tltrY)), + (int(blbrX), int(blbrY)), + (255, 0, 255), + 2, + ) + cv2.line( + output_image, + (int(tlblX), int(tlblY)), + (int(trbrX), int(trbrY)), + (255, 0, 255), + 2, + ) + # draw the object sizes on the image + cv2.putText( + output_image, + "{:.2f}".format(dimA), + (int(tltrX - 100), int(tltrY + 40)), + cv2.FONT_HERSHEY_SIMPLEX, + 1.75, + (0, 255, 0), + 3, + ) + cv2.putText( + output_image, + "{:.2f}".format(dimB), + (int(trbrX - 100), int(trbrY)), + cv2.FONT_HERSHEY_SIMPLEX, + 1.75, + (0, 255, 0), + 3, + ) + # cv2.imwrite(path.join(folder_path, f'{name}_contour_{count}.png'), output_image) + count += 1 + + cv2.imwrite(path.join(folder_path, f"{name}_all_contours.png"), output_image) + # ---------------------------------------- just for internal evaluation --------------------------------------- + + if not filtered_cnts: + error = "contour detection not valid: no contours recognized" + return None, None + + # if incorrect number of electrodes has been identified + if len(filtered_cnts) != 6: + print("falsche Anzahl an Elektroden identifiziert", len(filtered_cnts)) + data_csv = [-1] * 6 + return data_csv, None + + else: + # identify left and right sensor areas + x_min = min(npmin(c[:, 0, 0]) for c in filtered_cnts) - 20 + x_max = max(npmax(c[:, 0, 0]) for c in filtered_cnts) + 20 + y_min = min(npmin(c[:, 0, 1]) for c in filtered_cnts) - 20 + y_max = max(npmax(c[:, 0, 1]) for c in filtered_cnts) + 20 + + rightmost_x_third = max(filtered_cnts[2][:, 0, 0]) + leftmost_x_fourth = min(filtered_cnts[3][:, 0, 0]) + x_middle = rightmost_x_third + int((leftmost_x_fourth - rightmost_x_third) / 2.0) + + # perform further cropping and separation of left and right sensor + cropped_sensor_left = orig[y_min:y_max, x_min:x_middle] + cropped_sensor_right = orig[y_min:y_max, x_middle:x_max] + + # ---------------------------------------- just for internal evaluation --------------------------------------- + # save the cropped images for left and right sensor + try: + cv2.imwrite(path.join(folder_path, f"{name}_left.png"), cropped_sensor_left) + cv2.imwrite(path.join(folder_path, f"{name}_right.png"), cropped_sensor_right) + except: + print("not possible") + # ---------------------------------------- just for internal evaluation --------------------------------------- + + return data_csv, (cropped_sensor_left, cropped_sensor_right) + + +# anomaly detection +def infer_image(image, model): + # ---------------------------- + # To evaluate the image + # Input: + # image (numpy.ndarray): represents image to be checked for anomalies + # model (serialized PyTorch state dictionary): model for anomaly detection + # Output: + # img_np (numpy.ndarray) + # anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies + # anomaly_score (float): evaluation metric, \in [0, 1] with close to 0 being no anomaly detected + # anomaly_label (bool): anomaly detected (1) or not (0) + # ---------------------------- + + torch_device = device("cuda" if cuda.is_available() else "cpu") + model.to(torch_device) + + image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # this is optional + pil_image = Image.fromarray(image_rgb) + image = pil_image.convert("RGB") + input_tensor = ( + to_dtype(to_image(image), float32, scale=True) if as_tensor else array(image) / 255.0 + ) + input_tensor = input_tensor.unsqueeze(0) + input_tensor = input_tensor.to(torch_device) + + model.eval() + with no_grad(): + output = model(input_tensor) + + anomaly_score = output.pred_score.item() + anomaly_label = output.pred_label.item() + anomaly_map = output.anomaly_map.squeeze().cpu().numpy() + + # resize heatmap to original image size + img_np = array(image) + anomaly_map_resized = cv2.resize(anomaly_map, (img_np.shape[1], img_np.shape[0])) + + return img_np, anomaly_map_resized, anomaly_score, anomaly_label + + +def anomaly_detection(file_path, data_csv, sensor_images): + # ---------------------------- + # To load the model, call function for anomaly detection and store the results + # Input: + # file_path (string): path to file including file name and extension + # data_csv (list of floats): results from measuring the electrodes + # Output: + # none + # ---------------------------- + file = path.basename(file_path) + # extract file name and ending separately + name, endung = path.splitext(file) + # extract folder path + folder_path = path.dirname(file_path) + + # reconstruct the model and initialize the engine + model = Patchcore(backbone=backbone, layers=layers, coreset_sampling_ratio=ratio) + engine = Engine() + + # preparation for plot + fig, axes = plt.subplots(1, 2, figsize=(12, 6)) + + # loop over left and right sensor + for i, image in enumerate(sensor_images): + # load the model + checkpoint = load(model_path[i]) + model.load_state_dict(checkpoint["model_state_dict"]) + + # evaluate image + img_np, anomaly_map_resized, score, label = infer_image(image, model) + + # ---------------------------------------- just for internal evaluation --------------------------------------- + print(score) + # ---------------------------------------- just for internal evaluation --------------------------------------- + + # add result to data_csv + data_csv.extend([int(label)]) + + # store heatmap + ax = axes[i] + ax.axis("off") + ax.imshow(image, alpha=0.8) + ax.imshow(anomaly_map_resized, cmap="jet", alpha=0.5) + + plt.subplots_adjust(wspace=0, hspace=0) + plt.savefig( + path.join(folder_path, f"{name}_Heatmap.png"), bbox_inches="tight", pad_inches=0 + ) + plt.close() + + # save csv file + df = DataFrame([data_csv]) + df.to_csv( + path.join(folder_path, f"{name}.csv"), + mode="w", + index=False, + header=False, + quoting=csv.QUOTE_NONE, + sep=";", + ) + + return + + +data_csv, sensors = measure_length(file_path, pixelsPerMetricX, pixelsPerMetricY) + +anomaly_detection(file_path, data_csv, sensors)