From 34ba005ddef283fb7119cc359e3d2c7e0f2a990c Mon Sep 17 00:00:00 2001
From: foefl
Date: Mon, 8 Jun 2026 15:42:26 +0200
Subject: [PATCH] enhanced pipeline functions

---

Reviewer notes (this text sits between the "---" separator and the diffstat,
so it is commentary only and not part of the commit message):

prototypes/03-1_check_db.py (new file, "# %%" cell script)
  * Connects with oracledb.connect() using the fields of
    constants.USER_CFG.Datenbank.
  * For KPI_PRODUKTIONSAUFTRAEGE and then KPI_KONFEKTIONAERE: reads
    data/db/oracle_prepare_<TABLE_NAME>.arrow via pl.read_ipc, prints the
    table's current columns and rows, runs the DELETE and the executemany
    INSERT built by external_code.oracle_generate_sql_insert, commits, and
    reads the table back with conn.fetch_df_all.
  * The cells after the long "#####" rule build the same DELETE/INSERT
    strings inline for KPI_PRODUKTIONSAUFTRAEGE.
    NOTE(review): when the cells are run top to bottom, `df` at that point is
    the frame loaded for KPI_KONFEKTIONAERE - confirm this is intended.
  * The German comment in that cell reads, in English: "df_oracle_bereit is
    passed directly to Oracle as an Arrow stream!"; the object actually
    passed to executemany is `df`.
  * The DELETE/INSERT statements double-quote the table name, the
    "SELECT * FROM {TABLE_NAME}" read-back statements do not.

prototypes/external_code.py
  * Adds `from __future__ import annotations`, a TYPE_CHECKING-only import of
    oracledb.Connection as OracleConnection, and the SqlInsertStmts
    dataclass (fields: delete, insert).
  * USE_BOUNDARIES (set to False) selects how the module-level expressions
    filter_date_deviation_early / filter_date_deviation_late are defined;
    with False they are "Terminabweichung_Anzahl_Tage" < 0 and > 0.
  * aggregate_production_orders: a single select() producing the rounded
    mean / standard-deviation KPI columns over the whole frame.
  * aggregate_suppliers: the same KPI columns per group_by("Konfektionär"),
    plus four percentage columns and MITTLERER_QUALITAETSSCORE_PSM.
  * oracle_prepare_KPI_aggregate: optional rename, optional sort, adds an
    "ID" row index with offset 1 and "AKTUALISIERT_AM" set to
    datetime.datetime.now() (no tz argument), casts Boolean columns to Int8
    and orders the columns as ID, AKTUALISIERT_AM, then the (renamed) input
    columns.
  * oracle_generate_sql_insert: builds 'DELETE FROM "<table>"' and an INSERT
    with double-quoted column names and positional binds :1 .. :n.
  * oracle_load_table_as_polars: runs "SELECT * FROM <table>" (name not
    quoted) through conn.fetch_df_all and returns the result as a LazyFrame.
  * oracle_save_polars: executes the DELETE, the executemany INSERT and
    conn.commit() inside one cursor block.
    NOTE(review): there is no explicit rollback in this function if the
    INSERT raises after the DELETE - confirm how callers handle that.

 prototypes/03-1_check_db.py | 119 +++++++++++++++++++++
 prototypes/external_code.py | 204 +++++++++++++++++++++++++++++++++++-
 2 files changed, 322 insertions(+), 1 deletion(-)
 create mode 100644 prototypes/03-1_check_db.py

