generated from dopt-python/py311
begin refactoring
This commit is contained in:
parent
fa659c37bd
commit
c244285c4d
@ -758,7 +758,7 @@ def wf900(
|
|||||||
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
||||||
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
pipe_res.open,
|
pipe_result.open,
|
||||||
(
|
(
|
||||||
filter_meldenummer_null,
|
filter_meldenummer_null,
|
||||||
filter_mandant,
|
filter_mandant,
|
||||||
|
|||||||
30
src/umbreit/constants.py
Normal file
30
src/umbreit/constants.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
from dopt_basics import configs
|
||||||
|
from dopt_basics import io as io_
|
||||||
|
|
||||||
|
# Root directory of this package; anchor for the upward config search.
LIB_PATH: Final[Path] = Path(__file__).parent


# // database connections
# Search upwards from LIB_PATH for a credentials file matching CRED*.toml,
# stopping at the repository root folder "umbreit-py".
p_cfg = io_.search_file_iterative(
    starting_path=LIB_PATH,
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
if p_cfg is None:
    raise FileNotFoundError("Config was not found")

# Database connection settings, loaded once at import time.
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# TODO remove or change
# ** Oracle client libs
# When True, oracledb is initialised in thick mode using the client libs below.
USE_THICK_MODE: Final[bool] = False
P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
# NOTE(review): asserts are stripped under `python -O`; consider raising
# FileNotFoundError/NotADirectoryError instead.
assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
assert P_ORACLE_CLIENT_LIBS.is_dir()
|
||||||
750
src/umbreit/pipeline.py
Normal file
750
src/umbreit/pipeline.py
Normal file
@ -0,0 +1,750 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import uuid
|
||||||
|
from collections.abc import Sequence
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Final, Literal, Protocol, TypeAlias, overload
|
||||||
|
|
||||||
|
import dopt_basics.datetime as dt
|
||||||
|
import oracledb
|
||||||
|
import polars as pl
|
||||||
|
import polars.selectors as cs
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
from umbreit import constants, db, types
|
||||||
|
from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
|
||||||
|
|
||||||
|
# Fetch tuning: pull rows from Oracle in batches of 1000.
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# Thick mode requires the native Oracle client libraries on disk.
if constants.USE_THICK_MODE:
    oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
|
||||||
|
|
||||||
|
|
||||||
|
def create_tmp_dir() -> Path:
    """Create a fresh scratch directory and return its path."""
    scratch = Path(tempfile.mkdtemp())
    assert scratch.exists()
    return scratch
|
||||||
|
|
||||||
|
|
||||||
|
def clear_tmp_dir() -> None:
    """Empty the scratch directory by removing and recreating it."""
    shutil.rmtree(TMP_DIR)
    TMP_DIR.mkdir()
|
||||||
|
|
||||||
|
|
||||||
|
def remove_tmp_dir() -> None:
    """Delete the scratch directory and all files inside it."""
    shutil.rmtree(TMP_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-wide scratch directory, created once at import time.
TMP_DIR = create_tmp_dir()

# SQLAlchemy URL for the python-oracledb driver.
CONN_STRING: Final[str] = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
engine = sql.create_engine(
    CONN_STRING,
    # stream results via server-side cursors instead of buffering everything
    execution_options={"stream_results": True},
)

# Column names used as filter / order-quantity criteria.
VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: Final[str] = "BEDP_MAN"
# NOTE(review): identical to VM_CRITERION — confirm this duplication is intended.
ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM"
# Column order of the result table, taken from the table definition.
RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys())
ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
# Debug switch: persist intermediate frames as Arrow files in TMP_DIR.
SAVE_TMP_FILES: Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
|
||||||
|
|
||||||
|
|
||||||
|
def save_tmp_file(
    data: pl.DataFrame,
    filename: str | None,
) -> None:
    """Write *data* as an Arrow IPC file into TMP_DIR without overwriting.

    Args:
        data: frame to persist.
        filename: target file stem; a random UUID is used when None.

    If the target already exists, a numeric suffix (``_1``, ``_2``, ...) is
    appended to the ORIGINAL stem until a free name is found.
    """
    if filename is None:
        filename = str(uuid.uuid4())
    pth = (TMP_DIR / filename).with_suffix(".arrow")

    # Bug fix: derive each candidate from the original stem. The previous
    # code appended to the last tried stem, producing "name_1_2",
    # "name_1_2_3", ... on repeated collisions instead of "name_2", "name_3".
    n: int = 1
    while pth.exists():
        pth = (TMP_DIR / f"{filename}_{n}").with_suffix(".arrow")
        n += 1

    data.write_ipc(pth)
|
||||||
|
|
||||||
|
|
||||||
|
def load_tmp_file(
    filename: str,
) -> pl.DataFrame:
    """Read back an Arrow IPC file previously written to TMP_DIR.

    Raises:
        FileNotFoundError: if no matching file exists.
    """
    pth = (TMP_DIR / filename).with_suffix(".arrow")
    if pth.exists():
        return pl.read_ipc(pth)
    raise FileNotFoundError(f"File >{pth.name}< not found")
|
||||||
|
|
||||||
|
|
||||||
|
def load_all_tmp_files() -> dict[str, pl.DataFrame]:
    """Read every ``*.arrow`` file in TMP_DIR, keyed by file stem."""
    return {file.stem: pl.read_ipc(file) for file in TMP_DIR.glob("*.arrow")}
|
||||||
|
|
||||||
|
|
||||||
|
def get_starting_date(
    days_from_now: int,
) -> datetime.date:
    """Return the calendar date *days_from_now* days in the past."""
    now = dt.current_time_tz(cut_microseconds=True)
    lookback = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
    return (now - lookback).date()
|
||||||
|
|
||||||
|
|
||||||
|
def get_raw_data() -> pl.DataFrame:
    """Read raw demand rows joined with their title information.

    Uses a LEFT OUTER JOIN so demand rows without a matching title record
    are kept (their title columns come back NULL).

    Returns:
        One row per demand position with title metadata attached.
    """
    join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        # quantities <= -1 act as a "not set" sentinel -> map them to NULL
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.VERLAGSNR,
        db.ext_titel_info.c.MENGE_VORMERKER,
        db.ext_titel_info.c.MANDFUEHR,
        db.ext_titel_info.c.EINKAEUFER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def save_tmp_data(df: pl.DataFrame) -> None:
    """Replace the contents of the tmp_data table with *df*.

    NOTE(review): the delete and the insert run in two separate
    transactions; a failure between them leaves the table empty —
    confirm this is acceptable.
    """
    with engine.begin() as conn:
        conn.execute(sql.delete(db.tmp_data))

    with engine.begin() as conn:
        conn.execute(sql.insert(db.tmp_data), df.to_dicts())
|
||||||
|
|
||||||
|
|
||||||
|
def get_tmp_data() -> pl.DataFrame:
    """Read the full contents of the tmp_data table."""
    return pl.read_database(
        sql.select(db.tmp_data),
        engine,
        schema_overrides=db.tmp_data_schema_map,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def get_result_data() -> pl.DataFrame:
    """Read the full contents of the EXT_DOPT_ERGEBNIS result table."""
    return pl.read_database(
        sql.select(db.EXT_DOPT_ERGEBNIS),
        engine,
        schema_overrides=db.results_schema_map,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def save_result_data(results: pl.DataFrame) -> None:
    """Append *results* to the EXT_DOPT_ERGEBNIS table in one transaction."""
    with engine.begin() as conn:
        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
|
||||||
|
|
||||||
|
|
||||||
|
def clear_result_data() -> None:
    """Delete all rows from the EXT_DOPT_ERGEBNIS table."""
    with engine.begin() as conn:
        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
|
||||||
|
|
||||||
|
|
||||||
|
def save_result_data_native(results: pl.DataFrame) -> None:
    """Insert result rows via the raw oracledb cursor, bypassing SQLAlchemy.

    Boolean columns are converted to Oracle-style 'Y'/'N' strings first.
    """
    results = results.with_columns(
        [
            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
            for c in results.select(cs.boolean()).columns
        ]
    )
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
    "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:1, :2, :3, :4, :5, :6)
    """
    with engine.begin() as conn:
        # unwrap SQLAlchemy connection -> DBAPI wrapper -> native oracledb connection
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            # NOTE(review): oracledb's executemany expects a sequence of row
            # tuples/dicts; a pandas DataFrame is passed here — verify this
            # actually iterates rows (not column labels) before relying on it.
            cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True))
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    """Apply *filters* sequentially, splitting *df* into kept/removed rows.

    Each expression is evaluated against the rows that survived all
    previous ones; rows failing any filter are collected.

    Args:
        df: input frame.
        filters: boolean polars expressions; may be empty.

    Returns:
        FilterResult with ``in_`` (rows passing every filter) and ``out_``
        (rows removed by at least one filter).
    """
    df_current = df
    removed_rows: list[pl.DataFrame] = []

    # `flt` rather than `filter`: don't shadow the builtin
    for flt in filters:
        removed_rows.append(df_current.filter(~flt))
        df_current = df_current.filter(flt)

    # Bug fix: pl.concat([]) raises on an empty list; with no filters (or
    # none removing rows is still non-empty, so only the no-filter case is
    # affected) return an empty frame with the input schema instead.
    df_removed = pl.concat(removed_rows) if removed_rows else df.clear()

    return types.FilterResult(in_=df_current, out_=df_removed)
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineResult:
    """Mutable state of one optimisation pipeline run.

    Tracks three frames:
      * ``_open``               -- rows no workflow has decided on yet,
      * ``_results``            -- finished result rows (EXT_DOPT_ERGEBNIS
                                   schema minus the DB-generated ID),
      * ``_subtracted_indices`` -- index pairs of every row removed from
                                   ``_open``, so removals can be replayed
                                   when merging a sub-pipeline.
    """

    __slots__ = ("_results", "_open", "_subtracted_indices")
    # composite key identifying one demand position
    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")

    def __init__(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Initialise with *data* as the open set; results start empty."""
        self._open = data
        schema = db.results_schema_map.copy()
        # ID is assigned by the database, never produced by the pipeline
        del schema["ID"]
        self._results = pl.DataFrame(schema=schema)

        # empty index frame with the dtypes of the raw-data key columns
        schema = {}
        for col in self._index_cols:
            schema[col] = db.raw_data_query_schema_map[col]
        self._subtracted_indices = pl.DataFrame(schema=schema)

    def __len__(self) -> int:
        # total rows tracked: decided plus still open
        return len(self._results) + len(self._open)

    @property
    def open(self) -> pl.DataFrame:
        """Rows not yet handled by any workflow."""
        return self._open

    @property
    def results(self) -> pl.DataFrame:
        """Accumulated finished result rows."""
        return self._results

    @property
    def subtracted_indices(self) -> pl.DataFrame:
        """Index pairs of all rows removed from the open set so far."""
        return self._subtracted_indices

    def update_open(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Replace the open set wholesale (e.g. after fill-null edits)."""
        self._open = data

    def _subtract_data(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Remove *data*'s rows from the open set and record their indices."""
        # anti-join keeps only the open rows NOT present in data
        self._open = self._open.join(data, on=self._index_cols, how="anti")
        self._subtracted_indices = pl.concat(
            (self._subtracted_indices, data[self._index_cols])
        )

    def _add_results(
        self,
        data: pl.DataFrame,
    ) -> None:
        """Append finished rows to the result frame."""
        res = pl.concat([self._results, data])
        self._results = res

    def merge_pipeline(
        self,
        pipeline: PipelineResult,
    ) -> None:
        """Fold a finished sub-pipeline back into this one."""
        self._subtract_data(pipeline.subtracted_indices)
        self._add_results(pipeline.results)

    def write_results(
        self,
        data: pl.DataFrame,
        vorlage: bool,
        wf_id: types.Workflows,
        freigabe_auto: types.Freigabe,
        order_qty_expr: pl.Expr,
    ) -> None:
        """Move *data* from the open set into the results.

        Args:
            data: subset of the open rows to finalise.
            vorlage: whether the rows are flagged for manual review.
            wf_id: workflow that decided these rows.
            freigabe_auto: automatic-release flag to store.
            order_qty_expr: expression producing the BEST_MENGE column.
        """
        results = data.rename(db.map_data_to_result)
        results = results.with_columns(
            [
                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
                pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
                .alias("FREIGABE_AUTO")
                .cast(db.results_schema_map["FREIGABE_AUTO"]),
            ]
        )
        # drop raw-data columns that have no counterpart in the result table
        results = results.drop(
            [
                "BEDP_TITELNR",
                "BEDP_MAN",
                "BEDP_MENGE_BEDARF_VM",
                "MELDENUMMER",
                "VERLAGSNR",
                "MENGE_VORMERKER",
                "MANDFUEHR",
                "EINKAEUFER",
            ]
        )

        self._subtract_data(data)
        self._add_results(results)
|
||||||
|
|
||||||
|
|
||||||
|
# Marker base protocol for all order-quantity expression factories.
class ExprOrderQty(Protocol): ...


class ExprOrderQty_Base(ExprOrderQty, Protocol):
    """Factory taking no arguments (workflows with a fixed expression)."""

    def __call__(self) -> pl.Expr: ...


# Workflow IDs whose factory follows the no-argument base protocol.
ExprOrderQty_Base_Types: TypeAlias = (
    Literal[types.Workflows.ID_200]
    | Literal[types.Workflows.ID_900]
    | Literal[types.Workflows.ID_910]
)


class ExprOrderQty_WF100(ExprOrderQty, Protocol):
    """WF-100 factory: caller chooses between zero and pass-through quantity."""

    def __call__(self, empty: bool) -> pl.Expr: ...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def get_expr_order_qty(
|
||||||
|
wf_id: Literal[types.Workflows.ID_100],
|
||||||
|
) -> ExprOrderQty_WF100: ...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def get_expr_order_qty(
|
||||||
|
wf_id: ExprOrderQty_Base_Types,
|
||||||
|
) -> ExprOrderQty_Base: ...
|
||||||
|
|
||||||
|
|
||||||
|
def get_expr_order_qty(
|
||||||
|
wf_id: types.Workflows,
|
||||||
|
) -> ExprOrderQty:
|
||||||
|
empty_expr = (
|
||||||
|
pl.lit(0)
|
||||||
|
.alias(ORDER_QTY_CRIT)
|
||||||
|
.alias("BEST_MENGE")
|
||||||
|
.cast(db.results_schema_map["BEST_MENGE"])
|
||||||
|
)
|
||||||
|
|
||||||
|
def _empty() -> pl.Expr:
|
||||||
|
return empty_expr
|
||||||
|
|
||||||
|
func: ExprOrderQty
|
||||||
|
match wf_id:
|
||||||
|
case types.Workflows.ID_100:
|
||||||
|
|
||||||
|
def _func(empty: bool) -> pl.Expr:
|
||||||
|
order_qty_expr: pl.Expr
|
||||||
|
if empty:
|
||||||
|
order_qty_expr = empty_expr
|
||||||
|
else:
|
||||||
|
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
|
||||||
|
return order_qty_expr
|
||||||
|
|
||||||
|
func = _func
|
||||||
|
|
||||||
|
case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
|
||||||
|
func = _empty
|
||||||
|
case _:
|
||||||
|
raise NotImplementedError(
|
||||||
|
f"Order expression for WF-ID {wf_id.value} is not implemented"
|
||||||
|
)
|
||||||
|
|
||||||
|
return func
|
||||||
|
|
||||||
|
|
||||||
|
def wf900(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """filter 'Meldenummer' and fill non-feasible entries"""
    order_qty = get_expr_order_qty(types.Workflows.ID_900)

    # rows must have a Meldenummer and belong to mandant 1 or 90
    keep_conditions = (
        pl.col("MELDENUMMER").is_not_null(),
        pl.col(MANDANT_CRITERION).is_in((1, 90)),
    )
    split = _apply_several_filters(pipe_result.open, keep_conditions)

    # rejected rows are finalised with quantity 0, no manual review
    pipe_result.write_results(
        data=split.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_900,
        freigabe_auto=types.Freigabe.WF_900,
        order_qty_expr=order_qty(),
    )

    # surviving rows: treat missing quantities as zero for later workflows
    pipe_result.update_open(
        split.in_.with_columns(
            pl.col("MENGE_VORMERKER").fill_null(0),
            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
        )
    )

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def wf910(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """Remove entries with Meldenummer 26 from further processing."""
    order_qty = get_expr_order_qty(types.Workflows.ID_910)
    keep_not_mnr26 = pl.col("MELDENUMMER") != 26

    split = _apply_several_filters(pipe_result.open, filters=(keep_not_mnr26,))

    # the dropped rows are finalised with quantity 0, no manual review
    pipe_result.write_results(
        data=split.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_910,
        freigabe_auto=types.Freigabe.WF_910,
        order_qty_expr=order_qty(),
    )

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
# this a main routine:
|
||||||
|
# receives and gives back result objects
|
||||||
|
def wf100_umbreit(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    """WF-100 for the Umbreit mandant.

    Entries with Meldenummer 18, mandant 1 and at least one preorder are
    finalised with the demand quantity taken over directly (no review).
    """
    order_qty = get_expr_order_qty(types.Workflows.ID_100)

    selection = (
        pl.col("MELDENUMMER") == 18,
        pl.col(MANDANT_CRITERION) == 1,
        pl.col(vm_criterion) > 0,
    )
    split = _apply_several_filters(pipe_result.open, selection)

    pipe_result.write_results(
        data=split.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=order_qty(empty=False),
    )

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def wf100_petersen(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    """WF-100 variant for the Petersen mandant (BEDP_MAN == 90).

    Processing is split into a WDB branch (publishers 76008/76070) and an
    "other" branch: WDB titles without preorders get a forced quantity of
    0, WDB titles with preorders are delegated to the WDB sub-pipeline,
    and the remaining titles are finalised depending on their preorder
    count (> 1 preorder -> manual review, otherwise automatic take-over).
    """
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # difference WDB and others

    # // WDB branch
    # order quantity 0, no further action in other WFs
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        # forced zero quantity for WDB titles without preorders
        order_qty_expr=ORDER_QTY_FUNC(empty=True),
    )
    # WDB titles WITH preorders: handled by the dedicated sub-pipeline
    filter_number_vm = pl.col(vm_criterion) > 0
    res_candidates = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    wdb_sub_pipe = PipelineResult(res_candidates.in_)
    wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
    assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
    pipe_result.merge_pipeline(wdb_sub_pipe)

    # // other branch
    # show always entries with #VM > 1
    filter_number_vm = pl.col(vm_criterion) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )

    # remaining entries with exactly one preorder: automatic take-over
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def _wf100_petersen_sub1_wdb(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """WF-100/Petersen sub-pipeline for WDB titles with preorders.

    Titles with more than one preorder are shown for manual review; the
    rest are checked against the order history of the past 6 months and
    shown only if the title was ordered more than once.
    """
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # input: pre-filtered entries (WDB titles and #VM > 0)
    # more than 1 VM: show these entries
    filter_number_vm = pl.col(VM_CRITERION) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_number_vm,),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    # filtered out entries (WDB with #VM == 1) must be analysed for orders in the
    # past 6 months
    save_tmp_data(pipe_result.open)
    RELEVANT_DATE = get_starting_date(180)
    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
    filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
    stmt = (
        sql.select(
            db.tmp_data,
            db.EXT_BESPBES_INFO.c.BESP_MENGE,
            db.EXT_BESPBES_INFO.c.BESP_STATUS,
        )
        .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
        .where(filter_)
    )
    sub1 = stmt.subquery()

    # count order rows per title; only titles ordered more than once qualify
    count_col = sql.func.count()
    stmt = (
        sql.select(
            sub1.c.BEDP_TITELNR,
            count_col.label("count"),
        )
        .select_from(sub1)
        .group_by(sub1.c.BEDP_TITELNR)
        .having(count_col > 1)
    )
    if SAVE_TMP_FILES:
        # debug mode: fetch ALL per-title counts (no HAVING) so the full
        # aggregation can be saved; the threshold is re-applied in polars below
        stmt = (
            sql.select(
                sub1.c.BEDP_TITELNR,
                count_col.label("count"),
            )
            .select_from(sub1)
            .group_by(sub1.c.BEDP_TITELNR)
        )
    # !! this is a sub result which must be used in the result set
    # !! for testing and feedback by the customer
    relevant_titles = pl.read_database(
        stmt,
        engine,
    )

    if SAVE_TMP_FILES:
        save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
        # NOTE(review): the SQL labels this column "count" (lower case);
        # pl.col.COUNT assumes the driver upper-cases it — TODO confirm.
        relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)

    entries_to_show = pipe_result.open.filter(
        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
    )
    pipe_result.write_results(
        data=entries_to_show,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    # everything still open is taken over without manual review
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def wf200_umbreit(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """WF-200 for the Umbreit mandant: delegate matches to the sub-pipeline."""
    wanted_mnr: tuple[int, ...] = (17, 18)
    selection = (
        pl.col("MELDENUMMER").is_in(wanted_mnr),
        pl.col("BEDP_MAN") == 1,
    )

    split = _apply_several_filters(pipe_result.open, selection)

    # hand the matching rows to the shared WF-200 sub-pipeline
    sub_pipe = _wf200_sub1(PipelineResult(split.in_))
    assert sub_pipe.open.height == 0
    pipe_result.merge_pipeline(sub_pipe)

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def wf200_petersen(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """WF-200 variant for the Petersen mandant (BEDP_MAN == 90).

    WDB titles (publishers 76008/76070) are finalised immediately with
    quantity 0; the remaining matches go through the common WF-200
    order-history sub-pipeline.
    """
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    # Meldenummern relevant for WF-200
    RELEVANT_MNR: tuple[int, ...] = (17, 18)
    # // WDB branch
    filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
        ),
    )
    # ignore these
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    # // other branch
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
        ),
    )
    sub_pipe = PipelineResult(res.in_)
    sub_pipe = _wf200_sub1(sub_pipe)
    assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
    pipe_result.merge_pipeline(sub_pipe)

    return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def _wf200_sub1(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """Common WF-200 sub-pipeline: order-history based review selection.

    Titles ordered by at least three distinct customers within the past
    90 days (excluding two internal customer accounts, order types 1/99)
    are shown for manual review; everything else is finalised
    automatically with quantity 0.
    """
    save_tmp_data(pipe_result.open)
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    RELEVANT_DATE = get_starting_date(90)

    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
    # recent orders only, excluding internal customers, order types 1/99
    filter_ = sql.and_(
        db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
        db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
    )
    stmt = (
        sql.select(
            db.tmp_data,
            db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
            db.EXT_AUFPAUF.c.AUFTRAGS_ART,
        )
        .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
        .where(filter_)
    )
    sub1 = stmt.subquery()

    # distinct customers per title; >= 3 distinct customers qualifies
    unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
    stmt = (
        sql.select(
            sub1.c.BEDP_TITELNR,
            sql.func.count().label("count"),
            unique_count_col.label("customer_count"),
        )
        .select_from(sub1)
        .group_by(sub1.c.BEDP_TITELNR)
        .having(unique_count_col >= 3)
    )
    if SAVE_TMP_FILES:
        # debug mode: fetch ALL per-title counts (no HAVING) so the full
        # aggregation can be saved; the threshold is re-applied in polars below
        stmt = (
            sql.select(
                sub1.c.BEDP_TITELNR,
                sql.func.count().label("count"),
                unique_count_col.label("customer_count"),
            )
            .select_from(sub1)
            .group_by(sub1.c.BEDP_TITELNR)
        )
    # !! this is a sub result which must be used in the result set
    # !! for testing and feedback by the customer
    relevant_titles = pl.read_database(
        stmt,
        engine,
    )

    if SAVE_TMP_FILES:
        save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
        # NOTE(review): the SQL labels this column "customer_count" (lower
        # case); pl.col.CUSTOMER_COUNT assumes upper-casing — TODO confirm.
        relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)

    entries_to_show = pipe_result.open.filter(
        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
    )

    pipe_result.write_results(
        data=entries_to_show,
        vorlage=True,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    # everything still open is taken over without manual review
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )

    return pipe_result
|
||||||
Loading…
x
Reference in New Issue
Block a user