Compare commits


5 Commits
main ... proto

SHA1 Message Date
c244285c4d begin refactoring 2026-02-02 11:53:37 +01:00
fa659c37bd prepare test data generation 2026-02-02 11:27:19 +01:00
c3b37e7862 add automatic schema for raw data query 2026-02-02 11:27:03 +01:00
24f5dea7da add test results 2026-01-30 14:04:20 +01:00
9267f3da24 prepare temp data fusion for testing phase 2026-01-29 16:27:56 +01:00
4 changed files with 1033 additions and 86 deletions

View File

@ -3,8 +3,11 @@ from __future__ import annotations
import datetime
import json
import shutil
import tempfile
import time
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
@ -19,10 +22,34 @@ from sqlalchemy import event
from umbreit import db, types
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
TMP_DIR = create_tmp_dir()
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
print(f"Created temp directory under: >{TMP_DIR}<")
# %%
p_cfg = io.search_file_iterative(
@ -48,13 +75,22 @@ conn_string = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
) )
# engine = sql.create_engine(conn_string) # engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True}) engine = sql.create_engine(
conn_string,
execution_options={"stream_results": True},
)
@event.listens_for(engine, "after_cursor_execute") # @event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany): # def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
cursor.arraysize = 1000 # cursor.arraysize = 1000
cursor.prefetchrows = 1000 # cursor.prefetchrows = 1000
# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# %% # %%
@ -373,7 +409,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
)
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
@ -404,6 +440,45 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
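# A small round-trip sketch for the temp-file helpers above (illustrative only;
# the frame contents and the name "demo" are made up). save_tmp_file() appends
# "_1", "_2", ... when a name is already taken, so repeated saves never overwrite.
def _tmp_file_roundtrip_sketch() -> None:
    demo = pl.DataFrame({"BEDP_TITELNR": [1, 2], "count": [3, 4]})
    save_tmp_file(demo, "demo")  # written as <TMP_DIR>/demo.arrow
    save_tmp_file(demo, "demo")  # name taken, written as <TMP_DIR>/demo_1.arrow
    loaded = load_tmp_file("demo")
    assert loaded.to_dicts() == demo.to_dicts()
    assert {"demo", "demo_1"} <= set(load_all_tmp_files())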
def get_starting_date(
@ -430,6 +505,7 @@ def get_raw_data() -> pl.DataFrame:
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
@ -599,6 +675,7 @@ class PipelineResult:
"VERLAGSNR", "VERLAGSNR",
"MENGE_VORMERKER", "MENGE_VORMERKER",
"MANDFUEHR", "MANDFUEHR",
"EINKAEUFER",
] ]
) )
@ -681,7 +758,7 @@ def wf900(
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
@ -842,8 +919,7 @@ def _wf100_petersen_sub1_wdb(
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
@ -883,11 +959,26 @@ def _wf100_petersen_sub1_wdb(
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
print(relevant_titles)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@ -1003,10 +1094,27 @@ def _wf200_sub1(
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@ -1031,63 +1139,26 @@ def _wf200_sub1(
# %%
# SAVING/LOADING
READ_DATABASE = False
OVERWRITE = True
FILENAME = "raw_data_from_sql_query_20260202-1.arrow"
p_save = Path.cwd() / FILENAME
if READ_DATABASE:
    df = get_raw_data()
    if not p_save.exists() or OVERWRITE:
        df.write_ipc(p_save)
else:
    df = pl.read_ipc(p_save)
# %%
df
# %%
# initialise pipeline
raw_data = df.clone()
print(f"Number of entries: {len(df)}") print(f"Number of entries: {len(df)}")
clear_tmp_dir()
clear_result_data()
# %%
df.head()
# %%
# df.filter(pl.col.BEDP_TITELNR == 4314750)
# # %%
# RELEVANT_DATE = get_starting_date(180)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# filter_ = sql.and_(
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
# )
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# print(relevant_titles)
# %%
# removed_rows = []
# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
@ -1101,15 +1172,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# %%
# // test result writing
res = pipe_res.results.clone()
res.height
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
pipe_res = wf910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
@ -1117,8 +1179,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}") print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %% # %%
# pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
@ -1146,6 +1206,120 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
# %%
pipe_res.results.select(pl.col("VORLAGE").value_counts())
# %%
pipe_res.results.height
# %%
# // aggregate test results
all_tmps = load_all_tmp_files()
print(len(all_tmps))
# %%
def prepare_tmp_data() -> list[pl.DataFrame]:
all_tmps = load_all_tmp_files()
WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
WF_200_TMP_RENAME = {
"COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
"CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
}
WF_100: list[pl.DataFrame] = []
WF_200: list[pl.DataFrame] = []
for name, df in all_tmps.items():
if TMPFILE_WF100_SUB1_WDB in name:
rename_schema = WF_100_TMP_RENAME
df = df.rename(rename_schema)
WF_100.append(df)
elif TMPFILE_WF200_SUB1 in name:
rename_schema = WF_200_TMP_RENAME
df = df.rename(rename_schema)
WF_200.append(df)
tmp_WF_collects = (WF_100, WF_200)
all_tmps_preproc: list[pl.DataFrame] = []
for collect in tmp_WF_collects:
if len(collect) > 1:
df = pl.concat(collect)
elif len(collect) == 1:
df = collect[0].clone()
else:
raise RuntimeError()
all_tmps_preproc.append(df)
return all_tmps_preproc
def generate_test_result_data(
raw_data: pl.DataFrame,
pipe_result: PipelineResult,
) -> pl.DataFrame:
all_tmps_preproc = prepare_tmp_data()
res_table = pipe_result.results.clone()
res_title_info = res_table.join(
raw_data,
left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
right_on=["BEDARFNR", "BEDP_SEQUENZ"],
how="inner",
)
exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
res_title_info = res_title_info.select(pl.exclude(exclude_cols))
columns = [
"VORLAGE",
"WF_ID",
"BEST_MENGE",
"FREIGABE_AUTO",
"BEDP_MENGE_BEDARF_VM",
"MENGE_VORMERKER",
"BEDP_TITELNR",
"BEDP_MAN",
"MELDENUMMER",
"VERLAGSNR",
"EINKAEUFER",
"MANDFUEHR",
]
res_title_info = res_title_info.select(columns)
test_results = res_title_info.clone()
for df in all_tmps_preproc:
test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
return test_results
# %%
test_results = generate_test_result_data(raw_data, pipe_res)
test_results.head()
# %%
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx"
test_results.to_pandas().set_index("Index").to_excel(
p_save,
freeze_panes=(1, 1),
sheet_name=f"Ergebnisse_Testphase_{date_str}",
)
#####################################################################
# %%
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
)
deviation_vm = test_results.filter(pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode()))
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Abweichungen-VM_{date_str}.xlsx"
deviation_vm.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
# ** WF-200 potentially triggered
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
pl.col.BEDP_TITELNR.is_duplicated()
).sort("BEDP_TITELNR")
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
@ -1613,3 +1787,5 @@ pipe_res_main.open
# %%
pipe_res.results
# %%
remove_tmp_dir()
# %%

src/umbreit/constants.py Normal file
View File

@ -0,0 +1,30 @@
from pathlib import Path
from typing import Final
from dopt_basics import configs
from dopt_basics import io as io_
LIB_PATH: Final[Path] = Path(__file__).parent
# // database connections
p_cfg = io_.search_file_iterative(
starting_path=LIB_PATH,
glob_pattern="CRED*.toml",
stop_folder_name="umbreit-py",
)
if p_cfg is None:
raise FileNotFoundError("Config was not found")
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
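# For reference, a CRED*.toml matching the keys read above would look roughly like
# the following (values are placeholders, not real credentials):
#
#   [server]
#   host = "db.example.internal"
#   port = 1521
#   service = "ORCLPDB1"
#
#   [user]
#   name = "some_user"
#   pass = "some_password"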
# TODO remove or change
# ** Oracle client libs
USE_THICK_MODE: Final[bool] = False
P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
assert P_ORACLE_CLIENT_LIBS.is_dir()

View File

@ -178,17 +178,8 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
EXT_AUFPAUF_null_values: PolarsNullValues = {}
# // queries and temp data
-raw_data_query_schema_map: PolarsSchema = {
-    "BEDARFNR": pl.UInt32,
-    "BEDP_SEQUENZ": pl.UInt32,
-    "BEDP_TITELNR": pl.UInt32,
-    "BEDP_MAN": pl.UInt8,
-    "BEDP_MENGE_BEDARF_VM": pl.UInt32,
-    "MELDENUMMER": pl.UInt8,
-    "VERLAGSNR": pl.UInt32,
-    "MENGE_VORMERKER": pl.UInt32,
-    "MANDFUEHR": pl.UInt8,
-}
+raw_data_query_schema_map: PolarsSchema = ext_bedpbed_schema_map.copy()
+raw_data_query_schema_map.update(ext_titel_info_schema_map)
tmp_data = Table(
"EXT_TMP_BEDP_TINFO",

src/umbreit/pipeline.py Normal file
View File

@ -0,0 +1,750 @@
from __future__ import annotations
import datetime
import shutil
import tempfile
import uuid
from collections.abc import Sequence
from pathlib import Path
from typing import Final, Literal, Protocol, TypeAlias, overload
import dopt_basics.datetime as dt
import oracledb
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from umbreit import constants, db, types
from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
if constants.USE_THICK_MODE:
oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR = create_tmp_dir()
CONN_STRING: Final[str] = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(
CONN_STRING,
execution_options={"stream_results": True},
)
VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys())
ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
def get_starting_date(
days_from_now: int,
) -> datetime.date:
current_dt = dt.current_time_tz(cut_microseconds=True)
td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
return (current_dt - td).date()
def get_raw_data() -> pl.DataFrame:
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
sql.case(
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
stmt,
engine,
schema_overrides=db.raw_data_query_schema_map,
)
def save_tmp_data(df: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.tmp_data))
with engine.begin() as conn:
conn.execute(sql.insert(db.tmp_data), df.to_dicts())
def get_tmp_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.tmp_data),
engine,
schema_overrides=db.tmp_data_schema_map,
)
def get_result_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.EXT_DOPT_ERGEBNIS),
engine,
schema_overrides=db.results_schema_map,
)
def save_result_data(results: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
def clear_result_data() -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
def save_result_data_native(results: pl.DataFrame) -> None:
results = results.with_columns(
[
pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
for c in results.select(cs.boolean()).columns
]
)
stmt = """
INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
"BEST_MENGE", "FREIGABE_AUTO")
VALUES (:1, :2, :3, :4, :5, :6)
"""
with engine.begin() as conn:
raw_conn = conn.connection.connection
with raw_conn.cursor() as cursor:
cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))
def _apply_several_filters(
df: pl.DataFrame,
filters: Sequence[pl.Expr],
) -> types.FilterResult:
df_current = df
removed_rows: list[pl.DataFrame] = []
for filter in filters:
removed = df_current.filter(~filter)
removed_rows.append(removed)
df_current = df_current.filter(filter)
df_removed = pl.concat(removed_rows)
return types.FilterResult(in_=df_current, out_=df_removed)
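# Illustrative sketch of how _apply_several_filters() partitions a frame: rows
# failing any filter end up in out_, the survivors in in_. The toy column names
# "a" and "b" are made up for the example.
def _apply_filters_sketch() -> None:
    df = pl.DataFrame({"a": [1, 2, 3, 4], "b": [10, 0, 10, 10]})
    res = _apply_several_filters(df, (pl.col("a") > 1, pl.col("b") > 0))
    assert res.in_.get_column("a").to_list() == [3, 4]
    assert sorted(res.out_.get_column("a").to_list()) == [1, 2]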
class PipelineResult:
__slots__ = ("_results", "_open", "_subtracted_indices")
_index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
def __init__(
self,
data: pl.DataFrame,
) -> None:
self._open = data
schema = db.results_schema_map.copy()
del schema["ID"]
self._results = pl.DataFrame(schema=schema)
schema = {}
for col in self._index_cols:
schema[col] = db.raw_data_query_schema_map[col]
self._subtracted_indices = pl.DataFrame(schema=schema)
def __len__(self) -> int:
return len(self._results) + len(self._open)
@property
def open(self) -> pl.DataFrame:
return self._open
@property
def results(self) -> pl.DataFrame:
return self._results
@property
def subtracted_indices(self) -> pl.DataFrame:
return self._subtracted_indices
def update_open(
self,
data: pl.DataFrame,
) -> None:
self._open = data
def _subtract_data(
self,
data: pl.DataFrame,
) -> None:
self._open = self._open.join(data, on=self._index_cols, how="anti")
self._subtracted_indices = pl.concat(
(self._subtracted_indices, data[self._index_cols])
)
def _add_results(
self,
data: pl.DataFrame,
) -> None:
res = pl.concat([self._results, data])
self._results = res
def merge_pipeline(
self,
pipeline: PipelineResult,
) -> None:
self._subtract_data(pipeline.subtracted_indices)
self._add_results(pipeline.results)
def write_results(
self,
data: pl.DataFrame,
vorlage: bool,
wf_id: types.Workflows,
freigabe_auto: types.Freigabe,
order_qty_expr: pl.Expr,
) -> None:
results = data.rename(db.map_data_to_result)
results = results.with_columns(
[
pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
order_qty_expr,
pl.lit(freigabe_auto.value)
.alias("FREIGABE_AUTO")
.cast(db.results_schema_map["FREIGABE_AUTO"]),
]
)
results = results.drop(
[
"BEDP_TITELNR",
"BEDP_MAN",
"BEDP_MENGE_BEDARF_VM",
"MELDENUMMER",
"VERLAGSNR",
"MENGE_VORMERKER",
"MANDFUEHR",
"EINKAEUFER",
]
)
self._subtract_data(data)
self._add_results(results)
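# A short sketch of the PipelineResult bookkeeping, assuming a frame with the
# raw-data query schema and at least ten open rows: every write_results() call
# moves the written rows from `open` to `results`, so len(pipe) stays constant
# over a run. The wf_id / freigabe_auto values are only examples.
def _pipeline_result_sketch(raw: pl.DataFrame) -> None:
    pipe = PipelineResult(raw)
    n_before = len(pipe)
    pipe.write_results(
        data=pipe.open.head(10),
        vorlage=False,
        wf_id=types.Workflows.ID_900,
        freigabe_auto=types.Freigabe.WF_900,
        order_qty_expr=get_expr_order_qty(types.Workflows.ID_900)(),
    )
    assert len(pipe.results) == 10
    assert len(pipe) == n_before  # rows moved, none lost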
class ExprOrderQty(Protocol): ...
class ExprOrderQty_Base(ExprOrderQty, Protocol):
def __call__(self) -> pl.Expr: ...
ExprOrderQty_Base_Types: TypeAlias = (
Literal[types.Workflows.ID_200]
| Literal[types.Workflows.ID_900]
| Literal[types.Workflows.ID_910]
)
class ExprOrderQty_WF100(ExprOrderQty, Protocol):
def __call__(self, empty: bool) -> pl.Expr: ...
@overload
def get_expr_order_qty(
wf_id: Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...
@overload
def get_expr_order_qty(
wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty(
wf_id: types.Workflows,
) -> ExprOrderQty:
empty_expr = (
pl.lit(0)
.alias(ORDER_QTY_CRIT)
.alias("BEST_MENGE")
.cast(db.results_schema_map["BEST_MENGE"])
)
def _empty() -> pl.Expr:
return empty_expr
func: ExprOrderQty
match wf_id:
case types.Workflows.ID_100:
def _func(empty: bool) -> pl.Expr:
order_qty_expr: pl.Expr
if empty:
order_qty_expr = empty_expr
else:
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
return order_qty_expr
func = _func
case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
func = _empty
case _:
raise NotImplementedError(
f"Order expression for WF-ID {wf_id.value} is not implemented"
)
return func
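# Sketch of the two call shapes returned by get_expr_order_qty(): the WF-100
# variant takes an `empty` flag, the other workflows take no arguments and always
# yield a zero order quantity. Purely illustrative; the two-row demo frame is made up.
def _order_qty_expr_sketch() -> None:
    wf100_expr = get_expr_order_qty(types.Workflows.ID_100)(empty=False)
    wf200_expr = get_expr_order_qty(types.Workflows.ID_200)()
    demo = pl.DataFrame({ORDER_QTY_CRIT: [2, 5]})
    assert demo.with_columns(wf100_expr).get_column("BEST_MENGE").to_list() == [2, 5]
    assert demo.with_columns(wf200_expr).get_column("BEST_MENGE").to_list() == [0, 0]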
def wf900(
pipe_result: PipelineResult,
) -> PipelineResult:
"""filter 'Meldenummer' and fill non-feasible entries"""
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
),
)
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_900,
freigabe_auto=types.Freigabe.WF_900,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.update_open(
res.in_.with_columns(
pl.col("MENGE_VORMERKER").fill_null(0),
pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
)
)
return pipe_result
def wf910(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_910,
freigabe_auto=types.Freigabe.WF_910,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result
# this is a main routine:
# receives and gives back result objects
def wf100_umbreit(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 1
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf100_petersen(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# difference WDB and others
# // WDB branch
# order quantity 0, no further action in other WFs
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(vm_criterion) == 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=True),
)
filter_number_vm = pl.col(vm_criterion) > 0
res_candidates = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
wdb_sub_pipe = PipelineResult(res_candidates.in_)
wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(wdb_sub_pipe)
# // other branch
# show always entries with #VM > 1
filter_number_vm = pl.col(vm_criterion) > 1
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def _wf100_petersen_sub1_wdb(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
(filter_number_vm,),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
# past 6 months
save_tmp_data(pipe_result.open)
RELEVANT_DATE = get_starting_date(180)
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
stmt = (
sql.select(
db.tmp_data,
db.EXT_BESPBES_INFO.c.BESP_MENGE,
db.EXT_BESPBES_INFO.c.BESP_STATUS,
)
.select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
count_col = sql.func.count()
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf200_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
pipe_result.open,
(filter_meldenummer, filter_mandant),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def wf200_petersen(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_MNR: tuple[int, ...] = (17, 18)
# // WDB branch
filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
),
)
# ignore these
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
# // other branch
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def _wf200_sub1(
pipe_result: PipelineResult,
) -> PipelineResult:
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_DATE = get_starting_date(90)
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFTRAGS_ART,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result