diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index e22dc1f..5c686eb 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -229,22 +229,55 @@ df.head() # %% # // NO LIVE DATA NEEDED # SAVING/LOADING -p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow" +p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow" # df.write_ipc(p_save) df = pl.read_ipc(p_save) # %% print(len(df)) df.head() # %% +df.head() +# %% +# ** CHECK: duplicates +temp = df.fill_null(0) +mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated() +temp.filter(mask) +# %% +df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False) +# %% +# ** CHECK: positions without titlenumber +df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique() +# %% # ** CHECK: unique title number? -df.with_columns(titlenumber_count=pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR")).select( - ["BEDP_TITELNR", "titlenumber_count"] -).unique().filter(pl.col("titlenumber_count") > 1) +df.group_by("BEDP_TITELNR").agg( + pl.col("BEDP_TITELNR").len().alias("count"), + pl.col.BEDP_MAN.unique().alias("unique_bedp_man"), + pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"), +).unique().filter(pl.col("count") > 1) +# %% +df.filter(pl.col.BEDP_TITELNR == 8679893) +# %% +df.with_columns( + pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count") +).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter( + pl.col("titlenumber_count") > 1 +) # %% # ** CHECK: distribution of MELDENUMMER -df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select( - pl.len() +temp = ( + df.group_by("MELDENUMMER") + .agg(pl.col("MELDENUMMER").len().alias("count")) + .sort("count", descending=True) ) +sum_entries = len(df) +temp = temp.with_columns((pl.col.count / sum_entries).alias("proportion")) +temp = temp.with_columns(pl.col.proportion.cum_sum().alias("cum")) +temp +# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select( +# pl.len() +# ) +# p_save = Path.cwd() / "meldenummer_anteile_20260114-1.xlsx" +# temp.write_excel(p_save) # %% # ** CHECK: differences MANDANT in BEDP and in TINFO # 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?) @@ -458,41 +491,12 @@ class PipelineResult: (self._subtracted_indices, data[self._index_cols]) ) - # TODO remove - # def _subtract_from_open( - # self, - # data: pl.DataFrame, - # ) -> None: - # self._open = self._open.join(data, on=self._index_cols, how="anti") - # self._subtracted_indices = pl.concat( - # (self._subtracted_indices, data[self._index_cols]) - # ) - - # def _subtract_from_indices( - # self, - # indices: pl.DataFrame, - # ) -> None: - # self._open = self._open.join(indices, on=self._index_cols, how="anti") - # self._subtracted_indices = pl.concat( - # (self._subtracted_indices, indices[self._index_cols]) - # ) - def _add_results( self, data: pl.DataFrame, ) -> None: self._results = pl.concat([self._results, data]) - # TODO remove - # def add_pipeline_results(self, pipeline: PipelineResult) -> None: - # self._add_results(pipeline.results) - - # def subtract_pipeline( - # self, - # pipeline: PipelineResult, - # ) -> None: - # self._subtract_data(pipeline.subtracted_indices) - def merge_pipeline( self, pipeline: PipelineResult, @@ -508,6 +512,7 @@ class PipelineResult: freigabe_auto: types.Freigabe, is_out: bool, ) -> None: + # TODO move to other position ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" results = data.rename(db.map_to_result) @@ -781,24 +786,19 @@ def wf100_petersen_wdb_sub1( # filtered out entries (WDB with #VM == 1) must be analysed for orders in the # past 6 months - title_nos = res.out_["BEDP_TITELNR"].to_list() - # !! query used because of slow pre-filtering queries - # TODO check for more native pre-filtering within the database when - # TODO performance problems are solved start_date = get_starting_date(180) filter_ = sql.and_( - db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos), + db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()), db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date, ) stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_) df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map) - df_show = ( + entries_show = ( df_order.group_by("BESP_TITELNR") .agg(pl.col("BESP_TITELNR").count().alias("count")) .filter(pl.col("count") > 1) ) - entries_to_show = df_show["BESP_TITELNR"].to_list() - filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show) + filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list()) res = _apply_several_filters(pipe_result.open, (filter_titleno,)) pipe_result.write_results( data=res.in_, @@ -820,7 +820,7 @@ def wf100_petersen_wdb_sub1( # %% # SAVING/LOADING -p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow" +p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow" df = pl.read_ipc(p_save) print(f"Number of entries: {len(df)}") @@ -879,7 +879,7 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}") pipe_res.results # raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1) # %% -pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) +# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) # %% pipe_res = workflow_910(pipe_res) print(f"Length of base data: {len(raw_data):>18}") @@ -887,7 +887,7 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -pipe_res.results.select(pl.col("vorlage").value_counts()) +# pipe_res.results.select(pl.col("vorlage").value_counts()) # %% pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") @@ -900,6 +900,9 @@ print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") +# %% +pipe_res.open.filter(pl.col.MELDENUMMER == 18).filter(pl.col.BEDP_MENGE_BEDARF_VM > 0) + # %% pipe_res.results.select(pl.col("vorlage").value_counts()) # %% @@ -918,28 +921,89 @@ filt_out # Workflow 200 (Umbreit only) # ---------------------------------------------------------------------------- # # %% -wf_200_start_data = filt_out.clone() +wf_200_start_data = pipe_res.open.clone() wf_200_start_data # %% -def _init_workflow_200_umbreit( - results: pl.DataFrame, - data: pl.DataFrame, - vm_criterion: str, -) -> tuple[pl.DataFrame, pl.DataFrame]: +def workflow_200_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: relevant_mnr: tuple[int, ...] = (17, 18) filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) filter_mandant = pl.col("BEDP_MAN") == 1 - filter_number_vm = pl.col(vm_criterion) == 0 + # not relevant, because already done in WF-100 + # filter_number_vm = pl.col(vm_criterion) == 0 - relevant, filt = _apply_several_filters( - data, (filter_meldenummer, filter_mandant, filter_number_vm) + res = _apply_several_filters( + pipe_result.open, + (filter_meldenummer, filter_mandant), ) - return relevant, filt +relevant_mnr: tuple[int, ...] = (17, 18) +filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) +filter_mandant = pl.col("BEDP_MAN") == 1 +res = _apply_several_filters( + pipe_res.open, + (filter_meldenummer, filter_mandant), +) +# %% +# these entries must be checked for relevant orders +# therefore, a temp table must be created in the database to execute efficient +# queries, other approaches are just hacks +# SOLUTION: +# - save these entries to a temp table 'temp' +# - look up the order history of the past 3 months +# -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR +# -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND +# -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND +# -- +res.in_ +# %% +# // demo query with IN statement +data = res.in_.clone() +title_sub_choice = data["BEDP_TITELNR"][:300].to_list() +rel_date = get_starting_date(90) +rel_date +# %% +filter_ = sql.and_( + db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice), + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), +) +stmt = sql.select(db.EXT_AUFPAUF).where(filter_) + +print(stmt.compile(engine)) + +# %% +demo = pl.read_database( + stmt, + engine, + schema_overrides=db.EXT_AUFPAUF_schema_map, +) +# %% +demo.head() +# %% +demo_2 = demo.clone() +demo_2.head() +print(f"Number of titles before filtering: {len(demo_2)}") +demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99))) +demo_2 = ( + demo_2.group_by("TITELNR", maintain_order=True) + .agg( + pl.len().alias("count"), + pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"), + ) + .filter(pl.col.customer_count >= 3) +) + +# these remaining titles are relevant and should be shown +# the others should be disposed +print(f"Number of titles which are relevant: {len(demo_2)}") +print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}") +demo_2 # %% df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION) df @@ -1155,7 +1219,8 @@ res_vm_crit.out_ # filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list() len(title_nos) - +# %% +title_nos # %% # define starting date for 6 month interval @@ -1175,6 +1240,7 @@ df_show = ( .agg(pl.col("BESP_TITELNR").count().alias("count")) .filter(pl.col("count") > 1) ) +df_show # %% # !! show these entries # !! do not show others @@ -1183,7 +1249,7 @@ print(f"Number of entries relevant: {len(entries_to_show)}") # %% res_vm_crit.out_ # %% -filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show) +filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode()) res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,)) diff --git a/data_analysis/queries.sql b/data_analysis/queries.sql index 2a41515..08feafd 100644 --- a/data_analysis/queries.sql +++ b/data_analysis/queries.sql @@ -39,40 +39,102 @@ set timing on -- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26; -- PROMPT ###################################### PROMPT ################################################# -SELECT * FROM ( - SELECT - bedp.BEDARFNR, - bedp.BEDP_SEQUENZ, - bedp.BEDP_TITELNR, - bedp.BEDP_MAN, - bedp.BEDP_MENGE_BEDARF_VM, - t_info.MELDENUMMER, - t_info.VERLAGSNR, - t_info.MENGE_VORMERKER, - t_info.MANDFUEHR - FROM EXT_BEDPBED bedp - LEFT JOIN EXT_TITEL_INFO t_info - ON bedp.BEDP_TITELNR = t_info.TI_NUMMER -) view1 -WHERE view1.VERLAGSNR IN (76008, 76070) -FETCH FIRST 100 ROWS ONLY; - -SELECT * FROM EXT_BESPBES_INFO besp -WHERE besp.BESP_TITELNR = 7590554 AND -besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD'); - - --- SELECT COUNT(*) FROM ( --- SELECT /*+ NO_USE_HASH(bedp t_info) */ --- view1.BEDP_TITELNR, +-- SELECT * FROM ( +-- SELECT +-- bedp.BEDARFNR, +-- bedp.BEDP_SEQUENZ, +-- bedp.BEDP_TITELNR, +-- bedp.BEDP_MAN, +-- bedp.BEDP_MENGE_BEDARF_VM, -- t_info.MELDENUMMER, -- t_info.VERLAGSNR, +-- t_info.MENGE_VORMERKER, -- t_info.MANDFUEHR --- FROM --- (SELECT DISTINCT bedp.BEDP_TITELNR FROM EXT_BEDPBED bedp) view1 +-- FROM EXT_BEDPBED bedp -- LEFT JOIN EXT_TITEL_INFO t_info --- ON view1.BEDP_TITELNR = t_info.TI_NUMMER --- ) sub1 WHERE sub1.MANDFUEHR IN (1,90) AND sub1.MELDENUMMER in (17, 18); +-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER +-- ) view1 +-- WHERE view1.VERLAGSNR IN (76008, 76070) +-- FETCH FIRST 100 ROWS ONLY; + +-- ### default query for raw data ### +-- SELECT +-- bedp.BEDARFNR, +-- bedp.BEDP_SEQUENZ, +-- bedp.BEDP_TITELNR, +-- bedp.BEDP_MAN, +-- bedp.BEDP_MENGE_BEDARF_VM, +-- t_info.MELDENUMMER, +-- t_info.VERLAGSNR, +-- t_info.MENGE_VORMERKER, +-- t_info.MANDFUEHR +-- FROM EXT_BEDPBED bedp +-- LEFT JOIN EXT_TITEL_INFO_NEU t_info +-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER; + + +SELECT * FROM EXT_BESPBES_INFO besp +WHERE besp.BESP_TITELNR IN ( + 6351156, + 5102709, + 8379474, + 2461414, + 5244007, + 236729, + 7776227, + 6820705, + 2032829, + 5087975, + 7306207, + 8030748, + 6722678, + 9514688, + 1059243, + 7831209, + 1778312, + 7738343, + 3920800, + 3506292, + 1985266, + 7168195, + 6235581, + 7981302, + 1136555, + 9875369 +) AND +besp.BES_DATUM > TO_DATE('2025-06-01', 'YYYY-MM-DD') +ORDER BY besp.BES_DATUM ASC; + +-- SELECT +-- bedp.BEDARFNR, +-- bedp.BEDP_SEQUENZ, +-- bedp.BEDP_TITELNR, +-- bedp.BEDP_MAN, +-- bedp.BEDP_MENGE_BEDARF_VM, +-- t_info.MELDENUMMER, +-- t_info.VERLAGSNR, +-- t_info.MENGE_VORMERKER, +-- t_info.MANDFUEHR +-- FROM EXT_BEDPBED bedp +-- LEFT JOIN EXT_TITEL_INFO_NEU t_info +-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER; + + +SELECT COUNT(*) FROM (SELECT + bedp.BEDARFNR, + bedp.BEDP_SEQUENZ, + bedp.BEDP_TITELNR, + bedp.BEDP_MAN, + bedp.BEDP_MENGE_BEDARF_VM, + t_info.MELDENUMMER, + t_info.VERLAGSNR, + t_info.MENGE_VORMERKER, + t_info.MANDFUEHR +FROM EXT_BEDPBED bedp +LEFT JOIN EXT_TITEL_INFO_NEU t_info + ON bedp.BEDP_TITELNR = t_info.TI_NUMMER +WHERE bedp.BEDP_MAN IN (1,90) AND t_info.MELDENUMMER in (17, 18) AND +bedp.BEDP_MENGE_BEDARF_VM = 0); -- SELECT * FROM ( @@ -175,4 +237,24 @@ besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD'); -- SELECT * -- FROM EXT_AUFPAUF auf --- WHERE auf.AUFTRAGS_DATUM > TO_DATE('2025-11-18', 'YYYY-MM-DD'); \ No newline at end of file +-- WHERE auf.AUFTRAGS_DATUM > TO_DATE('2025-11-18', 'YYYY-MM-DD'); + +-- system queries +-- SELECT owner, table_name +-- FROM all_tables +-- ORDER BY owner, table_name; + +-- SELECT table_name +-- FROM user_tables +-- ORDER BY table_name; + +-- SELECT view_name +-- FROM user_views +-- ORDER BY view_name; +-- SELECT owner, view_name +-- FROM all_views +-- WHERE owner = 'UMB' +-- ORDER BY view_name; + +-- DESC all_views; +DESC EXT_AUFPAUF; \ No newline at end of file diff --git a/src/umbreit/db.py b/src/umbreit/db.py index 946e96e..a160e23 100644 --- a/src/umbreit/db.py +++ b/src/umbreit/db.py @@ -51,7 +51,7 @@ ext_bedpbed_null_values: PolarsNullValues = { } ext_titel_info = Table( - "ext_titel_info", + "ext_titel_info_neu", metadata, Column("TI_NUMMER", sql.Integer, primary_key=True, autoincrement=False, nullable=False), Column("MANDFUEHR", sql.Integer, primary_key=True, autoincrement=False, nullable=False),