From b66d5a4921bb1023a21b277b80a4fbee6a7f6734 Mon Sep 17 00:00:00 2001 From: foefl Date: Wed, 10 Jun 2026 13:32:39 +0200 Subject: [PATCH] prepare enhanced pipeline --- prototypes/04-1_pipeline_with_db.py | 99 +++++++++++++++++++++++++++++ prototypes/external_code.py | 76 ++++++++++------------ 2 files changed, 133 insertions(+), 42 deletions(-) create mode 100644 prototypes/04-1_pipeline_with_db.py diff --git a/prototypes/04-1_pipeline_with_db.py b/prototypes/04-1_pipeline_with_db.py new file mode 100644 index 0000000..9fc4cd3 --- /dev/null +++ b/prototypes/04-1_pipeline_with_db.py @@ -0,0 +1,99 @@ +# %% +import importlib +from pathlib import Path +from pprint import pprint + +import external_code +import oracledb +import polars as pl + +import wattanalyse +from wattanalyse import constants, db + +importlib.reload(wattanalyse) +importlib.reload(constants) +importlib.reload(external_code) +importlib.reload(db) + +PROJECT_BASE = Path(__file__).parents[1] +DATA_PTH = PROJECT_BASE / "data" +assert DATA_PTH.exists() +# %% +conn = oracledb.connect( + user=constants.USER_CFG.Datenbank.NUTZER, + password=constants.USER_CFG.Datenbank.PASSWORT, + host=constants.USER_CFG.Datenbank.HOST, + port=constants.USER_CFG.Datenbank.PORT, + service_name=constants.USER_CFG.Datenbank.SERVICE_NAME, +) + +##################################### +# // Get data from database +# %% +schema = db.extern_MIS_t_schema +data_mis = external_code.oracle_load_table_as_polars( + conn, + schema=schema, + table_name="EXTERN_MIS", +).collect() +data_mis +# %% +schema = db.extern_prod_order_t_schema +data_psm = external_code.oracle_load_table_as_polars( + conn, + schema=schema, + table_name="EXTERN_PSM", +).collect() +data_psm + +# %% +data_mis = data_mis.drop("ID", strict=False) +data_psm = data_psm.drop("ID", strict=False) + +# %% +data_psm.height +# %% +data_psm.join(data_mis, on=["PA", "PA Pos"], how="semi") + +# %% +# +tab_name_psm = "EXTERN_PSM" +tab_name_mis = "EXTERN_MIS" +stmt = f""" +SELECT t1.* FROM "{tab_name_psm}" t1 +WHERE EXISTS( + SELECT 1 FROM "{tab_name_mis}" t2 + WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" +) +""" +# test = external_code.oracle_load_table_as_polars( +# conn, db.extern_prod_order_t_schema, "", None +# ).collect() +test = external_code.oracle_load_table_as_polars( + conn, db.extern_prod_order_t_schema, tab_name_psm, stmt +).collect() + +# %% +# data_psm = external_code.load_PSM_data(conn).collect() + +# %% +# // preprocess data +# TODO: add check with MIS data if the orders are relevant +tmp = data_psm.clone() +res = external_code.preprocess_psm(tmp.lazy()) +tmp = res.data +tmp = tmp.collect() +tmp +# %% +tmp = tmp.rename({"PA_Pos": "PA Pos"}) +# %% +tmp.join(data_mis, on=["PA", "PA Pos"], how="semi") + +# %% +res.filtered +# %% +tmp = data_psm.clone() +tmp = external_code.aggregate_production_orders(tmp.lazy()).collect() +print(tmp) +tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect() +print(tmp) diff --git a/prototypes/external_code.py b/prototypes/external_code.py index da1f6dd..ec606c1 100644 --- a/prototypes/external_code.py +++ b/prototypes/external_code.py @@ -4,7 +4,8 @@ import dataclasses as dc import datetime import enum import json -from typing import TYPE_CHECKING, Any, Final, cast +import warnings +from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast import polars as pl import sqlalchemy as sql @@ -13,6 +14,7 @@ from wattanalyse import db if TYPE_CHECKING: from oracledb import Connection as OracleConnection + from polars._typing import SchemaDict # 1. cleanup obtained new data # ~~2. load data from internal database~~ @@ -22,6 +24,8 @@ if TYPE_CHECKING: # 4. post-process results # 5. write to external database +SqlStatement: TypeAlias = str + @dc.dataclass(slots=True, eq=False) class PreProcessResult: @@ -65,6 +69,22 @@ PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"] LOWER_BOUND_DATE_DEVIATION: Final[int] = 0 UPPER_BOUND_DATE_DEVIATION: Final[int] = 0 NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4 +TAB_NAME_PSM: Final[str] = "EXTERN_PSM" +TAB_NAME_MIS: Final[str] = "EXTERN_MIS" + + +# // (0) load data +def load_PSM_data( + conn: OracleConnection, +) -> pl.LazyFrame: + stmt = f""" + SELECT t1.* FROM "{TAB_NAME_PSM}" t1 + WHERE EXISTS( + SELECT 1 FROM "{TAB_NAME_MIS}" t2 + WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" + ) + """ + return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt) # // (1) preprocess @@ -306,44 +326,6 @@ def process_order_level( ) ) - # data = ( - # data.with_columns( - # pl.when( - # (pl.col("Liefertermin_Ist").is_not_null()) - # & (pl.col("Liefertermin_Soll").is_not_null()) - # ) - # .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days()) - # .otherwise(None) - # .alias("Terminabweichung_Anzahl_Tage") - # ) - # .with_columns( - # pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION) - # .then(pl.lit(True)) - # .otherwise(pl.lit(False)) - # .alias("Terminunterschreitung"), - # pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION) - # .then(pl.lit(True)) - # .otherwise(pl.lit(False)) - # .alias("Terminüberschreitung"), - # pl.when( - # (pl.col("Liefertermin_Ist").is_not_null()) - # & (pl.col("Prod-Start").is_not_null()) - # ) - # .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days()) - # .otherwise(None) - # .alias("Durchlaufzeit_Anzahl_Tage"), - # ) - # .with_columns( - # pl.when( - # (pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null()) - # & (pl.col("Durchlaufzeit_Anzahl_Tage") < 0) - # ) - # .then(None) - # .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage")) - # .alias("Durchlaufzeit_Anzahl_Tage") - # ) - # ) - return data @@ -637,11 +619,21 @@ def oracle_generate_sql_insert( def oracle_load_table_as_polars( conn: OracleConnection, - table_name: str, + schema: SchemaDict | None, + table_name: str | None = None, + stmt: SqlStatement | None = None, ) -> pl.LazyFrame: - stmt = f"SELECT * FROM {table_name}" + if not any((table_name, stmt)): + raise ValueError("Table name or SQL statement must be provided") + if all((table_name, stmt)): + warnings.warn( + "Table name and SQL statement provided. In this case, the statement is used." + ) + if not stmt: + stmt = f"SELECT * FROM {table_name}" + odf = conn.fetch_df_all(statement=stmt) - df = cast(pl.DataFrame, pl.from_arrow(odf)) + df = cast(pl.DataFrame, pl.from_arrow(odf, schema_overrides=schema)) return df.lazy()