From 7488bc19b19a9a87c00c82ae24009414ee74a958 Mon Sep 17 00:00:00 2001 From: foefl Date: Fri, 7 Nov 2025 16:24:23 +0100 Subject: [PATCH] add all prototype and data analysis code --- data_analysis/01-1_transform_db.py | 120 ++++++++ data_analysis/01_analyse_data.py | 75 +++++ data_analysis/02-1_initial_workflow_test.py | 304 ++++++++++++++++++++ src/umbreit/db.py | 5 + src/umbreit/types.py | 1 + 5 files changed, 505 insertions(+) create mode 100644 data_analysis/01-1_transform_db.py create mode 100644 data_analysis/01_analyse_data.py create mode 100644 data_analysis/02-1_initial_workflow_test.py diff --git a/data_analysis/01-1_transform_db.py b/data_analysis/01-1_transform_db.py new file mode 100644 index 0000000..5422f97 --- /dev/null +++ b/data_analysis/01-1_transform_db.py @@ -0,0 +1,120 @@ +# %% +import importlib +from pathlib import Path +from pprint import pprint + +import polars as pl +import sqlalchemy as sql + +from umbreit import db +from umbreit.types import PolarsNullValues, PolarsSchema + +# %% +db = importlib.reload(db) + +# %% +db_path = (Path.cwd() / "../data/data.db").resolve() +data_path = db_path.parent / "20251105" +assert db_path.parent.exists() +assert data_path.exists() and data_path.is_dir() + + +def transform_csv_to_database( + db_path: Path, target_folder: Path +) -> tuple[sql.Engine, list[pl.DataFrame]]: + if db_path.exists(): + db_path.unlink() + + engine = sql.create_engine(f"sqlite:///{str(db_path)}", echo=True) + db.metadata.create_all(engine) + + map_file_db: dict[ + str, + tuple[sql.Table, PolarsSchema, PolarsNullValues], + ] = {} # table name to db + for table, schema, null_values in db.csv_tables: + map_file_db[table.fullname] = table, schema, null_values + + csv_files = target_folder.glob("*.csv") + dataframes: list[pl.DataFrame] = [] + + for file in csv_files: + table_name = file.stem + db_table, schema, null_values = map_file_db[table_name] + print(f"Reading table: {table_name}") + df = pl.read_csv( + file, + separator=";", + try_parse_dates=True, + schema_overrides=schema, + null_values=null_values, + ) + dataframes.append(df) + + df.write_database(db_table.fullname, connection=engine, if_table_exists="replace") + + print(f"Successfully written database to: >{db_path}<") + + return engine, dataframes + + +# %% +engine, dataframes = transform_csv_to_database(db_path, data_path) + + +# %% +stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +stmt = sql.text("SELECT * FROM ext_titel_info WHERE MANDFUEHR=1 and TI_NUMMER=2928800") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) +# %% +stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) +# %% +stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +stmt = sql.text("SELECT * FROM results") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +df = dataframes[1] +# %% +col_dtype = {} +for col, dtype in zip(df.columns, df.dtypes): + col_dtype[col] = dtype + +print("dtypes of DF...") +pprint(col_dtype) +# %% +len(df) +# %% +df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0")) +# %% +stmt = sql.text("SELECT * FROM ext_bedpbed") +df = pl.read_database(stmt, engine) + +# %% +df +# %% +# %% +col_dtype = {} +for col, dtype in zip(df.columns, df.dtypes): + col_dtype[col] = dtype + +print("dtypes of DF...") +pprint(col_dtype) +# %% diff --git a/data_analysis/01_analyse_data.py b/data_analysis/01_analyse_data.py new file mode 100644 index 0000000..351f579 --- /dev/null +++ b/data_analysis/01_analyse_data.py @@ -0,0 +1,75 @@ +# %% +from pathlib import Path + +import pandas as pd +import polars as pl + +# %% +f_data = Path.cwd() / "../data/20251105/" +# %% +files = tuple(f_data.glob("*.csv")) +files +# %% +file_pattern = "*aufpauf*" +rel_file = tuple(f_data.glob(file_pattern))[0] +rel_file +# %% +df = pd.read_csv( + rel_file, sep=";", parse_dates=["AUFTRAGS_DATUM", "AUFP_DATUM_ANLAGE"], dayfirst=True +) +df.head() +# %% +df["AUFP_DATUM_ANLAGE"].dtype +# %% +len(df) +# %% +pd.unique(df["TITELNR"]).shape +# %% +df["TITELNR"].value_counts() +# %% +df.loc[df["TITELNR"] == 0] +# %% +# prim_keys = df[["AUFTRAGSNUMMER", "TITELNR"]].copy() +prim_keys = df.copy() +# %% +unique_prim_key_pairs = set(zip(prim_keys["AUFTRAGSNUMMER"], prim_keys["TITELNR"])) +# %% +len(unique_prim_key_pairs) +# %% +duplicates = prim_keys[prim_keys.duplicated(subset=["AUFTRAGSNUMMER", "TITELNR"])] +duplicates["AUFTRAGS_ART"].unique() +# %% +duplicates["EINGANGS_ART"].unique() +# %% +duplicates.loc[duplicates["AUFP_VORMERKUNG"] == "J"] +# %% +# Auftragsart 99 always "AUFP_VORMERKUNG == NaN" +filt = df.loc[df["AUFTRAGS_ART"] == 99] +print(len(filt)) +filt["AUFP_VORMERKUNG"].isna().sum() +# %% +# %% +# specific title number and all orders, sorted +dt = pd.Timestamp(year=2025, month=11, day=5, hour=13) + +filt = df.loc[df["TITELNR"] == 6315273] +filt = filt.loc[filt["AUFP_VORMERKUNG"] == "J"] +filt = filt.loc[filt["AUFTRAGS_DATUM"] >= dt] +filt = filt.sort_values("AUFTRAGS_DATUM", ascending=False) +order_size = filt["AUFP_MENGE_AUFTRAG"].sum() +order_size +# %% + +# %% +df = pl.read_csv( + rel_file, + separator=";", + try_parse_dates=True, + infer_schema=True, + infer_schema_length=100_000, +) +df +# %% +res = df.select(pl.col("AUFTRAGSNUMMER"), pl.col("AUFTRAGS_DATUM").dt.day().alias("day_num")) +res +# %% diff --git a/data_analysis/02-1_initial_workflow_test.py b/data_analysis/02-1_initial_workflow_test.py new file mode 100644 index 0000000..13299cb --- /dev/null +++ b/data_analysis/02-1_initial_workflow_test.py @@ -0,0 +1,304 @@ +# %% +import importlib +from collections.abc import Sequence +from pathlib import Path +from pprint import pprint + +import polars as pl +import sqlalchemy as sql + +from umbreit import db + +# %% +# db = importlib.reload(db) + +# %% +db_path = (Path.cwd() / "../data/data.db").resolve() +data_path = db_path.parent / "20251105" +assert db_path.parent.exists() +assert data_path.exists() and data_path.is_dir() + +engine = sql.create_engine(f"sqlite:///{str(db_path)}", echo=True) + +# %% +join_condition = sql.and_( + db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, + db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, +) + +stmt = sql.select( + db.ext_bedpbed.c.BEDARFNR, + db.ext_bedpbed.c.BEDP_SEQUENZ, + db.ext_bedpbed.c.BEDP_TITELNR, + db.ext_bedpbed.c.BEDP_MAN, + db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, + db.ext_titel_info.c.MELDENUMMER, + db.ext_titel_info.c.MENGE_VORMERKER, +).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition)) + +# %% +print(stmt.compile(engine)) +# %% +df_raw = pl.read_database(stmt, engine) +# %% +df_raw + +# %% +filter_meldenummer = pl.col("MELDENUMMER") == 18 + +# %% +# df_new = df.filter(pl.col("MENGE_VORMERKER").is_not_null() & pl.col("MENGE_VORMERKER") > 0) +# filter mandant: Umbreit +filter_mandant_umbreit = pl.col("BEDP_MAN") == 1 +df_mandant = df_raw.filter(filter_mandant_umbreit) +df_mandant + + +# %% +# filter #VM +# VM_CRITERION = "MENGE_VORMERKER" +VM_CRITERION = "BEDP_MENGE_BEDARF_VM" +df_mandant = df_mandant.with_columns(pl.col(VM_CRITERION).fill_null(0)) +filter_vm = pl.col(VM_CRITERION) > 0 # pl.col("MENGE_VORMERKER").is_not_null() & +df_new = df_mandant.filter(filter_vm) +# df_new = df_mandant.filter(pl.col("MENGE_VORMERKER").is_not_null()).filter(pl.col("MENGE_VORMERKER") > 0) + +df_new +# %% + + +def apply_several_filters( + df: pl.DataFrame, + filters: Sequence[pl.Expr], +) -> tuple[pl.DataFrame, pl.DataFrame]: + df_current = df + removed_rows: list[pl.DataFrame] = [] + + for filter in filters: + removed = df_current.filter(~filter) + removed_rows.append(removed) + + df_current = df_current.filter(filter) + + df_removed = pl.concat(removed_rows) + + return df_current, df_removed + + +def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame: + df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0)) + + return df + + +# def workflow_100_start( +# df: pl.DataFrame, +# ) -> tuple[pl.DataFrame, pl.DataFrame]: +# return apply_several_filters(df, (filter,)) + + +def workflow_100_umbreit( + df: pl.DataFrame, + vm_criterion: str, +) -> tuple[pl.DataFrame, pl.DataFrame]: + filter_meldenummer = pl.col("MELDENUMMER") == 18 + filter_mandant = pl.col("BEDP_MAN") == 1 + filter_number_vm = pl.col(vm_criterion) > 0 + + return apply_several_filters(df, (filter_meldenummer, filter_mandant, filter_number_vm)) + + +# %% +out_remainder: list[pl.DataFrame] = [] +df_start = prepare_base_data(df_raw) +df_start + +# %% +df, filt_out = workflow_100_umbreit(df_start, VM_CRITERION) +# filt_out at this point represents all entries which are to be analysed in other workflows +out_remainder.append(filt_out) +pipe_removed = pl.concat(out_remainder) +# %% +df +# %% +pipe_removed + +# idea: use pipe_removed for other workflows +# in the end there should not be any open positions left (assuming all cases are implemented) + +# %% +# post-processing the results + + +def results_workflow_100( + df: pl.DataFrame, +) -> pl.DataFrame: + df = df.rename(db.map_to_result) + df = df.with_columns( + [ + pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"), + pl.lit(True).alias("vorlage"), + pl.lit(100).alias("wf_id"), + pl.lit(False).alias("freigabe_auto"), + ] + ) + df = df.drop( + [ + "BEDP_TITELNR", + "BEDP_MAN", + "BEDP_MENGE_BEDARF_VM", + "MELDENUMMER", + "MENGE_VORMERKER", + ] + ) + + return df + + +# %% +pipe_post = results_workflow_100(df) +pipe_post + +# %% +pipe_post.write_database(db.results.fullname, engine, if_table_exists="replace") + +# %% +stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz) +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +filt_out + +# %% +df_umbreit_18 = workflow_100_umbreit(df, VM_CRITERION) +df_umbreit_18 + +# ---------------------------------------------------------------------------- + +# %% +target_bednr = df_new["BEDARFNR"].to_list() +target_seq = df_new["BEDP_SEQUENZ"].to_list() +# %% +stmt = ( + sql.select( + db.ext_bedpbed.c.BEDARFNR, + db.ext_bedpbed.c.BEDP_SEQUENZ, + db.ext_bedpbed.c.BEDP_TITELNR, + db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, + ) + .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr)) + .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq)) +) +df_targets = pl.read_database(stmt, engine) +# %% +# df_targets.filter(pl.col("BEDARFNR") == 884174) +df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0) + +# %% +# interesting order: 883697, 1, titleno: 7945981, 9964027 +TITLE_NO = 7945981 +# TITLE_NO = 9964027 +stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO) +title_buy = pl.read_database(stmt, engine) +# %% +title_buy + +# %% when were the orders placed +stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981) +title_order = pl.read_database(stmt, engine) +# %% +title_order + +# ------------------------------------------------------------------------------------------- + +# %% +# title DB complete? +# - includes only titles which are deliverable since 01.06.2025 and who are assigned to +# buyer "Fröhlich" +stmt = sql.select(db.ext_titel_info) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800) +titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map) +# %% +titles["MANDFUEHR"].unique() +# %% +unique_titles = set(titles["TI_NUMMER"].to_list()) +len(unique_titles) + +# %% +# requirements? +# - includes only order since 05.11.2025 +stmt = sql.select(db.ext_bedpbed) # .where(db.ext_titel_info.c.TI_NUMMER == 2928800) +reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map) +# %% +reqs + +# %% +reqs["BEDP_MAN"].unique() + +# %% +# intersection between all titles and the titles contained in the requirements table +unique_titles_req = set(reqs["BEDP_TITELNR"].to_list()) +len(unique_titles_req) +# %% +intersection = unique_titles & unique_titles_req +len(intersection) +# %% +# orders? +# - includes only order since 05.11.2025 +stmt = sql.select(db.EXT_AUFPAUF) +orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map) + +# %% +orders.estimated_size(unit="mb") + +# %% +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) +# %% +stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977") +with engine.connect() as conn: + res = conn.execute(stmt) + print(res.all()) + +# %% +df = dataframes[1] +# %% +col_dtype = {} +for col, dtype in zip(df.columns, df.dtypes): + col_dtype[col] = dtype + +print("dtypes of DF...") +pprint(col_dtype) +# %% +len(df) +# %% +df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0")) +# %% +stmt = sql.text("SELECT * FROM ext_bedpbed") +df = pl.read_database(stmt, engine) + +# %% +df +# %% +# %% +col_dtype = {} +for col, dtype in zip(df.columns, df.dtypes): + col_dtype[col] = dtype + +print("dtypes of DF...") +pprint(col_dtype) +# %% diff --git a/src/umbreit/db.py b/src/umbreit/db.py index c410ec4..684d2d0 100644 --- a/src/umbreit/db.py +++ b/src/umbreit/db.py @@ -171,3 +171,8 @@ results = Table( Column("best_menge", sql.Integer, nullable=False), Column("freigabe_auto", sql.Boolean, nullable=False), ) + +map_to_result: dict[str, str] = { + "BEDARFNR": "bedarf_nr", + "BEDP_SEQUENZ": "bedarf_sequenz", +} diff --git a/src/umbreit/types.py b/src/umbreit/types.py index 1e97b43..4423b6a 100644 --- a/src/umbreit/types.py +++ b/src/umbreit/types.py @@ -1,5 +1,6 @@ from __future__ import annotations +from dataclasses import dataclass from typing import TypeAlias import polars as pl