diff --git a/prototypes/03-1_check_db.py b/prototypes/03-1_check_db.py
new file mode 100644
index 0000000..d15b9a5
--- /dev/null
+++ b/prototypes/03-1_check_db.py
@@ -0,0 +1,119 @@
+# %%
+import importlib
+from pathlib import Path
+
+import external_code
+import oracledb
+import polars as pl
+
+import wattanalyse
+from wattanalyse import constants
+
+importlib.reload(wattanalyse)
+importlib.reload(constants)
+# %%
+PROJECT_BASE = Path(__file__).parents[1]
+DATA_PTH = PROJECT_BASE / "data"
+assert DATA_PTH.exists()
+
+# %%
+conn = oracledb.connect(
+    user=constants.USER_CFG.Datenbank.NUTZER,
+    password=constants.USER_CFG.Datenbank.PASSWORT,
+    host=constants.USER_CFG.Datenbank.HOST,
+    port=constants.USER_CFG.Datenbank.PORT,
+    service_name=constants.USER_CFG.Datenbank.SERVICE_NAME,
+)
+
+# %%
+# // KPI_PRODUKTIONSAUFTRAEGE
+TABLE_NAME = "KPI_PRODUKTIONSAUFTRAEGE"
+prepared_oracle_pth = DATA_PTH / f"db/oracle_prepare_{TABLE_NAME}.arrow"
+assert prepared_oracle_pth.exists()
+df = pl.read_ipc(prepared_oracle_pth)
+
+# %%
+with conn.cursor() as cursor:
+    cursor.execute(f'SELECT * FROM "{TABLE_NAME}"')
+    data = cursor.fetchall()
+    columns = [desc[0] for desc in cursor.description]
+
+print("columns:", columns)
+print("data:", data)
+# %%
+# ** insert
+stmts = external_code.oracle_generate_sql_insert(TABLE_NAME, columns=df.columns)
+print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
+
+with conn.cursor() as cursor:
+    cursor.execute(stmts.delete)
+    cursor.executemany(stmts.insert, df)
+    conn.commit()
+
+# %%
+# ** read
+stmt = f"SELECT * FROM {TABLE_NAME}"
+odf = conn.fetch_df_all(statement=stmt)
+loaded_df = pl.from_arrow(odf)
+print(loaded_df)
+
+#############
+# %%
+# //
+TABLE_NAME = "KPI_KONFEKTIONAERE"
+prepared_oracle_pth = DATA_PTH / f"db/oracle_prepare_{TABLE_NAME}.arrow"
+assert prepared_oracle_pth.exists()
+df = pl.read_ipc(prepared_oracle_pth)
+
+# %%
+with conn.cursor() as cursor:
+    cursor.execute(f'SELECT * FROM "{TABLE_NAME}"')
+    data = cursor.fetchall()
+    columns = [desc[0] for desc in cursor.description]
+
+print("columns:", columns)
+print("data:", data)
+# %%
+# ** insert
+stmts = external_code.oracle_generate_sql_insert(TABLE_NAME, columns=df.columns)
+print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
+
+with conn.cursor() as cursor:
+    cursor.execute(stmts.delete)
+    cursor.executemany(stmts.insert, df)
+    conn.commit()
+
+# %%
+# ** read
+stmt = f"SELECT * FROM {TABLE_NAME}"
+odf = conn.fetch_df_all(statement=stmt)
+loaded_df = pl.from_arrow(odf)
+print(loaded_df)
+
+# %%
+df.height
+
+#####################################
+# %%
+columns = df.columns
+spalten_str = ", ".join([f'"{c}"' for c in columns])
+platzhalter_str = ", ".join([f":{i}" for i in range(1, len(columns) + 1)])
+
+table_name = "KPI_PRODUKTIONSAUFTRAEGE"
+sql_delete = f'DELETE FROM "{table_name}"'
+sql_insert = f'INSERT INTO "{table_name}" ({spalten_str}) VALUES ({platzhalter_str})'
+print(f"SQL DELETE: {sql_delete}\nSQL Insert: {sql_insert}")
+# %%
+with conn.cursor() as cursor:
+    cursor.execute(sql_delete)
+    # df_oracle_bereit wird direkt als Arrow-Stream an Oracle übergeben!
+    cursor.executemany(sql_insert, df)
+    conn.commit()
+
+# %%
+stmt = f"SELECT * FROM {table_name}"
+odf = conn.fetch_df_all(statement=stmt)
+pl_df = pl.from_arrow(odf)
+# %%
+pl_df
+# %%
diff --git a/prototypes/external_code.py b/prototypes/external_code.py
index 7e7d201..da1f6dd 100644
--- a/prototypes/external_code.py
+++ b/prototypes/external_code.py
@@ -1,14 +1,19 @@
+from __future__ import annotations
+
 import dataclasses as dc
 import datetime
 import enum
 import json
-from typing import Any, Final
+from typing import TYPE_CHECKING, Any, Final, cast
 
 import polars as pl
 import sqlalchemy as sql
 
 from wattanalyse import db
 
+if TYPE_CHECKING:
+    from oracledb import Connection as OracleConnection
+
 # 1. cleanup obtained new data
 # ~~2. load data from internal database~~
 # ~~3. integrate with with new data (whole snapshot)~~
@@ -24,6 +29,12 @@ class PreProcessResult:
     filtered: pl.DataFrame
 
 
+@dc.dataclass(slots=True, kw_only=True)
+class SqlInsertStmts:
+    delete: str
+    insert: str
+
+
 class QualityPsm(enum.StrEnum):
     FEHLEND = enum.auto()
     UNPLAUSIBEL = enum.auto()
@@ -453,3 +464,194 @@ def load_order_level_from_internal_database() -> pl.DataFrame:
     }
 
     return data.with_columns(**list_col_parse_conds)
