From fa659c37bd0559ff51ae816d49504bb3051e371a Mon Sep 17 00:00:00 2001 From: foefl Date: Mon, 2 Feb 2026 11:27:19 +0100 Subject: [PATCH] prepare test data generation --- data_analysis/02-3_oracle_workflow_test.py | 162 ++++++++++++--------- 1 file changed, 97 insertions(+), 65 deletions(-) diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index 3287106..71301e7 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -675,6 +675,7 @@ class PipelineResult: "VERLAGSNR", "MENGE_VORMERKER", "MANDFUEHR", + "EINKAEUFER", ] ) @@ -1138,8 +1139,20 @@ def _wf200_sub1( # %% # SAVING/LOADING -p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow" -df = pl.read_ipc(p_save) +READ_DATABASE = False +OVERWRITE = True +FILENAME = "raw_data_from_sql_query_20260202-1.arrow" +p_save = Path.cwd() / FILENAME +if READ_DATABASE: + df = get_raw_data() + if not p_save.exists() or OVERWRITE: + df.write_ipc(p_save) +else: + df = pl.read_ipc(p_save) +# %% +df +# %% +# initialise pipeline raw_data = df.clone() print(f"Number of entries: {len(df)}") clear_tmp_dir() @@ -1196,83 +1209,102 @@ pipe_res.results.select(pl.col("VORLAGE").value_counts()) # %% pipe_res.results.height # %% -# ** aggregate test results +# // aggregate test results all_tmps = load_all_tmp_files() print(len(all_tmps)) + + # %% -WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"} -WF_200_TMP_RENAME = { - "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate", - "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate", -} +def prepare_tmp_data() -> list[pl.DataFrame]: + all_tmps = load_all_tmp_files() + WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"} + WF_200_TMP_RENAME = { + "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate", + "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate", + } -WF_100: list[pl.DataFrame] = [] -WF_200: list[pl.DataFrame] = [] + WF_100: list[pl.DataFrame] = [] + WF_200: list[pl.DataFrame] = [] -for name, df in all_tmps.items(): - if TMPFILE_WF100_SUB1_WDB in name: - rename_schema = WF_100_TMP_RENAME - df = df.rename(rename_schema) - WF_100.append(df) - if TMPFILE_WF200_SUB1 in name: - rename_schema = WF_200_TMP_RENAME - df = df.rename(rename_schema) - WF_200.append(df) + for name, df in all_tmps.items(): + if TMPFILE_WF100_SUB1_WDB in name: + rename_schema = WF_100_TMP_RENAME + df = df.rename(rename_schema) + WF_100.append(df) + elif TMPFILE_WF200_SUB1 in name: + rename_schema = WF_200_TMP_RENAME + df = df.rename(rename_schema) + WF_200.append(df) -tmp_WF_collects = (WF_100, WF_200) -all_tmps_preproc = [] + tmp_WF_collects = (WF_100, WF_200) + all_tmps_preproc: list[pl.DataFrame] = [] -for collect in tmp_WF_collects: - if len(collect) > 1: - df = pl.concat(collect) - elif len(collect) == 1: - df = collect[0].clone() - else: - raise RuntimeError() + for collect in tmp_WF_collects: + if len(collect) > 1: + df = pl.concat(collect) + elif len(collect) == 1: + df = collect[0].clone() + else: + raise RuntimeError() - all_tmps_preproc.append(df) + all_tmps_preproc.append(df) + + return all_tmps_preproc + + +def generate_test_result_data( + raw_data: pl.DataFrame, + pipe_result: PipelineResult, +) -> pl.DataFrame: + all_tmps_preproc = prepare_tmp_data() + + res_table = pipe_result.results.clone() + res_title_info = res_table.join( + raw_data, + left_on=["BEDARF_NR", "BEDARF_SEQUENZ"], + right_on=["BEDARFNR", "BEDP_SEQUENZ"], + how="inner", + ) + exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ") + res_title_info = res_title_info.select(pl.exclude(exclude_cols)) + columns = [ + "VORLAGE", + "WF_ID", + "BEST_MENGE", + "FREIGABE_AUTO", + "BEDP_MENGE_BEDARF_VM", + "MENGE_VORMERKER", + "BEDP_TITELNR", + "BEDP_MAN", + "MELDENUMMER", + "VERLAGSNR", + "EINKAEUFER", + "MANDFUEHR", + ] + res_title_info = res_title_info.select(columns) + + test_results = res_title_info.clone() + for df in all_tmps_preproc: + test_results = test_results.join(df, on="BEDP_TITELNR", how="left") + + test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False) + test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all()) + + return test_results -all_tmps_preproc -############################# # %% -res_table = pipe_res.results.clone() -# %% -res_table.head() -# %% -raw_data.head() -# raw_data = raw_data.rename({"BEDARFNR": "BEDARF_NR", "BEDP_SEQUENZ": "BEDARF_SEQUENZ"}) -# raw_data -# %% -res_title_info = res_table.join( - raw_data, - left_on=["BEDARF_NR", "BEDARF_SEQUENZ"], - right_on=["BEDARFNR", "BEDP_SEQUENZ"], - how="inner", -) -exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ") -res_title_info = res_title_info.select(pl.exclude(exclude_cols)) -res_title_info.head() -# %% -df1 = all_tmps["WF-100_Sub1-WDB"] -df2 = all_tmps["WF-200_Sub1"] -df3 = all_tmps["WF-200_Sub1_1"] -df1.head() -# %% -test_results = res_title_info.clone() - -for df in all_tmps_preproc: - test_results = test_results.join(df, on="BEDP_TITELNR", how="left") - -test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False) -test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all()) -# %% -test_results - +test_results = generate_test_result_data(raw_data, pipe_res) +test_results.head() # %% date_str = datetime.datetime.now().strftime("%Y-%m-%d") p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx" -test_results.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1)) +test_results.to_pandas().set_index("Index").to_excel( + p_save, + freeze_panes=(1, 1), + sheet_name=f"Ergebnisse_Testphase_{date_str}", +) +##################################################################### # %% # ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(