# %%
from __future__ import annotations

import datetime
import json
import shutil
import tempfile
import time
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import oracledb
import pandas as pd
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from dopt_basics import configs, io
from sqlalchemy import event

from umbreit import db, types

# module-wide python-oracledb fetch defaults
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000

# %%
# import importlib

# types = importlib.reload(types)
# db = importlib.reload(db)


# %%
def create_tmp_dir() -> Path:
    """Create a fresh temporary directory and return its path.

    ``tempfile.mkdtemp`` either creates the directory or raises ``OSError``,
    so no additional existence check is needed.
    """
    return Path(tempfile.mkdtemp())


TMP_DIR = create_tmp_dir()


def clear_tmp_dir() -> None:
    """Remove all content of ``TMP_DIR`` and leave an empty directory behind.

    Also works when the directory was removed before (e.g. by
    ``remove_tmp_dir``): in that case it is simply re-created.
    """
    if TMP_DIR.exists():
        shutil.rmtree(TMP_DIR)
    # 0o700 keeps the directory private to the current user like ``mkdtemp`` does
    TMP_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)


def remove_tmp_dir() -> None:
    """Delete ``TMP_DIR`` including all files in it."""
    shutil.rmtree(TMP_DIR)


print(f"Created temp directory under: >{TMP_DIR}<")

# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
if p_cfg is None:
    # explicit error instead of ``assert``: assertions are stripped under ``-O``
    raise FileNotFoundError(
        "No credential file matching 'CRED*.toml' found while searching upwards "
        f"from >{Path.cwd()}<"
    )
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
# Build the URL from its components so that special characters in the user
# name or password (e.g. "@", ":", "/") are escaped instead of corrupting the URL.
conn_url = sql.engine.URL.create(
    "oracle+oracledb",
    username=USER_NAME,
    password=USER_PASS,
    host=HOST,
    port=int(PORT),
    query={"service_name": str(SERVICE)},
)
# string form kept under the original name for code that still reads it
conn_string = conn_url.render_as_string(hide_password=False)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(
    conn_url,
    execution_options={"stream_results": True},
)


# @event.listens_for(engine, "after_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
#     cursor.arraysize = 1000
#     cursor.prefetchrows = 1000


# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
#     cursor.arraysize = 1000
#     cursor.prefetchrows = 1000

# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results_local,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    """Delete all rows of ``db.results_local`` and print the number of deleted rows.

    Args:
        res_engine: engine connected to the database holding the local result table.
    """
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results_local))
        print("Rows deleted: ", res.rowcount)


delete_results(res_engine)
stmt = sql.select(db.results_local.c.bedarf_nr, db.results_local.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# // ---------- LIVE DATA -----------
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
# alternative join which also matches on MANDANT (was immediately overwritten
# by the title-only join below, therefore disabled)
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
#     db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query really slow
)

# %%
print(stmt.compile(engine))
# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order
# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries
# // tests with ext_bedpbed
# print("--------------- ext_bedpbed --------------")
# t1 = time.perf_counter()
# AMOUNT_COLS = frozenset(
#     (
#         "BEDP_MENGE_BEDARF",
#         "BEDP_MENGE_VERKAUF",
#         "BEDP_MENGE_ANFRAGE",
#         "BEDP_MENGE_BESTELLUNG",
#         "BEDP_MENGE_FREI",
#         "BEDP_MENGE_BEDARF_VM",
#     )
# )
# case_stmts = []
# for col in AMOUNT_COLS:
#     case_stmts.append(
#         sql.case(
#             (db.ext_bedpbed.c[col] <= -1, sql.null()),
#             else_=db.ext_bedpbed.c[col],
#         ).label(col)
#     )
# stmt = sql.select(
#     *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
#     *case_stmts,
# )
# df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# t2 = time.perf_counter()
# elapsed = t2 - t1
# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
# print(f"Query duration: {elapsed:.4f} sec")
# print("Number of entries: ", len(df))
# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))
# %%
# raw data query
print("--------------- raw data query --------------")
t1 = time.perf_counter()
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# )
# join on the title number only (the MANDANT-aware variant is disabled above)
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    # db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    # values <= -1 are mapped to NULL directly in the query
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
    db.ext_titel_info.c.MANDFUEHR,
    # outer join: positions without matching title information are kept
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
df.head()
# %%
# after the CASE mapping and null filling no negative quantities are expected
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
# df.write_ipc(p_save)
# NOTE: from here on ``df`` is the snapshot loaded from disk, not the live query result
df = pl.read_ipc(p_save)
# %%
print(len(df))
df.head()
# %%
temp = df.fill_null(0)
# NOTE(review): the mask is computed on ``df`` (nulls not filled) but applied to
# ``temp``; the "CHECK: duplicates" cell further down uses ``temp`` for both -- confirm
# which variant is intended
mask = df.select(pl.exclude("BEDARFNR", "BEDP_SEQUENZ")).is_duplicated()
temp.filter(mask).sort("BEDP_TITELNR")
# %%
# NOTE(review): the first assignment to ``temp`` is overwritten immediately by the
# second one, so the MANDANT filter (1, 90) has no effect on the displayed result
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90))).with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
df.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
# ** CHECK: duplicates
# rows which are identical in every column except the two index columns
temp = df.fill_null(0)
mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated()
temp.filter(mask)
# %%
# all positions whose title number occurs more than once
df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False)
# %%
# ** CHECK: positions without titlenumber
# distinct BEDP_MAN values of all rows where VERLAGSNR is null
df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique()
# %%
# ** CHECK: unique title number?
# per title number: row count and the distinct MANDANT values from both tables
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_TITELNR").len().alias("count"),
    pl.col.BEDP_MAN.unique().alias("unique_bedp_man"),
    pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"),
).unique().filter(pl.col("count") > 1)
# %%
df.filter(pl.col.BEDP_TITELNR == 8679893)
# %%
# same count as above, computed with a window expression
df.with_columns(
    pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count")
).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter(
    pl.col("titlenumber_count") > 1
)
# %%
# ** CHECK: distribution of MELDENUMMER
# restricted to MANDANT 1 and 90; shows count, proportion and cumulated proportion
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90)))
sum_entries = len(temp)
temp = (
    temp.group_by("MELDENUMMER")
    .agg(pl.col("MELDENUMMER").len().alias("count"))
    .sort("count", descending=True)
)
temp = temp.with_columns((pl.col.count / sum_entries).alias("proportion"))
temp = temp.with_columns(pl.col.proportion.cum_sum().alias("cum"))
temp
# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
#     pl.len()
# )
# p_save = Path.cwd() / "meldenummer_anteile_20260114-2.xlsx"
# temp.write_excel(p_save)
# %%
# ** CHECK: differences MANDANT in BEDP and in TINFO
# 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique()) # %% df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique()) # %% df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1) # %% # df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5) df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null()) # %% # ** CHECK: different MANDANTEN # check for valid entries for unknown MANDANTEN # MANDANTEN others than (1, 90) do not possess relevant properties such as # "MELDENUMMER" and others --> conclusion: not relevant # MANDANT = 80 # print(f"Mandant: {MANDANT}") # print( # df.filter(pl.col("BEDP_MAN") == MANDANT).select( # ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"] # ) # ) # print( # df.filter(pl.col("BEDP_MAN") == MANDANT).select( # ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"] # ).null_count() # ) # print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts())) # %% df.filter(pl.col("MELDENUMMER").is_null()).filter(pl.col("MANDFUEHR").is_not_null()) # %% # ** PREFILTER # always needed, entries filtered out are to be disposed filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_mandant = pl.col("MANDFUEHR").is_in((1, 90)) df.filter(filter_meldenummer_null).filter(filter_mandant) # df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26) # %% len(df) # %% # ** CHECK: null values set in the query with CASE statement # not known if NULL because of CASE statement or already set in table # unknown consequences: Are they relevant? How does it relate to "MENGE_VORMERKER"? 
# from the title DB df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null()) df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0) # %% df.select("MELDENUMMER").unique() # %% # ** CHECK: null values for "MENGE_VORMERKER" df.filter(pl.col("MENGE_VORMERKER").is_null()) # df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0) agg_t = ( df.group_by(["MELDENUMMER"]).agg( # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(), pl.col("MENGE_VORMERKER").alias("VM_count").unique(), ) # .filter(pl.col("count_customer") >= 0) # !! should be 3 ) # .filter(pl.col("MELDENUMMER") == 18) agg_t # %% df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum()) # %% # ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER" # ** not known at this point # there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER --> # BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) p_save_diff_VM_bedp_tinfo = ( Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx" ) df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False) # why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"? 
# %%
# ** CHECK: titles with request where no title information is found
# result: there were entries found on 02.12., but not on 03.12.2025
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251211-1.json"
print(to_save)
# with open(p_save_not_in_title_table, "w") as file:
#     json.dump(to_save, file, indent=4)
# %%
df.group_by("BEDP_MAN").agg(pl.len())
# %%
df.filter(pl.col("MELDENUMMER").is_null()).group_by("BEDP_MAN").agg(pl.len().alias("count"))
# %%
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
    db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"


def save_tmp_file(
    data: pl.DataFrame,
    filename: str | None,
) -> None:
    """Write ``data`` as Arrow IPC file into ``TMP_DIR`` without overwriting files.

    Args:
        data: frame to persist.
        filename: base name of the file (suffix is replaced by ``.arrow``); a random
            UUID is used if ``None``.

    If the target already exists, a running number is appended to the original
    base name (``name_1``, ``name_2``, ...) until a free path is found.
    """
    if filename is None:
        filename = str(uuid.uuid4())

    pth = (TMP_DIR / filename).with_suffix(".arrow")
    # always number relative to the original stem; deriving the new name from the
    # previous candidate would pile up suffixes ("name_1_2_3")
    base_stem = pth.stem
    n: int = 1
    while pth.exists():
        pth = TMP_DIR / f"{base_stem}_{n}.arrow"
        n += 1

    data.write_ipc(pth)


def load_tmp_file(
    filename: str,
) -> pl.DataFrame:
    """Load a single Arrow IPC file from ``TMP_DIR``.

    Args:
        filename: base name of the file (suffix is replaced by ``.arrow``).

    Raises:
        FileNotFoundError: if the file does not exist in ``TMP_DIR``.
    """
    pth = (TMP_DIR / filename).with_suffix(".arrow")

    if not pth.exists():
        raise FileNotFoundError(f"File >{pth.name}< not found")

    return pl.read_ipc(pth)


def load_all_tmp_files() -> dict[str, pl.DataFrame]:
    """Load every ``*.arrow`` file in ``TMP_DIR``, keyed by file stem.

    Files are read in sorted path order so the result does not depend on the
    arbitrary order in which the file system lists the directory.
    """
    return {file.stem: pl.read_ipc(file) for file in sorted(TMP_DIR.glob("*.arrow"))}


def get_starting_date(
    days_from_now: int,
) -> datetime.date:
    """Return the date which lies ``days_from_now`` days before the current time."""
    current_dt = dt.current_time_tz(cut_microseconds=True)
    td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)

    return (current_dt - td).date()


def get_raw_data() -> pl.DataFrame:
    """Query the demand positions outer-joined with their title information.

    ``BEDP_MENGE_BEDARF_VM`` values <= -1 are mapped to NULL within the query.
    Positions without a matching title are kept (outer join on the title number).
    """
    join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.VERLAGSNR,
        db.ext_titel_info.c.MENGE_VORMERKER,
        db.ext_titel_info.c.MANDFUEHR,
        db.ext_titel_info.c.EINKAEUFER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


def save_tmp_data(df: pl.DataFrame) -> None:
    """Replace the content of ``db.tmp_data`` with the rows of ``df``.

    Delete and insert run in one transaction, so a failing insert does not leave
    the table emptied. For an empty frame the table is only cleared: executing an
    INSERT with an empty parameter list would not be an ``executemany`` any more.
    """
    rows = df.to_dicts()
    with engine.begin() as conn:
        conn.execute(sql.delete(db.tmp_data))
        if rows:
            conn.execute(sql.insert(db.tmp_data), rows)


def get_tmp_data() -> pl.DataFrame:
    """Read the complete content of ``db.tmp_data``."""
    return pl.read_database(
        sql.select(db.tmp_data),
        engine,
        schema_overrides=db.tmp_data_schema_map,
    )


def get_result_data() -> pl.DataFrame:
    """Read the complete content of ``db.EXT_DOPT_ERGEBNIS``."""
    return pl.read_database(
        sql.select(db.EXT_DOPT_ERGEBNIS),
        engine,
        schema_overrides=db.results_schema_map,
    )


def save_result_data(results: pl.DataFrame) -> None:
    """Insert all rows of ``results`` into ``db.EXT_DOPT_ERGEBNIS``.

    Nothing is executed for an empty frame (see ``save_tmp_data`` for the reason).
    """
    rows = results.to_dicts()
    if not rows:
        return

    with engine.begin() as conn:
        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), rows)


def clear_result_data() -> None:
    """Delete all rows of ``db.EXT_DOPT_ERGEBNIS``."""
    with engine.begin() as conn:
        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))


def save_result_data_native(results: pl.DataFrame) -> None:
    """Insert ``results`` through the raw driver cursor instead of SQLAlchemy Core.

    Boolean columns are converted to the strings "Y"/"N" before the insert.
    """
    results = results.with_columns(
        [
            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
            for c in results.select(cs.boolean()).columns
        ]
    )
    # positional binds: NOTE(review) this presumably requires the frame columns to
    # be in exactly the order of the column list below -- confirm against callers
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS"
    ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:1, :2, :3, :4, :5, :6)
    """

    with engine.begin() as conn:
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))


