# %%
import time
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import polars as pl
import sqlalchemy as sql
from dopt_basics import configs, io

from umbreit import db, types

# %%
# import importlib

# db = importlib.reload(db)
# types = importlib.reload(types)

# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
types.Freigabe.WF_100.value

# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(conn_string)

# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results))
    print("Rows deleted: ", res.rowcount)


delete_results(res_engine)

stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# TODO: find a way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query really slow
)

# %%
print(stmt.compile(engine))

# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order

# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries
print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
AMOUNT_COLS = frozenset(
    (
        "BEDP_MENGE_BEDARF",
        "BEDP_MENGE_VERKAUF",
        "BEDP_MENGE_ANFRAGE",
        "BEDP_MENGE_BESTELLUNG",
        "BEDP_MENGE_FREI",
        "BEDP_MENGE_BEDARF_VM",
    )
)
case_stmts = []
for col in AMOUNT_COLS:
    case_stmts.append(
        sql.case(
            (db.ext_bedpbed.c[col] <= -1, sql.null()),
            else_=db.ext_bedpbed.c[col],
        ).label(col)
    )
stmt = sql.select(
    *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
    *case_stmts,
)
df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
t2 = time.perf_counter()
elapsed = t2 - t1
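# %%
# Quick sanity check (sketch): the CASE mapping above should turn every
# sentinel value <= -1 into NULL, so counting nulls per amount column in the
# loaded frame gives a rough feel for how many invalid entries were masked.
# Uses only `df` and AMOUNT_COLS from the cell above.
null_counts = df.select(
    [pl.col(c).is_null().sum().alias(c) for c in sorted(AMOUNT_COLS)]
)
print(null_counts)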
# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))

# %%
# raw data query
# TODO: change to a left join, otherwise it is possible that requests are missed
# TODO: after that, look for entries which do not have an associated title number
print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
    db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.MENGE_VORMERKER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1

# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"


def get_raw_data() -> pl.DataFrame:
    join_condition = sql.and_(
        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
    )
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.MENGE_VORMERKER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition))
    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


def get_empty_result_df() -> pl.DataFrame:
    schema = db.results_schema_map.copy()
    del schema["id"]
    return pl.DataFrame(schema=schema)


def apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> tuple[pl.DataFrame, pl.DataFrame]:
    df_current = df
    removed_rows: list[pl.DataFrame] = []
    for flt in filters:
        removed = df_current.filter(~flt)
        removed_rows.append(removed)
        df_current = df_current.filter(flt)
    df_removed = pl.concat(removed_rows)
    return df_current, df_removed


def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
    return df


def workflow_100_umbreit(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) > 0
    relevant, filt = apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )
    results = _results_workflow_100(
        results,
        relevant,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
    )
    return results, filt
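# %%
# Tiny illustration of apply_several_filters on synthetic data (toy values,
# not real BEDP rows): each row lands in the removed frame at the first
# filter it fails, and only rows passing all filters survive.
_toy = pl.DataFrame({"MELDENUMMER": [18, 17, 18], "BEDP_MAN": [1, 1, 2]})
_kept, _removed = apply_several_filters(
    _toy, (pl.col("MELDENUMMER") == 18, pl.col("BEDP_MAN") == 1)
)
print(_kept)  # row (18, 1) passes both filters
print(_removed)  # (17, 1) fails the first filter, (18, 2) the second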
# post-processing the results
def _results_workflow_100(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vorlage: bool,
    wf_id: int,
    freigabe_auto: types.Freigabe,
) -> pl.DataFrame:
    data = data.rename(db.map_to_result)
    data = data.with_columns(
        [
            pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
            pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
            pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
            pl.lit(freigabe_auto.value)
            .alias("freigabe_auto")
            .cast(db.results_schema_map["freigabe_auto"]),
        ]
    )
    data = data.drop(
        [
            "BEDP_TITELNR",
            "BEDP_MAN",
            "BEDP_MENGE_BEDARF_VM",
            "MELDENUMMER",
            "MENGE_VORMERKER",
        ]
    )
    return pl.concat([results, data])


# Petersen not present in data

# %%
df_raw = get_raw_data()
df_start = prepare_base_data(df_raw)
df_start

# %%
results_init = get_empty_result_df()
results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
# results holds the entries whose outcome is known
# filt_out contains the entries for the other workflows
# filt_out at this point represents all entries which are to be analysed in other workflows

# %%
results

# %%
filt_out

# %%
# ---------------------------------------------------------------------------- #
#                           Workflow 200 (Umbreit only)                         #
# ---------------------------------------------------------------------------- #

# %%
wf_200_start_data = filt_out.clone()
wf_200_start_data


# %%
def _init_workflow_200_umbreit(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) == 0
    relevant, filt = apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )
    return relevant, filt


# %%
df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
df

# %%
df.filter(pl.col("BEDARFNR") == 884607)

# %%
df_order.filter(pl.col("BEDARFNR") == 884607)

# %%
# now obtain order data for the entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t

# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.col("AUFP_POSITION").count().alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").n_unique().alias("count_customer"),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t
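# %%
# Alternative sketch: push the aggregation into the database instead of
# joining and grouping in polars. This is NOT equivalent to agg_t above
# (it aggregates over all ext_bedpbed/EXT_AUFPAUF matches, without the
# WF-200 prefilter), but shows the COUNT / COUNT(DISTINCT) pattern.
order_join = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
agg_stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        sql.func.count(db.EXT_AUFPAUF.c.AUFP_POSITION).label("pos_count"),
        sql.func.count(sql.distinct(db.EXT_AUFPAUF.c.KUNDE_RECHNUNG)).label(
            "count_customer"
        ),
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, order_join))
    .group_by(db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDP_SEQUENZ)
)
# df_agg = pl.read_database(agg_stmt, engine)  # potentially slow, see note above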
# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))

# %%
# ---------------------------------------------------------------------------- #
#                              Writing results in DB                            #
# ---------------------------------------------------------------------------- #
delete_results(res_engine)
results.write_database(db.results.fullname, res_engine, if_table_exists="append")
stmt = sql.select(db.results)
db_results = pl.read_database(stmt, res_engine)
db_results

# ---------------------------------------------------------------------------- #
#                              Further Data Analysis                            #
# ---------------------------------------------------------------------------- #

# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)

# %%
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)

# %%
df["BEDP_MAN"].unique()

# %%
df.estimated_size(unit="mb")

# %%
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()

# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)

# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)

# %%
# interesting order: 883697, 1, title numbers: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO
)
title_buy = pl.read_database(stmt, engine)

# %%
title_buy

# %%
# when were the orders placed?
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)

# %%
title_order

# ------------------------------------------------------------------------------------------- #

# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and which are
#   assigned to buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)

# %%
titles["MANDFUEHR"].unique()

# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)

# %%
# requirements?
# - includes only orders since 05.11.2025
stmt = sql.select(db.ext_bedpbed)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)

# %%
reqs

# %%
reqs["BEDP_MAN"].unique()

# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)

# %%
intersection = unique_titles & unique_titles_req
len(intersection)
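# %%
# Follow-up sketch on the intersection above: which requirement titles have
# no counterpart in the title table? Plain set arithmetic on the sets already
# built; ties in with the left-join TODO in the raw data query.
missing_titles = unique_titles_req - unique_titles
print("Requirement titles missing from title table:", len(missing_titles))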
# %%
# orders?
# - includes only orders since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text(
    "SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273"
)
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
df = dataframes[1]

# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)

# %%
len(df)

# %%
df.filter(
    (pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0")
)

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df

# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
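# %%
# Housekeeping sketch: dispose of the engines once the ad-hoc analysis is
# done to release pooled connections (standard SQLAlchemy API; optional for
# a short-lived script like this one).
engine.dispose()
res_engine.dispose()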