latest analysis

This commit is contained in:
2026-05-07 11:22:04 +02:00
parent d5546b7fd0
commit 2397f23385
2 changed files with 133 additions and 118 deletions

View File

@@ -2,13 +2,9 @@
import json import json
import pprint import pprint
from collections import Counter from collections import Counter
from datetime import datetime
from pathlib import Path from pathlib import Path
from zoneinfo import ZoneInfo
import pandas as pd
import polars as pl import polars as pl
from scipy import stats
# %% # %%
p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve() p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve()
@@ -134,117 +130,3 @@ if WRITE_TO_DISK:
df.write_parquet(concat_data) df.write_parquet(concat_data)
else: else:
df = pl.read_parquet(concat_data) df = pl.read_parquet(concat_data)
# %%
print(f"Number of entries in data: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
df.head()
# %%
# valid ps = 101, 102, 110
# filter all entries which contain invalid error states
invalid_ids = df.filter(~pl.col("ps").is_in((101, 102, 110))).select("id").unique()
print(f"Number of invalid IDs: {len(invalid_ids)}")
df = df.filter(~pl.col("id").is_in(invalid_ids["id"].implode()))
print(f"Number of curves in data after cleansing: {len(df.select('id').unique())}")
# sort chronologically
df = df.sort(by=["id", "ts"], descending=[False, False])
# %%
# filter for relevant type number with maximum number of entries
TARGET_TYPE_NUM = 2
df = df.filter(pl.col.type_num == TARGET_TYPE_NUM)
print(f"Number of entries for type num {TARGET_TYPE_NUM}: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
# %%
current_time = datetime.now(tz=ZoneInfo("UTC"))
df_reconst = df.with_columns(
(pl.col.ts_delta_cum + pl.lit(current_time)).alias("reconstructed")
)
# %%
df_reconst
# %%
collection = df_reconst.select(pl.col.id).unique().sort(by="id")["id"][:10]
# %%
series = df_reconst.filter(pl.col.id.is_in(collection))
series
# %%
series.select(pl.exclude("ts_delta_step", "ts_delta_cum")).plot.line(
x="reconstructed", y="DU1260"
)
# %%
series.group_by("id").agg(pl.col("ts_delta_cum").max())
# %%
series.group_by("id").agg(pl.len())
# ** simple stats
# try to separate anomalies by time/duration
# // "Duration Anomalies"
# IQR
durations = df_reconst.group_by("id").agg(pl.col("ts_delta_cum").max())
durations = durations.with_columns(pl.col.ts_delta_cum.dt.total_microseconds())
durations.head()
FACTOR = 1.5
iqr = stats.iqr(durations["ts_delta_cum"])
quantiles = stats.quantile(durations["ts_delta_cum"], [0.25, 0.75])
print(f"Quantiles (0.25, 0.75): {quantiles}")
print(f"IQR: {iqr}")
iqr_lb = max(iqr - FACTOR * quantiles[0], 0)
iqr_ub = iqr + FACTOR * quantiles[1]
print(f"Lower bound: {iqr_lb}")
print(f"Upper bound: {iqr_ub}")
durations.describe()
# %%
df_reconst.filter(pl.col.ps == 102).filter(
pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub)
)
# %%
filter_out_time = (
df_reconst.filter(pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub))
.select("id")
.unique()
)
df_out_time = df_reconst.filter(pl.col.id.is_in(filter_out_time["id"].implode()))
df_out_time
# TODO calculate duration for each phase
ids_out = df_out_time["id"].unique().implode()
df_remain = df_reconst.filter(~pl.col.id.is_in(ids_out))
df_remain
# %%
df_analyse = (
df_remain.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias(
"mean_sampling_rate"
)
)
)
# %%
df_analyse.describe()
# %%
df_analyse2 = (
df_reconst.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias(
"mean_sampling_rate"
)
)
)
df_analyse2.describe()
# %%
df2
# %%
series
# %%
# %%
series.head()
# %%
temp = df.filter(pl.col.id.is_in(collection))
temp
# %%
temp = temp.with_columns((pl.col.ts_delta + pl.lit(current_time)).alias("reconstructed"))
# %%
temp
# %%

