# %%
from __future__ import annotations

import datetime
import json
import time
import typing
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import oracledb
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from dopt_basics import configs, io
from sqlalchemy import event

from umbreit import db, types

# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)

# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)

HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})


@event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
    cursor.arraysize = 1000
    cursor.prefetchrows = 1000


# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results_local,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results_local))
        print("Rows deleted: ", res.rowcount)


delete_results(res_engine)

stmt = sql.select(db.results_local.c.bedarf_nr, db.results_local.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# // ---------- LIVE DATA -----------
# TODO find way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query really slow
)

# %%
print(stmt.compile(engine))

# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order
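
# %%
# Sketch (not executed here): the full query without .limit(100) is slow and large
# (see the size prints further down), so reading it in batches would keep memory
# bounded. `stream_results` plus the cursor sizes set above avoid buffering the
# whole result set on the driver side; polars can then consume it chunk-wise.
# The batch size is an illustrative value, not a tuned one.
# for batch in pl.read_database(
#     stmt,
#     engine,
#     iter_batches=True,
#     batch_size=50_000,
#     schema_overrides=db.raw_data_query_schema_map,
# ):
#     print(batch.height)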
# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries
# // tests with ext_bedpbed
# print("--------------- ext_bedpbed --------------")
# t1 = time.perf_counter()
# AMOUNT_COLS = frozenset(
#     (
#         "BEDP_MENGE_BEDARF",
#         "BEDP_MENGE_VERKAUF",
#         "BEDP_MENGE_ANFRAGE",
#         "BEDP_MENGE_BESTELLUNG",
#         "BEDP_MENGE_FREI",
#         "BEDP_MENGE_BEDARF_VM",
#     )
# )
# case_stmts = []
# for col in AMOUNT_COLS:
#     case_stmts.append(
#         sql.case(
#             (db.ext_bedpbed.c[col] <= -1, sql.null()),
#             else_=db.ext_bedpbed.c[col],
#         ).label(col)
#     )
# stmt = sql.select(
#     *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
#     *case_stmts,
# )
# df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# t2 = time.perf_counter()
# elapsed = t2 - t1

# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
# print(f"Query duration: {elapsed:.4f} sec")
# print("Number of entries: ", len(df))
# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))

# %%
# raw data query
# TODO look for entries which do not have an associated title number
print("--------------- raw data query --------------")
t1 = time.perf_counter()
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    # sql.case(
    #     (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
    #     else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    # ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
    db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1

# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
df.head()

# %%
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)

# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)

# %%
print(len(df))
df.head()

# %%
temp = df.fill_null(0)
mask = df.select(pl.exclude("BEDARFNR", "BEDP_SEQUENZ")).is_duplicated()
temp.filter(mask).sort("BEDP_TITELNR")

# %%
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90))).with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)

# %%
df.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)

# %%
# ** CHECK: duplicates
temp = df.fill_null(0)
mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated()
temp.filter(mask)

# %%
df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False)

# %%
# ** CHECK: positions without titlenumber
df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique()
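
# %%
# Sketch (illustrative, not used below): the commented SQL CASE prefilter above can
# also be applied after loading, replacing the <= -1 sentinel values with nulls on
# the polars side. The column list is restricted to the amount column present in
# this query.
# amount_cols = ["BEDP_MENGE_BEDARF_VM"]
# df_clean = df.with_columns(
#     [
#         pl.when(pl.col(c) <= -1).then(None).otherwise(pl.col(c)).alias(c)
#         for c in amount_cols
#     ]
# )
# df_clean.select(pl.col("BEDP_MENGE_BEDARF_VM").is_null().sum())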
# %%
# ** CHECK: unique title number?
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_TITELNR").len().alias("count"),
    pl.col.BEDP_MAN.unique().alias("unique_bedp_man"),
    pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"),
).unique().filter(pl.col("count") > 1)

# %%
df.filter(pl.col.BEDP_TITELNR == 8679893)

# %%
df.with_columns(
    pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count")
).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter(
    pl.col("titlenumber_count") > 1
)

# %%
# ** CHECK: distribution of MELDENUMMER
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90)))
sum_entries = len(temp)
temp = (
    temp.group_by("MELDENUMMER")
    .agg(pl.col("MELDENUMMER").len().alias("count"))
    .sort("count", descending=True)
)
temp = temp.with_columns((pl.col.count / sum_entries).alias("proportion"))
temp = temp.with_columns(pl.col.proportion.cum_sum().alias("cum"))
temp
# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
#     pl.len()
# )
# p_save = Path.cwd() / "meldenummer_anteile_20260114-2.xlsx"
# temp.write_excel(p_save)

# %%
# ** CHECK: differences of MANDANT in BEDP and in TINFO
# 4591588: in title database with different MANDANT (are MANDFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())

# %%
df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())

# %%
df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)

# %%
# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())

# %%
# ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
# MANDANTEN other than (1, 90) do not possess relevant properties such as
# "MELDENUMMER" and others --> conclusion: not relevant
# MANDANT = 80
# print(f"Mandant: {MANDANT}")
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     )
# )
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))

# %%
df.filter(pl.col("MELDENUMMER").is_null()).filter(pl.col("MANDFUEHR").is_not_null())

# %%
# ** PREFILTER
# always needed, filtered-out entries are to be disposed of
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
df.filter(filter_meldenummer_null).filter(filter_mandant)
# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)

# %%
len(df)

# %%
# ** CHECK: null values set in the query with CASE statement
# not known if NULL because of the CASE statement or already set in the table
# unknown consequences: Are they relevant? How do they relate to "MENGE_VORMERKER"
# from the title DB?
df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)

# %%
df.select("MELDENUMMER").unique()

# %%
# ** CHECK: null values for "MENGE_VORMERKER"
df.filter(pl.col("MENGE_VORMERKER").is_null())
# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
agg_t = (
    df.group_by(["MELDENUMMER"]).agg(
        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
    )
    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
# .filter(pl.col("MELDENUMMER") == 18)
agg_t

# %%
df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())

# %%
# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
# ** not known at this point
# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
)
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?

# %%
# ** CHECK: titles with request where no title information is found
# result: there were entries found on 02.12., but not on 03.12.2025
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251211-1.json"
print(to_save)
# with open(p_save_not_in_title_table, "w") as file:
#     json.dump(to_save, file, indent=4)

# %%
df.group_by("BEDP_MAN").agg(pl.len())

# %%
df.filter(pl.col("MELDENUMMER").is_null()).group_by("BEDP_MAN").agg(pl.len().alias("count"))

# %%
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))

# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
    db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()


def get_starting_date(
    days: int,
) -> datetime.date:
    current_dt = dt.current_time_tz(cut_microseconds=True)
    td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS)
    return (current_dt - td).date()


# TODO exchange to new query focusing on TINFO table
def get_raw_data() -> pl.DataFrame:
    join_condition = sql.and_(
        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
    )
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.MENGE_VORMERKER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


def save_tmp_data(df: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.tmp_data))
    with engine.begin() as conn:
        conn.execute(sql.insert(db.tmp_data), df.to_dicts())


def get_tmp_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.tmp_data),
        engine,
        schema_overrides=db.tmp_data_schema_map,
    )


def get_result_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.EXT_DOPT_ERGEBNIS),
        engine,
        schema_overrides=db.results_schema_map,
    )


def save_result_data(results: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())


def clear_result_data() -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))


def save_result_data_native(results: pl.DataFrame) -> None:
    results = results.with_columns(
        [
            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
            for c in results.select(cs.boolean()).columns
        ]
    )
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS"
        ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:1, :2, :3, :4, :5, :6)
    """
    with engine.begin() as conn:
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            # executemany needs a sequence of row tuples in the order of the bind
            # variables, not a DataFrame
            cursor.executemany(
                stmt,
                results.select(
                    "BEDARF_NR",
                    "BEDARF_SEQUENZ",
                    "VORLAGE",
                    "WF_ID",
                    "BEST_MENGE",
                    "FREIGABE_AUTO",
                ).rows(),
            )


def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    df_current = df
    removed_rows: list[pl.DataFrame] = []
    for filter in filters:
        removed = df_current.filter(~filter)
        removed_rows.append(removed)
        df_current = df_current.filter(filter)
    df_removed = pl.concat(removed_rows)
    return types.FilterResult(in_=df_current, out_=df_removed)


class PipelineResult:
    __slots__ = ("_results", "_open", "_subtracted_indices")
    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")

    def __init__(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data
        schema = db.results_schema_map.copy()
        del schema["ID"]
        self._results = pl.DataFrame(schema=schema)
        schema = {}
        for col in self._index_cols:
            schema[col] = db.raw_data_query_schema_map[col]
        self._subtracted_indices = pl.DataFrame(schema=schema)

    def __len__(self) -> int:
        return len(self._results) + len(self._open)

    @property
    def open(self) -> pl.DataFrame:
        return self._open

    @property
    def results(self) -> pl.DataFrame:
        return self._results

    @property
    def subtracted_indices(self) -> pl.DataFrame:
        return self._subtracted_indices

    def update_open(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data

    def _subtract_data(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = self._open.join(data, on=self._index_cols, how="anti")
        self._subtracted_indices = pl.concat(
            (self._subtracted_indices, data[self._index_cols])
        )

    def _add_results(
        self,
        data: pl.DataFrame,
    ) -> None:
        res = pl.concat([self._results, data])
        self._results = res

    def merge_pipeline(
        self,
        pipeline: PipelineResult,
    ) -> None:
        self._subtract_data(pipeline.subtracted_indices)
        self._add_results(pipeline.results)

    def write_results(
        self,
        data: pl.DataFrame,
        vorlage: bool,
        wf_id: types.Workflows,
        freigabe_auto: types.Freigabe,
        order_qty_expr: pl.Expr,
    ) -> None:
        results = data.rename(db.map_data_to_result)
        results = results.with_columns(
            [
                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
                pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
                .alias("FREIGABE_AUTO")
                .cast(db.results_schema_map["FREIGABE_AUTO"]),
            ]
        )
        results = results.drop(
            [
                "BEDP_TITELNR",
                "BEDP_MAN",
                "BEDP_MENGE_BEDARF_VM",
                "MELDENUMMER",
                "VERLAGSNR",
                "MENGE_VORMERKER",
                "MANDFUEHR",
            ]
        )
        # TODO remove
        # results = results.select(RESULT_COLUMN_ORDER)
        # print(results)
        # print("####################")
        # print(self._results)
        self._subtract_data(data)
        self._add_results(results)
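
# Sketch of the bookkeeping contract (toy values, illustrative only):
# `_apply_several_filters` splits a frame into rows passing all filters (`in_`) and
# rows rejected by at least one filter (`out_`); `PipelineResult.write_results` then
# moves written rows from `open` to `results`, so
# len(pipe) == len(pipe.results) + len(pipe.open) stays constant across steps.
# toy = pl.DataFrame(
#     {"BEDARFNR": [1, 2, 3], "BEDP_SEQUENZ": [1, 1, 2], "MELDENUMMER": [18, None, 26]}
# )
# split = _apply_several_filters(toy, (pl.col("MELDENUMMER").is_not_null(),))
# split.in_   # rows with MELDENUMMER 18 and 26
# split.out_  # the row with a null MELDENUMMER
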
class ExprOrderQty(typing.Protocol): ...


class ExprOrderQty_Base(ExprOrderQty, typing.Protocol):
    def __call__(self) -> pl.Expr: ...


ExprOrderQty_Base_Types: typing.TypeAlias = (
    typing.Literal[types.Workflows.ID_200]
    | typing.Literal[types.Workflows.ID_900]
    | typing.Literal[types.Workflows.ID_910]
)


class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol):
    def __call__(self, empty: bool) -> pl.Expr: ...


@typing.overload
def get_expr_order_qty(
    wf_id: typing.Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...
@typing.overload
def get_expr_order_qty(
    wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty(
    wf_id: types.Workflows,
) -> ExprOrderQty:
    empty_expr = (
        pl.lit(0)
        .alias(ORDER_QTY_CRIT)
        .alias("BEST_MENGE")
        .cast(db.results_schema_map["BEST_MENGE"])
    )

    def _empty() -> pl.Expr:
        return empty_expr

    func: ExprOrderQty
    match wf_id:
        case types.Workflows.ID_100:

            def _func(empty: bool) -> pl.Expr:
                order_qty_expr: pl.Expr
                if empty:
                    order_qty_expr = empty_expr
                else:
                    order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
                return order_qty_expr

            func = _func
        case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
            func = _empty
        case _:
            raise NotImplementedError(
                f"Order expression for WF-ID {wf_id.value} is not implemented"
            )

    return func


def wf900(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """filter 'Meldenummer' and fill non-feasible entries"""
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
    filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer_null,
            filter_mandant,
        ),
    )
    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_900,
        freigabe_auto=types.Freigabe.WF_900,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    pipe_result.update_open(
        res.in_.with_columns(
            pl.col("MENGE_VORMERKER").fill_null(0),
            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
        )
    )
    return pipe_result


def wf910(
    pipe_result: PipelineResult,
) -> PipelineResult:
    # TODO check if necessary because of WF-900
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
    res = _apply_several_filters(
        pipe_result.open,
        filters=(
            filter_mandant,
            filter_ignore_MNR26,
        ),
    )
    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_910,
        freigabe_auto=types.Freigabe.WF_910,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    return pipe_result


# this is a main routine:
# receives and gives back result objects
def wf100_umbreit(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 1
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
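
# Usage sketch for the overloaded factory above (illustrative, not executed here):
# WF-100 returns a callable that needs the `empty` flag, the other workflows always
# write an order quantity of 0 into "BEST_MENGE".
# expr_wf100 = get_expr_order_qty(types.Workflows.ID_100)
# expr_wf100(empty=False)  # -> pl.col("BEDP_MENGE_BEDARF_VM").alias("BEST_MENGE")
# expr_wf900 = get_expr_order_qty(types.Workflows.ID_900)
# expr_wf900()  # -> pl.lit(0) cast to the "BEST_MENGE" result dtype
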
def wf100_petersen(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # difference between WDB and others
    # // WDB branch
    # order quantity 0, no further action in other WFs
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=True),
    )
    # TODO add check for orders or quantity to transform
    # TODO show them
    filter_number_vm = pl.col(vm_criterion) > 0
    res_candidates = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    wdb_sub_pipe = PipelineResult(res_candidates.in_)
    wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
    assert wdb_sub_pipe.open.height == 0
    pipe_result.merge_pipeline(wdb_sub_pipe)

    # // other branch
    # always show entries with #VM > 1
    filter_number_vm = pl.col(vm_criterion) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result


def _wf100_petersen_sub1_wdb(
    pipe_result: PipelineResult,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # input: pre-filtered entries (WDB titles and #VM > 0)
    # more than 1 VM
    # !! show these entries
    filter_number_vm = pl.col(VM_CRITERION) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_number_vm,),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    # filtered-out entries (WDB with #VM == 1) must be analysed for orders in the
    # past 6 months
    start_date = get_starting_date(180)
    filter_ = sql.and_(
        db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()),
        db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
    )
    stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
    df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
    entries_show = (
        df_order.group_by("BESP_TITELNR")
        .agg(pl.col("BESP_TITELNR").count().alias("count"))
        .filter(pl.col("count") > 1)
    )
    # TODO IS IN check good because of performance?
    filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list())
    res = _apply_several_filters(pipe_result.open, (filter_titleno,))
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
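
# Possible answer to the "IS IN" performance TODO above (sketch, not wired in):
# instead of sending the title numbers to EXT_BESPBES_INFO as a long IN list, the
# tmp_data round-trip used by _wf200_sub1 below could be reused and the database
# left to do the join:
# save_tmp_data(pipe_result.open)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# stmt = (
#     sql.select(db.tmp_data, db.EXT_BESPBES_INFO.c.BES_DATUM)
#     .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
#     .where(db.EXT_BESPBES_INFO.c.BES_DATUM >= get_starting_date(180))
# )
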
def wf200_umbreit(
    pipe_result: PipelineResult,
) -> PipelineResult:
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_meldenummer, filter_mandant),
    )
    sub_pipe = PipelineResult(res.in_)
    sub_pipe = _wf200_sub1(sub_pipe)
    assert sub_pipe.open.height == 0
    pipe_result.merge_pipeline(sub_pipe)
    return pipe_result


def wf200_petersen(
    pipe_result: PipelineResult,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    RELEVANT_MNR: tuple[int, ...] = (17, 18)
    # // WDB branch
    filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
        ),
    )
    # ignore these
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    # // other branch
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
        ),
    )
    sub_pipe = PipelineResult(res.in_)
    sub_pipe = _wf200_sub1(sub_pipe)
    assert sub_pipe.open.height == 0
    pipe_result.merge_pipeline(sub_pipe)
    return pipe_result


def _wf200_sub1(
    pipe_result: PipelineResult,
) -> PipelineResult:
    save_tmp_data(pipe_result.open)
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    RELEVANT_DATE = get_starting_date(90)
    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
    filter_ = sql.and_(
        db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
        db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
    )
    stmt = (
        sql.select(
            db.tmp_data,
            db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
            db.EXT_AUFPAUF.c.AUFTRAGS_ART,
        )
        .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
        .where(filter_)
    )
    sub1 = stmt.subquery()
    unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
    stmt = (
        sql.select(
            sub1.c.BEDP_TITELNR,
            sql.func.count().label("count"),
            unique_count_col.label("customer_count"),
        )
        .select_from(sub1)
        .group_by(sub1.c.BEDP_TITELNR)
        .having(unique_count_col >= 3)
    )
    relevant_titles = pl.read_database(
        stmt,
        engine,
    )
    entries_to_show = pipe_result.open.filter(
        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
    )
    pipe_result.write_results(
        data=entries_to_show,
        vorlage=True,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    return pipe_result


# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}")

# %%
df.head()

# %%
# removed_rows = []
# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")

# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
pipe_res = PipelineResult(raw_data)
pipe_res.results

pipe_res = wf900(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.results

# %%
# // test result writing
res = pipe_res.results.clone()
res.height
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)

# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))

# %%
pipe_res = wf910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
# pipe_res.results.select(pl.col("vorlage").value_counts())

# %%
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res = wf100_petersen(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res = wf200_umbreit(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res = wf200_petersen(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))

# %%
pipe_res.results.select(pl.col("VORLAGE").value_counts())

# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #

# %%
wf_200_start_data = pipe_res.open.clone()
wf_200_start_data

# %%
engine.dispose()
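# NOTE: dispose() only closes the pooled connections; the engine object stays usable
# and SQLAlchemy opens a fresh pool for the queries further below.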

# %%
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
    wf_200_start_data,
    (filter_meldenummer, filter_mandant),
)

# %%
# these entries must be checked for relevant orders
# therefore, a temp table must be created in the database to execute efficient
# queries, other approaches are just hacks
# SOLUTION:
# - save these entries to a temp table 'temp'
# - look up the order history of the past 3 months
# -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR
# -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND
# -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND
#
# this is a separate sub-pipeline like in Petersen WF-100
# these entries are either to be shown or not
sub_pipe_umbreit = PipelineResult(res.in_)

# %%
sub_pipe_umbreit.open

# %%
save_tmp_data(sub_pipe_umbreit.open)

# %%
rel_date = get_starting_date(90)
rel_date

# %%
# old way using IN statements
# filter_ = sql.and_(
#     db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice),
#     db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
#     db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
# )
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
# filter_ = sql.and_(
#     db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
#     db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
#     db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
# )
# stmt = (
#     sql.select(
#         db.tmp_data,
#         db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
#         db.EXT_AUFPAUF.c.AUFTRAGS_ART,
#     )
#     .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
#     .where(filter_)
# )
# print(stmt.compile(engine))
# new_schema = db.EXT_AUFPAUF_schema_map.copy()
# new_schema.update(db.tmp_data_schema_map)
# new_schema

# %%
# demo = pl.read_database(
#     stmt,
#     engine,
#     schema_overrides=db.EXT_AUFPAUF_schema_map,
# )
# # %%
# demo
# # %%
# demo.select(pl.col.AUFTRAGS_ART).unique()

# %%
get_tmp_data()

# %%
# demo_2 = demo.clone()
# # demo_2.head()
# print(f"Number of titles before filtering: {len(demo_2)}")
# demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99)))
# demo_2 = (
#     demo_2.group_by("BEDP_TITELNR", maintain_order=True)
#     .agg(
#         pl.len().alias("count"),
#         pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"),
#     )
#     .filter(pl.col.customer_count >= 3)
# )
# # these remaining titles are relevant and should be shown
# # the others should be disposed
# print(f"Number of titles which are relevant: {len(demo_2)}")
# print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}")
# demo_2

# %%
# make a subquery for the pre-filtered entries
# // query to obtain relevant title numbers
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
    db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
)
stmt = (
    sql.select(
        db.tmp_data,
        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
        db.EXT_AUFPAUF.c.AUFTRAGS_ART,
    )
    .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
    .where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
    sql.select(
        sub1.c.BEDP_TITELNR,
        sql.func.count().label("count"),
        unique_count_col.label("customer_count"),
    )
    .select_from(sub1)
    .group_by(sub1.c.BEDP_TITELNR)
    .having(unique_count_col >= 3)
)
print(stmt.compile(engine))

# %%
demo_agg = pl.read_database(
    stmt,
    engine,
)

# %%
demo_agg

# %%
sub_pipe_umbreit.open
# sub_pipe_umbreit.open.select("BEDP_TITELNR").n_unique()
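
# %%
# For reference: the grouped statement built above compiles to roughly the following
# SQL (illustrative paraphrase of the temp-table approach sketched in the SOLUTION
# comment, not the exact output of stmt.compile(engine)):
#
# SELECT sub.BEDP_TITELNR,
#        COUNT(*) AS count,
#        COUNT(DISTINCT sub.KUNDE_RECHNUNG) AS customer_count
# FROM (
#     SELECT tmp_data.*, EXT_AUFPAUF.KUNDE_RECHNUNG, EXT_AUFPAUF.AUFTRAGS_ART
#     FROM tmp_data
#     JOIN EXT_AUFPAUF ON tmp_data.BEDP_TITELNR = EXT_AUFPAUF.TITELNR
#     WHERE EXT_AUFPAUF.AUFTRAGS_DATUM >= :rel_date
#       AND EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202)
#       AND EXT_AUFPAUF.AUFTRAGS_ART IN (1, 99)
# ) sub
# GROUP BY sub.BEDP_TITELNR
# HAVING COUNT(DISTINCT sub.KUNDE_RECHNUNG) >= 3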
# %%
# now obtain these entries from the open data
demo_agg["BEDP_TITELNR"].unique().implode()
entries_to_show = sub_pipe_umbreit.open.filter(
    pl.col.BEDP_TITELNR.is_in(demo_agg["BEDP_TITELNR"].unique().implode())
)
entries_to_show

# %%
sub_pipe_umbreit.open

# %%
df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
df

# %%
df.filter(pl.col("BEDARFNR") == 884607)

# %%
df_order.filter(pl.col("BEDARFNR") == 884607)

# %%
# now obtain order data for entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t

# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.count("AUFP_POSITION").alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t

# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))

# %%
# ---------------------------------------------------------------------------- #
# Writing results in DB
# ---------------------------------------------------------------------------- #
delete_results(res_engine)
pipe_post.write_database(db.results.fullname, engine, if_table_exists="append")

stmt = sql.select(db.results)
db_results = pl.read_database(stmt, engine)
db_results

# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #

# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)

# %%
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)

# %%
df["BEDP_MAN"].unique()

# %%
df.estimated_size(unit="mb")

# %%
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()

# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)

# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)

# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO)
title_buy = pl.read_database(stmt, engine)

# %%
title_buy

# %%
# when were the orders placed?
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)

# %%
title_order

# -------------------------------------------------------------------------------------------

# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and which are assigned to
#   buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)

# %%
titles["MANDFUEHR"].unique()

# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)

# %%
# requirements?
# - includes only orders since 05.11.2025
stmt = sql.select(db.ext_bedpbed)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)

# %%
reqs

# %%
reqs["BEDP_MAN"].unique()

# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)

# %%
intersection = unique_titles & unique_titles_req
len(intersection)

# %%
# orders?
# - includes only orders since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
df = dataframes[1]

# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)

# %%
len(df)

# %%
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df

# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)

# %%
# ** Petersen WDB
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(VM_CRITERION) > 0
res = _apply_several_filters(
    df,
    (
        filter_meldenummer,
        filter_mandant,
        filter_WDB,
        filter_number_vm,
    ),
)

# %%
res.in_

# %%
# !! show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res_vm_crit = _apply_several_filters(
    res.in_,
    (filter_number_vm,),
)

# %%
res_vm_crit.out_

# %%
# filtered-out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
len(title_nos)

# %%
title_nos

# %%
# define starting date for 6 month interval
# returns UTC time
start_date = get_starting_date(180)
filter_ = sql.and_(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
    db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
)
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
df_order

# %%
# filter entries which have more than one order entry
df_show = (
    df_order.group_by("BESP_TITELNR")
    .agg(pl.col("BESP_TITELNR").count().alias("count"))
    .filter(pl.col("count") > 1)
)
df_show

# %%
# !! show these entries
# !! do not show others
entries_to_show = df_show["BESP_TITELNR"].to_list()
print(f"Number of entries relevant: {len(entries_to_show)}")

# %%
res_vm_crit.out_

# %%
filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode())
res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))

# %%
res_wdb.in_

# %%
res_wdb.out_

# %%
schema = {}
for col in ("BEDARFNR", "BEDP_SEQUENZ"):
    schema[col] = db.raw_data_query_schema_map[col]
base = pl.DataFrame(schema=schema)

# %%
data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
orig_data = pl.DataFrame(data, schema=schema)
data = orig_data[:5].clone()

# %%
pl.concat([base, data])

# %%
orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")

# %%
orig_data[("BEDARFNR", "BEDP_SEQUENZ")]

# %%
raw_data = df.clone()
pipe_res = PipelineResult(raw_data)
pipe_res.open

# %%
pipe_res.results

# %%
sub_data = pipe_res.open[:20].clone()
sub_data

# %%
pipe_res.write_results(
    sub_data,
    vorlage=True,
    wf_id=types.Workflows.ID_100,
    freigabe_auto=types.Freigabe.WF_100,
    order_qty_expr=get_expr_order_qty(types.Workflows.ID_100)(empty=False),
)

# %%
pipe_res.open

# %%
pipe_res.results

# %%
raw_data = df.clone()
pipe_res_main = PipelineResult(raw_data)
pipe_res_main.open

# %%
pipe_res_main.merge_pipeline(pipe_res)

# %%
pipe_res_main.open

# %%
pipe_res.results

# %%