diff --git a/prototypes/01_first-look_20260603.py b/prototypes/01_first-look_20260603.py index 7b7a9c0..de4be19 100644 --- a/prototypes/01_first-look_20260603.py +++ b/prototypes/01_first-look_20260603.py @@ -1,4 +1,5 @@ # %% +import datetime import enum from pathlib import Path @@ -29,6 +30,7 @@ class QualityPsm(enum.StrEnum): PLAUSIBEL = enum.auto() +# %% schema_PSM: dict[str, type[pl.DataType]] = { "VK Auftrag": pl.UInt32, "Artikelbez.": pl.String, @@ -76,6 +78,7 @@ psm.filter(pl.col("Konfektionär").str.contains("MEMTEKS")) psm.estimated_size("mb") # %% +# // preprocessing I regex_pattern = r"^[\s\-#+/$]+$" psm = psm.with_columns( pl.when(pl.col(pl.String).str.contains(regex_pattern)) @@ -94,12 +97,12 @@ psm.head() psm.filter(pl.any_horizontal(pl.col("VK Auftrag").is_null())) # %% -psm.filter(pl.col("Wareneingang am") == "01.01.1111 00:00:00").group_by( - pl.col.Konfektionär -).agg(pl.len()) +# psm.filter(pl.col("Wareneingang am") == "01.01.1111 00:00:00").group_by( +# pl.col.Konfektionär +# ).agg(pl.len()) # %% -dupl_filter = psm.select([pl.col.PA, pl.col("PA Pos")]).is_duplicated() +psm.select([pl.col.PA, pl.col("PA Pos")]).is_duplicated().sum() # %% psm.group_by(["PA", "PA Pos"]).agg(pl.col("PA").n_unique().alias("unique")).sort( "unique", descending=True @@ -113,6 +116,9 @@ most_occurrences = ( most_occurrences # %% most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme")) +# %% +psm.columns + # %% psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort( "PSM gemeldet am", descending=False @@ -130,6 +136,7 @@ tmp = psm.filter((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)).sort( tmp # %% # // simulate time series +# this is a sequence how data would be provided: first one entry, and then more additional entries series: list[pl.DataFrame] = [] for i in range(tmp.height): @@ -145,6 +152,7 @@ series[1] tmp = psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort( "PSM gemeldet am", descending=False ) +tmp # %% # // plausibility check # ** production quantities @@ -162,7 +170,7 @@ plausi_features_endpoint_only = [ "Teile verpackt in Karton", ] plausi_features = plausi_features_all -plausi_features = plausi_features_endpoint_only +# plausi_features = plausi_features_endpoint_only # %% IDX = None if IDX is None: @@ -187,61 +195,306 @@ df_marked = tmp_1.with_columns( .otherwise(pl.lit(False)) .alias("Produktionsstückzahlen_valide") ) -# print(df_marked) -# %% +PSM_SCORES: dict[QualityPsm, int] = { + QualityPsm.FEHLEND: 1, + QualityPsm.UNPLAUSIBEL: 0, + QualityPsm.PLAUSIBEL: 2, +} + df_score = df_marked.with_columns( pl.when(pl.col("is_empty")) - .then(pl.lit(QualityPsm.FEHLEND)) + .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND])) .when(pl.col("Produktionsstückzahlen_valide")) - .then(pl.lit(QualityPsm.PLAUSIBEL)) - .otherwise(pl.lit(QualityPsm.UNPLAUSIBEL)) + .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL])) + .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL])) .alias("Qualität Produktionsfortschritt") ) print(df_score) -# df_valide = tmp_1.filter(pl.all_horizontal(conditions)) -# df_invalide = tmp_1.filter( -# ~pl.all_horizontal(conditions) -# ) # Das Tilde-Zeichen ~ bedeutet "NOT" - -# print("--- valid rows ---") -# print(df_valide) - -# print("\n--- invalid rows ---") -# print(df_invalide) - - -# %% -# 1. Testdaten erstellen (Zeile 0-2 sind valide, Zeile 3 ist dein invalides Beispiel) -df = pl.DataFrame({"EP-1": [0, 100, 100, 0], "EP-2": [0, 0, 100, 100], "EP-3": [0, 0, 0, 0]}) - -# 2. Liste der Erfassungspunkte in der richtigen (konsekutiven) Reihenfolge -ep_spalten = ["EP-1", "EP-2", "EP-3"] - -# 3. Dynamisch die Bedingungen für alle Paare erstellen -# Wir prüfen für jedes Paar: Ist der vorherige Punkt (i) >= dem nächsten Punkt (i+1)? -bedingungen = [ - pl.col(ep_spalten[i]) >= pl.col(ep_spalten[i + 1]) for i in range(len(ep_spalten) - 1) -] - -# 4. Filter anwenden -# pl.all_horizontal stellt sicher, dass die Bedingung für JEDES Paar in der Zeile stimmt -df_valide = df.filter(pl.all_horizontal(bedingungen)) -df_invalide = df.filter(~pl.all_horizontal(bedingungen)) # Das Tilde-Zeichen ~ bedeutet "NOT" - -print("--- Valide Zeilen ---") -print(df_valide) - -print("\n--- Invalide Zeilen ---") -print(df_invalide) # %% # // principle of aggregated data in Polars # map the database structure to a Polars dataframe and just insert or update the # corresponding entries of the defined database table # We use an upsert strategy, keep local copies of the data and merge them with new entries. # This ensures that we always have a clean and complete history. +# %% +tmp = series[2] + +# ** production quants plausibility or quality check +renaming_scheme: dict[str, str] = { + "PA Pos": "PA_Pos", + "PSM gemeldet am": "Meldezeitpunkt_Historie", + "Import Ist": "Import-Ist_Historie", + "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie", + "Zuschnitt am": "Prod-Start_Historie", + "Teile in Zuschnitt": "Prod-EP10_Historie", + "Teile im Nähband": "Prod-EP20_Historie", + "Fertigware aus Nähband": "Prod-EP30_Historie", + "Teile kontrolliert": "Prod-EP40_Historie", + "Teile verpackt in Karton": "Prod-EP50_Historie", +} + +KEYS = ["PA", "PA_Pos"] + +tmp = tmp.rename(renaming_scheme) +tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False) + + +plausi_features_all = [ + "Prod-EP10_Historie", + "Prod-EP20_Historie", + "Prod-EP30_Historie", + "Prod-EP40_Historie", + "Prod-EP50_Historie", +] +PLAUSI_FEATURES = plausi_features_all + + +tmp = tmp.with_columns( + pl.all_horizontal( + pl.col(PLAUSI_FEATURES).is_null() | (pl.col(PLAUSI_FEATURES) == 0) + ).alias("is_empty") +) + +conditions = [ + pl.col(PLAUSI_FEATURES[i]) >= pl.col(PLAUSI_FEATURES[i + 1]) + for i in range(len(PLAUSI_FEATURES) - 1) +] + +tmp = tmp.with_columns( + pl.when(pl.all_horizontal(conditions) | pl.col("is_empty")) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Prod-Qty_is_valid") +).with_columns( + pl.when(pl.col("is_empty")) + .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND])) + .when(pl.col("Prod-Qty_is_valid")) + .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL])) + .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL])) + .alias("Prod-Qualitaet_Historie") +) +# aggregate hint for "Prod-Qualitaet_Durchschnitt": use "drop_nulls" "last" +# aggregate "Prod-Qualitaet_Historie" and use "mean" +# need additional "alias" on "Prod-Qualitaet_Historie" + +# tmp = ( +# tmp.with_row_index("row_nr") +# .with_columns( +# pl.when(pl.col("row_nr") == 1) # Index 1 ist die zweite Zeile +# .then(None) +# .otherwise(pl.col("1.bestät. Import Konfektionär")) +# .alias("1.bestät. Import Konfektionär") +# ) +# .drop("row_nr") +# ) +# tmp +current_date = datetime.datetime.now().date() +print(f"{current_date=}") +tmp = tmp.with_columns( + pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias( + "Liefertermin_Soll" + ) +) +# aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first" +# first filled field for "Liefertermin Soll" is the relevant target date +# should be first confirmed date, but if this field is not filled we use the first +# filled import by the supplier + +# now check if set import date is before current date --> becomes actual value +tmp = tmp.with_columns( + pl.when(pl.col("Import-Ist_Historie") < current_date) + .then(pl.col("Import-Ist_Historie")) + .otherwise(None) + .alias("Liefertermin_Ist") +) +# aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last" +# keep last because that is the latest value set by the supplier +# if all values are NULL then NULL is returned (no actual date available) + +# aggregate hint for "Prod-Start" +# aggregate "Prod-Start_Historie" and use "drop_nulls" "first" +# first entry should be treated as the truth value, changing later does not make sense +# need additional "alias" on "Prod-Start_Historie" + +# duration since last report in days +tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + ( + pl.col("Meldezeitpunkt_Historie") + - pl.col("Meldezeitpunkt_Historie").shift(1).over(KEYS) + ) + .dt.total_days() + .alias("Tage_zu_letzter_PSM_Historie") +) +# aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt" +# aggregate "Tage_zu_letzter_PSM_Historie" and use "mean" (NULL is ignored automatically) +# need additional "alias" on "Tage_zu_letzter_PSM_Historie" + +# aggregate hint for "Import-Ist_letzter_Wert" +# aggregate "Import-Ist_Historie" and use "drop_nulls" "last" +# need additional "alias" on "Import-Ist_Historie" + +tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + # Prüfen: Ist das aktuelle Datum ungleich dem vorherigen Datum derselben Position? + (pl.col("Import-Ist_Historie") != pl.col("Import-Ist_Historie").shift(1).over(KEYS)) + .fill_null(False) # Der allererste Eintrag hat keinen Vorgänger -> Ist keine Änderung + .alias("Import-Ist_geaendert") +) +# aggregate hint for "Import-Ist_geaendert" +# aggregate "Import-Ist_geaendert" and use "last" + +# aggregate hint for "Import-Ist_Anzahl_Aenderungen" +# aggregate "Import-Ist_geaendert" and use "sum" +# need additional "alias" on "Import-Ist_geaendert" + + +# whole aggregates see DB schema +tmp = ( + tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False) + .group_by(KEYS + ["Konfektionär"]) + .agg( + pl.col("Meldezeitpunkt_Historie"), + pl.col("Liefertermin_Soll").drop_nulls().first(), + pl.col("Bestaetigter-Import_Historie"), + pl.col("Liefertermin_Ist").drop_nulls().last(), + pl.col("Import-Ist_Historie"), + pl.col("Import-Ist_Historie").drop_nulls().last().alias("Import-Ist_letzter_Wert"), + pl.col("Import-Ist_geaendert").last(), + pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"), + pl.col("Tage_zu_letzter_PSM_Historie"), + pl.col("Tage_zu_letzter_PSM_Historie") + .mean() + .alias("Tage_zu_letzter_PSM_Durchschnitt"), + pl.col("Prod-EP10_Historie"), + pl.col("Prod-EP20_Historie"), + pl.col("Prod-EP30_Historie"), + pl.col("Prod-EP40_Historie"), + pl.col("Prod-EP50_Historie"), + pl.col("Prod-Qualitaet_Historie"), + pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"), + pl.col("Prod-Start_Historie"), + pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"), + ) +) + +tmp +# %% +# ** order specific aggregates +LOWER_BOUND_DATE_DEVIATION = 0 +UPPER_BOUND_DATE_DEVIATION = 0 + +tmp = tmp.with_columns( + pl.when( + (pl.col("Liefertermin_Ist").is_not_null()) + & (pl.col("Liefertermin_Soll").is_not_null()) + ) + .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days()) + .otherwise(None) + .alias("Terminabweichung_Anzahl_Tage") +).with_columns( + pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Terminunterschreitung"), + pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Terminüberschreitung"), + pl.when((pl.col("Liefertermin_Ist").is_not_null()) & (pl.col("Prod-Start").is_not_null())) + .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days()) + .otherwise(None) + .alias("Durchlaufzeit_Anzahl_Tage"), +) +tmp +# %% +tmp_1 = tmp.select("Meldezeitpunkt_Historie") +tmp_1 = tmp_1.with_columns( + Meldezeitpunkt_datum=pl.col("Meldezeitpunkt_Historie").dt.date(), +) +tmp_1 + +# %% +tmp_1 = tmp.with_columns( + # Aktuelles Datum minus verschobenes Datum (isoliert je Auftrag) + ( + pl.col("Meldezeitpunkt_Historie") + - pl.col("Meldezeitpunkt_Historie").shift(1).over(["PA", "PA_Pos"]) + ) + .dt.total_days() # Macht aus der Zeitspanne (Duration) eine nackte Ganzzahl (Tage) + .alias("Tage_zu_letzter_PSM") +) +tmp_1 +# %% +tmp_1.with_columns( + delta=( + pl.col("Meldezeitpunkt_datum").shift( + -1, fill_value=pl.col("Meldezeitpunkt_datum").last() + ) + - pl.col("Meldezeitpunkt_datum") + ) +) + +# %% + + +######################################## +# %% +# 1. Das ist der alte Zustand aus der SQLite-DB (aufgelöst als Dataframe) +# Angenommen, das Quellsystem hatte beim letzten Mal noch die alten Daten (10:00 Uhr) +df_db = pl.DataFrame( + { + "auftrag_id": [1], + "zeitstempel": [["10:00", "11:00"]], + "EP-1": [[0, 100]], + "EP-2": [[0, 0]], + } +) +df_db +# %% +# 2. Der neue Input (Das Quellsystem hat den 10:00 Uhr Eintrag plötzlich "vergessen"!) +df_input_neu = pl.DataFrame( + { + "auftrag_id": [1, 1], + "zeitstempel": ["11:00", "12:00"], # 10:00 fehlt, 11:00 ist redundant, 12:00 ist neu + "EP-1": [100, 100], + "EP-2": [0, 100], + } +) +df_input_neu + +# %% +# --- SCHRITT 1: Die Datenbank-Listen "flach" machen --- +# Wir entfalten die alten Listen, sodass jede Zeile wieder ein einzelnes Ereignis ist +df_db_flach = df_db.explode(["zeitstempel", "EP-1", "EP-2"]) +df_db_flach +# %% +# --- SCHRITT 2: Alles in einen Topf werfen --- +# Wir kleben die alten DB-Daten und die neuen Input-Daten einfach untereinander +df_kombiniert = pl.concat([df_db_flach, df_input_neu]) +df_kombiniert + +# %% +# --- SCHRITT 3: Duplikate entfernen (Die Magie) --- +# Wir behalten nur die einzigartigen Kombinationen aus Auftrag und Zeit. +# Durch keep="last" überschreibt ein eventuell korrigierter neuer Wert den alten. +df_dedupliziert = df_kombiniert.unique(subset=["auftrag_id", "zeitstempel"], keep="last") +df_dedupliziert +# %% +# --- SCHRITT 4: Wieder zu sauberen Listen zusammenbauen --- +# Jetzt aggregieren wir die sauberen Daten wieder zu unserer Datenbank-Sicht +df_final_db = ( + df_dedupliziert.sort("zeitstempel") # Wichtig, damit die Chronologie in der Liste stimmt! + .group_by("auftrag_id") + .agg(pl.col("zeitstempel"), pl.col("EP-1"), pl.col("EP-2")) +) + +print(df_final_db) + + +################################################################################### +# %% # 1. Testdaten: Auftrag 1 ist valide, Auftrag 2 enthält dein invalides Beispiel df = pl.DataFrame( {