generated from dopt-python/py311
adapt architecture for pipelines, add WF-100 Petersen requirements
This commit is contained in:
parent
759010993f
commit
f98a2e2829
@ -1,4 +1,7 @@
|
|||||||
# %%
|
# %%
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
@ -7,9 +10,11 @@ from pathlib import Path
|
|||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
|
|
||||||
import dopt_basics.datetime as dt
|
import dopt_basics.datetime as dt
|
||||||
|
import oracledb
|
||||||
import polars as pl
|
import polars as pl
|
||||||
import sqlalchemy as sql
|
import sqlalchemy as sql
|
||||||
from dopt_basics import configs, io
|
from dopt_basics import configs, io
|
||||||
|
from sqlalchemy import event
|
||||||
|
|
||||||
from umbreit import db, types
|
from umbreit import db, types
|
||||||
|
|
||||||
@ -34,18 +39,24 @@ USER_NAME = CFG["user"]["name"]
|
|||||||
USER_PASS = CFG["user"]["pass"]
|
USER_PASS = CFG["user"]["pass"]
|
||||||
# %%
|
# %%
|
||||||
# !! init thick mode
|
# !! init thick mode
|
||||||
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
|
p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
|
||||||
# assert p_oracle_client.exists()
|
assert p_oracle_client.exists()
|
||||||
# assert p_oracle_client.is_dir()
|
assert p_oracle_client.is_dir()
|
||||||
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
|
oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
|
||||||
# %%
|
|
||||||
types.Freigabe.WF_100.value
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
conn_string = (
|
conn_string = (
|
||||||
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
|
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
|
||||||
)
|
)
|
||||||
engine = sql.create_engine(conn_string)
|
# engine = sql.create_engine(conn_string)
|
||||||
|
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
|
||||||
|
|
||||||
|
|
||||||
|
@event.listens_for(engine, "after_cursor_execute")
|
||||||
|
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
|
||||||
|
cursor.arraysize = 1000
|
||||||
|
cursor.prefetchrows = 1000
|
||||||
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
########### RESULTS ###########
|
########### RESULTS ###########
|
||||||
# temporary
|
# temporary
|
||||||
@ -129,35 +140,36 @@ df_order
|
|||||||
|
|
||||||
# prefilter amount columns for invalid entries
|
# prefilter amount columns for invalid entries
|
||||||
|
|
||||||
print("--------------- ext_bedpbed --------------")
|
# // tests with ext_bedpbed
|
||||||
t1 = time.perf_counter()
|
# print("--------------- ext_bedpbed --------------")
|
||||||
AMOUNT_COLS = frozenset(
|
# t1 = time.perf_counter()
|
||||||
(
|
# AMOUNT_COLS = frozenset(
|
||||||
"BEDP_MENGE_BEDARF",
|
# (
|
||||||
"BEDP_MENGE_VERKAUF",
|
# "BEDP_MENGE_BEDARF",
|
||||||
"BEDP_MENGE_ANFRAGE",
|
# "BEDP_MENGE_VERKAUF",
|
||||||
"BEDP_MENGE_BESTELLUNG",
|
# "BEDP_MENGE_ANFRAGE",
|
||||||
"BEDP_MENGE_FREI",
|
# "BEDP_MENGE_BESTELLUNG",
|
||||||
"BEDP_MENGE_BEDARF_VM",
|
# "BEDP_MENGE_FREI",
|
||||||
)
|
# "BEDP_MENGE_BEDARF_VM",
|
||||||
)
|
# )
|
||||||
|
# )
|
||||||
|
|
||||||
case_stmts = []
|
# case_stmts = []
|
||||||
for col in AMOUNT_COLS:
|
# for col in AMOUNT_COLS:
|
||||||
case_stmts.append(
|
# case_stmts.append(
|
||||||
sql.case(
|
# sql.case(
|
||||||
(db.ext_bedpbed.c[col] <= -1, sql.null()),
|
# (db.ext_bedpbed.c[col] <= -1, sql.null()),
|
||||||
else_=db.ext_bedpbed.c[col],
|
# else_=db.ext_bedpbed.c[col],
|
||||||
).label(col)
|
# ).label(col)
|
||||||
)
|
# )
|
||||||
|
|
||||||
stmt = sql.select(
|
# stmt = sql.select(
|
||||||
*[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
|
# *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
|
||||||
*case_stmts,
|
# *case_stmts,
|
||||||
)
|
# )
|
||||||
df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
|
# df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
|
||||||
t2 = time.perf_counter()
|
# t2 = time.perf_counter()
|
||||||
elapsed = t2 - t1
|
# elapsed = t2 - t1
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
|
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
|
||||||
@ -217,7 +229,7 @@ df.head()
|
|||||||
# %%
|
# %%
|
||||||
# // NO LIVE DATA NEEDED
|
# // NO LIVE DATA NEEDED
|
||||||
# SAVING/LOADING
|
# SAVING/LOADING
|
||||||
p_save = Path.cwd() / "raw_data_from_sql_query_20251211-1.arrow"
|
p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow"
|
||||||
# df.write_ipc(p_save)
|
# df.write_ipc(p_save)
|
||||||
df = pl.read_ipc(p_save)
|
df = pl.read_ipc(p_save)
|
||||||
# %%
|
# %%
|
||||||
@ -336,6 +348,16 @@ print(len(df.filter(pl.col("MELDENUMMER") == 18)))
|
|||||||
# %%
|
# %%
|
||||||
# VM_CRITERION = "MENGE_VORMERKER"
|
# VM_CRITERION = "MENGE_VORMERKER"
|
||||||
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
|
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
|
||||||
|
MANDANT_CRITERION = "BEDP_MAN"
|
||||||
|
|
||||||
|
|
||||||
|
def get_starting_date(
|
||||||
|
days: int,
|
||||||
|
) -> datetime.date:
|
||||||
|
current_dt = dt.current_time_tz(cut_microseconds=True)
|
||||||
|
td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS)
|
||||||
|
|
||||||
|
return (current_dt - td).date()
|
||||||
|
|
||||||
|
|
||||||
# TODO exchange to new query focusing on TINFO table
|
# TODO exchange to new query focusing on TINFO table
|
||||||
@ -364,13 +386,10 @@ def get_raw_data() -> pl.DataFrame:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_empty_pipeline_result(
|
# def get_empty_pipeline_result(
|
||||||
data: pl.DataFrame,
|
# data: pl.DataFrame,
|
||||||
) -> types.PipelineResult:
|
# ) -> PipelineResult:
|
||||||
schema = db.results_schema_map.copy()
|
# return PipelineResult(data)
|
||||||
del schema["id"]
|
|
||||||
results = pl.DataFrame(schema=schema)
|
|
||||||
return types.PipelineResult(results=results, open=data)
|
|
||||||
|
|
||||||
|
|
||||||
def _apply_several_filters(
|
def _apply_several_filters(
|
||||||
@ -391,20 +410,107 @@ def _apply_several_filters(
|
|||||||
return types.FilterResult(in_=df_current, out_=df_removed)
|
return types.FilterResult(in_=df_current, out_=df_removed)
|
||||||
|
|
||||||
|
|
||||||
# post-processing the results
|
class PipelineResult:
|
||||||
# TODO: order quantity not always necessary
|
__slots__ = ("_results", "_open", "_subtracted_indices")
|
||||||
# TODO: change relevant criterion for order quantity
|
_index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
|
||||||
def _write_results(
|
|
||||||
results_table: pl.DataFrame,
|
def __init__(
|
||||||
|
self,
|
||||||
|
data: pl.DataFrame,
|
||||||
|
) -> None:
|
||||||
|
self._open = data
|
||||||
|
schema = db.results_schema_map.copy()
|
||||||
|
del schema["id"]
|
||||||
|
self._results = pl.DataFrame(schema=schema)
|
||||||
|
|
||||||
|
schema = {}
|
||||||
|
for col in self._index_cols:
|
||||||
|
schema[col] = db.raw_data_query_schema_map[col]
|
||||||
|
self._subtracted_indices = pl.DataFrame(schema=schema)
|
||||||
|
|
||||||
|
def __len__(self) -> int:
|
||||||
|
return len(self._results) + len(self._open)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def open(self) -> pl.DataFrame:
|
||||||
|
return self._open
|
||||||
|
|
||||||
|
@property
|
||||||
|
def results(self) -> pl.DataFrame:
|
||||||
|
return self._results
|
||||||
|
|
||||||
|
@property
|
||||||
|
def subtracted_indices(self) -> pl.DataFrame:
|
||||||
|
return self._subtracted_indices
|
||||||
|
|
||||||
|
def update_open(
|
||||||
|
self,
|
||||||
|
data: pl.DataFrame,
|
||||||
|
) -> None:
|
||||||
|
self._open = data
|
||||||
|
|
||||||
|
def _subtract_data(
|
||||||
|
self,
|
||||||
|
data: pl.DataFrame,
|
||||||
|
) -> None:
|
||||||
|
self._open = self._open.join(data, on=self._index_cols, how="anti")
|
||||||
|
self._subtracted_indices = pl.concat(
|
||||||
|
(self._subtracted_indices, data[self._index_cols])
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO remove
|
||||||
|
# def _subtract_from_open(
|
||||||
|
# self,
|
||||||
|
# data: pl.DataFrame,
|
||||||
|
# ) -> None:
|
||||||
|
# self._open = self._open.join(data, on=self._index_cols, how="anti")
|
||||||
|
# self._subtracted_indices = pl.concat(
|
||||||
|
# (self._subtracted_indices, data[self._index_cols])
|
||||||
|
# )
|
||||||
|
|
||||||
|
# def _subtract_from_indices(
|
||||||
|
# self,
|
||||||
|
# indices: pl.DataFrame,
|
||||||
|
# ) -> None:
|
||||||
|
# self._open = self._open.join(indices, on=self._index_cols, how="anti")
|
||||||
|
# self._subtracted_indices = pl.concat(
|
||||||
|
# (self._subtracted_indices, indices[self._index_cols])
|
||||||
|
# )
|
||||||
|
|
||||||
|
def _add_results(
|
||||||
|
self,
|
||||||
|
data: pl.DataFrame,
|
||||||
|
) -> None:
|
||||||
|
self._results = pl.concat([self._results, data])
|
||||||
|
|
||||||
|
# TODO remove
|
||||||
|
# def add_pipeline_results(self, pipeline: PipelineResult) -> None:
|
||||||
|
# self._add_results(pipeline.results)
|
||||||
|
|
||||||
|
# def subtract_pipeline(
|
||||||
|
# self,
|
||||||
|
# pipeline: PipelineResult,
|
||||||
|
# ) -> None:
|
||||||
|
# self._subtract_data(pipeline.subtracted_indices)
|
||||||
|
|
||||||
|
def merge_pipeline(
|
||||||
|
self,
|
||||||
|
pipeline: PipelineResult,
|
||||||
|
) -> None:
|
||||||
|
self._subtract_data(pipeline.subtracted_indices)
|
||||||
|
self._add_results(pipeline.results)
|
||||||
|
|
||||||
|
def write_results(
|
||||||
|
self,
|
||||||
data: pl.DataFrame,
|
data: pl.DataFrame,
|
||||||
vorlage: bool,
|
vorlage: bool,
|
||||||
wf_id: int,
|
wf_id: int,
|
||||||
freigabe_auto: types.Freigabe,
|
freigabe_auto: types.Freigabe,
|
||||||
is_out: bool,
|
is_out: bool,
|
||||||
) -> pl.DataFrame:
|
) -> None:
|
||||||
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
|
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
|
||||||
|
|
||||||
data = data.rename(db.map_to_result)
|
results = data.rename(db.map_to_result)
|
||||||
order_qty_expr: pl.Expr
|
order_qty_expr: pl.Expr
|
||||||
if is_out:
|
if is_out:
|
||||||
order_qty_expr = (
|
order_qty_expr = (
|
||||||
@ -416,7 +522,7 @@ def _write_results(
|
|||||||
else:
|
else:
|
||||||
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
|
order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
|
||||||
|
|
||||||
data = data.with_columns(
|
results = results.with_columns(
|
||||||
[
|
[
|
||||||
pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
|
pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
|
||||||
pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
|
pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
|
||||||
@ -426,7 +532,7 @@ def _write_results(
|
|||||||
.cast(db.results_schema_map["freigabe_auto"]),
|
.cast(db.results_schema_map["freigabe_auto"]),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
data = data.drop(
|
results = results.drop(
|
||||||
[
|
[
|
||||||
"BEDP_TITELNR",
|
"BEDP_TITELNR",
|
||||||
"BEDP_MAN",
|
"BEDP_MAN",
|
||||||
@ -438,16 +544,70 @@ def _write_results(
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
return pl.concat([results_table, data])
|
self._subtract_data(data)
|
||||||
|
self._add_results(results)
|
||||||
|
|
||||||
|
|
||||||
|
# post-processing the results
|
||||||
|
# TODO: order quantity not always necessary
|
||||||
|
# TODO: change relevant criterion for order quantity
|
||||||
|
# def _write_results(
|
||||||
|
# pipe_result: PipelineResult,
|
||||||
|
# data: pl.DataFrame,
|
||||||
|
# vorlage: bool,
|
||||||
|
# wf_id: int,
|
||||||
|
# freigabe_auto: types.Freigabe,
|
||||||
|
# is_out: bool,
|
||||||
|
# ) -> PipelineResult:
|
||||||
|
# ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
|
||||||
|
|
||||||
|
# results = data.rename(db.map_to_result)
|
||||||
|
# order_qty_expr: pl.Expr
|
||||||
|
# if is_out:
|
||||||
|
# order_qty_expr = (
|
||||||
|
# pl.lit(0)
|
||||||
|
# .alias("ORDER_QTY_CRIT")
|
||||||
|
# .alias("best_menge")
|
||||||
|
# .cast(db.results_schema_map["best_menge"])
|
||||||
|
# )
|
||||||
|
# else:
|
||||||
|
# order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
|
||||||
|
|
||||||
|
# results = results.with_columns(
|
||||||
|
# [
|
||||||
|
# pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
|
||||||
|
# pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
|
||||||
|
# order_qty_expr,
|
||||||
|
# pl.lit(freigabe_auto.value)
|
||||||
|
# .alias("freigabe_auto")
|
||||||
|
# .cast(db.results_schema_map["freigabe_auto"]),
|
||||||
|
# ]
|
||||||
|
# )
|
||||||
|
# results = results.drop(
|
||||||
|
# [
|
||||||
|
# "BEDP_TITELNR",
|
||||||
|
# "BEDP_MAN",
|
||||||
|
# "BEDP_MENGE_BEDARF_VM",
|
||||||
|
# "MELDENUMMER",
|
||||||
|
# "VERLAGSNR",
|
||||||
|
# "MENGE_VORMERKER",
|
||||||
|
# "MANDFUEHR",
|
||||||
|
# ]
|
||||||
|
# )
|
||||||
|
|
||||||
|
# pipe_result.subtract_from_open(data)
|
||||||
|
# pipe_result.add_results(results)
|
||||||
|
|
||||||
|
# return pipe_result
|
||||||
|
|
||||||
|
|
||||||
def workflow_900(
|
def workflow_900(
|
||||||
pipe_result: types.PipelineResult,
|
pipe_result: PipelineResult,
|
||||||
) -> types.PipelineResult:
|
) -> PipelineResult:
|
||||||
"""pre-routine to handle non-feasible entries"""
|
"""filter 'Meldenummer' and fill non-feasible entries"""
|
||||||
|
|
||||||
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
|
||||||
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
|
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
pipe_res.open,
|
pipe_res.open,
|
||||||
(
|
(
|
||||||
@ -456,8 +616,7 @@ def workflow_900(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
pipe_result.results = _write_results(
|
pipe_result.write_results(
|
||||||
pipe_result.results,
|
|
||||||
data=res.out_,
|
data=res.out_,
|
||||||
vorlage=False,
|
vorlage=False,
|
||||||
wf_id=900,
|
wf_id=900,
|
||||||
@ -465,18 +624,18 @@ def workflow_900(
|
|||||||
is_out=True,
|
is_out=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
pipe_result.open = res.in_.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
|
pipe_result.update_open(res.in_.with_columns(pl.col("MENGE_VORMERKER").fill_null(0)))
|
||||||
pipe_result.open = res.in_.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
|
pipe_result.update_open(res.in_.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0)))
|
||||||
|
|
||||||
return pipe_result
|
return pipe_result
|
||||||
|
|
||||||
|
|
||||||
# main routine
|
|
||||||
# results for filtered out entries written
|
|
||||||
def workflow_910(
|
def workflow_910(
|
||||||
pipe_result: types.PipelineResult,
|
pipe_result: PipelineResult,
|
||||||
) -> types.PipelineResult:
|
) -> PipelineResult:
|
||||||
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
|
# TODO check if necessary because of WF-900
|
||||||
|
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
|
||||||
|
|
||||||
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
|
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
|
||||||
|
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
@ -487,15 +646,13 @@ def workflow_910(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
# write results for entries which were filtered out
|
# write results for entries which were filtered out
|
||||||
pipe_result.results = _write_results(
|
pipe_result.write_results(
|
||||||
pipe_result.results,
|
|
||||||
data=res.out_,
|
data=res.out_,
|
||||||
vorlage=False,
|
vorlage=False,
|
||||||
wf_id=910,
|
wf_id=910,
|
||||||
freigabe_auto=types.Freigabe.WF_910,
|
freigabe_auto=types.Freigabe.WF_910,
|
||||||
is_out=True,
|
is_out=True,
|
||||||
)
|
)
|
||||||
pipe_result.open = res.in_
|
|
||||||
|
|
||||||
return pipe_result
|
return pipe_result
|
||||||
|
|
||||||
@ -503,11 +660,11 @@ def workflow_910(
|
|||||||
# this a main routine:
|
# this a main routine:
|
||||||
# receives and gives back result objects
|
# receives and gives back result objects
|
||||||
def workflow_100_umbreit(
|
def workflow_100_umbreit(
|
||||||
pipe_result: types.PipelineResult,
|
pipe_result: PipelineResult,
|
||||||
vm_criterion: str,
|
vm_criterion: str,
|
||||||
) -> types.PipelineResult:
|
) -> PipelineResult:
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
||||||
filter_mandant = pl.col("MANDFUEHR") == 1
|
filter_mandant = pl.col(MANDANT_CRITERION) == 1
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
filter_number_vm = pl.col(vm_criterion) > 0
|
||||||
|
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
@ -518,53 +675,27 @@ def workflow_100_umbreit(
|
|||||||
filter_number_vm,
|
filter_number_vm,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
pipe_result.results = _write_results(
|
pipe_result.write_results(
|
||||||
results_table=pipe_result.results,
|
|
||||||
data=res.in_,
|
data=res.in_,
|
||||||
vorlage=True,
|
vorlage=True,
|
||||||
wf_id=100,
|
wf_id=100,
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
is_out=False,
|
is_out=False,
|
||||||
)
|
)
|
||||||
pipe_result.open = res.out_
|
|
||||||
|
|
||||||
return pipe_result
|
return pipe_result
|
||||||
|
|
||||||
|
|
||||||
def workflow_100_petersen(
|
def workflow_100_petersen(
|
||||||
pipe_result: types.PipelineResult,
|
pipe_result: PipelineResult,
|
||||||
vm_criterion: str,
|
vm_criterion: str,
|
||||||
) -> types.PipelineResult:
|
) -> PipelineResult:
|
||||||
# difference WDB and others
|
# difference WDB and others
|
||||||
|
|
||||||
# // WDB branch
|
# // WDB branch
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
|
||||||
filter_mandant = pl.col("MANDFUEHR") == 90
|
|
||||||
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
|
||||||
|
|
||||||
res = _apply_several_filters(
|
|
||||||
pipe_result.open,
|
|
||||||
(
|
|
||||||
filter_meldenummer,
|
|
||||||
filter_mandant,
|
|
||||||
filter_WDB,
|
|
||||||
filter_number_vm,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pipe_result.results = _write_results(
|
|
||||||
results_table=pipe_result.results,
|
|
||||||
data=res.in_,
|
|
||||||
vorlage=True,
|
|
||||||
wf_id=100,
|
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
|
||||||
is_out=False,
|
|
||||||
)
|
|
||||||
pipe_result.open = res.out_
|
|
||||||
|
|
||||||
# order quantity 0, no further action in other WFs
|
# order quantity 0, no further action in other WFs
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
||||||
filter_mandant = pl.col("MANDFUEHR") == 90
|
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
||||||
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
||||||
filter_number_vm = pl.col(vm_criterion) == 0
|
filter_number_vm = pl.col(vm_criterion) == 0
|
||||||
|
|
||||||
@ -577,19 +708,37 @@ def workflow_100_petersen(
|
|||||||
filter_number_vm,
|
filter_number_vm,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
pipe_result.results = _write_results(
|
pipe_result.write_results(
|
||||||
results_table=pipe_result.results,
|
|
||||||
data=res.in_,
|
data=res.in_,
|
||||||
vorlage=False,
|
vorlage=False,
|
||||||
wf_id=100,
|
wf_id=100,
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
is_out=False,
|
is_out=False,
|
||||||
)
|
)
|
||||||
pipe_result.open = res.out_
|
|
||||||
|
# TODO add check for orders or quantity to transform
|
||||||
|
# TODO show them
|
||||||
|
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
||||||
|
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
||||||
|
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
||||||
|
filter_number_vm = pl.col(vm_criterion) > 0
|
||||||
|
res_candidates = _apply_several_filters(
|
||||||
|
pipe_result.open,
|
||||||
|
(
|
||||||
|
filter_meldenummer,
|
||||||
|
filter_mandant,
|
||||||
|
filter_WDB,
|
||||||
|
filter_number_vm,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
wdb_sub_pipe = PipelineResult(res_candidates.in_)
|
||||||
|
wdb_sub_pipe = wf100_petersen_wdb_sub1(wdb_sub_pipe)
|
||||||
|
assert len(wdb_sub_pipe.open) == 0
|
||||||
|
pipe_result.merge_pipeline(wdb_sub_pipe)
|
||||||
|
|
||||||
# // other branch
|
# // other branch
|
||||||
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
||||||
filter_mandant = pl.col("MANDFUEHR") == 90
|
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
||||||
filter_number_vm = pl.col(vm_criterion) > 0
|
filter_number_vm = pl.col(vm_criterion) > 0
|
||||||
|
|
||||||
res = _apply_several_filters(
|
res = _apply_several_filters(
|
||||||
@ -600,27 +749,85 @@ def workflow_100_petersen(
|
|||||||
filter_number_vm,
|
filter_number_vm,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
pipe_result.results = _write_results(
|
pipe_result.write_results(
|
||||||
results_table=pipe_result.results,
|
|
||||||
data=res.in_,
|
data=res.in_,
|
||||||
vorlage=True,
|
vorlage=True,
|
||||||
wf_id=100,
|
wf_id=100,
|
||||||
freigabe_auto=types.Freigabe.WF_100,
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
is_out=False,
|
is_out=False,
|
||||||
)
|
)
|
||||||
pipe_result.open = res.out_
|
|
||||||
|
return pipe_result
|
||||||
|
|
||||||
|
|
||||||
|
def wf100_petersen_wdb_sub1(
|
||||||
|
pipe_result: PipelineResult,
|
||||||
|
) -> PipelineResult:
|
||||||
|
# input: pre-filtered entries (WDB titles and #VM > 0)
|
||||||
|
# more then 1 VM
|
||||||
|
# !! show these entries
|
||||||
|
filter_number_vm = pl.col(VM_CRITERION) > 1
|
||||||
|
res = _apply_several_filters(
|
||||||
|
pipe_result.open,
|
||||||
|
(filter_number_vm,),
|
||||||
|
)
|
||||||
|
pipe_result.write_results(
|
||||||
|
data=res.in_,
|
||||||
|
vorlage=True,
|
||||||
|
wf_id=100,
|
||||||
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
|
is_out=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
|
||||||
|
# past 6 months
|
||||||
|
title_nos = res.out_["BEDP_TITELNR"].to_list()
|
||||||
|
# !! query used because of slow pre-filtering queries
|
||||||
|
# TODO check for more native pre-filtering within the database when
|
||||||
|
# TODO performance problems are solved
|
||||||
|
start_date = get_starting_date(180)
|
||||||
|
filter_ = sql.and_(
|
||||||
|
db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
|
||||||
|
db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
|
||||||
|
)
|
||||||
|
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
|
||||||
|
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
|
||||||
|
df_show = (
|
||||||
|
df_order.group_by("BESP_TITELNR")
|
||||||
|
.agg(pl.col("BESP_TITELNR").count().alias("count"))
|
||||||
|
.filter(pl.col("count") > 1)
|
||||||
|
)
|
||||||
|
entries_to_show = df_show["BESP_TITELNR"].to_list()
|
||||||
|
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
|
||||||
|
res = _apply_several_filters(pipe_result.open, (filter_titleno,))
|
||||||
|
pipe_result.write_results(
|
||||||
|
data=res.in_,
|
||||||
|
vorlage=True,
|
||||||
|
wf_id=100,
|
||||||
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
|
is_out=False,
|
||||||
|
)
|
||||||
|
pipe_result.write_results(
|
||||||
|
data=pipe_result.open,
|
||||||
|
vorlage=False,
|
||||||
|
wf_id=100,
|
||||||
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
|
is_out=False,
|
||||||
|
)
|
||||||
|
|
||||||
return pipe_result
|
return pipe_result
|
||||||
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
# SAVING/LOADING
|
# SAVING/LOADING
|
||||||
p_save = Path.cwd() / "raw_data_from_sql_query_20251211-1.arrow"
|
p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow"
|
||||||
df = pl.read_ipc(p_save)
|
df = pl.read_ipc(p_save)
|
||||||
print(f"Number of entries: {len(df)}")
|
print(f"Number of entries: {len(df)}")
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
df.head()
|
df.head()
|
||||||
|
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
# removed_rows = []
|
# removed_rows = []
|
||||||
|
|
||||||
@ -659,7 +866,8 @@ df.head()
|
|||||||
|
|
||||||
# %%
|
# %%
|
||||||
raw_data = df.clone()
|
raw_data = df.clone()
|
||||||
pipe_res = get_empty_pipeline_result(raw_data)
|
# pipe_res = get_empty_pipeline_result(raw_data)
|
||||||
|
pipe_res = PipelineResult(raw_data)
|
||||||
pipe_res.results
|
pipe_res.results
|
||||||
pipe_res = workflow_900(pipe_res)
|
pipe_res = workflow_900(pipe_res)
|
||||||
print(f"Length of base data: {len(raw_data):>18}")
|
print(f"Length of base data: {len(raw_data):>18}")
|
||||||
@ -916,3 +1124,124 @@ for col, dtype in zip(df.columns, df.dtypes):
|
|||||||
print("dtypes of DF...")
|
print("dtypes of DF...")
|
||||||
pprint(col_dtype)
|
pprint(col_dtype)
|
||||||
# %%
|
# %%
|
||||||
|
# ** Petersen WDB
|
||||||
|
filter_meldenummer = pl.col("MELDENUMMER") == 18
|
||||||
|
filter_mandant = pl.col(MANDANT_CRITERION) == 90
|
||||||
|
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
|
||||||
|
filter_number_vm = pl.col(VM_CRITERION) > 0
|
||||||
|
|
||||||
|
res = _apply_several_filters(
|
||||||
|
df,
|
||||||
|
(
|
||||||
|
filter_meldenummer,
|
||||||
|
filter_mandant,
|
||||||
|
filter_WDB,
|
||||||
|
filter_number_vm,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
# %%
|
||||||
|
res.in_
|
||||||
|
# %%
|
||||||
|
# !! show these entries
|
||||||
|
filter_number_vm = pl.col(VM_CRITERION) > 1
|
||||||
|
|
||||||
|
res_vm_crit = _apply_several_filters(
|
||||||
|
res.in_,
|
||||||
|
(filter_number_vm,),
|
||||||
|
)
|
||||||
|
# %%
|
||||||
|
res_vm_crit.out_
|
||||||
|
# %%
|
||||||
|
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
|
||||||
|
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
|
||||||
|
len(title_nos)
|
||||||
|
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# define starting date for 6 month interval
|
||||||
|
# returns UTC time
|
||||||
|
start_date = get_starting_date(180)
|
||||||
|
filter_ = sql.and_(
|
||||||
|
db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
|
||||||
|
db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
|
||||||
|
)
|
||||||
|
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
|
||||||
|
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
|
||||||
|
df_order
|
||||||
|
# %%
|
||||||
|
# filter entries which have
|
||||||
|
df_show = (
|
||||||
|
df_order.group_by("BESP_TITELNR")
|
||||||
|
.agg(pl.col("BESP_TITELNR").count().alias("count"))
|
||||||
|
.filter(pl.col("count") > 1)
|
||||||
|
)
|
||||||
|
# %%
|
||||||
|
# !! show these entries
|
||||||
|
# !! do not show others
|
||||||
|
entries_to_show = df_show["BESP_TITELNR"].to_list()
|
||||||
|
print(f"Number of entries relevant: {len(entries_to_show)}")
|
||||||
|
# %%
|
||||||
|
res_vm_crit.out_
|
||||||
|
# %%
|
||||||
|
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
|
||||||
|
|
||||||
|
res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))
|
||||||
|
|
||||||
|
# %%
|
||||||
|
res_wdb.in_
|
||||||
|
# %%
|
||||||
|
res_wdb.out_
|
||||||
|
|
||||||
|
|
||||||
|
# %%
|
||||||
|
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# %%
|
||||||
|
# %%
|
||||||
|
schema = {}
|
||||||
|
for col in ("BEDARFNR", "BEDP_SEQUENZ"):
|
||||||
|
schema[col] = db.raw_data_query_schema_map[col]
|
||||||
|
base = pl.DataFrame(schema=schema)
|
||||||
|
# %%
|
||||||
|
data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
|
||||||
|
orig_data = pl.DataFrame(data, schema=schema)
|
||||||
|
data = orig_data[:5].clone()
|
||||||
|
# %%
|
||||||
|
pl.concat([base, data])
|
||||||
|
# %%
|
||||||
|
orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")
|
||||||
|
# %%
|
||||||
|
orig_data[("BEDARFNR", "BEDP_SEQUENZ")]
|
||||||
|
# %%
|
||||||
|
raw_data = df.clone()
|
||||||
|
pipe_res = PipelineResult(raw_data)
|
||||||
|
pipe_res.open
|
||||||
|
# %%
|
||||||
|
pipe_res.results
|
||||||
|
# %%
|
||||||
|
sub_data = pipe_res.open[:20].clone()
|
||||||
|
sub_data
|
||||||
|
# %%
|
||||||
|
pipe_res.write_results(
|
||||||
|
sub_data,
|
||||||
|
vorlage=True,
|
||||||
|
wf_id=30,
|
||||||
|
freigabe_auto=types.Freigabe.WF_100,
|
||||||
|
is_out=True,
|
||||||
|
)
|
||||||
|
# %%
|
||||||
|
pipe_res.open
|
||||||
|
# %%
|
||||||
|
pipe_res.results
|
||||||
|
# %%
|
||||||
|
raw_data = df.clone()
|
||||||
|
pipe_res_main = PipelineResult(raw_data)
|
||||||
|
pipe_res_main.open
|
||||||
|
# %%
|
||||||
|
pipe_res_main.merge_pipeline(pipe_res)
|
||||||
|
# %%
|
||||||
|
pipe_res_main.open
|
||||||
|
# %%
|
||||||
|
pipe_res.results
|
||||||
|
# %%
|
||||||
|
|||||||
@ -19,8 +19,27 @@ set timing on
|
|||||||
-- AND bedp.BEDP_MAN = t_info.MANDFUEHR;
|
-- AND bedp.BEDP_MAN = t_info.MANDFUEHR;
|
||||||
-- PROMPT ####################################
|
-- PROMPT ####################################
|
||||||
|
|
||||||
PROMPT >>>>>>>>> All allowed
|
-- PROMPT >>>>>>>>> All allowed
|
||||||
SELECT COUNT(*) FROM (
|
-- SELECT COUNT(*) FROM (
|
||||||
|
-- SELECT
|
||||||
|
-- bedp.BEDARFNR,
|
||||||
|
-- bedp.BEDP_SEQUENZ,
|
||||||
|
-- bedp.BEDP_TITELNR,
|
||||||
|
-- bedp.BEDP_MAN,
|
||||||
|
-- bedp.BEDP_MENGE_BEDARF_VM,
|
||||||
|
-- t_info.MELDENUMMER,
|
||||||
|
-- t_info.VERLAGSNR
|
||||||
|
-- t_info.MENGE_VORMERKER
|
||||||
|
-- t_info.MANDFUEHR
|
||||||
|
-- FROM EXT_BEDPBED bedp
|
||||||
|
-- LEFT JOIN EXT_TITEL_INFO t_info
|
||||||
|
-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
|
||||||
|
-- );
|
||||||
|
|
||||||
|
-- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
|
||||||
|
-- PROMPT ######################################
|
||||||
|
PROMPT #################################################
|
||||||
|
SELECT * FROM (
|
||||||
SELECT
|
SELECT
|
||||||
bedp.BEDARFNR,
|
bedp.BEDARFNR,
|
||||||
bedp.BEDP_SEQUENZ,
|
bedp.BEDP_SEQUENZ,
|
||||||
@ -28,17 +47,21 @@ SELECT COUNT(*) FROM (
|
|||||||
bedp.BEDP_MAN,
|
bedp.BEDP_MAN,
|
||||||
bedp.BEDP_MENGE_BEDARF_VM,
|
bedp.BEDP_MENGE_BEDARF_VM,
|
||||||
t_info.MELDENUMMER,
|
t_info.MELDENUMMER,
|
||||||
t_info.VERLAGSNR
|
t_info.VERLAGSNR,
|
||||||
t_info.MENGE_VORMERKER
|
t_info.MENGE_VORMERKER,
|
||||||
t_info.MANDFUEHR
|
t_info.MANDFUEHR
|
||||||
FROM EXT_BEDPBED bedp
|
FROM EXT_BEDPBED bedp
|
||||||
LEFT JOIN EXT_TITEL_INFO t_info
|
LEFT JOIN EXT_TITEL_INFO t_info
|
||||||
ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
|
ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
|
||||||
);
|
) view1
|
||||||
|
WHERE view1.VERLAGSNR IN (76008, 76070)
|
||||||
|
FETCH FIRST 100 ROWS ONLY;
|
||||||
|
|
||||||
|
SELECT * FROM EXT_BESPBES_INFO besp
|
||||||
|
WHERE besp.BESP_TITELNR = 7590554 AND
|
||||||
|
besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD');
|
||||||
|
|
||||||
|
|
||||||
-- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
|
|
||||||
-- PROMPT ######################################
|
|
||||||
PROMPT #################################################
|
|
||||||
-- SELECT COUNT(*) FROM (
|
-- SELECT COUNT(*) FROM (
|
||||||
-- SELECT /*+ NO_USE_HASH(bedp t_info) */
|
-- SELECT /*+ NO_USE_HASH(bedp t_info) */
|
||||||
-- view1.BEDP_TITELNR,
|
-- view1.BEDP_TITELNR,
|
||||||
|
|||||||
@ -16,13 +16,13 @@ class FilterResult:
|
|||||||
out_: pl.DataFrame
|
out_: pl.DataFrame
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True, kw_only=True, eq=False)
|
# @dataclass(slots=True, kw_only=True, eq=False)
|
||||||
class PipelineResult:
|
# class PipelineResult:
|
||||||
results: pl.DataFrame
|
# results: pl.DataFrame
|
||||||
open: pl.DataFrame
|
# open: pl.DataFrame
|
||||||
|
|
||||||
def __len__(self) -> int:
|
# def __len__(self) -> int:
|
||||||
return len(self.results) + len(self.open)
|
# return len(self.results) + len(self.open)
|
||||||
|
|
||||||
|
|
||||||
class Freigabe(enum.Enum):
|
class Freigabe(enum.Enum):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user