diff --git a/prototypes/01_first_analyse.py b/prototypes/01_dataset_transform.py similarity index 51% rename from prototypes/01_first_analyse.py rename to prototypes/01_dataset_transform.py index 7362923..a7704e9 100644 --- a/prototypes/01_first_analyse.py +++ b/prototypes/01_dataset_transform.py @@ -2,13 +2,9 @@ import json import pprint from collections import Counter -from datetime import datetime from pathlib import Path -from zoneinfo import ZoneInfo -import pandas as pd import polars as pl -from scipy import stats # %% p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve() @@ -134,117 +130,3 @@ if WRITE_TO_DISK: df.write_parquet(concat_data) else: df = pl.read_parquet(concat_data) -# %% -print(f"Number of entries in data: {len(df)}") -print(f"Number of curves in data: {len(df.select('id').unique())}") -df.head() -# %% -# valid ps = 101, 102, 110 -# filter all entries which contain invalid error states -invalid_ids = df.filter(~pl.col("ps").is_in((101, 102, 110))).select("id").unique() -print(f"Number of invalid IDs: {len(invalid_ids)}") -df = df.filter(~pl.col("id").is_in(invalid_ids["id"].implode())) -print(f"Number of curves in data after cleansing: {len(df.select('id').unique())}") -# sort chronologically -df = df.sort(by=["id", "ts"], descending=[False, False]) -# %% -# filter for relevant type number with maximum number of entries -TARGET_TYPE_NUM = 2 -df = df.filter(pl.col.type_num == TARGET_TYPE_NUM) -print(f"Number of entries for type num {TARGET_TYPE_NUM}: {len(df)}") -print(f"Number of curves in data: {len(df.select('id').unique())}") -# %% -current_time = datetime.now(tz=ZoneInfo("UTC")) -df_reconst = df.with_columns( - (pl.col.ts_delta_cum + pl.lit(current_time)).alias("reconstructed") -) -# %% -df_reconst -# %% -collection = df_reconst.select(pl.col.id).unique().sort(by="id")["id"][:10] -# %% -series = df_reconst.filter(pl.col.id.is_in(collection)) -series -# %% -series.select(pl.exclude("ts_delta_step", "ts_delta_cum")).plot.line( - x="reconstructed", y="DU1260" -) - -# %% -series.group_by("id").agg(pl.col("ts_delta_cum").max()) -# %% -series.group_by("id").agg(pl.len()) - -# ** simple stats -# try to separate anomalies by time/duration -# // "Duration Anomalies" -# IQR -durations = df_reconst.group_by("id").agg(pl.col("ts_delta_cum").max()) -durations = durations.with_columns(pl.col.ts_delta_cum.dt.total_microseconds()) -durations.head() - -FACTOR = 1.5 -iqr = stats.iqr(durations["ts_delta_cum"]) -quantiles = stats.quantile(durations["ts_delta_cum"], [0.25, 0.75]) -print(f"Quantiles (0.25, 0.75): {quantiles}") -print(f"IQR: {iqr}") -iqr_lb = max(iqr - FACTOR * quantiles[0], 0) -iqr_ub = iqr + FACTOR * quantiles[1] -print(f"Lower bound: {iqr_lb}") -print(f"Upper bound: {iqr_ub}") -durations.describe() -# %% -df_reconst.filter(pl.col.ps == 102).filter( - pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub) -) -# %% -filter_out_time = ( - df_reconst.filter(pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub)) - .select("id") - .unique() -) -df_out_time = df_reconst.filter(pl.col.id.is_in(filter_out_time["id"].implode())) -df_out_time -# TODO calculate duration for each phase -ids_out = df_out_time["id"].unique().implode() -df_remain = df_reconst.filter(~pl.col.id.is_in(ids_out)) -df_remain -# %% -df_analyse = ( - df_remain.group_by("id") - .agg(pl.len().alias("count"), pl.col("ts_delta_cum").max()) - .with_columns( - (pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias( - "mean_sampling_rate" - ) - ) -) -# %% -df_analyse.describe() -# %% -df_analyse2 = ( - df_reconst.group_by("id") - .agg(pl.len().alias("count"), pl.col("ts_delta_cum").max()) - .with_columns( - (pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias( - "mean_sampling_rate" - ) - ) -) -df_analyse2.describe() -# %% -df2 -# %% -series -# %% -# %% - -series.head() -# %% -temp = df.filter(pl.col.id.is_in(collection)) -temp -# %% -temp = temp.with_columns((pl.col.ts_delta + pl.lit(current_time)).alias("reconstructed")) -# %% -temp -# %% diff --git a/prototypes/02_first_analyse.py b/prototypes/02_first_analyse.py new file mode 100644 index 0000000..7b4dc3d --- /dev/null +++ b/prototypes/02_first_analyse.py @@ -0,0 +1,133 @@ +# %% +from datetime import datetime +from pathlib import Path +from zoneinfo import ZoneInfo + +import pandas as pd +import polars as pl +from scipy import stats + +# %% +p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve() +assert p_data_base.exists() + +print("Total number of JSON files") +len(tuple(p_data_base.glob("**/*.json"))) + +# %% +concat_data = p_data_base / "all_data.parquet" +df = pl.read_parquet(concat_data) +# %% +print(f"Number of entries in data: {len(df)}") +print(f"Number of curves in data: {len(df.select('id').unique())}") +df.head() +# %% +# valid ps = 101, 102, 110 +# filter all entries which contain invalid error states +invalid_ids = df.filter(~pl.col("ps").is_in((101, 102, 110))).select("id").unique() +print(f"Number of invalid IDs: {len(invalid_ids)}") +df = df.filter(~pl.col("id").is_in(invalid_ids["id"].implode())) +print(f"Number of curves in data after cleansing: {len(df.select('id').unique())}") +# sort chronologically +df = df.sort(by=["id", "ts"], descending=[False, False]) +# %% +# filter for relevant type number with maximum number of entries +TARGET_TYPE_NUM = 2 +df = df.filter(pl.col.type_num == TARGET_TYPE_NUM) +print(f"Number of entries for type num {TARGET_TYPE_NUM}: {len(df)}") +print(f"Number of curves in data: {len(df.select('id').unique())}") +# %% +current_time = datetime.now(tz=ZoneInfo("UTC")) +df_reconst = df.with_columns( + (pl.col.ts_delta_cum + pl.lit(current_time)).alias("reconstructed") +) +# %% +df_reconst +# %% +collection = df_reconst.select(pl.col.id).unique().sort(by="id")["id"][:10] +# %% +series = df_reconst.filter(pl.col.id.is_in(collection)) +series +# %% +series.select(pl.exclude("ts_delta_step", "ts_delta_cum")).plot.line( + x="reconstructed", y="DU1260" +) + +# %% +series.group_by("id").agg(pl.col("ts_delta_cum").max()) +# %% +series.group_by("id").agg(pl.len()) + +# ** simple stats +# try to separate anomalies by time/duration +# // "Duration Anomalies" +# IQR +durations = df_reconst.group_by("id").agg(pl.col("ts_delta_cum").max()) +durations = durations.with_columns(pl.col.ts_delta_cum.dt.total_microseconds()) +durations.head() + +FACTOR = 1.5 +iqr = stats.iqr(durations["ts_delta_cum"]) +quantiles = stats.quantile(durations["ts_delta_cum"], [0.25, 0.75]) +print(f"Quantiles (0.25, 0.75): {quantiles}") +print(f"IQR: {iqr}") +iqr_lb = max(iqr - FACTOR * quantiles[0], 0) +iqr_ub = iqr + FACTOR * quantiles[1] +print(f"Lower bound: {iqr_lb}") +print(f"Upper bound: {iqr_ub}") +durations.describe() +# %% +df_reconst.filter(pl.col.ps == 102).filter( + pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub) +) +# %% +filter_out_time = ( + df_reconst.filter(pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub)) + .select("id") + .unique() +) +df_out_time = df_reconst.filter(pl.col.id.is_in(filter_out_time["id"].implode())) +df_out_time +# TODO calculate duration for each phase +ids_out = df_out_time["id"].unique().implode() +df_remain = df_reconst.filter(~pl.col.id.is_in(ids_out)) +df_remain +# %% +df_analyse = ( + df_remain.group_by("id") + .agg(pl.len().alias("count"), pl.col("ts_delta_cum").max()) + .with_columns( + (pl.col.count / (pl.col.ts_delta_cum.dt.total_microseconds() / 1e6)).alias( + "mean_sampling_rate" + ) + ) +) +# %% +df_analyse.describe() +# %% +df_analyse2 = ( + df_reconst.group_by("id") + .agg(pl.len().alias("count"), pl.col("ts_delta_cum").max()) + .with_columns( + (pl.col.count / (pl.col.ts_delta_cum.dt.total_microseconds() / 1e6)).alias( + "mean_sampling_rate" + ) + ) +) +df_analyse2.describe() +# %% +df2 +# %% +series +# %% +# %% + +series.head() +# %% +temp = df.filter(pl.col.id.is_in(collection)) +temp +# %% +temp = temp.with_columns((pl.col.ts_delta + pl.lit(current_time)).alias("reconstructed")) +# %% +temp +# %%