import dataclasses as dc
import datetime
import enum
import json
from typing import Any, Final

import polars as pl
import sqlalchemy as sql

from wattanalyse import db

# Pipeline:
# 1. clean up newly obtained data
# ~~2. load data from internal database~~
# ~~3. integrate with new data (whole snapshot)~~
# 2. process on order level
# 3. save results to internal database
# 4. post-process results
# 5. write to external database


@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
    """Output of :func:`preprocess_psm`."""

    # rows that passed all pre-processing checks (still lazy, not yet collected)
    data: pl.LazyFrame
    # rows removed during pre-processing (renamed input columns only)
    filtered: pl.DataFrame


class QualityPsm(enum.StrEnum):
    """Quality class of the production quantities of a single PSM record."""

    FEHLEND = enum.auto()  # "missing": every production quantity is NULL or 0
    UNPLAUSIBEL = enum.auto()  # "implausible"
    PLAUSIBEL = enum.auto()  # "plausible"


# Numeric score per quality class; the per-order mean is stored as
# "Prod-Qualitaet_Durchschnitt" in process_order_level.
PSM_SCORES: dict[QualityPsm, int] = {
    QualityPsm.FEHLEND: 1,
    QualityPsm.UNPLAUSIBEL: 0,
    QualityPsm.PLAUSIBEL: 2,
}

# Maps the column names of the raw PSM data to the internal column names.
RENAMING_SCHEME: dict[str, str] = {
    "PA Pos": "PA_Pos",
    "PSM gemeldet am": "Meldezeitpunkt_Historie",
    "Import Ist": "Import-Ist_Historie",
    "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie",
    "Zuschnitt am": "Prod-Start_Historie",
    "Teile in Zuschnitt": "Prod-EP10_Historie",
    "Teile im Nähband": "Prod-EP20_Historie",
    "Fertigware aus Nähband": "Prod-EP30_Historie",
    "Teile kontrolliert": "Prod-EP40_Historie",
    "Teile verpackt in Karton": "Prod-EP50_Historie",
}

# Key columns of an order row; also the conflict target of the upsert below.
PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]
# Allowed deviation (in days) of the actual from the target delivery date before the
# order is flagged as early ("Terminunterschreitung") or late ("Terminüberschreitung").
LOWER_BOUND_DATE_DEVIATION: Final[int] = 0
UPPER_BOUND_DATE_DEVIATION: Final[int] = 0
# Dates more than this many years (365 days each) in the future are treated as
# implausible by preprocess_psm.
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4


def _split_rows(
    data: pl.LazyFrame,
    is_removed: pl.Expr,
) -> tuple[pl.LazyFrame, pl.LazyFrame]:
    """Split ``data`` into ``(kept, removed)`` rows according to ``is_removed``.

    ``is_removed`` must never evaluate to NULL, otherwise the row ends up in neither part.
    """
    return data.filter(~is_removed), data.filter(is_removed)


