basic data analysis

This commit is contained in:
2026-01-13 16:13:22 +01:00
parent f65e1f2981
commit d5546b7fd0
3 changed files with 207 additions and 11 deletions

View File

@@ -2,12 +2,13 @@
import json
import pprint
from collections import Counter
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
import pandas as pd
import polars as pl
WRITE_TO_DISK = False
from scipy import stats
# %%
p_data_base = (Path.cwd() / "../data/Datenauszug_20251212").resolve()
@@ -75,7 +76,7 @@ folder_to_types
# [timestep, process_step, pressure_value, valve_value]
# valid states are ps = [101, 102, 110]
schema = {
schema_read = {
"DU1260": pl.Float64,
"V1560": pl.Boolean,
"ps": pl.UInt32,
@@ -83,37 +84,60 @@ schema = {
"type_num": pl.UInt8,
"id": pl.UInt64,
}
df = pl.DataFrame(schema=schema)
schema = {
"DU1260": pl.Float64,
"V1560": pl.Boolean,
"ps": pl.UInt32,
"ts": pl.Datetime,
"type_num": pl.UInt8,
"id": pl.UInt64,
"ts_delta_step": pl.Duration,
"ts_delta_cum": pl.Duration,
}
df = pl.DataFrame(schema=schema).with_columns(pl.col("ts").dt.replace_time_zone("UTC"))
count = 0
for idx, file in enumerate(p_data_base.glob("**/*.json")):
with open(file, "r") as f:
data = json.load(f)
type_num = data["initial"]["dsc_TypeNumber"]["value"]
df_file = pl.DataFrame(data["rows"], schema_overrides=schema)
df_file = pl.DataFrame(data["rows"], schema_overrides=schema_read)
df_file = df_file.with_columns(
pl.col("ts").str.to_datetime(time_zone="UTC"),
pl.lit(type_num).alias("type_num").cast(pl.UInt8),
pl.lit(idx).alias("id").cast(pl.UInt64),
)
df_file = df_file.with_columns(
(pl.col.ts - pl.col.ts.shift(1))
.alias("ts_delta_step")
.fill_null(pl.lit(0).cast(pl.Duration))
)
df_file = df_file.with_columns(
pl.col("ts_delta_step").cum_sum().alias("ts_delta_cum"),
)
df = pl.concat((df, df_file))
count += 1
df = df.with_columns(pl.col("ts").str.to_datetime(time_zone="UTC"))
df = df.select(["id", "type_num", "ts", "ps", "DU1260", "V1560"])
# df = df.with_columns(pl.col("ts").str.to_datetime(time_zone="UTC"))
df = df.select(
["id", "type_num", "ts", "ts_delta_step", "ts_delta_cum", "ps", "DU1260", "V1560"]
)
df.head()
# %%
print(f"Files processed: {count}")
print(f"Length of obtained data: {len(df)}")
# %%
WRITE_TO_DISK = False
concat_data = p_data_base / "all_data.parquet"
if WRITE_TO_DISK:
df.write_parquet(concat_data)
else:
df = pl.read_parquet(concat_data)
# %%
df.head()
print(f"Number of entries in data: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
df.head()
# %%
# valid ps = 101, 102, 110
# filter all entries which contain invalid error states
@@ -121,8 +145,106 @@ invalid_ids = df.filter(~pl.col("ps").is_in((101, 102, 110))).select("id").uniqu
print(f"Number of invalid IDs: {len(invalid_ids)}")
df = df.filter(~pl.col("id").is_in(invalid_ids["id"].implode()))
print(f"Number of curves in data after cleansing: {len(df.select('id').unique())}")
# sort chronologically
df = df.sort(by=["id", "ts"], descending=[False, False])
# %%
df.select(["ts", "DU1260"])
# filter for relevant type number with maximum number of entries
TARGET_TYPE_NUM = 2
df = df.filter(pl.col.type_num == TARGET_TYPE_NUM)
print(f"Number of entries for type num {TARGET_TYPE_NUM}: {len(df)}")
print(f"Number of curves in data: {len(df.select('id').unique())}")
# %%
df.plot.line(x="ts", y="DU1260")
current_time = datetime.now(tz=ZoneInfo("UTC"))
df_reconst = df.with_columns(
(pl.col.ts_delta_cum + pl.lit(current_time)).alias("reconstructed")
)
# %%
df_reconst
# %%
collection = df_reconst.select(pl.col.id).unique().sort(by="id")["id"][:10]
# %%
series = df_reconst.filter(pl.col.id.is_in(collection))
series
# %%
series.select(pl.exclude("ts_delta_step", "ts_delta_cum")).plot.line(
x="reconstructed", y="DU1260"
)
# %%
series.group_by("id").agg(pl.col("ts_delta_cum").max())
# %%
series.group_by("id").agg(pl.len())
# ** simple stats
# try to separate anomalies by time/duration
# // "Duration Anomalies"
# IQR
durations = df_reconst.group_by("id").agg(pl.col("ts_delta_cum").max())
durations = durations.with_columns(pl.col.ts_delta_cum.dt.total_microseconds())
durations.head()
FACTOR = 1.5
iqr = stats.iqr(durations["ts_delta_cum"])
quantiles = stats.quantile(durations["ts_delta_cum"], [0.25, 0.75])
print(f"Quantiles (0.25, 0.75): {quantiles}")
print(f"IQR: {iqr}")
iqr_lb = max(iqr - FACTOR * quantiles[0], 0)
iqr_ub = iqr + FACTOR * quantiles[1]
print(f"Lower bound: {iqr_lb}")
print(f"Upper bound: {iqr_ub}")
durations.describe()
# %%
df_reconst.filter(pl.col.ps == 102).filter(
pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub)
)
# %%
filter_out_time = (
df_reconst.filter(pl.col.ts_delta_cum > pl.duration(microseconds=iqr_ub))
.select("id")
.unique()
)
df_out_time = df_reconst.filter(pl.col.id.is_in(filter_out_time["id"].implode()))
df_out_time
# TODO calculate duration for each phase
ids_out = df_out_time["id"].unique().implode()
df_remain = df_reconst.filter(~pl.col.id.is_in(ids_out))
df_remain
# %%
df_analyse = (
df_remain.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias(
"mean_sampling_rate"
)
)
)
# %%
df_analyse.describe()
# %%
df_analyse2 = (
df_reconst.group_by("id")
.agg(pl.len().alias("count"), pl.col("ts_delta_cum").max())
.with_columns(
(pl.col.count / pl.col.ts_delta_cum.dt.total_microseconds()).alias(
"mean_sampling_rate"
)
)
)
df_analyse2.describe()
# %%
df2
# %%
series
# %%
# %%
series.head()
# %%
temp = df.filter(pl.col.id.is_in(collection))
temp
# %%
temp = temp.with_columns((pl.col.ts_delta + pl.lit(current_time)).alias("reconstructed"))
# %%
temp
# %%