generated from dopt-python/py311
Compare commits
No commits in common. "proto" and "main" have entirely different histories.
@ -3,18 +3,14 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
import uuid
|
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
|
|
||||||
import dopt_basics.datetime as dt
|
import dopt_basics.datetime as dt
|
||||||
import oracledb
|
import oracledb
|
||||||
import pandas as pd
|
|
||||||
import polars as pl
|
import polars as pl
|
||||||
import polars.selectors as cs
|
import polars.selectors as cs
|
||||||
import sqlalchemy as sql
|
import sqlalchemy as sql
|
||||||
@ -23,34 +19,10 @@ from sqlalchemy import event
|
|||||||
|
|
||||||
from umbreit import db, types
|
from umbreit import db, types
|
||||||
|
|
||||||
oracledb.defaults.arraysize = 1000
|
|
||||||
oracledb.defaults.prefetchrows = 1000
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
# import importlib
|
# import importlib
|
||||||
# types = importlib.reload(types)
|
# types = importlib.reload(types)
|
||||||
# db = importlib.reload(db)
|
# db = importlib.reload(db)
|
||||||
# %%
|
|
||||||
def create_tmp_dir() -> Path:
|
|
||||||
tmp_pth = Path(tempfile.mkdtemp())
|
|
||||||
assert tmp_pth.exists()
|
|
||||||
return tmp_pth
|
|
||||||
|
|
||||||
|
|
||||||
TMP_DIR = create_tmp_dir()
|
|
||||||
|
|
||||||
|
|
||||||
def clear_tmp_dir() -> None:
|
|
||||||
shutil.rmtree(TMP_DIR)
|
|
||||||
TMP_DIR.mkdir()
|
|
||||||
|
|
||||||
|
|
||||||
def remove_tmp_dir() -> None:
|
|
||||||
shutil.rmtree(TMP_DIR)
|
|
||||||
|
|
||||||
|
|
||||||
print(f"Created temp directory under: >{TMP_DIR}<")
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
p_cfg = io.search_file_iterative(
|
p_cfg = io.search_file_iterative(
|
||||||
@ -76,22 +48,13 @@ conn_string = (
|
|||||||
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
|
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
|
||||||
)
|
)
|
||||||
# engine = sql.create_engine(conn_string)
|
# engine = sql.create_engine(conn_string)
|
||||||
engine = sql.create_engine(
|
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
|
||||||
conn_string,
|
|
||||||
execution_options={"stream_results": True},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# @event.listens_for(engine, "after_cursor_execute")
|
@event.listens_for(engine, "after_cursor_execute")
|
||||||
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
|
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
|
||||||
# cursor.arraysize = 1000
|
cursor.arraysize = 1000
|
||||||
# cursor.prefetchrows = 1000
|
cursor.prefetchrows = 1000
|
||||||
|
|
||||||
|
|
||||||
# @event.listens_for(engine, "before_cursor_execute")
|
|
||||||
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
|
|
||||||
# cursor.arraysize = 1000
|
|
||||||
# cursor.prefetchrows = 1000
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
@ -410,7 +373,7 @@ df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null(
|
|||||||
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
|
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
|
||||||
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
|
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
|
||||||
p_save_diff_VM_bedp_tinfo = (
|
p_save_diff_VM_bedp_tinfo = (
|
||||||
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
|
Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
|
||||||
)
|
)
|
||||||
|
|
||||||
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
|
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
|
||||||
@ -441,45 +404,6 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
|
|||||||
db.EXT_DOPT_ERGEBNIS.columns.keys()
|
db.EXT_DOPT_ERGEBNIS.columns.keys()
|
||||||
)
|
)
|
||||||
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
|
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
|
||||||
SAVE_TMP_FILES: typing.Final[bool] = True
|
|
||||||
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
|
|
||||||
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
|
|
||||||
|
|
||||||
|
|
||||||
def save_tmp_file(
|
|
||||||
data: pl.DataFrame,
|
|
||||||
filename: str | None,
|
|
||||||
) -> None:
|
|
||||||
if filename is None:
|
|
||||||
filename = str(uuid.uuid4())
|
|
||||||
pth = (TMP_DIR / filename).with_suffix(".arrow")
|
|
||||||
|
|
||||||
n: int = 1
|
|
||||||
while pth.exists():
|
|
||||||
filename_new = pth.stem + f"_{n}"
|
|
||||||
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
|
|
||||||
n += 1
|
|
||||||
|
|
||||||
data.write_ipc(pth)
|
|
||||||
|
|
||||||
|
|
||||||
def load_tmp_file(
|
|
||||||
filename: str,
|
|
||||||
) -> pl.DataFrame:
|
|
||||||
pth = (TMP_DIR / filename).with_suffix(".arrow")
|
|
||||||
if not pth.exists():
|
|
||||||
raise FileNotFoundError(f"File >{pth.name}< not found")
|
|
||||||
|
|
||||||
return pl.read_ipc(pth)
|
|
||||||
|
|
||||||
|
|
||||||
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
|
|
||||||
all_dfs: dict[str, pl.DataFrame] = {}
|
|
||||||
for file in TMP_DIR.glob("*.arrow"):
|
|
||||||
df = pl.read_ipc(file)
|
|
||||||
all_dfs[file.stem] = df
|
|
||||||
|
|
||||||
return all_dfs
|
|
||||||
|
|
||||||
|
|
||||||
def get_starting_date(
|
def get_starting_date(
|
||||||
@ -506,7 +430,6 @@ def get_raw_data() -> pl.DataFrame:
|
|||||||
db.ext_titel_info.c.VERLAGSNR,
|
db.ext_titel_info.c.VERLAGSNR,
|
||||||
db.ext_titel_info.c.MENGE_VORMERKER,
|
db.ext_titel_info.c.MENGE_VORMERKER,
|
||||||
db.ext_titel_info.c.MANDFUEHR,
|
db.ext_titel_info.c.MANDFUEHR,
|
||||||
db.ext_titel_info.c.EINKAEUFER,
|
|
||||||
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
|
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
|
||||||
|
|
||||||
return pl.read_database(
|
return pl.read_database(
|
||||||
@ -676,7 +599,6 @@ class PipelineResult:
|
|||||||
"VERLAGSNR",
|
"VERLAGSNR",
|
||||||
"MENGE_VORMERKER",
|
"MENGE_VORMERKER",
|
||||||
"MANDFUEHR",
|
"MANDFUEHR",
|
||||||
"EINKAEUFER",
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -759,7 +681,7 @@ def wf900(
|
|||||||
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
pipe_result.open,
|
pipe_res.open,
|
||||||
(
|
(
|
||||||
filter_meldenummer_null,
|
filter_meldenummer_null,
|
||||||
filter_mandant,
|
filter_mandant,
|
||||||
@ -920,7 +842,8 @@ def _wf100_petersen_sub1_wdb(
|
|||||||
) -> PipelineResult:
|
) -> PipelineResult:
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
|
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
|
||||||
# input: pre-filtered entries (WDB titles and #VM > 0)
|
# input: pre-filtered entries (WDB titles and #VM > 0)
|
||||||
# more than 1 VM: show these entries
|
# more then 1 VM
|
||||||
|
# !! show these entries
|
||||||
filter_number_vm = pl.col(VM_CRITERION) > 1
|
filter_number_vm = pl.col(VM_CRITERION) > 1
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
pipe_result.open,
|
pipe_result.open,
|
||||||
@ -960,26 +883,11 @@ def _wf100_petersen_sub1_wdb(
|
|||||||
.group_by(sub1.c.BEDP_TITELNR)
|
.group_by(sub1.c.BEDP_TITELNR)
|
||||||
.having(count_col > 1)
|
.having(count_col > 1)
|
||||||
)
|
)
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
count_col.label("count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
)
|
|
||||||
# !! this is a sub result which must be used in the result set
|
|
||||||
# !! for testing and feedback by the customer
|
|
||||||
relevant_titles = pl.read_database(
|
relevant_titles = pl.read_database(
|
||||||
stmt,
|
stmt,
|
||||||
engine,
|
engine,
|
||||||
)
|
)
|
||||||
|
print(relevant_titles)
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
|
|
||||||
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
|
|
||||||
|
|
||||||
entries_to_show = pipe_result.open.filter(
|
entries_to_show = pipe_result.open.filter(
|
||||||
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
||||||
)
|
)
|
||||||
@ -1095,27 +1003,10 @@ def _wf200_sub1(
|
|||||||
.group_by(sub1.c.BEDP_TITELNR)
|
.group_by(sub1.c.BEDP_TITELNR)
|
||||||
.having(unique_count_col >= 3)
|
.having(unique_count_col >= 3)
|
||||||
)
|
)
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
sql.func.count().label("count"),
|
|
||||||
unique_count_col.label("customer_count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
)
|
|
||||||
# !! this is a sub result which must be used in the result set
|
|
||||||
# !! for testing and feedback by the customer
|
|
||||||
relevant_titles = pl.read_database(
|
relevant_titles = pl.read_database(
|
||||||
stmt,
|
stmt,
|
||||||
engine,
|
engine,
|
||||||
)
|
)
|
||||||
|
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
|
|
||||||
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
|
|
||||||
|
|
||||||
entries_to_show = pipe_result.open.filter(
|
entries_to_show = pipe_result.open.filter(
|
||||||
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
||||||
)
|
)
|
||||||
@ -1140,26 +1031,63 @@ def _wf200_sub1(
|
|||||||
|
|
||||||
# %%
|
# %%
|
||||||
# SAVING/LOADING
|
# SAVING/LOADING
|
||||||
READ_DATABASE = False
|
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
|
||||||
OVERWRITE = True
|
|
||||||
FILENAME = "raw_data_from_sql_query_20260202-1.arrow"
|
|
||||||
p_save = Path.cwd() / FILENAME
|
|
||||||
if READ_DATABASE:
|
|
||||||
df = get_raw_data()
|
|
||||||
if not p_save.exists() or OVERWRITE:
|
|
||||||
df.write_ipc(p_save)
|
|
||||||
else:
|
|
||||||
df = pl.read_ipc(p_save)
|
df = pl.read_ipc(p_save)
|
||||||
# %%
|
|
||||||
df
|
|
||||||
# %%
|
|
||||||
# initialise pipeline
|
|
||||||
raw_data = df.clone()
|
|
||||||
print(f"Number of entries: {len(df)}")
|
print(f"Number of entries: {len(df)}")
|
||||||
clear_tmp_dir()
|
|
||||||
clear_result_data()
|
|
||||||
# %%
|
# %%
|
||||||
df.head()
|
df.head()
|
||||||
|
# %%
|
||||||
|
# df.filter(pl.col.BEDP_TITELNR == 4314750)
|
||||||
|
# # %%
|
||||||
|
# RELEVANT_DATE = get_starting_date(180)
|
||||||
|
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
|
||||||
|
# filter_ = sql.and_(
|
||||||
|
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
|
||||||
|
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
|
||||||
|
# )
|
||||||
|
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
|
||||||
|
# relevant_titles = pl.read_database(
|
||||||
|
# stmt,
|
||||||
|
# engine,
|
||||||
|
# )
|
||||||
|
# print(relevant_titles)
|
||||||
|
# %%
|
||||||
|
# removed_rows = []
|
||||||
|
|
||||||
|
# raw_data = df.clone()
|
||||||
|
# print(f"Length raw data: {len(raw_data)}")
|
||||||
|
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
|
||||||
|
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
|
||||||
|
|
||||||
|
|
||||||
|
# filtered = raw_data.filter(filter_mandant)
|
||||||
|
# filtered_n = raw_data.filter(~filter_mandant)
|
||||||
|
# num_filter = len(filtered)
|
||||||
|
# num_filter_n = len(filtered_n)
|
||||||
|
# removed_rows.append(filtered_n)
|
||||||
|
# print(f"Length filtered: {num_filter}")
|
||||||
|
# print(f"Length filtered out: {num_filter_n}")
|
||||||
|
# print(f"Length all: {num_filter + num_filter_n}")
|
||||||
|
# raw_data = filtered
|
||||||
|
# out = pl.concat(removed_rows)
|
||||||
|
# print(f"Length out: {len(out)}")
|
||||||
|
|
||||||
|
# # %%
|
||||||
|
# print("---------------------------------------")
|
||||||
|
# filtered = raw_data.filter(filter_ignore_MNR26)
|
||||||
|
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
|
||||||
|
# num_filter = len(filtered)
|
||||||
|
# num_filter_n = len(filtered_n)
|
||||||
|
# len(filtered_n)
|
||||||
|
# # %%
|
||||||
|
# removed_rows.append(filtered_n)
|
||||||
|
# print(f"Length filtered: {num_filter}")
|
||||||
|
# print(f"Length filtered out: {num_filter_n}")
|
||||||
|
# print(f"Length all: {num_filter + num_filter_n}")
|
||||||
|
# out = pl.concat(removed_rows)
|
||||||
|
# print(f"Length out: {len(out)}")
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
raw_data = df.clone()
|
raw_data = df.clone()
|
||||||
# pipe_res = get_empty_pipeline_result(raw_data)
|
# pipe_res = get_empty_pipeline_result(raw_data)
|
||||||
@ -1173,6 +1101,15 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
|
|||||||
|
|
||||||
# %%
|
# %%
|
||||||
pipe_res.results
|
pipe_res.results
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# // test result writing
|
||||||
|
res = pipe_res.results.clone()
|
||||||
|
res.height
|
||||||
|
|
||||||
|
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
|
||||||
|
# %%
|
||||||
|
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
|
||||||
# %%
|
# %%
|
||||||
pipe_res = wf910(pipe_res)
|
pipe_res = wf910(pipe_res)
|
||||||
print(f"Length of base data: {len(raw_data):>18}")
|
print(f"Length of base data: {len(raw_data):>18}")
|
||||||
@ -1180,6 +1117,8 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
|
|||||||
print(f"Number of entries result data: {len(pipe_res.results):>8}")
|
print(f"Number of entries result data: {len(pipe_res.results):>8}")
|
||||||
print(f"Number of entries open data: {len(pipe_res.open):>10}")
|
print(f"Number of entries open data: {len(pipe_res.open):>10}")
|
||||||
# %%
|
# %%
|
||||||
|
# pipe_res.results.select(pl.col("vorlage").value_counts())
|
||||||
|
# %%
|
||||||
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
|
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
|
||||||
print(f"Length of base data: {len(raw_data):>18}")
|
print(f"Length of base data: {len(raw_data):>18}")
|
||||||
print(f"Number of entries pipe data: {len(pipe_res):>10}")
|
print(f"Number of entries pipe data: {len(pipe_res):>10}")
|
||||||
@ -1207,159 +1146,6 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
|
|||||||
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
|
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
|
||||||
# %%
|
# %%
|
||||||
pipe_res.results.select(pl.col("VORLAGE").value_counts())
|
pipe_res.results.select(pl.col("VORLAGE").value_counts())
|
||||||
# %%
|
|
||||||
pipe_res.results.height
|
|
||||||
# %%
|
|
||||||
# // aggregate test results
|
|
||||||
all_tmps = load_all_tmp_files()
|
|
||||||
print(len(all_tmps))
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
|
||||||
def prepare_tmp_data() -> list[pl.DataFrame]:
|
|
||||||
all_tmps = load_all_tmp_files()
|
|
||||||
WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
|
|
||||||
WF_200_TMP_RENAME = {
|
|
||||||
"COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
|
|
||||||
"CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
|
|
||||||
}
|
|
||||||
|
|
||||||
WF_100: list[pl.DataFrame] = []
|
|
||||||
WF_200: list[pl.DataFrame] = []
|
|
||||||
|
|
||||||
for name, df in all_tmps.items():
|
|
||||||
if TMPFILE_WF100_SUB1_WDB in name:
|
|
||||||
rename_schema = WF_100_TMP_RENAME
|
|
||||||
df = df.rename(rename_schema)
|
|
||||||
WF_100.append(df)
|
|
||||||
elif TMPFILE_WF200_SUB1 in name:
|
|
||||||
rename_schema = WF_200_TMP_RENAME
|
|
||||||
df = df.rename(rename_schema)
|
|
||||||
WF_200.append(df)
|
|
||||||
|
|
||||||
tmp_WF_collects = (WF_100, WF_200)
|
|
||||||
all_tmps_preproc: list[pl.DataFrame] = []
|
|
||||||
|
|
||||||
for collect in tmp_WF_collects:
|
|
||||||
if len(collect) > 1:
|
|
||||||
df = pl.concat(collect)
|
|
||||||
elif len(collect) == 1:
|
|
||||||
df = collect[0].clone()
|
|
||||||
else:
|
|
||||||
raise RuntimeError()
|
|
||||||
|
|
||||||
all_tmps_preproc.append(df)
|
|
||||||
|
|
||||||
return all_tmps_preproc
|
|
||||||
|
|
||||||
|
|
||||||
def generate_test_result_data(
|
|
||||||
raw_data: pl.DataFrame,
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> pl.DataFrame:
|
|
||||||
all_tmps_preproc = prepare_tmp_data()
|
|
||||||
|
|
||||||
res_table = pipe_result.results.clone()
|
|
||||||
res_title_info = res_table.join(
|
|
||||||
raw_data,
|
|
||||||
left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
|
|
||||||
right_on=["BEDARFNR", "BEDP_SEQUENZ"],
|
|
||||||
how="inner",
|
|
||||||
)
|
|
||||||
exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
|
|
||||||
res_title_info = res_title_info.select(pl.exclude(exclude_cols))
|
|
||||||
columns = [
|
|
||||||
"VORLAGE",
|
|
||||||
"WF_ID",
|
|
||||||
"BEST_MENGE",
|
|
||||||
"FREIGABE_AUTO",
|
|
||||||
"BEDP_MENGE_BEDARF_VM",
|
|
||||||
"MENGE_VORMERKER",
|
|
||||||
"BEDP_TITELNR",
|
|
||||||
"BEDP_MAN",
|
|
||||||
"MELDENUMMER",
|
|
||||||
"VERLAGSNR",
|
|
||||||
"EINKAEUFER",
|
|
||||||
"MANDFUEHR",
|
|
||||||
]
|
|
||||||
res_title_info = res_title_info.select(columns)
|
|
||||||
|
|
||||||
test_results = res_title_info.clone()
|
|
||||||
for df in all_tmps_preproc:
|
|
||||||
test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
|
|
||||||
|
|
||||||
test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
|
|
||||||
test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
|
|
||||||
test_results = test_results.with_columns(
|
|
||||||
pl.lit(None, dtype=pl.String).alias("Anmerkungen/Feedback")
|
|
||||||
)
|
|
||||||
|
|
||||||
return test_results
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
|
||||||
test_results = generate_test_result_data(raw_data, pipe_res)
|
|
||||||
test_results.head()
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
|
||||||
def write_test_results_excel(
|
|
||||||
data: pl.DataFrame,
|
|
||||||
base_filename: str,
|
|
||||||
) -> None:
|
|
||||||
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
|
|
||||||
p_save = Path.cwd() / f"{base_filename}_{date_str}.xlsx"
|
|
||||||
pd_df = data.to_pandas().set_index("Index")
|
|
||||||
|
|
||||||
with pd.ExcelWriter(p_save, engine="xlsxwriter") as writer:
|
|
||||||
sheet_name = f"Ergebnisse_Testphase_{date_str}"
|
|
||||||
pd_df.to_excel(
|
|
||||||
writer,
|
|
||||||
freeze_panes=(1, 1),
|
|
||||||
sheet_name=sheet_name,
|
|
||||||
)
|
|
||||||
worksheet = writer.sheets[sheet_name]
|
|
||||||
|
|
||||||
rows, cols = pd_df.shape
|
|
||||||
columns = ["Index"] + pd_df.columns.to_list()
|
|
||||||
worksheet.add_table(
|
|
||||||
0,
|
|
||||||
0,
|
|
||||||
rows,
|
|
||||||
cols,
|
|
||||||
{"columns": [{"header": c} for c in columns], "style": "Table Style Light 9"},
|
|
||||||
)
|
|
||||||
for i, col in enumerate(columns):
|
|
||||||
if i == 0:
|
|
||||||
worksheet.set_column(
|
|
||||||
i, i, max(pd_df.index.astype(str).map(len).max(), len(col)) + 2
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
worksheet.set_column(
|
|
||||||
i, i, max(pd_df[col].astype(str).map(len).max(), len(col)) + 2
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
|
||||||
write_test_results_excel(test_results, "Testdatensatz_WF-100-200")
|
|
||||||
|
|
||||||
#####################################################################
|
|
||||||
# %%
|
|
||||||
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
|
|
||||||
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
|
|
||||||
pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
|
|
||||||
)
|
|
||||||
deviation_vm = test_results.filter(
|
|
||||||
pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode())
|
|
||||||
)
|
|
||||||
deviation_vm
|
|
||||||
# %%
|
|
||||||
write_test_results_excel(deviation_vm, "Abweichungen-VM")
|
|
||||||
# ** WF-200 potentially triggered
|
|
||||||
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
|
|
||||||
pl.col.BEDP_TITELNR.is_duplicated()
|
|
||||||
).sort("BEDP_TITELNR")
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
# ---------------------------------------------------------------------------- #
|
# ---------------------------------------------------------------------------- #
|
||||||
# Workflow 200 (Umbreit only)
|
# Workflow 200 (Umbreit only)
|
||||||
@ -1371,7 +1157,7 @@ wf_200_start_data
|
|||||||
|
|
||||||
# %%
|
# %%
|
||||||
engine.dispose()
|
engine.dispose()
|
||||||
remove_tmp_dir()
|
|
||||||
# %%
|
# %%
|
||||||
relevant_mnr: tuple[int, ...] = (17, 18)
|
relevant_mnr: tuple[int, ...] = (17, 18)
|
||||||
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
|
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
|
||||||
@ -1827,5 +1613,3 @@ pipe_res_main.open
|
|||||||
# %%
|
# %%
|
||||||
pipe_res.results
|
pipe_res.results
|
||||||
# %%
|
# %%
|
||||||
remove_tmp_dir()
|
|
||||||
# %%
|
|
||||||
|
|||||||
39
pdm.lock
generated
39
pdm.lock
generated
@ -2692,29 +2692,30 @@ files = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruff"
|
name = "ruff"
|
||||||
version = "0.15.0"
|
version = "0.14.3"
|
||||||
requires_python = ">=3.7"
|
requires_python = ">=3.7"
|
||||||
summary = "An extremely fast Python linter and code formatter, written in Rust."
|
summary = "An extremely fast Python linter and code formatter, written in Rust."
|
||||||
groups = ["lint"]
|
groups = ["lint"]
|
||||||
files = [
|
files = [
|
||||||
{file = "ruff-0.15.0-py3-none-linux_armv6l.whl", hash = "sha256:aac4ebaa612a82b23d45964586f24ae9bc23ca101919f5590bdb368d74ad5455"},
|
{file = "ruff-0.14.3-py3-none-linux_armv6l.whl", hash = "sha256:876b21e6c824f519446715c1342b8e60f97f93264012de9d8d10314f8a79c371"},
|
||||||
{file = "ruff-0.15.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:dcd4be7cc75cfbbca24a98d04d0b9b36a270d0833241f776b788d59f4142b14d"},
|
{file = "ruff-0.14.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b6fd8c79b457bedd2abf2702b9b472147cd860ed7855c73a5247fa55c9117654"},
|
||||||
{file = "ruff-0.15.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d747e3319b2bce179c7c1eaad3d884dc0a199b5f4d5187620530adf9105268ce"},
|
{file = "ruff-0.14.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:71ff6edca490c308f083156938c0c1a66907151263c4abdcb588602c6e696a14"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:650bd9c56ae03102c51a5e4b554d74d825ff3abe4db22b90fd32d816c2e90621"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:786ee3ce6139772ff9272aaf43296d975c0217ee1b97538a98171bf0d21f87ed"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a6664b7eac559e3048223a2da77769c2f92b43a6dfd4720cef42654299a599c9"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:cd6291d0061811c52b8e392f946889916757610d45d004e41140d81fb6cd5ddc"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f811f97b0f092b35320d1556f3353bf238763420ade5d9e62ebd2b73f2ff179"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a497ec0c3d2c88561b6d90f9c29f5ae68221ac00d471f306fa21fa4264ce5fcd"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:761ec0a66680fab6454236635a39abaf14198818c8cdf691e036f4bc0f406b2d"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e231e1be58fc568950a04fbe6887c8e4b85310e7889727e2b81db205c45059eb"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:940f11c2604d317e797b289f4f9f3fa5555ffe4fb574b55ed006c3d9b6f0eb78"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:469e35872a09c0e45fecf48dd960bfbce056b5db2d5e6b50eca329b4f853ae20"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bcbca3d40558789126da91d7ef9a7c87772ee107033db7191edefa34e2c7f1b4"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3d6bc90307c469cb9d28b7cfad90aaa600b10d67c6e22026869f585e1e8a2db0"},
|
||||||
{file = "ruff-0.15.0-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:9a121a96db1d75fa3eb39c4539e607f628920dd72ff1f7c5ee4f1b768ac62d6e"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e2f8a0bbcffcfd895df39c9a4ecd59bb80dca03dc43f7fb63e647ed176b741e"},
|
||||||
{file = "ruff-0.15.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5298d518e493061f2eabd4abd067c7e4fb89e2f63291c94332e35631c07c3662"},
|
{file = "ruff-0.14.3-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:678fdd7c7d2d94851597c23ee6336d25f9930b460b55f8598e011b57c74fd8c5"},
|
||||||
{file = "ruff-0.15.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afb6e603d6375ff0d6b0cee563fa21ab570fd15e65c852cb24922cef25050cf1"},
|
{file = "ruff-0.14.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1ec1ac071e7e37e0221d2f2dbaf90897a988c531a8592a6a5959f0603a1ecf5e"},
|
||||||
{file = "ruff-0.15.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:77e515f6b15f828b94dc17d2b4ace334c9ddb7d9468c54b2f9ed2b9c1593ef16"},
|
{file = "ruff-0.14.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:afcdc4b5335ef440d19e7df9e8ae2ad9f749352190e96d481dc501b753f0733e"},
|
||||||
{file = "ruff-0.15.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6f6e80850a01eb13b3e42ee0ebdf6e4497151b48c35051aab51c101266d187a3"},
|
{file = "ruff-0.14.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:7bfc42f81862749a7136267a343990f865e71fe2f99cf8d2958f684d23ce3dfa"},
|
||||||
{file = "ruff-0.15.0-py3-none-win32.whl", hash = "sha256:238a717ef803e501b6d51e0bdd0d2c6e8513fe9eec14002445134d3907cd46c3"},
|
{file = "ruff-0.14.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a65e448cfd7e9c59fae8cf37f9221585d3354febaad9a07f29158af1528e165f"},
|
||||||
{file = "ruff-0.15.0-py3-none-win_amd64.whl", hash = "sha256:dd5e4d3301dc01de614da3cdffc33d4b1b96fb89e45721f1598e5532ccf78b18"},
|
{file = "ruff-0.14.3-py3-none-win32.whl", hash = "sha256:f3d91857d023ba93e14ed2d462ab62c3428f9bbf2b4fbac50a03ca66d31991f7"},
|
||||||
{file = "ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a"},
|
{file = "ruff-0.14.3-py3-none-win_amd64.whl", hash = "sha256:d7b7006ac0756306db212fd37116cce2bd307e1e109375e1c6c106002df0ae5f"},
|
||||||
{file = "ruff-0.15.0.tar.gz", hash = "sha256:6bdea47cdbea30d40f8f8d7d69c0854ba7c15420ec75a26f463290949d7f7e9a"},
|
{file = "ruff-0.14.3-py3-none-win_arm64.whl", hash = "sha256:26eb477ede6d399d898791d01961e16b86f02bc2486d0d1a7a9bb2379d055dc1"},
|
||||||
|
{file = "ruff-0.14.3.tar.gz", hash = "sha256:4ff876d2ab2b161b6de0aa1f5bd714e8e9b4033dc122ee006925fbacc4f62153"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@ -1,30 +0,0 @@
|
|||||||
from pathlib import Path
|
|
||||||
from typing import Final
|
|
||||||
|
|
||||||
from dopt_basics import configs
|
|
||||||
from dopt_basics import io as io_
|
|
||||||
|
|
||||||
LIB_PATH: Final[Path] = Path(__file__).parent
|
|
||||||
|
|
||||||
# // database connections
|
|
||||||
p_cfg = io_.search_file_iterative(
|
|
||||||
starting_path=LIB_PATH,
|
|
||||||
glob_pattern="CRED*.toml",
|
|
||||||
stop_folder_name="umbreit-py",
|
|
||||||
)
|
|
||||||
if p_cfg is None:
|
|
||||||
raise FileNotFoundError("Config was not found")
|
|
||||||
|
|
||||||
CFG = configs.load_toml(p_cfg)
|
|
||||||
HOST = CFG["server"]["host"]
|
|
||||||
PORT = CFG["server"]["port"]
|
|
||||||
SERVICE = CFG["server"]["service"]
|
|
||||||
USER_NAME = CFG["user"]["name"]
|
|
||||||
USER_PASS = CFG["user"]["pass"]
|
|
||||||
|
|
||||||
# TODO remove or change
|
|
||||||
# ** Oracle client libs
|
|
||||||
USE_THICK_MODE: Final[bool] = False
|
|
||||||
P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
|
|
||||||
assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
|
|
||||||
assert P_ORACLE_CLIENT_LIBS.is_dir()
|
|
||||||
@ -178,8 +178,17 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
|
|||||||
EXT_AUFPAUF_null_values: PolarsNullValues = {}
|
EXT_AUFPAUF_null_values: PolarsNullValues = {}
|
||||||
|
|
||||||
# // queries and temp data
|
# // queries and temp data
|
||||||
raw_data_query_schema_map: PolarsSchema = ext_bedpbed_schema_map.copy()
|
raw_data_query_schema_map: PolarsSchema = {
|
||||||
raw_data_query_schema_map.update(ext_titel_info_schema_map)
|
"BEDARFNR": pl.UInt32,
|
||||||
|
"BEDP_SEQUENZ": pl.UInt32,
|
||||||
|
"BEDP_TITELNR": pl.UInt32,
|
||||||
|
"BEDP_MAN": pl.UInt8,
|
||||||
|
"BEDP_MENGE_BEDARF_VM": pl.UInt32,
|
||||||
|
"MELDENUMMER": pl.UInt8,
|
||||||
|
"VERLAGSNR": pl.UInt32,
|
||||||
|
"MENGE_VORMERKER": pl.UInt32,
|
||||||
|
"MANDFUEHR": pl.UInt8,
|
||||||
|
}
|
||||||
|
|
||||||
tmp_data = Table(
|
tmp_data = Table(
|
||||||
"EXT_TMP_BEDP_TINFO",
|
"EXT_TMP_BEDP_TINFO",
|
||||||
|
|||||||
@ -1,750 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
import uuid
|
|
||||||
from collections.abc import Sequence
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Final, Literal, Protocol, TypeAlias, overload
|
|
||||||
|
|
||||||
import dopt_basics.datetime as dt
|
|
||||||
import oracledb
|
|
||||||
import polars as pl
|
|
||||||
import polars.selectors as cs
|
|
||||||
import sqlalchemy as sql
|
|
||||||
|
|
||||||
from umbreit import constants, db, types
|
|
||||||
from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
|
|
||||||
|
|
||||||
oracledb.defaults.arraysize = 1000
|
|
||||||
oracledb.defaults.prefetchrows = 1000
|
|
||||||
if constants.USE_THICK_MODE:
|
|
||||||
oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
|
|
||||||
|
|
||||||
|
|
||||||
def create_tmp_dir() -> Path:
|
|
||||||
tmp_pth = Path(tempfile.mkdtemp())
|
|
||||||
assert tmp_pth.exists()
|
|
||||||
return tmp_pth
|
|
||||||
|
|
||||||
|
|
||||||
def clear_tmp_dir() -> None:
|
|
||||||
shutil.rmtree(TMP_DIR)
|
|
||||||
TMP_DIR.mkdir()
|
|
||||||
|
|
||||||
|
|
||||||
def remove_tmp_dir() -> None:
|
|
||||||
shutil.rmtree(TMP_DIR)
|
|
||||||
|
|
||||||
|
|
||||||
TMP_DIR = create_tmp_dir()
|
|
||||||
|
|
||||||
CONN_STRING: Final[str] = (
|
|
||||||
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
|
|
||||||
)
|
|
||||||
engine = sql.create_engine(
|
|
||||||
CONN_STRING,
|
|
||||||
execution_options={"stream_results": True},
|
|
||||||
)
|
|
||||||
|
|
||||||
VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM"
|
|
||||||
MANDANT_CRITERION: Final[str] = "BEDP_MAN"
|
|
||||||
ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM"
|
|
||||||
RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys())
|
|
||||||
ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
|
|
||||||
SAVE_TMP_FILES: Final[bool] = True
|
|
||||||
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
|
|
||||||
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
|
|
||||||
|
|
||||||
|
|
||||||
def save_tmp_file(
|
|
||||||
data: pl.DataFrame,
|
|
||||||
filename: str | None,
|
|
||||||
) -> None:
|
|
||||||
if filename is None:
|
|
||||||
filename = str(uuid.uuid4())
|
|
||||||
pth = (TMP_DIR / filename).with_suffix(".arrow")
|
|
||||||
|
|
||||||
n: int = 1
|
|
||||||
while pth.exists():
|
|
||||||
filename_new = pth.stem + f"_{n}"
|
|
||||||
pth = (TMP_DIR / filename_new).with_suffix(".arrow")
|
|
||||||
n += 1
|
|
||||||
|
|
||||||
data.write_ipc(pth)
|
|
||||||
|
|
||||||
|
|
||||||
def load_tmp_file(
|
|
||||||
filename: str,
|
|
||||||
) -> pl.DataFrame:
|
|
||||||
pth = (TMP_DIR / filename).with_suffix(".arrow")
|
|
||||||
if not pth.exists():
|
|
||||||
raise FileNotFoundError(f"File >{pth.name}< not found")
|
|
||||||
|
|
||||||
return pl.read_ipc(pth)
|
|
||||||
|
|
||||||
|
|
||||||
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
|
|
||||||
all_dfs: dict[str, pl.DataFrame] = {}
|
|
||||||
for file in TMP_DIR.glob("*.arrow"):
|
|
||||||
df = pl.read_ipc(file)
|
|
||||||
all_dfs[file.stem] = df
|
|
||||||
|
|
||||||
return all_dfs
|
|
||||||
|
|
||||||
|
|
||||||
def get_starting_date(
|
|
||||||
days_from_now: int,
|
|
||||||
) -> datetime.date:
|
|
||||||
current_dt = dt.current_time_tz(cut_microseconds=True)
|
|
||||||
td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
|
|
||||||
|
|
||||||
return (current_dt - td).date()
|
|
||||||
|
|
||||||
|
|
||||||
def get_raw_data() -> pl.DataFrame:
|
|
||||||
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
|
|
||||||
stmt = sql.select(
|
|
||||||
db.ext_bedpbed.c.BEDARFNR,
|
|
||||||
db.ext_bedpbed.c.BEDP_SEQUENZ,
|
|
||||||
db.ext_bedpbed.c.BEDP_TITELNR,
|
|
||||||
db.ext_bedpbed.c.BEDP_MAN,
|
|
||||||
sql.case(
|
|
||||||
(db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
|
|
||||||
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
|
|
||||||
).label("BEDP_MENGE_BEDARF_VM"),
|
|
||||||
db.ext_titel_info.c.MELDENUMMER,
|
|
||||||
db.ext_titel_info.c.VERLAGSNR,
|
|
||||||
db.ext_titel_info.c.MENGE_VORMERKER,
|
|
||||||
db.ext_titel_info.c.MANDFUEHR,
|
|
||||||
db.ext_titel_info.c.EINKAEUFER,
|
|
||||||
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
|
|
||||||
|
|
||||||
return pl.read_database(
|
|
||||||
stmt,
|
|
||||||
engine,
|
|
||||||
schema_overrides=db.raw_data_query_schema_map,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def save_tmp_data(df: pl.DataFrame) -> None:
|
|
||||||
with engine.begin() as conn:
|
|
||||||
conn.execute(sql.delete(db.tmp_data))
|
|
||||||
|
|
||||||
with engine.begin() as conn:
|
|
||||||
conn.execute(sql.insert(db.tmp_data), df.to_dicts())
|
|
||||||
|
|
||||||
|
|
||||||
def get_tmp_data() -> pl.DataFrame:
|
|
||||||
return pl.read_database(
|
|
||||||
sql.select(db.tmp_data),
|
|
||||||
engine,
|
|
||||||
schema_overrides=db.tmp_data_schema_map,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_result_data() -> pl.DataFrame:
|
|
||||||
return pl.read_database(
|
|
||||||
sql.select(db.EXT_DOPT_ERGEBNIS),
|
|
||||||
engine,
|
|
||||||
schema_overrides=db.results_schema_map,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def save_result_data(results: pl.DataFrame) -> None:
|
|
||||||
with engine.begin() as conn:
|
|
||||||
conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
|
|
||||||
|
|
||||||
|
|
||||||
def clear_result_data() -> None:
|
|
||||||
with engine.begin() as conn:
|
|
||||||
conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
|
|
||||||
|
|
||||||
|
|
||||||
def save_result_data_native(results: pl.DataFrame) -> None:
|
|
||||||
results = results.with_columns(
|
|
||||||
[
|
|
||||||
pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
|
|
||||||
for c in results.select(cs.boolean()).columns
|
|
||||||
]
|
|
||||||
)
|
|
||||||
stmt = """
|
|
||||||
INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
|
|
||||||
"BEST_MENGE", "FREIGABE_AUTO")
|
|
||||||
VALUES (:1, :2, :3, :4, :5, :6)
|
|
||||||
"""
|
|
||||||
with engine.begin() as conn:
|
|
||||||
raw_conn = conn.connection.connection
|
|
||||||
with raw_conn.cursor() as cursor:
|
|
||||||
cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))
|
|
||||||
|
|
||||||
|
|
||||||
def _apply_several_filters(
|
|
||||||
df: pl.DataFrame,
|
|
||||||
filters: Sequence[pl.Expr],
|
|
||||||
) -> types.FilterResult:
|
|
||||||
df_current = df
|
|
||||||
removed_rows: list[pl.DataFrame] = []
|
|
||||||
|
|
||||||
for filter in filters:
|
|
||||||
removed = df_current.filter(~filter)
|
|
||||||
removed_rows.append(removed)
|
|
||||||
|
|
||||||
df_current = df_current.filter(filter)
|
|
||||||
|
|
||||||
df_removed = pl.concat(removed_rows)
|
|
||||||
|
|
||||||
return types.FilterResult(in_=df_current, out_=df_removed)
|
|
||||||
|
|
||||||
|
|
||||||
class PipelineResult:
|
|
||||||
__slots__ = ("_results", "_open", "_subtracted_indices")
|
|
||||||
_index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
data: pl.DataFrame,
|
|
||||||
) -> None:
|
|
||||||
self._open = data
|
|
||||||
schema = db.results_schema_map.copy()
|
|
||||||
del schema["ID"]
|
|
||||||
self._results = pl.DataFrame(schema=schema)
|
|
||||||
|
|
||||||
schema = {}
|
|
||||||
for col in self._index_cols:
|
|
||||||
schema[col] = db.raw_data_query_schema_map[col]
|
|
||||||
self._subtracted_indices = pl.DataFrame(schema=schema)
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
|
||||||
return len(self._results) + len(self._open)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def open(self) -> pl.DataFrame:
|
|
||||||
return self._open
|
|
||||||
|
|
||||||
@property
|
|
||||||
def results(self) -> pl.DataFrame:
|
|
||||||
return self._results
|
|
||||||
|
|
||||||
@property
|
|
||||||
def subtracted_indices(self) -> pl.DataFrame:
|
|
||||||
return self._subtracted_indices
|
|
||||||
|
|
||||||
def update_open(
|
|
||||||
self,
|
|
||||||
data: pl.DataFrame,
|
|
||||||
) -> None:
|
|
||||||
self._open = data
|
|
||||||
|
|
||||||
def _subtract_data(
|
|
||||||
self,
|
|
||||||
data: pl.DataFrame,
|
|
||||||
) -> None:
|
|
||||||
self._open = self._open.join(data, on=self._index_cols, how="anti")
|
|
||||||
self._subtracted_indices = pl.concat(
|
|
||||||
(self._subtracted_indices, data[self._index_cols])
|
|
||||||
)
|
|
||||||
|
|
||||||
def _add_results(
|
|
||||||
self,
|
|
||||||
data: pl.DataFrame,
|
|
||||||
) -> None:
|
|
||||||
res = pl.concat([self._results, data])
|
|
||||||
self._results = res
|
|
||||||
|
|
||||||
def merge_pipeline(
|
|
||||||
self,
|
|
||||||
pipeline: PipelineResult,
|
|
||||||
) -> None:
|
|
||||||
self._subtract_data(pipeline.subtracted_indices)
|
|
||||||
self._add_results(pipeline.results)
|
|
||||||
|
|
||||||
def write_results(
|
|
||||||
self,
|
|
||||||
data: pl.DataFrame,
|
|
||||||
vorlage: bool,
|
|
||||||
wf_id: types.Workflows,
|
|
||||||
freigabe_auto: types.Freigabe,
|
|
||||||
order_qty_expr: pl.Expr,
|
|
||||||
) -> None:
|
|
||||||
results = data.rename(db.map_data_to_result)
|
|
||||||
results = results.with_columns(
|
|
||||||
[
|
|
||||||
pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
|
|
||||||
pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
|
|
||||||
order_qty_expr,
|
|
||||||
pl.lit(freigabe_auto.value)
|
|
||||||
.alias("FREIGABE_AUTO")
|
|
||||||
.cast(db.results_schema_map["FREIGABE_AUTO"]),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
results = results.drop(
|
|
||||||
[
|
|
||||||
"BEDP_TITELNR",
|
|
||||||
"BEDP_MAN",
|
|
||||||
"BEDP_MENGE_BEDARF_VM",
|
|
||||||
"MELDENUMMER",
|
|
||||||
"VERLAGSNR",
|
|
||||||
"MENGE_VORMERKER",
|
|
||||||
"MANDFUEHR",
|
|
||||||
"EINKAEUFER",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
self._subtract_data(data)
|
|
||||||
self._add_results(results)
|
|
||||||
|
|
||||||
|
|
||||||
class ExprOrderQty(Protocol): ...
|
|
||||||
|
|
||||||
|
|
||||||
class ExprOrderQty_Base(ExprOrderQty, Protocol):
|
|
||||||
def __call__(self) -> pl.Expr: ...
|
|
||||||
|
|
||||||
|
|
||||||
ExprOrderQty_Base_Types: TypeAlias = (
|
|
||||||
Literal[types.Workflows.ID_200]
|
|
||||||
| Literal[types.Workflows.ID_900]
|
|
||||||
| Literal[types.Workflows.ID_910]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ExprOrderQty_WF100(ExprOrderQty, Protocol):
|
|
||||||
def __call__(self, empty: bool) -> pl.Expr: ...
|
|
||||||
|
|
||||||
|
|
||||||
@overload
|
|
||||||
def get_expr_order_qty(
|
|
||||||
wf_id: Literal[types.Workflows.ID_100],
|
|
||||||
) -> ExprOrderQty_WF100: ...
|
|
||||||
|
|
||||||
|
|
||||||
@overload
|
|
||||||
def get_expr_order_qty(
|
|
||||||
wf_id: ExprOrderQty_Base_Types,
|
|
||||||
) -> ExprOrderQty_Base: ...
|
|
||||||
|
|
||||||
|
|
||||||
def get_expr_order_qty(
|
|
||||||
wf_id: types.Workflows,
|
|
||||||
) -> ExprOrderQty:
|
|
||||||
empty_expr = (
|
|
||||||
pl.lit(0)
|
|
||||||
.alias(ORDER_QTY_CRIT)
|
|
||||||
.alias("BEST_MENGE")
|
|
||||||
.cast(db.results_schema_map["BEST_MENGE"])
|
|
||||||
)
|
|
||||||
|
|
||||||
def _empty() -> pl.Expr:
|
|
||||||
return empty_expr
|
|
||||||
|
|
||||||
func: ExprOrderQty
|
|
||||||
match wf_id:
|
|
||||||
case types.Workflows.ID_100:
|
|
||||||
|
|
||||||
def _func(empty: bool) -> pl.Expr:
|
|
||||||
order_qty_expr: pl.Expr
|
|
||||||
if empty:
|
|
||||||
order_qty_expr = empty_expr
|
|
||||||
else:
|
|
||||||
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
|
|
||||||
return order_qty_expr
|
|
||||||
|
|
||||||
func = _func
|
|
||||||
|
|
||||||
case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
|
|
||||||
func = _empty
|
|
||||||
case _:
|
|
||||||
raise NotImplementedError(
|
|
||||||
f"Order expression for WF-ID {wf_id.value} is not implemented"
|
|
||||||
)
|
|
||||||
|
|
||||||
return func
|
|
||||||
|
|
||||||
|
|
||||||
def wf900(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
"""filter 'Meldenummer' and fill non-feasible entries"""
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
|
|
||||||
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer_null,
|
|
||||||
filter_mandant,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.out_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_900,
|
|
||||||
freigabe_auto=types.Freigabe.WF_900,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(),
|
|
||||||
)
|
|
||||||
|
|
||||||
pipe_result.update_open(
|
|
||||||
res.in_.with_columns(
|
|
||||||
pl.col("MENGE_VORMERKER").fill_null(0),
|
|
||||||
pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def wf910(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
|
|
||||||
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
|
|
||||||
|
|
||||||
res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.out_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_910,
|
|
||||||
freigabe_auto=types.Freigabe.WF_910,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(),
|
|
||||||
)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
# this a main routine:
|
|
||||||
# receives and gives back result objects
|
|
||||||
def wf100_umbreit(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
vm_criterion: str,
|
|
||||||
) -> PipelineResult:
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
|
|
||||||
|
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION) == 1
|
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
|
||||||
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def wf100_petersen(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
vm_criterion: str,
|
|
||||||
) -> PipelineResult:
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
|
|
||||||
# difference WDB and others
|
|
||||||
|
|
||||||
# // WDB branch
|
|
||||||
# order quantity 0, no further action in other WFs
|
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
|
||||||
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
|
||||||
filter_number_vm = pl.col(vm_criterion) == 0
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_WDB,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=True),
|
|
||||||
)
|
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
|
||||||
res_candidates = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_WDB,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
wdb_sub_pipe = PipelineResult(res_candidates.in_)
|
|
||||||
wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
|
|
||||||
assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
|
|
||||||
pipe_result.merge_pipeline(wdb_sub_pipe)
|
|
||||||
|
|
||||||
# // other branch
|
|
||||||
# show always entries with #VM > 1
|
|
||||||
filter_number_vm = pl.col(vm_criterion) > 1
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=True,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
|
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def _wf100_petersen_sub1_wdb(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
|
|
||||||
# input: pre-filtered entries (WDB titles and #VM > 0)
|
|
||||||
# more than 1 VM: show these entries
|
|
||||||
filter_number_vm = pl.col(VM_CRITERION) > 1
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(filter_number_vm,),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=True,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
|
|
||||||
# past 6 months
|
|
||||||
save_tmp_data(pipe_result.open)
|
|
||||||
RELEVANT_DATE = get_starting_date(180)
|
|
||||||
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
|
|
||||||
filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
db.tmp_data,
|
|
||||||
db.EXT_BESPBES_INFO.c.BESP_MENGE,
|
|
||||||
db.EXT_BESPBES_INFO.c.BESP_STATUS,
|
|
||||||
)
|
|
||||||
.select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
|
|
||||||
.where(filter_)
|
|
||||||
)
|
|
||||||
sub1 = stmt.subquery()
|
|
||||||
|
|
||||||
count_col = sql.func.count()
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
count_col.label("count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
.having(count_col > 1)
|
|
||||||
)
|
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
count_col.label("count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
)
|
|
||||||
# !! this is a sub result which must be used in the result set
|
|
||||||
# !! for testing and feedback by the customer
|
|
||||||
relevant_titles = pl.read_database(
|
|
||||||
stmt,
|
|
||||||
engine,
|
|
||||||
)
|
|
||||||
|
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
|
|
||||||
relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
|
|
||||||
|
|
||||||
entries_to_show = pipe_result.open.filter(
|
|
||||||
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=entries_to_show,
|
|
||||||
vorlage=True,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=pipe_result.open,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(empty=False),
|
|
||||||
)
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def wf200_umbreit(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
relevant_mnr: tuple[int, ...] = (17, 18)
|
|
||||||
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
|
|
||||||
filter_mandant = pl.col("BEDP_MAN") == 1
|
|
||||||
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(filter_meldenummer, filter_mandant),
|
|
||||||
)
|
|
||||||
sub_pipe = PipelineResult(res.in_)
|
|
||||||
sub_pipe = _wf200_sub1(sub_pipe)
|
|
||||||
assert sub_pipe.open.height == 0
|
|
||||||
pipe_result.merge_pipeline(sub_pipe)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def wf200_petersen(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
|
|
||||||
RELEVANT_MNR: tuple[int, ...] = (17, 18)
|
|
||||||
# // WDB branch
|
|
||||||
filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
|
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
|
||||||
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_WDB,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
# ignore these
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_200,
|
|
||||||
freigabe_auto=types.Freigabe.WF_200,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(),
|
|
||||||
)
|
|
||||||
# // other branch
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
sub_pipe = PipelineResult(res.in_)
|
|
||||||
sub_pipe = _wf200_sub1(sub_pipe)
|
|
||||||
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
|
|
||||||
pipe_result.merge_pipeline(sub_pipe)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
|
|
||||||
|
|
||||||
def _wf200_sub1(
|
|
||||||
pipe_result: PipelineResult,
|
|
||||||
) -> PipelineResult:
|
|
||||||
save_tmp_data(pipe_result.open)
|
|
||||||
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
|
|
||||||
RELEVANT_DATE = get_starting_date(90)
|
|
||||||
|
|
||||||
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
|
|
||||||
filter_ = sql.and_(
|
|
||||||
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
|
|
||||||
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
|
|
||||||
db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
|
|
||||||
)
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
db.tmp_data,
|
|
||||||
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
|
|
||||||
db.EXT_AUFPAUF.c.AUFTRAGS_ART,
|
|
||||||
)
|
|
||||||
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
|
|
||||||
.where(filter_)
|
|
||||||
)
|
|
||||||
sub1 = stmt.subquery()
|
|
||||||
|
|
||||||
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
sql.func.count().label("count"),
|
|
||||||
unique_count_col.label("customer_count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
.having(unique_count_col >= 3)
|
|
||||||
)
|
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
stmt = (
|
|
||||||
sql.select(
|
|
||||||
sub1.c.BEDP_TITELNR,
|
|
||||||
sql.func.count().label("count"),
|
|
||||||
unique_count_col.label("customer_count"),
|
|
||||||
)
|
|
||||||
.select_from(sub1)
|
|
||||||
.group_by(sub1.c.BEDP_TITELNR)
|
|
||||||
)
|
|
||||||
# !! this is a sub result which must be used in the result set
|
|
||||||
# !! for testing and feedback by the customer
|
|
||||||
relevant_titles = pl.read_database(
|
|
||||||
stmt,
|
|
||||||
engine,
|
|
||||||
)
|
|
||||||
|
|
||||||
if SAVE_TMP_FILES:
|
|
||||||
save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
|
|
||||||
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
|
|
||||||
|
|
||||||
entries_to_show = pipe_result.open.filter(
|
|
||||||
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
|
|
||||||
)
|
|
||||||
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=entries_to_show,
|
|
||||||
vorlage=True,
|
|
||||||
wf_id=types.Workflows.ID_200,
|
|
||||||
freigabe_auto=types.Freigabe.WF_200,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(),
|
|
||||||
)
|
|
||||||
pipe_result.write_results(
|
|
||||||
data=pipe_result.open,
|
|
||||||
vorlage=False,
|
|
||||||
wf_id=types.Workflows.ID_200,
|
|
||||||
freigabe_auto=types.Freigabe.WF_200,
|
|
||||||
order_qty_expr=ORDER_QTY_FUNC(),
|
|
||||||
)
|
|
||||||
|
|
||||||
return pipe_result
|
|
||||||
Loading…
x
Reference in New Issue
Block a user