# // (1) preprocess
def preprocess_psm(
    data: pl.LazyFrame,
) -> PreProcessResult:
    """Clean raw PSM data and separate out the rows that fail basic checks.

    Steps:
    1. rename columns according to ``RENAMING_SCHEME``;
    2. set string values consisting only of placeholder characters to NULL and
       strip whitespace from "Konfektionär";
    3. de-duplicate on (PA, PA_Pos, Meldezeitpunkt_Historie), keeping the row with
       the fewest NULL values;
    4. drop rows with a NULL in PA, PA_Pos or Meldezeitpunkt_Historie;
    5. drop rows with dates in the future (selected columns) or outside
       [1990-01-01, now + NUMBER_YEARS_UPPER_BOUND_DATES years] (all Date /
       Datetime columns).

    Args:
        data: raw PSM data using the source column names of ``RENAMING_SCHEME``.

    Returns:
        ``PreProcessResult`` holding the remaining rows (lazy) and all removed
        rows (collected).
    """
    data = data.rename(RENAMING_SCHEME)

    # string values made up only of whitespace and "-#+/$" carry no information -> NULL
    placeholder_pattern = r"^[\s\-#+/$]+$"
    data = data.with_columns(
        pl.when(pl.col(pl.String).str.contains(placeholder_pattern))
        .then(None)
        .otherwise(pl.col(pl.String))
        .name.keep()
    )
    data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t"))

    # resolve the schema once explicitly; `LazyFrame.columns` would resolve it
    # implicitly and emit a PerformanceWarning
    schema = data.collect_schema()
    base_columns = schema.names()
    filtered_data = pl.LazyFrame(schema=schema)

    # ** drop duplicates
    # null count is used as information measure: per key keep the row with the fewest NULLs
    dedup_key = pl.struct(PRIM_KEYS + ["Meldezeitpunkt_Historie"])
    data = data.with_columns(
        pl.sum_horizontal(pl.all().is_null()).alias("null_count")
    ).sort(PRIM_KEYS + ["Meldezeitpunkt_Historie", "null_count"], descending=False)
    filtered_data = pl.concat(
        [filtered_data, data.filter(~dedup_key.is_first_distinct()).select(base_columns)]
    )
    data = data.filter(dedup_key.is_first_distinct()).drop("null_count")

    # ** NULL values in critical columns
    not_null_cols = ("PA", "PA_Pos", "Meldezeitpunkt_Historie")
    data, removed = _split_rows(
        data, pl.any_horizontal(*[pl.col(col).is_null() for col in not_null_cols])
    )
    filtered_data = pl.concat([filtered_data, removed])

    # ** implausible dates
    current_datetime = datetime.datetime.now()
    current_date = current_datetime.date()

    # these dates are not allowed to lie in the future
    not_in_future_cols_datetime = ("Meldezeitpunkt_Historie",)
    not_in_future_cols_date = ("Wareneingang am", "Prod-Start_Historie")
    in_future = [
        (pl.col(col) > current_datetime).fill_null(False)
        for col in not_in_future_cols_datetime
    ] + [(pl.col(col) > current_date).fill_null(False) for col in not_in_future_cols_date]
    data, removed = _split_rows(data, pl.any_horizontal(*in_future))
    filtered_data = pl.concat([filtered_data, removed])

    # every Date column, then every Datetime column, must lie within
    # [1990-01-01, now + NUMBER_YEARS_UPPER_BOUND_DATES years]
    horizon = datetime.timedelta(days=365 * NUMBER_YEARS_UPPER_BOUND_DATES)
    for dtype, now, past_limit in (
        (pl.Date, current_date, datetime.date(1990, 1, 1)),
        (pl.Datetime, current_datetime, datetime.datetime(1990, 1, 1)),
    ):
        out_of_range = (pl.col(dtype) > now + horizon).fill_null(False) | (
            pl.col(dtype) < past_limit
        ).fill_null(False)
        data, removed = _split_rows(data, pl.any_horizontal(out_of_range))
        filtered_data = pl.concat([filtered_data, removed])

    return PreProcessResult(data=data, filtered=filtered_data.collect())


