generated from dopt-python/py311
base routine to transform, calculate and aggregate data
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
# %%
|
# %%
|
||||||
|
import datetime
|
||||||
import enum
|
import enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -29,6 +30,7 @@ class QualityPsm(enum.StrEnum):
|
|||||||
PLAUSIBEL = enum.auto()
|
PLAUSIBEL = enum.auto()
|
||||||
|
|
||||||
|
|
||||||
|
# %%
|
||||||
schema_PSM: dict[str, type[pl.DataType]] = {
|
schema_PSM: dict[str, type[pl.DataType]] = {
|
||||||
"VK Auftrag": pl.UInt32,
|
"VK Auftrag": pl.UInt32,
|
||||||
"Artikelbez.": pl.String,
|
"Artikelbez.": pl.String,
|
||||||
@@ -76,6 +78,7 @@ psm.filter(pl.col("Konfektionär").str.contains("MEMTEKS"))
|
|||||||
psm.estimated_size("mb")
|
psm.estimated_size("mb")
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
|
# // preprocessing I
|
||||||
regex_pattern = r"^[\s\-#+/$]+$"
|
regex_pattern = r"^[\s\-#+/$]+$"
|
||||||
psm = psm.with_columns(
|
psm = psm.with_columns(
|
||||||
pl.when(pl.col(pl.String).str.contains(regex_pattern))
|
pl.when(pl.col(pl.String).str.contains(regex_pattern))
|
||||||
@@ -94,12 +97,12 @@ psm.head()
|
|||||||
psm.filter(pl.any_horizontal(pl.col("VK Auftrag").is_null()))
|
psm.filter(pl.any_horizontal(pl.col("VK Auftrag").is_null()))
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
psm.filter(pl.col("Wareneingang am") == "01.01.1111 00:00:00").group_by(
|
# psm.filter(pl.col("Wareneingang am") == "01.01.1111 00:00:00").group_by(
|
||||||
pl.col.Konfektionär
|
# pl.col.Konfektionär
|
||||||
).agg(pl.len())
|
# ).agg(pl.len())
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
dupl_filter = psm.select([pl.col.PA, pl.col("PA Pos")]).is_duplicated()
|
psm.select([pl.col.PA, pl.col("PA Pos")]).is_duplicated().sum()
|
||||||
# %%
|
# %%
|
||||||
psm.group_by(["PA", "PA Pos"]).agg(pl.col("PA").n_unique().alias("unique")).sort(
|
psm.group_by(["PA", "PA Pos"]).agg(pl.col("PA").n_unique().alias("unique")).sort(
|
||||||
"unique", descending=True
|
"unique", descending=True
|
||||||
@@ -113,6 +116,9 @@ most_occurrences = (
|
|||||||
most_occurrences
|
most_occurrences
|
||||||
# %%
|
# %%
|
||||||
most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme"))
|
most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme"))
|
||||||
|
# %%
|
||||||
|
psm.columns
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort(
|
psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort(
|
||||||
"PSM gemeldet am", descending=False
|
"PSM gemeldet am", descending=False
|
||||||
@@ -130,6 +136,7 @@ tmp = psm.filter((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)).sort(
|
|||||||
tmp
|
tmp
|
||||||
# %%
|
# %%
|
||||||
# // simulate time series
|
# // simulate time series
|
||||||
|
# this simulates the order in which data would arrive: first a single entry, then additional entries over time
|
||||||
series: list[pl.DataFrame] = []
|
series: list[pl.DataFrame] = []
|
||||||
|
|
||||||
for i in range(tmp.height):
|
for i in range(tmp.height):
|
||||||
@@ -145,6 +152,7 @@ series[1]
|
|||||||
tmp = psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort(
|
tmp = psm.filter((pl.col.PA == 16003) & (pl.col("PA Pos") == 10)).sort(
|
||||||
"PSM gemeldet am", descending=False
|
"PSM gemeldet am", descending=False
|
||||||
)
|
)
|
||||||
|
tmp
|
||||||
# %%
|
# %%
|
||||||
# // plausibility check
|
# // plausibility check
|
||||||
# ** production quantities
|
# ** production quantities
|
||||||
@@ -162,7 +170,7 @@ plausi_features_endpoint_only = [
|
|||||||
"Teile verpackt in Karton",
|
"Teile verpackt in Karton",
|
||||||
]
|
]
|
||||||
plausi_features = plausi_features_all
|
plausi_features = plausi_features_all
|
||||||
plausi_features = plausi_features_endpoint_only
|
# plausi_features = plausi_features_endpoint_only
|
||||||
# %%
|
# %%
|
||||||
IDX = None
|
IDX = None
|
||||||
if IDX is None:
|
if IDX is None:
|
||||||
@@ -187,61 +195,306 @@ df_marked = tmp_1.with_columns(
|
|||||||
.otherwise(pl.lit(False))
|
.otherwise(pl.lit(False))
|
||||||
.alias("Produktionsstückzahlen_valide")
|
.alias("Produktionsstückzahlen_valide")
|
||||||
)
|
)
|
||||||
# print(df_marked)
|
|
||||||
|
|
||||||
# %%
|
PSM_SCORES: dict[QualityPsm, int] = {
|
||||||
|
QualityPsm.FEHLEND: 1,
|
||||||
|
QualityPsm.UNPLAUSIBEL: 0,
|
||||||
|
QualityPsm.PLAUSIBEL: 2,
|
||||||
|
}
|
||||||
|
|
||||||
df_score = df_marked.with_columns(
|
df_score = df_marked.with_columns(
|
||||||
pl.when(pl.col("is_empty"))
|
pl.when(pl.col("is_empty"))
|
||||||
.then(pl.lit(QualityPsm.FEHLEND))
|
.then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND]))
|
||||||
.when(pl.col("Produktionsstückzahlen_valide"))
|
.when(pl.col("Produktionsstückzahlen_valide"))
|
||||||
.then(pl.lit(QualityPsm.PLAUSIBEL))
|
.then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL]))
|
||||||
.otherwise(pl.lit(QualityPsm.UNPLAUSIBEL))
|
.otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL]))
|
||||||
.alias("Qualität Produktionsfortschritt")
|
.alias("Qualität Produktionsfortschritt")
|
||||||
)
|
)
|
||||||
print(df_score)
|
print(df_score)
|
||||||
|
|
||||||
# df_valide = tmp_1.filter(pl.all_horizontal(conditions))
|
|
||||||
# df_invalide = tmp_1.filter(
|
|
||||||
# ~pl.all_horizontal(conditions)
|
|
||||||
# ) # Das Tilde-Zeichen ~ bedeutet "NOT"
|
|
||||||
|
|
||||||
# print("--- valid rows ---")
|
|
||||||
# print(df_valide)
|
|
||||||
|
|
||||||
# print("\n--- invalid rows ---")
|
|
||||||
# print(df_invalide)
|
|
||||||
|
|
||||||
|
|
||||||
# %%
|
|
||||||
# 1. Testdaten erstellen (Zeile 0-2 sind valide, Zeile 3 ist dein invalides Beispiel)
|
|
||||||
df = pl.DataFrame({"EP-1": [0, 100, 100, 0], "EP-2": [0, 0, 100, 100], "EP-3": [0, 0, 0, 0]})
|
|
||||||
|
|
||||||
# 2. Liste der Erfassungspunkte in der richtigen (konsekutiven) Reihenfolge
|
|
||||||
ep_spalten = ["EP-1", "EP-2", "EP-3"]
|
|
||||||
|
|
||||||
# 3. Dynamisch die Bedingungen für alle Paare erstellen
|
|
||||||
# Wir prüfen für jedes Paar: Ist der vorherige Punkt (i) >= dem nächsten Punkt (i+1)?
|
|
||||||
bedingungen = [
|
|
||||||
pl.col(ep_spalten[i]) >= pl.col(ep_spalten[i + 1]) for i in range(len(ep_spalten) - 1)
|
|
||||||
]
|
|
||||||
|
|
||||||
# 4. Filter anwenden
|
|
||||||
# pl.all_horizontal stellt sicher, dass die Bedingung für JEDES Paar in der Zeile stimmt
|
|
||||||
df_valide = df.filter(pl.all_horizontal(bedingungen))
|
|
||||||
df_invalide = df.filter(~pl.all_horizontal(bedingungen)) # Das Tilde-Zeichen ~ bedeutet "NOT"
|
|
||||||
|
|
||||||
print("--- Valide Zeilen ---")
|
|
||||||
print(df_valide)
|
|
||||||
|
|
||||||
print("\n--- Invalide Zeilen ---")
|
|
||||||
print(df_invalide)
|
|
||||||
# %%
|
# %%
|
||||||
# // principle of aggregated data in Polars
|
# // principle of aggregated data in Polars
|
||||||
# map the database structure to a Polars dataframe and just insert or update the
|
# map the database structure to a Polars dataframe and just insert or update the
|
||||||
# corresponding entries of the defined database table
|
# corresponding entries of the defined database table
|
||||||
# We use an upsert strategy, keep local copies of the data and merge them with new entries.
|
# We use an upsert strategy, keep local copies of the data and merge them with new entries.
|
||||||
# This ensures that we always have a clean and complete history.
|
# This ensures that we always have a clean and complete history.
|
||||||
|
# %%
|
||||||
|
tmp = series[2]
|
||||||
|
|
||||||
|
|
||||||
|
# ** production quantities: plausibility / quality check
|
||||||
|
renaming_scheme: dict[str, str] = {
|
||||||
|
"PA Pos": "PA_Pos",
|
||||||
|
"PSM gemeldet am": "Meldezeitpunkt_Historie",
|
||||||
|
"Import Ist": "Import-Ist_Historie",
|
||||||
|
"1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie",
|
||||||
|
"Zuschnitt am": "Prod-Start_Historie",
|
||||||
|
"Teile in Zuschnitt": "Prod-EP10_Historie",
|
||||||
|
"Teile im Nähband": "Prod-EP20_Historie",
|
||||||
|
"Fertigware aus Nähband": "Prod-EP30_Historie",
|
||||||
|
"Teile kontrolliert": "Prod-EP40_Historie",
|
||||||
|
"Teile verpackt in Karton": "Prod-EP50_Historie",
|
||||||
|
}
|
||||||
|
|
||||||
|
KEYS = ["PA", "PA_Pos"]
|
||||||
|
|
||||||
|
tmp = tmp.rename(renaming_scheme)
|
||||||
|
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||||
|
|
||||||
|
|
||||||
|
plausi_features_all = [
|
||||||
|
"Prod-EP10_Historie",
|
||||||
|
"Prod-EP20_Historie",
|
||||||
|
"Prod-EP30_Historie",
|
||||||
|
"Prod-EP40_Historie",
|
||||||
|
"Prod-EP50_Historie",
|
||||||
|
]
|
||||||
|
PLAUSI_FEATURES = plausi_features_all
|
||||||
|
|
||||||
|
|
||||||
|
tmp = tmp.with_columns(
|
||||||
|
pl.all_horizontal(
|
||||||
|
pl.col(PLAUSI_FEATURES).is_null() | (pl.col(PLAUSI_FEATURES) == 0)
|
||||||
|
).alias("is_empty")
|
||||||
|
)
|
||||||
|
|
||||||
|
conditions = [
|
||||||
|
pl.col(PLAUSI_FEATURES[i]) >= pl.col(PLAUSI_FEATURES[i + 1])
|
||||||
|
for i in range(len(PLAUSI_FEATURES) - 1)
|
||||||
|
]
|
||||||
|
|
||||||
|
tmp = tmp.with_columns(
|
||||||
|
pl.when(pl.all_horizontal(conditions) | pl.col("is_empty"))
|
||||||
|
.then(pl.lit(True))
|
||||||
|
.otherwise(pl.lit(False))
|
||||||
|
.alias("Prod-Qty_is_valid")
|
||||||
|
).with_columns(
|
||||||
|
pl.when(pl.col("is_empty"))
|
||||||
|
.then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND]))
|
||||||
|
.when(pl.col("Prod-Qty_is_valid"))
|
||||||
|
.then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL]))
|
||||||
|
.otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL]))
|
||||||
|
.alias("Prod-Qualitaet_Historie")
|
||||||
|
)
|
||||||
|
# aggregate hint for "Prod-Qualitaet_Durchschnitt"
|
||||||
|
# aggregate "Prod-Qualitaet_Historie" and use "mean"
|
||||||
|
# need additional "alias" on "Prod-Qualitaet_Historie"
|
||||||
|
|
||||||
|
# tmp = (
|
||||||
|
# tmp.with_row_index("row_nr")
|
||||||
|
# .with_columns(
|
||||||
|
# pl.when(pl.col("row_nr") == 1) # Index 1 ist die zweite Zeile
|
||||||
|
# .then(None)
|
||||||
|
# .otherwise(pl.col("1.bestät. Import Konfektionär"))
|
||||||
|
# .alias("1.bestät. Import Konfektionär")
|
||||||
|
# )
|
||||||
|
# .drop("row_nr")
|
||||||
|
# )
|
||||||
|
# tmp
|
||||||
|
current_date = datetime.datetime.now().date()
|
||||||
|
print(f"{current_date=}")
|
||||||
|
tmp = tmp.with_columns(
|
||||||
|
pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias(
|
||||||
|
"Liefertermin_Soll"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first"
|
||||||
|
# the first non-null value of "Liefertermin_Soll" is the relevant target date
|
||||||
|
# should be first confirmed date, but if this field is not filled we use the first
|
||||||
|
# filled import by the supplier
|
||||||
|
|
||||||
|
# if the reported import date lies before the current date, it becomes the actual delivery date
|
||||||
|
tmp = tmp.with_columns(
|
||||||
|
pl.when(pl.col("Import-Ist_Historie") < current_date)
|
||||||
|
.then(pl.col("Import-Ist_Historie"))
|
||||||
|
.otherwise(None)
|
||||||
|
.alias("Liefertermin_Ist")
|
||||||
|
)
|
||||||
|
# aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last"
|
||||||
|
# keep last because that is the latest value set by the supplier
|
||||||
|
# if all values are NULL then NULL is returned (no actual date available)
|
||||||
|
|
||||||
|
# aggregate hint for "Prod-Start"
|
||||||
|
# aggregate "Prod-Start_Historie" and use "drop_nulls" "first"
|
||||||
|
# the first non-null entry is treated as the true value; changing it later does not make sense
|
||||||
|
# need additional "alias" on "Prod-Start_Historie"
|
||||||
|
|
||||||
|
# duration since last report in days
|
||||||
|
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||||
|
(
|
||||||
|
pl.col("Meldezeitpunkt_Historie")
|
||||||
|
- pl.col("Meldezeitpunkt_Historie").shift(1).over(KEYS)
|
||||||
|
)
|
||||||
|
.dt.total_days()
|
||||||
|
.alias("Tage_zu_letzter_PSM_Historie")
|
||||||
|
)
|
||||||
|
# aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt"
|
||||||
|
# aggregate "Tage_zu_letzter_PSM_Historie" and use "mean" (NULL is ignored automatically)
|
||||||
|
# need additional "alias" on "Tage_zu_letzter_PSM_Historie"
|
||||||
|
|
||||||
|
# aggregate hint for "Import-Ist_letzter_Wert"
|
||||||
|
# aggregate "Import-Ist_Historie" and use "drop_nulls" "last"
|
||||||
|
# need additional "alias" on "Import-Ist_Historie"
|
||||||
|
|
||||||
|
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||||
|
# check: does the current date differ from the previous date of the same position?
|
||||||
|
(pl.col("Import-Ist_Historie") != pl.col("Import-Ist_Historie").shift(1).over(KEYS))
|
||||||
|
.fill_null(False) # Der allererste Eintrag hat keinen Vorgänger -> Ist keine Änderung
|
||||||
|
.alias("Import-Ist_geaendert")
|
||||||
|
)
|
||||||
|
# aggregate hint for "Import-Ist_geaendert"
|
||||||
|
# aggregate "Import-Ist_geaendert" and use "last"
|
||||||
|
|
||||||
|
# aggregate hint for "Import-Ist_Anzahl_Aenderungen"
|
||||||
|
# aggregate "Import-Ist_geaendert" and use "sum"
|
||||||
|
# need additional "alias" on "Import-Ist_geaendert"
|
||||||
|
|
||||||
|
|
||||||
|
# compute all aggregates (see DB schema)
|
||||||
|
tmp = (
|
||||||
|
tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||||
|
.group_by(KEYS + ["Konfektionär"])
|
||||||
|
.agg(
|
||||||
|
pl.col("Meldezeitpunkt_Historie"),
|
||||||
|
pl.col("Liefertermin_Soll").drop_nulls().first(),
|
||||||
|
pl.col("Bestaetigter-Import_Historie"),
|
||||||
|
pl.col("Liefertermin_Ist").drop_nulls().last(),
|
||||||
|
pl.col("Import-Ist_Historie"),
|
||||||
|
pl.col("Import-Ist_Historie").drop_nulls().last().alias("Import-Ist_letzter_Wert"),
|
||||||
|
pl.col("Import-Ist_geaendert").last(),
|
||||||
|
pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"),
|
||||||
|
pl.col("Tage_zu_letzter_PSM_Historie"),
|
||||||
|
pl.col("Tage_zu_letzter_PSM_Historie")
|
||||||
|
.mean()
|
||||||
|
.alias("Tage_zu_letzter_PSM_Durchschnitt"),
|
||||||
|
pl.col("Prod-EP10_Historie"),
|
||||||
|
pl.col("Prod-EP20_Historie"),
|
||||||
|
pl.col("Prod-EP30_Historie"),
|
||||||
|
pl.col("Prod-EP40_Historie"),
|
||||||
|
pl.col("Prod-EP50_Historie"),
|
||||||
|
pl.col("Prod-Qualitaet_Historie"),
|
||||||
|
pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"),
|
||||||
|
pl.col("Prod-Start_Historie"),
|
||||||
|
pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
tmp
|
||||||
|
# %%
|
||||||
|
# ** order specific aggregates
|
||||||
|
LOWER_BOUND_DATE_DEVIATION = 0
|
||||||
|
UPPER_BOUND_DATE_DEVIATION = 0
|
||||||
|
|
||||||
|
tmp = tmp.with_columns(
|
||||||
|
pl.when(
|
||||||
|
(pl.col("Liefertermin_Ist").is_not_null())
|
||||||
|
& (pl.col("Liefertermin_Soll").is_not_null())
|
||||||
|
)
|
||||||
|
.then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days())
|
||||||
|
.otherwise(None)
|
||||||
|
.alias("Terminabweichung_Anzahl_Tage")
|
||||||
|
).with_columns(
|
||||||
|
pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION)
|
||||||
|
.then(pl.lit(True))
|
||||||
|
.otherwise(pl.lit(False))
|
||||||
|
.alias("Terminunterschreitung"),
|
||||||
|
pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION)
|
||||||
|
.then(pl.lit(True))
|
||||||
|
.otherwise(pl.lit(False))
|
||||||
|
.alias("Terminüberschreitung"),
|
||||||
|
pl.when((pl.col("Liefertermin_Ist").is_not_null()) & (pl.col("Prod-Start").is_not_null()))
|
||||||
|
.then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days())
|
||||||
|
.otherwise(None)
|
||||||
|
.alias("Durchlaufzeit_Anzahl_Tage"),
|
||||||
|
)
|
||||||
|
tmp
|
||||||
|
# %%
|
||||||
|
tmp_1 = tmp.select("Meldezeitpunkt_Historie")
|
||||||
|
tmp_1 = tmp_1.with_columns(
|
||||||
|
Meldezeitpunkt_datum=pl.col("Meldezeitpunkt_Historie").dt.date(),
|
||||||
|
)
|
||||||
|
tmp_1
|
||||||
|
|
||||||
|
# %%
|
||||||
|
tmp_1 = tmp.with_columns(
|
||||||
|
# current timestamp minus the shifted (previous) timestamp, isolated per order position
|
||||||
|
(
|
||||||
|
pl.col("Meldezeitpunkt_Historie")
|
||||||
|
- pl.col("Meldezeitpunkt_Historie").shift(1).over(["PA", "PA_Pos"])
|
||||||
|
)
|
||||||
|
.dt.total_days() # Macht aus der Zeitspanne (Duration) eine nackte Ganzzahl (Tage)
|
||||||
|
.alias("Tage_zu_letzter_PSM")
|
||||||
|
)
|
||||||
|
tmp_1
|
||||||
|
# %%
|
||||||
|
tmp_1.with_columns(
|
||||||
|
delta=(
|
||||||
|
pl.col("Meldezeitpunkt_datum").shift(
|
||||||
|
-1, fill_value=pl.col("Meldezeitpunkt_datum").last()
|
||||||
|
)
|
||||||
|
- pl.col("Meldezeitpunkt_datum")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# %%
|
||||||
|
|
||||||
|
|
||||||
|
########################################
|
||||||
|
# %%
|
||||||
|
# 1. This is the old state from the SQLite DB (resolved as a dataframe)
|
||||||
|
# Assume that last time the source system still had the old data (10:00)
|
||||||
|
df_db = pl.DataFrame(
|
||||||
|
{
|
||||||
|
"auftrag_id": [1],
|
||||||
|
"zeitstempel": [["10:00", "11:00"]],
|
||||||
|
"EP-1": [[0, 100]],
|
||||||
|
"EP-2": [[0, 0]],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
df_db
|
||||||
|
# %%
|
||||||
|
# 2. The new input (the source system has suddenly "forgotten" the 10:00 entry!)
|
||||||
|
df_input_neu = pl.DataFrame(
|
||||||
|
{
|
||||||
|
"auftrag_id": [1, 1],
|
||||||
|
"zeitstempel": ["11:00", "12:00"], # 10:00 fehlt, 11:00 ist redundant, 12:00 ist neu
|
||||||
|
"EP-1": [100, 100],
|
||||||
|
"EP-2": [0, 100],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
df_input_neu
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# --- STEP 1: flatten the database lists ---
|
||||||
|
# We explode the old lists so that each row is a single event again
|
||||||
|
df_db_flach = df_db.explode(["zeitstempel", "EP-1", "EP-2"])
|
||||||
|
df_db_flach
|
||||||
|
# %%
|
||||||
|
# --- STEP 2: combine everything ---
|
||||||
|
# We simply stack the old DB data and the new input data on top of each other
|
||||||
|
df_kombiniert = pl.concat([df_db_flach, df_input_neu])
|
||||||
|
df_kombiniert
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# --- STEP 3: remove duplicates (the magic) ---
|
||||||
|
# We keep only the unique combinations of order and timestamp.
|
||||||
|
# With keep="last", a possibly corrected new value overwrites the old one.
|
||||||
|
df_dedupliziert = df_kombiniert.unique(subset=["auftrag_id", "zeitstempel"], keep="last")
|
||||||
|
df_dedupliziert
|
||||||
|
# %%
|
||||||
|
# --- STEP 4: reassemble into clean lists ---
|
||||||
|
# Now we aggregate the clean data back into our database view
|
||||||
|
df_final_db = (
|
||||||
|
df_dedupliziert.sort("zeitstempel") # Wichtig, damit die Chronologie in der Liste stimmt!
|
||||||
|
.group_by("auftrag_id")
|
||||||
|
.agg(pl.col("zeitstempel"), pl.col("EP-1"), pl.col("EP-2"))
|
||||||
|
)
|
||||||
|
|
||||||
|
print(df_final_db)
|
||||||
|
|
||||||
|
|
||||||
|
###################################################################################
|
||||||
|
# %%
|
||||||
# 1. Testdaten: Auftrag 1 ist valide, Auftrag 2 enthält dein invalides Beispiel
|
# 1. Testdaten: Auftrag 1 ist valide, Auftrag 2 enthält dein invalides Beispiel
|
||||||
df = pl.DataFrame(
|
df = pl.DataFrame(
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user