diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index c66b41b..3287106 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -22,6 +22,9 @@
 from sqlalchemy import event
 
 from umbreit import db, types
 
+oracledb.defaults.arraysize = 1000
+oracledb.defaults.prefetchrows = 1000
+
 # %%
 # import importlib
@@ -72,13 +75,22 @@
 conn_string = (
     f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
 )
 # engine = sql.create_engine(conn_string)
-engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
+engine = sql.create_engine(
+    conn_string,
+    execution_options={"stream_results": True},
+)
 
-@event.listens_for(engine, "after_cursor_execute")
-def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
-    cursor.arraysize = 1000
-    cursor.prefetchrows = 1000
+# @event.listens_for(engine, "after_cursor_execute")
+# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
+#     cursor.arraysize = 1000
+#     cursor.prefetchrows = 1000
+
+
+# @event.listens_for(engine, "before_cursor_execute")
+# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
+#     cursor.arraysize = 1000
+#     cursor.prefetchrows = 1000
 
 
 # %%
@@ -397,7 +409,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
 # BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
 df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
 p_save_diff_VM_bedp_tinfo = (
-    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
+    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
 )
 df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
 
@@ -440,6 +452,13 @@ def save_tmp_file(
     if filename is None:
         filename = str(uuid.uuid4())
     pth = (TMP_DIR / filename).with_suffix(".arrow")
+
+    stem = pth.stem  # suffix the original stem (name_1, name_2, ...); building on pth.stem would accumulate suffixes
+    n: int = 1
+    while pth.exists():
+        pth = (TMP_DIR / f"{stem}_{n}").with_suffix(".arrow")
+        n += 1
+
     data.write_ipc(pth)
 
 
@@ -453,13 +472,13 @@ def load_tmp_file(
     return pl.read_ipc(pth)
 
 
-def load_all_tmp_files() -> tuple[pl.DataFrame, ...]:
-    all_dfs: list[pl.DataFrame] = []
+def load_all_tmp_files() -> dict[str, pl.DataFrame]:
+    all_dfs: dict[str, pl.DataFrame] = {}
     for file in TMP_DIR.glob("*.arrow"):
         df = pl.read_ipc(file)
-        all_dfs.append(df)
+        all_dfs[file.stem] = df
 
-    return tuple(all_dfs)
+    return all_dfs
 
 
 def get_starting_date(
@@ -486,6 +505,7 @@ def get_raw_data() -> pl.DataFrame:
         db.ext_titel_info.c.VERLAGSNR,
         db.ext_titel_info.c.MENGE_VORMERKER,
         db.ext_titel_info.c.MANDFUEHR,
+        db.ext_titel_info.c.EINKAEUFER,
     ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
 
     return pl.read_database(
@@ -938,7 +958,16 @@ def _wf100_petersen_sub1_wdb(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(count_col > 1)
     )
-    # !! this is a sub result which muste be used in the result set
+    if SAVE_TMP_FILES:
+        stmt = (
+            sql.select(
+                sub1.c.BEDP_TITELNR,
+                count_col.label("count"),
+            )
+            .select_from(sub1)
+            .group_by(sub1.c.BEDP_TITELNR)
+        )
+    # !! this is a sub result which must be used in the result set
     # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
@@ -947,6 +976,7 @@
     )
     if SAVE_TMP_FILES:
         save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
+    relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
 
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
@@ -1063,7 +1093,17 @@ def _wf200_sub1(
         .group_by(sub1.c.BEDP_TITELNR)
         .having(unique_count_col >= 3)
     )
-    # !! this is a sub result which muste be used in the result set
+    if SAVE_TMP_FILES:
+        stmt = (
+            sql.select(
+                sub1.c.BEDP_TITELNR,
+                sql.func.count().label("count"),
+                unique_count_col.label("customer_count"),
+            )
+            .select_from(sub1)
+            .group_by(sub1.c.BEDP_TITELNR)
+        )
+    # !! this is a sub result which must be used in the result set
     # !! for testing and feedback by the customer
     relevant_titles = pl.read_database(
         stmt,
@@ -1071,7 +1111,8 @@
     )
 
     if SAVE_TMP_FILES:
-        save_tmp_file(relevant_titles, None)
+        save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
+    relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
 
     entries_to_show = pipe_result.open.filter(
         pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
@@ -1099,27 +1140,13 @@
 # SAVING/LOADING
 p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
 df = pl.read_ipc(p_save)
+raw_data = df.clone()
 print(f"Number of entries: {len(df)}")
 clear_tmp_dir()
 clear_result_data()
 # %%
 df.head()
 # %%
-# df.filter(pl.col.BEDP_TITELNR == 4314750)
-# # %%
-# RELEVANT_DATE = get_starting_date(180)
-# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
-# filter_ = sql.and_(
-#     db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
-#     db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
-# )
-# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
-# relevant_titles = pl.read_database(
-#     stmt,
-#     engine,
-# )
-# print(relevant_titles)
-# %%
 raw_data = df.clone()
 # pipe_res = get_empty_pipeline_result(raw_data)
 pipe_res = PipelineResult(raw_data)
@@ -1167,11 +1194,99 @@
 pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
 # %%
 pipe_res.results.select(pl.col("VORLAGE").value_counts())
 # %%
+pipe_res.results.height
+# %%
 # ** aggregate test results
 all_tmps = load_all_tmp_files()
 print(len(all_tmps))
 # %%
-all_tmps[2]
+WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
+WF_200_TMP_RENAME = {
+    "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
+    "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
+}
+
+WF_100: list[pl.DataFrame] = []
+WF_200: list[pl.DataFrame] = []
+
+for name, df in all_tmps.items():
+    if TMPFILE_WF100_SUB1_WDB in name:
+        rename_schema = WF_100_TMP_RENAME
+        df = df.rename(rename_schema)
+        WF_100.append(df)
+    if TMPFILE_WF200_SUB1 in name:
+        rename_schema = WF_200_TMP_RENAME
+        df = df.rename(rename_schema)
+        WF_200.append(df)
+
+tmp_WF_collects = (WF_100, WF_200)
+all_tmps_preproc = []
+
+for collect in tmp_WF_collects:
+    if len(collect) > 1:
+        df = pl.concat(collect)
+    elif len(collect) == 1:
+        df = collect[0].clone()
+    else:
+        raise RuntimeError("no tmp files collected for one of the workflows")
+
+    all_tmps_preproc.append(df)
+
+all_tmps_preproc
+
+#############################
+# %%
+res_table = pipe_res.results.clone()
+# %%
+res_table.head()
+# %%
+raw_data.head()
+# raw_data = raw_data.rename({"BEDARFNR": "BEDARF_NR", "BEDP_SEQUENZ": "BEDARF_SEQUENZ"})
+# raw_data
+# %%
+res_title_info = res_table.join(
+    raw_data,
+    left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
+    right_on=["BEDARFNR", "BEDP_SEQUENZ"],
+    how="inner",
+)
+exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
+res_title_info = res_title_info.select(pl.exclude(exclude_cols))
+res_title_info.head()
+# %%
+df1 = all_tmps["WF-100_Sub1-WDB"]
+df2 = all_tmps["WF-200_Sub1"]
+df3 = all_tmps["WF-200_Sub1_1"]
+df1.head()
+# %%
+test_results = res_title_info.clone()
+
+for df in all_tmps_preproc:
+    test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
+
+test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
+test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
+# %%
+test_results
+
+# %%
+date_str = datetime.datetime.now().strftime("%Y-%m-%d")
+p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx"
+test_results.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
+# %%
+# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
+deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
+    pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
+)
+deviation_vm = test_results.filter(pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode()))
+
+date_str = datetime.datetime.now().strftime("%Y-%m-%d")
+p_save = Path.cwd() / f"Abweichungen-VM_{date_str}.xlsx"
+deviation_vm.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
+# ** WF-200 potentially triggered
+raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
+    pl.col.BEDP_TITELNR.is_duplicated()
+).sort("BEDP_TITELNR")
 # %%
 # ---------------------------------------------------------------------------- #
diff --git a/src/umbreit/db.py b/src/umbreit/db.py
index 05a4964..ddd3c56 100644
--- a/src/umbreit/db.py
+++ b/src/umbreit/db.py
@@ -188,6 +188,7 @@ raw_data_query_schema_map: PolarsSchema = {
     "VERLAGSNR": pl.UInt32,
     "MENGE_VORMERKER": pl.UInt32,
     "MANDFUEHR": pl.UInt8,
+    "EINKAEUFER": pl.String,
 }
 
 tmp_data = Table(
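
# ---------------------------------------------------------------------------- #
# Aside: a minimal, self-contained sketch of the collision-safe tmp-file naming
# introduced in save_tmp_file() above, runnable outside the pipeline. The
# tempfile scratch directory, the sample frame, and the Path return value are
# illustrative only (the real function writes into the project's TMP_DIR and
# returns None).

from pathlib import Path
import tempfile

import polars as pl

TMP_DIR = Path(tempfile.mkdtemp())  # illustrative scratch dir


def save_tmp_file(data: pl.DataFrame, filename: str) -> Path:
    pth = (TMP_DIR / filename).with_suffix(".arrow")
    stem = pth.stem  # suffix the original stem so names stay name_1, name_2, ...
    n = 1
    while pth.exists():
        pth = (TMP_DIR / f"{stem}_{n}").with_suffix(".arrow")
        n += 1
    data.write_ipc(pth)
    return pth


df = pl.DataFrame({"BEDP_TITELNR": [1, 2], "COUNT": [3, 5]})
print(save_tmp_file(df, "WF-200_Sub1").name)  # -> WF-200_Sub1.arrow
print(save_tmp_file(df, "WF-200_Sub1").name)  # -> WF-200_Sub1_1.arrow
print(save_tmp_file(df, "WF-200_Sub1").name)  # -> WF-200_Sub1_2.arrow

# Together with load_all_tmp_files() now returning a dict keyed by file stem,
# this is what lets the notebook address the second WF-200 dump directly as
# all_tmps["WF-200_Sub1_1"].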