# // (2) process on order level
def process_order_level(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Aggregate PSM records to one row per (PA, PA_Pos, Konfektionär).

    Per record this derives:
    - the quality score of the production quantities;
    - the target and actual delivery dates;
    - the days since the previous report;
    - whether "Import-Ist_Historie" changed.

    The records are then grouped. Un-reduced ``*_Historie`` columns become lists
    ordered by "Meldezeitpunkt_Historie"; the remaining columns are reduced as
    commented at the aggregation. Finally, the delivery deviation and the lead time
    are computed per order.

    Args:
        data: PSM records after renaming (presumably ``PreProcessResult.data``).

    Returns:
        Lazy frame with one row per (PA, PA_Pos, Konfektionär).
    """
    # all per-record steps and the list aggregation rely on this order;
    # `with_columns` preserves row order, so a single sort suffices
    data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False)

    # ** plausibility check of production quantities:
    # quantities must not increase from one production stage to the next
    plausi_features: list[str] = [
        "Prod-EP10_Historie",
        "Prod-EP20_Historie",
        "Prod-EP30_Historie",
        "Prod-EP40_Historie",
        "Prod-EP50_Historie",
    ]
    data = data.with_columns(
        # nothing reported: every quantity is NULL or 0
        pl.all_horizontal(
            pl.col(plausi_features).is_null() | (pl.col(plausi_features) == 0)
        ).alias("is_empty")
    )
    non_increasing = [
        pl.col(earlier) >= pl.col(later)
        for earlier, later in zip(plausi_features, plausi_features[1:])
    ]
    data = data.with_columns(
        # an undetermined (NULL) comparison counts as invalid
        (pl.all_horizontal(non_increasing) | pl.col("is_empty"))
        .fill_null(False)
        .alias("Prod-Qty_is_valid")
    ).with_columns(
        pl.when(pl.col("is_empty"))
        .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND]))
        .when(pl.col("Prod-Qty_is_valid"))
        .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL]))
        .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL]))
        .alias("Prod-Qualitaet_Historie")
    )

    # ** target delivery date: the confirmed import date, otherwise the import date
    # reported by the supplier
    data = data.with_columns(
        pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias(
            "Liefertermin_Soll"
        )
    )

    # ** actual delivery date
    # logic of Wattana: a set import date that lies before today becomes the actual value
    current_date = datetime.datetime.now().date()
    data = data.with_columns(
        pl.when(pl.col("Import-Ist_Historie") < current_date)
        .then(pl.col("Import-Ist_Historie"))
        .otherwise(None)
        .alias("Liefertermin_Ist")
    )

    # ** comparison with the previous report of the same position
    # - days since the previous report (NULL for the first report)
    # - whether "Import-Ist_Historie" differs from the previous value; `!=` yields
    #   NULL if either side is NULL, which is filled with False. The first report
    #   (no predecessor) therefore never counts as a change.
    #   NOTE(review): NULL <-> date transitions are not counted either - confirm.
    data = data.with_columns(
        (
            pl.col("Meldezeitpunkt_Historie")
            - pl.col("Meldezeitpunkt_Historie").shift(1).over(PRIM_KEYS)
        )
        .dt.total_days()
        .alias("Tage_zu_letzter_PSM_Historie"),
        (
            pl.col("Import-Ist_Historie")
            != pl.col("Import-Ist_Historie").shift(1).over(PRIM_KEYS)
        )
        .fill_null(False)
        .alias("Import-Ist_geaendert"),
    )

    # ** aggregate per order (for the resulting columns see the DB schema);
    # expressions without a reduction yield the ordered history list.
    # NOTE(review): grouping additionally by "Konfektionär" produces several rows per
    # (PA, PA_Pos) if it changes between reports; these would collide on the
    # PRIM_KEYS conflict target of the upsert - confirm this cannot happen.
    data = data.group_by(PRIM_KEYS + ["Konfektionär"]).agg(
        pl.col("Meldezeitpunkt_Historie"),
        # first filled target date is the relevant one
        pl.col("Liefertermin_Soll").drop_nulls().first(),
        pl.col("Bestaetigter-Import_Historie"),
        # latest actual date set by the supplier; NULL if none is available
        pl.col("Liefertermin_Ist").drop_nulls().last(),
        pl.col("Import-Ist_Historie"),
        pl.col("Import-Ist_Historie")
        .drop_nulls()
        .last()
        .alias("Import-Ist_letzter_Wert"),
        pl.col("Import-Ist_geaendert").last(),
        pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"),
        pl.col("Tage_zu_letzter_PSM_Historie"),
        # mean ignores NULL (first report) automatically
        pl.col("Tage_zu_letzter_PSM_Historie")
        .mean()
        .alias("Tage_zu_letzter_PSM_Durchschnitt"),
        pl.col("Prod-EP10_Historie"),
        pl.col("Prod-EP20_Historie"),
        pl.col("Prod-EP30_Historie"),
        pl.col("Prod-EP40_Historie"),
        pl.col("Prod-EP50_Historie"),
        pl.col("Prod-Qualitaet_Historie"),
        pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"),
        pl.col("Prod-Start_Historie"),
        # the first reported production start is treated as the truth value
        pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"),
    )

    # ** order specific aggregates
    data = (
        data.with_columns(
            (pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll"))
            .dt.total_days()
            .alias("Terminabweichung_Anzahl_Tage")
        )
        .with_columns(
            (pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION).alias(
                "Terminunterschreitung"
            ),
            (pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION).alias(
                "Terminüberschreitung"
            ),
            (pl.col("Liefertermin_Ist") - pl.col("Prod-Start"))
            .dt.total_days()
            .alias("Durchlaufzeit_Anzahl_Tage"),
        )
        .with_columns(
            # a delivery before the production start is not a valid lead time -> NULL
            pl.when(pl.col("Durchlaufzeit_Anzahl_Tage") < 0)
            .then(None)
            .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage"))
            .alias("Durchlaufzeit_Anzahl_Tage")
        )
    )
    return data


# // (3) dump order level to internal database
def _json_default(
    value: Any,
) -> str:
    """``default`` hook for :func:`json.dumps`: encode dates/datetimes as ISO-8601.

    Raises:
        TypeError: for every other type, as the ``json`` module expects from a
            ``default`` hook that cannot handle an object.
    """
    if isinstance(value, (datetime.date, datetime.datetime)):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _parse_to_json(
    x: pl.Series | None,
) -> str | None:
    """Encode one list cell (passed as a Series by ``map_elements``) as a JSON array."""
    if x is None:
        return None
    return json.dumps(x.to_list(), default=_json_default)


def _lists_to_json(
    data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Replace every List column of ``data`` by its JSON string encoding (same name)."""
    return data.with_columns(
        pl.col(pl.List)
        .map_elements(
            _parse_to_json,
            return_dtype=pl.String,
        )
        .name.keep()
    )


def dump_order_level_to_internal_database_staging(
    data: pl.LazyFrame,
) -> None:
    """Upsert order-level results into "Produktionsauftrag-Einzelsicht".

    List columns are stored as JSON strings. The data is first written to
    "Produktionsauftrag-Einzelsicht_Staging" (replacing that table). It is then
    merged into the target table with ``INSERT ... ON CONFLICT (PRIM_KEYS)``,
    and the staging table is dropped in the same transaction.

    Raises:
        RuntimeError: if the reported number of rows written to the staging table
            differs from the number of rows in ``data``.
    """
    staging_data = _lists_to_json(data).collect()
    rows_inserted = staging_data.write_database(
        "Produktionsauftrag-Einzelsicht_Staging",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="replace",
    )
    if rows_inserted != staging_data.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")

    all_columns = staging_data.columns
    update_columns = [col for col in all_columns if col not in PRIM_KEYS]
    sql_column_list_str = ", ".join([f'"{c}"' for c in all_columns])
    sql_pk_list_str = ", ".join([f'"{c}"' for c in PRIM_KEYS])
    sql_update_rules_str = ", ".join([f'"{c}" = EXCLUDED."{c}"' for c in update_columns])
    # "DO UPDATE SET" with an empty assignment list is invalid SQL
    conflict_action = (
        f"DO UPDATE SET {sql_update_rules_str}" if update_columns else "DO NOTHING"
    )
    # "WHERE 1=1": presumably keeps INSERT ... SELECT ... ON CONFLICT unambiguous for
    # parsers (e.g. SQLite's) that require a WHERE clause in this form
    upsert_sql = f"""
        INSERT INTO "Produktionsauftrag-Einzelsicht" ({sql_column_list_str})
        SELECT {sql_column_list_str}
        FROM "Produktionsauftrag-Einzelsicht_Staging"
        WHERE 1=1
        ON CONFLICT({sql_pk_list_str}) {conflict_action};
    """
    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(sql.text(upsert_sql))
        conn.execute(
            sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";')
        )


def dump_order_level_to_internal_database_wipe(
    data: pl.LazyFrame,
) -> None:
    """Replace the whole content of "Produktionsauftrag-Einzelsicht" with ``data``.

    List columns are stored as JSON strings.

    Raises:
        RuntimeError: if the reported number of inserted rows differs from the
            number of rows in ``data``.
    """
    # collect BEFORE wiping the table, so that an error in the lazy pipeline cannot
    # leave the target table empty
    staging_data = _lists_to_json(data).collect()

    # empty table
    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";'))

    # NOTE(review): the DELETE above is already committed and the insert below runs
    # on a separate (ADBC) connection, so a failing insert still leaves the table empty.
    rows_inserted = staging_data.write_database(
        "Produktionsauftrag-Einzelsicht",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="append",
    )
    if rows_inserted != staging_data.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")
# ** load order level data from internal database
def load_order_level_from_internal_database() -> pl.DataFrame:
    """Read "Produktionsauftrag-Einzelsicht" and decode its JSON-encoded list columns.

    The table is read via ADBC with ``db.intern_prod_order_t_schema`` as schema
    overrides. The history columns hold JSON array strings and are decoded into
    typed List columns under their original names.

    Returns:
        The table content with the history columns as List columns.
    """
    # element dtype of every JSON-encoded history column
    element_types: dict[str, type[pl.DataType]] = {
        "Meldezeitpunkt_Historie": pl.Datetime,
        "Bestaetigter-Import_Historie": pl.Date,
        "Import-Ist_Historie": pl.Date,
        "Tage_zu_letzter_PSM_Historie": pl.Int64,
        "Prod-EP10_Historie": pl.UInt64,
        "Prod-EP20_Historie": pl.UInt64,
        "Prod-EP30_Historie": pl.UInt64,
        "Prod-EP40_Historie": pl.UInt64,
        "Prod-EP50_Historie": pl.UInt64,
        "Prod-Qualitaet_Historie": pl.Int32,
        "Prod-Start_Historie": pl.Date,
    }

    table = pl.read_database_uri(
        'SELECT * FROM "Produktionsauftrag-Einzelsicht"',
        uri=db.DB_URI,
        engine="adbc",
        schema_overrides=db.intern_prod_order_t_schema,
    )

    decoders = [
        pl.col(name).str.json_decode(pl.List(element_type)).alias(name)
        for name, element_type in element_types.items()
    ]
    return table.with_columns(decoders)