from __future__ import annotations

import dataclasses as dc
import datetime
import json
import warnings
from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast

import polars as pl
import sqlalchemy as sql
from dopt_basics.datastructures import flatten

from wattanalyse import db
from wattanalyse.constants import QualityPsm
from wattanalyse.types import SqlInsertStmts, SqlStatement

if TYPE_CHECKING:
    from oracledb import Connection as OracleConnection
    from polars._typing import SchemaDict


@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
    """Result of :func:`preprocess_psm`: the cleaned rows and the rejected rows."""

    # rows that passed every preprocessing check
    data: pl.LazyFrame
    # rows that were removed by at least one preprocessing check
    filtered: pl.LazyFrame


# technical index columns in lower-, upper- and capitalised spelling; they are
# dropped (if present) at the start of preprocessing
DROP_COLUMNS: Final[list[str]] = cast(
    list[str],
    list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))),  # type: ignore
)
# numeric score assigned to each quality class of a single report
PSM_SCORES: dict[QualityPsm, int] = {
    QualityPsm.FEHLEND: 1,
    QualityPsm.UNPLAUSIBEL: 0,
    QualityPsm.PLAUSIBEL: 2,
}
# source column name -> internal column name
RENAMING_SCHEME_PSM: dict[str, str] = {
    "PA Pos": "PA_Pos",
    "PSM gemeldet am": "Meldezeitpunkt_Historie",
    "Import Ist": "Import-Ist_Historie",
    "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie",
    "Zuschnitt am": "Prod-Start_Historie",
    "Teile in Zuschnitt": "Prod-EP10_Historie",
    "Teile im Nähband": "Prod-EP20_Historie",
    "Fertigware aus Nähband": "Prod-EP30_Historie",
    "Teile kontrolliert": "Prod-EP40_Historie",
    "Teile verpackt in Karton": "Prod-EP50_Historie",
    "Konfektionär": "Konfektionaer",
    "Lieferantnr.": "Konfektionaer_ID",
}
PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]
LOWER_BOUND_DATE_DEVIATION: Final[int] = 0
UPPER_BOUND_DATE_DEVIATION: Final[int] = 0
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4
TAB_NAME_PSM: Final[str] = "EXTERN_PSM"
TAB_NAME_MIS: Final[str] = "EXTERN_MIS"

# When True, "early"/"late" are taken from the pre-computed boolean columns
# (which honour LOWER_/UPPER_BOUND_DATE_DEVIATION); otherwise they are derived
# directly from the sign of the deviation in days.
USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
    filter_date_deviation_early = pl.col("Terminunterschreitung")
    filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
    filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
    filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0


# // (0) load data
def load_PSM_data(
    conn: OracleConnection,
) -> pl.LazyFrame:
    """Load all PSM rows whose (PA, PA Pos) key also exists in the MIS table.

    Args:
        conn: open Oracle connection used to run the query.

    Returns:
        The query result as a lazy frame, typed via ``db.extern_prod_order_t_schema``.
    """
    stmt = f"""
    SELECT t1.*
    FROM "{TAB_NAME_PSM}" t1
    WHERE EXISTS(
        SELECT 1
        FROM "{TAB_NAME_MIS}" t2
        WHERE t1."PA" = t2."PA"
          AND t1."PA Pos" = t2."PA Pos"
    )
    """
    return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt)


# // (1) preprocess
def _split_rejected(
    kept: pl.LazyFrame,
    rejected: pl.LazyFrame,
    condition: pl.Expr,
) -> tuple[pl.LazyFrame, pl.LazyFrame]:
    """Move every row of ``kept`` that matches ``condition`` over to ``rejected``."""
    rejected = pl.concat([rejected, kept.filter(condition)])
    kept = kept.filter(~condition)
    return kept, rejected


