Compare commits

..

5 Commits
main ... proto

Author SHA1 Message Date
c244285c4d begin refactoring 2026-02-02 11:53:37 +01:00
fa659c37bd prepare test data generation 2026-02-02 11:27:19 +01:00
c3b37e7862 add automatic schema for raw data query 2026-02-02 11:27:03 +01:00
24f5dea7da add test results 2026-01-30 14:04:20 +01:00
9267f3da24 prepare temp data fusion for testing phase 2026-01-29 16:27:56 +01:00
4 changed files with 1033 additions and 86 deletions

View File

@ -3,8 +3,11 @@ from __future__ import annotations
import datetime
import json
import shutil
import tempfile
import time
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
@ -19,10 +22,34 @@ from sqlalchemy import event
from umbreit import db, types
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
TMP_DIR = create_tmp_dir()
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
print(f"Created temp directory under: >{TMP_DIR}<")
# %%
p_cfg = io.search_file_iterative(
@ -48,13 +75,22 @@ conn_string = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
engine = sql.create_engine(
conn_string,
execution_options={"stream_results": True},
)
@event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
cursor.arraysize = 1000
cursor.prefetchrows = 1000
# @event.listens_for(engine, "after_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# %%
@ -373,7 +409,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
)
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
@ -404,6 +440,45 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
def get_starting_date(
@ -430,6 +505,7 @@ def get_raw_data() -> pl.DataFrame:
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
@ -599,6 +675,7 @@ class PipelineResult:
"VERLAGSNR",
"MENGE_VORMERKER",
"MANDFUEHR",
"EINKAEUFER",
]
)
@ -681,7 +758,7 @@ def wf900(
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_res.open,
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
@ -842,8 +919,7 @@ def _wf100_petersen_sub1_wdb(
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more then 1 VM
# !! show these entries
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
@ -883,11 +959,26 @@ def _wf100_petersen_sub1_wdb(
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
print(relevant_titles)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@ -1003,10 +1094,27 @@ def _wf200_sub1(
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@ -1031,63 +1139,26 @@ def _wf200_sub1(
# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
df = pl.read_ipc(p_save)
READ_DATABASE = False
OVERWRITE = True
FILENAME = "raw_data_from_sql_query_20260202-1.arrow"
p_save = Path.cwd() / FILENAME
if READ_DATABASE:
df = get_raw_data()
if not p_save.exists() or OVERWRITE:
df.write_ipc(p_save)
else:
df = pl.read_ipc(p_save)
# %%
df
# %%
# initialise pipeline
raw_data = df.clone()
print(f"Number of entries: {len(df)}")
clear_tmp_dir()
clear_result_data()
# %%
df.head()
# %%
# df.filter(pl.col.BEDP_TITELNR == 4314750)
# # %%
# RELEVANT_DATE = get_starting_date(180)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# filter_ = sql.and_(
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
# )
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# print(relevant_titles)
# %%
# removed_rows = []
# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
@ -1101,15 +1172,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# %%
# // test result writing
res = pipe_res.results.clone()
res.height
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
pipe_res = wf910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
@ -1117,8 +1179,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
# pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
@ -1146,6 +1206,120 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
# %%
pipe_res.results.select(pl.col("VORLAGE").value_counts())
# %%
pipe_res.results.height
# %%
# // aggregate test results
all_tmps = load_all_tmp_files()
print(len(all_tmps))
# %%
def prepare_tmp_data() -> list[pl.DataFrame]:
all_tmps = load_all_tmp_files()
WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
WF_200_TMP_RENAME = {
"COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
"CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
}
WF_100: list[pl.DataFrame] = []
WF_200: list[pl.DataFrame] = []
for name, df in all_tmps.items():
if TMPFILE_WF100_SUB1_WDB in name:
rename_schema = WF_100_TMP_RENAME
df = df.rename(rename_schema)
WF_100.append(df)
elif TMPFILE_WF200_SUB1 in name:
rename_schema = WF_200_TMP_RENAME
df = df.rename(rename_schema)
WF_200.append(df)
tmp_WF_collects = (WF_100, WF_200)
all_tmps_preproc: list[pl.DataFrame] = []
for collect in tmp_WF_collects:
if len(collect) > 1:
df = pl.concat(collect)
elif len(collect) == 1:
df = collect[0].clone()
else:
raise RuntimeError()
all_tmps_preproc.append(df)
return all_tmps_preproc
def generate_test_result_data(
raw_data: pl.DataFrame,
pipe_result: PipelineResult,
) -> pl.DataFrame:
all_tmps_preproc = prepare_tmp_data()
res_table = pipe_result.results.clone()
res_title_info = res_table.join(
raw_data,
left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
right_on=["BEDARFNR", "BEDP_SEQUENZ"],
how="inner",
)
exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
res_title_info = res_title_info.select(pl.exclude(exclude_cols))
columns = [
"VORLAGE",
"WF_ID",
"BEST_MENGE",
"FREIGABE_AUTO",
"BEDP_MENGE_BEDARF_VM",
"MENGE_VORMERKER",
"BEDP_TITELNR",
"BEDP_MAN",
"MELDENUMMER",
"VERLAGSNR",
"EINKAEUFER",
"MANDFUEHR",
]
res_title_info = res_title_info.select(columns)
test_results = res_title_info.clone()
for df in all_tmps_preproc:
test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
return test_results
# %%
test_results = generate_test_result_data(raw_data, pipe_res)
test_results.head()
# %%
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx"
test_results.to_pandas().set_index("Index").to_excel(
p_save,
freeze_panes=(1, 1),
sheet_name=f"Ergebnisse_Testphase_{date_str}",
)
#####################################################################
# %%
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
)
deviation_vm = test_results.filter(pl.col.BEDP_TITELNR.is_in(dev["BEDP_TITELNR"].implode()))
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Abweichungen-VM_{date_str}.xlsx"
deviation_vm.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
# ** WF-200 potentially triggered
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
pl.col.BEDP_TITELNR.is_duplicated()
).sort("BEDP_TITELNR")
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
@ -1613,3 +1787,5 @@ pipe_res_main.open
# %%
pipe_res.results
# %%
remove_tmp_dir()
# %%

30
src/umbreit/constants.py Normal file
View File

@ -0,0 +1,30 @@
from pathlib import Path
from typing import Final
from dopt_basics import configs
from dopt_basics import io as io_
LIB_PATH: Final[Path] = Path(__file__).parent
# // database connections
p_cfg = io_.search_file_iterative(
starting_path=LIB_PATH,
glob_pattern="CRED*.toml",
stop_folder_name="umbreit-py",
)
if p_cfg is None:
raise FileNotFoundError("Config was not found")
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
# TODO remove or change
# ** Oracle client libs
USE_THICK_MODE: Final[bool] = False
P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
assert P_ORACLE_CLIENT_LIBS.is_dir()

View File

@ -178,17 +178,8 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
EXT_AUFPAUF_null_values: PolarsNullValues = {}
# // queries and temp data
raw_data_query_schema_map: PolarsSchema = {
"BEDARFNR": pl.UInt32,
"BEDP_SEQUENZ": pl.UInt32,
"BEDP_TITELNR": pl.UInt32,
"BEDP_MAN": pl.UInt8,
"BEDP_MENGE_BEDARF_VM": pl.UInt32,
"MELDENUMMER": pl.UInt8,
"VERLAGSNR": pl.UInt32,
"MENGE_VORMERKER": pl.UInt32,
"MANDFUEHR": pl.UInt8,
}
raw_data_query_schema_map: PolarsSchema = ext_bedpbed_schema_map.copy()
raw_data_query_schema_map.update(ext_titel_info_schema_map)
tmp_data = Table(
"EXT_TMP_BEDP_TINFO",

750
src/umbreit/pipeline.py Normal file
View File

@ -0,0 +1,750 @@
from __future__ import annotations
import datetime
import shutil
import tempfile
import uuid
from collections.abc import Sequence
from pathlib import Path
from typing import Final, Literal, Protocol, TypeAlias, overload
import dopt_basics.datetime as dt
import oracledb
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from umbreit import constants, db, types
from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
if constants.USE_THICK_MODE:
oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR = create_tmp_dir()
CONN_STRING: Final[str] = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(
CONN_STRING,
execution_options={"stream_results": True},
)
VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys())
ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
def get_starting_date(
days_from_now: int,
) -> datetime.date:
current_dt = dt.current_time_tz(cut_microseconds=True)
td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
return (current_dt - td).date()
def get_raw_data() -> pl.DataFrame:
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
sql.case(
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
stmt,
engine,
schema_overrides=db.raw_data_query_schema_map,
)
def save_tmp_data(df: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.tmp_data))
with engine.begin() as conn:
conn.execute(sql.insert(db.tmp_data), df.to_dicts())
def get_tmp_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.tmp_data),
engine,
schema_overrides=db.tmp_data_schema_map,
)
def get_result_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.EXT_DOPT_ERGEBNIS),
engine,
schema_overrides=db.results_schema_map,
)
def save_result_data(results: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
def clear_result_data() -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
def save_result_data_native(results: pl.DataFrame) -> None:
results = results.with_columns(
[
pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
for c in results.select(cs.boolean()).columns
]
)
stmt = """
INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
"BEST_MENGE", "FREIGABE_AUTO")
VALUES (:1, :2, :3, :4, :5, :6)
"""
with engine.begin() as conn:
raw_conn = conn.connection.connection
with raw_conn.cursor() as cursor:
cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))
def _apply_several_filters(
df: pl.DataFrame,
filters: Sequence[pl.Expr],
) -> types.FilterResult:
df_current = df
removed_rows: list[pl.DataFrame] = []
for filter in filters:
removed = df_current.filter(~filter)
removed_rows.append(removed)
df_current = df_current.filter(filter)
df_removed = pl.concat(removed_rows)
return types.FilterResult(in_=df_current, out_=df_removed)
class PipelineResult:
__slots__ = ("_results", "_open", "_subtracted_indices")
_index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
def __init__(
self,
data: pl.DataFrame,
) -> None:
self._open = data
schema = db.results_schema_map.copy()
del schema["ID"]
self._results = pl.DataFrame(schema=schema)
schema = {}
for col in self._index_cols:
schema[col] = db.raw_data_query_schema_map[col]
self._subtracted_indices = pl.DataFrame(schema=schema)
def __len__(self) -> int:
return len(self._results) + len(self._open)
@property
def open(self) -> pl.DataFrame:
return self._open
@property
def results(self) -> pl.DataFrame:
return self._results
@property
def subtracted_indices(self) -> pl.DataFrame:
return self._subtracted_indices
def update_open(
self,
data: pl.DataFrame,
) -> None:
self._open = data
def _subtract_data(
self,
data: pl.DataFrame,
) -> None:
self._open = self._open.join(data, on=self._index_cols, how="anti")
self._subtracted_indices = pl.concat(
(self._subtracted_indices, data[self._index_cols])
)
def _add_results(
self,
data: pl.DataFrame,
) -> None:
res = pl.concat([self._results, data])
self._results = res
def merge_pipeline(
self,
pipeline: PipelineResult,
) -> None:
self._subtract_data(pipeline.subtracted_indices)
self._add_results(pipeline.results)
def write_results(
self,
data: pl.DataFrame,
vorlage: bool,
wf_id: types.Workflows,
freigabe_auto: types.Freigabe,
order_qty_expr: pl.Expr,
) -> None:
results = data.rename(db.map_data_to_result)
results = results.with_columns(
[
pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
order_qty_expr,
pl.lit(freigabe_auto.value)
.alias("FREIGABE_AUTO")
.cast(db.results_schema_map["FREIGABE_AUTO"]),
]
)
results = results.drop(
[
"BEDP_TITELNR",
"BEDP_MAN",
"BEDP_MENGE_BEDARF_VM",
"MELDENUMMER",
"VERLAGSNR",
"MENGE_VORMERKER",
"MANDFUEHR",
"EINKAEUFER",
]
)
self._subtract_data(data)
self._add_results(results)
class ExprOrderQty(Protocol): ...
class ExprOrderQty_Base(ExprOrderQty, Protocol):
def __call__(self) -> pl.Expr: ...
ExprOrderQty_Base_Types: TypeAlias = (
Literal[types.Workflows.ID_200]
| Literal[types.Workflows.ID_900]
| Literal[types.Workflows.ID_910]
)
class ExprOrderQty_WF100(ExprOrderQty, Protocol):
def __call__(self, empty: bool) -> pl.Expr: ...
@overload
def get_expr_order_qty(
wf_id: Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...
@overload
def get_expr_order_qty(
wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty(
wf_id: types.Workflows,
) -> ExprOrderQty:
empty_expr = (
pl.lit(0)
.alias(ORDER_QTY_CRIT)
.alias("BEST_MENGE")
.cast(db.results_schema_map["BEST_MENGE"])
)
def _empty() -> pl.Expr:
return empty_expr
func: ExprOrderQty
match wf_id:
case types.Workflows.ID_100:
def _func(empty: bool) -> pl.Expr:
order_qty_expr: pl.Expr
if empty:
order_qty_expr = empty_expr
else:
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
return order_qty_expr
func = _func
case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
func = _empty
case _:
raise NotImplementedError(
f"Order expression for WF-ID {wf_id.value} is not implemented"
)
return func
def wf900(
pipe_result: PipelineResult,
) -> PipelineResult:
"""filter 'Meldenummer' and fill non-feasible entries"""
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
),
)
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_900,
freigabe_auto=types.Freigabe.WF_900,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.update_open(
res.in_.with_columns(
pl.col("MENGE_VORMERKER").fill_null(0),
pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
)
)
return pipe_result
def wf910(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_910,
freigabe_auto=types.Freigabe.WF_910,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result
# this a main routine:
# receives and gives back result objects
def wf100_umbreit(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 1
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf100_petersen(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# difference WDB and others
# // WDB branch
# order quantity 0, no further action in other WFs
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(vm_criterion) == 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=True),
)
filter_number_vm = pl.col(vm_criterion) > 0
res_candidates = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
wdb_sub_pipe = PipelineResult(res_candidates.in_)
wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(wdb_sub_pipe)
# // other branch
# show always entries with #VM > 1
filter_number_vm = pl.col(vm_criterion) > 1
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def _wf100_petersen_sub1_wdb(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
(filter_number_vm,),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
# past 6 months
save_tmp_data(pipe_result.open)
RELEVANT_DATE = get_starting_date(180)
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
stmt = (
sql.select(
db.tmp_data,
db.EXT_BESPBES_INFO.c.BESP_MENGE,
db.EXT_BESPBES_INFO.c.BESP_STATUS,
)
.select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
count_col = sql.func.count()
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf200_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
pipe_result.open,
(filter_meldenummer, filter_mandant),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def wf200_petersen(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_MNR: tuple[int, ...] = (17, 18)
# // WDB branch
filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
),
)
# ignore these
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
# // other branch
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def _wf200_sub1(
pipe_result: PipelineResult,
) -> PipelineResult:
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_DATE = get_starting_date(90)
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFTRAGS_ART,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result