further analysis and lib structure

Florian Förster 2025-12-03 14:09:10 +01:00
parent 83ec387009
commit b6a87909ff
4 changed files with 253 additions and 71 deletions

View File

@@ -1,6 +1,7 @@
# %%
import json
import time
+import typing
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
@@ -14,8 +15,9 @@ from umbreit import db, types
# %%
# import importlib
-# db = importlib.reload(db)
# types = importlib.reload(types)
+# db = importlib.reload(db)
# %%
p_cfg = io.search_file_iterative(
@@ -80,6 +82,8 @@ start_date = (current_dt - td).date()
print("Starting date: ", start_date)
# %%
+# // ---------- LIVE DATA -----------
# TODO find way to filter more efficiently
# WF-200: filter for relevant orders with current BEDP set
# missing: order types which are relevant
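On the TODO above: one way to filter more efficiently would be to push the relevance conditions into the query itself instead of filtering in polars afterwards. A minimal sketch, assuming the `db`, `sql`, and `engine` objects used in this script; the order-type column `BEDP_TYP` and its codes are placeholders, since the relevant order types are still unknown:

# sketch only: server-side prefilter; BEDP_TYP and the codes are hypothetical
stmt = (
    sql.select(db.ext_bedpbed)
    .where(db.ext_bedpbed.c.BEDP_MAN.in_((1, 90)))
    # .where(db.ext_bedpbed.c.BEDP_TYP.in_((1, 2)))  # placeholder order types
)
df = pl.read_database(stmt, connection=engine)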
@@ -168,11 +172,14 @@ print(stmt.compile(engine))
# raw data query
# TODO look for entries which do not have an associated title number
-print("--------------- ext_bedpbed --------------")
+print("--------------- raw data query --------------")
t1 = time.perf_counter()
+# join_condition = sql.and_(
+#     db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
+#     db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
+# )
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
-    db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
@@ -199,36 +206,24 @@ elapsed = t2 - t1
print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
+# // NO LIVE DATA NEEDED
# SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20251202-2.arrow"
+p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
-len(df)
+print(len(df))
df.head()
# 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?)
# %%
-df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER"))
-# %%
-# !! CHECK: null values set in the query with CASE statement
-print(len(df.filter(pl.col("MELDENUMMER") == 18)))
-# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
-df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
-# %%
-# !! CHECK: titles with request where no title information is found
-# not_in_title_table = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(
-#     pl.col("MELDENUMMER").is_null()
-# )
-# EXPORT_FEAT = "BEDP_TITELNR"
-# to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
-# p_save_not_in_title_table = Path.cwd() / "not_in_title_table.json"
-# with open(p_save_not_in_title_table, "w") as file:
-#     json.dump(to_save, file, indent=4)
-# %%
-# !! CHECK: different MANDANTEN
+# ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN
+# MANDANTEN other than (1, 90) do not possess relevant properties such as
+# "MELDENUMMER" and others --> conclusion: not relevant
# MANDANT = 80
# print(f"Mandant: {MANDANT}")
@@ -243,6 +238,58 @@ df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# ).null_count()
# )
# print("Unique value counts: ", df.select(pl.col("BEDP_MAN").value_counts()))
+# %%
+# ** PREFILTER
+# always needed; entries filtered out are to be discarded
+df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
+# %%
+len(df)
+# %%
+# ** CHECK: null values set in the query with CASE statement
+# not known whether NULL comes from the CASE statement or was already NULL in the table
+# unknown consequences: are they relevant? how do they relate to "MENGE_VORMERKER"
+# from the title DB?
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM").is_null())
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
+# %%
+df.select("MELDENUMMER").unique()
+# %%
+# ** CHECK: null values for "MENGE_VORMERKER"
+df.filter(pl.col("MENGE_VORMERKER").is_null())
+# df.filter(pl.col("BEDP_MENGE_BEDARF_VM") == 0)
+agg_t = (
+    df.group_by(["MELDENUMMER"]).agg(
+        # pl.count("MENGE_VORMERKER").alias("pos_count").n_unique(),
+        pl.col("MENGE_VORMERKER").alias("VM_count").unique(),
+    )
+    # .filter(pl.col("count_customer") >= 0)  # !! should be 3
+)  # .filter(pl.col("MELDENUMMER") == 18)
+agg_t
+df.filter(pl.col("MELDENUMMER") == 18).select(pl.col("MENGE_VORMERKER").is_null().sum())
+# %%
+# ** CHECK: relationship between "BEDP_MENGE_BEDARF_VM" and "MENGE_VORMERKER"
+# ** not known at this point
+# there are entries where BEDP_MENGE_BEDARF_VM > MENGE_VORMERKER -->
+# BEDP_MENGE_BEDARF_VM is not suitable as a reference or ground truth
+df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
+# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
+# %%
+# ** CHECK: titles with request where no title information is found
+# result: there were entries found on 02.12., but not on 03.12.2025
+not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
+EXPORT_FEAT = "BEDP_TITELNR"
+to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
+p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-1.json"
+to_save
+# with open(p_save_not_in_title_table, "w") as file:
+#     json.dump(to_save, file, indent=4)
+# %%
+print(len(df.filter(pl.col("MELDENUMMER") == 18)))
+# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
@@ -273,16 +320,40 @@ def get_raw_data() -> pl.DataFrame:
    )


-def get_empty_result_df() -> pl.DataFrame:
+def get_empty_pipeline_result(
+    data: pl.DataFrame,
+) -> types.PipelineResult:
    schema = db.results_schema_map.copy()
    del schema["id"]
-    return pl.DataFrame(schema=schema)
+    results = pl.DataFrame(schema=schema)
+    return types.PipelineResult(results=results, open=data)


-def apply_several_filters(
+def prepare_base_data(
+    df: pl.DataFrame,
+) -> pl.DataFrame:
+    """Pre-routine to handle non-feasible entries.
+
+    Parameters
+    ----------
+    df : pl.DataFrame
+        raw data collected from the database query
+
+    Returns
+    -------
+    pl.DataFrame
+        pre-processed data
+    """
+    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
+    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
+    return df
+
+
+def _apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
-) -> tuple[pl.DataFrame, pl.DataFrame]:
+) -> types.FilterResult:
    df_current = df
    removed_rows: list[pl.DataFrame] = []
@@ -294,53 +365,39 @@ def apply_several_filters(
    df_removed = pl.concat(removed_rows)

-    return df_current, df_removed
+    return types.FilterResult(in_=df_current, out_=df_removed)


-def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
-    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
-    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
-    return df
-
-
-def workflow_100_umbreit(
-    results: pl.DataFrame,
-    data: pl.DataFrame,
-    vm_criterion: str,
-) -> tuple[pl.DataFrame, pl.DataFrame]:
-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("BEDP_MAN") == 1
-    filter_number_vm = pl.col(vm_criterion) > 0
-
-    relevant, filt = apply_several_filters(
-        data, (filter_meldenummer, filter_mandant, filter_number_vm)
-    )
-    results = _results_workflow_100(
-        results,
-        relevant,
-        vorlage=True,
-        wf_id=100,
-        freigabe_auto=types.Freigabe.WF_100,
-    )
-    return results, filt
-
-
# post-processing the results
-def _results_workflow_100(
-    results: pl.DataFrame,
+# TODO: order quantity not always necessary
+# TODO: change relevant criterion for order quantity
+def _write_results(
+    results_table: pl.DataFrame,
    data: pl.DataFrame,
    vorlage: bool,
    wf_id: int,
    freigabe_auto: types.Freigabe,
+    is_out: bool,
) -> pl.DataFrame:
+    ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
    data = data.rename(db.map_to_result)
+    order_qty_expr: pl.Expr
+    if is_out:
+        order_qty_expr = (
+            pl.lit(0)
+            .alias("best_menge")
+            .cast(db.results_schema_map["best_menge"])
+        )
+    else:
+        order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
    data = data.with_columns(
        [
            pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
            pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
-            pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
+            order_qty_expr,
            pl.lit(freigabe_auto.value)
            .alias("freigabe_auto")
            .cast(db.results_schema_map["freigabe_auto"]),
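The new is_out branch of _write_results reduces to choosing one expression for best_menge. A self-contained toy sketch of just that choice (the cast to the schema type is left out here):

import polars as pl

def order_qty_expr(is_out: bool) -> pl.Expr:
    # filtered-out rows get a fixed order quantity of 0; kept rows keep the raw value
    if is_out:
        return pl.lit(0).cast(pl.Int64).alias("best_menge")
    return pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge")

toy = pl.DataFrame({"BEDP_MENGE_BEDARF_VM": [3, 7]})
print(toy.with_columns(order_qty_expr(is_out=True)))   # best_menge: 0, 0
print(toy.with_columns(order_qty_expr(is_out=False)))  # best_menge: 3, 7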
@@ -356,18 +413,130 @@ def _results_workflow_100(
        ]
    )

-    return pl.concat([results, data])
+    return pl.concat([results_table, data])


-# main routine
+# results for filtered-out entries are written here
+def workflow_910(
+    pipe_result: types.PipelineResult,
+) -> types.PipelineResult:
+    filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
+    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        filters=(
+            filter_mandant,
+            filter_ignore_MNR26,
+        ),
+    )
+    # write results for entries which were filtered out
+    pipe_result.results = _write_results(
+        pipe_result.results,
+        data=res.out_,
+        vorlage=False,
+        wf_id=910,
+        freigabe_auto=types.Freigabe.WF_910,
+        is_out=True,
+    )
+    pipe_result.open = res.in_
+    return pipe_result
+
+
+# this is a main routine:
+# it receives and returns result objects
+def workflow_100_umbreit(
+    pipe_result: types.PipelineResult,
+    vm_criterion: str,
+) -> types.PipelineResult:
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col("BEDP_MAN") == 1
+    filter_number_vm = pl.col(vm_criterion) > 0
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.open = res.out_
+    return pipe_result


# Petersen not present in data
# %%
-df_raw = get_raw_data()
-df_start = prepare_base_data(df_raw)
-df_start
+# SAVING/LOADING
+p_save = Path.cwd() / "raw_data_from_sql_query_20251203-1.arrow"
+# df.write_ipc(p_save)
+df = pl.read_ipc(p_save)
+print(f"Number of entries: {len(df)}")
# %%
-results_init = get_empty_result_df()
+df.head()
+# %%
+removed_rows = []
+raw_data = df.clone()
+print(f"Length raw data: {len(raw_data)}")
+
+filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
+filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
+
+filtered = raw_data.filter(filter_mandant)
+filtered_n = raw_data.filter(~filter_mandant)
+num_filter = len(filtered)
+num_filter_n = len(filtered_n)
+removed_rows.append(filtered_n)
+print(f"Length filtered: {num_filter}")
+print(f"Length filtered out: {num_filter_n}")
+print(f"Length all: {num_filter + num_filter_n}")
+raw_data = filtered
+out = pl.concat(removed_rows)
+print(f"Length out: {len(out)}")
+# %%
+print("---------------------------------------")
+filtered = raw_data.filter(filter_ignore_MNR26)
+filtered_n = raw_data.filter(~filter_ignore_MNR26)
+num_filter = len(filtered)
+num_filter_n = len(filtered_n)
+len(filtered_n)
+# %%
+removed_rows.append(filtered_n)
+print(f"Length filtered: {num_filter}")
+print(f"Length filtered out: {num_filter_n}")
+print(f"Length all: {num_filter + num_filter_n}")
+out = pl.concat(removed_rows)
+print(f"Length out: {len(out)}")
+# %%
+raw_data = df.clone()
+pipe_res = get_empty_pipeline_result(raw_data)
+pipe_res.results
+# %%
+pipe_res = workflow_910(pipe_res)
+pipe_res
+# df_start = prepare_base_data(df_raw)
+# df_start
+# %%
-results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
-# df is where results are known
-# filt_out contains entries for other workflows
+pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
+# pipe_res.results is where results are known
+# pipe_res.open contains entries for other workflows
@@ -398,7 +567,7 @@ def _init_workflow_200_umbreit(
    filter_mandant = pl.col("BEDP_MAN") == 1
    filter_number_vm = pl.col(vm_criterion) == 0

-    relevant, filt = apply_several_filters(
+    relevant, filt = _apply_several_filters(
        data, (filter_meldenummer, filter_mandant, filter_number_vm)
    )
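Taken together, the restructured pipeline is meant to be chained as in the cells above. A minimal sketch, assuming `p_save`, `VM_CRITERION`, and the functions defined in this script:

df = pl.read_ipc(p_save)                         # raw query snapshot
pipe = get_empty_pipeline_result(df.clone())     # empty results table, all rows open
pipe = workflow_910(pipe)                        # books filtered-out rows as wf_id=910
pipe = workflow_100_umbreit(pipe, VM_CRITERION)  # books WF-100 rows, rest stays open
print(len(pipe.results), len(pipe.open))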

View File

@@ -35,5 +35,5 @@ SELECT count(*) FROM EXT_BEDPBED;
-- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
-- PROMPT ######################################
--- SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 6132326;
+SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 6132326;
-SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 4591588;
+-- SELECT * FROM EXT_TITEL_INFO t_info WHERE t_info.TI_NUMMER = 4591588;
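The same spot check can also be run from Python with a parameterized query. A minimal sketch; the connection string is a placeholder, not the project's actual configuration:

from sqlalchemy import create_engine, text

engine = create_engine("oracle+oracledb://user:pass@host/?service_name=svc")  # placeholder DSN
with engine.connect() as conn:
    row = conn.execute(
        text("SELECT * FROM EXT_TITEL_INFO WHERE TI_NUMMER = :nr"),
        {"nr": 6132326},
    ).first()
print(row)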

View File

@@ -1,7 +1,7 @@
from __future__ import annotations

import enum
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from typing import TypeAlias

import polars as pl
@@ -10,6 +10,19 @@ PolarsSchema: TypeAlias = dict[str, type[pl.DataType]]
PolarsNullValues: TypeAlias = dict[str, str]


+@dataclass(slots=True, kw_only=True, eq=False)
+class FilterResult:
+    in_: pl.DataFrame
+    out_: pl.DataFrame
+
+
+@dataclass(slots=True, kw_only=True, eq=False)
+class PipelineResult:
+    results: pl.DataFrame
+    open: pl.DataFrame
+
+
class Freigabe(enum.Enum):
    WF_100 = False
    WF_200 = False
+    WF_910 = False
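A minimal usage sketch of the two new containers; kw_only=True means both must be constructed with keyword arguments:

import polars as pl

from umbreit import types  # as imported in the analysis script

filt = types.FilterResult(
    in_=pl.DataFrame({"a": [1]}),   # rows that passed every filter
    out_=pl.DataFrame({"a": [2]}),  # rows removed by some filter
)
pipe = types.PipelineResult(results=pl.DataFrame(), open=filt.in_)
print(len(pipe.open))  # 1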