View File

@@ -0,0 +1,133 @@
# %%
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
import pandas as pd
import polars as pl
from scipy import stats
# %%
p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve()
assert p_data_base.exists()
print("Total number of JSON files")
len(tuple(p_data_base.glob("**/*.json")))
# %%
concat_data = p_data_base / "all_data.parquet"
df = pl.read_parquet(concat_data)
# %%
print(f"Number of entries in data: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
df.head()
# %%
# valid ps = 101, 102, 110
# filter all entries which contain invalid error states
invalid_ids = df.filter(~pl.col("ps").is_in((101, 102, 110))).select("id").unique()
print(f"Number of invalid IDs: {len(invalid_ids)}")
df = df.filter(~pl.col("id").is_in(invalid_ids["id"].implode()))
print(f"Number of curves in data after cleansing: {len(df.select('id').unique())}")
# sort chronologically
df = df.sort(by=["id", "ts"], descending=[False, False])
# %%
# filter for relevant type number with maximum number of entries
TARGET_TYPE_NUM = 2
df = df.filter(pl.col.type_num == TARGET_TYPE_NUM)
print(f"Number of entries for type num {TARGET_TYPE_NUM}: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
# %%
current_time = datetime.now(tz=ZoneInfo("UTC"))
df_reconst = df.with_columns(
(pl.col.ts_delta_cum + pl.lit(current_time)).alias("reconstructed")
)
# %%
df_reconst
# %%
collection = df_reconst.select(pl.col.id).unique().sort(by="id")["id"][:10]
# %%
series = df_reconst.filter(pl.col.id.is_in(collection))
series
# %%
series.select(pl.exclude("ts_delta_step", "ts_delta_cum")).plot.line(
x="reconstructed", y="DU1260"
)
# %%
series.group_by("id").agg(pl.col("ts_delta_cum").max())
# %%
series.group_by("id").agg(pl.len())
# ** simple stats
# try to separate anomalies by time/duration
# // "Duration Anomalies"
# IQR
durations = df_reconst.group_by("id").agg(pl.col("ts_delta_cum").max())
durations = durations.with_columns(pl.col.ts_delta_cum.dt.total_microseconds())
durations.head()
FACTOR = 1.5
iqr = stats.iqr(durations["ts_delta_cum"])
quantiles = stats.quantile(durations["ts_delta_cum"], [0.25, 0.75])
print(f"Quantiles (0.25, 0.75): {quantiles}")
print(f"IQR: {iqr}")
iqr_lb = max(iqr - FACTOR * quantiles[0], 0)
iqr_ub = iqr + FACTOR * quantiles[1]
print(f"Lower bound: {iqr_lb}")
print(f"Upper bound: {iqr_ub}")
durations.describe()
# %%
df_reconst.filter(pl.col.ps == 102).filter(
pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub)
)
# %%
filter_out_time = (
df_reconst.filter(pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub))
.select("id")
.unique()
)
df_out_time = df_reconst.filter(pl.col.id.is_in(filter_out_time["id"].implode()))
df_out_time
# TODO calculate duration for each phase
ids_out = df_out_time["id"].unique().implode()
df_remain = df_reconst.filter(~pl.col.id.is_in(ids_out))
df_remain
# %%
df_analyse = (
df_remain.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / (pl.col.ts_delta_cum.dt.total_microseconds() / 1e6)).alias(
"mean_sampling_rate"
)
)
)
# %%
df_analyse.describe()
# %%
df_analyse2 = (
df_reconst.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / (pl.col.ts_delta_cum.dt.total_microseconds() / 1e6)).alias(
"mean_sampling_rate"
)
)
)
df_analyse2.describe()
# %%
df2
# %%
series
# %%
# %%
series.head()
# %%
temp = df.filter(pl.col.id.is_in(collection))
temp
# %%
temp = temp.with_columns((pl.col.ts_delta + pl.lit(current_time)).alias("reconstructed"))
# %%
temp
# %%