def preprocess_psm(
    data: pl.LazyFrame,
) -> PreProcessResult:
    """Clean raw PSM data and separate out rows that fail plausibility checks.

    Steps, in order: rename columns, drop technical index columns, blank out
    placeholder-only strings, trim the supplier name, remove duplicates per
    (primary key, report timestamp), remove rows with NULL in key columns,
    remove rows with dates in the future, and remove rows with any date or
    datetime outside the accepted range.

    Args:
        data: raw PSM rows using the source column names.

    Returns:
        A :class:`PreProcessResult` with the surviving rows in ``data`` and all
        removed rows in ``filtered``.
    """
    data = data.rename(RENAMING_SCHEME_PSM)
    data = data.drop(DROP_COLUMNS, strict=False)

    # strings consisting only of whitespace / placeholder symbols carry no
    # information and are treated as NULL
    REGEX_PATTERN = r"^[\s\-#+/$]+$"
    data = data.with_columns(
        pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN))
        .then(None)
        .otherwise(pl.col(pl.String))
        .name.keep()
    )
    data = data.with_columns(pl.col("Konfektionaer").str.strip_chars(" \n\t"))

    filtered_data = pl.LazyFrame(schema=data.collect_schema())

    # drop duplicates
    # use null count as information measure, least amount of nulls should be contained
    duplicate_keys = PRIM_KEYS + ["Meldezeitpunkt_Historie"]
    data = data.with_columns(pl.sum_horizontal(pl.all().is_null()).alias("null_count"))
    data = data.sort(duplicate_keys + ["null_count"], descending=False)
    filtered_data = pl.concat(
        [
            filtered_data,
            data.filter(~pl.struct(duplicate_keys).is_first_distinct()).drop("null_count"),
        ]
    )
    data = data.filter(pl.struct(duplicate_keys).is_first_distinct())
    data = data.drop("null_count")

    # any NULL values in critical columns
    NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie")
    conds = [pl.col(col).is_null() for col in NOT_NULL_COLS]
    data, filtered_data = _split_rejected(data, filtered_data, pl.any_horizontal(*conds))

    # implausible dates
    # dates not allowed to be in the future
    current_datetime = datetime.datetime.now()
    current_date = current_datetime.date()
    NOT_IN_FUTURE_COLS_DATETIME = ("Meldezeitpunkt_Historie",)
    NOT_IN_FUTURE_COLS_DATE = ("Wareneingang am", "Prod-Start_Historie")
    # NULL comparisons are treated as "not in the future" so that missing
    # values alone never reject a row here
    conds = [
        (pl.col(col) > current_datetime).fill_null(False)
        for col in NOT_IN_FUTURE_COLS_DATETIME
    ]
    conds.extend(
        [(pl.col(col) > current_date).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATE]
    )
    data, filtered_data = _split_rejected(data, filtered_data, pl.any_horizontal(*conds))

    # too much in the future or the past
    # dates
    future_limit = current_date + datetime.timedelta(
        days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES)
    )
    past_limit = datetime.date(1990, 1, 1)
    cond = (pl.col(pl.Date) > future_limit).fill_null(False) | (
        pl.col(pl.Date) < past_limit
    ).fill_null(False)
    data, filtered_data = _split_rejected(data, filtered_data, pl.any_horizontal(cond))

    # datetimes
    future_limit = current_datetime + datetime.timedelta(
        days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES)
    )
    past_limit = datetime.datetime(1990, 1, 1)
    cond = (pl.col(pl.Datetime) > future_limit).fill_null(False) | (
        pl.col(pl.Datetime) < past_limit
    ).fill_null(False)
    data, filtered_data = _split_rejected(data, filtered_data, pl.any_horizontal(cond))

    return PreProcessResult(data=data, filtered=filtered_data)


