# %%
import datetime
import importlib
from pathlib import Path

import external_code
import polars as pl
import sqlalchemy as sql
from wattanalyse import db

# %%
# Reload the project modules so that edits to them are picked up without
# restarting the interactive session.
importlib.reload(db)
importlib.reload(external_code)

# %%
PROJECT_BASE = Path(__file__).parents[1]
DATA_PTH = PROJECT_BASE / "data"
# Explicit check instead of `assert`, which is removed when running `python -O`.
if not DATA_PTH.exists():
    raise FileNotFoundError(f"data directory not found: {DATA_PTH}")

# %%
# // load data
target = DATA_PTH / "PSM_20260507.arrow"
# Lazy scan of the Arrow IPC file; the data is materialized on `.collect()`.
data_raw = pl.scan_ipc(target)

# %%
# Pipeline overview:
# 0. read data (from customer's database)
# 1. clean up newly obtained data
# ~~2. load data from internal database~~
# ~~3. integrate with new data (whole snapshot)~~
# 2. process on order level
# 3. save results to internal database
# 4. post-process results
# 5. write to external database

# // (1) clean up newly obtained data
# load data from internal database
# integrate with new data (whole snapshot)
res = external_code.preprocess_psm(data_raw)
# `res.data` is collected below, i.e. it is a lazy frame; `res.filtered` is printed as-is.
data = res.data
print(f"Data:\n{data.collect()}\n\n---\n\nFiltered:\n{res.filtered}")

# %%
# // (2) processing order level
df = external_code.process_order_level(data)
# ?? What if "Konfektionär" is NULL?
# If it is NULL, the per-"Konfektionär" aggregates do not work as intended. Instead,
# they are calculated over all NULL entries together, which may combine production
# orders that belong to different "Konfektionär". These values are still calculated,
# but should not be considered.
# %%
# // (3) save results to internal database
external_code.dump_order_level_to_internal_database_wipe(df)

# %%
# now load data from database
df = external_code.load_order_level_from_internal_database()
df

# %%
# ** aggregate production orders
# One-row summary over all production orders.
tmp = df.clone()

# two ways to define the aggregate for date deviations: just use < 0 or use Boolean flag
# defined by the user-specified boundaries
USE_BOUNDARIES = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
    filter_date_deviation_early = pl.col("Terminunterschreitung")
    filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
    filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
    filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0

tmp = tmp.select(
    # absolute mean deviation (days), over early orders only
    pl.col("Terminabweichung_Anzahl_Tage")
    .filter(filter_date_deviation_early)
    .mean()
    .abs()
    .round(mode="half_away_from_zero")
    .cast(pl.Int64)
    .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
    # absolute mean deviation (days), over late orders only
    pl.col("Terminabweichung_Anzahl_Tage")
    .filter(filter_date_deviation_late)
    .mean()
    .abs()
    .round(mode="half_away_from_zero")
    .cast(pl.Int64)
    .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
    # sample standard deviation over all orders
    pl.col("Terminabweichung_Anzahl_Tage")
    .std(ddof=1)
    .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
    pl.col("Import-Ist_Anzahl_Aenderungen")
    .mean()
    .abs()
    .round(mode="half_away_from_zero")
    .cast(pl.Int64)
    .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
    # flatten the per-order list so the mean is taken over every individual entry
    pl.col("Tage_zu_letzter_PSM_Historie")
    .list.explode()
    .mean()
    .abs()
    .round(mode="half_away_from_zero")
    .cast(pl.Int64)
    .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
    pl.col("Durchlaufzeit_Anzahl_Tage")
    .mean()
    .round(mode="half_away_from_zero")
    .cast(pl.Int64)
    .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
)
tmp

# %%
# to DB transform (mock Oracle database)
# Prepend a constant ID and a timestamp, then move them to the front.
cols_sorted = ["ID", "AKTUALISIERT_AM", *tmp.columns]
tmp = (
    tmp.with_columns(
        pl.lit(1).alias("ID"),
        pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
    )
    # Boolean columns become Int8 -- presumably because the target Oracle table
    # stores flags as numbers; confirm against the table definition.
    .select(
        pl.col(pl.Boolean).cast(pl.Int8),
        pl.all().exclude(pl.Boolean),
    )
    .select(cols_sorted)
)
tmp

