# umbreit-py/data_analysis/02-3_oracle_workflow_test.py

# %%
from __future__ import annotations
import datetime
import json
import shutil
import tempfile
import time
import typing
import uuid
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
import dopt_basics.datetime as dt
import oracledb
import polars as pl
import polars.selectors as cs
import sqlalchemy as sql
from dopt_basics import configs, io
from sqlalchemy import event
from umbreit import db, types
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
def create_tmp_dir() -> Path:
    tmp_pth = Path(tempfile.mkdtemp())
    assert tmp_pth.exists()
    return tmp_pth


TMP_DIR = create_tmp_dir()


def clear_tmp_dir() -> None:
    shutil.rmtree(TMP_DIR)
    TMP_DIR.mkdir()


def remove_tmp_dir() -> None:
    shutil.rmtree(TMP_DIR)


print(f"Created temp directory under: >{TMP_DIR}<")
# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]
# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(
    conn_string,
    execution_options={"stream_results": True},
)
# @event.listens_for(engine, "after_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
#     cursor.arraysize = 1000
#     cursor.prefetchrows = 1000
# @event.listens_for(engine, "before_cursor_execute")
# def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
#     cursor.arraysize = 1000
#     cursor.prefetchrows = 1000
# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results_local,))
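# %%
# quick sanity check (a sketch): the freshly created local results table
# should now be visible in the in-memory SQLite database
print(sql.inspect(res_engine).get_table_names())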
# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results_local))
        print("Rows deleted: ", res.rowcount)


delete_results(res_engine)
stmt = sql.select(db.results_local.c.bedarf_nr, db.results_local.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)
start_date = (current_dt - td).date()
print("Starting date: ", start_date)
# %%
# // ---------- LIVE DATA -----------
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
#     db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query really slow
)
# %%
print(stmt.compile(engine))
# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order
# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)
# prefilter amount columns for invalid entries
# // tests with ext_bedpbed
# print("--------------- ext_bedpbed --------------")
# t1 = time.perf_counter()
# AMOUNT_COLS = frozenset(
#     (
#         "BEDP_MENGE_BEDARF",
#         "BEDP_MENGE_VERKAUF",
#         "BEDP_MENGE_ANFRAGE",
#         "BEDP_MENGE_BESTELLUNG",
#         "BEDP_MENGE_FREI",
#         "BEDP_MENGE_BEDARF_VM",
#     )
# )
# case_stmts = []
# for col in AMOUNT_COLS:
#     case_stmts.append(
#         sql.case(
#             (db.ext_bedpbed.c[col] <= -1, sql.null()),
#             else_=db.ext_bedpbed.c[col],
#         ).label(col)
#     )
# stmt = sql.select(
#     *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
#     *case_stmts,
# )
# df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# t2 = time.perf_counter()
# elapsed = t2 - t1
# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
# print(f"Query duration: {elapsed:.4f} sec")
# print("Number of entries: ", len(df))
# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))
# %%
# raw data query
print("--------------- raw data query --------------")
t1 = time.perf_counter()
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# )
join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    # db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
    db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
df.head()
# %%
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
print(len(df))
df.head()
# %%
temp = df.fill_null(0)
mask = temp.select(pl.exclude("BEDARFNR", "BEDP_SEQUENZ")).is_duplicated()
temp.filter(mask).sort("BEDP_TITELNR")
# %%
# temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90))).with_columns(
#     pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
# )
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
df.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
# %%
# ** CHECK: duplicates
temp = df.fill_null(0)
mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated()
temp.filter(mask)
# %%
df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False)
# %%
# ** CHECK: positions without titlenumber
df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique()
# %%
# ** CHECK: unique title number?
df.group_by("BEDP_TITELNR").agg(
pl.col("BEDP_TITELNR").len().alias("count"),
pl.col.BEDP_MAN.unique().alias("unique_bedp_man"),
pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"),
).unique().filter(pl.col("count") > 1)
# %%
df.filter(pl.col.BEDP_TITELNR == 8679893)
# %%
df.with_columns(
    pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count")
).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter(
    pl.col("titlenumber_count") > 1
)
# %%
# ** CHECK: distribution of MELDENUMMER
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90)))
sum_entries = len(temp)
temp = (
    temp.group_by("MELDENUMMER")
    .agg(pl.col("MELDENUMMER").len().alias("count"))
    .sort("count", descending=True)
)
temp = temp.with_columns((pl.col.count / sum_entries).alias("proportion"))
temp = temp.with_columns(pl.col.proportion.cum_sum().alias("cum"))
temp
# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
#     pl.len()
# )
# p_save = Path.cwd() / "meldenummer_anteile_20260114-2.xlsx"
# temp.write_excel(p_save)
# %%
# ** CHECK: differences MANDANT in BEDP and in TINFO
# 4591588: in title database with a different MANDANT (are MANDFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())
# %%
df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())
# %%
df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)
# %%
# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())
# %%
# ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
# MANDANTEN other than (1, 90) do not possess relevant properties such as
# "MELDENUMMER" and others --> conclusion: not relevant
# MANDANT = 80
# print(f"Mandant: {MANDANT}")
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     )
# )
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))
# %%
df.filter(pl.col("MELDENUMMER").is_null()).filter(pl.col("MANDFUEHR").is_not_null())
# %%
# ** PREFILTER
# always needed, entries filtered out are to be disposed
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
df.filter(filter_meldenummer_null).filter(filter_mandant)
# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
# %%
len(df)
# %%
# ** CHECK: null values set in the query with CASE statement
# not known if NULL because of the CASE statement or already NULL in the table
# unknown consequences: Are they relevant? How do they relate to
# "MENGE_VORMERKER" from the title DB?
df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
# %%
df.select("MELDENUMMER").unique()
# %%
# ** CHECK: null values for "MENGE_VORMERKER"
df.filter(pl.col("MENGE_VORMERKER").is_null())
# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
agg_t = (
    df.group_by(["MELDENUMMER"]).agg(
        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
    )
    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
)  # .filter(pl.col("MELDENUMMER") == 18)
agg_t
# %%
df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())
# %%
# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
# ** not known at this point
# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
# BEDP_MENGE_BEDARF_VM as reference or ground truth not suitable
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20260130-1.xlsx"
)
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
# %%
# ** CHECK: titles with request where no title information is found
# result: there were entries found on 02.12., but not on 03.12.2025
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251211-1.json"
print(to_save)
# with open(p_save_not_in_title_table, "w") as file:
#     json.dump(to_save, file, indent=4)
# %%
df.group_by("BEDP_MAN").agg(pl.len())
# %%
df.filter(pl.col("MELDENUMMER").is_null()).group_by("BEDP_MAN").agg(pl.len().alias("count"))
# %%
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
    db.EXT_DOPT_ERGEBNIS.columns.keys()
)
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1"
def save_tmp_file(
    data: pl.DataFrame,
    filename: str | None,
) -> None:
    if filename is None:
        filename = str(uuid.uuid4())
    pth = (TMP_DIR / filename).with_suffix(".arrow")
    n: int = 1
    while pth.exists():
        # derive the candidate from the original name so suffixes do not accumulate
        pth = (TMP_DIR / f"{filename}_{n}").with_suffix(".arrow")
        n += 1
    data.write_ipc(pth)


def load_tmp_file(
    filename: str,
) -> pl.DataFrame:
    pth = (TMP_DIR / filename).with_suffix(".arrow")
    if not pth.exists():
        raise FileNotFoundError(f"File >{pth.name}< not found")
    return pl.read_ipc(pth)


def load_all_tmp_files() -> dict[str, pl.DataFrame]:
    all_dfs: dict[str, pl.DataFrame] = {}
    for file in TMP_DIR.glob("*.arrow"):
        df = pl.read_ipc(file)
        all_dfs[file.stem] = df
    return all_dfs
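# round-trip sketch for the tmp-file helpers (commented; toy data):
# _demo = pl.DataFrame({"BEDP_TITELNR": [1, 2, 3]})
# save_tmp_file(_demo, "demo_roundtrip")
# assert load_tmp_file("demo_roundtrip").equals(_demo)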
def get_starting_date(
    days_from_now: int,
) -> datetime.date:
    current_dt = dt.current_time_tz(cut_microseconds=True)
    td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
    return (current_dt - td).date()
def get_raw_data() -> pl.DataFrame:
    join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.VERLAGSNR,
        db.ext_titel_info.c.MENGE_VORMERKER,
        db.ext_titel_info.c.MANDFUEHR,
        db.ext_titel_info.c.EINKAEUFER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )
def save_tmp_data(df: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.tmp_data))
    with engine.begin() as conn:
        conn.execute(sql.insert(db.tmp_data), df.to_dicts())


def get_tmp_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.tmp_data),
        engine,
        schema_overrides=db.tmp_data_schema_map,
    )
def get_result_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.EXT_DOPT_ERGEBNIS),
        engine,
        schema_overrides=db.results_schema_map,
    )


def save_result_data(results: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())


def clear_result_data() -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
def save_result_data_native(results: pl.DataFrame) -> None:
    results = results.with_columns(
        [
            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
            for c in results.select(cs.boolean()).columns
        ]
    )
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
        "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:1, :2, :3, :4, :5, :6)
    """
    with engine.begin() as conn:
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            # executemany expects a sequence of bind tuples; column order must
            # match the positional binds above
            cursor.executemany(stmt, results.rows())
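# usage sketch (commented): the native path batches positional binds and
# avoids SQLAlchemy's per-row parameter processing
# save_result_data_native(pipe_res.results)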
def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    df_current = df
    removed_rows: list[pl.DataFrame] = []
    for filter_ in filters:
        removed = df_current.filter(~filter_)
        removed_rows.append(removed)
        df_current = df_current.filter(filter_)
    df_removed = pl.concat(removed_rows)
    return types.FilterResult(in_=df_current, out_=df_removed)
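# behaviour sketch for _apply_several_filters (commented; toy data): rows that
# survive every filter end up in `in_`, all dropped rows accumulate in `out_`
# _toy = pl.DataFrame({"a": [1, 2, 3, 4]})
# _res = _apply_several_filters(_toy, (pl.col("a") > 1, pl.col("a") < 4))
# _res.in_   # rows with a in {2, 3}
# _res.out_  # rows with a in {1, 4}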
class PipelineResult:
    __slots__ = ("_results", "_open", "_subtracted_indices")
    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")

    def __init__(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data
        schema = db.results_schema_map.copy()
        del schema["ID"]
        self._results = pl.DataFrame(schema=schema)
        schema = {}
        for col in self._index_cols:
            schema[col] = db.raw_data_query_schema_map[col]
        self._subtracted_indices = pl.DataFrame(schema=schema)

    def __len__(self) -> int:
        return len(self._results) + len(self._open)

    @property
    def open(self) -> pl.DataFrame:
        return self._open

    @property
    def results(self) -> pl.DataFrame:
        return self._results

    @property
    def subtracted_indices(self) -> pl.DataFrame:
        return self._subtracted_indices

    def update_open(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data

    def _subtract_data(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = self._open.join(data, on=self._index_cols, how="anti")
        self._subtracted_indices = pl.concat(
            (self._subtracted_indices, data[self._index_cols])
        )

    def _add_results(
        self,
        data: pl.DataFrame,
    ) -> None:
        res = pl.concat([self._results, data])
        self._results = res

    def merge_pipeline(
        self,
        pipeline: PipelineResult,
    ) -> None:
        self._subtract_data(pipeline.subtracted_indices)
        self._add_results(pipeline.results)

    def write_results(
        self,
        data: pl.DataFrame,
        vorlage: bool,
        wf_id: types.Workflows,
        freigabe_auto: types.Freigabe,
        order_qty_expr: pl.Expr,
    ) -> None:
        results = data.rename(db.map_data_to_result)
        results = results.with_columns(
            [
                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
                pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
                .alias("FREIGABE_AUTO")
                .cast(db.results_schema_map["FREIGABE_AUTO"]),
            ]
        )
        results = results.drop(
            [
                "BEDP_TITELNR",
                "BEDP_MAN",
                "BEDP_MENGE_BEDARF_VM",
                "MELDENUMMER",
                "VERLAGSNR",
                "MENGE_VORMERKER",
                "MANDFUEHR",
                "EINKAEUFER",
            ]
        )
        self._subtract_data(data)
        self._add_results(results)
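# a hands-on walk-through of the concat/anti-join mechanics behind
# PipelineResult lives in the scratch cells at the end of this script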
class ExprOrderQty(typing.Protocol): ...


class ExprOrderQty_Base(ExprOrderQty, typing.Protocol):
    def __call__(self) -> pl.Expr: ...


ExprOrderQty_Base_Types: typing.TypeAlias = (
    typing.Literal[types.Workflows.ID_200]
    | typing.Literal[types.Workflows.ID_900]
    | typing.Literal[types.Workflows.ID_910]
)


class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol):
    def __call__(self, empty: bool) -> pl.Expr: ...


@typing.overload
def get_expr_order_qty(
    wf_id: typing.Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ...
@typing.overload
def get_expr_order_qty(
    wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ...
def get_expr_order_qty(
    wf_id: types.Workflows,
) -> ExprOrderQty:
    empty_expr = (
        pl.lit(0)
        .alias(ORDER_QTY_CRIT)
        .alias("BEST_MENGE")
        .cast(db.results_schema_map["BEST_MENGE"])
    )

    def _empty() -> pl.Expr:
        return empty_expr

    func: ExprOrderQty
    match wf_id:
        case types.Workflows.ID_100:

            def _func(empty: bool) -> pl.Expr:
                order_qty_expr: pl.Expr
                if empty:
                    order_qty_expr = empty_expr
                else:
                    order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
                return order_qty_expr

            func = _func
        case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
            func = _empty
        case _:
            raise NotImplementedError(
                f"Order expression for WF-ID {wf_id.value} is not implemented"
            )
    return func
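# usage sketch for the overloads (commented): WF-100 yields a callable that
# takes `empty`, the remaining workflows a zero-argument callable
# expr_100 = get_expr_order_qty(types.Workflows.ID_100)(empty=True)
# expr_200 = get_expr_order_qty(types.Workflows.ID_200)()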
def wf900(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """filter 'Meldenummer' and fill non-feasible entries"""
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
    filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer_null,
            filter_mandant,
        ),
    )
    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_900,
        freigabe_auto=types.Freigabe.WF_900,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    pipe_result.update_open(
        res.in_.with_columns(
            pl.col("MENGE_VORMERKER").fill_null(0),
            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
        )
    )
    return pipe_result
def wf910(
    pipe_result: PipelineResult,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
    res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=types.Workflows.ID_910,
        freigabe_auto=types.Freigabe.WF_910,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    return pipe_result
# this is a main routine:
# receives and gives back result objects
def wf100_umbreit(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 1
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
def wf100_petersen(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # difference WDB and others
    # // WDB branch
    # order quantity 0, no further action in other WFs
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=True),
    )
    filter_number_vm = pl.col(vm_criterion) > 0
    res_candidates = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    wdb_sub_pipe = PipelineResult(res_candidates.in_)
    wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
    assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
    pipe_result.merge_pipeline(wdb_sub_pipe)
    # // other branch
    # always show entries with #VM > 1
    filter_number_vm = pl.col(vm_criterion) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
def _wf100_petersen_sub1_wdb(
    pipe_result: PipelineResult,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
    # input: pre-filtered entries (WDB titles and #VM > 0)
    # more than 1 VM: show these entries
    filter_number_vm = pl.col(VM_CRITERION) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_number_vm,),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    # filtered-out entries (WDB with #VM == 1) must be analysed for orders in the
    # past 6 months
    save_tmp_data(pipe_result.open)
    RELEVANT_DATE = get_starting_date(180)
    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
    filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
    stmt = (
        sql.select(
            db.tmp_data,
            db.EXT_BESPBES_INFO.c.BESP_MENGE,
            db.EXT_BESPBES_INFO.c.BESP_STATUS,
        )
        .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
        .where(filter_)
    )
    sub1 = stmt.subquery()
    count_col = sql.func.count()
    stmt = (
        sql.select(
            sub1.c.BEDP_TITELNR,
            count_col.label("count"),
        )
        .select_from(sub1)
        .group_by(sub1.c.BEDP_TITELNR)
        .having(count_col > 1)
    )
    if SAVE_TMP_FILES:
        stmt = (
            sql.select(
                sub1.c.BEDP_TITELNR,
                count_col.label("count"),
            )
            .select_from(sub1)
            .group_by(sub1.c.BEDP_TITELNR)
        )
    # !! this is a sub result which must be used in the result set
    # !! for testing and feedback by the customer
    relevant_titles = pl.read_database(
        stmt,
        engine,
    )
    if SAVE_TMP_FILES:
        save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
        relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
    entries_to_show = pipe_result.open.filter(
        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
    )
    pipe_result.write_results(
        data=entries_to_show,
        vorlage=True,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_100,
        freigabe_auto=types.Freigabe.WF_100,
        order_qty_expr=ORDER_QTY_FUNC(empty=False),
    )
    return pipe_result
def wf200_umbreit(
    pipe_result: PipelineResult,
) -> PipelineResult:
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_meldenummer, filter_mandant),
    )
    sub_pipe = PipelineResult(res.in_)
    sub_pipe = _wf200_sub1(sub_pipe)
    assert sub_pipe.open.height == 0
    pipe_result.merge_pipeline(sub_pipe)
    return pipe_result
def wf200_petersen(
    pipe_result: PipelineResult,
) -> PipelineResult:
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    RELEVANT_MNR: tuple[int, ...] = (17, 18)
    # // WDB branch
    filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
        ),
    )
    # ignore these
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    # // other branch
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
        ),
    )
    sub_pipe = PipelineResult(res.in_)
    sub_pipe = _wf200_sub1(sub_pipe)
    assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
    pipe_result.merge_pipeline(sub_pipe)
    return pipe_result
def _wf200_sub1(
    pipe_result: PipelineResult,
) -> PipelineResult:
    save_tmp_data(pipe_result.open)
    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
    RELEVANT_DATE = get_starting_date(90)
    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
    filter_ = sql.and_(
        db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
        db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
    )
    stmt = (
        sql.select(
            db.tmp_data,
            db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
            db.EXT_AUFPAUF.c.AUFTRAGS_ART,
        )
        .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
        .where(filter_)
    )
    sub1 = stmt.subquery()
    unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
    stmt = (
        sql.select(
            sub1.c.BEDP_TITELNR,
            sql.func.count().label("count"),
            unique_count_col.label("customer_count"),
        )
        .select_from(sub1)
        .group_by(sub1.c.BEDP_TITELNR)
        .having(unique_count_col >= 3)
    )
    if SAVE_TMP_FILES:
        stmt = (
            sql.select(
                sub1.c.BEDP_TITELNR,
                sql.func.count().label("count"),
                unique_count_col.label("customer_count"),
            )
            .select_from(sub1)
            .group_by(sub1.c.BEDP_TITELNR)
        )
    # !! this is a sub result which must be used in the result set
    # !! for testing and feedback by the customer
    relevant_titles = pl.read_database(
        stmt,
        engine,
    )
    if SAVE_TMP_FILES:
        save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
        relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
    entries_to_show = pipe_result.open.filter(
        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
    )
    pipe_result.write_results(
        data=entries_to_show,
        vorlage=True,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=types.Workflows.ID_200,
        freigabe_auto=types.Freigabe.WF_200,
        order_qty_expr=ORDER_QTY_FUNC(),
    )
    return pipe_result
# %%
# SAVING/LOADING
READ_DATABASE = False
OVERWRITE = True
FILENAME = "raw_data_from_sql_query_20260202-1.arrow"
p_save = Path.cwd() / FILENAME
if READ_DATABASE:
    df = get_raw_data()
    if not p_save.exists() or OVERWRITE:
        df.write_ipc(p_save)
else:
    df = pl.read_ipc(p_save)
# %%
df
# %%
# initialise pipeline
raw_data = df.clone()
print(f"Number of entries: {len(df)}")
clear_tmp_dir()
clear_result_data()
# %%
df.head()
# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
pipe_res = PipelineResult(raw_data)
pipe_res.results
pipe_res = wf900(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# %%
pipe_res = wf910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = wf100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = wf100_petersen(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = wf200_umbreit(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = wf200_petersen(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18)))
# %%
pipe_res.results.select(pl.col("VORLAGE").value_counts())
# %%
pipe_res.results.height
# %%
# // aggregate test results
all_tmps = load_all_tmp_files()
print(len(all_tmps))
# %%
def prepare_tmp_data() -> list[pl.DataFrame]:
    all_tmps = load_all_tmp_files()
    WF_100_TMP_RENAME = {"COUNT": "WF-100_WDB_Anz-Best-Petersen_verg_6_Monate"}
    WF_200_TMP_RENAME = {
        "COUNT": "WF-200_Anz-Best-Kunde_verg_3_Monate",
        "CUSTOMER_COUNT": "WF-200_Anz-Kunden_verg_3_Monate",
    }
    WF_100: list[pl.DataFrame] = []
    WF_200: list[pl.DataFrame] = []
    for name, df in all_tmps.items():
        if TMPFILE_WF100_SUB1_WDB in name:
            rename_schema = WF_100_TMP_RENAME
            df = df.rename(rename_schema)
            WF_100.append(df)
        elif TMPFILE_WF200_SUB1 in name:
            rename_schema = WF_200_TMP_RENAME
            df = df.rename(rename_schema)
            WF_200.append(df)
    tmp_WF_collects = (WF_100, WF_200)
    all_tmps_preproc: list[pl.DataFrame] = []
    for collect in tmp_WF_collects:
        if len(collect) > 1:
            df = pl.concat(collect)
        elif len(collect) == 1:
            df = collect[0].clone()
        else:
            raise RuntimeError()
        all_tmps_preproc.append(df)
    return all_tmps_preproc
def generate_test_result_data(
    raw_data: pl.DataFrame,
    pipe_result: PipelineResult,
) -> pl.DataFrame:
    all_tmps_preproc = prepare_tmp_data()
    res_table = pipe_result.results.clone()
    res_title_info = res_table.join(
        raw_data,
        left_on=["BEDARF_NR", "BEDARF_SEQUENZ"],
        right_on=["BEDARFNR", "BEDP_SEQUENZ"],
        how="inner",
    )
    exclude_cols = ("BEDARF_NR", "BEDARF_SEQUENZ")
    res_title_info = res_title_info.select(pl.exclude(exclude_cols))
    columns = [
        "VORLAGE",
        "WF_ID",
        "BEST_MENGE",
        "FREIGABE_AUTO",
        "BEDP_MENGE_BEDARF_VM",
        "MENGE_VORMERKER",
        "BEDP_TITELNR",
        "BEDP_MAN",
        "MELDENUMMER",
        "VERLAGSNR",
        "EINKAEUFER",
        "MANDFUEHR",
    ]
    res_title_info = res_title_info.select(columns)
    test_results = res_title_info.clone()
    for df in all_tmps_preproc:
        test_results = test_results.join(df, on="BEDP_TITELNR", how="left")
    test_results = test_results.sort(by=["WF_ID", "BEDP_MAN"], descending=False)
    test_results = test_results.select(pl.int_range(1, pl.len() + 1).alias("Index"), pl.all())
    return test_results
# %%
test_results = generate_test_result_data(raw_data, pipe_res)
test_results.head()
# %%
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Testdatensatz_WF-100-200_{date_str}.xlsx"
test_results.to_pandas().set_index("Index").to_excel(
    p_save,
    freeze_panes=(1, 1),
    sheet_name=f"Ergebnisse_Testphase_{date_str}",
)
#####################################################################
# %%
# ** deviating titles where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER
deviation_vm = test_results.with_columns(pl.col.MENGE_VORMERKER.fill_null(0)).filter(
    pl.col.BEDP_MENGE_BEDARF_VM > pl.col.MENGE_VORMERKER
)
deviation_vm = test_results.filter(
    pl.col.BEDP_TITELNR.is_in(deviation_vm["BEDP_TITELNR"].implode())
)
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
p_save = Path.cwd() / f"Abweichungen-VM_{date_str}.xlsx"
deviation_vm.to_pandas().set_index("Index").to_excel(p_save, freeze_panes=(1, 1))
# ** WF-200 potentially triggered
raw_data.filter(pl.col.MELDENUMMER.is_in((17, 18))).filter(
    pl.col.BEDP_TITELNR.is_duplicated()
).sort("BEDP_TITELNR")
# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #
# %%
wf_200_start_data = pipe_res.open.clone()
wf_200_start_data
# %%
engine.dispose()
# %%
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
    wf_200_start_data,
    (filter_meldenummer, filter_mandant),
)
# %%
# these entries must be checked for relevant orders
# therefore, a temp table must be created in the database to execute efficient
# queries; other approaches are just hacks
# SOLUTION:
# - save these entries to a temp table 'temp'
# - look up the order history of the past 3 months
# -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR
# -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND
# -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND
#
# this is a separate sub-pipeline like in Petersen WF-100
# these entries are either to be shown or not
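# a plain-SQL sketch of the lookup built below (TMP_DATA stands in for the
# actual name of db.tmp_data; the third condition is the AUFTRAGS_ART filter
# used in _wf200_sub1):
# SELECT t.*, a.KUNDE_RECHNUNG, a.AUFTRAGS_ART
# FROM TMP_DATA t
# JOIN EXT_AUFPAUF a ON t.BEDP_TITELNR = a.TITELNR
# WHERE a.AUFTRAGS_DATUM >= :rel_date
#   AND a.KUNDE_RECHNUNG NOT IN (608991, 260202)
#   AND a.AUFTRAGS_ART IN (1, 99)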
sub_pipe_umbreit = PipelineResult(res.in_)
# %%
sub_pipe_umbreit.open
# %%
save_tmp_data(sub_pipe_umbreit.open)
# %%
rel_date = get_starting_date(90)
rel_date
# %%
# old way using IN statements
# filter_ = sql.and_(
#     db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice),
#     db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
#     db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
# )
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
# filter_ = sql.and_(
#     db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
#     db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
#     db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
# )
# stmt = (
#     sql.select(
#         db.tmp_data,
#         db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
#         db.EXT_AUFPAUF.c.AUFTRAGS_ART,
#     )
#     .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
#     .where(filter_)
# )
# print(stmt.compile(engine))
# new_schema = db.EXT_AUFPAUF_schema_map.copy()
# new_schema.update(db.tmp_data_schema_map)
# new_schema
# %%
# demo = pl.read_database(
#     stmt,
#     engine,
#     schema_overrides=db.EXT_AUFPAUF_schema_map,
# )
# # %%
# demo
# # %%
# demo.select(pl.col.AUFTRAGS_ART).unique()
# %%
get_tmp_data()
# %%
# demo_2 = demo.clone()
# # demo_2.head()
# print(f"Number of titles before filtering: {len(demo_2)}")
# demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99)))
# demo_2 = (
#     demo_2.group_by("BEDP_TITELNR", maintain_order=True)
#     .agg(
#         pl.len().alias("count"),
#         pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"),
#     )
#     .filter(pl.col.customer_count >= 3)
# )
# # these remaining titles are relevant and should be shown
# # the others should be disposed
# print(f"Number of titles which are relevant: {len(demo_2)}")
# print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}")
# demo_2
# %%
# make a subquery for the pre-filtered entries
# // query to obtain relevant title numbers
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
    db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
)
stmt = (
    sql.select(
        db.tmp_data,
        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
        db.EXT_AUFPAUF.c.AUFTRAGS_ART,
    )
    .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
    .where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
    sql.select(
        sub1.c.BEDP_TITELNR,
        sql.func.count().label("count"),
        unique_count_col.label("customer_count"),
    )
    .select_from(sub1)
    .group_by(sub1.c.BEDP_TITELNR)
    .having(unique_count_col >= 3)
)
print(stmt.compile(engine))
# %%
demo_agg = pl.read_database(
    stmt,
    engine,
)
# %%
demo_agg
# %%
sub_pipe_umbreit.open
# sub_pipe_umbreit.open.select("BEDP_TITELNR").n_unique()
# %%
# now obtain these entries from the open data
demo_agg["BEDP_TITELNR"].unique().implode()
entries_to_show = sub_pipe_umbreit.open.filter(
    pl.col.BEDP_TITELNR.is_in(demo_agg["BEDP_TITELNR"].unique().implode())
)
entries_to_show
# %%
sub_pipe_umbreit.open
# %%
# df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
# df
# %%
df.filter(pl.col("BEDARFNR") == 884607)
# %%
df_order.filter(pl.col("BEDARFNR") == 884607)
# %%
# now obtain order data for entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t
# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.count("AUFP_POSITION").alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t
# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))
# %%
# ---------------------------------------------------------------------------- #
# Writing results in DB
# ---------------------------------------------------------------------------- #
delete_results(res_engine)
pipe_res.results.write_database(db.results_local.fullname, res_engine, if_table_exists="append")
stmt = sql.select(db.results_local)
db_results = pl.read_database(stmt, res_engine)
db_results
# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #
# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)
# %%
df.group_by("BEDP_TITELNR").agg(
pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)
# %%
df["BEDP_MAN"].unique()
# %%
df.estimated_size(unit="mb")
# %%
target_bednr = df["BEDARFNR"].to_list()
target_seq = df["BEDP_SEQUENZ"].to_list()
# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)
# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)
# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO)
title_buy = pl.read_database(stmt, engine)
# %%
title_buy
# %% when were the orders placed
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)
# %%
title_order
# -------------------------------------------------------------------------------------------
# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and which are
#   assigned to buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)
# %%
titles["MANDFUEHR"].unique()
# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)
# %%
# requirements?
# - includes only orders since 05.11.2025
stmt = sql.select(db.ext_bedpbed) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# %%
reqs
# %%
reqs["BEDP_MAN"].unique()
# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)
# %%
intersection = unique_titles & unique_titles_req
len(intersection)
# %%
# orders?
# - includes only orders since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)
# %%
orders.estimated_size(unit="mb")
# %%
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
# df = dataframes[1]
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%
len(df)
# %%
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)
# %%
df
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%
# ** Petersen WDB
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(VM_CRITERION) > 0
res = _apply_several_filters(
    df,
    (
        filter_meldenummer,
        filter_mandant,
        filter_WDB,
        filter_number_vm,
    ),
)
# %%
res.in_
# %%
# !! show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1
res_vm_crit = _apply_several_filters(
    res.in_,
    (filter_number_vm,),
)
# %%
res_vm_crit.out_
# %%
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
len(title_nos)
# %%
title_nos
# %%
# define starting date for 6 month interval
# returns UTC time
start_date = get_starting_date(180)
filter_ = sql.and_(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
    db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
)
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
df_order
# %%
# filter entries which have more than one order within the interval
df_show = (
    df_order.group_by("BESP_TITELNR")
    .agg(pl.col("BESP_TITELNR").count().alias("count"))
    .filter(pl.col("count") > 1)
)
df_show
# %%
# !! show these entries
# !! do not show others
entries_to_show = df_show["BESP_TITELNR"].to_list()
print(f"Number of entries relevant: {len(entries_to_show)}")
# %%
res_vm_crit.out_
# %%
filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode())
res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))
# %%
res_wdb.in_
# %%
res_wdb.out_
# %%
# // scratch: concat/anti-join mechanics used by PipelineResult
schema = {}
for col in ("BEDARFNR", "BEDP_SEQUENZ"):
    schema[col] = db.raw_data_query_schema_map[col]
base = pl.DataFrame(schema=schema)
# %%
data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
orig_data = pl.DataFrame(data, schema=schema)
data = orig_data[:5].clone()
# %%
pl.concat([base, data])
# %%
orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")
# %%
orig_data[("BEDARFNR", "BEDP_SEQUENZ")]
# %%
raw_data = df.clone()
pipe_res = PipelineResult(raw_data)
pipe_res.open
# %%
pipe_res.results
# %%
sub_data = pipe_res.open[:20].clone()
sub_data
# %%
pipe_res.write_results(
    sub_data,
    vorlage=True,
    wf_id=types.Workflows.ID_100,
    freigabe_auto=types.Freigabe.WF_100,
    order_qty_expr=get_expr_order_qty(types.Workflows.ID_100)(empty=True),
)
# %%
pipe_res.open
# %%
pipe_res.results
# %%
raw_data = df.clone()
pipe_res_main = PipelineResult(raw_data)
pipe_res_main.open
# %%
pipe_res_main.merge_pipeline(pipe_res)
# %%
pipe_res_main.open
# %%
pipe_res.results
# %%
remove_tmp_dir()
# %%