+
+
+# // (4) post-process results
+
+USE_BOUNDARIES: Final[bool] = False
+filter_date_deviation_early: pl.Expr
+filter_date_deviation_late: pl.Expr
+if USE_BOUNDARIES:
+    filter_date_deviation_early = pl.col("Terminunterschreitung")
+    filter_date_deviation_late = pl.col("Terminüberschreitung")
+else:
+    filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
+    filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
+
+
+def aggregate_production_orders(
+    data: pl.LazyFrame,
+) -> pl.LazyFrame:
+    data = data.select(
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .filter(filter_date_deviation_early)
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .filter(filter_date_deviation_late)
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .std(ddof=1)
+        .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
+        pl.col("Import-Ist_Anzahl_Aenderungen")
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
+        pl.col("Tage_zu_letzter_PSM_Historie")
+        .list.explode()
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
+        pl.col("Durchlaufzeit_Anzahl_Tage")
+        .mean()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
+    )
+
+    return data
+
+
+def aggregate_suppliers(
+    data: pl.LazyFrame,
+) -> pl.LazyFrame:
+    data = data.group_by("Konfektionär").agg(
+        (
+            (
+                ~(filter_date_deviation_early | filter_date_deviation_late)
+                & (pl.col("Import-Ist_Anzahl_Aenderungen") == 0)
+            ).mean()
+            * 100
+        )
+        .round(4, mode="half_away_from_zero")
+        .alias("QUOTE_ERSTBESTAETIGUNG"),
+        ((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100)
+        .round(4, mode="half_away_from_zero")
+        .alias("PROZENT_LIEFERTREUE"),
+        (filter_date_deviation_early.mean() * 100)
+        .round(4, mode="half_away_from_zero")
+        .alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"),
+        (filter_date_deviation_late.mean() * 100)
+        .round(4, mode="half_away_from_zero")
+        .alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"),
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .filter(filter_date_deviation_early)
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .filter(filter_date_deviation_late)
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
+        pl.col("Terminabweichung_Anzahl_Tage")
+        .std(ddof=1)
+        .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
+        pl.col("Import-Ist_Anzahl_Aenderungen")
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
+        pl.col("Tage_zu_letzter_PSM_Historie")
+        .list.explode()
+        .mean()
+        .abs()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
+        pl.col("Durchlaufzeit_Anzahl_Tage")
+        .mean()
+        .round(mode="half_away_from_zero")
+        .cast(pl.Int64)
+        .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
+        pl.col("Prod-Qualitaet_Historie")
+        .list.explode()
+        .mean()
+        .round(4, mode="half_away_from_zero")
+        .alias("MITTLERER_QUALITAETSSCORE_PSM"),
+    )
+
+    return data
+
+
+# // (5) external database
+
+
+def oracle_prepare_KPI_aggregate(
+    data: pl.LazyFrame,
+    rename_schema: dict[str, str] | None = None,
+    sort_by: str = "",
+    sort_descending: bool = False,
+) -> pl.LazyFrame:
+    if rename_schema is not None:
+        data = data.rename(rename_schema)
+
+    cols_sorted = ["ID", "AKTUALISIERT_AM"] + [c for c in data.collect_schema().names()]
+
+    if sort_by:
+        data = data.sort(sort_by, descending=sort_descending)
+
+    data = data.with_row_index("ID", 1)
+    data = (
+        data.with_columns(
+            pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
+        )
+        .select(
+            pl.col(pl.Boolean).cast(pl.Int8),
+            pl.all().exclude(pl.Boolean),
+        )
+        .select(cols_sorted)
+    )
+
+    return data
+
+
+def oracle_generate_sql_insert(
+    table_name: str,
+    columns: list,
+) -> SqlInsertStmts:
+    spalten_str = ", ".join([f'"{c}"' for c in columns])
+    platzhalter_str = ", ".join([f":{i}" for i in range(1, len(columns) + 1)])
+
+    sql_delete = f'DELETE FROM "{table_name}"'
+    sql_insert = f'INSERT INTO "{table_name}" ({spalten_str}) VALUES ({platzhalter_str})'
+
+    return SqlInsertStmts(delete=sql_delete, insert=sql_insert)
+
+
+def oracle_load_table_as_polars(
+    conn: OracleConnection,
+    table_name: str,
+) -> pl.LazyFrame:
+    stmt = f"SELECT * FROM {table_name}"
+    odf = conn.fetch_df_all(statement=stmt)
+    df = cast(pl.DataFrame, pl.from_arrow(odf))
+
+    return df.lazy()
+
+
+def oracle_save_polars(
+    conn: OracleConnection,
+    stmts: SqlInsertStmts,
+    data: pl.DataFrame,
+) -> None:
+    with conn.cursor() as cursor:
+        cursor.execute(stmts.delete)
+        cursor.executemany(stmts.insert, data)
+        conn.commit()