diff --git a/parallelisation/02-1_ngt_points.py b/parallelisation/02-1_ngt_points.py new file mode 100644 index 0000000..ac8b35a --- /dev/null +++ b/parallelisation/02-1_ngt_points.py @@ -0,0 +1,410 @@ +# %% +from __future__ import annotations + +import sys +from collections.abc import Collection, Iterable +from dataclasses import InitVar, dataclass, field, fields, is_dataclass +from pathlib import Path +from typing import Any, Generic, Literal, Never, TypeVar + +import pyvips + +T = TypeVar("T") + +# %% +BASE_PATH = Path( + r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1" +) +assert BASE_PATH.exists() + +TMP_DATA_PATH = Path( + r"B:\projects\KSG\Ordnerstruktur\Daten" +) # needed to find generated RGB images +assert TMP_DATA_PATH.exists() +assert TMP_DATA_PATH.is_dir() +# %% +ngt_folder = Path( + r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1\614706_helles Entek[3136761]_1_A_1\ngpointdata" +) +assert ngt_folder.exists() + + +# %% +@dataclass(slots=True) +class NgtLine: + e1: int + e2: int + e3: int + e4: int + e5: int + e6: int + e7: int + e8: int + e9: int + e10: int + e11: int + + +@dataclass(slots=True) +class NgtDoc: + _lines: InitVar[Iterable[NgtLine]] + file: Path + rgb_img: Path + lines: tuple[NgtLine, ...] = field(init=False) + + def __post_init__( + self, + _lines: Iterable[NgtLine], + ) -> None: + self.lines = tuple(_lines) + + @property + def len(self) -> int: + return len(self.lines) + + +@dataclass(slots=True) +class NgtDocs: + _docs: InitVar[Iterable[NgtDoc]] + docs: tuple[NgtDoc, ...] = field(init=False) + total_lines: int = field(init=False) + + def __post_init__( + self, + _lines: Iterable[NgtDoc], + ) -> None: + self.docs = tuple(_lines) + self.total_lines = sum(doc.len for doc in self.docs) + + @property + def len(self) -> int: + return len(self.docs) + + +def get_ngt_doc( + ngt_file: Path, +) -> NgtDoc: + ngt_lines: list[NgtLine] = [] + with open(ngt_file, "r", encoding="utf-8") as f: + for line_no, line in enumerate(f): + if line_no in (0, 1): + continue + contents = (int(e) for e in line.strip("\n ").split(",")) + ngt_line = NgtLine(*contents) + ngt_lines.append(ngt_line) + + rgb_img_folder = TMP_DATA_PATH.joinpath(*ngt_file.parts[-4:-2], "checkimg") + assert rgb_img_folder.exists() + rgb_img_path = tuple(rgb_img_folder.glob(f"*{ngt_file.stem}*"))[0] + assert rgb_img_path.exists() + + return NgtDoc(ngt_lines, ngt_file, rgb_img_path) + + +def get_all_ngt_docs( + package_path: Path, +) -> NgtDocs: + parsed_docs: list[NgtDoc] = [] + for ngt_file_path in package_path.rglob("ngpointdata/*.ngt"): + ngt_doc = get_ngt_doc(ngt_file_path) + + parsed_docs.append(ngt_doc) + + return NgtDocs(parsed_docs) + + +class Chunk(Generic[T]): + def __init__( + self, + capacity: int, + overfill_once: bool = True, + cap_property: str | None = None, + ) -> None: + self.capacity = capacity + self.overfill = overfill_once + self.capacity_left: int = capacity + self.capacity_held: int = 0 + self.full: bool = False + self._contents: list[T] = [] + self.cap_property = cap_property + + def __repr__(self) -> str: + return self._contents.__repr__() + + def __str__(self) -> str: + return self._contents.__str__() + + @property + def contents(self) -> list[T]: + return self._contents + + def __len__(self) -> int: + return len(self._contents) + + def __getitem__( + self, + idx: int, + ) -> T: + return self._contents[idx] + + def __raise_for_exceeded_capacity(self) -> Never: + raise ValueError("Item can not be added due to capacity limit.") + + def _capacity_sufficient( + self, + size_to_add: int, + ) -> bool: + if self.full: + return False + + new_size = len(self) + size_to_add + + if self.overfill and (new_size > self.capacity): + return True + elif new_size > self.capacity: + return False + else: + return True + + def _recalc_capacity( + self, + size_to_add: int, + ) -> None: + self.capacity_held += size_to_add + self.capacity_left = max(self.capacity_left - size_to_add, 0) + if self.capacity_left == 0: + self.full = True + else: + self.full = False + + def append( + self, + object_: T, + ) -> None: + size_to_add: int = 1 + if self.cap_property is not None: + if not hasattr(object_, self.cap_property): + raise AttributeError("Object does not posses the wanted property") + attr = getattr(object_, self.cap_property) + if not isinstance(attr, int): + raise TypeError("Capacity property must be an integer") + size_to_add = attr + + if self._capacity_sufficient(size_to_add): + self._contents.append(object_) + self._recalc_capacity(size_to_add) + else: + self.__raise_for_exceeded_capacity() + + def extend( + self, + collection: Collection[T], + ) -> None: + size_to_add: int = len(collection) + if self.cap_property is not None: + if not all(hasattr(object_, self.cap_property) for object_ in collection): + raise AttributeError("Object does not posses the wanted property") + if not all( + isinstance(getattr(object_, self.cap_property), int) for object_ in collection + ): + raise TypeError("Capacity property must be an integer") + size_to_add = sum(getattr(object_, self.cap_property) for object_ in collection) + + if self._capacity_sufficient(size_to_add): + self._contents.extend(collection) + self._recalc_capacity(size_to_add) + else: + self.__raise_for_exceeded_capacity() + + def remove( + self, + object_: T, + ) -> None: + size_to_add: int = 1 + if self.cap_property is not None: + if not hasattr(object_, self.cap_property): + raise AttributeError("Object does not posses the wanted property") + attr = getattr(object_, self.cap_property) + if not isinstance(attr, int): + raise TypeError("Capacity property must be an integer") + size_to_add = attr + size_to_remove = (-1) * size_to_add + + self._contents.remove(object_) + self._recalc_capacity(size_to_remove) + + +def get_deep_size( + obj: Any, + seen: set[int] | None = None, + unit: Literal["b", "kb", "mb", "gb"] = "b", +): + if seen is None: + seen = set() + + obj_id = id(obj) + if obj_id in seen: + return 0 + seen.add(obj_id) + size = sys.getsizeof(obj) + + if is_dataclass(obj): + for f in fields(obj): + size += get_deep_size(getattr(obj, f.name), seen) + elif isinstance(obj, dict): + size += sum(get_deep_size(v, seen) + get_deep_size(k, seen) for k, v in obj.items()) + elif isinstance(obj, (list, tuple, set)): + size += sum(get_deep_size(i, seen) for i in obj) + + match unit: + case "kb": + size /= 1024 + case "mb": + size /= 1024 * 1024 + case "gb": + size /= 1024 * 1024 * 1024 + + return size + + +def build_chunks( + ngt_docs: NgtDocs, + num_chunks: int, +) -> list[Chunk[NgtDoc]]: + sorted_docs = sorted(ngt_docs.docs, key=lambda x: x.len, reverse=True) + + chunks: list[Chunk[NgtDoc]] = [] + for _ in range(num_chunks): + chunks.append(Chunk(int(1e8), overfill_once=True, cap_property="len")) + + for doc in sorted_docs: + chunks = sorted(chunks, key=lambda x: x.capacity_held) + chunks[0].append(doc) + + return chunks + + +# %% +ngt_docs = get_all_ngt_docs(BASE_PATH) +get_deep_size(ngt_docs, None, "mb") +# %% +ngt_docs.len +# %% +ngt_docs.total_lines +# %% +NUM_WORKERS = 4 +chunks = build_chunks(ngt_docs, NUM_WORKERS) +# %% +for c in chunks: + print(f"Cap hold: {c.capacity_held:<5} Cap left: {c.capacity_left:<10}") + +# %% +# convert golden master +# PRODUCT_PATH = BASE_PATH.parent +# PRODUCT_PATH + +# r_paths = tuple(PRODUCT_PATH.rglob("curdata*/**/*_R.jpg")) + +# for r_pth in r_paths: +# base, _ = r_pth.stem.split("_") +# g_pth = r_pth.parent / (base + "_G.jpg") +# b_pth = r_pth.parent / (base + "_B.jpg") + +# r = pyvips.Image.new_from_file(r_pth, access="sequential") +# g = pyvips.Image.new_from_file(g_pth, access="sequential") +# b = pyvips.Image.new_from_file(b_pth, access="sequential") +# rgb_image = r.bandjoin([g, b]) # type: ignore +# rgb_image = rgb_image.copy(interpretation="srgb") +# filename = f"{base}_RGB.png" +# rgb_image.write_to_file(r_pth.parent / filename) + + +# %% +def process_image_dummy( + package_folder: Path, + chunk: Chunk[NgtDoc], +) -> None: + # load golden master images + # Package folder is the root folder of the current package. + # Make sure to load RGB golden master images from the correct location. + # These images have to be created upfront. + # Here, assume all images are already converted to RGB. + PRODUCT_PATH = package_folder.parent + print(PRODUCT_PATH) + CAM_MAPPING: dict[str, str] = { + "A_1": "camera1", + "A_2": "camera2", + "B_1": "camera3", + "B_2": "camera4", + } + orig_imgs: dict[str, pyvips.Image] = {} + + for golden_master in PRODUCT_PATH.rglob("curdata*/**/*_RGB.png"): + cam_name = golden_master.parent.name + orig_imgs[cam_name] = pyvips.Image.new_from_file(golden_master) # type: ignore + + print("Orig Images: ", orig_imgs) + + idx = 0 + # A chunk consists of "NgtDoc" dataclasses which represent one NGT file each. Definition: + # see above. The first two lines in each doc are skipped and not contained. + # We are operating on a per-file basis. This ensures there are no data races when + # using multiple processes. + # iterate through all documents + for doc in chunk: + if idx == 1: # ignore: early break condition + break + # ** load patch image (image containing the alleged errors) + # image-doc pairs (NGT file <-> image) + img = pyvips.Image.new_from_file(doc.rgb_img) + # identify camera ID ("A_1", "A_2", "B_1", "B_2") + cam_id = doc.rgb_img.parts[-3][-3:] + # use corresponding golden master + golden_master = orig_imgs[CAM_MAPPING[cam_id]] + + idx += 1 # ignore: early break condition + + # Each document "NgtDoc" contains its content as a collection of "NgtLine" + # objects (definition above as well). Each value in the line (comma separated) is + # accessible as property of this line object. + for line in doc.lines: + # iterate through all lines + break # ignore: early break condition + + # ** Pipeline steps outlined in the initial presentation by Susanne + # extract_values: read the coordinates (can be obtained as properties from the + # "line" class) + # find_patch: -- identify correct model (and correct cam) + # ** identified model needed for management of inference process + # extract_GM_patch: extract patch from golden master (var: golden_master) + # find_AOI_image_area: extract patch from error area (var: img) + # insert_AOI_image_area: insert the patch into the golden master + # (var: golden_master), but use a new reference to keep the references to + # the original golden masters alive + + # !! Now save this image somewhere + # !! Caution: The images can be very large. If we process file after file + # !! without deleting it, we could unneccessarily fill the disk or memory. + # ** Recommendation: Use a shared qeue (see below) with a capacity and put + # ** jobs in a blocking manner to interrupt the feeder process (this function) + # Example: + # queue = mp.Queue(20) -- a queue with a capacity of 20 jobs + # queue.put(JOB, block=True) -- place job in queue, block if no capacity left + + # // eval image + # should use a queue with one central inference process + # ?? question: Can all models be loaded at once? Is the VRAM large enough? + # background: We can reduce loading times if the models are loaded once at + # startup. We then just queue inference jobs from this worker function with + # an identifier for the relevant model. In the inference process a lookup is + # performed for this model, the reference obtained and inference started. + # Storage of results can be done in the inference worker or by a separate + # thread/process. + ... + + +# %% +BASE_PATH +# %% +process_image_dummy(BASE_PATH, chunks[0]) +# %%