diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index 82eec9c..cb1b48f 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -1,6 +1,7 @@ # %% import json import time +import typing from collections.abc import Sequence from pathlib import Path from pprint import pprint @@ -14,8 +15,9 @@ from umbreit import db, types # %% # import importlib -# db = importlib.reload(db) # types = importlib.reload(types) +# db = importlib.reload(db) + # %% p_cfg = io.search_file_iterative( @@ -80,6 +82,8 @@ start_date = (current_dt - td).date() print("Starting date: ", start_date) # %% +# // ---------- LIVE DATA ----------- + # TODO find way to filter more efficiently # WF-200: filter for relevant orders with current BEDP set # missing: order types which are relevant @@ -168,11 +172,14 @@ print(stmt.compile(engine)) # raw data query # TODO look for entries which do not have an associated title number -print("--------------- ext_bedpbed --------------") +print("--------------- raw data query --------------") t1 = time.perf_counter() +# join_condition = sql.and_( +# db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, +# db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, +# ) join_condition = sql.and_( db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, - db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, ) stmt = sql.select( db.ext_bedpbed.c.BEDARFNR, @@ -199,36 +206,24 @@ elapsed = t2 - t1 print(f"Query duration: {elapsed:.4f} sec") print("Number of entries: ", len(df)) print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB") + + # %% +# // NO LIVE DATA NEEDED # SAVING/LOADING -p_save = Path.cwd() / "raw_data_from_sql_query_20251202-2.arrow" +p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow" # df.write_ipc(p_save) df = pl.read_ipc(p_save) # %% -len(df) +print(len(df)) df.head() # 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?) # %% -df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER")) -# %% -# !! CHECK: null values set in the query with CASE statement -print(len(df.filter(pl.col("MELDENUMMER") == 18))) -# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0)) -df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) -# %% -# !! CHECK: titles with request where no title information is found -# not_in_title_table = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter( -# pl.col("MELDENUMMER").is_null() -# ) -# EXPORT_FEAT = "BEDP_TITELNR" -# to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()} -# p_save_not_in_title_table = Path.cwd() / "not_in_title_table.json" - -# with open(p_save_not_in_title_table, "w") as file: -# json.dump(to_save, file, indent=4) -# %% -# !! 
CHECK: different MANDANTEN
+# ** CHECK: different MANDANTEN
 # check for valid entries for unknown MANDANTEN
+# MANDANTEN other than (1, 90) do not possess relevant properties such as
+# "MELDENUMMER" --> conclusion: not relevant
+
 # MANDANT = 80

 # print(f"Mandant: {MANDANT}")
@@ -243,6 +238,58 @@ df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
 # ).null_count()
 # )
 # print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))
+
+# %%
+# ** PREFILTER
+# always needed; entries filtered out are to be discarded
+df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
+# %%
+print(len(df))
+# %%
+# ** CHECK: null values set in the query with CASE statement
+# unclear whether NULL comes from the CASE statement or is already set in the table
+# unknown consequences: are they relevant? how do they relate to "MENGE_VORMERKER"
+# from the title DB?
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
+# %%
+df.select("MELDENUMMER").unique()
+# %%
+# ** CHECK: null values for "MENGE_VORMERKER"
+df.filter(pl.col("MENGE_VORMERKER").is_null())
+# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
+
+agg_t = (
+    df.group_by(["MELDENUMMER"]).agg(
+        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
+        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
+    )
+    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
+)  # .filter(pl.col("MELDENUMMER") == 18)
+agg_t
+
+df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())
+
+# %%
+# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
+# ** not known at this point
+# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
+# BEDP_MENGE_BEDARF_VM is not suitable as a reference or ground truth
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
+# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
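+# %%
+# sketch to quantify the discrepancy above (illustrative only; `n_over` is a
+# hypothetical helper name, not part of the workflow)
+n_over = len(df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")))
+print(f"Entries with BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER: {n_over}")
+print(f"Share of all entries: {n_over / len(df):.2%}")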
+# %%
+# ** CHECK: titles with request where no title information is found
+# result: there were entries found on 02.12., but not on 03.12.2025
+not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
+EXPORT_FEAT = "BEDP_TITELNR"
+to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
+p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-1.json"
+to_save
+# with open(p_save_not_in_title_table, "w") as file:
+#     json.dump(to_save, file, indent=4)
+# %%
+print(len(df.filter(pl.col("MELDENUMMER") == 18)))
+# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
 # %%
 # VM_CRITERION = "MENGE_VORMERKER"
 VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
@@ -273,16 +320,40 @@ def get_raw_data() -> pl.DataFrame:
     )


-def get_empyt_result_df() -> pl.DataFrame:
+def get_empty_pipeline_result(
+    data: pl.DataFrame,
+) -> types.PipelineResult:
     schema = db.results_schema_map.copy()
     del schema["id"]
-    return pl.DataFrame(schema=schema)
+    results = pl.DataFrame(schema=schema)
+    return types.PipelineResult(results=results, open=data)


-def apply_several_filters(
+def prepare_base_data(
+    df: pl.DataFrame,
+) -> pl.DataFrame:
+    """pre-processing routine to handle non-feasible entries
+
+    Parameters
+    ----------
+    df : pl.DataFrame
+        raw data collected from the database query
+
+    Returns
+    -------
+    pl.DataFrame
+        pre-processed data
+    """
+    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
+    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
+
+    return df
+
+
+def _apply_several_filters(
     df: pl.DataFrame,
     filters: Sequence[pl.Expr],
-) -> tuple[pl.DataFrame, pl.DataFrame]:
+) -> types.FilterResult:
     df_current = df
     removed_rows: list[pl.DataFrame] = []
@@ -294,53 +365,39 @@
     df_removed = pl.concat(removed_rows)

-    return df_current, df_removed
-
-
-def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
-    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
-    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
-
-    return df
-
-
-def workflow_100_umbreit(
-    results: pl.DataFrame,
-    data: pl.DataFrame,
-    vm_criterion: str,
-) -> tuple[pl.DataFrame, pl.DataFrame]:
-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("BEDP_MAN") == 1
-    filter_number_vm = pl.col(vm_criterion) > 0
-
-    relevant, filt = apply_several_filters(
-        data, (filter_meldenummer, filter_mandant, filter_number_vm)
-    )
-    results = _results_workflow_100(
-        results,
-        relevant,
-        vorlage=True,
-        wf_id=100,
-        freigabe_auto=types.Freigabe.WF_100,
-    )
-
-    return results, filt
+    return types.FilterResult(in_=df_current, out_=df_removed)


 # post-processing the results
-def _results_workflow_100(
-    results: pl.DataFrame,
+# TODO: order quantity not always necessary
+# TODO: change relevant criterion for order quantity
+def _write_results(
+    results_table: pl.DataFrame,
     data: pl.DataFrame,
     vorlage: bool,
     wf_id: int,
     freigabe_auto: types.Freigabe,
+    is_out: bool,
 ) -> pl.DataFrame:
+    ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
+
     data = data.rename(db.map_to_result)
+
+    order_qty_expr: pl.Expr
+    if is_out:
+        # filtered-out entries get an order quantity of 0
+        order_qty_expr = (
+            pl.lit(0)
+            .alias("best_menge")
+            .cast(db.results_schema_map["best_menge"])
+        )
+    else:
+        order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
+
     data = data.with_columns(
         [
             pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
             pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
-            pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
+            order_qty_expr,
             pl.lit(freigabe_auto.value)
             .alias("freigabe_auto")
             .cast(db.results_schema_map["freigabe_auto"]),
@@ -356,18 +413,130 @@
         ]
     )

-    return pl.concat([results, data])
+    return pl.concat([results_table, data])
+
+
+# main routine
+# results are written for the filtered-out entries
+def workflow_910(
+    pipe_result: types.PipelineResult,
+) -> types.PipelineResult:
+    filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
+    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        filters=(
+            filter_mandant,
+            filter_ignore_MNR26,
+        ),
+    )
+    # write results for entries which were filtered out
+    pipe_result.results = _write_results(
+        pipe_result.results,
+        data=res.out_,
+        vorlage=False,
+        wf_id=910,
+        freigabe_auto=types.Freigabe.WF_910,
+        is_out=True,
+    )
+    pipe_result.open = res.in_
+
+    return pipe_result
+
+
+# this is a main routine:
+# receives and returns result objects
+def workflow_100_umbreit(
+    pipe_result: types.PipelineResult,
+    vm_criterion: str,
+) -> types.PipelineResult:
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col("BEDP_MAN") == 1
+    filter_number_vm = pl.col(vm_criterion) > 0
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.open = res.out_
+
+    return pipe_result


 # Petersen not present in data
 # %%
-df_raw = get_raw_data()
-df_start = prepare_base_data(df_raw)
-df_start
+# SAVING/LOADING
+p_save = Path.cwd() / "raw_data_from_sql_query_20251203-1.arrow"
+# df.write_ipc(p_save)
+df = pl.read_ipc(p_save)
+print(f"Number of entries: {len(df)}")
 # %%
-results_init = get_empyt_result_df()
+df.head()
+
+# %%
+# manual walk-through of the workflow_910 filter steps
+removed_rows = []
+
+raw_data = df.clone()
+print(f"Length raw data: {len(raw_data)}")
+filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
+filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
+
+
+filtered = raw_data.filter(filter_mandant)
+filtered_n = raw_data.filter(~filter_mandant)
+num_filter = len(filtered)
+num_filter_n = len(filtered_n)
+removed_rows.append(filtered_n)
+print(f"Length filtered: {num_filter}")
+print(f"Length filtered out: {num_filter_n}")
+print(f"Length all: {num_filter + num_filter_n}")
+raw_data = filtered
+out = pl.concat(removed_rows)
+print(f"Length out: {len(out)}")
+
+# %%
+print("---------------------------------------")
+filtered = raw_data.filter(filter_ignore_MNR26)
+filtered_n = raw_data.filter(~filter_ignore_MNR26)
+num_filter = len(filtered)
+num_filter_n = len(filtered_n)
+len(filtered_n)
+# %%
+removed_rows.append(filtered_n)
+print(f"Length filtered: {num_filter}")
+print(f"Length filtered out: {num_filter_n}")
+print(f"Length all: {num_filter + num_filter_n}")
+out = pl.concat(removed_rows)
+print(f"Length out: {len(out)}")
+
+# %%
+raw_data = df.clone()
+pipe_res = get_empty_pipeline_result(raw_data)
+pipe_res.results
+
+# %%
+pipe_res = workflow_910(pipe_res)
+pipe_res
+# df_start = prepare_base_data(df_raw)
+# df_start
+
+# %%
+# !! stale: the call below still uses the old (results, data, vm_criterion) API
+results_init = get_empty_pipeline_result(df)
 results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
 # df is where results are known
 # filt_out contains entries for other workflows
@@ -398,7 +567,7 @@ def _init_workflow_200_umbreit(
     filter_mandant = pl.col("BEDP_MAN") == 1
     filter_number_vm = pl.col(vm_criterion) == 0

-    relevant, filt = apply_several_filters(
+    res = _apply_several_filters(
         data, (filter_meldenummer, filter_mandant, filter_number_vm)
     )
+    # unpack the FilterResult dataclass (replaces the old tuple return)
+    relevant, filt = res.in_, res.out_
diff --git a/data_analysis/not_in_title_table.json b/data_analysis/not_in_title_table_20251202-1.json
similarity index 100%
rename from data_analysis/not_in_title_table.json
rename to data_analysis/not_in_title_table_20251202-1.json
diff --git a/data_analysis/queries.sql b/data_analysis/queries.sql
index 19e1ca3..3aff2e3 100644
--- a/data_analysis/queries.sql
+++ b/data_analysis/queries.sql
@@ -35,5 +35,5 @@ SELECT count(*) FROM EXT_BEDPBED;
 --
 -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
 -- PROMPT ######################################
--- SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 6132326;
-SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 4591588;
+SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 6132326;
+-- SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 4591588;
diff --git a/src/umbreit/types.py b/src/umbreit/types.py
index 7b24e33..21cf817 100644
--- a/src/umbreit/types.py
+++ b/src/umbreit/types.py
@@ -1,7 +1,7 @@
 from __future__ import annotations

 import enum
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import TypeAlias

 import polars as pl
@@ -10,6 +10,19 @@ PolarsSchema: TypeAlias = dict[str, type[pl.DataType]]
 PolarsNullValues: TypeAlias = dict[str, str]


+@dataclass(slots=True, kw_only=True, eq=False)
+class FilterResult:
+    in_: pl.DataFrame
+    out_: pl.DataFrame
+
+
+@dataclass(slots=True, kw_only=True, eq=False)
+class PipelineResult:
+    results: pl.DataFrame
+    open: pl.DataFrame
+
+
 class Freigabe(enum.Enum):
     WF_100 = False
     WF_200 = False
+    # NOTE: members sharing a value are aliases in enum.Enum
+    # (Freigabe.WF_910 is Freigabe.WF_100); fine while only .value is read
+    WF_910 = False
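Usage sketch (illustrative, not part of the patch; assumes the functions from
02-3_oracle_workflow_test.py are in scope): the reworked pipeline threads one
PipelineResult through the workflow steps instead of passing DataFrame tuples.

    raw_data = pl.read_ipc(Path.cwd() / "raw_data_from_sql_query_20251203-1.arrow")
    pipe_res = get_empty_pipeline_result(raw_data)  # empty results, all rows open
    pipe_res = workflow_910(pipe_res)               # filtered-out rows -> results, wf_id=910
    pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
    print(len(pipe_res.results), len(pipe_res.open))  # written vs. still-open entries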