add aggregates for suppliers

This commit is contained in:
2026-06-08 15:41:08 +02:00
parent 6e76807298
commit af91e05d97

View File

@@ -9,6 +9,7 @@ import sqlalchemy as sql
from wattanalyse import db from wattanalyse import db
# %%
importlib.reload(db) importlib.reload(db)
importlib.reload(external_code) importlib.reload(external_code)
# %% # %%
@@ -58,6 +59,7 @@ external_code.dump_order_level_to_internal_database_wipe(df)
df = external_code.load_order_level_from_internal_database() df = external_code.load_order_level_from_internal_database()
df df
# %% # %%
# ** aggregate production orders
tmp = df.clone() tmp = df.clone()
# two ways to define the aggregate for date deviations: just use < 0 or use Boolean flag # two ways to define the aggregate for date deviations: just use < 0 or use Boolean flag
@@ -73,42 +75,203 @@ else:
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
tmp.select( tmp = tmp.select(
pl.col("Terminabweichung_Anzahl_Tage") pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_early) .filter(filter_date_deviation_early)
.mean() .mean()
.abs() .abs()
.round(mode="half_away_from_zero") .round(mode="half_away_from_zero")
.cast(pl.Int64) .cast(pl.Int64)
.alias("Mittlere_Tage_Unterschreitung"), .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage") pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_late) .filter(filter_date_deviation_late)
.mean() .mean()
.abs() .abs()
.round(mode="half_away_from_zero") .round(mode="half_away_from_zero")
.cast(pl.Int64) .cast(pl.Int64)
.alias("Mittlere_Tage_Ueberschreitung"), .alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage") pl.col("Terminabweichung_Anzahl_Tage")
.std(ddof=1) .std(ddof=1)
.alias("Standardabweichung_Lieferterminabweichung"), .alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
pl.col("Import-Ist_Anzahl_Aenderungen") pl.col("Import-Ist_Anzahl_Aenderungen")
.mean() .mean()
.abs() .abs()
.round(mode="half_away_from_zero") .round(mode="half_away_from_zero")
.cast(pl.Int64) .cast(pl.Int64)
.alias("Mittlere_Anzahl_Anpassungen_Liefertermin"), .alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
pl.col("Tage_zu_letzter_PSM_Historie") pl.col("Tage_zu_letzter_PSM_Historie")
.list.explode() .list.explode()
.mean() .mean()
.abs() .abs()
.round(mode="half_away_from_zero") .round(mode="half_away_from_zero")
.cast(pl.Int64) .cast(pl.Int64)
.alias("Mittlere_Abstaende_PSM"), .alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
pl.col("Durchlaufzeit_Anzahl_Tage") pl.col("Durchlaufzeit_Anzahl_Tage")
.mean() .mean()
.round(mode="half_away_from_zero") .round(mode="half_away_from_zero")
.cast(pl.Int64) .cast(pl.Int64)
.alias("Mittlere_Durchlaufzeit"), .alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
)
tmp
# %%
# to DB transform (mock Oracle database)
cols_sorted = ["ID", "AKTUALISIERT_AM"] + [c for c in tmp.columns]
tmp = (
tmp.with_columns(
pl.lit(1).alias("ID"),
pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
)
.select(
pl.col(pl.Boolean).cast(pl.Int8),
pl.all().exclude(pl.Boolean),
)
.select(cols_sorted)
)
tmp
# %%
# return sql_delete, sql_insert
print(f"SQL DELETE: {sql_delete}\nSQL Insert: {sql_insert}")
# %%
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
tmp.write_ipc(prepared_oracle_pth)
# %%
# ** aggregate supplier
tmp = df.clone()
USE_BOUNDARIES = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
tmp = (
tmp.group_by("Konfektionär")
.agg(
(
(
~(filter_date_deviation_early | filter_date_deviation_late)
& (pl.col("Import-Ist_Anzahl_Aenderungen") == 0)
).mean()
* 100
)
.round(4, mode="half_away_from_zero")
.alias("QUOTE_ERSTBESTAETIGUNG"),
((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("PROZENT_LIEFERTREUE"),
(filter_date_deviation_early.mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"),
(filter_date_deviation_late.mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_early)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_late)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.std(ddof=1)
.alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
pl.col("Import-Ist_Anzahl_Aenderungen")
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
pl.col("Tage_zu_letzter_PSM_Historie")
.list.explode()
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
pl.col("Durchlaufzeit_Anzahl_Tage")
.mean()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
pl.col("Prod-Qualitaet_Historie")
.list.explode()
.mean()
.round(4, mode="half_away_from_zero")
.alias("MITTLERER_QUALITAETSSCORE_PSM"),
)
.sort("Konfektionär")
)
tmp
# %%
tmp = df.clone()
tmp.filter(pl.col.Konfektionär == "BS Make Ltd").filter(
~(filter_date_deviation_early | filter_date_deviation_late)
).filter(pl.col("Import-Ist_Anzahl_Aenderungen") == 0)
# %%
tmp.filter(pl.col.Konfektionär == "BS Make Ltd")
# %%
tmp.head()
# %%
tmp.filter(pl.col.Konfektionär == "Siluet")
# %%
tmp.select(pl.col.Konfektionär.str.len_chars().alias("len_char")).sort(
"len_char", descending=True
) )
# %% # %%
# // whole pipeline
# ** aggregate production orders
tmp = df.clone()
tmp = external_code.aggregate_production_orders(tmp.lazy()).collect()
print(tmp)
tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect()
print(tmp)
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
tmp.write_ipc(prepared_oracle_pth)
# %%
stmts = external_code.oracle_generate_sql_insert(
table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
# %%
# ** aggregate supplier
tmp = df.clone()
RENAME_SCHEME = {"Konfektionär": "KONFEKTIONAER"}
tmp = external_code.aggregate_suppliers(tmp.lazy()).collect()
print(tmp.head())
tmp = external_code.oracle_prepare_KPI_aggregate(
tmp.lazy(),
rename_schema=RENAME_SCHEME,
sort_by="KONFEKTIONAER",
sort_descending=False,
).collect()
print(tmp.head())
prepared_oracle_pth = DATA_PTH / "db/oracle_prepare_KPI_KONFEKTIONAERE.arrow"
tmp.write_ipc(prepared_oracle_pth)
# %%
stmts = external_code.oracle_generate_sql_insert(
table_name="KPI_KONFEKTIONAERE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
# %%
tmp
# %%