diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index e3912f8..1dfa93e 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -3,14 +3,18 @@ from __future__ import annotations import datetime import json +import shutil +import tempfile import time import typing +import uuid from collections.abc import Sequence from pathlib import Path from pprint import pprint import dopt_basics.datetime as dt import oracledb +import pandas as pd import polars as pl import polars.selectors as cs import sqlalchemy as sql @@ -19,10 +23,34 @@ from sqlalchemy import event from umbreit import db, types +oracledb.defaults.arraysize = 1000 +oracledb.defaults.prefetchrows = 1000 + + # %% # import importlib # types = importlib.reload(types) # db = importlib.reload(db) +# %% +def create_tmp_dir() -> Path: + tmp_pth = Path(tempfile.mkdtemp()) + assert tmp_pth.exists() + return tmp_pth + + +TMP_DIR = create_tmp_dir() + + +def clear_tmp_dir() -> None: + shutil.rmtree(TMP_DIR) + TMP_DIR.mkdir() + + +def remove_tmp_dir() -> None: + shutil.rmtree(TMP_DIR) + + +print(f"Created temp directory under: >{TMP_DIR}<") # %% p_cfg = io.search_file_iterative( @@ -48,13 +76,22 @@ conn_string = ( f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" ) # engine = sql.create_engine(conn_string) -engine = sql.create_engine(conn_string, execution_options={"stream_results": True}) +engine = sql.create_engine( + conn_string, + execution_options={"stream_results": True}, +) -@event.listens_for(engine, "after_cursor_execute") -def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany): - cursor.arraysize = 1000 - cursor.prefetchrows = 1000 +# @event.listens_for(engine, "after_cursor_execute") +# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany): +# cursor.arraysize = 1000 +# cursor.prefetchrows = 1000 + + +# @event.listens_for(engine, "before_cursor_execute") +# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany): +# cursor.arraysize = 1000 +# cursor.prefetchrows = 1000 # %% @@ -233,7 +270,7 @@ temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0) # // NO LIVE DATA NEEDED # SAVING/LOADING # p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow" -p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow" +p_save = Path.cwd() / "raw_data_from_sql_query_20260303-1.arrow" # df.write_ipc(p_save) df = pl.read_ipc(p_save) # %% @@ -373,7 +410,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null( # BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) p_save_diff_VM_bedp_tinfo = ( - Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx" + Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx" ) df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False) @@ -404,6 +441,46 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple( db.EXT_DOPT_ERGEBNIS.columns.keys() ) ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() +SAVE_TMP_FILES: typing.Final[bool] = True +TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit" +TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB" +TMPFILE_WF200_SUB1 = "WF-200_Sub1" + + +def save_tmp_file( + data: pl.DataFrame, + filename: str | None, +) -> None: + if filename is None: + filename = str(uuid.uuid4()) + pth = (TMP_DIR / filename).with_suffix(".arrow") + + n: int = 1 + while pth.exists(): + filename_new = pth.stem + f"_{n}" + pth = (TMP_DIR / filename_new).with_suffix(".arrow") + n += 1 + + data.write_ipc(pth) + + +def load_tmp_file( + filename: str, +) -> pl.DataFrame: + pth = (TMP_DIR / filename).with_suffix(".arrow") + if not pth.exists(): + raise FileNotFoundError(f"File >{pth.name}< not found") + + return pl.read_ipc(pth) + + +def load_all_tmp_files() -> dict[str, pl.DataFrame]: + all_dfs: dict[str, pl.DataFrame] = {} + for file in TMP_DIR.glob("*.arrow"): + df = pl.read_ipc(file) + all_dfs[file.stem] = df + + return all_dfs def get_starting_date( @@ -430,6 +507,7 @@ def get_raw_data() -> pl.DataFrame: db.ext_titel_info.c.VERLAGSNR, db.ext_titel_info.c.MENGE_VORMERKER, db.ext_titel_info.c.MANDFUEHR, + db.ext_titel_info.c.EINKAEUFER, ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) return pl.read_database( @@ -599,6 +677,7 @@ class PipelineResult: "VERLAGSNR", "MENGE_VORMERKER", "MANDFUEHR", + "EINKAEUFER", ] ) @@ -676,12 +755,11 @@ def get_expr_order_qty( def wf900( pipe_result: PipelineResult, ) -> PipelineResult: - """filter 'Meldenummer' and fill non-feasible entries""" ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) res = _apply_several_filters( - pipe_res.open, + pipe_result.open, ( filter_meldenummer_null, filter_mandant, @@ -729,13 +807,14 @@ def wf100_umbreit( pipe_result: PipelineResult, vm_criterion: str, ) -> PipelineResult: - ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + # TODO remove + # ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_mandant = pl.col(MANDANT_CRITERION) == 1 filter_number_vm = pl.col(vm_criterion) > 0 - res = _apply_several_filters( + res_candidates = _apply_several_filters( pipe_result.open, ( filter_meldenummer, @@ -743,8 +822,96 @@ def wf100_umbreit( filter_number_vm, ), ) + # sub-pipe neccessary: + # analyse MNr(18) mit #VM > 0 for reservations in the past two months + # similar to subroutine in WF-200 "_wf200_sub1" + sub_pipe = PipelineResult(res_candidates.in_) + sub_pipe = _wf100_sub1_umbreit(sub_pipe) + assert sub_pipe.open.height == 0, "Sub pipe not fully processed" + pipe_result.merge_pipeline(sub_pipe) + + # pipe_result.write_results( + # data=res.in_, + # vorlage=False, + # wf_id=types.Workflows.ID_100, + # freigabe_auto=types.Freigabe.WF_100, + # order_qty_expr=ORDER_QTY_FUNC(empty=False), + # ) + + return pipe_result + + +def _wf100_sub1_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: + # entry titles with MNr(18) and #VM > 0 + # show entries with more than three orders from different + # customers in the past two months + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + RELEVANT_DATE = get_starting_date(60) # see REQ-1002 + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J", + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + pipe_result.write_results( - data=res.in_, + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + pipe_result.write_results( + data=pipe_result.open, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, @@ -799,8 +966,10 @@ def wf100_petersen( pipe_result.merge_pipeline(wdb_sub_pipe) # // other branch - # show always entries with #VM > 1 - filter_number_vm = pl.col(vm_criterion) > 1 + # Verlage: always show because of missing information of ONIX + # data (REQ-1003) + # show always entries with #VM > 0 + filter_number_vm = pl.col(vm_criterion) > 0 res = _apply_several_filters( pipe_result.open, ( @@ -817,22 +986,24 @@ def wf100_petersen( order_qty_expr=ORDER_QTY_FUNC(empty=False), ) - filter_number_vm = pl.col(vm_criterion) > 0 - res = _apply_several_filters( - pipe_result.open, - ( - filter_meldenummer, - filter_mandant, - filter_number_vm, - ), - ) - pipe_result.write_results( - data=res.in_, - vorlage=False, - wf_id=types.Workflows.ID_100, - freigabe_auto=types.Freigabe.WF_100, - order_qty_expr=ORDER_QTY_FUNC(empty=False), - ) + # TODO remove after successful tests + # // excluded based on feedback on 27.02.2026 + # filter_number_vm = pl.col(vm_criterion) > 0 + # res = _apply_several_filters( + # pipe_result.open, + # ( + # filter_meldenummer, + # filter_mandant, + # filter_number_vm, + # ), + # ) + # pipe_result.write_results( + # data=res.in_, + # vorlage=False, + # wf_id=types.Workflows.ID_100, + # freigabe_auto=types.Freigabe.WF_100, + # order_qty_expr=ORDER_QTY_FUNC(empty=False), + # ) return pipe_result @@ -842,8 +1013,7 @@ def _wf100_petersen_sub1_wdb( ) -> PipelineResult: ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) # input: pre-filtered entries (WDB titles and #VM > 0) - # more then 1 VM - # !! show these entries + # more than 1 VM: show these entries filter_number_vm = pl.col(VM_CRITERION) > 1 res = _apply_several_filters( pipe_result.open, @@ -883,11 +1053,26 @@ def _wf100_petersen_sub1_wdb( .group_by(sub1.c.BEDP_TITELNR) .having(count_col > 1) ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + count_col.label("count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer relevant_titles = pl.read_database( stmt, engine, ) - print(relevant_titles) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB) + relevant_titles = relevant_titles.filter(pl.col.COUNT > 1) + entries_to_show = pipe_result.open.filter( pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) ) @@ -973,7 +1158,7 @@ def _wf200_sub1( ) -> PipelineResult: save_tmp_data(pipe_result.open) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) - RELEVANT_DATE = get_starting_date(90) + RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000 join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = sql.and_( @@ -1003,10 +1188,27 @@ def _wf200_sub1( .group_by(sub1.c.BEDP_TITELNR) .having(unique_count_col >= 3) ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer relevant_titles = pl.read_database( stmt, engine, ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + entries_to_show = pipe_result.open.filter( pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) ) @@ -1031,63 +1233,26 @@ def _wf200_sub1( # %% # SAVING/LOADING -p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow" -df = pl.read_ipc(p_save) +READ_DATABASE = False +OVERWRITE = True +FILENAME = "raw_data_from_sql_query_20260303-2.arrow" +p_save = Path.cwd() / FILENAME +if READ_DATABASE: + df = get_raw_data() + if not p_save.exists() or OVERWRITE: + df.write_ipc(p_save) +else: + df = pl.read_ipc(p_save) +# %% +df +# %% +# initialise pipeline +raw_data = df.clone() print(f"Number of entries: {len(df)}") - +clear_tmp_dir() +clear_result_data() # %% df.head() -# %% -# df.filter(pl.col.BEDP_TITELNR == 4314750) -# # %% -# RELEVANT_DATE = get_starting_date(180) -# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR -# filter_ = sql.and_( -# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE, -# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750, -# ) -# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_) -# relevant_titles = pl.read_database( -# stmt, -# engine, -# ) -# print(relevant_titles) -# %% -# removed_rows = [] - -# raw_data = df.clone() -# print(f"Length raw data: {len(raw_data)}") -# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90)) -# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 - - -# filtered = raw_data.filter(filter_mandant) -# filtered_n = raw_data.filter(~filter_mandant) -# num_filter = len(filtered) -# num_filter_n = len(filtered_n) -# removed_rows.append(filtered_n) -# print(f"Length filtered: {num_filter}") -# print(f"Length filtered out: {num_filter_n}") -# print(f"Length all: {num_filter + num_filter_n}") -# raw_data = filtered -# out = pl.concat(removed_rows) -# print(f"Length out: {len(out)}") - -# # %% -# print("---------------------------------------") -# filtered = raw_data.filter(filter_ignore_MNR26) -# filtered_n = raw_data.filter(~filter_ignore_MNR26) -# num_filter = len(filtered) -# num_filter_n = len(filtered_n) -# len(filtered_n) -# # %% -# removed_rows.append(filtered_n) -# print(f"Length filtered: {num_filter}") -# print(f"Length filtered out: {num_filter_n}") -# print(f"Length all: {num_filter + num_filter_n}") -# out = pl.concat(removed_rows) -# print(f"Length out: {len(out)}") - # %% raw_data = df.clone() # pipe_res = get_empty_pipeline_result(raw_data) @@ -1101,15 +1266,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res.results - -# %% -# // test result writing -res = pipe_res.results.clone() -res.height - -# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1) -# %% -# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) # %% pipe_res = wf910(pipe_res) print(f"Length of base data: {len(raw_data):>18}") @@ -1117,8 +1273,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -# pipe_res.results.select(pl.col("vorlage").value_counts()) -# %% pipe_res = wf100_umbreit(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") @@ -1146,6 +1300,194 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}") pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18))) # %% pipe_res.results.select(pl.col("VORLAGE").value_counts()) +# %% +pipe_res.results.height +# %% +# // aggregate test results +all_tmps = load_all_tmp_files() +print(len(all_tmps)) + +# %% +all_tmps + + +# %% +def prepare_tmp_data() -> list[pl.DataFrame]: + all_tmps = load_all_tmp_files() + WF_100_TMP_WDB_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"} + WF_100_TMP_UMBREIT_RENAME = { + "COUNT": "WF-100_Umbreit_Anz-Best-Kunde_verg_3_Monate", + "CUSTOMER_COUNT": "WF-100_Umbreit_Anz-Kunden_verg_3_Monate", + } + WF_200_TMP_RENAME = { + "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate", + "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate", + } + + WF_100_UMBREIT: list[pl.DataFrame] = [] + WF_100_WDB: list[pl.DataFrame] = [] + WF_200: list[pl.DataFrame] = [] + + for name, df in all_tmps.items(): + if TMPFILE_WF100_SUB1_UMBREIT in name: + rename_schema = WF_100_TMP_UMBREIT_RENAME + df = df.rename(rename_schema) + WF_100_UMBREIT.append(df) + elif TMPFILE_WF100_SUB1_WDB in name: + rename_schema = WF_100_TMP_WDB_RENAME + df = df.rename(rename_schema) + WF_100_WDB.append(df) + elif TMPFILE_WF200_SUB1 in name: + rename_schema = WF_200_TMP_RENAME + df = df.rename(rename_schema) + WF_200.append(df) + + tmp_WF_collects = (WF_100_UMBREIT, WF_100_WDB, WF_200) + all_tmps_preproc: list[pl.DataFrame] = [] + + for collect in tmp_WF_collects: + if len(collect) > 1: + df = pl.concat(collect) + elif len(collect) == 1: + df = collect[0].clone() + else: + raise RuntimeError() + + all_tmps_preproc.append(df) + + return all_tmps_preproc + + +def generate_test_result_data( + raw_data: pl.DataFrame, + pipe_result: PipelineResult, +) -> pl.DataFrame: + all_tmps_preproc = prepare_tmp_data() + + res_table = pipe_result.results.clone() + res_title_info = res_table.join( + raw_data, + left_on=["BEDARF_NR", "BEDARF_SEQUENZ"], + right_on=["BEDARFNR", "BEDP_SEQUENZ"], + how="inner", + ) + exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ") + res_title_info = res_title_info.select(pl.exclude(exclude_cols)) + columns = [ + "VORLAGE", + "WF_ID", + "BEST_MENGE", + "FREIGABE_AUTO", + "BEDP_MENGE_BEDARF_VM", + "MENGE_VORMERKER", + "BEDP_TITELNR", + "BEDP_MAN", + "MELDENUMMER", + "VERLAGSNR", + "EINKAEUFER", + "MANDFUEHR", + ] + res_title_info = res_title_info.select(columns) + + test_results = res_title_info.clone() + for df in all_tmps_preproc: + test_results = test_results.join(df, on="BEDP_TITELNR", how="left") + + test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False) + test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all()) + test_results = test_results.with_columns( + pl.lit(None, dtype=pl.String).alias("Anmerkungen/Feedback") + ) + + return test_results + + +# %% +test_results = generate_test_result_data(raw_data, pipe_res) +test_results.head() +# %% +col = "WF-100_Umbreit_Anz-Best-Kunde_verg_3_Monate" +# col = "WF-100_Umbreit_Anz-Kunden_verg_3_Monate" +# test_results.filter(pl.col(col) >= 3) + +# %% +# RELEVANT_DATE = get_starting_date(60) +# +# title_no = 7753822 +# title_no = 5383912 +# filter_ = sql.and_( +# db.EXT_AUFPAUF.c.TITELNR == title_no, +# db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, +# db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J", +# ) +# stmt = sql.select( +# db.EXT_AUFPAUF, +# ).where(filter_) +# relevant_titles = pl.read_database( +# stmt, +# engine, +# ) +# relevant_titles + + +# %% +def write_test_results_excel( + data: pl.DataFrame, + base_filename: str, +) -> None: + date_str = datetime.datetime.now().strftime("%Y-%m-%d") + p_save = Path.cwd() / f"{base_filename}_{date_str}.xlsx" + pd_df = data.to_pandas().set_index("Index") + + with pd.ExcelWriter(p_save, engine="xlsxwriter") as writer: + sheet_name = f"Ergebnisse_Testphase_{date_str}" + pd_df.to_excel( + writer, + freeze_panes=(1, 1), + sheet_name=sheet_name, + ) + worksheet = writer.sheets[sheet_name] + + rows, cols = pd_df.shape + columns = ["Index"] + pd_df.columns.to_list() + worksheet.add_table( + 0, + 0, + rows, + cols, + {"columns": [{"header": c} for c in columns], "style": "Table Style Light 9"}, + ) + for i, col in enumerate(columns): + if i == 0: + worksheet.set_column( + i, i, max(pd_df.index.astype(str).map(len).max(), len(col)) + 2 + ) + continue + worksheet.set_column( + i, i, max(pd_df[col].astype(str).map(len).max(), len(col)) + 2 + ) + + +# %% +write_test_results_excel(test_results, "Testdatensatz_WF-100-200") + +##################################################################### +# %% +# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER +deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter( + pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER +) +deviation_vm = test_results.filter( + pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode()) +) +deviation_vm +# %% +write_test_results_excel(deviation_vm, "Abweichungen-VM") +# ** WF-200 potentially triggered +raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter( + pl.col.BEDP_TITELNR.is_duplicated() +).sort("BEDP_TITELNR") + # %% # ---------------------------------------------------------------------------- # # Workflow 200 (Umbreit only) @@ -1157,7 +1499,7 @@ wf_200_start_data # %% engine.dispose() - +remove_tmp_dir() # %% relevant_mnr: tuple[int, ...] = (17, 18) filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) @@ -1613,3 +1955,5 @@ pipe_res_main.open # %% pipe_res.results # %% +remove_tmp_dir() +# %% diff --git a/pdm.lock b/pdm.lock index eb97ec1..d1c7601 100644 --- a/pdm.lock +++ b/pdm.lock @@ -2692,30 +2692,29 @@ files = [ [[package]] name = "ruff" -version = "0.14.3" +version = "0.15.0" requires_python = ">=3.7" summary = "An extremely fast Python linter and code formatter, written in Rust." groups = ["lint"] files = [ - {file = "ruff-0.14.3-py3-none-linux_armv6l.whl", hash = "sha256:876b21e6c824f519446715c1342b8e60f97f93264012de9d8d10314f8a79c371"}, - {file = "ruff-0.14.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b6fd8c79b457bedd2abf2702b9b472147cd860ed7855c73a5247fa55c9117654"}, - {file = "ruff-0.14.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:71ff6edca490c308f083156938c0c1a66907151263c4abdcb588602c6e696a14"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:786ee3ce6139772ff9272aaf43296d975c0217ee1b97538a98171bf0d21f87ed"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:cd6291d0061811c52b8e392f946889916757610d45d004e41140d81fb6cd5ddc"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a497ec0c3d2c88561b6d90f9c29f5ae68221ac00d471f306fa21fa4264ce5fcd"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e231e1be58fc568950a04fbe6887c8e4b85310e7889727e2b81db205c45059eb"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:469e35872a09c0e45fecf48dd960bfbce056b5db2d5e6b50eca329b4f853ae20"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3d6bc90307c469cb9d28b7cfad90aaa600b10d67c6e22026869f585e1e8a2db0"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e2f8a0bbcffcfd895df39c9a4ecd59bb80dca03dc43f7fb63e647ed176b741e"}, - {file = "ruff-0.14.3-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:678fdd7c7d2d94851597c23ee6336d25f9930b460b55f8598e011b57c74fd8c5"}, - {file = "ruff-0.14.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1ec1ac071e7e37e0221d2f2dbaf90897a988c531a8592a6a5959f0603a1ecf5e"}, - {file = "ruff-0.14.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afcdc4b5335ef440d19e7df9e8ae2ad9f749352190e96d481dc501b753f0733e"}, - {file = "ruff-0.14.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:7bfc42f81862749a7136267a343990f865e71fe2f99cf8d2958f684d23ce3dfa"}, - {file = "ruff-0.14.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a65e448cfd7e9c59fae8cf37f9221585d3354febaad9a07f29158af1528e165f"}, - {file = "ruff-0.14.3-py3-none-win32.whl", hash = "sha256:f3d91857d023ba93e14ed2d462ab62c3428f9bbf2b4fbac50a03ca66d31991f7"}, - {file = "ruff-0.14.3-py3-none-win_amd64.whl", hash = "sha256:d7b7006ac0756306db212fd37116cce2bd307e1e109375e1c6c106002df0ae5f"}, - {file = "ruff-0.14.3-py3-none-win_arm64.whl", hash = "sha256:26eb477ede6d399d898791d01961e16b86f02bc2486d0d1a7a9bb2379d055dc1"}, - {file = "ruff-0.14.3.tar.gz", hash = "sha256:4ff876d2ab2b161b6de0aa1f5bd714e8e9b4033dc122ee006925fbacc4f62153"}, + {file = "ruff-0.15.0-py3-none-linux_armv6l.whl", hash = "sha256:aac4ebaa612a82b23d45964586f24ae9bc23ca101919f5590bdb368d74ad5455"}, + {file = "ruff-0.15.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:dcd4be7cc75cfbbca24a98d04d0b9b36a270d0833241f776b788d59f4142b14d"}, + {file = "ruff-0.15.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d747e3319b2bce179c7c1eaad3d884dc0a199b5f4d5187620530adf9105268ce"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:650bd9c56ae03102c51a5e4b554d74d825ff3abe4db22b90fd32d816c2e90621"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a6664b7eac559e3048223a2da77769c2f92b43a6dfd4720cef42654299a599c9"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f811f97b0f092b35320d1556f3353bf238763420ade5d9e62ebd2b73f2ff179"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:761ec0a66680fab6454236635a39abaf14198818c8cdf691e036f4bc0f406b2d"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:940f11c2604d317e797b289f4f9f3fa5555ffe4fb574b55ed006c3d9b6f0eb78"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bcbca3d40558789126da91d7ef9a7c87772ee107033db7191edefa34e2c7f1b4"}, + {file = "ruff-0.15.0-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:9a121a96db1d75fa3eb39c4539e607f628920dd72ff1f7c5ee4f1b768ac62d6e"}, + {file = "ruff-0.15.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5298d518e493061f2eabd4abd067c7e4fb89e2f63291c94332e35631c07c3662"}, + {file = "ruff-0.15.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afb6e603d6375ff0d6b0cee563fa21ab570fd15e65c852cb24922cef25050cf1"}, + {file = "ruff-0.15.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:77e515f6b15f828b94dc17d2b4ace334c9ddb7d9468c54b2f9ed2b9c1593ef16"}, + {file = "ruff-0.15.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6f6e80850a01eb13b3e42ee0ebdf6e4497151b48c35051aab51c101266d187a3"}, + {file = "ruff-0.15.0-py3-none-win32.whl", hash = "sha256:238a717ef803e501b6d51e0bdd0d2c6e8513fe9eec14002445134d3907cd46c3"}, + {file = "ruff-0.15.0-py3-none-win_amd64.whl", hash = "sha256:dd5e4d3301dc01de614da3cdffc33d4b1b96fb89e45721f1598e5532ccf78b18"}, + {file = "ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a"}, + {file = "ruff-0.15.0.tar.gz", hash = "sha256:6bdea47cdbea30d40f8f8d7d69c0854ba7c15420ec75a26f463290949d7f7e9a"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index ab367dd..3675ccc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "umbreit" -version = "0.1.0" +version = "0.1.1dev0" description = "Umbreit's Python-based application" authors = [ {name = "Florian Förster", email = "f.foerster@d-opt.com"}, @@ -71,7 +71,7 @@ directory = "reports/coverage" [tool.bumpversion] -current_version = "0.1.0" +current_version = "0.1.1dev0" parse = """(?x) (?P0|[1-9]\\d*)\\. (?P0|[1-9]\\d*)\\. diff --git a/src/umbreit/constants.py b/src/umbreit/constants.py new file mode 100644 index 0000000..5a64a0b --- /dev/null +++ b/src/umbreit/constants.py @@ -0,0 +1,30 @@ +from pathlib import Path +from typing import Final + +from dopt_basics import configs +from dopt_basics import io as io_ + +LIB_PATH: Final[Path] = Path(__file__).parent + +# // database connections +p_cfg = io_.search_file_iterative( + starting_path=LIB_PATH, + glob_pattern="CRED*.toml", + stop_folder_name="umbreit-py", +) +if p_cfg is None: + raise FileNotFoundError("Config was not found") + +CFG = configs.load_toml(p_cfg) +HOST = CFG["server"]["host"] +PORT = CFG["server"]["port"] +SERVICE = CFG["server"]["service"] +USER_NAME = CFG["user"]["name"] +USER_PASS = CFG["user"]["pass"] + +# TODO remove or change +# ** Oracle client libs +USE_THICK_MODE: Final[bool] = False +P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29") +assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found" +assert P_ORACLE_CLIENT_LIBS.is_dir() diff --git a/src/umbreit/db.py b/src/umbreit/db.py index 05a4964..0fbd389 100644 --- a/src/umbreit/db.py +++ b/src/umbreit/db.py @@ -178,17 +178,8 @@ EXT_AUFPAUF_schema_map: PolarsSchema = { EXT_AUFPAUF_null_values: PolarsNullValues = {} # // queries and temp data -raw_data_query_schema_map: PolarsSchema = { - "BEDARFNR": pl.UInt32, - "BEDP_SEQUENZ": pl.UInt32, - "BEDP_TITELNR": pl.UInt32, - "BEDP_MAN": pl.UInt8, - "BEDP_MENGE_BEDARF_VM": pl.UInt32, - "MELDENUMMER": pl.UInt8, - "VERLAGSNR": pl.UInt32, - "MENGE_VORMERKER": pl.UInt32, - "MANDFUEHR": pl.UInt8, -} +raw_data_query_schema_map: PolarsSchema = ext_bedpbed_schema_map.copy() +raw_data_query_schema_map.update(ext_titel_info_schema_map) tmp_data = Table( "EXT_TMP_BEDP_TINFO", diff --git a/src/umbreit/pipeline.py b/src/umbreit/pipeline.py new file mode 100644 index 0000000..f70dfa0 --- /dev/null +++ b/src/umbreit/pipeline.py @@ -0,0 +1,821 @@ +from __future__ import annotations + +import datetime +import shutil +import tempfile +import typing +import uuid +from collections.abc import Sequence +from pathlib import Path + +import dopt_basics.datetime as dt +import oracledb +import polars as pl +import polars.selectors as cs +import sqlalchemy as sql + +from umbreit import constants, db, types +from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS + +oracledb.defaults.arraysize = 1000 +oracledb.defaults.prefetchrows = 1000 +if constants.USE_THICK_MODE: + oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS)) + + +def create_tmp_dir() -> Path: + tmp_pth = Path(tempfile.mkdtemp()) + assert tmp_pth.exists() + return tmp_pth + + +def clear_tmp_dir() -> None: + shutil.rmtree(TMP_DIR) + TMP_DIR.mkdir() + + +def remove_tmp_dir() -> None: + shutil.rmtree(TMP_DIR) + + +TMP_DIR = create_tmp_dir() + +CONN_STRING: typing.Final[str] = ( + f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" +) +engine = sql.create_engine( + CONN_STRING, + execution_options={"stream_results": True}, +) + +VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" +MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN" +ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" +RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple( + db.EXT_DOPT_ERGEBNIS.columns.keys() +) +ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() +SAVE_TMP_FILES: typing.Final[bool] = True +TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit" +TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB" +TMPFILE_WF200_SUB1 = "WF-200_Sub1" + + +def save_tmp_file( + data: pl.DataFrame, + filename: str | None, +) -> None: + if filename is None: + filename = str(uuid.uuid4()) + pth = (TMP_DIR / filename).with_suffix(".arrow") + + n: int = 1 + while pth.exists(): + filename_new = pth.stem + f"_{n}" + pth = (TMP_DIR / filename_new).with_suffix(".arrow") + n += 1 + + data.write_ipc(pth) + + +def load_tmp_file( + filename: str, +) -> pl.DataFrame: + pth = (TMP_DIR / filename).with_suffix(".arrow") + if not pth.exists(): + raise FileNotFoundError(f"File >{pth.name}< not found") + + return pl.read_ipc(pth) + + +def load_all_tmp_files() -> dict[str, pl.DataFrame]: + all_dfs: dict[str, pl.DataFrame] = {} + for file in TMP_DIR.glob("*.arrow"): + df = pl.read_ipc(file) + all_dfs[file.stem] = df + + return all_dfs + + +def get_starting_date( + days_from_now: int, +) -> datetime.date: + current_dt = dt.current_time_tz(cut_microseconds=True) + td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS) + + return (current_dt - td).date() + + +def get_raw_data() -> pl.DataFrame: + join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER + stmt = sql.select( + db.ext_bedpbed.c.BEDARFNR, + db.ext_bedpbed.c.BEDP_SEQUENZ, + db.ext_bedpbed.c.BEDP_TITELNR, + db.ext_bedpbed.c.BEDP_MAN, + sql.case( + (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()), + else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, + ).label("BEDP_MENGE_BEDARF_VM"), + db.ext_titel_info.c.MELDENUMMER, + db.ext_titel_info.c.VERLAGSNR, + db.ext_titel_info.c.MENGE_VORMERKER, + db.ext_titel_info.c.MANDFUEHR, + db.ext_titel_info.c.EINKAEUFER, + ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) + + return pl.read_database( + stmt, + engine, + schema_overrides=db.raw_data_query_schema_map, + ) + + +def save_tmp_data(df: pl.DataFrame) -> None: + with engine.begin() as conn: + conn.execute(sql.delete(db.tmp_data)) + + with engine.begin() as conn: + conn.execute(sql.insert(db.tmp_data), df.to_dicts()) + + +def get_tmp_data() -> pl.DataFrame: + return pl.read_database( + sql.select(db.tmp_data), + engine, + schema_overrides=db.tmp_data_schema_map, + ) + + +def get_result_data() -> pl.DataFrame: + return pl.read_database( + sql.select(db.EXT_DOPT_ERGEBNIS), + engine, + schema_overrides=db.results_schema_map, + ) + + +def save_result_data(results: pl.DataFrame) -> None: + with engine.begin() as conn: + conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts()) + + +def clear_result_data() -> None: + with engine.begin() as conn: + conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS)) + + +def save_result_data_native( + results: pl.DataFrame, +) -> None: + results = results.with_columns( + [ + pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c) + for c in results.select(cs.boolean()).columns + ] + ) + stmt = """ + INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", + "BEST_MENGE", "FREIGABE_AUTO") + VALUES (:1, :2, :3, :4, :5, :6) + """ + with engine.begin() as conn: + raw_conn = conn.connection.connection + with raw_conn.cursor() as cursor: + cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True)) + + +def _apply_several_filters( + df: pl.DataFrame, + filters: Sequence[pl.Expr], +) -> types.FilterResult: + df_current = df + removed_rows: list[pl.DataFrame] = [] + + for filter in filters: + removed = df_current.filter(~filter) + removed_rows.append(removed) + + df_current = df_current.filter(filter) + + df_removed = pl.concat(removed_rows) + + return types.FilterResult(in_=df_current, out_=df_removed) + + +class PipelineResult: + __slots__ = ("_results", "_open", "_subtracted_indices") + _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ") + + def __init__( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + schema = db.results_schema_map.copy() + del schema["ID"] + self._results = pl.DataFrame(schema=schema) + + schema = {} + for col in self._index_cols: + schema[col] = db.raw_data_query_schema_map[col] + self._subtracted_indices = pl.DataFrame(schema=schema) + + def __len__(self) -> int: + return len(self._results) + len(self._open) + + @property + def open(self) -> pl.DataFrame: + return self._open + + @property + def results(self) -> pl.DataFrame: + return self._results + + @property + def subtracted_indices(self) -> pl.DataFrame: + return self._subtracted_indices + + def update_open( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + + def _subtract_data( + self, + data: pl.DataFrame, + ) -> None: + self._open = self._open.join(data, on=self._index_cols, how="anti") + self._subtracted_indices = pl.concat( + (self._subtracted_indices, data[self._index_cols]) + ) + + def _add_results( + self, + data: pl.DataFrame, + ) -> None: + res = pl.concat([self._results, data]) + self._results = res + + def merge_pipeline( + self, + pipeline: PipelineResult, + ) -> None: + self._subtract_data(pipeline.subtracted_indices) + self._add_results(pipeline.results) + + def write_results( + self, + data: pl.DataFrame, + vorlage: bool, + wf_id: types.Workflows, + freigabe_auto: types.Freigabe, + order_qty_expr: pl.Expr, + ) -> None: + results = data.rename(db.map_data_to_result) + results = results.with_columns( + [ + pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]), + pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]), + order_qty_expr, + pl.lit(freigabe_auto.value) + .alias("FREIGABE_AUTO") + .cast(db.results_schema_map["FREIGABE_AUTO"]), + ] + ) + results = results.drop( + [ + "BEDP_TITELNR", + "BEDP_MAN", + "BEDP_MENGE_BEDARF_VM", + "MELDENUMMER", + "VERLAGSNR", + "MENGE_VORMERKER", + "MANDFUEHR", + "EINKAEUFER", + ] + ) + + self._subtract_data(data) + self._add_results(results) + + +class ExprOrderQty(typing.Protocol): ... + + +class ExprOrderQty_Base(ExprOrderQty, typing.Protocol): + def __call__(self) -> pl.Expr: ... + + +ExprOrderQty_Base_Types: typing.TypeAlias = ( + typing.Literal[types.Workflows.ID_200] + | typing.Literal[types.Workflows.ID_900] + | typing.Literal[types.Workflows.ID_910] +) + + +class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol): + def __call__(self, empty: bool) -> pl.Expr: ... + + +@typing.overload +def get_expr_order_qty( + wf_id: typing.Literal[types.Workflows.ID_100], +) -> ExprOrderQty_WF100: ... + + +@typing.overload +def get_expr_order_qty( + wf_id: ExprOrderQty_Base_Types, +) -> ExprOrderQty_Base: ... + + +def get_expr_order_qty( + wf_id: types.Workflows, +) -> ExprOrderQty: + empty_expr = ( + pl.lit(0) + .alias(ORDER_QTY_CRIT) + .alias("BEST_MENGE") + .cast(db.results_schema_map["BEST_MENGE"]) + ) + + def _empty() -> pl.Expr: + return empty_expr + + func: ExprOrderQty + match wf_id: + case types.Workflows.ID_100: + + def _func(empty: bool) -> pl.Expr: + order_qty_expr: pl.Expr + if empty: + order_qty_expr = empty_expr + else: + order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE") + return order_qty_expr + + func = _func + + case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910: + func = _empty + case _: + raise NotImplementedError( + f"Order expression for WF-ID {wf_id.value} is not implemented" + ) + + return func + + +# // begin workflows +def wf900( + pipe_result: PipelineResult, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) + filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() + filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer_null, + filter_mandant, + ), + ) + pipe_result.write_results( + data=res.out_, + vorlage=False, + wf_id=types.Workflows.ID_900, + freigabe_auto=types.Freigabe.WF_900, + order_qty_expr=ORDER_QTY_FUNC(), + ) + + pipe_result.update_open( + res.in_.with_columns( + pl.col("MENGE_VORMERKER").fill_null(0), + pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0), + ) + ) + + return pipe_result + + +def wf910( + pipe_result: PipelineResult, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910) + filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 + + res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,)) + pipe_result.write_results( + data=res.out_, + vorlage=False, + wf_id=types.Workflows.ID_910, + freigabe_auto=types.Freigabe.WF_910, + order_qty_expr=ORDER_QTY_FUNC(), + ) + + return pipe_result + + +# this a main routine: +# receives and gives back result objects +def wf100_umbreit( + pipe_result: PipelineResult, + vm_criterion: str, +) -> PipelineResult: + # TODO remove + # ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + + filter_meldenummer = pl.col("MELDENUMMER") == 18 + filter_mandant = pl.col(MANDANT_CRITERION) == 1 + filter_number_vm = pl.col(vm_criterion) > 0 + + res_candidates = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_number_vm, + ), + ) + # sub-pipe neccessary: + # analyse MNr(18) mit #VM > 0 for reservations in the past two months + # similar to subroutine in WF-200 "_wf200_sub1" + sub_pipe = PipelineResult(res_candidates.in_) + sub_pipe = _wf100_sub1_umbreit(sub_pipe) + assert sub_pipe.open.height == 0, "Sub pipe not fully processed" + pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def _wf100_sub1_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: + # entry titles with MNr(18) and #VM > 0 + # show entries with more than three orders from different + # customers in the past two months + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + RELEVANT_DATE = get_starting_date(60) # see REQ-1002 + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J", + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + + pipe_result.write_results( + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + pipe_result.write_results( + data=pipe_result.open, + vorlage=False, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + + return pipe_result + + +def wf100_petersen( + pipe_result: PipelineResult, + vm_criterion: str, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + # difference WDB and others + + # // WDB branch + # order quantity 0, no further action in other WFs + filter_meldenummer = pl.col("MELDENUMMER") == 18 + filter_mandant = pl.col(MANDANT_CRITERION) == 90 + filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070)) + filter_number_vm = pl.col(vm_criterion) == 0 + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_WDB, + filter_number_vm, + ), + ) + pipe_result.write_results( + data=res.in_, + vorlage=False, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=True), + ) + filter_number_vm = pl.col(vm_criterion) > 0 + res_candidates = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_WDB, + filter_number_vm, + ), + ) + wdb_sub_pipe = PipelineResult(res_candidates.in_) + wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe) + assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed" + pipe_result.merge_pipeline(wdb_sub_pipe) + + # // other branch + # Verlage: always show because of missing information of ONIX + # data (REQ-1003) + # show always entries with #VM > 0 + filter_number_vm = pl.col(vm_criterion) > 0 + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_number_vm, + ), + ) + pipe_result.write_results( + data=res.in_, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + + return pipe_result + + +def _wf100_petersen_sub1_wdb( + pipe_result: PipelineResult, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + # input: pre-filtered entries (WDB titles and #VM > 0) + # more than 1 VM: show these entries + filter_number_vm = pl.col(VM_CRITERION) > 1 + res = _apply_several_filters( + pipe_result.open, + (filter_number_vm,), + ) + pipe_result.write_results( + data=res.in_, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + # filtered out entries (WDB with #VM == 1) must be analysed for orders in the + # past 6 months + save_tmp_data(pipe_result.open) + RELEVANT_DATE = get_starting_date(180) + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR + filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE + stmt = ( + sql.select( + db.tmp_data, + db.EXT_BESPBES_INFO.c.BESP_MENGE, + db.EXT_BESPBES_INFO.c.BESP_STATUS, + ) + .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + count_col = sql.func.count() + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + count_col.label("count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(count_col > 1) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + count_col.label("count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB) + relevant_titles = relevant_titles.filter(pl.col.COUNT > 1) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + pipe_result.write_results( + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + pipe_result.write_results( + data=pipe_result.open, + vorlage=False, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + return pipe_result + + +def wf200_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: + relevant_mnr: tuple[int, ...] = (17, 18) + filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) + filter_mandant = pl.col("BEDP_MAN") == 1 + + res = _apply_several_filters( + pipe_result.open, + (filter_meldenummer, filter_mandant), + ) + sub_pipe = PipelineResult(res.in_) + sub_pipe = _wf200_sub1(sub_pipe) + assert sub_pipe.open.height == 0 + pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def wf200_petersen( + pipe_result: PipelineResult, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) + RELEVANT_MNR: tuple[int, ...] = (17, 18) + # // WDB branch + filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR) + filter_mandant = pl.col(MANDANT_CRITERION) == 90 + filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070)) + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_WDB, + ), + ) + # ignore these + pipe_result.write_results( + data=res.in_, + vorlage=False, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), + ) + # // other branch + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + ), + ) + sub_pipe = PipelineResult(res.in_) + sub_pipe = _wf200_sub1(sub_pipe) + assert sub_pipe.open.height == 0, "Sub pipe not fully processed" + pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def _wf200_sub1( + pipe_result: PipelineResult, +) -> PipelineResult: + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) + RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000 + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), + db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFTRAGS_ART, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + + pipe_result.write_results( + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), + ) + pipe_result.write_results( + data=pipe_result.open, + vorlage=False, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), + ) + + return pipe_result diff --git a/src/umbreit/types.py b/src/umbreit/types.py index 1f7a457..3ee1b8e 100644 --- a/src/umbreit/types.py +++ b/src/umbreit/types.py @@ -26,6 +26,7 @@ class FilterResult: class Workflows(enum.Enum): + ID_000 = 0 ID_100 = 100 ID_200 = 200 ID_900 = 900 @@ -38,8 +39,8 @@ class OrderQtyExprKwArgs: class Freigabe(enum.Enum): + WF_000 = False WF_100 = False WF_200 = False WF_900 = False WF_910 = False - OPEN = False