# %%
from __future__ import annotations

import sys
from collections.abc import Collection, Iterable
from dataclasses import InitVar, dataclass, field, fields, is_dataclass
from pathlib import Path
from typing import Any, Generic, Literal, Never, TypeVar

import pyvips

T = TypeVar("T")

# %%
BASE_PATH = Path(
    r"B:\projects\KSG\Ordnerstruktur\Verifizierdaten_1\20260225\614706_helles Entek\614706_helles Entek[3136761]_1"
)
assert BASE_PATH.exists()

# needed to find the generated RGB images
TMP_DATA_PATH = Path(r"B:\projects\KSG\Ordnerstruktur\Daten")
assert TMP_DATA_PATH.exists()
assert TMP_DATA_PATH.is_dir()

# %%
ngt_folder = BASE_PATH / "614706_helles Entek[3136761]_1_A_1" / "ngpointdata"
assert ngt_folder.exists()


# %%
@dataclass(slots=True)
class NgtLine:
    """One data row of an NGT file: the eleven comma-separated integer values."""

    e1: int
    e2: int
    e3: int
    e4: int
    e5: int
    e6: int
    e7: int
    e8: int
    e9: int
    e10: int
    e11: int


@dataclass(slots=True)
class NgtDoc:
    """A parsed NGT file together with the RGB image it belongs to."""

    _lines: InitVar[Iterable[NgtLine]]
    file: Path
    rgb_img: Path
    lines: tuple[NgtLine, ...] = field(init=False)

    def __post_init__(self, _lines: Iterable[NgtLine]) -> None:
        self.lines = tuple(_lines)

    @property
    def len(self) -> int:
        return len(self.lines)


@dataclass(slots=True)
class NgtDocs:
    """A collection of parsed NGT documents."""

    _docs: InitVar[Iterable[NgtDoc]]
    docs: tuple[NgtDoc, ...] = field(init=False)
    total_lines: int = field(init=False)

    def __post_init__(self, _docs: Iterable[NgtDoc]) -> None:
        self.docs = tuple(_docs)
        self.total_lines = sum(doc.len for doc in self.docs)

    @property
    def len(self) -> int:
        return len(self.docs)


def get_ngt_doc(ngt_file: Path) -> NgtDoc:
    """Parse a single NGT file and pair it with its generated RGB image."""
    ngt_lines: list[NgtLine] = []
    with open(ngt_file, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f):
            # the first two lines of an NGT file are skipped
            if line_no in (0, 1):
                continue
            contents = (int(e) for e in line.strip("\n ").split(","))
            ngt_lines.append(NgtLine(*contents))

    # the matching RGB image lives under TMP_DATA_PATH in the "checkimg" folder
    rgb_img_folder = TMP_DATA_PATH.joinpath(*ngt_file.parts[-4:-2], "checkimg")
    assert rgb_img_folder.exists()
    rgb_img_path = tuple(rgb_img_folder.glob(f"*{ngt_file.stem}*"))[0]
    assert rgb_img_path.exists()

    return NgtDoc(ngt_lines, ngt_file, rgb_img_path)


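# %%
# A minimal sketch of the expected NGT row format, using fabricated values:
# after the two skipped lines, every row consists of eleven comma-separated
# integers that map onto NgtLine.e1 ... NgtLine.e11.
_example_row = "1,2,3,4,5,6,7,8,9,10,11"
NgtLine(*(int(e) for e in _example_row.strip("\n ").split(",")))

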
def get_all_ngt_docs(package_path: Path) -> NgtDocs:
    """Parse every NGT file found below the given package path."""
    parsed_docs: list[NgtDoc] = []
    for ngt_file_path in package_path.rglob("ngpointdata/*.ngt"):
        parsed_docs.append(get_ngt_doc(ngt_file_path))
    return NgtDocs(parsed_docs)


class Chunk(Generic[T]):
    """A list-like container with a capacity limit.

    If ``cap_property`` is set, each item's size is read from that integer
    attribute instead of counting every item as 1. With ``overfill_once``,
    the append/extend that crosses the capacity is still allowed; afterwards
    the chunk reports full and rejects further items.
    """

    def __init__(
        self,
        capacity: int,
        overfill_once: bool = True,
        cap_property: str | None = None,
    ) -> None:
        self.capacity = capacity
        self.overfill = overfill_once
        self.capacity_left: int = capacity
        self.capacity_held: int = 0
        self.full: bool = False
        self._contents: list[T] = []
        self.cap_property = cap_property

    def __repr__(self) -> str:
        return repr(self._contents)

    def __str__(self) -> str:
        return str(self._contents)

    @property
    def contents(self) -> list[T]:
        return self._contents

    def __len__(self) -> int:
        return len(self._contents)

    def __getitem__(self, idx: int) -> T:
        return self._contents[idx]

    def __raise_for_exceeded_capacity(self) -> Never:
        raise ValueError("Item cannot be added due to the capacity limit.")

    def _size_of(self, object_: T) -> int:
        """Determine the size an item contributes towards the capacity."""
        if self.cap_property is None:
            return 1
        if not hasattr(object_, self.cap_property):
            raise AttributeError("Object does not possess the required property")
        attr = getattr(object_, self.cap_property)
        if not isinstance(attr, int):
            raise TypeError("Capacity property must be an integer")
        return attr

    def _capacity_sufficient(self, size_to_add: int) -> bool:
        if self.full:
            return False
        new_size = self.capacity_held + size_to_add
        if new_size <= self.capacity:
            return True
        # over capacity: only allowed when overfilling once is permitted
        return self.overfill

    def _recalc_capacity(self, size_to_add: int) -> None:
        self.capacity_held += size_to_add
        self.capacity_left = max(self.capacity - self.capacity_held, 0)
        self.full = self.capacity_left == 0

    def append(self, object_: T) -> None:
        size_to_add = self._size_of(object_)
        if not self._capacity_sufficient(size_to_add):
            self.__raise_for_exceeded_capacity()
        self._contents.append(object_)
        self._recalc_capacity(size_to_add)

    def extend(self, collection: Collection[T]) -> None:
        size_to_add = sum(self._size_of(object_) for object_ in collection)
        if not self._capacity_sufficient(size_to_add):
            self.__raise_for_exceeded_capacity()
        self._contents.extend(collection)
        self._recalc_capacity(size_to_add)

    def remove(self, object_: T) -> None:
        size_to_remove = self._size_of(object_)
        self._contents.remove(object_)
        self._recalc_capacity(-size_to_remove)


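# %%
# A minimal sketch (not part of the pipeline) of the Chunk semantics with a
# weighted capacity; "_Item" is a hypothetical stand-in for NgtDoc.
@dataclass(slots=True)
class _Item:
    len: int  # size this item contributes towards the chunk capacity


_c: Chunk[_Item] = Chunk(5, overfill_once=True, cap_property="len")
_c.append(_Item(4))  # held: 4, left: 1
_c.append(_Item(3))  # exceeds the capacity, but is the one allowed overfill
try:
    _c.append(_Item(1))  # the chunk is now full, so this raises
except ValueError:
    pass

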
def get_deep_size(
    obj: Any,
    seen: set[int] | None = None,
    unit: Literal["b", "kb", "mb", "gb"] = "b",
) -> float:
    """Recursively estimate the memory footprint of an object.

    Recursive calls always accumulate in bytes; the unit conversion is
    applied once at the top level.
    """
    if seen is None:
        seen = set()

    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)
    size: float = sys.getsizeof(obj)

    if is_dataclass(obj):
        for f in fields(obj):
            size += get_deep_size(getattr(obj, f.name), seen)
    elif isinstance(obj, dict):
        size += sum(get_deep_size(v, seen) + get_deep_size(k, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set)):
        size += sum(get_deep_size(i, seen) for i in obj)

    match unit:
        case "kb":
            size /= 1024
        case "mb":
            size /= 1024 * 1024
        case "gb":
            size /= 1024 * 1024 * 1024

    return size


def build_chunks(
    ngt_docs: NgtDocs,
    num_chunks: int,
) -> list[Chunk[NgtDoc]]:
    """Distribute documents over chunks, balanced by their line counts.

    Greedy longest-processing-time heuristic: place the largest remaining
    document into the currently least-loaded chunk. The capacity is chosen
    so high that the chunks never actually fill up.
    """
    sorted_docs = sorted(ngt_docs.docs, key=lambda x: x.len, reverse=True)

    chunks: list[Chunk[NgtDoc]] = []
    for _ in range(num_chunks):
        chunks.append(Chunk(int(1e8), overfill_once=True, cap_property="len"))

    for doc in sorted_docs:
        chunks = sorted(chunks, key=lambda x: x.capacity_held)
        chunks[0].append(doc)

    return chunks


# %%
ngt_docs = get_all_ngt_docs(BASE_PATH)
get_deep_size(ngt_docs, None, "mb")

# %%
ngt_docs.len

# %%
ngt_docs.total_lines

# %%
NUM_WORKERS = 4
chunks = build_chunks(ngt_docs, NUM_WORKERS)

# %%
for c in chunks:
    print(f"Cap held: {c.capacity_held:<5} Cap left: {c.capacity_left:<10}")

# %%
# convert golden master images (run once upfront)
# PRODUCT_PATH = BASE_PATH.parent
# PRODUCT_PATH

# r_paths = tuple(PRODUCT_PATH.rglob("curdata*/**/*_R.jpg"))

# for r_pth in r_paths:
#     base, _ = r_pth.stem.split("_")
#     g_pth = r_pth.parent / (base + "_G.jpg")
#     b_pth = r_pth.parent / (base + "_B.jpg")

#     r = pyvips.Image.new_from_file(r_pth, access="sequential")
#     g = pyvips.Image.new_from_file(g_pth, access="sequential")
#     b = pyvips.Image.new_from_file(b_pth, access="sequential")
#     rgb_image = r.bandjoin([g, b])  # type: ignore
#     rgb_image = rgb_image.copy(interpretation="srgb")
#     filename = f"{base}_RGB.png"
#     rgb_image.write_to_file(r_pth.parent / filename)

# %%
def process_image_dummy(
    package_folder: Path,
    chunk: Chunk[NgtDoc],
) -> None:
    # Load the golden master images.
    # "package_folder" is the root folder of the current package. Make sure to
    # load the RGB golden master images from the correct location; these
    # images have to be created upfront. Here we assume all images have
    # already been converted to RGB.
    PRODUCT_PATH = package_folder.parent
    print(PRODUCT_PATH)
    CAM_MAPPING: dict[str, str] = {
        "A_1": "camera1",
        "A_2": "camera2",
        "B_1": "camera3",
        "B_2": "camera4",
    }
    orig_imgs: dict[str, pyvips.Image] = {}

    for golden_master in PRODUCT_PATH.rglob("curdata*/**/*_RGB.png"):
        cam_name = golden_master.parent.name
        orig_imgs[cam_name] = pyvips.Image.new_from_file(golden_master)  # type: ignore

    print("Orig Images: ", orig_imgs)

    idx = 0
    # A chunk consists of "NgtDoc" dataclasses, each representing one NGT file
    # (definition above; the first two lines of each file are skipped and
    # therefore not contained). We operate on a per-file basis, which ensures
    # there are no data races when using multiple processes.
    # iterate through all documents
    for doc in chunk:
        if idx == 1:  # ignore: early break condition
            break
        # ** load the patch image (the image containing the alleged errors);
        # NGT files and images form pairs (NGT file <-> image)
        img = pyvips.Image.new_from_file(doc.rgb_img)
        # identify the camera ID ("A_1", "A_2", "B_1", "B_2")
        cam_id = doc.rgb_img.parts[-3][-3:]
        # use the corresponding golden master
        golden_master = orig_imgs[CAM_MAPPING[cam_id]]

        idx += 1  # ignore: early break condition

        # Each "NgtDoc" holds its content as a collection of "NgtLine" objects
        # (definition above as well). Each comma-separated value of a line is
        # accessible as a property of that line object.
        # iterate through all lines
        for line in doc.lines:
            break  # ignore: early break condition

        # ** Pipeline steps outlined in the initial presentation by Susanne:
        # extract_values: read the coordinates (available as properties of the
        #     "line" objects)
        # find_patch: identify the correct model (and the correct cam)
        #     ** the identified model is needed to manage the inference process
        # extract_GM_patch: extract the patch from the golden master
        #     (var: golden_master)
        # find_AOI_image_area: extract the patch from the error area (var: img)
        # insert_AOI_image_area: insert the patch into the golden master
        #     (var: golden_master), but use a new reference to keep the
        #     references to the original golden masters alive

        # !! Now save this image somewhere.
        # !! Caution: the images can be very large. If we process file after
        # !! file without deleting anything, we could unnecessarily fill the
        # !! disk or memory.
        # ** Recommendation: use a shared queue (see below) with a capacity and
        # ** put jobs in a blocking manner to throttle the feeder process
        # ** (this function). Example:
        #    queue = mp.Queue(20)        -- a queue with a capacity of 20 jobs
        #    queue.put(JOB, block=True)  -- place a job, block if no capacity left

        # // eval image
        # Should use a queue with one central inference process.
        # ?? Question: can all models be loaded at once? Is the VRAM large
        # enough? Background: we can reduce loading times if the models are
        # loaded once at startup. We then just queue inference jobs from this
        # worker function with an identifier for the relevant model. The
        # inference process looks up that model, obtains the reference, and
        # starts inference. Storing the results can be done in the inference
        # worker or by a separate thread/process.
        ...
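
# %%
# A minimal sketch of the extract_GM_patch / find_AOI_image_area /
# insert_AOI_image_area steps using pyvips. The pixel coordinates (x, y, w, h)
# and the helper name are hypothetical; the real values come from the NgtLine
# properties.
def _insert_patch_sketch(
    golden_master: pyvips.Image,
    img: pyvips.Image,
    x: int,
    y: int,
    w: int,
    h: int,
) -> pyvips.Image:
    # extract the area of interest from the error image ...
    aoi_patch = img.crop(x, y, w, h)  # type: ignore
    # ... and insert it into the golden master; pyvips operations return new
    # images, so the reference to the original golden master stays alive
    return golden_master.insert(aoi_patch, x, y)  # type: ignore
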
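# %%
# A minimal sketch of the recommended bounded-queue handover: feeder processes
# put jobs blockingly into a shared queue and one central inference process
# holds all models and consumes the jobs. The job layout and the model
# identifiers are hypothetical.
import multiprocessing as mp


def _feeder_sketch(queue: mp.Queue, chunk: Chunk[NgtDoc]) -> None:
    for doc in chunk:
        job = ("model_id", doc.file)  # hypothetical job payload
        queue.put(job, block=True)  # blocks while the queue is at capacity


def _inference_worker_sketch(queue: mp.Queue) -> None:
    models: dict[str, Any] = {}  # load all models once at startup (VRAM permitting)
    while (job := queue.get()) is not None:  # None acts as a stop sentinel
        model_id, _payload = job
        model = models[model_id]  # look up the relevant model by identifier
        ...  # run inference; store results here or hand off to another process
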
# %%
BASE_PATH

# %%
process_image_dummy(BASE_PATH, chunks[0])

# %%