Merge results obtained till project pause #2

Merged
foefl merged 13 commits from proto into main 2026-06-11 07:11:02 +00:00
7 changed files with 1320 additions and 134 deletions

View File

@@ -3,14 +3,18 @@ from __future__ import annotations
import datetime
import json
import shutil
import tempfile
import time
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
import dopt_basics.datetime as dt
import oracledb
import pandas as pd
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
@@ -19,10 +23,34 @@ from sqlalchemy import event
from umbreit import db, types
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
TMP_DIR = create_tmp_dir()
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
print(f"Created temp directory under: >{TMP_DIR}<")
# %%
p_cfg = io.search_file_iterative(
@@ -48,13 +76,22 @@ conn_string = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
engine = sql.create_engine(
conn_string,
execution_options={"stream_results": True},
)
@event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
cursor.arraysize = 1000
cursor.prefetchrows = 1000
# @event.listens_for(engine, "after_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
# cursor.arraysize = 1000
# cursor.prefetchrows = 1000
# %%
@@ -233,7 +270,7 @@ temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# // NO LIVE DATA NEEDED
# SAVING/LOADING
# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260303-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
@@ -373,7 +410,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
)
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
@@ -404,6 +441,46 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit"
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
def get_starting_date(
@@ -430,6 +507,7 @@ def get_raw_data() -> pl.DataFrame:
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
@@ -599,6 +677,7 @@ class PipelineResult:
"VERLAGSNR",
"MENGE_VORMERKER",
"MANDFUEHR",
"EINKAEUFER",
]
)
@@ -676,12 +755,11 @@ def get_expr_order_qty(
def wf900(
pipe_result: PipelineResult,
) -> PipelineResult:
"""filter 'Meldenummer' and fill non-feasible entries"""
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_res.open,
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
@@ -729,13 +807,14 @@ def wf100_umbreit(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# TODO remove
# ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 1
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
res_candidates = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
@@ -743,8 +822,96 @@ def wf100_umbreit(
filter_number_vm,
),
)
# sub-pipe neccessary:
# analyse MNr(18) mit #VM > 0 for reservations in the past two months
# similar to subroutine in WF-200 "_wf200_sub1"
sub_pipe = PipelineResult(res_candidates.in_)
sub_pipe = _wf100_sub1_umbreit(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
# pipe_result.write_results(
# data=res.in_,
# vorlage=False,
# wf_id=types.Workflows.ID_100,
# freigabe_auto=types.Freigabe.WF_100,
# order_qty_expr=ORDER_QTY_FUNC(empty=False),
# )
return pipe_result
def _wf100_sub1_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
# entry titles with MNr(18) and #VM > 0
# show entries with more than three orders from different
# customers in the past two months
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
RELEVANT_DATE = get_starting_date(60) # see REQ-1002
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J",
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=res.in_,
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
@@ -799,8 +966,10 @@ def wf100_petersen(
pipe_result.merge_pipeline(wdb_sub_pipe)
# // other branch
# show always entries with #VM > 1
filter_number_vm = pl.col(vm_criterion) > 1
# Verlage: always show because of missing information of ONIX
# data (REQ-1003)
# show always entries with #VM > 0
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
@@ -817,22 +986,24 @@ def wf100_petersen(
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
# TODO remove after successful tests
# // excluded based on feedback on 27.02.2026
# filter_number_vm = pl.col(vm_criterion) > 0
# res = _apply_several_filters(
# pipe_result.open,
# (
# filter_meldenummer,
# filter_mandant,
# filter_number_vm,
# ),
# )
# pipe_result.write_results(
# data=res.in_,
# vorlage=False,
# wf_id=types.Workflows.ID_100,
# freigabe_auto=types.Freigabe.WF_100,
# order_qty_expr=ORDER_QTY_FUNC(empty=False),
# )
return pipe_result
@@ -842,8 +1013,7 @@ def _wf100_petersen_sub1_wdb(
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more then 1 VM
# !! show these entries
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
@@ -883,11 +1053,26 @@ def _wf100_petersen_sub1_wdb(
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
print(relevant_titles)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@@ -973,7 +1158,7 @@ def _wf200_sub1(
) -> PipelineResult:
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_DATE = get_starting_date(90)
RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
@@ -1003,10 +1188,27 @@ def _wf200_sub1(
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
@@ -1031,63 +1233,26 @@ def _wf200_sub1(
# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
df = pl.read_ipc(p_save)
READ_DATABASE = False
OVERWRITE = True
FILENAME = "raw_data_from_sql_query_20260303-2.arrow"
p_save = Path.cwd() / FILENAME
if READ_DATABASE:
df = get_raw_data()
if not p_save.exists() or OVERWRITE:
df.write_ipc(p_save)
else:
df = pl.read_ipc(p_save)
# %%
df
# %%
# initialise pipeline
raw_data = df.clone()
print(f"Number of entries: {len(df)}")
clear_tmp_dir()
clear_result_data()
# %%
df.head()
# %%
# df.filter(pl.col.BEDP_TITELNR == 4314750)
# # %%
# RELEVANT_DATE = get_starting_date(180)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# filter_ = sql.and_(
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
# )
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# print(relevant_titles)
# %%
# removed_rows = []
# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
@@ -1101,15 +1266,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# %%
# // test result writing
res = pipe_res.results.clone()
res.height
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
pipe_res = wf910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
@@ -1117,8 +1273,6 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
# pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
@@ -1146,6 +1300,194 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
# %%
pipe_res.results.select(pl.col("VORLAGE").value_counts())
# %%
pipe_res.results.height
# %%
# // aggregate test results
all_tmps = load_all_tmp_files()
print(len(all_tmps))
# %%
all_tmps
# %%
def prepare_tmp_data() -> list[pl.DataFrame]:
all_tmps = load_all_tmp_files()
WF_100_TMP_WDB_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
WF_100_TMP_UMBREIT_RENAME = {
"COUNT": "WF-100_Umbreit_Anz-Best-Kunde_verg_3_Monate",
"CUSTOMER_COUNT": "WF-100_Umbreit_Anz-Kunden_verg_3_Monate",
}
WF_200_TMP_RENAME = {
"COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
"CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
}
WF_100_UMBREIT: list[pl.DataFrame] = []
WF_100_WDB: list[pl.DataFrame] = []
WF_200: list[pl.DataFrame] = []
for name, df in all_tmps.items():
if TMPFILE_WF100_SUB1_UMBREIT in name:
rename_schema = WF_100_TMP_UMBREIT_RENAME
df = df.rename(rename_schema)
WF_100_UMBREIT.append(df)
elif TMPFILE_WF100_SUB1_WDB in name:
rename_schema = WF_100_TMP_WDB_RENAME
df = df.rename(rename_schema)
WF_100_WDB.append(df)
elif TMPFILE_WF200_SUB1 in name:
rename_schema = WF_200_TMP_RENAME
df = df.rename(rename_schema)
WF_200.append(df)
tmp_WF_collects = (WF_100_UMBREIT, WF_100_WDB, WF_200)
all_tmps_preproc: list[pl.DataFrame] = []
for collect in tmp_WF_collects:
if len(collect) > 1:
df = pl.concat(collect)
elif len(collect) == 1:
df = collect[0].clone()
else:
raise RuntimeError()
all_tmps_preproc.append(df)
return all_tmps_preproc
def generate_test_result_data(
raw_data: pl.DataFrame,
pipe_result: PipelineResult,
) -> pl.DataFrame:
all_tmps_preproc = prepare_tmp_data()
res_table = pipe_result.results.clone()
res_title_info = res_table.join(
raw_data,
left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
right_on=["BEDARFNR", "BEDP_SEQUENZ"],
how="inner",
)
exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
res_title_info = res_title_info.select(pl.exclude(exclude_cols))
columns = [
"VORLAGE",
"WF_ID",
"BEST_MENGE",
"FREIGABE_AUTO",
"BEDP_MENGE_BEDARF_VM",
"MENGE_VORMERKER",
"BEDP_TITELNR",
"BEDP_MAN",
"MELDENUMMER",
"VERLAGSNR",
"EINKAEUFER",
"MANDFUEHR",
]
res_title_info = res_title_info.select(columns)
test_results = res_title_info.clone()
for df in all_tmps_preproc:
test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
test_results = test_results.with_columns(
pl.lit(None, dtype=pl.String).alias("Anmerkungen/Feedback")
)
return test_results
# %%
test_results = generate_test_result_data(raw_data, pipe_res)
test_results.head()
# %%
col = "WF-100_Umbreit_Anz-Best-Kunde_verg_3_Monate"
# col = "WF-100_Umbreit_Anz-Kunden_verg_3_Monate"
# test_results.filter(pl.col(col) >= 3)
# %%
# RELEVANT_DATE = get_starting_date(60)
#
# title_no = 7753822
# title_no = 5383912
# filter_ = sql.and_(
# db.EXT_AUFPAUF.c.TITELNR == title_no,
# db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
# db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J",
# )
# stmt = sql.select(
# db.EXT_AUFPAUF,
# ).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# relevant_titles
# %%
def write_test_results_excel(
data: pl.DataFrame,
base_filename: str,
) -> None:
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"{base_filename}_{date_str}.xlsx"
pd_df = data.to_pandas().set_index("Index")
with pd.ExcelWriter(p_save, engine="xlsxwriter") as writer:
sheet_name = f"Ergebnisse_Testphase_{date_str}"
pd_df.to_excel(
writer,
freeze_panes=(1, 1),
sheet_name=sheet_name,
)
worksheet = writer.sheets[sheet_name]
rows, cols = pd_df.shape
columns = ["Index"] + pd_df.columns.to_list()
worksheet.add_table(
0,
0,
rows,
cols,
{"columns": [{"header": c} for c in columns], "style": "Table Style Light 9"},
)
for i, col in enumerate(columns):
if i == 0:
worksheet.set_column(
i, i, max(pd_df.index.astype(str).map(len).max(), len(col)) + 2
)
continue
worksheet.set_column(
i, i, max(pd_df[col].astype(str).map(len).max(), len(col)) + 2
)
# %%
write_test_results_excel(test_results, "Testdatensatz_WF-100-200")
#####################################################################
# %%
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
)
deviation_vm = test_results.filter(
pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode())
)
deviation_vm
# %%
write_test_results_excel(deviation_vm, "Abweichungen-VM")
# ** WF-200 potentially triggered
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
pl.col.BEDP_TITELNR.is_duplicated()
).sort("BEDP_TITELNR")
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
@@ -1157,7 +1499,7 @@ wf_200_start_data
# %%
engine.dispose()
remove_tmp_dir()
# %%
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
@@ -1613,3 +1955,5 @@ pipe_res_main.open
# %%
pipe_res.results
# %%
remove_tmp_dir()
# %%

39
pdm.lock generated
View File

@@ -2692,30 +2692,29 @@ files = [
[[package]]
name = "ruff"
version = "0.14.3"
version = "0.15.0"
requires_python = ">=3.7"
summary = "An extremely fast Python linter and code formatter, written in Rust."
groups = ["lint"]
files = [
{file = "ruff-0.14.3-py3-none-linux_armv6l.whl", hash = "sha256:876b21e6c824f519446715c1342b8e60f97f93264012de9d8d10314f8a79c371"},
{file = "ruff-0.14.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b6fd8c79b457bedd2abf2702b9b472147cd860ed7855c73a5247fa55c9117654"},
{file = "ruff-0.14.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:71ff6edca490c308f083156938c0c1a66907151263c4abdcb588602c6e696a14"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:786ee3ce6139772ff9272aaf43296d975c0217ee1b97538a98171bf0d21f87ed"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:cd6291d0061811c52b8e392f946889916757610d45d004e41140d81fb6cd5ddc"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a497ec0c3d2c88561b6d90f9c29f5ae68221ac00d471f306fa21fa4264ce5fcd"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e231e1be58fc568950a04fbe6887c8e4b85310e7889727e2b81db205c45059eb"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:469e35872a09c0e45fecf48dd960bfbce056b5db2d5e6b50eca329b4f853ae20"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3d6bc90307c469cb9d28b7cfad90aaa600b10d67c6e22026869f585e1e8a2db0"},
{file = "ruff-0.14.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e2f8a0bbcffcfd895df39c9a4ecd59bb80dca03dc43f7fb63e647ed176b741e"},
{file = "ruff-0.14.3-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:678fdd7c7d2d94851597c23ee6336d25f9930b460b55f8598e011b57c74fd8c5"},
{file = "ruff-0.14.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1ec1ac071e7e37e0221d2f2dbaf90897a988c531a8592a6a5959f0603a1ecf5e"},
{file = "ruff-0.14.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afcdc4b5335ef440d19e7df9e8ae2ad9f749352190e96d481dc501b753f0733e"},
{file = "ruff-0.14.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:7bfc42f81862749a7136267a343990f865e71fe2f99cf8d2958f684d23ce3dfa"},
{file = "ruff-0.14.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a65e448cfd7e9c59fae8cf37f9221585d3354febaad9a07f29158af1528e165f"},
{file = "ruff-0.14.3-py3-none-win32.whl", hash = "sha256:f3d91857d023ba93e14ed2d462ab62c3428f9bbf2b4fbac50a03ca66d31991f7"},
{file = "ruff-0.14.3-py3-none-win_amd64.whl", hash = "sha256:d7b7006ac0756306db212fd37116cce2bd307e1e109375e1c6c106002df0ae5f"},
{file = "ruff-0.14.3-py3-none-win_arm64.whl", hash = "sha256:26eb477ede6d399d898791d01961e16b86f02bc2486d0d1a7a9bb2379d055dc1"},
{file = "ruff-0.14.3.tar.gz", hash = "sha256:4ff876d2ab2b161b6de0aa1f5bd714e8e9b4033dc122ee006925fbacc4f62153"},
{file = "ruff-0.15.0-py3-none-linux_armv6l.whl", hash = "sha256:aac4ebaa612a82b23d45964586f24ae9bc23ca101919f5590bdb368d74ad5455"},
{file = "ruff-0.15.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:dcd4be7cc75cfbbca24a98d04d0b9b36a270d0833241f776b788d59f4142b14d"},
{file = "ruff-0.15.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d747e3319b2bce179c7c1eaad3d884dc0a199b5f4d5187620530adf9105268ce"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:650bd9c56ae03102c51a5e4b554d74d825ff3abe4db22b90fd32d816c2e90621"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a6664b7eac559e3048223a2da77769c2f92b43a6dfd4720cef42654299a599c9"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f811f97b0f092b35320d1556f3353bf238763420ade5d9e62ebd2b73f2ff179"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:761ec0a66680fab6454236635a39abaf14198818c8cdf691e036f4bc0f406b2d"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:940f11c2604d317e797b289f4f9f3fa5555ffe4fb574b55ed006c3d9b6f0eb78"},
{file = "ruff-0.15.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bcbca3d40558789126da91d7ef9a7c87772ee107033db7191edefa34e2c7f1b4"},
{file = "ruff-0.15.0-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:9a121a96db1d75fa3eb39c4539e607f628920dd72ff1f7c5ee4f1b768ac62d6e"},
{file = "ruff-0.15.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5298d518e493061f2eabd4abd067c7e4fb89e2f63291c94332e35631c07c3662"},
{file = "ruff-0.15.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afb6e603d6375ff0d6b0cee563fa21ab570fd15e65c852cb24922cef25050cf1"},
{file = "ruff-0.15.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:77e515f6b15f828b94dc17d2b4ace334c9ddb7d9468c54b2f9ed2b9c1593ef16"},
{file = "ruff-0.15.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6f6e80850a01eb13b3e42ee0ebdf6e4497151b48c35051aab51c101266d187a3"},
{file = "ruff-0.15.0-py3-none-win32.whl", hash = "sha256:238a717ef803e501b6d51e0bdd0d2c6e8513fe9eec14002445134d3907cd46c3"},
{file = "ruff-0.15.0-py3-none-win_amd64.whl", hash = "sha256:dd5e4d3301dc01de614da3cdffc33d4b1b96fb89e45721f1598e5532ccf78b18"},
{file = "ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a"},
{file = "ruff-0.15.0.tar.gz", hash = "sha256:6bdea47cdbea30d40f8f8d7d69c0854ba7c15420ec75a26f463290949d7f7e9a"},
]
[[package]]

View File

@@ -1,6 +1,6 @@
[project]
name = "umbreit"
version = "0.1.0"
version = "0.1.1dev0"
description = "Umbreit's Python-based application"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -71,7 +71,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.1.0"
current_version = "0.1.1dev0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

30
src/umbreit/constants.py Normal file
View File

@@ -0,0 +1,30 @@
from pathlib import Path
from typing import Final
from dopt_basics import configs
from dopt_basics import io as io_
LIB_PATH: Final[Path] = Path(__file__).parent
# // database connections
p_cfg = io_.search_file_iterative(
starting_path=LIB_PATH,
glob_pattern="CRED*.toml",
stop_folder_name="umbreit-py",
)
if p_cfg is None:
raise FileNotFoundError("Config was not found")
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
# TODO remove or change
# ** Oracle client libs
USE_THICK_MODE: Final[bool] = False
P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
assert P_ORACLE_CLIENT_LIBS.is_dir()

View File

@@ -178,17 +178,8 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
EXT_AUFPAUF_null_values: PolarsNullValues = {}
# // queries and temp data
raw_data_query_schema_map: PolarsSchema = {
"BEDARFNR": pl.UInt32,
"BEDP_SEQUENZ": pl.UInt32,
"BEDP_TITELNR": pl.UInt32,
"BEDP_MAN": pl.UInt8,
"BEDP_MENGE_BEDARF_VM": pl.UInt32,
"MELDENUMMER": pl.UInt8,
"VERLAGSNR": pl.UInt32,
"MENGE_VORMERKER": pl.UInt32,
"MANDFUEHR": pl.UInt8,
}
raw_data_query_schema_map: PolarsSchema = ext_bedpbed_schema_map.copy()
raw_data_query_schema_map.update(ext_titel_info_schema_map)
tmp_data = Table(
"EXT_TMP_BEDP_TINFO",

821
src/umbreit/pipeline.py Normal file
View File

@@ -0,0 +1,821 @@
from __future__ import annotations
import datetime
import shutil
import tempfile
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
import dopt_basics.datetime as dt
import oracledb
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from umbreit import constants, db, types
from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
if constants.USE_THICK_MODE:
oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
def create_tmp_dir() -> Path:
tmp_pth = Path(tempfile.mkdtemp())
assert tmp_pth.exists()
return tmp_pth
def clear_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir()
def remove_tmp_dir() -> None:
shutil.rmtree(TMP_DIR)
TMP_DIR = create_tmp_dir()
CONN_STRING: typing.Final[str] = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(
CONN_STRING,
execution_options={"stream_results": True},
)
VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit"
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
data: pl.DataFrame,
filename: str | None,
) -> None:
if filename is None:
filename = str(uuid.uuid4())
pth = (TMP_DIR / filename).with_suffix(".arrow")
n: int = 1
while pth.exists():
filename_new = pth.stem + f"_{n}"
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
n += 1
data.write_ipc(pth)
def load_tmp_file(
filename: str,
) -> pl.DataFrame:
pth = (TMP_DIR / filename).with_suffix(".arrow")
if not pth.exists():
raise FileNotFoundError(f"File >{pth.name}< not found")
return pl.read_ipc(pth)
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
all_dfs: dict[str, pl.DataFrame] = {}
for file in TMP_DIR.glob("*.arrow"):
df = pl.read_ipc(file)
all_dfs[file.stem] = df
return all_dfs
def get_starting_date(
days_from_now: int,
) -> datetime.date:
current_dt = dt.current_time_tz(cut_microseconds=True)
td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
return (current_dt - td).date()
def get_raw_data() -> pl.DataFrame:
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN,
sql.case(
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
db.ext_titel_info.c.EINKAEUFER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database(
stmt,
engine,
schema_overrides=db.raw_data_query_schema_map,
)
def save_tmp_data(df: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.tmp_data))
with engine.begin() as conn:
conn.execute(sql.insert(db.tmp_data), df.to_dicts())
def get_tmp_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.tmp_data),
engine,
schema_overrides=db.tmp_data_schema_map,
)
def get_result_data() -> pl.DataFrame:
return pl.read_database(
sql.select(db.EXT_DOPT_ERGEBNIS),
engine,
schema_overrides=db.results_schema_map,
)
def save_result_data(results: pl.DataFrame) -> None:
with engine.begin() as conn:
conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
def clear_result_data() -> None:
with engine.begin() as conn:
conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
def save_result_data_native(
results: pl.DataFrame,
) -> None:
results = results.with_columns(
[
pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
for c in results.select(cs.boolean()).columns
]
)
stmt = """
INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
"BEST_MENGE", "FREIGABE_AUTO")
VALUES (:1, :2, :3, :4, :5, :6)
"""
with engine.begin() as conn:
raw_conn = conn.connection.connection
with raw_conn.cursor() as cursor:
cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))
def _apply_several_filters(
df: pl.DataFrame,
filters: Sequence[pl.Expr],
) -> types.FilterResult:
df_current = df
removed_rows: list[pl.DataFrame] = []
for filter in filters:
removed = df_current.filter(~filter)
removed_rows.append(removed)
df_current = df_current.filter(filter)
df_removed = pl.concat(removed_rows)
return types.FilterResult(in_=df_current, out_=df_removed)
class PipelineResult:
__slots__ = ("_results", "_open", "_subtracted_indices")
_index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
def __init__(
self,
data: pl.DataFrame,
) -> None:
self._open = data
schema = db.results_schema_map.copy()
del schema["ID"]
self._results = pl.DataFrame(schema=schema)
schema = {}
for col in self._index_cols:
schema[col] = db.raw_data_query_schema_map[col]
self._subtracted_indices = pl.DataFrame(schema=schema)
def __len__(self) -> int:
return len(self._results) + len(self._open)
@property
def open(self) -> pl.DataFrame:
return self._open
@property
def results(self) -> pl.DataFrame:
return self._results
@property
def subtracted_indices(self) -> pl.DataFrame:
return self._subtracted_indices
def update_open(
self,
data: pl.DataFrame,
) -> None:
self._open = data
def _subtract_data(
self,
data: pl.DataFrame,
) -> None:
self._open = self._open.join(data, on=self._index_cols, how="anti")
self._subtracted_indices = pl.concat(
(self._subtracted_indices, data[self._index_cols])
)
def _add_results(
self,
data: pl.DataFrame,
) -> None:
res = pl.concat([self._results, data])
self._results = res
def merge_pipeline(
self,
pipeline: PipelineResult,
) -> None:
self._subtract_data(pipeline.subtracted_indices)
self._add_results(pipeline.results)
def write_results(
self,
data: pl.DataFrame,
vorlage: bool,
wf_id: types.Workflows,
freigabe_auto: types.Freigabe,
order_qty_expr: pl.Expr,
) -> None:
results = data.rename(db.map_data_to_result)
results = results.with_columns(
[
pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
order_qty_expr,
pl.lit(freigabe_auto.value)
.alias("FREIGABE_AUTO")
.cast(db.results_schema_map["FREIGABE_AUTO"]),
]
)
results = results.drop(
[
"BEDP_TITELNR",
"BEDP_MAN",
"BEDP_MENGE_BEDARF_VM",
"MELDENUMMER",
"VERLAGSNR",
"MENGE_VORMERKER",
"MANDFUEHR",
"EINKAEUFER",
]
)
self._subtract_data(data)
self._add_results(results)
class ExprOrderQty(typing.Protocol): ...
class ExprOrderQty_Base(ExprOrderQty, typing.Protocol):
def __call__(self) -> pl.Expr: ...
ExprOrderQty_Base_Types: typing.TypeAlias = (
typing.Literal[types.Workflows.ID_200]
| typing.Literal[types.Workflows.ID_900]
| typing.Literal[types.Workflows.ID_910]
)
class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol):
def __call__(self, empty: bool) -> pl.Expr: ...
@typing.overload
def get_expr_order_qty(
wf_id: typing.Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...
@typing.overload
def get_expr_order_qty(
wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty(
wf_id: types.Workflows,
) -> ExprOrderQty:
empty_expr = (
pl.lit(0)
.alias(ORDER_QTY_CRIT)
.alias("BEST_MENGE")
.cast(db.results_schema_map["BEST_MENGE"])
)
def _empty() -> pl.Expr:
return empty_expr
func: ExprOrderQty
match wf_id:
case types.Workflows.ID_100:
def _func(empty: bool) -> pl.Expr:
order_qty_expr: pl.Expr
if empty:
order_qty_expr = empty_expr
else:
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
return order_qty_expr
func = _func
case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
func = _empty
case _:
raise NotImplementedError(
f"Order expression for WF-ID {wf_id.value} is not implemented"
)
return func
# // begin workflows
def wf900(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer_null,
filter_mandant,
),
)
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_900,
freigabe_auto=types.Freigabe.WF_900,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.update_open(
res.in_.with_columns(
pl.col("MENGE_VORMERKER").fill_null(0),
pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
)
)
return pipe_result
def wf910(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
pipe_result.write_results(
data=res.out_,
vorlage=False,
wf_id=types.Workflows.ID_910,
freigabe_auto=types.Freigabe.WF_910,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result
# this a main routine:
# receives and gives back result objects
def wf100_umbreit(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
# TODO remove
# ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 1
filter_number_vm = pl.col(vm_criterion) > 0
res_candidates = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
# sub-pipe neccessary:
# analyse MNr(18) mit #VM > 0 for reservations in the past two months
# similar to subroutine in WF-200 "_wf200_sub1"
sub_pipe = PipelineResult(res_candidates.in_)
sub_pipe = _wf100_sub1_umbreit(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def _wf100_sub1_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
# entry titles with MNr(18) and #VM > 0
# show entries with more than three orders from different
# customers in the past two months
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
RELEVANT_DATE = get_starting_date(60) # see REQ-1002
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J",
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf100_petersen(
pipe_result: PipelineResult,
vm_criterion: str,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# difference WDB and others
# // WDB branch
# order quantity 0, no further action in other WFs
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(vm_criterion) == 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=True),
)
filter_number_vm = pl.col(vm_criterion) > 0
res_candidates = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
wdb_sub_pipe = PipelineResult(res_candidates.in_)
wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(wdb_sub_pipe)
# // other branch
# Verlage: always show because of missing information of ONIX
# data (REQ-1003)
# show always entries with #VM > 0
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def _wf100_petersen_sub1_wdb(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
# input: pre-filtered entries (WDB titles and #VM > 0)
# more than 1 VM: show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res = _apply_several_filters(
pipe_result.open,
(filter_number_vm,),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
# past 6 months
save_tmp_data(pipe_result.open)
RELEVANT_DATE = get_starting_date(180)
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
stmt = (
sql.select(
db.tmp_data,
db.EXT_BESPBES_INFO.c.BESP_MENGE,
db.EXT_BESPBES_INFO.c.BESP_STATUS,
)
.select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
count_col = sql.func.count()
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
return pipe_result
def wf200_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
pipe_result.open,
(filter_meldenummer, filter_mandant),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def wf200_petersen(
pipe_result: PipelineResult,
) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_MNR: tuple[int, ...] = (17, 18)
# // WDB branch
filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
),
)
# ignore these
pipe_result.write_results(
data=res.in_,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
# // other branch
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
),
)
sub_pipe = PipelineResult(res.in_)
sub_pipe = _wf200_sub1(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def _wf200_sub1(
pipe_result: PipelineResult,
) -> PipelineResult:
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFTRAGS_ART,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results(
data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False,
wf_id=types.Workflows.ID_200,
freigabe_auto=types.Freigabe.WF_200,
order_qty_expr=ORDER_QTY_FUNC(),
)
return pipe_result

View File

@@ -26,6 +26,7 @@ class FilterResult:
class Workflows(enum.Enum):
ID_000 = 0
ID_100 = 100
ID_200 = 200
ID_900 = 900
@@ -38,8 +39,8 @@ class OrderQtyExprKwArgs:
class Freigabe(enum.Enum):
WF_000 = False
WF_100 = False
WF_200 = False
WF_900 = False
WF_910 = False
OPEN = False