# %%
# Generate the DELETE/INSERT statements for the prepared frame. (The previous
# version printed the undefined names `sql_delete`/`sql_insert`, raising NameError.)
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")

# %%
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
# write_ipc does not create missing directories
prepared_oracle_pth.parent.mkdir(parents=True, exist_ok=True)
tmp.write_ipc(prepared_oracle_pth)

# %%
# ** aggregate supplier
# One row per "Konfektionär". Rows with a NULL "Konfektionär" form their own group.
tmp = df.clone()

USE_BOUNDARIES = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
    filter_date_deviation_early = pl.col("Terminunterschreitung")
    filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
    filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
    filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0

tmp = (
    tmp.group_by("Konfektionär")
    .agg(
        # % of orders that are on time AND had no date changes
        # (mean of a Boolean = share of True)
        (
            (
                ~(filter_date_deviation_early | filter_date_deviation_late)
                & (pl.col("Import-Ist_Anzahl_Aenderungen") == 0)
            ).mean()
            * 100
        )
        .round(4, mode="half_away_from_zero")
        .alias("QUOTE_ERSTBESTAETIGUNG"),
        # % of orders with neither an early nor a late deviation
        ((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("PROZENT_LIEFERTREUE"),
        (filter_date_deviation_early.mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"),
        (filter_date_deviation_late.mean() * 100)
        .round(4, mode="half_away_from_zero")
        .alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"),
        pl.col("Terminabweichung_Anzahl_Tage")
        .filter(filter_date_deviation_early)
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
        pl.col("Terminabweichung_Anzahl_Tage")
        .filter(filter_date_deviation_late)
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
        pl.col("Terminabweichung_Anzahl_Tage")
        .std(ddof=1)
        .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
        pl.col("Import-Ist_Anzahl_Aenderungen")
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
        pl.col("Tage_zu_letzter_PSM_Historie")
        .list.explode()
        .mean()
        .abs()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
        pl.col("Durchlaufzeit_Anzahl_Tage")
        .mean()
        .round(mode="half_away_from_zero")
        .cast(pl.Int64)
        .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
        pl.col("Prod-Qualitaet_Historie")
        .list.explode()
        .mean()
        .round(4, mode="half_away_from_zero")
        .alias("MITTLERER_QUALITAETSSCORE_PSM"),
    )
    .sort("Konfektionär")
)
tmp

# %%
# Spot check: orders of "BS Make Ltd" that satisfy the QUOTE_ERSTBESTAETIGUNG
# condition (no date deviation and no date changes).
tmp = df.clone()
tmp.filter(pl.col.Konfektionär == "BS Make Ltd").filter(
    ~(filter_date_deviation_early | filter_date_deviation_late)
).filter(pl.col("Import-Ist_Anzahl_Aenderungen") == 0)

# %%
tmp.filter(pl.col.Konfektionär == "BS Make Ltd")

# %%
tmp.head()

# %%
tmp.filter(pl.col.Konfektionär == "Siluet")

# %%
# Longest supplier names first -- presumably to size the KONFEKTIONAER column
# in the target table; confirm.
tmp.select(pl.col.Konfektionär.str.len_chars().alias("len_char")).sort(
    "len_char", descending=True
)

# %%
# // whole pipeline
# ** aggregate production orders
tmp = df.clone()
tmp = external_code.aggregate_production_orders(tmp.lazy()).collect()
print(tmp)
tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect()
print(tmp)
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
prepared_oracle_pth.parent.mkdir(parents=True, exist_ok=True)
tmp.write_ipc(prepared_oracle_pth)

# %%
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")

# %%
# ** aggregate supplier
tmp = df.clone()
RENAME_SCHEME = {"Konfektionär": "KONFEKTIONAER"}
tmp = external_code.aggregate_suppliers(tmp.lazy()).collect()
print(tmp.head())
tmp = external_code.oracle_prepare_KPI_aggregate(
    tmp.lazy(),
    rename_schema=RENAME_SCHEME,
    sort_by="KONFEKTIONAER",
    sort_descending=False,
).collect()
print(tmp.head())
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_KONFEKTIONAERE.arrow"
prepared_oracle_pth.parent.mkdir(parents=True, exist_ok=True)
tmp.write_ipc(prepared_oracle_pth)

# %%
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_KONFEKTIONAERE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")

# %%
tmp

# %%