# // (2) process on order level
def process_order_level(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Condense the report history into one row per production order position.

    Per-report features (quantity plausibility score, target/actual delivery
    date, days since the previous report, change flag of the import date) are
    computed first. The rows are then grouped by primary key and supplier,
    keeping the full history as list columns next to the derived scalar values.
    Finally the delivery date deviation and the lead time are derived per order.

    Args:
        data: preprocessed PSM rows as returned in ``PreProcessResult.data``.

    Returns:
        One row per (PA, PA_Pos, Konfektionaer, Konfektionaer_ID).
    """
    sort_keys = PRIM_KEYS + ["Meldezeitpunkt_Historie"]

    # ** chronological order per order position
    data = data.sort(sort_keys, descending=False)

    # ** plausibility check of order quantities
    PLAUSI_FEATURES: list[str] = [
        "Prod-EP10_Historie",
        "Prod-EP20_Historie",
        "Prod-EP30_Historie",
        "Prod-EP40_Historie",
        "Prod-EP50_Historie",
    ]
    data = data.with_columns(
        pl.all_horizontal(
            pl.col(PLAUSI_FEATURES).is_null() | (pl.col(PLAUSI_FEATURES) == 0)
        ).alias("is_empty")
    )
    # quantities must not increase from one production stage to the next
    conditions = [
        pl.col(earlier) >= pl.col(later)
        for earlier, later in zip(PLAUSI_FEATURES, PLAUSI_FEATURES[1:])
    ]
    data = data.with_columns(
        pl.when(pl.all_horizontal(conditions) | pl.col("is_empty"))
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias("Prod-Qty_is_valid")
    ).with_columns(
        pl.when(pl.col("is_empty"))
        .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND]))
        .when(pl.col("Prod-Qty_is_valid"))
        .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL]))
        .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL]))
        .alias("Prod-Qualitaet_Historie")
    )
    # aggregate hint for "Prod-Qualitaet_Durchschnitt": use "drop_nulls" "last"
    # aggregate "Prod-Qualitaet_Historie" and use "mean"
    # need additional "alias" on "Prod-Qualitaet_Historie"

    # ** planned or target delivery date
    current_date = datetime.datetime.now().date()
    # NOTE(review): debug output on stdout; consider replacing with logging
    print(f"{current_date=}")
    data = data.with_columns(
        pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias(
            "Liefertermin_Soll"
        )
    )
    # aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first"
    # first filled field for "Liefertermin Soll" is the relevant target date
    # should be first confirmed date, but if this field is not filled we use the first
    # filled import by the supplier

    # ** actual delivery date
    # logic of Wattana: set date is before current date --> becomes actual value
    data = data.with_columns(
        pl.when(pl.col("Import-Ist_Historie") < current_date)
        .then(pl.col("Import-Ist_Historie"))
        .otherwise(None)
        .alias("Liefertermin_Ist")
    )
    # aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last"
    # keep last because that is the latest value set by the supplier
    # if all values are NULL then NULL is returned (no actual date available)

    # ** duration since last report in days
    data = data.sort(sort_keys, descending=False).with_columns(
        (
            pl.col("Meldezeitpunkt_Historie")
            - pl.col("Meldezeitpunkt_Historie").shift(1).over(PRIM_KEYS)
        )
        .dt.total_days()
        .alias("Tage_zu_letzter_PSM_Historie")
    )
    # aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt"
    # aggregate "Tage_zu_letzter_PSM_Historie" and use "mean" (NULL is ignored automatically)
    # need additional "alias" on "Tage_zu_letzter_PSM_Historie"

    data = data.sort(sort_keys, descending=False).with_columns(
        # check: does the current date differ from the previous date of the same position?
        (
            pl.col("Import-Ist_Historie")
            != pl.col("Import-Ist_Historie").shift(1).over(PRIM_KEYS)
        )
        .fill_null(False)  # the very first entry has no predecessor -> not a change
        .alias("Import-Ist_geaendert")
    )
    # aggregate hint for "Import-Ist_geaendert"
    # aggregate "Import-Ist_geaendert" and use "last"

    # aggregate hint for "Import-Ist_letzter_Wert"
    # aggregate "Import-Ist_Historie" and use "drop_nulls" "last"
    # need additional "alias" on "Import-Ist_Historie"

    # aggregate hint for "Import-Ist_Anzahl_Aenderungen"
    # aggregate "Import-Ist_geaendert" and use "sum"
    # need additional "alias" on "Import-Ist_geaendert"

    # aggregate hint for "Prod-Start"
    # aggregate "Prod-Start_Historie" and use "drop_nulls" "first"
    # first entry should be treated as the truth value, changing later does not make sense
    # need additional "alias" on "Prod-Start_Historie"

    # whole aggregates see DB schema
    data = (
        data.sort(sort_keys, descending=False)
        .group_by(PRIM_KEYS + ["Konfektionaer", "Konfektionaer_ID"])
        .agg(
            pl.col("Meldezeitpunkt_Historie"),
            pl.col("Liefertermin_Soll").drop_nulls().first(),
            pl.col("Bestaetigter-Import_Historie"),
            pl.col("Liefertermin_Ist").drop_nulls().last(),
            pl.col("Import-Ist_Historie"),
            pl.col("Import-Ist_Historie")
            .drop_nulls()
            .last()
            .alias("Import-Ist_letzter_Wert"),
            pl.col("Import-Ist_geaendert").last(),
            pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"),
            pl.col("Tage_zu_letzter_PSM_Historie"),
            pl.col("Tage_zu_letzter_PSM_Historie")
            .mean()
            .alias("Tage_zu_letzter_PSM_Durchschnitt"),
            pl.col("Prod-EP10_Historie"),
            pl.col("Prod-EP20_Historie"),
            pl.col("Prod-EP30_Historie"),
            pl.col("Prod-EP40_Historie"),
            pl.col("Prod-EP50_Historie"),
            pl.col("Prod-Qualitaet_Historie"),
            pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"),
            pl.col("Prod-Start_Historie"),
            pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"),
        )
    )

    # ** order specific aggregates
    data = (
        data.with_columns(
            (pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll"))
            .dt.total_days()
            .alias("Terminabweichung_Anzahl_Tage")
        )
        .with_columns(
            (pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION).alias(
                "Terminunterschreitung"
            ),
            (pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION).alias(
                "Terminüberschreitung"
            ),
            (pl.col("Liefertermin_Ist") - pl.col("Prod-Start"))
            .dt.total_days()
            .alias("Durchlaufzeit_Anzahl_Tage"),
        )
        .with_columns(
            # a negative lead time is treated as missing
            pl.when(pl.col("Durchlaufzeit_Anzahl_Tage") < 0)
            .then(None)
            .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage"))
            .alias("Durchlaufzeit_Anzahl_Tage")
        )
    )

    return data


# // (3) dump order level to internal database
def _json_default(
    value: Any,
) -> str:
    """``json.dumps`` fallback: serialise dates and datetimes as ISO-8601 strings.

    Raises:
        TypeError: for any other type, as required by the ``default`` protocol
            of :func:`json.dumps`.
    """
    # datetime.datetime is a subclass of datetime.date, so one check covers both
    if isinstance(value, datetime.date):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _parse_to_json(
    x: pl.Series | None,
) -> str | None:
    """Serialise the content of one list cell to a JSON string (``None`` stays ``None``)."""
    if x is None:
        return None
    return json.dumps(x.to_list(), default=_json_default)


def _list_columns_to_json(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Replace every list column by its JSON string representation (same column names)."""
    return data.with_columns(
        pl.col(pl.List)
        .map_elements(
            _parse_to_json,
            return_dtype=pl.String,
        )
        .name.keep()
    )


def dump_order_level_to_internal_database_staging(
    data: pl.LazyFrame,
) -> None:
    """Upsert order level data into the internal database via a staging table.

    The data is written to ``Produktionsauftrag-Einzelsicht_Staging`` (replacing
    any existing staging table) and then merged into
    ``Produktionsauftrag-Einzelsicht`` with an ``INSERT ... ON CONFLICT DO UPDATE``
    on the primary key columns. The staging table is dropped in the same
    transaction as the upsert.

    Args:
        data: order level data as produced by :func:`process_order_level`.

    Raises:
        RuntimeError: if the number of rows written to the staging table differs
            from the number of rows in ``data``.
    """
    staging_data = _list_columns_to_json(data).collect()
    rows_inserted = staging_data.write_database(
        "Produktionsauftrag-Einzelsicht_Staging",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="replace",
    )
    if rows_inserted != staging_data.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")

    all_columns = staging_data.columns
    update_columns = [col for col in all_columns if col not in PRIM_KEYS]
    sql_column_list_str = ", ".join([f'"{c}"' for c in all_columns])
    sql_pk_list_str = ", ".join([f'"{c}"' for c in PRIM_KEYS])
    sql_update_rules_str = ", ".join([f'"{c}" = EXCLUDED."{c}"' for c in update_columns])

    # NOTE(review): "WHERE 1=1" presumably disambiguates the ON CONFLICT clause
    # after a SELECT (SQLite requires a WHERE clause there) - keep it
    upsert_sql = f"""
    INSERT INTO "Produktionsauftrag-Einzelsicht" ({sql_column_list_str})
    SELECT {sql_column_list_str}
    FROM "Produktionsauftrag-Einzelsicht_Staging"
    WHERE 1=1
    ON CONFLICT({sql_pk_list_str}) DO UPDATE
    SET {sql_update_rules_str};
    """

    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(sql.text(upsert_sql))
        conn.execute(
            sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";')
        )


def dump_order_level_to_internal_database_wipe(
    data: pl.LazyFrame,
) -> None:
    """Replace the whole content of ``Produktionsauftrag-Einzelsicht`` with ``data``.

    Args:
        data: order level data as produced by :func:`process_order_level`.

    Raises:
        RuntimeError: if the number of inserted rows differs from the number of
            rows in ``data``.
    """
    # materialise first: if the computation fails, the existing table content
    # must not have been deleted yet
    staging_data = _list_columns_to_json(data).collect()

    # empty table
    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";'))

    rows_inserted = staging_data.write_database(
        "Produktionsauftrag-Einzelsicht",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="append",
    )
    if rows_inserted != staging_data.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")


# ** load order level data from internal database
def load_order_level_from_internal_database() -> pl.DataFrame:
    """Read the order level table and decode its JSON columns back into lists.

    Returns:
        The content of ``Produktionsauftrag-Einzelsicht`` with every history
        column parsed from its JSON string into a typed list column.
    """
    data = pl.read_database_uri(
        'SELECT * FROM "Produktionsauftrag-Einzelsicht"',
        uri=db.DB_URI,
        engine="adbc",
        schema_overrides=db.intern_prod_order_t_schema,
    )

    # JSON encoded column -> element type of the decoded list
    list_cols_to_type: dict[str, type[pl.DataType]] = {
        "Meldezeitpunkt_Historie": pl.Datetime,
        "Bestaetigter-Import_Historie": pl.Date,
        "Import-Ist_Historie": pl.Date,
        "Tage_zu_letzter_PSM_Historie": pl.Int64,
        "Prod-EP10_Historie": pl.UInt64,
        "Prod-EP20_Historie": pl.UInt64,
        "Prod-EP30_Historie": pl.UInt64,
        "Prod-EP40_Historie": pl.UInt64,
        "Prod-EP50_Historie": pl.UInt64,
        "Prod-Qualitaet_Historie": pl.Int32,
        "Prod-Start_Historie": pl.Date,
    }
    list_col_parse_conds = {
        col: pl.col(col).str.json_decode(pl.List(list_type))
        for col, list_type in list_cols_to_type.items()
    }

    return data.with_columns(**list_col_parse_conds)


# // (4) post-process results
def _common_kpi_expressions() -> list[pl.Expr]:
    """KPI expressions shared by the overall and the per-supplier aggregation."""
    return [
        pl.col("Terminabweichung_Anzahl_Tage")
        .filter(filter_date_deviation_early)
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
        pl.col("Terminabweichung_Anzahl_Tage")
        .filter(filter_date_deviation_late)
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
        pl.col("Terminabweichung_Anzahl_Tage")
        .std(ddof=1)
        .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
        pl.col("Import-Ist_Anzahl_Aenderungen")
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
        pl.col("Tage_zu_letzter_PSM_Historie")
        .list.explode()
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
        pl.col("Durchlaufzeit_Anzahl_Tage")
        .mean()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
    ]


def aggregate_production_orders(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Aggregate the order level data into a single row of overall KPIs.

    Args:
        data: order level data (one row per production order position).

    Returns:
        A lazy frame with one row containing the shared KPI columns.
    """
    return data.select(*_common_kpi_expressions())


def aggregate_suppliers(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Aggregate the order level data into KPIs per supplier.

    Besides the KPI columns shared with :func:`aggregate_production_orders`,
    percentage quotas for delivery reliability and the mean quality score are
    computed.

    Args:
        data: order level data (one row per production order position).

    Returns:
        A lazy frame with one row per (Konfektionaer, Konfektionaer_ID).
    """
    on_time = ~(filter_date_deviation_early | filter_date_deviation_late)

    data = data.group_by(["Konfektionaer", "Konfektionaer_ID"]).agg(
        # on time and delivery date never changed
        ((on_time & (pl.col("Import-Ist_Anzahl_Aenderungen") == 0)).mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("QUOTE_ERSTBESTAETIGUNG"),
        (on_time.mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("PROZENT_LIEFERTREUE"),
        (filter_date_deviation_early.mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"),
        (filter_date_deviation_late.mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"),
        *_common_kpi_expressions(),
        pl.col("Prod-Qualitaet_Historie")
        .list.explode()
        .mean()
        .round(4, mode="half_away_from_zero")
        .alias("MITTLERER_QUALITAETSSCORE_PSM"),
    )

    return data


# // (5) external database
def oracle_prepare_KPI_aggregate(
    data: pl.LazyFrame,
    rename_schema: dict[str, str] | None = None,
    sort_by: str = "",
    sort_descending: bool = False,
) -> pl.LazyFrame:
    """Bring a KPI aggregate into the layout written to the Oracle database.

    Adds a 1-based ``ID`` column and an ``AKTUALISIERT_AM`` timestamp as the
    first two columns, casts boolean columns to ``Int8`` and upper-cases all
    column names.

    Args:
        data: KPI aggregate.
        rename_schema: optional column renaming applied first.
        sort_by: column to sort by before the IDs are assigned; an empty string
            disables sorting.
        sort_descending: sort direction used together with ``sort_by``.

    Returns:
        The prepared lazy frame.
    """
    if rename_schema is not None:
        data = data.rename(rename_schema)

    # target column order, captured before the helper columns are added
    cols_sorted = ["ID", "AKTUALISIERT_AM"] + list(data.collect_schema().names())

    if sort_by:
        data = data.sort(sort_by, descending=sort_descending)

    data = data.with_row_index("ID", 1)
    data = (
        data.with_columns(
            pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
        )
        .select(
            pl.col(pl.Boolean).cast(pl.Int8),
            pl.all().exclude(pl.Boolean),
        )
        .select(cols_sorted)
        .select(pl.all().name.to_uppercase())
    )

    return data


def oracle_generate_sql_insert(
    table_name: str,
    columns: list[str],
) -> SqlInsertStmts:
    """Build the DELETE and the positional-bind INSERT statement for a table.

    Args:
        table_name: target table; it is double-quoted in the statements.
        columns: column names in insert order; each is double-quoted.

    Returns:
        The statement pair; the INSERT uses the bind placeholders ``:1 .. :n``.
    """
    column_list = ", ".join([f'"{c}"' for c in columns])
    placeholder_list = ", ".join([f":{i}" for i in range(1, len(columns) + 1)])

    sql_delete = f'DELETE FROM "{table_name}"'
    sql_insert = f'INSERT INTO "{table_name}" ({column_list}) VALUES ({placeholder_list})'

    return SqlInsertStmts(delete=sql_delete, insert=sql_insert)


def oracle_load_table_as_polars(
    conn: OracleConnection,
    schema: SchemaDict | None,
    table_name: str | None = None,
    stmt: SqlStatement | None = None,
) -> pl.LazyFrame:
    """Load a table or the result of a statement from Oracle as a lazy frame.

    Args:
        conn: open Oracle connection.
        schema: optional polars schema overrides applied to the result.
        table_name: table to load completely; ignored if ``stmt`` is given.
        stmt: SQL statement to execute; takes precedence over ``table_name``.

    Returns:
        The fetched data as a lazy frame.

    Raises:
        ValueError: if neither ``table_name`` nor ``stmt`` is provided.
    """
    if not any((table_name, stmt)):
        raise ValueError("Table name or SQL statement must be provided")
    if all((table_name, stmt)):
        warnings.warn(
            "Table name and SQL statement provided. In this case, the statement is used.",
            stacklevel=2,
        )

    if not stmt:
        # NOTE(review): the table name is inserted unquoted, unlike the other
        # statements built in this module - confirm this is intended
        stmt = f"SELECT * FROM {table_name}"

    odf = conn.fetch_df_all(statement=stmt)
    df = cast(pl.DataFrame, pl.from_arrow(odf, schema_overrides=schema))

    return df.lazy()


def oracle_save_polars(
    conn: OracleConnection,
    stmts: SqlInsertStmts,
    data: pl.DataFrame,
) -> None:
    """Replace the content of an Oracle table with ``data``.

    Executes ``stmts.delete`` followed by ``stmts.insert`` for all rows and
    commits afterwards, so both statements take effect together.

    Args:
        conn: open Oracle connection.
        stmts: statement pair, e.g. from :func:`oracle_generate_sql_insert`.
        data: rows to insert.
    """
    with conn.cursor() as cursor:
        cursor.execute(stmts.delete)
        cursor.executemany(stmts.insert, data)
    conn.commit()