# ---------------------------------------------------------------------------- #
# Reconstructed from a whitespace-mangled `git diff` (newlines were collapsed).
# Post-image ("+" side) of: data_analysis/02-2_connect_db.py
# (diff --git a/data_analysis/02-2_connect_db.py, index 7c3e589..7e7c700)
# ---------------------------------------------------------------------------- #
from pathlib import Path

import oracledb
import sqlalchemy as sql

from dopt_basics import configs, io

from umbreit import db

# %%
# Locate the credentials TOML by walking upwards from the CWD.
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    # NOTE(review): the following lines were elided hunk context in the diff
    # and were filled in from the identical block in
    # data_analysis/02-3_oracle_workflow_test.py — verify against the repo.
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
# SQLAlchemy URL for the python-oracledb driver; service name goes in as a
# query parameter.
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(conn_string)
# %%
# stmt = sql.select(db.EXT_BESPBES_INFO).limit(10)
stmt = sql.text("SELECT * FROM ext_bedpbed FETCH FIRST 30 ROWS ONLY")
compiled = stmt.compile(dialect=engine.dialect, compile_kwargs={"literal_binds": True})
print(str(compiled))

# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    # FIX: materialize the rows while the connection is still open; the
    # original called .all() after the `with` block had already closed the
    # connection, which raises ResourceClosedError.
    res = tuple(res.all())

# %%
len(res[0])
# %%
res

# %%
engine.dispose()

# # %%
# stmt = sql.text("SELECT * FROM ALL_TAB_COLUMNS WHERE TABLE_NAME = 'ext_bedpbed' AND COLUMN_NAME = 'BEDARFNR'")
# with engine.connect() as conn:
#     res = conn.execute(stmt)
# # %%
# res.all()

# %%
# Plain python-oracledb connection (no SQLAlchemy).
conn = oracledb.connect(
    user=USER_NAME,
    password=USER_PASS,
    host=HOST,
    port=PORT,
    service_name=SERVICE,
)
# %%
cursor = conn.cursor()
# FIX: Oracle SQL has no LIMIT clause (ORA-00933 on execute); use the
# row-limiting clause, consistent with the FETCH FIRST query above.
cursor.execute("select * from ext_bedpbed fetch first 3 rows only")
# %%
print(cursor.fetchone())
# %%
conn.close()
# %%

# ---------------------------------------------------------------------------- #
# New file (post-image): data_analysis/02-3_oracle_workflow_test.py
# (new file mode 100644, index 0000000..6c783e8, @@ -0,0 +1,548 @@)
# ---------------------------------------------------------------------------- #
# %%
import time
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import polars as pl
import sqlalchemy as sql
from dopt_basics import configs, io

from umbreit import db, types

# %%
# import importlib
# db = importlib.reload(db)
# types = importlib.reload(types)

# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
# %%
types.Freigabe.WF_100.value

# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(conn_string)
# %%
########### RESULTS ###########
# temporary: results live in an in-memory SQLite DB for now
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    """Delete every row from the ``results`` table on *res_engine*.

    Uses ``Engine.begin`` so the DELETE is committed on block exit.
    """
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results))

    print("Rows deleted: ", res.rowcount)


delete_results(res_engine)
stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)

start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# TODO find way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)

stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query really slow
)

# %%
print(stmt.compile(engine))
# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order

# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries

print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
AMOUNT_COLS = frozenset(
    (
        "BEDP_MENGE_BEDARF",
        "BEDP_MENGE_VERKAUF",
        "BEDP_MENGE_ANFRAGE",
        "BEDP_MENGE_BESTELLUNG",
        "BEDP_MENGE_FREI",
        "BEDP_MENGE_BEDARF_VM",
    )
)

# map sentinel amounts (<= -1) to NULL, column by column
case_stmts = []
for col in AMOUNT_COLS:
    case_stmts.append(
        sql.case(
            (db.ext_bedpbed.c[col] <= -1, sql.null()),
            else_=db.ext_bedpbed.c[col],
        ).label(col)
    )

stmt = sql.select(
    *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
    *case_stmts,
)
df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
t2 = time.perf_counter()
elapsed = t2 - t1

# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))


# %%
# raw data query
# TODO change to left join, otherwise possible that requests are missed
# TODO after that: look for entries which do not have an associated title number

print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
    db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.MENGE_VORMERKER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# criterion column used to decide whether a request line has open reservations
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"


def get_raw_data() -> pl.DataFrame:
    """Fetch request rows joined with title info from Oracle.

    Sentinel amounts (``BEDP_MENGE_BEDARF_VM <= -1``) are mapped to NULL
    inside the query.  NOTE(review): this uses an inner join while the
    ad-hoc query above uses ``isouter=True`` — an inner join silently drops
    requests without a matching title; confirm which is intended (see the
    TODO above about switching to a left join).
    """
    join_condition = sql.and_(
        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
    )
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.MENGE_VORMERKER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition))

    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


def get_empyt_result_df() -> pl.DataFrame:
    """Return an empty results DataFrame (result schema minus ``id``).

    NOTE(review): the name keeps the original typo ("empyt") so existing
    callers keep working; rename to ``get_empty_result_df`` in a
    coordinated follow-up change.
    """
    schema = db.results_schema_map.copy()
    del schema["id"]
    return pl.DataFrame(schema=schema)


def apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Apply *filters* one after another.

    Returns:
        ``(kept, removed)``: rows surviving all filters, and all rows
        removed by any filter (in filter order).
    """
    df_current = df
    removed_rows: list[pl.DataFrame] = []

    for flt in filters:  # FIX: loop var renamed — `filter` shadowed the builtin
        removed_rows.append(df_current.filter(~flt))
        df_current = df_current.filter(flt)

    # FIX: pl.concat() raises ValueError on an empty list; return an empty
    # frame with the input schema when no filters were given.
    df_removed = pl.concat(removed_rows) if removed_rows else df.clear()

    return df_current, df_removed


def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
    """Fill NULL reservation counts with 0 before workflow filtering."""
    return df.with_columns(
        pl.col("MENGE_VORMERKER").fill_null(0),
        pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
    )


def workflow_100_umbreit(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Workflow 100 (Umbreit): auto-release lines with open reservations.

    Keeps rows with ``MELDENUMMER == 18``, mandant 1 and a positive value
    in *vm_criterion*; everything filtered out is handed to later
    workflows.

    Returns:
        ``(results, filtered_out)``.
    """
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) > 0

    relevant, filt = apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )
    results = _results_workflow_100(
        results,
        relevant,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
    )

    return results, filt


# post-processing the results
def _results_workflow_100(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vorlage: bool,
    wf_id: int,
    freigabe_auto: types.Freigabe,
) -> pl.DataFrame:
    """Map workflow-100 hits onto the result schema and append to *results*."""
    data = data.rename(db.map_to_result)
    data = data.with_columns(
        pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
        pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
        pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
        pl.lit(freigabe_auto.value)
        .alias("freigabe_auto")
        .cast(db.results_schema_map["freigabe_auto"]),
    )
    data = data.drop(
        "BEDP_TITELNR",
        "BEDP_MAN",
        "BEDP_MENGE_BEDARF_VM",
        "MELDENUMMER",
        "MENGE_VORMERKER",
    )

    return pl.concat([results, data])


# Petersen not present in data

# %%
df_raw = get_raw_data()
df_start = prepare_base_data(df_raw)
df_start

# %%
results_init = get_empyt_result_df()
results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
# df is where results are known
# filt_out contains entries for other workflows
# filt_out at this point represents all entries which are to be
# analysed in other workflows
# %%
results
# %%
filt_out


# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #
# %%
wf_200_start_data = filt_out.clone()
wf_200_start_data


# %%
def _init_workflow_200_umbreit(
    results: pl.DataFrame,
    data: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Pre-filter workflow-200 candidates.

    Keeps rows with ``MELDENUMMER`` in (17, 18), mandant 1 and zero open
    reservations in *vm_criterion*.

    NOTE(review): the *results* parameter is accepted but never used —
    confirm whether it can be dropped from the signature.
    """
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) == 0

    return apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )


# %%
df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
df

# %%
df.filter(pl.col("BEDARFNR") == 884607)

# %%
df_order.filter(pl.col("BEDARFNR") == 884607)

# %%
# now obtain order data for entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t
# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.count("AUFP_POSITION").alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t
# NOTE(review): the aggregation cell immediately above filtered on
# `count_customer >= 0` with the marker "!! should be 3".

# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))

# %%
# ---------------------------------------------------------------------------- #
# Writing results in DB
# ---------------------------------------------------------------------------- #

# FIX: delete_results() takes the engine as a required argument; the
# original called it with no arguments (TypeError at runtime).
delete_results(res_engine)
# NOTE(review): the results table was created on the temporary SQLite
# `res_engine`, but the write below targets the Oracle `engine` — confirm
# which engine the results should live on.  `pipe_post` is not defined
# anywhere in this script — define or import it before running this cell.
pipe_post.write_database(db.results.fullname, engine, if_table_exists="append")

stmt = sql.select(db.results)
db_results = pl.read_database(stmt, engine)
db_results

# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #
# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)
# %%
# titles that appear under more than one mandant
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)
# %%
df["BEDP_MAN"].unique()
# %%
df.estimated_size(unit="mb")
# %%
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()
# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)
# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)

# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO
)
title_buy = pl.read_database(stmt, engine)
# %%
title_buy

# %% when were the orders placed
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)
# %%
title_order

# -------------------------------------------------------------------------------------------

# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and who are assigned to
#   buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)
# %%
titles["MANDFUEHR"].unique()
# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)

# %%
# requirements?
# - includes only order since 05.11.2025
stmt = sql.select(db.ext_bedpbed)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# %%
reqs

# %%
reqs["BEDP_MAN"].unique()

# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)
# %%
intersection = unique_titles & unique_titles_req
len(intersection)
# %%
# orders?
# - includes only order since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text(
    "SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273"
)
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# NOTE(review): `dataframes` is not defined anywhere in this script — this
# cell presumably relies on a list built in an interactive session.
df = dataframes[1]
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype

print("dtypes of DF...")
pprint(col_dtype)
# %%
len(df)
# %%
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df
# %%
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype

print("dtypes of DF...")
pprint(col_dtype)
# %%

# ---------------------------------------------------------------------------- #
# Patch to src/umbreit/db.py (index 5ad646a..a8fac2f): in Table
# EXT_BESPBES_INFO, column BESP_ART changed from nullable=False to
# nullable=True:
#
#     Column("BESP_MAND", sql.Integer, nullable=False),
#     Column("BESP_TITELNR", sql.Integer, nullable=False),
#     Column("BESPAA", sql.String(1), nullable=True),
#     Column("BESP_ART", sql.String(1), nullable=True),   # was nullable=False
#     Column("BESVAK999", sql.String(1), nullable=True),
# ---------------------------------------------------------------------------- #