add test results

This commit is contained in:
Florian Förster 2026-01-30 14:04:20 +01:00
parent 9267f3da24
commit 24f5dea7da
2 changed files with 145 additions and 29 deletions

View File

@ -22,6 +22,9 @@ from sqlalchemy import event
from umbreit import db, types from umbreit import db, types
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# %% # %%
# import importlib # import importlib
@ -72,13 +75,22 @@ conn_string = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
) )
# engine = sql.create_engine(conn_string) # engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True}) engine = sql.create_engine(
conn_string,
execution_options={"stream_results": True},
)
@event.listens_for(engine, "after_cursor_execute") # @event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany): # def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
cursor.arraysize = 1000 # cursor.arraysize = 1000
cursor.prefetchrows = 1000 # cursor.prefetchrows = 1000
# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# %% # %%
@ -397,7 +409,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable # BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = ( p_save_diff_VM_bedp_tinfo = (
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx" Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
) )
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False) df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
@ -440,6 +452,13 @@ def save_tmp_file(
if filename is None: if filename is None:
filename = str(uuid.uuid4()) filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow") pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth) data.write_ipc(pth)
@ -453,13 +472,13 @@ def load_tmp_file(
return pl.read_ipc(pth) return pl.read_ipc(pth)
def load_all_tmp_files() -> tuple[pl.DataFrame, ...]: def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: list[pl.DataFrame] = [] all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"): for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file) df = pl.read_ipc(file)
all_dfs.append(df) all_dfs[file.stem] = df
return tuple(all_dfs) return all_dfs
def get_starting_date( def get_starting_date(
@ -486,6 +505,7 @@ def get_raw_data() -> pl.DataFrame:
db.ext_titel_info.c.VERLAGSNR, db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER, db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR, db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database( return pl.read_database(
@ -938,7 +958,16 @@ def _wf100_petersen_sub1_wdb(
.group_by(sub1.c.BEDP_TITELNR) .group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1) .having(count_col > 1)
) )
# !! this is a sub result which muste be used in the result set if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer # !! for testing and feedback by the customer
relevant_titles = pl.read_database( relevant_titles = pl.read_database(
stmt, stmt,
@ -947,6 +976,7 @@ def _wf100_petersen_sub1_wdb(
if SAVE_TMP_FILES: if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB) save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter( entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
@ -1063,7 +1093,17 @@ def _wf200_sub1(
.group_by(sub1.c.BEDP_TITELNR) .group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3) .having(unique_count_col >= 3)
) )
# !! this is a sub result which muste be used in the result set if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer # !! for testing and feedback by the customer
relevant_titles = pl.read_database( relevant_titles = pl.read_database(
stmt, stmt,
@ -1071,7 +1111,8 @@ def _wf200_sub1(
) )
if SAVE_TMP_FILES: if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, None) save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter( entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
@ -1099,27 +1140,13 @@ def _wf200_sub1(
# SAVING/LOADING # SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow" p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
df = pl.read_ipc(p_save) df = pl.read_ipc(p_save)
raw_data = df.clone()
print(f"Number of entries: {len(df)}") print(f"Number of entries: {len(df)}")
clear_tmp_dir() clear_tmp_dir()
clear_result_data() clear_result_data()
# %% # %%
df.head() df.head()
# %% # %%
# df.filter(pl.col.BEDP_TITELNR == 4314750)
# # %%
# RELEVANT_DATE = get_starting_date(180)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# filter_ = sql.and_(
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
# )
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# print(relevant_titles)
# %%
raw_data = df.clone() raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data) # pipe_res = get_empty_pipeline_result(raw_data)
pipe_res = PipelineResult(raw_data) pipe_res = PipelineResult(raw_data)
@ -1167,11 +1194,99 @@ pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
# %% # %%
pipe_res.results.select(pl.col("VORLAGE").value_counts()) pipe_res.results.select(pl.col("VORLAGE").value_counts())
# %% # %%
pipe_res.results.height
# %%
# ** aggregate test results # ** aggregate test results
all_tmps = load_all_tmp_files() all_tmps = load_all_tmp_files()
print(len(all_tmps)) print(len(all_tmps))
# %% # %%
all_tmps[2] WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
WF_200_TMP_RENAME = {
"COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
"CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
}
WF_100: list[pl.DataFrame] = []
WF_200: list[pl.DataFrame] = []
for name, df in all_tmps.items():
if TMPFILE_WF100_SUB1_WDB in name:
rename_schema = WF_100_TMP_RENAME
df = df.rename(rename_schema)
WF_100.append(df)
if TMPFILE_WF200_SUB1 in name:
rename_schema = WF_200_TMP_RENAME
df = df.rename(rename_schema)
WF_200.append(df)
tmp_WF_collects = (WF_100, WF_200)
all_tmps_preproc = []
for collect in tmp_WF_collects:
if len(collect) > 1:
df = pl.concat(collect)
elif len(collect) == 1:
df = collect[0].clone()
else:
raise RuntimeError()
all_tmps_preproc.append(df)
all_tmps_preproc
#############################
# %%
res_table = pipe_res.results.clone()
# %%
res_table.head()
# %%
raw_data.head()
# raw_data = raw_data.rename({"BEDARFNR": "BEDARF_NR", "BEDP_SEQUENZ": "BEDARF_SEQUENZ"})
# raw_data
# %%
res_title_info = res_table.join(
raw_data,
left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
right_on=["BEDARFNR", "BEDP_SEQUENZ"],
how="inner",
)
exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
res_title_info = res_title_info.select(pl.exclude(exclude_cols))
res_title_info.head()
# %%
df1 = all_tmps["WF-100_Sub1-WDB"]
df2 = all_tmps["WF-200_Sub1"]
df3 = all_tmps["WF-200_Sub1_1"]
df1.head()
# %%
test_results = res_title_info.clone()
for df in all_tmps_preproc:
test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
# %%
test_results
# %%
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx"
test_results.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
# %%
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
)
deviation_vm = test_results.filter(pl.col.BEDP_TITELNR.is_in(dev["BEDP_TITELNR"].implode()))
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Abweichungen-VM_{date_str}.xlsx"
deviation_vm.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
# ** WF-200 potentially triggered
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
pl.col.BEDP_TITELNR.is_duplicated()
).sort("BEDP_TITELNR")
# %% # %%
# ---------------------------------------------------------------------------- # # ---------------------------------------------------------------------------- #

View File

@ -188,6 +188,7 @@ raw_data_query_schema_map: PolarsSchema = {
"VERLAGSNR": pl.UInt32, "VERLAGSNR": pl.UInt32,
"MENGE_VORMERKER": pl.UInt32, "MENGE_VORMERKER": pl.UInt32,
"MANDFUEHR": pl.UInt8, "MANDFUEHR": pl.UInt8,
"EINKAEUFER": pl.String,
} }
tmp_data = Table( tmp_data = Table(