def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    """Apply ``filters`` one after another and collect kept and removed rows.

    Args:
        df: frame to filter.
        filters: boolean expressions; each one is applied to the rows which passed
            all previous expressions.

    Returns:
        ``types.FilterResult`` with ``in_`` (rows passing every filter) and ``out_``
        (rows rejected by any filter).

    A predicate evaluating to null counts as "not passed". ``DataFrame.filter``
    drops such rows, and negating a null stays null, so without the explicit
    ``fill_null`` these rows would end up in neither ``in_`` nor ``out_``.
    """
    if not filters:
        # nothing to filter: everything stays in, ``out_`` is empty with same schema
        return types.FilterResult(in_=df, out_=df.clear())

    df_current = df
    removed_rows: list[pl.DataFrame] = []

    for filter_expr in filters:
        predicate = filter_expr.fill_null(False)
        removed_rows.append(df_current.filter(~predicate))
        df_current = df_current.filter(predicate)

    df_removed = pl.concat(removed_rows)

    return types.FilterResult(in_=df_current, out_=df_removed)


class PipelineResult:
    """Container tracking open (unprocessed) entries and produced result rows.

    Entries written as results are removed from the open data; the index columns
    of all removed entries are recorded so that a sub-pipeline can be merged back
    into its parent pipeline.
    """

    __slots__ = ("_results", "_open", "_subtracted_indices")
    # columns which identify one entry of the raw data
    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")

    def __init__(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Start a pipeline where all rows of ``data`` are open."""
        self._open = data
        # the result frame uses the result table schema without its "ID" column
        schema = db.results_schema_map.copy()
        del schema["ID"]
        self._results = pl.DataFrame(schema=schema)

        index_schema = {col: db.raw_data_query_schema_map[col] for col in self._index_cols}
        self._subtracted_indices = pl.DataFrame(schema=index_schema)

    def __len__(self) -> int:
        """Number of result rows plus number of open rows."""
        return len(self._results) + len(self._open)

    @property
    def open(self) -> pl.DataFrame:
        """Entries which are not processed yet."""
        return self._open

    @property
    def results(self) -> pl.DataFrame:
        """Result rows produced so far."""
        return self._results

    @property
    def subtracted_indices(self) -> pl.DataFrame:
        """Index columns of all entries removed from the open data so far."""
        return self._subtracted_indices

    def update_open(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Replace the open data with ``data``."""
        self._open = data

    def _subtract_data(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Remove the entries of ``data`` from the open data and record their indices."""
        self._open = self._open.join(data, on=self._index_cols, how="anti")
        self._subtracted_indices = pl.concat(
            (self._subtracted_indices, data.select(self._index_cols))
        )

    def _add_results(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Append ``data`` to the result rows."""
        self._results = pl.concat([self._results, data])

    def merge_pipeline(
        self,
        pipeline: PipelineResult,
    ) -> None:
        """Take over the results of ``pipeline`` and drop its processed entries here."""
        self._subtract_data(pipeline.subtracted_indices)
        self._add_results(pipeline.results)

    def write_results(
        self,
        data: pl.DataFrame,
        vorlage: bool,
        wf_id: types.Workflows,
        freigabe_auto: types.Freigabe,
        order_qty_expr: pl.Expr,
    ) -> None:
        """Convert open entries into result rows and remove them from the open data.

        Args:
            data: entries (raw data layout) to write as results.
            vorlage: value of the "VORLAGE" result column.
            wf_id: workflow whose value is stored in "WF_ID".
            freigabe_auto: value stored in "FREIGABE_AUTO".
            order_qty_expr: expression producing the "BEST_MENGE" column.
        """
        results = data.rename(db.map_data_to_result)
        results = results.with_columns(
            [
                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
                pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
                .alias("FREIGABE_AUTO")
                .cast(db.results_schema_map["FREIGABE_AUTO"]),
            ]
        )
        # raw data columns which are not part of the result table
        results = results.drop(
            [
                "BEDP_TITELNR",
                "BEDP_MAN",
                "BEDP_MENGE_BEDARF_VM",
                "MELDENUMMER",
                "VERLAGSNR",
                "MENGE_VORMERKER",
                "MANDFUEHR",
                "EINKAEUFER",
            ]
        )

        self._subtract_data(data)
        self._add_results(results)


class ExprOrderQty(typing.Protocol): ...


class ExprOrderQty_Base(ExprOrderQty, typing.Protocol):
    def __call__(self) -> pl.Expr: ...


ExprOrderQty_Base_Types: typing.TypeAlias = (
    typing.Literal[types.Workflows.ID_200]
    | typing.Literal[types.Workflows.ID_900]
    | typing.Literal[types.Workflows.ID_910]
)


class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol):
    def __call__(self, empty: bool) -> pl.Expr: ...


@typing.overload
def get_expr_order_qty(
    wf_id: typing.Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...


@typing.overload
def get_expr_order_qty(
    wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty( wf_id: types.Workflows, ) -> ExprOrderQty: empty_expr = ( pl.lit(0) .alias(ORDER_QTY_CRIT) .alias("BEST_MENGE") .cast(db.results_schema_map["BEST_MENGE"]) ) def _empty() -> pl.Expr: return empty_expr func: ExprOrderQty match wf_id: case types.Workflows.ID_100: def _func(empty: bool) -> pl.Expr: order_qty_expr: pl.Expr if empty: order_qty_expr = empty_expr else: order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE") return order_qty_expr func = _func case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910: func = _empty case _: raise NotImplementedError( f"Order expression for WF-ID {wf_id.value} is not implemented" ) return func def wf900( pipe_result: PipelineResult, ) -> PipelineResult: """filter 'Meldenummer' and fill non-feasible entries""" ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) res = _apply_several_filters( pipe_result.open, ( filter_meldenummer_null, filter_mandant, ), ) pipe_result.write_results( data=res.out_, vorlage=False, wf_id=types.Workflows.ID_900, freigabe_auto=types.Freigabe.WF_900, order_qty_expr=ORDER_QTY_FUNC(), ) pipe_result.update_open( res.in_.with_columns( pl.col("MENGE_VORMERKER").fill_null(0), pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0), ) ) return pipe_result def wf910( pipe_result: PipelineResult, ) -> PipelineResult: ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910) filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,)) pipe_result.write_results( data=res.out_, vorlage=False, wf_id=types.Workflows.ID_910, freigabe_auto=types.Freigabe.WF_910, order_qty_expr=ORDER_QTY_FUNC(), ) return pipe_result # this a main routine: # receives and gives back result objects def wf100_umbreit( pipe_result: PipelineResult, vm_criterion: str, ) -> PipelineResult: ORDER_QTY_FUNC = 
get_expr_order_qty(types.Workflows.ID_100) filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_mandant = pl.col(MANDANT_CRITERION) == 1 filter_number_vm = pl.col(vm_criterion) > 0 res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_number_vm, ), ) pipe_result.write_results( data=res.in_, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) return pipe_result def wf100_petersen( pipe_result: PipelineResult, vm_criterion: str, ) -> PipelineResult: ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) # difference WDB and others # // WDB branch # order quantity 0, no further action in other WFs filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_mandant = pl.col(MANDANT_CRITERION) == 90 filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070)) filter_number_vm = pl.col(vm_criterion) == 0 res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_WDB, filter_number_vm, ), ) pipe_result.write_results( data=res.in_, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=True), ) filter_number_vm = pl.col(vm_criterion) > 0 res_candidates = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_WDB, filter_number_vm, ), ) wdb_sub_pipe = PipelineResult(res_candidates.in_) wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe) assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed" pipe_result.merge_pipeline(wdb_sub_pipe) # // other branch # show always entries with #VM > 1 filter_number_vm = pl.col(vm_criterion) > 1 res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_number_vm, ), ) pipe_result.write_results( data=res.in_, vorlage=True, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) filter_number_vm = 
pl.col(vm_criterion) > 0 res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_number_vm, ), ) pipe_result.write_results( data=res.in_, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) return pipe_result def _wf100_petersen_sub1_wdb( pipe_result: PipelineResult, ) -> PipelineResult: ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) # input: pre-filtered entries (WDB titles and #VM > 0) # more than 1 VM: show these entries filter_number_vm = pl.col(VM_CRITERION) > 1 res = _apply_several_filters( pipe_result.open, (filter_number_vm,), ) pipe_result.write_results( data=res.in_, vorlage=True, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) # filtered out entries (WDB with #VM == 1) must be analysed for orders in the # past 6 months save_tmp_data(pipe_result.open) RELEVANT_DATE = get_starting_date(180) join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE stmt = ( sql.select( db.tmp_data, db.EXT_BESPBES_INFO.c.BESP_MENGE, db.EXT_BESPBES_INFO.c.BESP_STATUS, ) .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition)) .where(filter_) ) sub1 = stmt.subquery() count_col = sql.func.count() stmt = ( sql.select( sub1.c.BEDP_TITELNR, count_col.label("count"), ) .select_from(sub1) .group_by(sub1.c.BEDP_TITELNR) .having(count_col > 1) ) if SAVE_TMP_FILES: stmt = ( sql.select( sub1.c.BEDP_TITELNR, count_col.label("count"), ) .select_from(sub1) .group_by(sub1.c.BEDP_TITELNR) ) # !! this is a sub result which must be used in the result set # !! 
for testing and feedback by the customer relevant_titles = pl.read_database( stmt, engine, ) if SAVE_TMP_FILES: save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB) relevant_titles = relevant_titles.filter(pl.col.COUNT > 1) entries_to_show = pipe_result.open.filter( pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) ) pipe_result.write_results( data=entries_to_show, vorlage=True, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) pipe_result.write_results( data=pipe_result.open, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) return pipe_result def wf200_umbreit( pipe_result: PipelineResult, ) -> PipelineResult: relevant_mnr: tuple[int, ...] = (17, 18) filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) filter_mandant = pl.col("BEDP_MAN") == 1 res = _apply_several_filters( pipe_result.open, (filter_meldenummer, filter_mandant), ) sub_pipe = PipelineResult(res.in_) sub_pipe = _wf200_sub1(sub_pipe) assert sub_pipe.open.height == 0 pipe_result.merge_pipeline(sub_pipe) return pipe_result def wf200_petersen( pipe_result: PipelineResult, ) -> PipelineResult: ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) RELEVANT_MNR: tuple[int, ...] 
= (17, 18) # // WDB branch filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR) filter_mandant = pl.col(MANDANT_CRITERION) == 90 filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070)) res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, filter_WDB, ), ) # ignore these pipe_result.write_results( data=res.in_, vorlage=False, wf_id=types.Workflows.ID_200, freigabe_auto=types.Freigabe.WF_200, order_qty_expr=ORDER_QTY_FUNC(), ) # // other branch res = _apply_several_filters( pipe_result.open, ( filter_meldenummer, filter_mandant, ), ) sub_pipe = PipelineResult(res.in_) sub_pipe = _wf200_sub1(sub_pipe) assert sub_pipe.open.height == 0, "Sub pipe not fully processed" pipe_result.merge_pipeline(sub_pipe) return pipe_result def _wf200_sub1( pipe_result: PipelineResult, ) -> PipelineResult: save_tmp_data(pipe_result.open) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) RELEVANT_DATE = get_starting_date(90) join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = sql.and_( db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), ) stmt = ( sql.select( db.tmp_data, db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, db.EXT_AUFPAUF.c.AUFTRAGS_ART, ) .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) .where(filter_) ) sub1 = stmt.subquery() unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) stmt = ( sql.select( sub1.c.BEDP_TITELNR, sql.func.count().label("count"), unique_count_col.label("customer_count"), ) .select_from(sub1) .group_by(sub1.c.BEDP_TITELNR) .having(unique_count_col >= 3) ) if SAVE_TMP_FILES: stmt = ( sql.select( sub1.c.BEDP_TITELNR, sql.func.count().label("count"), unique_count_col.label("customer_count"), ) .select_from(sub1) .group_by(sub1.c.BEDP_TITELNR) ) # !! this is a sub result which must be used in the result set # !! 
for testing and feedback by the customer relevant_titles = pl.read_database( stmt, engine, ) if SAVE_TMP_FILES: save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1) relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) entries_to_show = pipe_result.open.filter( pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) ) pipe_result.write_results( data=entries_to_show, vorlage=True, wf_id=types.Workflows.ID_200, freigabe_auto=types.Freigabe.WF_200, order_qty_expr=ORDER_QTY_FUNC(), ) pipe_result.write_results( data=pipe_result.open, vorlage=False, wf_id=types.Workflows.ID_200, freigabe_auto=types.Freigabe.WF_200, order_qty_expr=ORDER_QTY_FUNC(), ) return pipe_result # %% # SAVING/LOADING READ_DATABASE = False OVERWRITE = True FILENAME = "raw_data_from_sql_query_20260202-1.arrow" p_save = Path.cwd() / FILENAME if READ_DATABASE: df = get_raw_data() if not p_save.exists() or OVERWRITE: df.write_ipc(p_save) else: df = pl.read_ipc(p_save) # %% df # %% # initialise pipeline raw_data = df.clone() print(f"Number of entries: {len(df)}") clear_tmp_dir() clear_result_data() # %% df.head() # %% raw_data = df.clone() # pipe_res = get_empty_pipeline_result(raw_data) pipe_res = PipelineResult(raw_data) pipe_res.results pipe_res = wf900(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res.results # %% pipe_res = wf910(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res = wf100_umbreit(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number 
of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res = wf100_petersen(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res = wf200_umbreit(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res = wf200_petersen(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18))) # %% pipe_res.results.select(pl.col("VORLAGE").value_counts()) # %% pipe_res.results.height # %% # // aggregate test results all_tmps = load_all_tmp_files() print(len(all_tmps)) # %% def prepare_tmp_data() -> list[pl.DataFrame]: all_tmps = load_all_tmp_files() WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"} WF_200_TMP_RENAME = { "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate", "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate", } WF_100: list[pl.DataFrame] = [] WF_200: list[pl.DataFrame] = [] for name, df in all_tmps.items(): if TMPFILE_WF100_SUB1_WDB in name: rename_schema = WF_100_TMP_RENAME df = df.rename(rename_schema) WF_100.append(df) elif TMPFILE_WF200_SUB1 in name: rename_schema = WF_200_TMP_RENAME df = df.rename(rename_schema) WF_200.append(df) tmp_WF_collects = (WF_100, WF_200) all_tmps_preproc: list[pl.DataFrame] = [] for collect in tmp_WF_collects: if len(collect) > 1: df = pl.concat(collect) elif 
len(collect) == 1: df = collect[0].clone() else: raise RuntimeError() all_tmps_preproc.append(df) return all_tmps_preproc def generate_test_result_data( raw_data: pl.DataFrame, pipe_result: PipelineResult, ) -> pl.DataFrame: all_tmps_preproc = prepare_tmp_data() res_table = pipe_result.results.clone() res_title_info = res_table.join( raw_data, left_on=["BEDARF_NR", "BEDARF_SEQUENZ"], right_on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner", ) exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ") res_title_info = res_title_info.select(pl.exclude(exclude_cols)) columns = [ "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO", "BEDP_MENGE_BEDARF_VM", "MENGE_VORMERKER", "BEDP_TITELNR", "BEDP_MAN", "MELDENUMMER", "VERLAGSNR", "EINKAEUFER", "MANDFUEHR", ] res_title_info = res_title_info.select(columns) test_results = res_title_info.clone() for df in all_tmps_preproc: test_results = test_results.join(df, on="BEDP_TITELNR", how="left") test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False) test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all()) test_results = test_results.with_columns( pl.lit(None, dtype=pl.String).alias("Anmerkungen/Feedback") ) return test_results # %% test_results = generate_test_result_data(raw_data, pipe_res) test_results.head() # %% def write_test_results_excel( data: pl.DataFrame, base_filename: str, ) -> None: date_str = datetime.datetime.now().strftime("%Y-%m-%d") p_save = Path.cwd() / f"{base_filename}_{date_str}.xlsx" pd_df = data.to_pandas().set_index("Index") with pd.ExcelWriter(p_save, engine="xlsxwriter") as writer: sheet_name = f"Ergebnisse_Testphase_{date_str}" pd_df.to_excel( writer, freeze_panes=(1, 1), sheet_name=sheet_name, ) worksheet = writer.sheets[sheet_name] rows, cols = pd_df.shape columns = ["Index"] + pd_df.columns.to_list() worksheet.add_table( 0, 0, rows, cols, {"columns": [{"header": c} for c in columns], "style": "Table Style Light 9"}, ) for i, col in enumerate(columns): if i == 0: 
worksheet.set_column( i, i, max(pd_df.index.astype(str).map(len).max(), len(col)) + 2 ) continue worksheet.set_column( i, i, max(pd_df[col].astype(str).map(len).max(), len(col)) + 2 ) # %% write_test_results_excel(test_results, "Testdatensatz_WF-100-200") ##################################################################### # %% # ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter( pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER ) deviation_vm = test_results.filter( pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode()) ) deviation_vm # %% write_test_results_excel(deviation_vm, "Abweichungen-VM") # ** WF-200 potentially triggered raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter( pl.col.BEDP_TITELNR.is_duplicated() ).sort("BEDP_TITELNR") # %% # ---------------------------------------------------------------------------- # # Workflow 200 (Umbreit only) # ---------------------------------------------------------------------------- # # %% wf_200_start_data = pipe_res.open.clone() wf_200_start_data # %% engine.dispose() remove_tmp_dir() # %% relevant_mnr: tuple[int, ...] 
= (17, 18) filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) filter_mandant = pl.col("BEDP_MAN") == 1 res = _apply_several_filters( wf_200_start_data, (filter_meldenummer, filter_mandant), ) # %% # these entries must be checked for relevant orders # therefore, a temp table must be created in the database to execute efficient # queries, other approaches are just hacks # SOLUTION: # - save these entries to a temp table 'temp' # - look up the order history of the past 3 months # -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR # -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND # -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND # # this is a separate sub-pipeline like in Petersen WF-100 # these entries are either to be shown or not sub_pipe_umbreit = PipelineResult(res.in_) # %% sub_pipe_umbreit.open # %% # %% save_tmp_data(sub_pipe_umbreit.open) # %% rel_date = get_starting_date(90) rel_date # %% # old way using in statements # filter_ = sql.and_( # db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice), # db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, # db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), # ) # join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR # filter_ = sql.and_( # db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, # db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), # db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), # ) # stmt = ( # sql.select( # db.tmp_data, # db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, # db.EXT_AUFPAUF.c.AUFTRAGS_ART, # ) # .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) # .where(filter_) # ) # print(stmt.compile(engine)) # new_schema = db.EXT_AUFPAUF_schema_map.copy() # new_schema.update(db.tmp_data_schema_map) # new_schema # %% # demo = pl.read_database( # stmt, # engine, # schema_overrides=db.EXT_AUFPAUF_schema_map, # ) # # %% # demo # # %% # demo.select(pl.col.AUFTRAGS_ART).unique() # %% get_tmp_data() # %% # demo_2 = demo.clone() # # demo_2.head() # print(f"Number 
of titles before filtering: {len(demo_2)}") # demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99))) # demo_2 = ( # demo_2.group_by("BEDP_TITELNR", maintain_order=True) # .agg( # pl.len().alias("count"), # pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"), # ) # .filter(pl.col.customer_count >= 3) # ) # # these remaining titles are relevant and should be shown # # the others should be disposed # print(f"Number of titles which are relevant: {len(demo_2)}") # print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}") # demo_2 # %% # make a subquery for the pre-filtered entries # // query to obtain relevant title numbers join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = sql.and_( db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), ) stmt = ( sql.select( db.tmp_data, db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, db.EXT_AUFPAUF.c.AUFTRAGS_ART, ) .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) .where(filter_) ) sub1 = stmt.subquery() unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) stmt = ( sql.select( sub1.c.BEDP_TITELNR, sql.func.count().label("count"), unique_count_col.label("customer_count"), ) .select_from(sub1) .group_by(sub1.c.BEDP_TITELNR) .having(unique_count_col >= 3) ) print(stmt.compile(engine)) # %% demo_agg = pl.read_database( stmt, engine, ) # %% demo_agg # %% sub_pipe_umbreit.open # sub_pipe_umbreit.open.select("BEDP_TITELNR").n_unique() # %% # now obtain these entries from the open data demo_agg["BEDP_TITELNR"].unique().implode() entries_to_show = sub_pipe_umbreit.open.filter( pl.col.BEDP_TITELNR.is_in(demo_agg["BEDP_TITELNR"].unique().implode()) ) entries_to_show # %% sub_pipe_umbreit.open # %% df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION) df # %% df.filter(pl.col("BEDARFNR") == 884607) # %% df_order.filter(pl.col("BEDARFNR") == 884607) # 
%% # now obtain order data for entries t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner") t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0)) t # %% agg_t = ( t.group_by(["BEDARFNR", "BEDP_SEQUENZ"]) .agg( pl.count("AUFP_POSITION").alias("pos_count"), pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(), ) .filter(pl.col("count_customer") >= 0) # !! should be 3 ) agg_t # %% df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65)) # %% # ---------------------------------------------------------------------------- # # Writing results in DB # ---------------------------------------------------------------------------- # delete_results() pipe_post.write_database(db.results.fullname, engine, if_table_exists="append") stmt = sql.select(db.results) db_results = pl.read_database(stmt, engine) db_results # ---------------------------------------------------------------------------- # # Further Data Analysis # ---------------------------------------------------------------------------- # # %% stmt = sql.select(db.ext_bedpbed) df = pl.read_database( stmt, engine, schema_overrides=db.ext_bedpbed_schema_map, ) # %% df.group_by("BEDP_TITELNR").agg( pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN") ).filter(pl.col("unique_BEDP_MAN") > 1) # %% df["BEDP_MAN"].unique() # %% df.estimated_size(unit="mb") # %% target_bednr = df_raw["BEDARFNR"].to_list() target_seq = df_raw["BEDP_SEQUENZ"].to_list() # %% stmt = ( sql.select( db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDP_SEQUENZ, db.ext_bedpbed.c.BEDP_TITELNR, db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, ) .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr)) .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq)) ) df_targets = pl.read_database(stmt, engine) # %% # df_targets.filter(pl.col("BEDARFNR") == 884174) df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0) # %% # interesting order: 883697, 1, titleno: 7945981, 9964027 TITLE_NO = 7945981 # TITLE_NO = 9964027 stmt = 
sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO) title_buy = pl.read_database(stmt, engine) # %% title_buy # %% when were the orders placed stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981) title_order = pl.read_database(stmt, engine) # %% title_order # ------------------------------------------------------------------------------------------- # %% # title DB complete? # - includes only titles which are deliverable since 01.06.2025 and who are assigned to # buyer "Fröhlich" stmt = sql.select(db.ext_titel_info) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800) titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map) # %% titles["MANDFUEHR"].unique() # %% unique_titles = set(titles["TI_NUMMER"].to_list()) len(unique_titles) # %% # requirements? # - includes only order since 05.11.2025 stmt = sql.select(db.ext_bedpbed) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800) reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map) # %% reqs # %% reqs["BEDP_MAN"].unique() # %% # intersection between all titles and the titles contained in the requirements table unique_titles_req = set(reqs["BEDP_TITELNR"].to_list()) len(unique_titles_req) # %% intersection = unique_titles & unique_titles_req len(intersection) # %% # orders? 
# - includes only order since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
# rows are fetched while the connection is still open
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
df = dataframes[1]

# %%
# column name -> dtype mapping of the frame
col_dtype = dict(zip(df.columns, df.dtypes))
print("dtypes of DF...")
pprint(col_dtype)

# %%
len(df)

# %%
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df

# %%
# %%
col_dtype = dict(zip(df.columns, df.dtypes))
print("dtypes of DF...")
pprint(col_dtype)

# %%
# ** Petersen WDB
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(VM_CRITERION) > 0
res = _apply_several_filters(
    df,
    (
        filter_meldenummer,
        filter_mandant,
        filter_WDB,
        filter_number_vm,
    ),
)

# %%
res.in_

# %%
# !! show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res_vm_crit = _apply_several_filters(
    res.in_,
    (filter_number_vm,),
)

# %%
res_vm_crit.out_

# %%
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
len(title_nos)

# %%
title_nos

# %%
# define starting date for 6 month interval
# returns UTC time
start_date = get_starting_date(180)
filter_ = sql.and_(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
    db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
)
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
df_order

# %%
# keep titles that occur more than once in the fetched order rows
df_show = (
    df_order.group_by("BESP_TITELNR")
    .agg(pl.col("BESP_TITELNR").count().alias("count"))
    .filter(pl.col("count") > 1)
)
df_show

# %%
# !! show these entries
# !! do not show others
entries_to_show = df_show["BESP_TITELNR"].to_list()
print(f"Number of entries relevant: {len(entries_to_show)}")

# %%
res_vm_crit.out_

# %%
filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode())
res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))

# %%
res_wdb.in_

# %%
res_wdb.out_

# %%
# %%
# %%
# %%
# key columns only, typed like the raw-data query schema
schema = {col: db.raw_data_query_schema_map[col] for col in ("BEDARFNR", "BEDP_SEQUENZ")}
base = pl.DataFrame(schema=schema)

# %%
data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
orig_data = pl.DataFrame(data, schema=schema)
data = orig_data[:5].clone()

# %%
pl.concat([base, data])

# %%
# rows of orig_data whose key pair does not occur in data
orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")

# %%
orig_data[("BEDARFNR", "BEDP_SEQUENZ")]

# %%
raw_data = df.clone()
pipe_res = PipelineResult(raw_data)
pipe_res.open

# %%
pipe_res.results

# %%
sub_data = pipe_res.open[:20].clone()
sub_data

# %%
pipe_res.write_results(
    sub_data,
    vorlage=True,
    wf_id=30,
    freigabe_auto=types.Freigabe.WF_100,
    is_out=True,
)

# %%
pipe_res.open
# %% pipe_res.results # %% raw_data = df.clone() pipe_res_main = PipelineResult(raw_data) pipe_res_main.open # %% pipe_res_main.merge_pipeline(pipe_res) # %% pipe_res_main.open # %% pipe_res.results # %% remove_tmp_dir() # %%