generated from dopt-python/py311
working prototype with significant speed-up
This commit is contained in:
parent
a9c6e4a260
commit
13c3c43261
@ -44,6 +44,17 @@ def main() -> None:
|
||||
)
|
||||
paths_dst.append(p_data)
|
||||
|
||||
p_orig_data = (
|
||||
BASE_PATH / "_Originaldaten/614706_helles Entek/614706_helles Entek[3136761]_3"
|
||||
)
|
||||
assert p_orig_data.exists(), "original data not existing"
|
||||
paths_src.append(p_orig_data)
|
||||
|
||||
p_data = recreate_folder(
|
||||
"Verifizierdaten_1/20260225/614706_helles Entek/614706_helles Entek[3136761]_3"
|
||||
)
|
||||
paths_dst.append(p_data)
|
||||
|
||||
for src, dst in zip(paths_src, paths_dst):
|
||||
shutil.copytree(src, dst, dirs_exist_ok=True)
|
||||
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
import cProfile
|
||||
import pstats
|
||||
import time
|
||||
|
||||
from KSG_anomaly_detection import _prepare_env
|
||||
from KSG_anomaly_detection import _prepare_env, delegator
|
||||
from KSG_anomaly_detection.monitor import monitor_folder_simple
|
||||
|
||||
profiler = cProfile.Profile()
|
||||
|
||||
PROFILE = False
|
||||
USE_NEW_IMPL = True
|
||||
PROFILE = True
|
||||
USE_NEW_IMPL = False
|
||||
USE_MP = False
|
||||
ONLY_PREPARE = False
|
||||
|
||||
|
||||
@ -16,15 +18,25 @@ def main() -> None:
|
||||
if ONLY_PREPARE:
|
||||
return
|
||||
|
||||
if PROFILE:
|
||||
profiler.enable()
|
||||
monitor_folder_simple(use_new=USE_NEW_IMPL)
|
||||
profiler.disable()
|
||||
mp_pool = delegator.MPPool()
|
||||
|
||||
stats = pstats.Stats(profiler).sort_stats("cumtime")
|
||||
stats.print_stats(20)
|
||||
else:
|
||||
monitor_folder_simple(use_new=USE_NEW_IMPL)
|
||||
try:
|
||||
t1 = time.perf_counter()
|
||||
if PROFILE:
|
||||
profiler.enable()
|
||||
monitor_folder_simple(mp_pool=mp_pool, use_new=USE_NEW_IMPL, use_mp=USE_MP)
|
||||
profiler.disable()
|
||||
|
||||
stats = pstats.Stats(profiler).sort_stats("cumtime")
|
||||
ENTRIES_TO_SHOW = 40 if USE_MP else 20
|
||||
stats.print_stats(ENTRIES_TO_SHOW)
|
||||
else:
|
||||
monitor_folder_simple(mp_pool=mp_pool, use_new=USE_NEW_IMPL, use_mp=USE_MP)
|
||||
t2 = time.perf_counter()
|
||||
finally:
|
||||
mp_pool.close()
|
||||
|
||||
print(f"Elapsed time: {t2 - t1} s")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -7,4 +7,5 @@ PATH = r"B:\projects\KSG\Ordnerstruktur"
|
||||
FOLDER_LIST = [
|
||||
r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1",
|
||||
r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_2",
|
||||
r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_3",
|
||||
]
|
||||
|
||||
@ -1,36 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import multiprocessing as mp
|
||||
from collections.abc import Iterable, Sequence
|
||||
from collections.abc import Callable, Collection, Iterable
|
||||
from typing import Any, TypeVar
|
||||
|
||||
import psutil
|
||||
|
||||
T = TypeVar("T")
|
||||
D = TypeVar("D")
|
||||
|
||||
|
||||
class MPPool:
    """Small wrapper around ``multiprocessing.Pool`` for pre-chunked workloads.

    The intended flow is: optionally pair every data entry with a shared
    argument (``enrich_data_funcargs``), split the entries into one chunk per
    worker (``chunk_data``) and hand the chunks to ``map`` / ``starmap``.
    Call ``close`` (or ``terminate``) when the pool is no longer needed.
    """

    def __init__(
        self,
    ) -> None:
        """Create the worker pool.

        The number of workers is the value reported by
        ``psutil.cpu_count(logical=False)``; if that call returns a falsy
        value (e.g. ``None``), 4 workers are used.
        """
        self.num_workers = psutil.cpu_count(logical=False) or 4
        print("Set number of workers to: ", self.num_workers)
        self.pool = mp.Pool(processes=self.num_workers)

    def enrich_data_funcargs(
        self,
        data: Iterable[T],
        arg: D,
    ) -> list[tuple[T, D]]:
        """Pair every entry of ``data`` with the same additional ``arg``.

        Returns:
            A list of ``(entry, arg)`` tuples in the order of ``data``.
        """
        return [(entry, arg) for entry in data]

    def get_chunksize(
        self,
        data: Collection[Any],
    ) -> int:
        """Return ``ceil(len(data) / num_workers)``, but at least 1."""
        return max(1, math.ceil(len(data) / self.num_workers))

    def chunk_data(
        self,
        data: list[T],
        chunk_size: int | None = None,
    ) -> list[list[T]]:
        """Split ``data`` into at most ``num_workers`` chunks.

        The data is first cut into consecutive slices of ``chunk_size``
        entries. Only the first ``num_workers`` slices are kept as chunks; the
        entries of all remaining slices are appended one by one to the kept
        chunks in round-robin order, so no entry is lost.

        Args:
            data: Entries to distribute. Any sliceable sequence works; the
                returned chunks are always lists.
            chunk_size: Size of the initial slices. Defaults to
                ``max(1, len(data) // num_workers)``.

        Returns:
            A list with at most ``num_workers`` chunks (empty for empty data).

        Raises:
            ValueError: If an explicit ``chunk_size`` is smaller than 1.
        """
        if chunk_size is None:
            chunk_size = max(1, len(data) // self.num_workers)
        elif chunk_size < 1:
            # a negative step would yield an empty range and silently drop
            # every entry, a zero step fails inside range() with an unclear
            # message
            raise ValueError("chunk_size must be a positive integer")

        # list(...) guarantees appendable chunks even for tuple input
        chunks = [
            list(data[start : start + chunk_size])
            for start in range(0, len(data), chunk_size)
        ]
        chunks_assigned = chunks[: self.num_workers]
        open_chunks = chunks[self.num_workers :]

        if open_chunks:
            open_entries = (entry for chunk in open_chunks for entry in chunk)
            num_assigned = len(chunks_assigned)
            for idx, entry in enumerate(open_entries):
                # wrap around: with a small explicit chunk_size there can be
                # more left-over entries than assigned chunks
                chunks_assigned[idx % num_assigned].append(entry)

        return chunks_assigned

    def map(
        self,
        func: Callable[[Any], None],
        chunks: Iterable[Any],
    ) -> None:
        """Call ``func`` once per chunk in the pool and wait for completion.

        Results of ``func`` are discarded.
        """
        # assumes pre-batched data with "chunk_data", hence one chunk per task
        _ = self.pool.map(func, chunks, chunksize=1)

    def starmap(
        self,
        func: Callable[..., None],
        chunks: Iterable[tuple[Any, ...]],
    ) -> None:
        """Like ``map``, but every element of ``chunks`` is unpacked as the
        positional arguments of ``func``.

        Results of ``func`` are discarded.
        """
        # assumes pre-batched data with "chunk_data", hence one chunk per task
        _ = self.pool.starmap(func, chunks, chunksize=1)

    def close(self) -> None:
        """Stop accepting new work and wait for the workers to exit."""
        self.pool.close()
        self.pool.join()

    def terminate(self) -> None:
        """Stop the worker processes immediately."""
        self.pool.terminate()
|
||||
|
||||
@ -5,7 +5,7 @@ import time
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
|
||||
from KSG_anomaly_detection import config, config_for_test
|
||||
from KSG_anomaly_detection import config, config_for_test, delegator
|
||||
from KSG_anomaly_detection.preparation import Preparation
|
||||
from KSG_anomaly_detection.window_manager import WindowManager
|
||||
|
||||
@ -126,7 +126,7 @@ def monitor_folder(manager: WindowManager):
|
||||
time.sleep(60)
|
||||
|
||||
|
||||
def monitor_folder_simple(use_new: bool):
|
||||
def monitor_folder_simple(mp_pool: delegator.MPPool, use_new: bool, use_mp: bool):
|
||||
print("starting procedure...")
|
||||
|
||||
for folder in config_for_test.FOLDER_LIST:
|
||||
@ -156,9 +156,9 @@ def monitor_folder_simple(use_new: bool):
|
||||
continue # zu nächstem neuen folder springen
|
||||
|
||||
# Aufgabe 3: check_img im Originalordner anpassen (d. h. gelbe Farbe: work in progress)
|
||||
print("'change_image_to_yellow'...")
|
||||
SKIP_NEXT = True
|
||||
SKIP_NEXT = False
|
||||
if not SKIP_NEXT:
|
||||
print("'change_image_to_yellow'...")
|
||||
if use_new:
|
||||
preparation.change_image_to_yellow_new()
|
||||
else:
|
||||
@ -168,9 +168,9 @@ def monitor_folder_simple(use_new: bool):
|
||||
|
||||
# Aufgabe 4: AOI-Bilder in RGB überführen und zwischenspeichern
|
||||
# wir erhalten hier den Speicherort sowie ggf. Fehlermeldungen zurück
|
||||
print("'create_rgb_images_and_patches'...")
|
||||
SKIP_NEXT = False
|
||||
if not SKIP_NEXT:
|
||||
if not use_mp and not SKIP_NEXT:
|
||||
print("'create_rgb_images_and_patches'...")
|
||||
if use_new:
|
||||
current_folder, result = (
|
||||
preparation.create_rgb_images_and_patches_new()
|
||||
@ -178,6 +178,16 @@ def monitor_folder_simple(use_new: bool):
|
||||
else:
|
||||
current_folder, result = preparation.create_rgb_images_and_patches()
|
||||
|
||||
SKIP_NEXT = False
|
||||
if use_mp and not SKIP_NEXT:
|
||||
print("'create_rgb_images_and_patches' multiprocessing...")
|
||||
if use_new:
|
||||
current_folder, result = (
|
||||
preparation.create_rgb_images_and_patches_new2(mp_pool)
|
||||
)
|
||||
else:
|
||||
current_folder, result = preparation.create_rgb_images_and_patches()
|
||||
|
||||
print("finished routine")
|
||||
|
||||
if result is not None:
|
||||
|
||||
@ -1,17 +1,19 @@
|
||||
import multiprocessing
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
from collections.abc import Iterable
|
||||
from pathlib import Path
|
||||
from pprint import pprint
|
||||
from shutil import copytree
|
||||
from typing import Literal, cast
|
||||
from typing import Literal, TypeAlias, cast
|
||||
|
||||
import pyvips
|
||||
from PIL import Image
|
||||
from pyvips import Image as vipsImage
|
||||
|
||||
from KSG_anomaly_detection import config
|
||||
from KSG_anomaly_detection import config, delegator
|
||||
|
||||
Image.MAX_IMAGE_PIXELS = None
|
||||
COLOUR_ASSIGNMENT = {"R": [255, 0, 0], "G": [0, 255, 0], "B": [0, 0, 0]}
|
||||
@ -334,10 +336,10 @@ class Preparation:
|
||||
)
|
||||
# print(f">>> {checkimg_folders=}")
|
||||
|
||||
images = tuple(self.original_data_path.rglob("checkimg/R_NG*_*.jpg"))
|
||||
print(f">>> {len(images)=}")
|
||||
|
||||
sys.exit(0)
|
||||
# images = tuple(self.original_data_path.rglob("checkimg/R_NG*_*.jpg"))
|
||||
# print(f">>> {len(images)=}")
|
||||
# pprint(images)
|
||||
# sys.exit(0)
|
||||
|
||||
# iterate through all 'checkimg' folders recursively
|
||||
for checkimg_folder in checkimg_folders:
|
||||
@ -379,3 +381,66 @@ class Preparation:
|
||||
rgb_image.write_to_file(save_path_rgb / filename)
|
||||
|
||||
return "folder_name", None
|
||||
|
||||
def create_rgb_images_and_patches_new2(self, pool: delegator.MPPool):
    """Build the RGB images for this folder using the given worker pool.

    A new output folder named after ``self.path`` is created below
    ``config.CURRENT_PATH_RGB``. All files matching ``checkimg/R_NG*_*.jpg``
    anywhere below ``self.original_data_path`` are then paired with that
    output folder, split into chunks and processed by ``transform_to_rgb``
    in the pool.

    Returns:
        ``("folder_name", None)`` on success (the first element is a literal
        placeholder string), or ``(None, <error message>)`` if the output
        folder already exists or could not be created.
    """
    # target folder inside the temporary RGB area, named like the current folder
    target_root = Path(config.CURRENT_PATH_RGB) / self.path.name

    try:
        # exist_ok=False: an already existing folder is reported as an error
        target_root.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        existing_name = Path(self.folder_path).parts[-1]
        return None, f"Fehlermeldung: Ordner {existing_name} existiert bereits."
    except Exception as e:
        return None, f"Fehlermeldung: {e}"

    # only the R images are collected here; each work item carries the
    # image path together with the common output folder
    red_channel_files = tuple(self.original_data_path.rglob("checkimg/R_NG*_*.jpg"))
    work_items = pool.enrich_data_funcargs(red_channel_files, target_root)

    # these are all images which must be processed, one chunk per task
    pool.map(transform_to_rgb, pool.chunk_data(work_items))

    return "folder_name", None
|
||||
|
||||
|
||||
def transform_to_rgb(
    files: Iterable[tuple[Path, Path]],
) -> None:
    """Combine separate R/G/B channel images into RGB PNG files.

    Args:
        files: Pairs of ``(image, saving_path)``. ``image`` is the path of an
            R channel file named ``R_NG<num1>_<num2>.jpg``; the G and B files
            with the same numbers are read from the same folder.
            ``saving_path`` is the base output folder for the RGB images.

    For every pair the result is written as ``RGB_NG<num1>_<num2>.png`` into
    ``saving_path`` extended by the two folder names directly above the image
    file. Entries whose file name does not match the expected pattern are
    skipped without touching the file system.
    """
    # compiled once per call instead of being looked up for every image
    red_name_pattern = re.compile(r"R_NG(\d+)_(\d+)\.jpg$")

    for image, saving_path in files:
        # validate the name first so that skipped entries do not leave
        # empty output folders behind
        match = red_name_pattern.match(image.name)
        if match is None:
            continue
        num1, num2 = match.groups()

        base_folder = image.parent
        assert base_folder.is_dir(), "base folder of image not a directory"

        # mirror the last two folder levels of the source below saving_path
        relative_path = image.parts[-3:-1]
        save_path_rgb = saving_path.joinpath(*relative_path)
        save_path_rgb.mkdir(parents=True, exist_ok=True)

        # find all three images belonging together
        r_path = image
        g_path = base_folder / f"G_NG{num1}_{num2}.jpg"
        b_path = base_folder / f"B_NG{num1}_{num2}.jpg"

        # open all three images and combine them to RGB
        # (paths are passed as plain strings to pyvips)
        r = pyvips.Image.new_from_file(str(r_path), access="sequential")
        g = pyvips.Image.new_from_file(str(g_path), access="sequential")
        b = pyvips.Image.new_from_file(str(b_path), access="sequential")
        rgb_image = r.bandjoin([g, b])  # type: ignore
        rgb_image = rgb_image.copy(interpretation="srgb")

        filename = f"RGB_NG{num1}_{num2}.png"
        rgb_image.write_to_file(str(save_path_rgb / filename))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user