# %%
import json
import time
import typing
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import polars as pl
import sqlalchemy as sql
from dopt_basics import configs, io

from umbreit import db, types

# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)

# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
types.Freigabe.WF_100.value

# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(conn_string)

# %%
########### RESULTS ###########
# temporary in-memory results store
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results))
    print("Rows deleted: ", res.rowcount)


delete_results(res_engine)
stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define the starting date for a three-month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# // ---------- LIVE DATA -----------
# TODO find way to filter more efficiently
# WF-200: filter for relevant orders with the current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query is really slow
)

# %%
print(stmt.compile(engine))

# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order
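# %%
# optional debugging aid (sketch): render the statement with literal bound
# values instead of placeholders so it can be pasted into an SQL client;
# `literal_binds` is a standard SQLAlchemy compile option
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))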
# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries
print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
AMOUNT_COLS = frozenset(
    (
        "BEDP_MENGE_BEDARF",
        "BEDP_MENGE_VERKAUF",
        "BEDP_MENGE_ANFRAGE",
        "BEDP_MENGE_BESTELLUNG",
        "BEDP_MENGE_FREI",
        "BEDP_MENGE_BEDARF_VM",
    )
)
case_stmts = []
for col in AMOUNT_COLS:
    case_stmts.append(
        sql.case(
            (db.ext_bedpbed.c[col] <= -1, sql.null()),
            else_=db.ext_bedpbed.c[col],
        ).label(col)
    )
stmt = sql.select(
    *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
    *case_stmts,
)
df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
t2 = time.perf_counter()
elapsed = t2 - t1

# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))

# %%
# raw data query
# TODO look for entries which do not have an associated title number
print("--------------- raw data query --------------")
t1 = time.perf_counter()
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
    db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1

# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
df.head()

# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)

# %%
print(len(df))
df.head()
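# %%
# guarded snapshot helper (sketch, not part of the original workflow): only
# writes the IPC file if it does not exist yet, so re-running the notebook
# cannot clobber an earlier snapshot; `save_snapshot` is a hypothetical name
def save_snapshot(frame: pl.DataFrame, path: Path) -> None:
    if path.exists():
        print(f"Snapshot already present, not overwritten: {path}")
        return
    frame.write_ipc(path)
    print(f"Snapshot written: {path}")


# save_snapshot(df, p_save)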
# %%
# ** CHECK: differences between MANDANT in BEDP and in TINFO
# 4591588: in the title database with a different MANDANT
# (are MANDFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())

# %%
df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())

# %%
df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)

# %%
# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())

# %%
# ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
# MANDANTEN other than (1, 90) do not possess relevant properties such as
# "MELDENUMMER" --> conclusion: not relevant
# MANDANT = 80
# print(f"Mandant: {MANDANT}")
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     )
# )
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))

# %%
# ** PREFILTER
# always needed; entries filtered out are to be disposed of
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
df.filter(filter_meldenummer_null).filter(filter_mandant)
# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)

# %%
len(df)

# %%
# ** CHECK: null values set in the query via the CASE statement
# not known whether NULL comes from the CASE statement or was already set in the table
# unknown consequences: Are they relevant? How do they relate to "MENGE_VORMERKER"
# from the title DB?
df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)

# %%
df.select("MELDENUMMER").unique()

# %%
# ** CHECK: null values for "MENGE_VORMERKER"
df.filter(pl.col("MENGE_VORMERKER").is_null())
# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
agg_t = (
    df.group_by(["MELDENUMMER"]).agg(
        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
    )
    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
# .filter(pl.col("MELDENUMMER") == 18)
agg_t
df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())

# %%
# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
# ** not known at this point
# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
# BEDP_MENGE_BEDARF_VM is not suitable as reference or ground truth
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
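# %%
# quick tally (sketch) of the three possible orderings between the two
# quantities; helps judge how common the surprising "VM > VORMERKER" case is
# (rows where either side is null fall into the last bucket)
df.with_columns(
    rel=pl.when(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
    .then(pl.lit("VM > VORMERKER"))
    .when(pl.col("BEDP_MENGE_BEDARF_VM") == pl.col("MENGE_VORMERKER"))
    .then(pl.lit("VM == VORMERKER"))
    .otherwise(pl.lit("VM < VORMERKER or null"))
).group_by("rel").len()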
# %%
# ** CHECK: titles with a request for which no title information is found
# result: there were entries found on 02.12., but not on 03.12.2025
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-2.json"
print(to_save)
# with open(p_save_not_in_title_table, "w") as file:
#     json.dump(to_save, file, indent=4)

# %%
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter(
#     (pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0)
# )

# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"


# TODO exchange for a new query focusing on the TINFO table
def get_raw_data() -> pl.DataFrame:
    join_condition = sql.and_(
        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
    )
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.MENGE_VORMERKER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


def get_empty_pipeline_result(
    data: pl.DataFrame,
) -> types.PipelineResult:
    schema = db.results_schema_map.copy()
    del schema["id"]
    results = pl.DataFrame(schema=schema)
    return types.PipelineResult(results=results, open=data)


def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    """apply the filters in order; rows removed by any filter are collected"""
    df_current = df
    removed_rows: list[pl.DataFrame] = []
    for filter_expr in filters:
        removed = df_current.filter(~filter_expr)
        removed_rows.append(removed)
        df_current = df_current.filter(filter_expr)
    df_removed = pl.concat(removed_rows)
    return types.FilterResult(in_=df_current, out_=df_removed)


# post-processing of the results
# TODO: order quantity not always necessary
# TODO: change relevant criterion for order quantity
def _write_results(
    results_table: pl.DataFrame,
    data: pl.DataFrame,
    vorlage: bool,
    wf_id: int,
    freigabe_auto: types.Freigabe,
    is_out: bool,
) -> pl.DataFrame:
    ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
    data = data.rename(db.map_to_result)
    order_qty_expr: pl.Expr
    if is_out:
        # filtered-out entries get an order quantity of zero
        order_qty_expr = (
            pl.lit(0).alias("best_menge").cast(db.results_schema_map["best_menge"])
        )
    else:
        order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
    data = data.with_columns(
        [
            pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
            pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
            order_qty_expr,
            pl.lit(freigabe_auto.value)
            .alias("freigabe_auto")
            .cast(db.results_schema_map["freigabe_auto"]),
        ]
    )
    data = data.drop(
        [
            "BEDP_TITELNR",
            "BEDP_MAN",
            "BEDP_MENGE_BEDARF_VM",
            "MELDENUMMER",
            "VERLAGSNR",
            "MENGE_VORMERKER",
            "MANDFUEHR",
        ]
    )
    return pl.concat([results_table, data])
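# %%
# toy check (sketch) of the filter cascade defined above: rows are removed at
# the first filter they fail and collected in `out_`; survivors end up in `in_`
_toy = pl.DataFrame({"a": [1, 2, 3, 4], "b": ["x", "y", "x", "y"]})
_toy_res = _apply_several_filters(_toy, (pl.col("a") > 1, pl.col("b") == "x"))
print(_toy_res.in_)  # only the row a=3, b="x" passes both filters
print(_toy_res.out_)  # a=1 (removed first), then a=2 and a=4 (b != "x")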
def workflow_900(
    pipe_result: types.PipelineResult,
) -> types.PipelineResult:
    """pre-routine to handle non-feasible entries"""
    filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
    filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer_null,
            filter_mandant,
        ),
    )
    pipe_result.results = _write_results(
        pipe_result.results,
        data=res.out_,
        vorlage=False,
        wf_id=900,
        freigabe_auto=types.Freigabe.WF_900,
        is_out=True,
    )
    pipe_result.open = res.in_.with_columns(
        pl.col("MENGE_VORMERKER").fill_null(0),
        pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
    )
    return pipe_result


# main routine
# results for filtered-out entries are written
def workflow_910(
    pipe_result: types.PipelineResult,
) -> types.PipelineResult:
    filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
    res = _apply_several_filters(
        pipe_result.open,
        filters=(
            filter_mandant,
            filter_ignore_MNR26,
        ),
    )
    # write results for entries which were filtered out
    pipe_result.results = _write_results(
        pipe_result.results,
        data=res.out_,
        vorlage=False,
        wf_id=910,
        freigabe_auto=types.Freigabe.WF_910,
        is_out=True,
    )
    pipe_result.open = res.in_
    return pipe_result


# this is a main routine:
# receives and returns result objects
def workflow_100_umbreit(
    pipe_result: types.PipelineResult,
    vm_criterion: str,
) -> types.PipelineResult:
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.results = _write_results(
        results_table=pipe_result.results,
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )
    pipe_result.open = res.out_
    return pipe_result


def workflow_100_petersen(
    pipe_result: types.PipelineResult,
    vm_criterion: str,
) -> types.PipelineResult:
    # difference between WDB and others
    # // WDB branch
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.results = _write_results(
        results_table=pipe_result.results,
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )
    pipe_result.open = res.out_

    # order quantity 0, no further action in other WFs
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.results = _write_results(
        results_table=pipe_result.results,
        data=res.in_,
        vorlage=False,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )
    pipe_result.open = res.out_

    # // other branch
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 90
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.results = _write_results(
        results_table=pipe_result.results,
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )
    pipe_result.open = res.out_
    return pipe_result
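# %%
# sketch of an end-to-end runner with a conservation check; `run_pipeline` is
# a hypothetical helper and assumes that `results` and `open` always partition
# the input rows between them (the manual cells below do the same step by step)
def run_pipeline(raw: pl.DataFrame, vm_criterion: str) -> types.PipelineResult:
    pr = get_empty_pipeline_result(raw.clone())
    pr = workflow_900(pr)
    pr = workflow_910(pr)
    pr = workflow_100_umbreit(pr, vm_criterion)
    pr = workflow_100_petersen(pr, vm_criterion)
    assert len(pr.results) + len(pr.open) == len(raw), "rows lost or duplicated"
    return pr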
# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}")

# %%
df.head()

# %%
# removed_rows = []
# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")

# %%
raw_data = df.clone()
pipe_res = get_empty_pipeline_result(raw_data)
pipe_res.results
pipe_res = workflow_900(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.results
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)

# %%
pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))

# %%
pipe_res = workflow_910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.results.select(pl.col("vorlage").value_counts())

# %%
pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res = workflow_100_petersen(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.results.select(pl.col("vorlage").value_counts())

# %%
pipe_res.results.filter(pl.col("vorlage") == True)

# %%
raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3)

# %%
raw_data.head()
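# %%
# per-workflow summary (sketch): how many rows each routine wrote, with which
# release flag and vorlage state; column names follow the db.results schema
pipe_res.results.group_by(["wf_id", "freigabe_auto", "vorlage"]).len().sort("wf_id")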
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #

# %%
# WF-200 operates on the entries still open after the WF-100 routines
wf_200_start_data = pipe_res.open.clone()
wf_200_start_data


# %%
def _init_workflow_200_umbreit(
    data: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )
    return res.in_, res.out_


# %%
df, filt_out = _init_workflow_200_umbreit(wf_200_start_data, VM_CRITERION)
df

# %%
filt_out

# %%
df.filter(pl.col("BEDARFNR") == 884607)

# %%
df_order.filter(pl.col("BEDARFNR") == 884607)

# %%
# now obtain order data for the entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t

# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.count("AUFP_POSITION").alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t

# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))

# %%
# ---------------------------------------------------------------------------- #
# Writing results to the DB
# ---------------------------------------------------------------------------- #
delete_results(res_engine)
pipe_res.results.write_database(
    db.results.fullname, res_engine, if_table_exists="append"
)
stmt = sql.select(db.results)
db_results = pl.read_database(stmt, res_engine)
db_results

# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #

# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)

# %%
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)

# %%
df["BEDP_MAN"].unique()

# %%
df.estimated_size(unit="mb")

# %%
# NOTE: `df_raw` stems from an earlier exploration session
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()

# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)

# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)

# %%
# interesting order: 883697, 1, title no.: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO
)
title_buy = pl.read_database(stmt, engine)

# %%
title_buy

# %%
# when were the orders placed?
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)

# %%
title_order

# -------------------------------------------------------------------------------------------

# %%
# title DB complete?
# - includes only titles which have been deliverable since 01.06.2025 and which
#   are assigned to buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)
# .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)

# %%
titles["MANDFUEHR"].unique()

# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)
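# %%
# sketch: TI_NUMMER should be unique if it is to serve as the sole join key in
# the raw data query; any duplicates here would explain ambiguous MANDFUEHR matches
titles.group_by("TI_NUMMER").len().filter(pl.col("len") > 1)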
# %%
# requirements?
# - includes only orders since 05.11.2025
stmt = sql.select(db.ext_bedpbed)
# .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)

# %%
reqs

# %%
reqs["BEDP_MAN"].unique()

# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)

# %%
intersection = unique_titles & unique_titles_req
len(intersection)

# %%
# orders?
# - includes only orders since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text(
    "SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273"
)
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# NOTE: `dataframes` stems from an earlier exploration session
df = dataframes[1]

# %%
# collect the column -> dtype mapping of the frame
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)

# %%
len(df)

# %%
df.filter(
    (pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0")
)

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df

# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
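# %%
# note (sketch): polars already exposes the column -> dtype mapping that the
# loops above build by hand; printing the schema directly is equivalent
pprint(dict(df.schema))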