From 9267f3da24a07b1e251eec395bb746b8e6eb6ba9 Mon Sep 17 00:00:00 2001
From: foefl
Date: Thu, 29 Jan 2026 16:27:56 +0100
Subject: [PATCH] prepare temp data fusion for testing phase

---
 data_analysis/02-3_oracle_workflow_test.py | 131 +++++++++++++--------
 1 file changed, 80 insertions(+), 51 deletions(-)

diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index e3912f8..c66b41b 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -3,8 +3,11 @@ from __future__ import annotations
 
 import datetime
 import json
+import shutil
+import tempfile
 import time
 import typing
+import uuid
 from collections.abc import Sequence
 from pathlib import Path
 from pprint import pprint
@@ -19,10 +22,31 @@ from sqlalchemy import event
 
 from umbreit import db, types
 
+
 # %%
 # import importlib
 # types = importlib.reload(types)
 # db = importlib.reload(db)
+# %%
+def create_tmp_dir() -> Path:
+    tmp_pth = Path(tempfile.mkdtemp())
+    assert tmp_pth.exists()
+    return tmp_pth
+
+
+TMP_DIR = create_tmp_dir()
+
+
+def clear_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+    TMP_DIR.mkdir()
+
+
+def remove_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+
+
+print(f"Created temp directory under: >{TMP_DIR}<")
 
 # %%
 p_cfg = io.search_file_iterative(
@@ -404,6 +428,38 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
     db.EXT_DOPT_ERGEBNIS.columns.keys()
 )
 ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
+SAVE_TMP_FILES: typing.Final[bool] = True
+TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
+TMPFILE_WF200_SUB1 = "WF-200_Sub1"
+
+
+def save_tmp_file(
+    data: pl.DataFrame,
+    filename: str | None,
+) -> None:
+    if filename is None:
+        filename = str(uuid.uuid4())
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+    data.write_ipc(pth)
+
+
+def load_tmp_file(
+    filename: str,
+) -> pl.DataFrame:
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+    if not pth.exists():
+        raise FileNotFoundError(f"File >{pth.name}< not found")
+
+    return pl.read_ipc(pth)
+
+
+def load_all_tmp_files() -> tuple[pl.DataFrame, ...]:
+    all_dfs: list[pl.DataFrame] = []
+    for file in TMP_DIR.glob("*.arrow"):
+        df = pl.read_ipc(file)
+        all_dfs.append(df)
+
+    return tuple(all_dfs)
 
 
 def get_starting_date(
@@ -842,8 +898,7 @@ def _wf100_petersen_sub1_wdb(
 ) -> PipelineResult:
     ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
     # input: pre-filtered entries (WDB titles and #VM > 0)
-    # more then 1 VM
-    # !! show these entries
+    # more than 1 VM: show these entries
     filter_number_vm = pl.col(VM_CRITERION) > 1
     res = _apply_several_filters(
         pipe_result.open,
@@ -883,11 +938,16 @@ def _wf100_petersen_sub1_wdb(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(count_col > 1)
     )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
         engine,
     )
-    print(relevant_titles)
+
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
+
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
     )
@@ -1003,10 +1063,16 @@ def _wf200_sub1(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(unique_count_col >= 3)
     )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
         engine,
     )
+
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, None)
+
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
     )
@@ -1034,7 +1100,8 @@
 p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
 df = pl.read_ipc(p_save)
 print(f"Number of entries: {len(df)}")
-
+clear_tmp_dir()
+clear_result_data()
 # %%
 df.head()
 # %%
@@ -1052,42 +1119,6 @@ df.head()
 #     engine,
 # )
 # print(relevant_titles)
-# %%
-# removed_rows = []
-
-# raw_data = df.clone()
-# print(f"Length raw data: {len(raw_data)}")
-# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
-# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
-
-
-# filtered = raw_data.filter(filter_mandant)
-# filtered_n = raw_data.filter(~filter_mandant)
-# num_filter = len(filtered)
-# num_filter_n = len(filtered_n)
-# removed_rows.append(filtered_n)
-# print(f"Length filtered: {num_filter}")
-# print(f"Length filtered out: {num_filter_n}")
-# print(f"Length all: {num_filter + num_filter_n}")
-# raw_data = filtered
-# out = pl.concat(removed_rows)
-# print(f"Length out: {len(out)}")
-
-# # %%
-# print("---------------------------------------")
-# filtered = raw_data.filter(filter_ignore_MNR26)
-# filtered_n = raw_data.filter(~filter_ignore_MNR26)
-# num_filter = len(filtered)
-# num_filter_n = len(filtered_n)
-# len(filtered_n)
-# # %%
-# removed_rows.append(filtered_n)
-# print(f"Length filtered: {num_filter}")
-# print(f"Length filtered out: {num_filter_n}")
-# print(f"Length all: {num_filter + num_filter_n}")
-# out = pl.concat(removed_rows)
-# print(f"Length out: {len(out)}")
-
 # %%
 raw_data = df.clone()
 # pipe_res = get_empty_pipeline_result(raw_data)
@@ -1101,15 +1132,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
 
 # %%
 pipe_res.results
-
-# %%
-# // test result writing
-res = pipe_res.results.clone()
-res.height
-
-# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
-# %%
-# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
 # %%
 pipe_res = wf910(pipe_res)
 print(f"Length of base data: {len(raw_data):>18}")
@@ -1117,8 +1139,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
 print(f"Number of entries result data: {len(pipe_res.results):>8}")
 print(f"Number of entries open data: {len(pipe_res.open):>10}")
 # %%
-# pipe_res.results.select(pl.col("vorlage").value_counts())
-# %%
 pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
 print(f"Length of base data: {len(raw_data):>18}")
 print(f"Number of entries pipe data: {len(pipe_res):>10}")
@@ -1146,6 +1166,13 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
 pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
 # %%
 pipe_res.results.select(pl.col("VORLAGE").value_counts())
+# %%
+# ** aggregate test results
+all_tmps = load_all_tmp_files()
+print(len(all_tmps))
+# %%
+all_tmps[2]
+
 # %%
 # ----------------------------------------------------------------------------
 # # Workflow 200 (Umbreit only)
@@ -1613,3 +1640,5 @@ pipe_res_main.open
 # %%
 pipe_res.results
 # %%
+remove_tmp_dir()
+# %%
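-- 

Reviewer note, not part of the patch: the sketch below is a minimal, self-contained round trip through the temp-file helpers this patch introduces (save_tmp_file -> load_all_tmp_files -> remove_tmp_dir), so the aggregation cell ("** aggregate test results") can be tried outside the pipeline. It assumes only polars on Python 3.10+; the demo DataFrames and their contents are invented for illustration, while the helper names and the .arrow/IPC handling mirror the patch.

# Minimal sketch of the patch's temp-file round trip. The DataFrame
# contents are made up; TMP_DIR handling follows create_tmp_dir() and
# remove_tmp_dir() from the patch.
import shutil
import tempfile
import uuid
from pathlib import Path

import polars as pl

TMP_DIR = Path(tempfile.mkdtemp())


def save_tmp_file(data: pl.DataFrame, filename: str | None) -> None:
    # As in the patch, a None filename falls back to a random UUID so
    # anonymous sub-results never overwrite each other.
    if filename is None:
        filename = str(uuid.uuid4())
    data.write_ipc((TMP_DIR / filename).with_suffix(".arrow"))


# One named sub-result (WF-100) and one anonymous sub-result (WF-200).
save_tmp_file(pl.DataFrame({"BEDP_TITELNR": [1, 2, 3]}), "WF-100_Sub1-WDB")
save_tmp_file(pl.DataFrame({"BEDP_TITELNR": [4, 5]}), None)

# Equivalent of load_all_tmp_files(): collect every intermediate result.
all_tmps = tuple(pl.read_ipc(f) for f in TMP_DIR.glob("*.arrow"))
print(f"Aggregated {len(all_tmps)} temp result set(s)")

shutil.rmtree(TMP_DIR)  # equivalent of remove_tmp_dir()

One design point worth noting: Arrow IPC stores the schema alongside the data, so the frames read back by load_all_tmp_files carry the same dtypes the pipeline wrote, and the collected sub-results can be inspected for customer feedback without re-casting columns.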