diff --git a/prototypes/04-1_pipeline_with_db.py b/prototypes/04-1_pipeline_with_db.py index 9fc4cd3..f4f66a2 100644 --- a/prototypes/04-1_pipeline_with_db.py +++ b/prototypes/04-1_pipeline_with_db.py @@ -51,49 +51,74 @@ data_mis = data_mis.drop("ID", strict=False) data_psm = data_psm.drop("ID", strict=False) # %% -data_psm.height -# %% -data_psm.join(data_mis, on=["PA", "PA Pos"], how="semi") +# // (0) Load from external database +data_psm = external_code.load_PSM_data(conn) +data_psm.collect() # %% -# -tab_name_psm = "EXTERN_PSM" -tab_name_mis = "EXTERN_MIS" -stmt = f""" -SELECT t1.* FROM "{tab_name_psm}" t1 -WHERE EXISTS( - SELECT 1 FROM "{tab_name_mis}" t2 - WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" -) -""" -# test = external_code.oracle_load_table_as_polars( -# conn, db.extern_prod_order_t_schema, "", None -# ).collect() -test = external_code.oracle_load_table_as_polars( - conn, db.extern_prod_order_t_schema, tab_name_psm, stmt -).collect() - -# %% -# data_psm = external_code.load_PSM_data(conn).collect() - -# %% -# // preprocess data -# TODO: add check with MIS data if the orders are relevant +# // (1) preprocess data tmp = data_psm.clone() res = external_code.preprocess_psm(tmp.lazy()) tmp = res.data -tmp = tmp.collect() -tmp -# %% -tmp = tmp.rename({"PA_Pos": "PA Pos"}) -# %% -tmp.join(data_mis, on=["PA", "PA Pos"], how="semi") +tmp_show = tmp.collect() +tmp_show # %% -res.filtered +# // (2) process on order level +tmp = external_code.process_order_level(tmp) +tmp.collect() +# %% +# // (3) dump to database (intermediate result) +external_code.dump_order_level_to_internal_database_wipe(tmp) +# %% +# // (4) post-process +# ** aggregation for orders +aggregate_orders = external_code.aggregate_production_orders(tmp) +print(aggregate_orders.collect()) + +# ** aggregation for suppliers +aggregate_suppliers = external_code.aggregate_suppliers(tmp) +print(aggregate_suppliers.collect()) +# %% +# // (5) save to external database +# ** orders +aggregate_orders = external_code.oracle_prepare_KPI_aggregate(aggregate_orders) +print(aggregate_orders.head().collect()) +stmts_orders = external_code.oracle_generate_sql_insert( + table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=aggregate_orders.collect_schema().names() +) +print(f"SQL DELETE: {stmts_orders.delete}\nSQL Insert: {stmts_orders.insert}") + + +# ** suppliers +aggregate_suppliers = external_code.oracle_prepare_KPI_aggregate( + aggregate_suppliers, + sort_by="Konfektionaer", + sort_descending=False, +) +print(aggregate_suppliers.head().collect()) +stmts_suppliers = external_code.oracle_generate_sql_insert( + table_name="KPI_KONFEKTIONAERE", columns=aggregate_suppliers.collect_schema().names() +) +print(f"SQL DELETE: {stmts_suppliers.delete}\nSQL Insert: {stmts_suppliers.insert}") +# %% +# ** actual saving procedure +external_code.oracle_save_polars(conn, stmts_orders, aggregate_orders.collect()) +external_code.oracle_save_polars(conn, stmts_suppliers, aggregate_suppliers.collect()) + +# %% +print(f"Shape Aggregate Production Orders: {aggregate_orders.collect().shape}") +print(f"Shape Aggregate Suppliers: {aggregate_suppliers.collect().shape}") + +# %% +# // try loading +loaded_orders = external_code.oracle_load_table_as_polars( + conn, db.extern_results_prod_orders_t_schema, table_name="KPI_PRODUKTIONSAUFTRAEGE" +) +loaded_orders.collect() +# %% +loaded_suppliers = external_code.oracle_load_table_as_polars( + conn, db.extern_results_suppliers_t_schema, table_name="KPI_KONFEKTIONAERE" +) +loaded_suppliers.collect() # %% -tmp = data_psm.clone() -tmp = external_code.aggregate_production_orders(tmp.lazy()).collect() -print(tmp) -tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect() -print(tmp) diff --git a/prototypes/external_code.py b/prototypes/external_code.py index ec606c1..b5d84af 100644 --- a/prototypes/external_code.py +++ b/prototypes/external_code.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast import polars as pl import sqlalchemy as sql +from dopt_basics.datastructures import flatten from wattanalyse import db @@ -30,7 +31,13 @@ SqlStatement: TypeAlias = str @dc.dataclass(slots=True, eq=False) class PreProcessResult: data: pl.LazyFrame - filtered: pl.DataFrame + filtered: pl.LazyFrame + + +DROP_COLUMNS: Final[list[str]] = cast( + list[str], + list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))), +) @dc.dataclass(slots=True, kw_only=True) @@ -51,7 +58,7 @@ PSM_SCORES: dict[QualityPsm, int] = { QualityPsm.PLAUSIBEL: 2, } -RENAMING_SCHEME: dict[str, str] = { +RENAMING_SCHEME_PSM: dict[str, str] = { "PA Pos": "PA_Pos", "PSM gemeldet am": "Meldezeitpunkt_Historie", "Import Ist": "Import-Ist_Historie", @@ -62,6 +69,8 @@ RENAMING_SCHEME: dict[str, str] = { "Fertigware aus Nähband": "Prod-EP30_Historie", "Teile kontrolliert": "Prod-EP40_Historie", "Teile verpackt in Karton": "Prod-EP50_Historie", + "Konfektionär": "Konfektionaer", + "Lieferantnr.": "Konfektionaer_ID", } PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"] @@ -91,7 +100,8 @@ def load_PSM_data( def preprocess_psm( data: pl.LazyFrame, ) -> PreProcessResult: - data = data.rename(RENAMING_SCHEME) + data = data.rename(RENAMING_SCHEME_PSM) + data = data.drop(DROP_COLUMNS, strict=False) REGEX_PATTERN = r"^[\s\-#+/$]+$" data = data.with_columns( pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN)) @@ -99,7 +109,7 @@ def preprocess_psm( .otherwise(pl.col(pl.String)) .name.keep() ) - data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t")) + data = data.with_columns(pl.col("Konfektionaer").str.strip_chars(" \n\t")) filtered_data = pl.LazyFrame(schema=data.collect_schema()) # drop duplicates @@ -161,7 +171,7 @@ def preprocess_psm( filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) data = data.filter(~pl.any_horizontal(cond)) - return PreProcessResult(data=data, filtered=filtered_data.collect()) + return PreProcessResult(data=data, filtered=filtered_data) # // (2) process on order level @@ -169,7 +179,6 @@ def process_order_level( data: pl.LazyFrame, ) -> pl.LazyFrame: # ** renaming - # data = data.rename(RENAMING_SCHEME) # TODO delete, done in pre-processing data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) # ** plausibility check of order quantities @@ -272,7 +281,7 @@ def process_order_level( # whole aggregates see DB schema data = ( data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) - .group_by(PRIM_KEYS + ["Konfektionär"]) + .group_by(PRIM_KEYS + ["Konfektionaer", "Konfektionaer_ID"]) .agg( pl.col("Meldezeitpunkt_Historie"), pl.col("Liefertermin_Soll").drop_nulls().first(), @@ -508,7 +517,7 @@ def aggregate_production_orders( def aggregate_suppliers( data: pl.LazyFrame, ) -> pl.LazyFrame: - data = data.group_by("Konfektionär").agg( + data = data.group_by(["Konfektionaer", "Konfektionaer_ID"]).agg( ( ( ~(filter_date_deviation_early | filter_date_deviation_late) @@ -573,8 +582,6 @@ def aggregate_suppliers( # // (5) external database - - def oracle_prepare_KPI_aggregate( data: pl.LazyFrame, rename_schema: dict[str, str] | None = None, @@ -599,6 +606,7 @@ def oracle_prepare_KPI_aggregate( pl.all().exclude(pl.Boolean), ) .select(cols_sorted) + .select(pl.all().name.to_uppercase()) ) return data diff --git a/src/wattanalyse/db.py b/src/wattanalyse/db.py index 9b6da40..058be1e 100644 --- a/src/wattanalyse/db.py +++ b/src/wattanalyse/db.py @@ -22,7 +22,8 @@ intern_prod_order_t: Table = Table( MD_INTERNAL, Column("PA", sql.Integer, primary_key=True), Column("PA_Pos", sql.Integer, primary_key=True), - Column("Konfektionär", sql.Text, nullable=True), + Column("Konfektionaer", sql.Text, nullable=True), + Column("Konfektionaer_ID", sql.Integer, nullable=True), Column("Meldezeitpunkt_Historie", sql.Text, nullable=False), Column("Liefertermin_Soll", sql.Date, nullable=True), Column("Bestaetigter-Import_Historie", sql.Text, nullable=False), @@ -51,7 +52,8 @@ intern_prod_order_t: Table = Table( intern_prod_order_t_schema: dict[str, type[pl.DataType]] = { "PA": pl.UInt64, "PA_Pos": pl.UInt32, - "Konfektionär": pl.String, + "Konfektionaer": pl.String, + "Konfektionaer_ID": pl.UInt64, "Meldezeitpunkt_Historie": pl.String, "Liefertermin_Soll": pl.Date, "Bestaetigter-Import_Historie": pl.String, @@ -139,6 +141,17 @@ extern_results_prod_orders_t: Table = Table( Column("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE", sql.Integer, nullable=True), ) +extern_results_prod_orders_t_schema: dict[str, type[pl.DataType]] = { + "ID": pl.UInt32, + "AKTUALISIERT_AM": pl.Datetime, + "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG": pl.Int64, + "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG": pl.Int64, + "STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG": pl.Float64, + "MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN": pl.Int64, + "MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN": pl.Int64, + "MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE": pl.Int64, +} + extern_results_suppliers_t: Table = Table( "KPI_KONFEKTIONAERE", MD_EXTERNAL, @@ -160,3 +173,22 @@ extern_results_suppliers_t: Table = Table( Column("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE", sql.Integer, nullable=True), Column("MITTLERER_QUALITAETSSCORE_PSM", sql.Numeric(5, 4), nullable=True), ) + + +extern_results_suppliers_t_schema: dict[str, type[pl.DataType]] = { + "ID": pl.UInt32, + "AKTUALISIERT_AM": pl.Datetime, + "KONFEKTIONAER": pl.String, + "KONFEKTIONAER_ID": pl.UInt64, + "QUOTE_ERSTBESTAETIGUNG": pl.Float64, + "PROZENT_LIEFERTREUE": pl.Float64, + "ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG": pl.Float64, + "ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG": pl.Float64, + "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG": pl.Int64, + "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG": pl.Int64, + "STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG": pl.Float64, + "MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN": pl.Int64, + "MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN": pl.Int64, + "MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE": pl.Int64, + "MITTLERER_QUALITAETSSCORE_PSM": pl.Float64, +} diff --git a/src/wattanalyse/logging.py b/src/wattanalyse/logging.py index befd4d5..65936cf 100644 --- a/src/wattanalyse/logging.py +++ b/src/wattanalyse/logging.py @@ -28,3 +28,6 @@ logger_base = BASE_LOGGER.getChild("wattana") logger_database = logger_base.getChild("database") logger_database.setLevel(logging.DEBUG) + +logger_pipeline = logger_base.getChild("pipeline") +logger_pipeline.setLevel(logging.DEBUG) diff --git a/src/wattanalyse/pipeline.py b/src/wattanalyse/pipeline.py new file mode 100644 index 0000000..8cdfa0c --- /dev/null +++ b/src/wattanalyse/pipeline.py @@ -0,0 +1,723 @@ +from __future__ import annotations + +import dataclasses as dc +import datetime +import enum +import json +import warnings +from typing import TYPE_CHECKING, Any, Final, cast + +import polars as pl +import sqlalchemy as sql +from dopt_basics.datastructures import flatten +from dopt_basics.result_pattern import wrap_result + +from wattanalyse import db +from wattanalyse.logging import logger_pipeline as logger +from wattanalyse.types import SqlStatement + +if TYPE_CHECKING: + from oracledb import Connection as OracleConnection + from polars._typing import SchemaDict + + +@dc.dataclass(slots=True, eq=False) +class PreProcessResult: + data: pl.LazyFrame + filtered: pl.LazyFrame + + +DROP_COLUMNS: Final[list[str]] = cast( + list[str], + list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))), # type: ignore +) + + +@dc.dataclass(slots=True, kw_only=True) +class SqlInsertStmts: + delete: str + insert: str + + +class QualityPsm(enum.StrEnum): + FEHLEND = enum.auto() + UNPLAUSIBEL = enum.auto() + PLAUSIBEL = enum.auto() + + +PSM_SCORES: dict[QualityPsm, int] = { + QualityPsm.FEHLEND: 1, + QualityPsm.UNPLAUSIBEL: 0, + QualityPsm.PLAUSIBEL: 2, +} + +RENAMING_SCHEME_PSM: dict[str, str] = { + "PA Pos": "PA_Pos", + "PSM gemeldet am": "Meldezeitpunkt_Historie", + "Import Ist": "Import-Ist_Historie", + "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie", + "Zuschnitt am": "Prod-Start_Historie", + "Teile in Zuschnitt": "Prod-EP10_Historie", + "Teile im Nähband": "Prod-EP20_Historie", + "Fertigware aus Nähband": "Prod-EP30_Historie", + "Teile kontrolliert": "Prod-EP40_Historie", + "Teile verpackt in Karton": "Prod-EP50_Historie", + "Konfektionär": "Konfektionaer", + "Lieferantnr.": "Konfektionaer_ID", +} + +PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"] + +LOWER_BOUND_DATE_DEVIATION: Final[int] = 0 +UPPER_BOUND_DATE_DEVIATION: Final[int] = 0 +NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4 +TAB_NAME_PSM: Final[str] = "EXTERN_PSM" +TAB_NAME_MIS: Final[str] = "EXTERN_MIS" + + +# // (0) load data +def load_PSM_data( + conn: OracleConnection, +) -> pl.LazyFrame: + stmt = f""" + SELECT t1.* FROM "{TAB_NAME_PSM}" t1 + WHERE EXISTS( + SELECT 1 FROM "{TAB_NAME_MIS}" t2 + WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" + ) + """ + return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt) + + +# // (1) preprocess +def preprocess_psm( + data: pl.LazyFrame, +) -> PreProcessResult: + data = data.rename(RENAMING_SCHEME_PSM) + data = data.drop(DROP_COLUMNS, strict=False) + REGEX_PATTERN = r"^[\s\-#+/$]+$" + data = data.with_columns( + pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN)) + .then(None) + .otherwise(pl.col(pl.String)) + .name.keep() + ) + data = data.with_columns(pl.col("Konfektionaer").str.strip_chars(" \n\t")) + filtered_data = pl.LazyFrame(schema=data.collect_schema()) + + # drop duplicates + # use null count as information measure, least amount of nulls should be contained + data = data.with_columns(pl.sum_horizontal(pl.all().is_null()).alias("null_count")) + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie", "null_count"], descending=False) + filtered_data = pl.concat( + [ + filtered_data, + data.filter( + ~pl.struct(PRIM_KEYS + ["Meldezeitpunkt_Historie"]).is_first_distinct() + ).drop("null_count"), + ] + ) + data = data.filter(pl.struct(PRIM_KEYS + ["Meldezeitpunkt_Historie"]).is_first_distinct()) + data = data.drop("null_count") + + # any NULL values in critical columns + NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie") + conds = [pl.col(col).is_null() for col in NOT_NULL_COLS] + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) + data = data.filter(~pl.any_horizontal(*conds)) + + # implausible dates + # dates not allowed to be in the future + current_datetime = datetime.datetime.now() + current_date = current_datetime.date() + NOT_IN_FUTURE_COLS_DATETIME = ("Meldezeitpunkt_Historie",) + NOT_IN_FUTURE_COLS_DATE = ("Wareneingang am", "Prod-Start_Historie") + conds = [ + (pl.col(col) > current_datetime).fill_null(False) + for col in NOT_IN_FUTURE_COLS_DATETIME + ] + conds.extend( + [(pl.col(col) > current_date).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATE] + ) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) + data = data.filter(~pl.any_horizontal(*conds)) + + # too much in the future or the past + # dates + future_limit = current_date + datetime.timedelta( + days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) + ) + past_limit = datetime.date(1990, 1, 1) + cond = (pl.col(pl.Date) > future_limit).fill_null(False) | ( + pl.col(pl.Date) < past_limit + ).fill_null(False) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) + data = data.filter(~pl.any_horizontal(cond)) + # datetimes + future_limit = current_datetime + datetime.timedelta( + days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) + ) + past_limit = datetime.datetime(1990, 1, 1) + cond = (pl.col(pl.Datetime) > future_limit).fill_null(False) | ( + pl.col(pl.Datetime) < past_limit + ).fill_null(False) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) + data = data.filter(~pl.any_horizontal(cond)) + + return PreProcessResult(data=data, filtered=filtered_data) + + +# // (2) process on order level +def process_order_level( + data: pl.LazyFrame, +) -> pl.LazyFrame: + # ** renaming + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) + + # ** plausibility check of order quantities + PLAUSI_FEATURES: list[str] = [ + "Prod-EP10_Historie", + "Prod-EP20_Historie", + "Prod-EP30_Historie", + "Prod-EP40_Historie", + "Prod-EP50_Historie", + ] + data = data.with_columns( + pl.all_horizontal( + pl.col(PLAUSI_FEATURES).is_null() | (pl.col(PLAUSI_FEATURES) == 0) + ).alias("is_empty") + ) + conditions = [ + pl.col(PLAUSI_FEATURES[i]) >= pl.col(PLAUSI_FEATURES[i + 1]) + for i in range(len(PLAUSI_FEATURES) - 1) + ] + data = data.with_columns( + pl.when(pl.all_horizontal(conditions) | pl.col("is_empty")) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Prod-Qty_is_valid") + ).with_columns( + pl.when(pl.col("is_empty")) + .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND])) + .when(pl.col("Prod-Qty_is_valid")) + .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL])) + .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL])) + .alias("Prod-Qualitaet_Historie") + ) + # aggregate hint for "Prod-Qualitaet_Durchschnitt": use "drop_nulls" "last" + # aggregate "Prod-Qualitaet_Historie" and use "mean" + # need additional "alias" on "Prod-Qualitaet_Historie" + + # ** planned or target delivery date + current_date = datetime.datetime.now().date() + print(f"{current_date=}") + data = data.with_columns( + pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias( + "Liefertermin_Soll" + ) + ) + # aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first" + # first filled field for "Liefertermin Soll" is the relevant target date + # should be first confirmed date, but if this field is not filled we use the first + # filled import by the supplier + + # ** actual delivery date + # logic of Wattana: set date is before current date --> becomes actual value + data = data.with_columns( + pl.when(pl.col("Import-Ist_Historie") < current_date) + .then(pl.col("Import-Ist_Historie")) + .otherwise(None) + .alias("Liefertermin_Ist") + ) + # aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last" + # keep last because that is the latest value set by the supplier + # if all values are NULL then NULL is returned (no actual date available) + + # ** duration since last report in days + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + ( + pl.col("Meldezeitpunkt_Historie") + - pl.col("Meldezeitpunkt_Historie").shift(1).over(PRIM_KEYS) + ) + .dt.total_days() + .alias("Tage_zu_letzter_PSM_Historie") + ) + # aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt" + # aggregate "Tage_zu_letzter_PSM_Historie" and use "mean" (NULL is ignored automatically) + # need additional "alias" on "Tage_zu_letzter_PSM_Historie" + + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + # Prüfen: Ist das aktuelle Datum ungleich dem vorherigen Datum derselben Position? + ( + pl.col("Import-Ist_Historie") + != pl.col("Import-Ist_Historie").shift(1).over(PRIM_KEYS) + ) + .fill_null(False) # Der allererste Eintrag hat keinen Vorgänger -> Ist keine Änderung + .alias("Import-Ist_geaendert") + ) + # aggregate hint for "Import-Ist_geaendert" + # aggregate "Import-Ist_geaendert" and use "last" + + # aggregate hint for "Import-Ist_letzter_Wert" + # aggregate "Import-Ist_Historie" and use "drop_nulls" "last" + # need additional "alias" on "Import-Ist_Historie" + + # aggregate hint for "Import-Ist_Anzahl_Aenderungen" + # aggregate "Import-Ist_geaendert" and use "sum" + # need additional "alias" on "Import-Ist_geaendert" + + # aggregate hint for "Prod-Start" + # aggregate "Prod-Start_Historie" and use "drop_nulls" "first" + # first entry should be treated as the truth value, changing later does not make sense + # need additional "alias" on "Prod-Start_Historie" + + # whole aggregates see DB schema + data = ( + data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) + .group_by(PRIM_KEYS + ["Konfektionaer", "Konfektionaer_ID"]) + .agg( + pl.col("Meldezeitpunkt_Historie"), + pl.col("Liefertermin_Soll").drop_nulls().first(), + pl.col("Bestaetigter-Import_Historie"), + pl.col("Liefertermin_Ist").drop_nulls().last(), + pl.col("Import-Ist_Historie"), + pl.col("Import-Ist_Historie") + .drop_nulls() + .last() + .alias("Import-Ist_letzter_Wert"), + pl.col("Import-Ist_geaendert").last(), + pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"), + pl.col("Tage_zu_letzter_PSM_Historie"), + pl.col("Tage_zu_letzter_PSM_Historie") + .mean() + .alias("Tage_zu_letzter_PSM_Durchschnitt"), + pl.col("Prod-EP10_Historie"), + pl.col("Prod-EP20_Historie"), + pl.col("Prod-EP30_Historie"), + pl.col("Prod-EP40_Historie"), + pl.col("Prod-EP50_Historie"), + pl.col("Prod-Qualitaet_Historie"), + pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"), + pl.col("Prod-Start_Historie"), + pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"), + ) + ) + # ** order specific aggregates + data = ( + data.with_columns( + (pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")) + .dt.total_days() + .alias("Terminabweichung_Anzahl_Tage") + ) + .with_columns( + (pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION).alias( + "Terminunterschreitung" + ), + (pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION).alias( + "Terminüberschreitung" + ), + (pl.col("Liefertermin_Ist") - pl.col("Prod-Start")) + .dt.total_days() + .alias("Durchlaufzeit_Anzahl_Tage"), + ) + .with_columns( + pl.when(pl.col("Durchlaufzeit_Anzahl_Tage") < 0) + .then(None) + .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage")) + .alias("Durchlaufzeit_Anzahl_Tage") + ) + ) + + return data + + +# // (3) dump order level to internal database +def _json_default( + value: Any, +) -> str: + if isinstance(value, (datetime.date, datetime.datetime)): + return value.isoformat() + raise TypeError + + +def _parse_to_json( + x: pl.Series | None, +) -> str | None: + if x is None: + return None + + return json.dumps(x.to_list(), default=_json_default) + + +def dump_order_level_to_internal_database_staging( + data: pl.LazyFrame, +) -> None: + + staging_data = data.with_columns( + pl.col(pl.List) + .map_elements( + _parse_to_json, + return_dtype=pl.String, + ) + .name.keep() + ) + staging_data = staging_data.collect() + rows_inserted = staging_data.write_database( + "Produktionsauftrag-Einzelsicht_Staging", + connection=db.DB_URI, + engine="adbc", + if_table_exists="replace", + ) + if rows_inserted != staging_data.height: + raise RuntimeError("Number of inserted rows and length of staging data do not match.") + + all_columns = staging_data.columns + update_columns = [col for col in all_columns if col not in PRIM_KEYS] + + sql_column_list_str = ", ".join([f'"{c}"' for c in all_columns]) + sql_pk_list_str = ", ".join([f'"{c}"' for c in PRIM_KEYS]) + sql_update_rules_str = ", ".join([f'"{c}" = EXCLUDED."{c}"' for c in update_columns]) + + upsert_sql = f""" + INSERT INTO "Produktionsauftrag-Einzelsicht" ({sql_column_list_str}) + SELECT {sql_column_list_str} FROM "Produktionsauftrag-Einzelsicht_Staging" WHERE 1=1 + ON CONFLICT({sql_pk_list_str}) DO UPDATE SET + {sql_update_rules_str}; + """ + + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(sql.text(upsert_sql)) + conn.execute( + sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";') + ) + + +def dump_order_level_to_internal_database_wipe( + data: pl.LazyFrame, +) -> None: + + staging_data = data.with_columns( + pl.col(pl.List) + .map_elements( + _parse_to_json, + return_dtype=pl.String, + ) + .name.keep() + ) + # empty table + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";')) + + staging_data = staging_data.collect() + rows_inserted = staging_data.write_database( + "Produktionsauftrag-Einzelsicht", + connection=db.DB_URI, + engine="adbc", + if_table_exists="append", + ) + if rows_inserted != staging_data.height: + raise RuntimeError("Number of inserted rows and length of staging data do not match.") + + +# ** load order level data from internal database +def load_order_level_from_internal_database() -> pl.DataFrame: + data = pl.read_database_uri( + 'SELECT * FROM "Produktionsauftrag-Einzelsicht"', + uri=db.DB_URI, + engine="adbc", + schema_overrides=db.intern_prod_order_t_schema, + ) + + list_cols_to_type: dict[str, type[pl.DataType]] = { + "Meldezeitpunkt_Historie": pl.Datetime, + "Bestaetigter-Import_Historie": pl.Date, + "Import-Ist_Historie": pl.Date, + "Tage_zu_letzter_PSM_Historie": pl.Int64, + "Prod-EP10_Historie": pl.UInt64, + "Prod-EP20_Historie": pl.UInt64, + "Prod-EP30_Historie": pl.UInt64, + "Prod-EP40_Historie": pl.UInt64, + "Prod-EP50_Historie": pl.UInt64, + "Prod-Qualitaet_Historie": pl.Int32, + "Prod-Start_Historie": pl.Date, + } + + list_col_parse_conds = { + col: pl.col(col).str.json_decode(pl.List(list_type)) + for col, list_type in list_cols_to_type.items() + } + + return data.with_columns(**list_col_parse_conds) + + +# // (4) post-process results + +USE_BOUNDARIES: Final[bool] = False +filter_date_deviation_early: pl.Expr +filter_date_deviation_late: pl.Expr +if USE_BOUNDARIES: + filter_date_deviation_early = pl.col("Terminunterschreitung") + filter_date_deviation_late = pl.col("Terminüberschreitung") +else: + filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0 + filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 + + +def aggregate_production_orders( + data: pl.LazyFrame, +) -> pl.LazyFrame: + data = data.select( + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_early) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_late) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .std(ddof=1) + .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"), + pl.col("Import-Ist_Anzahl_Aenderungen") + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"), + pl.col("Tage_zu_letzter_PSM_Historie") + .list.explode() + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"), + pl.col("Durchlaufzeit_Anzahl_Tage") + .mean() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"), + ) + + return data + + +def aggregate_suppliers( + data: pl.LazyFrame, +) -> pl.LazyFrame: + data = data.group_by(["Konfektionaer", "Konfektionaer_ID"]).agg( + ( + ( + ~(filter_date_deviation_early | filter_date_deviation_late) + & (pl.col("Import-Ist_Anzahl_Aenderungen") == 0) + ).mean() + * 100 + ) + .round(4, mode="half_away_from_zero") + .alias("QUOTE_ERSTBESTAETIGUNG"), + ((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("PROZENT_LIEFERTREUE"), + (filter_date_deviation_early.mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"), + (filter_date_deviation_late.mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_early) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_late) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .std(ddof=1) + .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"), + pl.col("Import-Ist_Anzahl_Aenderungen") + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"), + pl.col("Tage_zu_letzter_PSM_Historie") + .list.explode() + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"), + pl.col("Durchlaufzeit_Anzahl_Tage") + .mean() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"), + pl.col("Prod-Qualitaet_Historie") + .list.explode() + .mean() + .round(4, mode="half_away_from_zero") + .alias("MITTLERER_QUALITAETSSCORE_PSM"), + ) + + return data + + +# // (5) external database +def oracle_prepare_KPI_aggregate( + data: pl.LazyFrame, + rename_schema: dict[str, str] | None = None, + sort_by: str = "", + sort_descending: bool = False, +) -> pl.LazyFrame: + if rename_schema is not None: + data = data.rename(rename_schema) + + cols_sorted = ["ID", "AKTUALISIERT_AM"] + [c for c in data.collect_schema().names()] + + if sort_by: + data = data.sort(sort_by, descending=sort_descending) + + data = data.with_row_index("ID", 1) + data = ( + data.with_columns( + pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"), + ) + .select( + pl.col(pl.Boolean).cast(pl.Int8), + pl.all().exclude(pl.Boolean), + ) + .select(cols_sorted) + .select(pl.all().name.to_uppercase()) + ) + + return data + + +def oracle_generate_sql_insert( + table_name: str, + columns: list, +) -> SqlInsertStmts: + spalten_str = ", ".join([f'"{c}"' for c in columns]) + platzhalter_str = ", ".join([f":{i}" for i in range(1, len(columns) + 1)]) + + sql_delete = f'DELETE FROM "{table_name}"' + sql_insert = f'INSERT INTO "{table_name}" ({spalten_str}) VALUES ({platzhalter_str})' + + return SqlInsertStmts(delete=sql_delete, insert=sql_insert) + + +def oracle_load_table_as_polars( + conn: OracleConnection, + schema: SchemaDict | None, + table_name: str | None = None, + stmt: SqlStatement | None = None, +) -> pl.LazyFrame: + if not any((table_name, stmt)): + raise ValueError("Table name or SQL statement must be provided") + if all((table_name, stmt)): + warnings.warn( + "Table name and SQL statement provided. In this case, the statement is used." + ) + if not stmt: + stmt = f"SELECT * FROM {table_name}" + + odf = conn.fetch_df_all(statement=stmt) + df = cast(pl.DataFrame, pl.from_arrow(odf, schema_overrides=schema)) + + return df.lazy() + + +def oracle_save_polars( + conn: OracleConnection, + stmts: SqlInsertStmts, + data: pl.DataFrame, +) -> None: + with conn.cursor() as cursor: + cursor.execute(stmts.delete) + cursor.executemany(stmts.insert, data) + conn.commit() + + +# TODO wrap this in a metadata tracking call +@wrap_result(code_on_error=1, logger=logger) +def run( + conn: OracleConnection, +) -> None: + # // (0) Load from external database + logger.info("Load data from database >load_PSM_data<...") + data = load_PSM_data(conn) + logger.info("Successfully loaded data from database") + + # // (1) preprocess data + logger.info("Preprocess data (cleansing) >preprocess_psm<...") + res = preprocess_psm(data) + data = res.data + logger.info("Successfully preprocessed data") + + # // (2) process on order level + logger.info("Process data on order level >process_order_level<...") + data = process_order_level(data) + logger.info("Successfully processed data on order level") + + # // (3) dump to database (intermediate result) + logger.info("Save order level data in internal database...") + dump_order_level_to_internal_database_wipe(data) + logger.info("Successfully saved order level data in internal DB") + + # // (4) post-process + # ** aggregation for orders + logger.info("Aggregate data with KPI calculation...") + logger.info("...production orders...") + orders_aggregated = aggregate_production_orders(data) + # ** aggregation for suppliers + logger.info("...suppliers...") + suppliers_aggregated = aggregate_suppliers(data) + logger.info("Successfully aggregated and calculated KPIs") + + # // (5) save to external database + logger.info("Prepare saving data to external database...") + logger.info("Prepare production order KPI table for Oracle export...") + orders_aggregated = oracle_prepare_KPI_aggregate(orders_aggregated) + stmts_orders = oracle_generate_sql_insert( + table_name="KPI_PRODUKTIONSAUFTRAEGE", + columns=orders_aggregated.collect_schema().names(), + ) + logger.info( + "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", + stmts_orders.delete, + stmts_orders.insert, + ) + + # ** suppliers + logger.info("Prepare supplier KPI table for Oracle export...") + suppliers_aggregated = oracle_prepare_KPI_aggregate( + suppliers_aggregated, + sort_by="Konfektionaer", + sort_descending=False, + ) + stmts_suppliers = oracle_generate_sql_insert( + table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names() + ) + logger.info( + "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", + stmts_suppliers.delete, + stmts_suppliers.insert, + ) + + # ** actual saving procedure + logger.info("Saving data to external database...") + oracle_save_polars(conn, stmts_orders, orders_aggregated.collect()) + oracle_save_polars(conn, stmts_suppliers, suppliers_aggregated.collect()) + logger.info("Successfully saved KPI tables to external database") diff --git a/src/wattanalyse/types.py b/src/wattanalyse/types.py index c518040..e61d0c0 100644 --- a/src/wattanalyse/types.py +++ b/src/wattanalyse/types.py @@ -1,6 +1,9 @@ from __future__ import annotations import dataclasses as dc +from typing import TypeAlias + +SqlStatement: TypeAlias = str @dc.dataclass(kw_only=True, slots=True)