From 4eeb92f939b28e75187ccb8fee6115678dc701e1 Mon Sep 17 00:00:00 2001
From: foefl
Date: Fri, 14 Nov 2025 14:06:03 +0100
Subject: [PATCH] adapt architecture, add new prototype for WF-200

---
 data_analysis/02-1_initial_workflow_test.py | 303 ++++++++++++++------
 pdm.lock                                    |  18 +-
 pyproject.toml                              |   2 +-
 src/umbreit/db.py                           |  23 +-
 src/umbreit/types.py                        |   6 +
 5 files changed, 264 insertions(+), 88 deletions(-)

diff --git a/data_analysis/02-1_initial_workflow_test.py b/data_analysis/02-1_initial_workflow_test.py
index 13299cb..3347648 100644
--- a/data_analysis/02-1_initial_workflow_test.py
+++ b/data_analysis/02-1_initial_workflow_test.py
@@ -1,16 +1,21 @@
 # %%
-import importlib
 from collections.abc import Sequence
 from pathlib import Path
 from pprint import pprint
 
+import dopt_basics.datetime as dt
 import polars as pl
 import sqlalchemy as sql
 
-from umbreit import db
+from umbreit import db, types
 
 # %%
+# import importlib
 # db = importlib.reload(db)
+# types = importlib.reload(types)
+
+# %%
+types.Freigabe.WF_100.value
 
 # %%
 db_path = (Path.cwd() / "../data/data.db").resolve()
@@ -20,51 +25,106 @@
 assert data_path.exists() and data_path.is_dir()
 
 engine = sql.create_engine(f"sqlite:///{str(db_path)}", echo=True)
 
+
 # %%
+# delete existing results
+def delete_results() -> None:
+    with engine.begin() as conn:
+        res = conn.execute(sql.delete(db.results))
+
+    print("Rows deleted: ", res.rowcount)
+
+
+# %%
+delete_results()
+stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
+with engine.connect() as conn:
+    res = conn.execute(stmt)
+    print(res.all())
+
+# %%
+current_dt = dt.current_time_tz(cut_microseconds=True)
+current_dt
+td = dt.timedelta_from_val(90, dt.TimeUnitsTimedelta.DAYS)
+td
+
+# %%
+start_dt = current_dt - td
+start_date = dt.dt_to_timezone(start_dt, target_tz=dt.TIMEZONE_CEST).date()
+start_date
+
+# %%
+# WF-200: filter for relevant orders with current BEDP set
+# missing: order types which are relevant
+filter_K_rech = (608991, 260202)
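+# orders from the last 90 days only; the two KUNDE_RECHNUNG accounts above
+# are excluded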
 join_condition = sql.and_(
-    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
-    db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
+    db.ext_bedpbed.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR,
+    db.ext_bedpbed.c.BEDP_MAN == db.EXT_AUFPAUF.c.MANDANT,
+)
+where_condition = sql.and_(
+    db.EXT_AUFPAUF.c.AUFTRAGS_DATUM > start_date,
+    db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in(filter_K_rech),
 )
-stmt = sql.select(
-    db.ext_bedpbed.c.BEDARFNR,
-    db.ext_bedpbed.c.BEDP_SEQUENZ,
-    db.ext_bedpbed.c.BEDP_TITELNR,
-    db.ext_bedpbed.c.BEDP_MAN,
-    db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
-    db.ext_titel_info.c.MELDENUMMER,
-    db.ext_titel_info.c.MENGE_VORMERKER,
-).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition))
+stmt = (
+    sql.select(
+        db.ext_bedpbed.c.BEDARFNR,
+        db.ext_bedpbed.c.BEDP_SEQUENZ,
+        db.ext_bedpbed.c.BEDP_TITELNR,
+        db.ext_bedpbed.c.BEDP_MAN,
+        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
+        db.EXT_AUFPAUF,
+    )
+    .select_from(db.ext_bedpbed.join(db.EXT_AUFPAUF, join_condition))
+    .where(where_condition)
+)
 
 # %%
 print(stmt.compile(engine))
 
 # %%
-df_raw = pl.read_database(stmt, engine)
-# %%
-df_raw
+df_order = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
+df_order
 
 # %%
-filter_meldenummer = pl.col("MELDENUMMER") == 18
+# AUFPAUF
+stmt = sql.select(db.EXT_AUFPAUF)
+df_aufpauf = pl.read_database(stmt, engine, schema_overrides=db.raw_data_query_schema_map)
+df_aufpauf
 
 # %%
-# df_new = df.filter(pl.col("MENGE_VORMERKER").is_not_null() & pl.col("MENGE_VORMERKER") > 0)
-# filter mandant: Umbreit
-filter_mandant_umbreit = pl.col("BEDP_MAN") == 1
-df_mandant = df_raw.filter(filter_mandant_umbreit)
-df_mandant
-
+df_aufpauf.filter(pl.col("TITELNR") == 6315273)
 # %%
-# filter #VM
 # VM_CRITERION = "MENGE_VORMERKER"
 VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
-df_mandant = df_mandant.with_columns(pl.col(VM_CRITERION).fill_null(0))
-filter_vm = pl.col(VM_CRITERION) > 0  # pl.col("MENGE_VORMERKER").is_not_null() &
-df_new = df_mandant.filter(filter_vm)
-# df_new = df_mandant.filter(pl.col("MENGE_VORMERKER").is_not_null()).filter(pl.col("MENGE_VORMERKER") > 0)
-df_new
-# %%
+
+
+def get_raw_data() -> pl.DataFrame:
+    join_condition = sql.and_(
+        db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
+        db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
+    )
+    stmt = sql.select(
+        db.ext_bedpbed.c.BEDARFNR,
+        db.ext_bedpbed.c.BEDP_SEQUENZ,
+        db.ext_bedpbed.c.BEDP_TITELNR,
+        db.ext_bedpbed.c.BEDP_MAN,
+        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
+        db.ext_titel_info.c.MELDENUMMER,
+        db.ext_titel_info.c.MENGE_VORMERKER,
+    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition))
+
+    return pl.read_database(
+        stmt,
+        engine,
+        schema_overrides=db.raw_data_query_schema_map,
+    )
+
+
+def get_empty_result_df() -> pl.DataFrame:
+    schema = db.results_schema_map.copy()
+    del schema["id"]
+    return pl.DataFrame(schema=schema)
 
 
 def apply_several_filters(
@@ -87,62 +147,54 @@
 
 def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
     df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
+    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
 
     return df
 
 
-# def workflow_100_start(
-#     df: pl.DataFrame,
-# ) -> tuple[pl.DataFrame, pl.DataFrame]:
-#     return apply_several_filters(df, (filter,))
-
-
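+# workflow convention: take the accumulated results frame and the still-open
+# rows, append the rows decided here, and return the remainder for the next
+# workflow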
 def workflow_100_umbreit(
-    df: pl.DataFrame,
+    results: pl.DataFrame,
+    data: pl.DataFrame,
     vm_criterion: str,
 ) -> tuple[pl.DataFrame, pl.DataFrame]:
     filter_meldenummer = pl.col("MELDENUMMER") == 18
     filter_mandant = pl.col("BEDP_MAN") == 1
     filter_number_vm = pl.col(vm_criterion) > 0
 
-    return apply_several_filters(df, (filter_meldenummer, filter_mandant, filter_number_vm))
+    relevant, filt = apply_several_filters(
+        data, (filter_meldenummer, filter_mandant, filter_number_vm)
+    )
+    results = _results_workflow_100(
+        results,
+        relevant,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+    )
+
+    return results, filt
 
 
-# %%
-out_remainder: list[pl.DataFrame] = []
-df_start = prepare_base_data(df_raw)
-df_start
-
-# %%
-df, filt_out = workflow_100_umbreit(df_start, VM_CRITERION)
-# filt_out at this point represents all entries which are to be analysed in other workflows
-out_remainder.append(filt_out)
-pipe_removed = pl.concat(out_remainder)
-# %%
-df
-# %%
-pipe_removed
-
-# idea: use pipe_removed for other workflows
-# in the end there should not be any open positions left (assuming all cases are implemented)
-
-# %%
 # post-processing the results
-
-
-def results_workflow_100(
-    df: pl.DataFrame,
+def _results_workflow_100(
+    results: pl.DataFrame,
+    data: pl.DataFrame,
+    vorlage: bool,
+    wf_id: int,
+    freigabe_auto: types.Freigabe,
 ) -> pl.DataFrame:
-    df = df.rename(db.map_to_result)
-    df = df.with_columns(
+    data = data.rename(db.map_to_result)
+    data = data.with_columns(
         [
+            pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
+            pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
             pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
-            pl.lit(True).alias("vorlage"),
-            pl.lit(100).alias("wf_id"),
-            pl.lit(False).alias("freigabe_auto"),
+            pl.lit(freigabe_auto.value)
+            .alias("freigabe_auto")
+            .cast(db.results_schema_map["freigabe_auto"]),
         ]
     )
-    df = df.drop(
+    data = data.drop(
         [
             "BEDP_TITELNR",
             "BEDP_MAN",
@@ -152,34 +204,117 @@
         ]
     )
 
-    return df
+    return pl.concat([results, data])
+
+# Petersen not present in data
+
 # %%
-pipe_post = results_workflow_100(df)
-pipe_post
+df_raw = get_raw_data()
+df_start = prepare_base_data(df_raw)
+df_start
 
 # %%
-pipe_post.write_database(db.results.fullname, engine, if_table_exists="replace")
-
+results_init = get_empty_result_df()
+results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
+# results contains the entries decided by WF-100
 # filt_out at this point represents all entries which are to be analysed in other workflows
 # %%
-stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
-with engine.connect() as conn:
-    res = conn.execute(stmt)
-    print(res.all())
-
+results
 # %%
 filt_out
-# %%
-df_umbreit_18 = workflow_100_umbreit(df, VM_CRITERION)
-df_umbreit_18
-
-# ----------------------------------------------------------------------------
 # %%
-target_bednr = df_new["BEDARFNR"].to_list()
-target_seq = df_new["BEDP_SEQUENZ"].to_list()
+
+# %%
+# ---------------------------------------------------------------------------- #
+# Workflow 200 (Umbreit only)
+# ---------------------------------------------------------------------------- #
+# %%
+wf_200_start_data = filt_out.clone()
+wf_200_start_data
+
+
+# %%
+def _init_workflow_200_umbreit(
+    results: pl.DataFrame,
+    data: pl.DataFrame,
+    vm_criterion: str,
+) -> tuple[pl.DataFrame, pl.DataFrame]:
+    relevant_mnr: tuple[int, ...] = (17, 18)
+    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
+    filter_mandant = pl.col("BEDP_MAN") == 1
+    filter_number_vm = pl.col(vm_criterion) == 0
+
+    relevant, filt = apply_several_filters(
+        data, (filter_meldenummer, filter_mandant, filter_number_vm)
+    )
+
+    return relevant, filt
+
+
+# %%
+df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
+df
+
+# %%
+df.filter(pl.col("BEDARFNR") == 884607)
+
+# %%
+df_order.filter(pl.col("BEDARFNR") == 884607)
+
+# %%
+# now obtain order data for entries
+t = df.join(df_order, on=["BEDARFNR", "BEDP_SEQUENZ"], how="inner")
+t = t.with_columns(pl.col("AUFP_POSITION").fill_null(0))
+t
+# %%
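+# per (BEDARFNR, BEDP_SEQUENZ): number of order positions and number of
+# distinct KUNDE_RECHNUNG values; the >= 0 filter is a placeholder that
+# currently keeps every group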
+agg_t = (
+    t.group_by(["BEDARFNR", "BEDP_SEQUENZ"])
+    .agg(
+        pl.col("AUFP_POSITION").count().alias("pos_count"),
+        pl.col("KUNDE_RECHNUNG").n_unique().alias("count_customer"),
+    )
+    .filter(pl.col("count_customer") >= 0)  # !! should be 3
+)
+agg_t
+
+# %%
+df_order.filter((pl.col("BEDARFNR") == 883608) & (pl.col("BEDP_SEQUENZ") == 65))
+
+# %%
+# ---------------------------------------------------------------------------- #
+# Writing results in DB
+# ---------------------------------------------------------------------------- #
+
+delete_results()
+results.write_database(db.results.fullname, engine, if_table_exists="append")
+
+stmt = sql.select(db.results)
+db_results = pl.read_database(stmt, engine)
+db_results
+
+# ---------------------------------------------------------------------------- #
+# Further Data Analysis
+# ---------------------------------------------------------------------------- #
+# %%
+stmt = sql.select(db.ext_bedpbed)
+df = pl.read_database(
+    stmt,
+    engine,
+    schema_overrides=db.ext_bedpbed_schema_map,
+)
+# %%
+df.group_by("BEDP_TITELNR").agg(
+    pl.col("BEDP_MAN").n_unique().alias("unique_BEDP_MAN")
+).filter(pl.col("unique_BEDP_MAN") > 1)
+# %%
+df["BEDP_MAN"].unique()
+# %%
+df.estimated_size(unit="mb")
+# %%
+target_bednr = df_raw["BEDARFNR"].to_list()
+target_seq = df_raw["BEDP_SEQUENZ"].to_list()
 
 # %%
 stmt = (
     sql.select(
diff --git a/pdm.lock b/pdm.lock
index 1a1c549..e3795b1 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "data", "dev", "lint", "nb", "tests"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:1ae1f4583c19e6eacb7e148e056e96b8e8efd64b3372362da0c954cbe6cbb4ee"
+content_hash = "sha256:840ff2052fc1669708f329a0e3733da307684a31ddea2105c6aec1949c9293bf"
 
 [[metadata.targets]]
 requires_python = ">=3.11"
@@ -723,6 +723,20 @@ files = [
     {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"},
 ]
 
+[[package]]
+name = "dopt-basics"
+version = "0.2.4"
+requires_python = ">=3.11"
+summary = "basic cross-project tools for Python-based d-opt projects"
+groups = ["default"]
+dependencies = [
+    "tzdata>=2025.1",
+]
+files = [
+    {file = "dopt_basics-0.2.4-py3-none-any.whl", hash = "sha256:b7d05b80dde1f856b352580aeac500fc7505e4513ed162791d8735cdc182ebc1"},
+    {file = "dopt_basics-0.2.4.tar.gz", hash = "sha256:c21fbe183bec5eab4cfd1404e10baca670035801596960822d0019e6e885983f"},
+]
+
 [[package]]
 name = "execnet"
 version = "2.1.1"
@@ -2817,7 +2831,7 @@ name = "tzdata"
 version = "2025.2"
 requires_python = ">=2"
 summary = "Provider of IANA time zone data"
-groups = ["data", "nb"]
+groups = ["default", "data", "nb"]
 files = [
     {file = "tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8"},
     {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"},
diff --git a/pyproject.toml b/pyproject.toml
index 95465f7..eae309b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ description = "Umbreit's Python-based application"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
 ]
-dependencies = ["sqlalchemy>=2.0.44"]
+dependencies = ["sqlalchemy>=2.0.44", "dopt-basics>=0.2.4"]
 requires-python = ">=3.11"
 readme = "README.md"
 license = {text = "LicenseRef-Proprietary"}
diff --git a/src/umbreit/db.py b/src/umbreit/db.py
index 684d2d0..5ad646a 100644
--- a/src/umbreit/db.py
+++ b/src/umbreit/db.py
@@ -168,11 +168,32 @@ results = Table(
     Column("bedarf_sequenz", sql.Integer, nullable=False),
     Column("vorlage", sql.Boolean, nullable=False),
     Column("wf_id", sql.Integer, nullable=False),
-    Column("best_menge", sql.Integer, nullable=False),
+    Column("best_menge", sql.Integer, nullable=True),
     Column("freigabe_auto", sql.Boolean, nullable=False),
 )
 
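+# polars dtypes for the results table; "id" is excluded when building empty
+# frames for insertion (see get_empty_result_df) since it is not set by the
+# workflows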
Column("best_menge", sql.Integer, nullable=True), Column("freigabe_auto", sql.Boolean, nullable=False), ) +results_schema_map: PolarsSchema = { + "id": pl.UInt32, + "bedarf_nr": pl.UInt32, + "bedarf_sequenz": pl.UInt32, + "vorlage": pl.Boolean, + "wf_id": pl.UInt16, + "best_menge": pl.UInt32, + "freigabe_auto": pl.Boolean, +} + + map_to_result: dict[str, str] = { "BEDARFNR": "bedarf_nr", "BEDP_SEQUENZ": "bedarf_sequenz", } + +raw_data_query_schema_map: PolarsSchema = { + "BEDARFNR": pl.UInt32, + "BEDP_SEQUENZ": pl.UInt32, + "BEDP_TITELNR": pl.UInt32, + "BEDP_MAN": pl.UInt8, + "BEDP_MENGE_BEDARF_VM": pl.UInt32, + "MELDENUMMER": pl.UInt8, + "MENGE_VORMERKER": pl.UInt32, +} diff --git a/src/umbreit/types.py b/src/umbreit/types.py index 4423b6a..7b24e33 100644 --- a/src/umbreit/types.py +++ b/src/umbreit/types.py @@ -1,5 +1,6 @@ from __future__ import annotations +import enum from dataclasses import dataclass from typing import TypeAlias @@ -7,3 +8,8 @@ import polars as pl PolarsSchema: TypeAlias = dict[str, type[pl.DataType]] PolarsNullValues: TypeAlias = dict[str, str] + + +class Freigabe(enum.Enum): + WF_100 = False + WF_200 = False