# %% from __future__ import annotations import sys from collections.abc import Collection, Iterable from dataclasses import InitVar, dataclass, field, fields, is_dataclass from pathlib import Path from typing import Any, Generic, Literal, Never, TypeVar import pyvips T = TypeVar("T") # %% BASE_PATH = Path( r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1" ) assert BASE_PATH.exists() TMP_DATA_PATH = Path( r"B:\projects\KSG\Ordnerstruktur\Daten" ) # needed to find generated RGB images assert TMP_DATA_PATH.exists() assert TMP_DATA_PATH.is_dir() # %% ngt_folder = Path( r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1\614706_helles Entek[3136761]_1_A_1\ngpointdata" ) assert ngt_folder.exists() # %% @dataclass(slots=True) class NgtLine: e1: int e2: int e3: int e4: int e5: int e6: int e7: int e8: int e9: int e10: int e11: int @dataclass(slots=True) class NgtDoc: _lines: InitVar[Iterable[NgtLine]] file: Path rgb_img: Path lines: tuple[NgtLine, ...] = field(init=False) def __post_init__( self, _lines: Iterable[NgtLine], ) -> None: self.lines = tuple(_lines) @property def len(self) -> int: return len(self.lines) @dataclass(slots=True) class NgtDocs: _docs: InitVar[Iterable[NgtDoc]] docs: tuple[NgtDoc, ...] = field(init=False) total_lines: int = field(init=False) def __post_init__( self, _lines: Iterable[NgtDoc], ) -> None: self.docs = tuple(_lines) self.total_lines = sum(doc.len for doc in self.docs) @property def len(self) -> int: return len(self.docs) def get_ngt_doc( ngt_file: Path, ) -> NgtDoc: ngt_lines: list[NgtLine] = [] with open(ngt_file, "r", encoding="utf-8") as f: for line_no, line in enumerate(f): if line_no in (0, 1): continue contents = (int(e) for e in line.strip("\n ").split(",")) ngt_line = NgtLine(*contents) ngt_lines.append(ngt_line) rgb_img_folder = TMP_DATA_PATH.joinpath(*ngt_file.parts[-4:-2], "checkimg") assert rgb_img_folder.exists() rgb_img_path = tuple(rgb_img_folder.glob(f"*{ngt_file.stem}*"))[0] assert rgb_img_path.exists() return NgtDoc(ngt_lines, ngt_file, rgb_img_path) def get_all_ngt_docs( package_path: Path, ) -> NgtDocs: parsed_docs: list[NgtDoc] = [] for ngt_file_path in package_path.rglob("ngpointdata/*.ngt"): ngt_doc = get_ngt_doc(ngt_file_path) parsed_docs.append(ngt_doc) return NgtDocs(parsed_docs) class Chunk(Generic[T]): def __init__( self, capacity: int, overfill_once: bool = True, cap_property: str | None = None, ) -> None: self.capacity = capacity self.overfill = overfill_once self.capacity_left: int = capacity self.capacity_held: int = 0 self.full: bool = False self._contents: list[T] = [] self.cap_property = cap_property def __repr__(self) -> str: return self._contents.__repr__() def __str__(self) -> str: return self._contents.__str__() @property def contents(self) -> list[T]: return self._contents def __len__(self) -> int: return len(self._contents) def __getitem__( self, idx: int, ) -> T: return self._contents[idx] def __raise_for_exceeded_capacity(self) -> Never: raise ValueError("Item can not be added due to capacity limit.") def _capacity_sufficient( self, size_to_add: int, ) -> bool: if self.full: return False new_size = len(self) + size_to_add if self.overfill and (new_size > self.capacity): return True elif new_size > self.capacity: return False else: return True def _recalc_capacity( self, size_to_add: int, ) -> None: self.capacity_held += size_to_add self.capacity_left = max(self.capacity_left - size_to_add, 0) if self.capacity_left == 0: self.full = True else: self.full = False def append( self, object_: T, ) -> None: size_to_add: int = 1 if self.cap_property is not None: if not hasattr(object_, self.cap_property): raise AttributeError("Object does not posses the wanted property") attr = getattr(object_, self.cap_property) if not isinstance(attr, int): raise TypeError("Capacity property must be an integer") size_to_add = attr if self._capacity_sufficient(size_to_add): self._contents.append(object_) self._recalc_capacity(size_to_add) else: self.__raise_for_exceeded_capacity() def extend( self, collection: Collection[T], ) -> None: size_to_add: int = len(collection) if self.cap_property is not None: if not all(hasattr(object_, self.cap_property) for object_ in collection): raise AttributeError("Object does not posses the wanted property") if not all( isinstance(getattr(object_, self.cap_property), int) for object_ in collection ): raise TypeError("Capacity property must be an integer") size_to_add = sum(getattr(object_, self.cap_property) for object_ in collection) if self._capacity_sufficient(size_to_add): self._contents.extend(collection) self._recalc_capacity(size_to_add) else: self.__raise_for_exceeded_capacity() def remove( self, object_: T, ) -> None: size_to_add: int = 1 if self.cap_property is not None: if not hasattr(object_, self.cap_property): raise AttributeError("Object does not posses the wanted property") attr = getattr(object_, self.cap_property) if not isinstance(attr, int): raise TypeError("Capacity property must be an integer") size_to_add = attr size_to_remove = (-1) * size_to_add self._contents.remove(object_) self._recalc_capacity(size_to_remove) def get_deep_size( obj: Any, seen: set[int] | None = None, unit: Literal["b", "kb", "mb", "gb"] = "b", ): if seen is None: seen = set() obj_id = id(obj) if obj_id in seen: return 0 seen.add(obj_id) size = sys.getsizeof(obj) if is_dataclass(obj): for f in fields(obj): size += get_deep_size(getattr(obj, f.name), seen) elif isinstance(obj, dict): size += sum(get_deep_size(v, seen) + get_deep_size(k, seen) for k, v in obj.items()) elif isinstance(obj, (list, tuple, set)): size += sum(get_deep_size(i, seen) for i in obj) match unit: case "kb": size /= 1024 case "mb": size /= 1024 * 1024 case "gb": size /= 1024 * 1024 * 1024 return size def build_chunks( ngt_docs: NgtDocs, num_chunks: int, ) -> list[Chunk[NgtDoc]]: sorted_docs = sorted(ngt_docs.docs, key=lambda x: x.len, reverse=True) chunks: list[Chunk[NgtDoc]] = [] for _ in range(num_chunks): chunks.append(Chunk(int(1e8), overfill_once=True, cap_property="len")) for doc in sorted_docs: chunks = sorted(chunks, key=lambda x: x.capacity_held) chunks[0].append(doc) return chunks # %% ngt_docs = get_all_ngt_docs(BASE_PATH) get_deep_size(ngt_docs, None, "mb") # %% ngt_docs.len # %% ngt_docs.total_lines # %% NUM_WORKERS = 4 chunks = build_chunks(ngt_docs, NUM_WORKERS) # %% for c in chunks: print(f"Cap hold: {c.capacity_held:<5} Cap left: {c.capacity_left:<10}") # %% # convert golden master # PRODUCT_PATH = BASE_PATH.parent # PRODUCT_PATH # r_paths = tuple(PRODUCT_PATH.rglob("curdata*/**/*_R.jpg")) # for r_pth in r_paths: # base, _ = r_pth.stem.split("_") # g_pth = r_pth.parent / (base + "_G.jpg") # b_pth = r_pth.parent / (base + "_B.jpg") # r = pyvips.Image.new_from_file(r_pth, access="sequential") # g = pyvips.Image.new_from_file(g_pth, access="sequential") # b = pyvips.Image.new_from_file(b_pth, access="sequential") # rgb_image = r.bandjoin([g, b]) # type: ignore # rgb_image = rgb_image.copy(interpretation="srgb") # filename = f"{base}_RGB.png" # rgb_image.write_to_file(r_pth.parent / filename) # %% def process_image_dummy( package_folder: Path, chunk: Chunk[NgtDoc], ) -> None: # load golden master images # Package folder is the root folder of the current package. # Make sure to load RGB golden master images from the correct location. # These images have to be created upfront. # Here, assume all images are already converted to RGB. PRODUCT_PATH = package_folder.parent print(PRODUCT_PATH) CAM_MAPPING: dict[str, str] = { "A_1": "camera1", "A_2": "camera2", "B_1": "camera3", "B_2": "camera4", } orig_imgs: dict[str, pyvips.Image] = {} for golden_master in PRODUCT_PATH.rglob("curdata*/**/*_RGB.png"): cam_name = golden_master.parent.name orig_imgs[cam_name] = pyvips.Image.new_from_file(golden_master) # type: ignore print("Orig Images: ", orig_imgs) idx = 0 # A chunk consists of "NgtDoc" dataclasses which represent one NGT file each. Definition: # see above. The first two lines in each doc are skipped and not contained. # We are operating on a per-file basis. This ensures there are no data races when # using multiple processes. # iterate through all documents for doc in chunk: if idx == 1: # ignore: early break condition break # ** load patch image (image containing the alleged errors) # image-doc pairs (NGT file <-> image) img = pyvips.Image.new_from_file(doc.rgb_img) # identify camera ID ("A_1", "A_2", "B_1", "B_2") cam_id = doc.rgb_img.parts[-3][-3:] # use corresponding golden master golden_master = orig_imgs[CAM_MAPPING[cam_id]] idx += 1 # ignore: early break condition # Each document "NgtDoc" contains its content as a collection of "NgtLine" # objects (definition above as well). Each value in the line (comma separated) is # accessible as property of this line object. for line in doc.lines: # iterate through all lines break # ignore: early break condition # ** Pipeline steps outlined in the initial presentation by Susanne # extract_values: read the coordinates (can be obtained as properties from the # "line" class) # find_patch: -- identify correct model (and correct cam) # ** identified model needed for management of inference process # extract_GM_patch: extract patch from golden master (var: golden_master) # find_AOI_image_area: extract patch from error area (var: img) # insert_AOI_image_area: insert the patch into the golden master # (var: golden_master), but use a new reference to keep the references to # the original golden masters alive # !! Now save this image somewhere # !! Caution: The images can be very large. If we process file after file # !! without deleting it, we could unneccessarily fill the disk or memory. # ** Recommendation: Use a shared qeue (see below) with a capacity and put # ** jobs in a blocking manner to interrupt the feeder process (this function) # Example: # queue = mp.Queue(20) -- a queue with a capacity of 20 jobs # queue.put(JOB, block=True) -- place job in queue, block if no capacity left # // eval image # should use a queue with one central inference process # ?? question: Can all models be loaded at once? Is the VRAM large enough? # background: We can reduce loading times if the models are loaded once at # startup. We then just queue inference jobs from this worker function with # an identifier for the relevant model. In the inference process a lookup is # performed for this model, the reference obtained and inference started. # Storage of results can be done in the inference worker or by a separate # thread/process. ... # %% BASE_PATH # %% process_image_dummy(BASE_PATH, chunks[0]) # %%