From 13c3c432615f27eb9c52ce3b8729704c082200e1 Mon Sep 17 00:00:00 2001 From: foefl Date: Mon, 2 Mar 2026 15:47:44 +0100 Subject: [PATCH] working prototype with significant speed-up --- src/KSG_anomaly_detection/_prepare_env.py | 11 +++ src/KSG_anomaly_detection/_profile.py | 34 ++++++--- src/KSG_anomaly_detection/config_for_test.py | 1 + src/KSG_anomaly_detection/delegator.py | 61 +++++++++++++--- src/KSG_anomaly_detection/monitor.py | 22 ++++-- src/KSG_anomaly_detection/preparation.py | 77 ++++++++++++++++++-- 6 files changed, 173 insertions(+), 33 deletions(-) diff --git a/src/KSG_anomaly_detection/_prepare_env.py b/src/KSG_anomaly_detection/_prepare_env.py index 8aaf35f..7bfbaaf 100644 --- a/src/KSG_anomaly_detection/_prepare_env.py +++ b/src/KSG_anomaly_detection/_prepare_env.py @@ -44,6 +44,17 @@ def main() -> None: ) paths_dst.append(p_data) + p_orig_data = ( + BASE_PATH / "_Originaldaten/614706_helles Entek/614706_helles Entek[3136761]_3" + ) + assert p_orig_data.exists(), "original data not existing" + paths_src.append(p_orig_data) + + p_data = recreate_folder( + "Verifizierdaten_1/20260225/614706_helles Entek/614706_helles Entek[3136761]_3" + ) + paths_dst.append(p_data) + for src, dst in zip(paths_src, paths_dst): shutil.copytree(src, dst, dirs_exist_ok=True) diff --git a/src/KSG_anomaly_detection/_profile.py b/src/KSG_anomaly_detection/_profile.py index 6253f55..246ede4 100644 --- a/src/KSG_anomaly_detection/_profile.py +++ b/src/KSG_anomaly_detection/_profile.py @@ -1,13 +1,15 @@ import cProfile import pstats +import time -from KSG_anomaly_detection import _prepare_env +from KSG_anomaly_detection import _prepare_env, delegator from KSG_anomaly_detection.monitor import monitor_folder_simple profiler = cProfile.Profile() -PROFILE = False -USE_NEW_IMPL = True +PROFILE = True +USE_NEW_IMPL = False +USE_MP = False ONLY_PREPARE = False @@ -16,15 +18,25 @@ def main() -> None: if ONLY_PREPARE: return - if PROFILE: - profiler.enable() - 
class MPPool:
    """Thin wrapper around ``multiprocessing.Pool`` with batching helpers.

    The pool is created in ``__init__`` with one worker per physical CPU core
    as reported by ``psutil`` (falling back to 4 when the count is unknown).
    Work is expected to be pre-batched with :meth:`chunk_data` so that every
    worker receives one batch.
    """

    def __init__(self) -> None:
        self.num_workers = psutil.cpu_count(logical=False) or 4
        print("Set number of workers to: ", self.num_workers)
        self.pool = mp.Pool(processes=self.num_workers)

    def enrich_data_funcargs(
        self,
        data: Iterable[T],
        arg: D,
    ) -> list[tuple[T, D]]:
        """Pair every entry of ``data`` with the same extra argument ``arg``.

        Returns:
            A list of ``(entry, arg)`` tuples in the iteration order of ``data``.
        """
        return [(entry, arg) for entry in data]

    def get_chunksize(
        self,
        data: Collection[Any],
    ) -> int:
        """Return ``ceil(len(data) / num_workers)``, but never less than 1."""
        return max(1, math.ceil(len(data) / self.num_workers))

    def chunk_data(
        self,
        data: list[T],
        chunk_size: int | None = None,
    ) -> list[list[T]]:
        """Split ``data`` into at most ``num_workers`` batches.

        Args:
            data: Entries to distribute; the list itself is not modified.
            chunk_size: Size of the initial slices. Defaults to
                ``len(data) // num_workers`` (at least 1).

        Returns:
            A list with at most ``num_workers`` batches. Entries that do not
            fit into the first ``num_workers`` slices are appended round-robin
            to those batches, so no entry is lost.

        Raises:
            ValueError: If an explicit ``chunk_size`` is smaller than 1.
        """
        if chunk_size is None:
            chunk_size = max(1, len(data) // self.num_workers)
        elif chunk_size < 1:
            # a negative step would silently yield no chunks and drop all data
            raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

        chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
        chunks_assigned = chunks[: self.num_workers]

        # everything beyond the first ``num_workers`` slices still needs a home
        open_entries = (
            entry for chunk in chunks[self.num_workers :] for entry in chunk
        )
        for idx, entry in enumerate(open_entries):
            # wrap around: there may be more open entries than batches
            chunks_assigned[idx % len(chunks_assigned)].append(entry)

        return chunks_assigned

    def map(
        self,
        func: Callable[[Any], None],
        chunks: Iterable[Any],
    ) -> None:
        """Run ``func`` once per batch in the pool and wait for completion.

        Return values of ``func`` are discarded.
        """
        # assumes pre-batched data with "chunk_data"
        _ = self.pool.map(func, chunks, chunksize=1)

    def starmap(
        self,
        func: Callable[[Any], None],
        chunks: Iterable[tuple[Any, ...]],
    ) -> None:
        """Like :meth:`map`, but each item is unpacked as arguments of ``func``."""
        # assumes pre-batched data with "chunk_data"
        _ = self.pool.starmap(func, chunks, chunksize=1)

    def close(self) -> None:
        """Stop accepting new work and wait for the worker processes to exit."""
        self.pool.close()
        self.pool.join()

    def terminate(self) -> None:
        """Stop the worker processes immediately via ``Pool.terminate``."""
        self.pool.terminate()
KSG_anomaly_detection import config, config_for_test +from KSG_anomaly_detection import config, config_for_test, delegator from KSG_anomaly_detection.preparation import Preparation from KSG_anomaly_detection.window_manager import WindowManager @@ -126,7 +126,7 @@ def monitor_folder(manager: WindowManager): time.sleep(60) -def monitor_folder_simple(use_new: bool): +def monitor_folder_simple(mp_pool: delegator.MPPool, use_new: bool, use_mp: bool): print("starting procedure...") for folder in config_for_test.FOLDER_LIST: @@ -156,9 +156,9 @@ def monitor_folder_simple(use_new: bool): continue # zu nächstem neuen folder springen # Aufgabe 3: check_img im Originalordner anpassen (d. h. gelbe Farbe: work in progress) - print("'change_image_to_yellow'...") - SKIP_NEXT = True + SKIP_NEXT = False if not SKIP_NEXT: + print("'change_image_to_yellow'...") if use_new: preparation.change_image_to_yellow_new() else: @@ -168,9 +168,9 @@ def monitor_folder_simple(use_new: bool): # Aufgabe 4: AOI-Bilder in RGB überführen und zwischenspeichern # wir erhalten hier den Speicherort sowie ggf. 
Fehlermeldungen zurück - print("'create_rgb_images_and_patches'...") SKIP_NEXT = False - if not SKIP_NEXT: + if not use_mp and not SKIP_NEXT: + print("'create_rgb_images_and_patches'...") if use_new: current_folder, result = ( preparation.create_rgb_images_and_patches_new() @@ -178,6 +178,16 @@ def monitor_folder_simple(use_new: bool): else: current_folder, result = preparation.create_rgb_images_and_patches() + SKIP_NEXT = False + if use_mp and not SKIP_NEXT: + print("'create_rgb_images_and_patches' multiprocessing...") + if use_new: + current_folder, result = ( + preparation.create_rgb_images_and_patches_new2(mp_pool) + ) + else: + current_folder, result = preparation.create_rgb_images_and_patches() + print("finished routine") if result is not None: diff --git a/src/KSG_anomaly_detection/preparation.py b/src/KSG_anomaly_detection/preparation.py index 4f2b35a..96e508f 100644 --- a/src/KSG_anomaly_detection/preparation.py +++ b/src/KSG_anomaly_detection/preparation.py @@ -1,17 +1,19 @@ +import multiprocessing import os import re import sys import traceback +from collections.abc import Iterable from pathlib import Path from pprint import pprint from shutil import copytree -from typing import Literal, cast +from typing import Literal, TypeAlias, cast import pyvips from PIL import Image from pyvips import Image as vipsImage -from KSG_anomaly_detection import config +from KSG_anomaly_detection import config, delegator Image.MAX_IMAGE_PIXELS = None COLOUR_ASSIGNMENT = {"R": [255, 0, 0], "G": [0, 255, 0], "B": [0, 0, 0]} @@ -334,10 +336,10 @@ class Preparation: ) # print(f">>> {checkimg_folders=}") - images = tuple(self.original_data_path.rglob("checkimg/R_NG*_*.jpg")) - print(f">>> {len(images)=}") - - sys.exit(0) + # images = tuple(self.original_data_path.rglob("checkimg/R_NG*_*.jpg")) + # print(f">>> {len(images)=}") + # pprint(images) + # sys.exit(0) # iterate through all 'checkimg' folders recursively for checkimg_folder in checkimg_folders: @@ -379,3 +381,66 @@ 
def transform_to_rgb(
    files: Iterable[tuple[Path, Path]],
) -> None:
    """Combine matching R/G/B channel images into RGB PNG files.

    Args:
        files: Pairs of ``(image, saving_path)``. ``image`` is the path of an
            ``R_NG<num1>_<num2>.jpg`` file; the matching ``G_NG…`` and
            ``B_NG…`` files are looked up in the same directory.
            ``saving_path`` is the base directory for the RGB output.

    The result is written as ``RGB_NG<num1>_<num2>.png`` below ``saving_path``,
    inside a sub-path built from the two directory names directly above the
    image. Entries whose file name does not match the pattern are skipped
    without touching the file system.
    """
    # compile once per batch instead of once per image
    name_pattern = re.compile(r"R_NG(\d+)_(\d+)\.jpg$")
    # output directories already created during this call
    prepared_dirs: set[Path] = set()

    for image, saving_path in files:
        # validate the name first so skipped files cause no side effects
        match = name_pattern.match(image.name)
        if not match:
            continue
        num1, num2 = match.groups()

        base_folder = image.parent
        assert base_folder.is_dir(), "base folder of image not a directory"

        # mirror the two parent directory names of the image in the output tree
        relative_path = image.parts[-3:-1]
        save_path_rgb = saving_path.joinpath(*relative_path)
        if save_path_rgb not in prepared_dirs:
            save_path_rgb.mkdir(parents=True, exist_ok=True)
            prepared_dirs.add(save_path_rgb)

        # find all three images belonging together
        r_path = image
        g_path = base_folder / f"G_NG{num1}_{num2}.jpg"
        b_path = base_folder / f"B_NG{num1}_{num2}.jpg"

        # open all three images and combine them to RGB
        r = pyvips.Image.new_from_file(r_path, access="sequential")
        g = pyvips.Image.new_from_file(g_path, access="sequential")
        b = pyvips.Image.new_from_file(b_path, access="sequential")
        rgb_image = r.bandjoin([g, b])  # type: ignore
        rgb_image = rgb_image.copy(interpretation="srgb")
        filename = f"RGB_NG{num1}_{num2}.png"
        rgb_image.write_to_file(save_path_rgb / filename)