diff --git a/prototypes/02-2_aggregates.py b/prototypes/02-2_aggregates.py index d419ef5..dccc4d3 100644 --- a/prototypes/02-2_aggregates.py +++ b/prototypes/02-2_aggregates.py @@ -9,6 +9,7 @@ import sqlalchemy as sql from wattanalyse import db +# %% importlib.reload(db) importlib.reload(external_code) # %% @@ -58,6 +59,7 @@ external_code.dump_order_level_to_internal_database_wipe(df) df = external_code.load_order_level_from_internal_database() df # %% +# ** aggregate production orders tmp = df.clone() # two ways to define the aggregate for date deviations: just use < 0 or use Boolean flag @@ -73,42 +75,203 @@ else: filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 -tmp.select( +tmp = tmp.select( pl.col("Terminabweichung_Anzahl_Tage") .filter(filter_date_deviation_early) .mean() .abs() .round(mode="half_away_from_zero") .cast(pl.Int64) - .alias("Mittlere_Tage_Unterschreitung"), + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"), pl.col("Terminabweichung_Anzahl_Tage") .filter(filter_date_deviation_late) .mean() .abs() .round(mode="half_away_from_zero") .cast(pl.Int64) - .alias("Mittlere_Tage_Ueberschreitung"), + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"), pl.col("Terminabweichung_Anzahl_Tage") .std(ddof=1) - .alias("Standardabweichung_Lieferterminabweichung"), + .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"), pl.col("Import-Ist_Anzahl_Aenderungen") .mean() .abs() .round(mode="half_away_from_zero") .cast(pl.Int64) - .alias("Mittlere_Anzahl_Anpassungen_Liefertermin"), + .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"), pl.col("Tage_zu_letzter_PSM_Historie") .list.explode() .mean() .abs() .round(mode="half_away_from_zero") .cast(pl.Int64) - .alias("Mittlere_Abstaende_PSM"), + .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"), pl.col("Durchlaufzeit_Anzahl_Tage") .mean() .round(mode="half_away_from_zero") .cast(pl.Int64) - .alias("Mittlere_Durchlaufzeit"), + .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"), +) +tmp +# %% +# to DB transform (mock Oracle database) +cols_sorted = ["ID", "AKTUALISIERT_AM"] + [c for c in tmp.columns] +tmp = ( + tmp.with_columns( + pl.lit(1).alias("ID"), + pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"), + ) + .select( + pl.col(pl.Boolean).cast(pl.Int8), + pl.all().exclude(pl.Boolean), + ) + .select(cols_sorted) +) +tmp + +# %% + +# return sql_delete, sql_insert + +print(f"SQL DELETE: {sql_delete}\nSQL Insert: {sql_insert}") +# %% +prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow" +tmp.write_ipc(prepared_oracle_pth) +# %% +# ** aggregate supplier +tmp = df.clone() + +USE_BOUNDARIES = False +filter_date_deviation_early: pl.Expr +filter_date_deviation_late: pl.Expr +if USE_BOUNDARIES: + filter_date_deviation_early = pl.col("Terminunterschreitung") + filter_date_deviation_late = pl.col("Terminüberschreitung") +else: + filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0 + filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 + +tmp = ( + tmp.group_by("Konfektionär") + .agg( + ( + ( + ~(filter_date_deviation_early | filter_date_deviation_late) + & (pl.col("Import-Ist_Anzahl_Aenderungen") == 0) + ).mean() + * 100 + ) + .round(4, mode="half_away_from_zero") + .alias("QUOTE_ERSTBESTAETIGUNG"), + ((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("PROZENT_LIEFERTREUE"), + (filter_date_deviation_early.mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"), + (filter_date_deviation_late.mean() * 100) + .round(4, mode="half_away_from_zero") + .alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_early) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .filter(filter_date_deviation_late) + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"), + pl.col("Terminabweichung_Anzahl_Tage") + .std(ddof=1) + .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"), + pl.col("Import-Ist_Anzahl_Aenderungen") + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"), + pl.col("Tage_zu_letzter_PSM_Historie") + .list.explode() + .mean() + .abs() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"), + pl.col("Durchlaufzeit_Anzahl_Tage") + .mean() + .round(mode="half_away_from_zero") + .cast(pl.Int64) + .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"), + pl.col("Prod-Qualitaet_Historie") + .list.explode() + .mean() + .round(4, mode="half_away_from_zero") + .alias("MITTLERER_QUALITAETSSCORE_PSM"), + ) + .sort("Konfektionär") +) +tmp + +# %% +tmp = df.clone() +tmp.filter(pl.col.Konfektionär == "BS Make Ltd").filter( + ~(filter_date_deviation_early | filter_date_deviation_late) +).filter(pl.col("Import-Ist_Anzahl_Aenderungen") == 0) +# %% +tmp.filter(pl.col.Konfektionär == "BS Make Ltd") +# %% +tmp.head() +# %% +tmp.filter(pl.col.Konfektionär == "Siluet") + +# %% +tmp.select(pl.col.Konfektionär.str.len_chars().alias("len_char")).sort( + "len_char", descending=True ) # %% +# // whole pipeline +# ** aggregate production orders +tmp = df.clone() + +tmp = external_code.aggregate_production_orders(tmp.lazy()).collect() +print(tmp) +tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect() +print(tmp) +prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow" +tmp.write_ipc(prepared_oracle_pth) + +# %% +stmts = external_code.oracle_generate_sql_insert( + table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=tmp.columns +) +print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}") + +# %% +# ** aggregate supplier +tmp = df.clone() +RENAME_SCHEME = {"Konfektionär": "KONFEKTIONAER"} +tmp = external_code.aggregate_suppliers(tmp.lazy()).collect() +print(tmp.head()) +tmp = external_code.oracle_prepare_KPI_aggregate( + tmp.lazy(), + rename_schema=RENAME_SCHEME, + sort_by="KONFEKTIONAER", + sort_descending=False, +).collect() +print(tmp.head()) +prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_KONFEKTIONAERE.arrow" +tmp.write_ipc(prepared_oracle_pth) +# %% +stmts = external_code.oracle_generate_sql_insert( + table_name="KPI_KONFEKTIONAERE", columns=tmp.columns +) +print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}") +# %% +tmp +# %%