umbreit-py/data_analysis/02-3_oracle_workflow_test.py
2025-12-02 16:07:07 +01:00

592 lines
16 KiB
Python

# %%
import json
import time
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
import dopt_basics.datetime as dt
import polars as pl
import sqlalchemy as sql
from dopt_basics import configs, io
from umbreit import db, types
# %%
# import importlib
# db = importlib.reload(db)
# types = importlib.reload(types)
# %%
p_cfg = io.search_file_iterative(
starting_path=Path.cwd(),
glob_pattern="CRED*.toml",
stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
# %%
types.Freigabe.WF_100.value
# %%
conn_string = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(conn_string)
# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results,))
# %%
# delete existing results
def delete_results(
res_engine: sql.Engine,
) -> None:
with res_engine.begin() as conn:
res = conn.execute(sql.delete(db.results))
print("Rows deleted: ", res.rowcount)
delete_results(res_engine)
stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with res_engine.connect() as conn:
res = conn.execute(stmt)
print(res.all())
# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)
# %%
# TODO find way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
join_condition = sql.and_(
db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
where_condition = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
db.EXT_AUFPAUF,
)
.select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
.where(where_condition)
.limit(100) # full query really slow
)
# %%
print(stmt.compile(engine))
# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order
# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)
# prefilter amount columns for invalid entries
print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
AMOUNT_COLS = frozenset(
(
"BEDP_MENGE_BEDARF",
"BEDP_MENGE_VERKAUF",
"BEDP_MENGE_ANFRAGE",
"BEDP_MENGE_BESTELLUNG",
"BEDP_MENGE_FREI",
"BEDP_MENGE_BEDARF_VM",
)
)
case_stmts = []
for col in AMOUNT_COLS:
case_stmts.append(
sql.case(
(db.ext_bedpbed.c[col] <= -1, sql.null()),
else_=db.ext_bedpbed.c[col],
).label(col)
)
stmt = sql.select(
*[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
*case_stmts,
)
df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))
# %%
# raw data query
# TODO look for entries which do not have an associated title number
print("--------------- ext_bedpbed --------------")
t1 = time.perf_counter()
join_condition = sql.and_(
db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
sql.case(
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.MENGE_VORMERKER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine))
df = pl.read_database(
stmt,
engine,
schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20251202-2.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
len(df)
df.head()
# 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?)
# %%
df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER"))
# %%
# !! CHECK: null values set in the query with CASE statement
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
# !! CHECK: titles with request where no title information is found
# not_in_title_table = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(
# pl.col("MELDENUMMER").is_null()
# )
# EXPORT_FEAT = "BEDP_TITELNR"
# to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
# p_save_not_in_title_table = Path.cwd() / "not_in_title_table.json"
# with open(p_save_not_in_title_table, "w") as file:
# json.dump(to_save, file, indent=4)
# %%
# !! CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
# MANDANT = 80
# print(f"Mandant: {MANDANT}")
# print(
# df.filter(pl.col("BEDP_MAN") == MANDANT).select(
# ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
# )
# )
# print(
# df.filter(pl.col("BEDP_MAN") == MANDANT).select(
# ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
# ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
def get_raw_data() -> pl.DataFrame:
join_condition = sql.and_(
db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
sql.case(
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.MENGE_VORMERKER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
stmt,
engine,
schema_overrides=db.raw_data_query_schema_map,
)
def get_empyt_result_df() -> pl.DataFrame:
schema = db.results_schema_map.copy()
del schema["id"]
return pl.DataFrame(schema=schema)
def apply_several_filters(
df: pl.DataFrame,
filters: Sequence[pl.Expr],
) -> tuple[pl.DataFrame, pl.DataFrame]:
df_current = df
removed_rows: list[pl.DataFrame] = []
for filter in filters:
removed = df_current.filter(~filter)
removed_rows.append(removed)
df_current = df_current.filter(filter)
df_removed = pl.concat(removed_rows)
return df_current, df_removed
def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
return df
def workflow_100_umbreit(
results: pl.DataFrame,
data: pl.DataFrame,
vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col("BEDP_MAN") == 1
filter_number_vm = pl.col(vm_criterion) > 0
relevant, filt = apply_several_filters(
data, (filter_meldenummer, filter_mandant, filter_number_vm)
)
results = _results_workflow_100(
results,
relevant,
vorlage=True,
wf_id=100,
freigabe_auto=types.Freigabe.WF_100,
)
return results, filt
# post-processing the results
def _results_workflow_100(
results: pl.DataFrame,
data: pl.DataFrame,
vorlage: bool,
wf_id: int,
freigabe_auto: types.Freigabe,
) -> pl.DataFrame:
data = data.rename(db.map_to_result)
data = data.with_columns(
[
pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
pl.lit(freigabe_auto.value)
.alias("freigabe_auto")
.cast(db.results_schema_map["freigabe_auto"]),
]
)
data = data.drop(
[
"BEDP_TITELNR",
"BEDP_MAN",
"BEDP_MENGE_BEDARF_VM",
"MELDENUMMER",
"MENGE_VORMERKER",
]
)
return pl.concat([results, data])
# Petersen not present in data
# %%
df_raw = get_raw_data()
df_start = prepare_base_data(df_raw)
df_start
# %%
results_init = get_empyt_result_df()
results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
# df is where results are known
# filt_out contains entries for other workflows
# filt_out at this point represents all entries which are to be analysed in other workflows
# %%
results
# %%
filt_out
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #
# %%
wf_200_start_data = filt_out.clone()
wf_200_start_data
# %%
def _init_workflow_200_umbreit(
results: pl.DataFrame,
data: pl.DataFrame,
vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
filter_number_vm = pl.col(vm_criterion) == 0
relevant, filt = apply_several_filters(
data, (filter_meldenummer, filter_mandant, filter_number_vm)
)
return relevant, filt
# %%
df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
df
# %%
df.filter(pl.col("BEDARFNR") == 884607)
# %%
df_order.filter(pl.col("BEDARFNR") == 884607)
# %%
# now obtain order data for entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t
# %%
agg_t = (
t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
.agg(
pl.count("AUFP_POSITION").alias("pos_count"),
pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
)
.filter(pl.col("count_customer") >= 0) # !! should be 3
)
agg_t
# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))
# %%
# ---------------------------------------------------------------------------- #
# Writing results in DB
# ---------------------------------------------------------------------------- #
delete_results()
pipe_post.write_database(db.results.fullname, engine, if_table_exists="append")
stmt = sql.select(db.results)
db_results = pl.read_database(stmt, engine)
db_results
# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #
# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
stmt,
engine,
schema_overrides=db.ext_bedpbed_schema_map,
)
# %%
df.group_by("BEDP_TITELNR").agg(
pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)
# %%
df["BEDP_MAN"].unique()
# %%
df.estimated_size(unit="mb")
# %%
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()
# %%
stmt = (
sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
)
.where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
.where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)
# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)
# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO)
title_buy = pl.read_database(stmt, engine)
# %%
title_buy
# %% when were the orders placed
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)
# %%
title_order
# -------------------------------------------------------------------------------------------
# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and who are assigned to
# buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)
# %%
titles["MANDFUEHR"].unique()
# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)
# %%
# requirements?
# - includes only order since 05.11.2025
stmt = sql.select(db.ext_bedpbed) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# %%
reqs
# %%
reqs["BEDP_MAN"].unique()
# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)
# %%
intersection = unique_titles & unique_titles_req
len(intersection)
# %%
# orders?
# - includes only order since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)
# %%
orders.estimated_size(unit="mb")
# %%
with engine.connect() as conn:
res = conn.execute(stmt)
print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
res = conn.execute(stmt)
print(res.all())
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
res = conn.execute(stmt)
print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
res = conn.execute(stmt)
print(res.all())
# %%
df = dataframes[1]
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%
len(df)
# %%
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)
# %%
df
# %%
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%