prepare temp data fusion for testing phase

Florian Förster 2026-01-29 16:27:56 +01:00
parent f487781598
commit 9267f3da24


@@ -3,8 +3,11 @@ from __future__ import annotations
 import datetime
 import json
+import shutil
+import tempfile
 import time
 import typing
+import uuid
 from collections.abc import Sequence
 from pathlib import Path
 from pprint import pprint
@@ -19,10 +22,31 @@ from sqlalchemy import event
 from umbreit import db, types
 # %%
 # import importlib
 # types = importlib.reload(types)
 # db = importlib.reload(db)
+# %%
+def create_tmp_dir() -> Path:
+    tmp_pth = Path(tempfile.mkdtemp())
+    assert tmp_pth.exists()
+    return tmp_pth
+TMP_DIR = create_tmp_dir()
+def clear_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+    TMP_DIR.mkdir()
+def remove_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+print(f"Created temp directory under: >{TMP_DIR}<")
 # %%
 p_cfg = io.search_file_iterative(
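A side note on the lifecycle added above: remove_tmp_dir() is only called in the script's final cell, so an aborted run leaves the scratch directory behind. A minimal sketch of binding the cleanup to interpreter exit instead; the atexit registration is an illustrative assumption, not something this diff adds:

import atexit
import shutil
import tempfile
from pathlib import Path

TMP_DIR = Path(tempfile.mkdtemp())  # same role as create_tmp_dir() above
atexit.register(shutil.rmtree, TMP_DIR, ignore_errors=True)  # removed even if the run aborts early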
@@ -404,6 +428,38 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
     db.EXT_DOPT_ERGEBNIS.columns.keys()
 )
 ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
+SAVE_TMP_FILES: typing.Final[bool] = True
+TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
+TMPFILE_WF200_SUB1 = "WF-200_Sub1"
+def save_tmp_file(
+    data: pl.DataFrame,
+    filename: str | None,
+) -> None:
+    if filename is None:
+        filename = str(uuid.uuid4())
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+    data.write_ipc(pth)
+def load_tmp_file(
+    filename: str,
+) -> pl.DataFrame:
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+    if not pth.exists():
+        raise FileNotFoundError(f"File >{pth.name}< not found")
+    return pl.read_ipc(pth)
+def load_all_tmp_files() -> tuple[pl.DataFrame, ...]:
+    all_dfs: list[pl.DataFrame] = []
+    for file in TMP_DIR.glob("*.arrow"):
+        df = pl.read_ipc(file)
+        all_dfs.append(df)
+    return tuple(all_dfs)
 def get_starting_date(
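As a usage sketch (not part of the diff) of the helpers added above: the round trip writes Arrow IPC files into TMP_DIR and reads them back. The toy DataFrame and the assert are illustrative assumptions only; TMP_DIR, save_tmp_file, load_tmp_file and load_all_tmp_files from the hunks above are assumed to be in scope.

import polars as pl
from polars.testing import assert_frame_equal

toy = pl.DataFrame({"BEDP_TITELNR": [1, 2, 3]})  # dummy frame, not real demand data
save_tmp_file(toy, TMPFILE_WF100_SUB1_WDB)       # stored as WF-100_Sub1-WDB.arrow in TMP_DIR
save_tmp_file(toy, None)                         # no name given: stored under a uuid4 filename
assert_frame_equal(load_tmp_file(TMPFILE_WF100_SUB1_WDB), toy)
print(len(load_all_tmp_files()))                 # 2: every *.arrow file in TMP_DIR is read back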
@@ -842,8 +898,7 @@ def _wf100_petersen_sub1_wdb(
 ) -> PipelineResult:
     ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
     # input: pre-filtered entries (WDB titles and #VM > 0)
-    # more then 1 VM
-    # !! show these entries
+    # more than 1 VM: show these entries
     filter_number_vm = pl.col(VM_CRITERION) > 1
     res = _apply_several_filters(
         pipe_result.open,
@@ -883,11 +938,16 @@ def _wf100_petersen_sub1_wdb(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(count_col > 1)
     )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
         engine,
     )
+    print(relevant_titles)
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
     )
@@ -1003,10 +1063,16 @@ def _wf200_sub1(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(unique_count_col >= 3)
     )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
         engine,
     )
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, None)
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
     )
@@ -1034,7 +1100,8 @@ def _wf200_sub1(
 p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
 df = pl.read_ipc(p_save)
 print(f"Number of entries: {len(df)}")
+clear_tmp_dir()
+clear_result_data()
 # %%
 df.head()
 # %%
@@ -1052,42 +1119,6 @@ df.head()
 # engine,
 # )
 # print(relevant_titles)
-# %%
-# removed_rows = []
-# raw_data = df.clone()
-# print(f"Length raw data: {len(raw_data)}")
-# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
-# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
-# filtered = raw_data.filter(filter_mandant)
-# filtered_n = raw_data.filter(~filter_mandant)
-# num_filter = len(filtered)
-# num_filter_n = len(filtered_n)
-# removed_rows.append(filtered_n)
-# print(f"Length filtered: {num_filter}")
-# print(f"Length filtered out: {num_filter_n}")
-# print(f"Length all: {num_filter + num_filter_n}")
-# raw_data = filtered
-# out = pl.concat(removed_rows)
-# print(f"Length out: {len(out)}")
-# # %%
-# print("---------------------------------------")
-# filtered = raw_data.filter(filter_ignore_MNR26)
-# filtered_n = raw_data.filter(~filter_ignore_MNR26)
-# num_filter = len(filtered)
-# num_filter_n = len(filtered_n)
-# len(filtered_n)
-# # %%
-# removed_rows.append(filtered_n)
-# print(f"Length filtered: {num_filter}")
-# print(f"Length filtered out: {num_filter_n}")
-# print(f"Length all: {num_filter + num_filter_n}")
-# out = pl.concat(removed_rows)
-# print(f"Length out: {len(out)}")
 # %%
 raw_data = df.clone()
 # pipe_res = get_empty_pipeline_result(raw_data)
@@ -1101,15 +1132,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
 # %%
 pipe_res.results
-# %%
-# // test result writing
-res = pipe_res.results.clone()
-res.height
-# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
-# %%
-# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
 # %%
 pipe_res = wf910(pipe_res)
 print(f"Length of base data: {len(raw_data):>18}")
@@ -1117,8 +1139,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
 print(f"Number of entries result data: {len(pipe_res.results):>8}")
 print(f"Number of entries open data: {len(pipe_res.open):>10}")
 # %%
-# pipe_res.results.select(pl.col("vorlage").value_counts())
-# %%
 pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
 print(f"Length of base data: {len(raw_data):>18}")
 print(f"Number of entries pipe data: {len(pipe_res):>10}")
@@ -1146,6 +1166,13 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
 pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
 # %%
 pipe_res.results.select(pl.col("VORLAGE").value_counts())
+# %%
+# ** aggregate test results
+all_tmps = load_all_tmp_files()
+print(len(all_tmps))
+# %%
+all_tmps[2]
 # %%
 # ---------------------------------------------------------------------------- #
 # Workflow 200 (Umbreit only)
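The aggregation cell above only collects the saved frames into a tuple. A sketch (not in the diff) of one way to fuse them for the customer feedback round: tag each frame with the file it came from and concatenate diagonally so differing column sets are tolerated. The SOURCE_TMPFILE column name is an illustrative assumption; pl and TMP_DIR come from the script itself.

import polars as pl

labelled = [
    pl.read_ipc(file).with_columns(pl.lit(file.stem).alias("SOURCE_TMPFILE"))
    for file in TMP_DIR.glob("*.arrow")
]
fused = pl.concat(labelled, how="diagonal")      # diagonal concat fills missing columns with nulls
print(fused["SOURCE_TMPFILE"].value_counts())    # rows contributed per saved sub-result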
@@ -1613,3 +1640,5 @@ pipe_res_main.open
 # %%
 pipe_res.results
 # %%
+remove_tmp_dir()
+# %%