diff --git a/prototypes/02_integrate_wokflow.py b/prototypes/02_integrate_wokflow.py index bc2892c..038e812 100644 --- a/prototypes/02_integrate_wokflow.py +++ b/prototypes/02_integrate_wokflow.py @@ -11,7 +11,6 @@ from wattanalyse import db importlib.reload(db) importlib.reload(external_code) - # %% PROJECT_BASE = Path(__file__).parents[1] DATA_PTH = PROJECT_BASE / "data" @@ -20,121 +19,44 @@ assert DATA_PTH.exists() # %% # // load data target = DATA_PTH / "PSM_20260507.arrow" -data_raw = pl.read_ipc(target) - - +data_raw = pl.scan_ipc(target) # %% # // preprocessing I -res = external_code.preprocess_psm(data_raw) +# res = external_code.preprocess_psm(data_raw) +# data = res.data -# %% -res.filtered -# %% -data = data_raw.rename(external_code.RENAMING_SCHEME) -REGEX_PATTERN = r"^[\s\-#+/$]+$" -data = data.with_columns( - pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN)) - .then(None) - .otherwise(pl.col(pl.String)) - .name.keep() -) -data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t")) -print(f"Size of dataset before cleansing: {data.height}") -filtered_data = pl.DataFrame(schema=data.schema) -# %% -# data.filter(pl.col.Meldezeitpunkt_Historie.is_null()) -# %% -# any NULL values in critical columns -NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie") -conds = [pl.col(col).is_null() for col in NOT_NULL_COLS] -filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) -data = data.filter(~pl.any_horizontal(*conds)) - -# implausible dates -# dates not allowed to be in the future -current_datetime = datetime.datetime.now() -current_date = current_datetime.date() -NOT_IN_FUTURE_COLS_DATETIME = ("Meldezeitpunkt_Historie",) -NOT_IN_FUTURE_COLS_DATE = ("Wareneingang am", "Prod-Start_Historie") -conds = [ - (pl.col(col) > current_datetime).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATETIME -] - -conds.extend( - [(pl.col(col) > current_date).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATE] -) -filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) -data = data.filter(~pl.any_horizontal(*conds)) - -# too much in the future or the past -NUMBER_YEARS_UPPER_BOUND_DATES = 4 -# dates -future_limit = current_date + datetime.timedelta(days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES)) -past_limit = datetime.date(1990, 1, 1) -cond = (pl.col(pl.Date) > future_limit).fill_null(False) | ( - pl.col(pl.Date) < past_limit -).fill_null(False) -filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) -data = data.filter(~pl.any_horizontal(cond)) -# datetime -future_limit = current_datetime + datetime.timedelta( - days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) -) -past_limit = datetime.datetime(1990, 1, 1) -cond = (pl.col(pl.Datetime) > future_limit).fill_null(False) | ( - pl.col(pl.Datetime) < past_limit -).fill_null(False) -filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) -data = data.filter(~pl.any_horizontal(cond)) - -print(f"Size of dataset after cleansing: {data.height}") -print(f"Filtered data: {filtered_data}") -# %% -test = pl.DataFrame( - { - "t1": [0, 1, 3], - "t2": [1, None, 3], - "t3": [3, 8, None], - } -) -test - - -# %% -columns = ["t1", "t2", "t3"] -conds = [pl.col(col).is_null() for col in columns] -test.filter(pl.any_horizontal(*conds)) # %% most_occurrences = ( - data.group_by(["PA", "PA Pos", "Konfektionär"]) + data.group_by(["PA", "PA_Pos", "Konfektionär"]) .agg(pl.len().alias("count")) .sort("count", descending=True) ) -most_occurrences -# %% -most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme")) +print(most_occurrences.collect()) +most_occurrences.filter( + ~pl.col("Konfektionär").str.contains("May Tekstil Camcesme") +).collect() # %% # data = data.filter( # ((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)) # | ((pl.col.PA == 16856) & (pl.col("PA Pos") == 10)) # ).sort("PSM gemeldet am", descending=False) -data = data.filter((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)).sort( - "PSM gemeldet am", descending=False -) -data.select(pl.col.PA.unique()) +# data = data.filter((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)).sort( +# "PSM gemeldet am", descending=False +# ) +# data.select(pl.col.PA.unique()) # %% # // simulate time series # this is a sequence how data would be provided: first one entry, and then more additional entries -series: list[pl.DataFrame] = [] +# series: list[pl.DataFrame] = [] -for i in range(data.height): - series.append(data[: (i + 1)]) +# for i in range(data.height): +# series.append(data[: (i + 1)]) -assert len(series) == data.height +# assert len(series) == data.height -for idx, entry in enumerate(series, start=1): - assert idx == entry.height +# for idx, entry in enumerate(series, start=1): +# assert idx == entry.height # %% # 1. cleanup obtained new data @@ -148,14 +70,17 @@ for idx, entry in enumerate(series, start=1): # // (1) cleanup obtained new data # load data from internal database # integrate with with new data (whole snapshot) +res = external_code.preprocess_psm(data_raw) +data = res.data +print(f"Data:\n{data.collect()}\n\n---\n\nFiltered:\n{res.filtered}") -# // (2) processing order level -tmp = series[3] -tmp # %% -df = external_code.process_order_level(tmp) -df +# // (2) processing order level +df = external_code.process_order_level(data) + + +# TODO What is if "Konfektionär" is NULL? # %% # // (3) save results to internal database @@ -165,4 +90,75 @@ external_code.dump_order_level_to_internal_database_wipe(df) df = external_code.load_order_level_from_internal_database() df +############################################# +# %% +# handle "Liefertermin_Soll" nulls +df.filter(pl.col("Liefertermin_Soll").is_null()).collect() +# %% +df.head().collect() + +# %% +data_raw.filter(pl.col.PA == 18759) + +# %% +data_raw.filter(pl.col.PA == 16626).collect() +# %% +data_raw.filter(pl.all().is_duplicated()) +# %% +test = data_raw.collect() + +# %% +all_cols = test.columns +test = test.with_row_index("tmp_idx") + +# %% +all_uni = test.unique(subset=all_cols, keep="first") +# %% +sub_uni = test.unique(subset=["PA", "PA Pos", "PSM gemeldet am"], keep="first") +# %% +all_uni.join(sub_uni, on="tmp_idx", how="anti") +# %% +all_uni.height +# %% +sub_uni.height +# %% +tmp = test.filter(pl.col.PA == 17055).sort("PSM gemeldet am") +# %% +tmp.height +# %% +tmp_11 = tmp.unique(subset=["PA", "PA Pos", "PSM gemeldet am"], keep="first") +# %% +tmp_12 = tmp.unique(subset=all_cols, keep="first") +# %% +# tmp.select(all_cols).is_duplicated() +# %% +tmp.filter(tmp.is_unique()) +# %% +tmp_12.join(tmp_11, on="tmp_idx", how="anti") +# %% +test.filter( + (pl.col.PA == 17055) + & (pl.col("PSM gemeldet am") == datetime.datetime(2024, 10, 29, 10, 27)) +) +# after fix should be the entry with the most information (least null count) +# %% +t1 = data_raw.collect() +t1.head() +# %% +t1 = t1.with_columns(pl.sum_horizontal(pl.all().is_null()).alias("null_count")) + +t1.head() +# %% +check = data.collect() +check.filter( + (pl.col.PA == 17055) + & (pl.col("Meldezeitpunkt_Historie") == datetime.datetime(2024, 10, 29, 10, 27)) +) + +# %% +res.filtered.filter( + (pl.col.PA == 17055) + & (pl.col("Meldezeitpunkt_Historie") == datetime.datetime(2024, 10, 29, 10, 27)) +) + # %% diff --git a/prototypes/external_code.py b/prototypes/external_code.py index 5c32996..43187e1 100644 --- a/prototypes/external_code.py +++ b/prototypes/external_code.py @@ -20,7 +20,7 @@ from wattanalyse import db @dc.dataclass(slots=True, eq=False) class PreProcessResult: - data: pl.DataFrame + data: pl.LazyFrame filtered: pl.DataFrame @@ -58,7 +58,7 @@ NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4 # // (1) preprocess def preprocess_psm( - data: pl.DataFrame, + data: pl.LazyFrame, ) -> PreProcessResult: data = data.rename(RENAMING_SCHEME) REGEX_PATTERN = r"^[\s\-#+/$]+$" @@ -69,7 +69,23 @@ def preprocess_psm( .name.keep() ) data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t")) - filtered_data = pl.DataFrame(schema=data.schema) + filtered_data = pl.LazyFrame(schema=data.collect_schema()) + + # drop duplicates + # use null count as information measure, least amount of nulls should be contained + base_columns = data.columns + data = data.with_columns(pl.sum_horizontal(pl.all().is_null()).alias("null_count")) + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie", "null_count"], descending=False) + filtered_data = pl.concat( + [ + filtered_data, + data.filter( + ~pl.struct(PRIM_KEYS + ["Meldezeitpunkt_Historie"]).is_first_distinct() + ).select(base_columns), + ] + ) + data = data.filter(pl.struct(PRIM_KEYS + ["Meldezeitpunkt_Historie"]).is_first_distinct()) + data = data.drop("null_count") # any NULL values in critical columns NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie") @@ -115,11 +131,13 @@ def preprocess_psm( filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) data = data.filter(~pl.any_horizontal(cond)) - return PreProcessResult(data=data, filtered=filtered_data) + return PreProcessResult(data=data, filtered=filtered_data.collect()) # // (2) process on order level -def process_order_level(data: pl.DataFrame) -> pl.DataFrame: +def process_order_level( + data: pl.LazyFrame, +) -> pl.LazyFrame: # ** renaming # data = data.rename(RENAMING_SCHEME) # TODO delete, done in pre-processing data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) @@ -255,42 +273,67 @@ def process_order_level(data: pl.DataFrame) -> pl.DataFrame: # ** order specific aggregates data = ( data.with_columns( - pl.when( - (pl.col("Liefertermin_Ist").is_not_null()) - & (pl.col("Liefertermin_Soll").is_not_null()) - ) - .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days()) - .otherwise(None) + (pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")) + .dt.total_days() .alias("Terminabweichung_Anzahl_Tage") ) .with_columns( - pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION) - .then(pl.lit(True)) - .otherwise(pl.lit(False)) - .alias("Terminunterschreitung"), - pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION) - .then(pl.lit(True)) - .otherwise(pl.lit(False)) - .alias("Terminüberschreitung"), - pl.when( - (pl.col("Liefertermin_Ist").is_not_null()) - & (pl.col("Prod-Start").is_not_null()) - ) - .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days()) - .otherwise(None) + (pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION).alias( + "Terminunterschreitung" + ), + (pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION).alias( + "Terminüberschreitung" + ), + (pl.col("Liefertermin_Ist") - pl.col("Prod-Start")) + .dt.total_days() .alias("Durchlaufzeit_Anzahl_Tage"), ) .with_columns( - pl.when( - (pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null()) - & (pl.col("Durchlaufzeit_Anzahl_Tage") < 0) - ) + pl.when(pl.col("Durchlaufzeit_Anzahl_Tage") < 0) .then(None) .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage")) .alias("Durchlaufzeit_Anzahl_Tage") ) ) + # data = ( + # data.with_columns( + # pl.when( + # (pl.col("Liefertermin_Ist").is_not_null()) + # & (pl.col("Liefertermin_Soll").is_not_null()) + # ) + # .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days()) + # .otherwise(None) + # .alias("Terminabweichung_Anzahl_Tage") + # ) + # .with_columns( + # pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION) + # .then(pl.lit(True)) + # .otherwise(pl.lit(False)) + # .alias("Terminunterschreitung"), + # pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION) + # .then(pl.lit(True)) + # .otherwise(pl.lit(False)) + # .alias("Terminüberschreitung"), + # pl.when( + # (pl.col("Liefertermin_Ist").is_not_null()) + # & (pl.col("Prod-Start").is_not_null()) + # ) + # .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days()) + # .otherwise(None) + # .alias("Durchlaufzeit_Anzahl_Tage"), + # ) + # .with_columns( + # pl.when( + # (pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null()) + # & (pl.col("Durchlaufzeit_Anzahl_Tage") < 0) + # ) + # .then(None) + # .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage")) + # .alias("Durchlaufzeit_Anzahl_Tage") + # ) + # ) + return data @@ -313,7 +356,7 @@ def _parse_to_json( def dump_order_level_to_internal_database_staging( - data: pl.DataFrame, + data: pl.LazyFrame, ) -> None: staging_data = data.with_columns( @@ -324,6 +367,7 @@ def dump_order_level_to_internal_database_staging( ) .name.keep() ) + staging_data = staging_data.collect() rows_inserted = staging_data.write_database( "Produktionsauftrag-Einzelsicht_Staging", connection=db.DB_URI, @@ -355,7 +399,7 @@ def dump_order_level_to_internal_database_staging( def dump_order_level_to_internal_database_wipe( - data: pl.DataFrame, + data: pl.LazyFrame, ) -> None: staging_data = data.with_columns( @@ -370,6 +414,7 @@ def dump_order_level_to_internal_database_wipe( with db.ENGINE_INTERNAL.begin() as conn: conn.execute(sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";')) + staging_data = staging_data.collect() rows_inserted = staging_data.write_database( "Produktionsauftrag-Einzelsicht", connection=db.DB_URI, diff --git a/src/wattanalyse/db.py b/src/wattanalyse/db.py index ddb0215..e2b7fd3 100644 --- a/src/wattanalyse/db.py +++ b/src/wattanalyse/db.py @@ -21,9 +21,9 @@ intern_prod_order_t: Table = Table( MD_INTERNAL, Column("PA", sql.Integer, primary_key=True), Column("PA_Pos", sql.Integer, primary_key=True), - Column("Konfektionär", sql.Text, nullable=False), + Column("Konfektionär", sql.Text, nullable=True), Column("Meldezeitpunkt_Historie", sql.Text, nullable=False), - Column("Liefertermin_Soll", sql.Date, nullable=False), + Column("Liefertermin_Soll", sql.Date, nullable=True), Column("Bestaetigter-Import_Historie", sql.Text, nullable=False), Column("Liefertermin_Ist", sql.Date, nullable=True), Column("Import-Ist_Historie", sql.Text, nullable=False),