From 3963fa8d0b879a6ea6b9b8c10b3ee7675f10d629 Mon Sep 17 00:00:00 2001 From: foefl Date: Thu, 9 Oct 2025 10:52:42 +0200 Subject: [PATCH] major code rework --- src/dopt_sensor_anomalies/detection.py | 203 +++++++++---------------- 1 file changed, 70 insertions(+), 133 deletions(-) diff --git a/src/dopt_sensor_anomalies/detection.py b/src/dopt_sensor_anomalies/detection.py index de9cf04..5c898e7 100644 --- a/src/dopt_sensor_anomalies/detection.py +++ b/src/dopt_sensor_anomalies/detection.py @@ -92,20 +92,33 @@ def measure_length( file_path: Path, pixels_per_metric_X: float, pixels_per_metric_Y: float, -) -> tuple[list[str | int], t.SensorImages]: - # ---------------------------- - # To identify the midpoint of a 2D area - # Input: - # file_path (string): path to file including file name and extension - # pixelsPerMetricX (float): scaling parameter, Pixels per micrometer in image - # pixelsPerMetricY (float): scaling parameter, Pixels per micrometer in image - # Output: - # data_csv (list): contains measured lengths and areas of electrodes (i.e., column 1 to 18 of csv file) - # image of left sensor - # image of right sensor - # ---------------------------- - file_stem = file_path.stem - folder_path = file_path.parent +) -> tuple[t.CsvData, t.SensorImages]: + """detect and measure the size of the electrodes + + Parameters + ---------- + file_path : Path + path to file to analyse + pixels_per_metric_X : float + scaling parameter x dimension, Pixels per micrometer in image + pixels_per_metric_Y : float + scaling parameter y dimension, Pixels per micrometer in image + + Returns + ------- + tuple[t.CsvData, t.SensorImages] + t.CsvData: (list) data to save as CSV according to requirements, contains strings and ints + t.SensorImages: (TypedDict) contains left and right image corresponding to each sensor + + Raises + ------ + errors.ImageNotReadError + image was not read successfully + errors.ContourCalculationError + during contour detection there were several possible error causes + errors.InvalidElectrodeCount + an invalid number of electrodes were detected + """ data_csv: list[str | int] = [] image = cv2.imread(str(file_path)) if image is None: @@ -113,9 +126,6 @@ def measure_length( cropped = image[500:1500, 100 : image.shape[1] - 100] orig = cropped.copy() - # TODO: check removal - # height, width = cropped.shape[0], cropped.shape[1] - # change colours in the image to black and white gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY) _, binary = cv2.threshold(gray, const.THRESHOLD_BW, 255, cv2.THRESH_BINARY) @@ -126,20 +136,12 @@ def measure_length( # find contours in the edge map cnts = cv2.findContours(edged.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) cnts = imutils.grab_contours(cnts) - if cnts is None: raise errors.ContourCalculationError( "No contours were found in the provided image. Can not continue analysis." ) - # sort the contours from left to right (i.e., use x coordinates) cnts, _ = contours.sort_contours(cnts) - # TODO: remove??? - # bounding_boxes = list(set([cv2.boundingRect(c) for c in cnts])) - # cnts = [c for _, c in sorted(zip(bounding_boxes, cnts), key=lambda b: b[0][0])] - # min_area = 1000 # adjust as needed - # filtered_cnts = [c for c in cnts if cv2.contourArea(c) > min_area] - # check if this sorting was correct (might not be correct if we have overlaps or misfindings) # get x coordinates of bounding boxes x_coords = [cv2.boundingRect(c)[0] for c in cnts] @@ -150,14 +152,6 @@ def measure_length( "Contour detection not valid: contours are not " "properly sorted from left to right." ) - - ################################################################## - # TODO: Remove?? - # ---------------------------------------- just for internal evaluation --------------------------------------- - output_image = gray.copy() - # ---------------------------------------- just for internal evaluation --------------------------------------- - ################################################################## - # to store only electrodes contours and nothing redundant accepted_boxes: list[t.Box] = [] filtered_cnts: list[Any] = [] @@ -213,60 +207,6 @@ def measure_length( ] ) - ################################################################## - # TODO: Remove?? - # ---------------------------------------- just for internal evaluation --------------------------------------- - count = 1 - # loop over the original points and draw everything - cv2.drawContours(output_image, [box.astype("int")], -1, (0, 255, 0), 2) - for x, y in box: - cv2.circle(output_image, (int(x), int(y)), 5, (0, 0, 255), -1) - # draw the midpoints on the image - cv2.circle(output_image, (int(tltrX), int(tltrY)), 5, (255, 0, 0), -1) - cv2.circle(output_image, (int(blbrX), int(blbrY)), 5, (255, 0, 0), -1) - cv2.circle(output_image, (int(tlblX), int(tlblY)), 5, (255, 0, 0), -1) - cv2.circle(output_image, (int(trbrX), int(trbrY)), 5, (255, 0, 0), -1) - # draw lines between the midpoints - cv2.line( - output_image, - (int(tltrX), int(tltrY)), - (int(blbrX), int(blbrY)), - (255, 0, 255), - 2, - ) - cv2.line( - output_image, - (int(tlblX), int(tlblY)), - (int(trbrX), int(trbrY)), - (255, 0, 255), - 2, - ) - # draw the object sizes on the image - cv2.putText( - output_image, - "{:.2f}".format(dimA), - (int(tltrX - 100), int(tltrY + 40)), - cv2.FONT_HERSHEY_SIMPLEX, - 1.75, - (0, 255, 0), - 3, - ) - cv2.putText( - output_image, - "{:.2f}".format(dimB), - (int(trbrX - 100), int(trbrY)), - cv2.FONT_HERSHEY_SIMPLEX, - 1.75, - (0, 255, 0), - 3, - ) - # cv2.imwrite(path.join(folder_path, f'{name}_contour_{count}.png'), output_image) - count += 1 - - cv2.imwrite(str(folder_path / f"{file_stem}_all_contours.png"), output_image) - # ---------------------------------------- just for internal evaluation --------------------------------------- - ################################################################## - if not filtered_cnts: raise errors.ContourCalculationError( "Contour detection not valid: no contours recognized" @@ -294,39 +234,34 @@ def measure_length( cropped_sensor_left = orig[y_min:y_max, x_min:x_middle] cropped_sensor_right = orig[y_min:y_max, x_middle:x_max] - ################################################################## - # TODO: Remove?? - # ---------------------------------------- just for internal evaluation --------------------------------------- - # save the cropped images for left and right sensor - try: - cv2.imwrite(path.join(folder_path, f"{file_stem}_left.png"), cropped_sensor_left) - cv2.imwrite(path.join(folder_path, f"{file_stem}_right.png"), cropped_sensor_right) - except Exception as err: - print(f"not possible: Error: {err}") - # ---------------------------------------- just for internal evaluation --------------------------------------- - ################################################################## - return data_csv, t.SensorImages(left=cropped_sensor_left, right=cropped_sensor_right) # helper function # anomaly detection def infer_image( - image: npt.NDArray, + image: npt.NDArray[np.uint8], model: Patchcore, ) -> t.InferenceResult: - # ---------------------------- - # To evaluate the image - # Input: - # image (numpy.ndarray): represents image to be checked for anomalies - # model (serialized PyTorch state dictionary): model for anomaly detection - # Output: - # img_np (numpy.ndarray) - # anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies - # anomaly_score (float): evaluation metric, \in [0, 1] with close to 0 being no anomaly detected - # anomaly_label (bool): anomaly detected (1) or not (0) - # ---------------------------- + """evaluate one image + Parameters + ---------- + image : npt.NDArray[np.uint8] + represents image to be checked for anomalies + model : Patchcore + (loaded PyTorch state dictionary): model for anomaly detection + + Returns + ------- + t.InferenceResult + contains: + img (numpy.ndarray) + anomaly_map_resized (numpy.ndarray): heatmap to visualize detected anomalies + anomaly_score (float): evaluation metric, in [0, 1] with close to 0 being no + anomaly detected + anomaly_label (bool): anomaly detected (1) or not (0) + """ torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(torch_device) @@ -355,24 +290,34 @@ def infer_image( img_np = np.array(pil_image) anomaly_map_resized = cv2.resize(anomaly_map, (img_np.shape[1], img_np.shape[0])) - return img_np, anomaly_map_resized, anomaly_score, anomaly_label + return t.InferenceResult( + img=img_np, + anomaly_map_resized=anomaly_map_resized, + anomaly_score=anomaly_score, + anomaly_label=anomaly_label, + ) # ** main function def anomaly_detection( file_path: Path, detection_models: t.DetectionModels, - data_csv: list[str | int], + data_csv: t.CsvData, sensor_images: t.SensorImages, ) -> None: - # ---------------------------- - # To load the model, call function for anomaly detection and store the results - # Input: - # file_path (string): path to file including file name and extension - # data_csv (list of floats): results from measuring the electrodes - # Output: - # none - # ---------------------------- + """load the model, call function for anomaly detection and store the results + + Parameters + ---------- + file_path : Path + path to file to analyse + detection_models : t.DetectionModels + collection of model paths for the left and right sensor + data_csv : t.CsvData + (list) data to save as CSV according to requirements, contains strings and ints + sensor_images : t.SensorImages + _description_ + """ file_stem = file_path.stem folder_path = file_path.parent @@ -389,25 +334,17 @@ def anomaly_detection( # loop over left and right sensor for i, (side, image) in enumerate(sensor_images.items()): # Ich habe die Modellpfade als Funktionsparameter hinzugefügt - image = cast(npt.NDArray, image) + image = cast(npt.NDArray[np.uint8], image) checkpoint = torch.load(detection_models[side]) model.load_state_dict(checkpoint["model_state_dict"]) - _, anomaly_map_resized, score, label = infer_image(image, model) - - ################################################################## - # TODO: Remove?? - # ---------------------------------------- just for internal evaluation --------------------------------------- - print(score) - # ---------------------------------------- just for internal evaluation --------------------------------------- - ################################################################## - - data_csv.extend([int(label)]) + result = infer_image(image, model) + data_csv.extend([int(result.anomaly_label)]) ax = axes[i] ax.axis("off") ax.imshow(image, alpha=0.8) - ax.imshow(anomaly_map_resized, cmap="jet", alpha=0.5) + ax.imshow(result.anomaly_map_resized, cmap="jet", alpha=0.5) plt.subplots_adjust(wspace=0, hspace=0) plt.savefig(