# %%
from __future__ import annotations

import datetime
import json
import time
import typing
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint

import dopt_basics.datetime as dt
import oracledb
import polars as pl
import sqlalchemy as sql
from dopt_basics import configs, io
from sqlalchemy import event

from umbreit import db, types

# %%
# import importlib
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
    glob_pattern="CRED*.toml",
    stop_folder_name="umbreit-py",
)
assert p_cfg is not None
CFG = configs.load_toml(p_cfg)
HOST = CFG["server"]["host"]
PORT = CFG["server"]["port"]
SERVICE = CFG["server"]["service"]
USER_NAME = CFG["user"]["name"]
USER_PASS = CFG["user"]["pass"]

# %%
# !! init thick mode
# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
# assert p_oracle_client.exists()
# assert p_oracle_client.is_dir()
# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))

# %%
conn_string = (
    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
)
# engine = sql.create_engine(conn_string)
engine = sql.create_engine(conn_string, execution_options={"stream_results": True})


@event.listens_for(engine, "after_cursor_execute")
def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
    # tune how many rows the Oracle driver fetches per round trip
    cursor.arraysize = 1000
    cursor.prefetchrows = 1000
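

# %%
# Optional sketch: with "stream_results" enabled above, large tables can be
# consumed in chunks instead of as one big DataFrame. `iter_batches` and
# `batch_size` are standard `pl.read_database` options; the table used here
# is only an example.
# for batch in pl.read_database(
#     sql.select(db.ext_bedpbed), engine, iter_batches=True, batch_size=50_000
# ):
#     print(len(batch))  # process each chunk with bounded memory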
# %%
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
db.metadata.create_all(res_engine, tables=(db.results,))


# %%
# delete existing results
def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
        res = conn.execute(sql.delete(db.results))
        print("Rows deleted: ", res.rowcount)


delete_results(res_engine)
stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# define starting date for 3 month interval
# returns UTC time
current_dt = dt.current_time_tz(cut_microseconds=True)
print("Current DT: ", current_dt)
td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
print("Timedelta: ", td)

start_date = (current_dt - td).date()
print("Starting date: ", start_date)

# %%
# // ---------- LIVE DATA -----------

# TODO find a way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
filter_K_rech = (608991, 260202)
# NOTE: the second assignment overrides the first, so the join matches on the
# title number only (MANDANT matching is still unresolved)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
)
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
)
where_condition = sql.and_(
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
)

stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        db.EXT_AUFPAUF,
    )
    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
    .where(where_condition)
    .limit(100)  # full query is really slow
)

# %%
print(stmt.compile(engine))
# %%
df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
df_order

# %%
# AUFPAUF
# stmt = sql.select(db.EXT_AUFPAUF)
# df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
# df_aufpauf
# df_aufpauf.filter(pl.col("TITELNR") == 6315273)

# prefilter amount columns for invalid entries

# // tests with ext_bedpbed
# print("--------------- ext_bedpbed --------------")
# t1 = time.perf_counter()
# AMOUNT_COLS = frozenset(
#     (
#         "BEDP_MENGE_BEDARF",
#         "BEDP_MENGE_VERKAUF",
#         "BEDP_MENGE_ANFRAGE",
#         "BEDP_MENGE_BESTELLUNG",
#         "BEDP_MENGE_FREI",
#         "BEDP_MENGE_BEDARF_VM",
#     )
# )

# case_stmts = []
# for col in AMOUNT_COLS:
#     case_stmts.append(
#         sql.case(
#             (db.ext_bedpbed.c[col] <= -1, sql.null()),
#             else_=db.ext_bedpbed.c[col],
#         ).label(col)
#     )

# stmt = sql.select(
#     *[c for c in db.ext_bedpbed.c if c.name not in AMOUNT_COLS],
#     *case_stmts,
# )
# df = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# t2 = time.perf_counter()
# elapsed = t2 - t1

# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
# NOTE: these prints belong to the commented-out timing test above; kept
# commented so the cell does not fail on undefined names
# print(f"Query duration: {elapsed:.4f} sec")
# print("Number of entries: ", len(df))
# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")

# %%
# try title_info parsing
stmt = sql.select(db.ext_titel_info)
print(stmt.compile(engine))


# %%
# raw data query
# TODO look for entries which do not have an associated title number

print("--------------- raw data query --------------")
t1 = time.perf_counter()
# join_condition = sql.and_(
#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    sql.case(
        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
    db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

print(stmt.compile(engine))
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.raw_data_query_schema_map,
)
t2 = time.perf_counter()
elapsed = t2 - t1
# %%
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
df.head()

# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
print(len(df))
df.head()
# %%
# ** CHECK: duplicates
temp = df.fill_null(0)
mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated()
temp.filter(mask)
# %%
df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False)
# %%
# ** CHECK: positions without titlenumber
df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique()
# %%
# ** CHECK: unique title number?
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_TITELNR").len().alias("count"),
    pl.col.BEDP_MAN.unique().alias("unique_bedp_man"),
    pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"),
).unique().filter(pl.col("count") > 1)
# %%
df.filter(pl.col.BEDP_TITELNR == 8679893)
# %%
df.with_columns(
    pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count")
).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter(
    pl.col("titlenumber_count") > 1
)
# %%
# ** CHECK: distribution of MELDENUMMER
temp = df.filter(pl.col.BEDP_MAN.is_in((1, 90)))
sum_entries = len(temp)
temp = (
    temp.group_by("MELDENUMMER")
    .agg(pl.col("MELDENUMMER").len().alias("count"))
    .sort("count", descending=True)
)

temp = temp.with_columns((pl.col.count / sum_entries).alias("proportion"))
temp = temp.with_columns(pl.col.proportion.cum_sum().alias("cum"))
temp
# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
#     pl.len()
# )
# p_save = Path.cwd() / "meldenummer_anteile_20260114-2.xlsx"
# temp.write_excel(p_save)
# %%
# ** CHECK: differences MANDANT in BEDP and in TINFO
# 4591588: in title database with a different MANDANT (are MANDFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())

# %%
df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())
# %%
df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)

# %%
# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())
# %%
# ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
# MANDANTEN other than (1, 90) do not possess relevant properties such as
# "MELDENUMMER" --> conclusion: not relevant

# MANDANT = 80

# print(f"Mandant: {MANDANT}")
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     )
# )
# print(
#     df.filter(pl.col("BEDP_MAN") == MANDANT).select(
#         ["BEDP_MENGE_BEDARF_VM", "MELDENUMMER", "MENGE_VORMERKER"]
#     ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))
# %%
df.filter(pl.col("MELDENUMMER").is_null()).filter(pl.col("MANDFUEHR").is_not_null())
# %%
# ** PREFILTER
# always needed; entries filtered out are to be disposed of
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
# NOTE: the result is only displayed here, not assigned back to df
df.filter(filter_meldenummer_null).filter(filter_mandant)

# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
# %%
len(df)
# %%
# ** CHECK: null values set in the query with the CASE statement
# not known whether NULL comes from the CASE statement or was already set in the table
# unknown consequences: Are they relevant? How do they relate to "MENGE_VORMERKER"
# from the title DB?
df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
# %%
df.select("MELDENUMMER").unique()
# %%
# ** CHECK: null values for "MENGE_VORMERKER"
df.filter(pl.col("MENGE_VORMERKER").is_null())
# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)

agg_t = (
    df.group_by(["MELDENUMMER"]).agg(
        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
    )
    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
)  # .filter(pl.col("MELDENUMMER") == 18)
agg_t
# %%
df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())

# %%
# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
# ** not known at this point
# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
# BEDP_MENGE_BEDARF_VM is not suitable as a reference or ground truth
df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
p_save_diff_VM_bedp_tinfo = (
    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
)

df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
# %%
# ** CHECK: titles with request where no title information is found
# result: there were entries found on 02.12., but not on 03.12.2025
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251211-1.json"
print(to_save)
# with open(p_save_not_in_title_table, "w") as file:
#     json.dump(to_save, file, indent=4)
# %%
df.group_by("BEDP_MAN").agg(pl.len())
# %%
df.filter(pl.col("MELDENUMMER").is_null()).group_by("BEDP_MAN").agg(pl.len().alias("count"))
# %%
print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION = "BEDP_MAN"


def get_starting_date(
    days: int,
) -> datetime.date:
    current_dt = dt.current_time_tz(cut_microseconds=True)
    td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS)

    return (current_dt - td).date()


# TODO exchange for a new query focusing on the TINFO table
def get_raw_data() -> pl.DataFrame:
    join_condition = sql.and_(
        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
    )
    stmt = sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MAN,
        sql.case(
            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
        ).label("BEDP_MENGE_BEDARF_VM"),
        db.ext_titel_info.c.MELDENUMMER,
        db.ext_titel_info.c.MENGE_VORMERKER,
    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

    return pl.read_database(
        stmt,
        engine,
        schema_overrides=db.raw_data_query_schema_map,
    )


# def get_empty_pipeline_result(
#     data: pl.DataFrame,
# ) -> PipelineResult:
#     return PipelineResult(data)


def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> types.FilterResult:
    df_current = df
    removed_rows: list[pl.DataFrame] = []

    for filter_ in filters:
        # collect the rows rejected by this filter ...
        removed = df_current.filter(~filter_)
        removed_rows.append(removed)

        # ... and keep only the surviving rows for the next filter
        df_current = df_current.filter(filter_)

    df_removed = pl.concat(removed_rows)

    return types.FilterResult(in_=df_current, out_=df_removed)
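

# %%
# Minimal sketch of the FilterResult contract on toy data (hypothetical
# values): rows matching all filters end up in `in_`; every rejected row lands
# in `out_` exactly once, rejected at the first filter that drops it.
_demo = pl.DataFrame({"MELDENUMMER": [18, None, 26], "BEDP_MAN": [1, 1, 5]})
_demo_res = _apply_several_filters(
    _demo,
    (pl.col("MELDENUMMER").is_not_null(), pl.col("BEDP_MAN").is_in((1, 90))),
)
print(len(_demo_res.in_), len(_demo_res.out_))  # expected: 1 2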


class PipelineResult:
    __slots__ = ("_results", "_open", "_subtracted_indices")
    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")

    def __init__(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data
        schema = db.results_schema_map.copy()
        del schema["id"]
        self._results = pl.DataFrame(schema=schema)

        schema = {}
        for col in self._index_cols:
            schema[col] = db.raw_data_query_schema_map[col]
        self._subtracted_indices = pl.DataFrame(schema=schema)

    def __len__(self) -> int:
        return len(self._results) + len(self._open)

    @property
    def open(self) -> pl.DataFrame:
        return self._open

    @property
    def results(self) -> pl.DataFrame:
        return self._results

    @property
    def subtracted_indices(self) -> pl.DataFrame:
        return self._subtracted_indices

    def update_open(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._open = data

    def _subtract_data(
        self,
        data: pl.DataFrame,
    ) -> None:
        # anti-join: drop every open row whose index pair occurs in `data`
        self._open = self._open.join(data, on=self._index_cols, how="anti")
        self._subtracted_indices = pl.concat(
            (self._subtracted_indices, data[self._index_cols])
        )

    def _add_results(
        self,
        data: pl.DataFrame,
    ) -> None:
        self._results = pl.concat([self._results, data])

    def merge_pipeline(
        self,
        pipeline: PipelineResult,
    ) -> None:
        self._subtract_data(pipeline.subtracted_indices)
        self._add_results(pipeline.results)

    def write_results(
        self,
        data: pl.DataFrame,
        vorlage: bool,
        wf_id: int,
        freigabe_auto: types.Freigabe,
        is_out: bool,
    ) -> None:
        # TODO move to another position
        ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"

        results = data.rename(db.map_to_result)
        order_qty_expr: pl.Expr
        if is_out:
            order_qty_expr = (
                pl.lit(0)
                .alias("best_menge")
                .cast(db.results_schema_map["best_menge"])
            )
        else:
            order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")

        results = results.with_columns(
            [
                pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
                pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
                .alias("freigabe_auto")
                .cast(db.results_schema_map["freigabe_auto"]),
            ]
        )
        results = results.drop(
            [
                "BEDP_TITELNR",
                "BEDP_MAN",
                "BEDP_MENGE_BEDARF_VM",
                "MELDENUMMER",
                "VERLAGSNR",
                "MENGE_VORMERKER",
                "MANDFUEHR",
            ]
        )

        self._subtract_data(data)
        self._add_results(results)
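

# %%
# Quick sanity sketch (assumes `df` from the loading cell above): rows only
# move between `open` and `results`, so the combined length is invariant.
_pipe_demo = PipelineResult(df)
assert len(_pipe_demo) == len(df)
assert len(_pipe_demo.results) == 0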


# post-processing the results
# TODO: order quantity not always necessary
# TODO: change the relevant criterion for the order quantity


def workflow_900(
    pipe_result: PipelineResult,
) -> PipelineResult:
    """filter 'Meldenummer' and fill non-feasible entries"""

    filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer_null,
            filter_mandant,
        ),
    )

    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=900,
        freigabe_auto=types.Freigabe.WF_900,
        is_out=True,
    )

    # fill both VM columns in one pass; two separate `update_open` calls
    # would overwrite each other
    pipe_result.update_open(
        res.in_.with_columns(
            pl.col("MENGE_VORMERKER").fill_null(0),
            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
        )
    )

    return pipe_result


def workflow_910(
    pipe_result: PipelineResult,
) -> PipelineResult:
    # TODO check if necessary because of WF-900
    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))

    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26

    res = _apply_several_filters(
        pipe_result.open,
        filters=(
            filter_mandant,
            filter_ignore_MNR26,
        ),
    )
    # write results for entries which were filtered out
    pipe_result.write_results(
        data=res.out_,
        vorlage=False,
        wf_id=910,
        freigabe_auto=types.Freigabe.WF_910,
        is_out=True,
    )

    return pipe_result


# this is a main routine:
# receives and returns result objects
def workflow_100_umbreit(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 1
    filter_number_vm = pl.col(vm_criterion) > 0

    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    return pipe_result


def workflow_100_petersen(
    pipe_result: PipelineResult,
    vm_criterion: str,
) -> PipelineResult:
    # difference between WDB and others

    # // WDB branch
    # order quantity 0, no further action in other WFs
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) == 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=False,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    # TODO add check for orders or quantity to transform
    # TODO show them
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
    filter_number_vm = pl.col(vm_criterion) > 0
    res_candidates = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_WDB,
            filter_number_vm,
        ),
    )
    wdb_sub_pipe = PipelineResult(res_candidates.in_)
    wdb_sub_pipe = wf100_petersen_wdb_sub1(wdb_sub_pipe)
    assert len(wdb_sub_pipe.open) == 0
    pipe_result.merge_pipeline(wdb_sub_pipe)

    # // other branch
    # always show entries with #VM > 1
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_number_vm = pl.col(vm_criterion) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    # after the block above, this captures the remaining entries with
    # 0 < #VM <= 1
    filter_meldenummer = pl.col("MELDENUMMER") == 18
    filter_mandant = pl.col(MANDANT_CRITERION) == 90
    filter_number_vm = pl.col(vm_criterion) > 0
    res = _apply_several_filters(
        pipe_result.open,
        (
            filter_meldenummer,
            filter_mandant,
            filter_number_vm,
        ),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    return pipe_result


def wf100_petersen_wdb_sub1(
    pipe_result: PipelineResult,
) -> PipelineResult:
    # input: pre-filtered entries (WDB titles and #VM > 0)
    # more than 1 VM
    # !! show these entries
    filter_number_vm = pl.col(VM_CRITERION) > 1
    res = _apply_several_filters(
        pipe_result.open,
        (filter_number_vm,),
    )
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    # entries that were filtered out (WDB with #VM == 1) must be analysed for
    # orders in the past 6 months
    start_date = get_starting_date(180)
    filter_ = sql.and_(
        db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()),
        db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
    )
    stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
    df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
    entries_show = (
        df_order.group_by("BESP_TITELNR")
        .agg(pl.col("BESP_TITELNR").count().alias("count"))
        .filter(pl.col("count") > 1)
    )
    filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list())
    res = _apply_several_filters(pipe_result.open, (filter_titleno,))
    pipe_result.write_results(
        data=res.in_,
        vorlage=True,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )
    # do not show the remaining entries (no relevant recent orders)
    pipe_result.write_results(
        data=pipe_result.open,
        vorlage=False,
        wf_id=100,
        freigabe_auto=types.Freigabe.WF_100,
        is_out=False,
    )

    return pipe_result


# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}")

# %%
df.head()
# %%
# removed_rows = []

# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26


# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")

# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")
# %%
raw_data = df.clone()
# pipe_res = get_empty_pipeline_result(raw_data)
pipe_res = PipelineResult(raw_data)
pipe_res.results
pipe_res = workflow_900(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")

# %%
pipe_res.results
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
pipe_res = workflow_910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
# pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = workflow_100_petersen(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.open.filter(pl.col.MELDENUMMER == 18).filter(pl.col.BEDP_MENGE_BEDARF_VM > 0)

# %%
pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res.results.filter(pl.col("vorlage"))
# %%
raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3)
# %%
raw_data.head()

# %%
# NOTE: `filt_out` is not defined here (leftover from an earlier draft)
# filt_out


# %%
# ---------------------------------------------------------------------------- #
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #
# %%
wf_200_start_data = pipe_res.open.clone()
wf_200_start_data


# %%
def workflow_200_umbreit(
    pipe_result: PipelineResult,
) -> PipelineResult:
    # NOTE: unfinished workflow; the order-history check prototyped in the
    # cells below still has to be integrated here
    relevant_mnr: tuple[int, ...] = (17, 18)
    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
    filter_mandant = pl.col("BEDP_MAN") == 1
    # not relevant, because already done in WF-100
    # filter_number_vm = pl.col(vm_criterion) == 0

    res = _apply_several_filters(
        pipe_result.open,
        (filter_meldenummer, filter_mandant),
    )

    return pipe_result


# module-level re-run of the same filters for the exploration below
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
    pipe_res.open,
    (filter_meldenummer, filter_mandant),
)
# %%
# these entries must be checked for relevant orders
# for that, a temp table should be created in the database to execute efficient
# queries; other approaches are just hacks
# SOLUTION:
# - save these entries to a temp table 'temp'
# - look up the order history of the past 3 months
# -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR
# -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND
# -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND
# --
# (a hedged sketch of this temp-table approach follows in the next cell)
res.in_
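

# %%
# Hedged sketch of the temp-table approach outlined above; 'TEMP_TITLES' is a
# hypothetical helper table, not part of `db`. Kept commented out because it
# would create a table in the live database.
# temp_titles = sql.Table(
#     "TEMP_TITLES",
#     db.metadata,
#     sql.Column("TITELNR", sql.Integer, primary_key=True),
# )
# temp_titles.create(engine, checkfirst=True)
# with engine.begin() as conn:
#     conn.execute(
#         sql.insert(temp_titles),
#         [{"TITELNR": t} for t in res.in_["BEDP_TITELNR"].unique().to_list()],
#     )
# stmt_orders = (
#     sql.select(db.EXT_AUFPAUF)
#     .join(temp_titles, temp_titles.c.TITELNR == db.EXT_AUFPAUF.c.TITELNR)
#     .where(
#         db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= get_starting_date(90),
#         db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
#     )
# )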
# %%
# // demo query with IN statement
data = res.in_.clone()
title_sub_choice = data["BEDP_TITELNR"][:300].to_list()
rel_date = get_starting_date(90)
rel_date

# %%
filter_ = sql.and_(
    db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice),
    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
)
stmt = sql.select(db.EXT_AUFPAUF).where(filter_)

print(stmt.compile(engine))

# %%
demo = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.EXT_AUFPAUF_schema_map,
)
# %%
demo.head()
# %%
demo_2 = demo.clone()
demo_2.head()
print(f"Number of titles before filtering: {len(demo_2)}")
demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99)))
demo_2 = (
    demo_2.group_by("TITELNR", maintain_order=True)
    .agg(
        pl.len().alias("count"),
        pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"),
    )
    .filter(pl.col.customer_count >= 3)
)

# these remaining titles are relevant and should be shown;
# the others should be disposed of
print(f"Number of titles which are relevant: {len(demo_2)}")
print(f"Number of titles to be disposed of: {len(demo) - len(demo_2)}")
demo_2
# %%
# NOTE: `_init_workflow_200_umbreit`, `results` and `filt_out` stem from an
# earlier draft and are not defined in this notebook version
# df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
# df

# %%
df.filter(pl.col("BEDARFNR") == 884607)

# %%
df_order.filter(pl.col("BEDARFNR") == 884607)

# %%
# now obtain order data for the entries
t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
t
# %%
agg_t = (
    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
    .agg(
        pl.count("AUFP_POSITION").alias("pos_count"),
        pl.col("KUNDE_RECHNUNG").alias("count_customer").n_unique(),
    )
    .filter(pl.col("count_customer") >= 0)  # !! should be 3
)
agg_t

# %%
df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))
# %%
# ---------------------------------------------------------------------------- #
# Writing results in DB
# ---------------------------------------------------------------------------- #

delete_results(res_engine)
# `pipe_post` is assumed to be the post-processed results frame; the results
# table currently lives in the temporary sqlite engine, not in Oracle
pipe_post.write_database(db.results.fullname, res_engine, if_table_exists="append")

stmt = sql.select(db.results)
db_results = pl.read_database(stmt, res_engine)
db_results

# ---------------------------------------------------------------------------- #
# Further Data Analysis
# ---------------------------------------------------------------------------- #
# %%
stmt = sql.select(db.ext_bedpbed)
df = pl.read_database(
    stmt,
    engine,
    schema_overrides=db.ext_bedpbed_schema_map,
)
# %%
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
).filter(pl.col("unique_BEDP_MAN") > 1)
# %%
df["BEDP_MAN"].unique()
# %%
df.estimated_size(unit="mb")
# %%
# NOTE: `df_raw` is assumed to be a raw-data frame loaded in an earlier session
target_bednr = df_raw["BEDARFNR"].to_list()
target_seq = df_raw["BEDP_SEQUENZ"].to_list()
# %%
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)
# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)

# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO)
title_buy = pl.read_database(stmt, engine)
# %%
title_buy

# %% when were the orders placed
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == TITLE_NO)
title_order = pl.read_database(stmt, engine)
# %%
title_order

# -------------------------------------------------------------------------------------------

# %%
# title DB complete?
# - includes only titles which have been deliverable since 01.06.2025 and which
#   are assigned to buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)
# %%
titles["MANDFUEHR"].unique()
# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)

# %%
# requirements?
# - includes only orders since 05.11.2025
stmt = sql.select(db.ext_bedpbed)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# %%
reqs

# %%
reqs["BEDP_MAN"].unique()

# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)
# %%
intersection = unique_titles & unique_titles_req
len(intersection)
# %%
# orders?
# - includes only orders since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)

# %%
orders.estimated_size(unit="mb")

# %%
# NOTE: re-executes the full EXT_AUFPAUF select and prints all rows
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())

# %%
# NOTE: `dataframes` is assumed to come from an earlier bulk-loading cell
df = dataframes[1]
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype

print("dtypes of DF...")
pprint(col_dtype)
# %%
len(df)
# %%
# the amount column is string-typed here, hence the comparison against "" and "0"
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)

# %%
df
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype

print("dtypes of DF...")
pprint(col_dtype)
# %%
# ** Petersen WDB
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(VM_CRITERION) > 0

res = _apply_several_filters(
    df,
    (
        filter_meldenummer,
        filter_mandant,
        filter_WDB,
        filter_number_vm,
    ),
)
# %%
res.in_
# %%
# !! show these entries
filter_number_vm = pl.col(VM_CRITERION) > 1

res_vm_crit = _apply_several_filters(
    res.in_,
    (filter_number_vm,),
)
# %%
res_vm_crit.out_
# %%
# entries filtered out (WDB with #VM == 1) must be analysed for orders in the past 6 months
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
len(title_nos)
# %%
title_nos

# %%
# define starting date for 6 month interval
# returns UTC time
start_date = get_starting_date(180)
filter_ = sql.and_(
    db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
    db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
)
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
df_order
# %%
# filter entries which have more than one order
df_show = (
    df_order.group_by("BESP_TITELNR")
    .agg(pl.col("BESP_TITELNR").count().alias("count"))
    .filter(pl.col("count") > 1)
)
df_show
# %%
# !! show these entries
# !! do not show others
entries_to_show = df_show["BESP_TITELNR"].to_list()
print(f"Number of entries relevant: {len(entries_to_show)}")
# %%
res_vm_crit.out_
# %%
filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode())

res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))

# %%
res_wdb.in_
# %%
res_wdb.out_


# %%
schema = {}
for col in ("BEDARFNR", "BEDP_SEQUENZ"):
    schema[col] = db.raw_data_query_schema_map[col]
base = pl.DataFrame(schema=schema)
# %%
data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
orig_data = pl.DataFrame(data, schema=schema)
data = orig_data[:5].clone()
# %%
pl.concat([base, data])
# %%
# anti-join keeps only the rows of `orig_data` whose index pair is absent from `data`
orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")
# %%
orig_data[("BEDARFNR", "BEDP_SEQUENZ")]
# %%
raw_data = df.clone()
pipe_res = PipelineResult(raw_data)
pipe_res.open
# %%
pipe_res.results
# %%
sub_data = pipe_res.open[:20].clone()
sub_data
# %%
pipe_res.write_results(
    sub_data,
    vorlage=True,
    wf_id=30,
    freigabe_auto=types.Freigabe.WF_100,
    is_out=True,
)
# %%
pipe_res.open
# %%
pipe_res.results
# %%
raw_data = df.clone()
pipe_res_main = PipelineResult(raw_data)
pipe_res_main.open
# %%
pipe_res_main.merge_pipeline(pipe_res)
# %%
pipe_res_main.open
# %%
pipe_res.results