enhanced pipeline functions

This commit is contained in:
2026-06-08 15:42:26 +02:00
parent 4ce5fce0d9
commit 34ba005dde
2 changed files with 322 additions and 1 deletions

119
prototypes/03-1_check_db.py Normal file
View File

@@ -0,0 +1,119 @@
# %%
import importlib
from pathlib import Path
import external_code
import oracledb
import polars as pl
import wattanalyse
from wattanalyse import constants
importlib.reload(wattanalyse)
importlib.reload(constants)
# %%
PROJECT_BASE = Path(__file__).parents[1]
DATA_PTH = PROJECT_BASE / "data"
assert DATA_PTH.exists()
# %%
conn = oracledb.connect(
user=constants.USER_CFG.Datenbank.NUTZER,
password=constants.USER_CFG.Datenbank.PASSWORT,
host=constants.USER_CFG.Datenbank.HOST,
port=constants.USER_CFG.Datenbank.PORT,
service_name=constants.USER_CFG.Datenbank.SERVICE_NAME,
)
# %%
# // KPI_PRODUKTIONSAUFTRAEGE
TABLE_NAME = "KPI_PRODUKTIONSAUFTRAEGE"
prepared_oracle_pth = DATA_PTH / f"db/oracle_prepare_{TABLE_NAME}.arrow"
assert prepared_oracle_pth.exists()
df = pl.read_ipc(prepared_oracle_pth)
# %%
with conn.cursor() as cursor:
cursor.execute(f'SELECT * FROM "{TABLE_NAME}"')
data = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
print("columns:", columns)
print("data:", data)
# %%
# ** insert
stmts = external_code.oracle_generate_sql_insert(TABLE_NAME, columns=df.columns)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
with conn.cursor() as cursor:
cursor.execute(stmts.delete)
cursor.executemany(stmts.insert, df)
conn.commit()
# %%
# ** read
stmt = f"SELECT * FROM {TABLE_NAME}"
odf = conn.fetch_df_all(statement=stmt)
loaded_df = pl.from_arrow(odf)
print(loaded_df)
#############
# %%
# //
TABLE_NAME = "KPI_KONFEKTIONAERE"
prepared_oracle_pth = DATA_PTH / f"db/oracle_prepare_{TABLE_NAME}.arrow"
assert prepared_oracle_pth.exists()
df = pl.read_ipc(prepared_oracle_pth)
# %%
with conn.cursor() as cursor:
cursor.execute(f'SELECT * FROM "{TABLE_NAME}"')
data = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
print("columns:", columns)
print("data:", data)
# %%
# ** insert
stmts = external_code.oracle_generate_sql_insert(TABLE_NAME, columns=df.columns)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
with conn.cursor() as cursor:
cursor.execute(stmts.delete)
cursor.executemany(stmts.insert, df)
conn.commit()
# %%
# ** read
stmt = f"SELECT * FROM {TABLE_NAME}"
odf = conn.fetch_df_all(statement=stmt)
loaded_df = pl.from_arrow(odf)
print(loaded_df)
# %%
df.height
#####################################
# %%
columns = df.columns
spalten_str = ", ".join([f'"{c}"' for c in columns])
platzhalter_str = ", ".join([f":{i}" for i in range(1, len(columns) + 1)])
table_name = "KPI_PRODUKTIONSAUFTRAEGE"
sql_delete = f'DELETE FROM "{table_name}"'
sql_insert = f'INSERT INTO "{table_name}" ({spalten_str}) VALUES ({platzhalter_str})'
print(f"SQL DELETE: {sql_delete}\nSQL Insert: {sql_insert}")
# %%
with conn.cursor() as cursor:
cursor.execute(sql_delete)
# df_oracle_bereit wird direkt als Arrow-Stream an Oracle übergeben!
cursor.executemany(sql_insert, df)
conn.commit()
# %%
stmt = f"SELECT * FROM {table_name}"
odf = conn.fetch_df_all(statement=stmt)
pl_df = pl.from_arrow(odf)
# %%
pl_df
# %%

View File

@@ -1,14 +1,19 @@
from __future__ import annotations
import dataclasses as dc
import datetime
import enum
import json
from typing import Any, Final
from typing import TYPE_CHECKING, Any, Final, cast
import polars as pl
import sqlalchemy as sql
from wattanalyse import db
if TYPE_CHECKING:
from oracledb import Connection as OracleConnection
# 1. cleanup obtained new data
# ~~2. load data from internal database~~
# ~~3. integrate with with new data (whole snapshot)~~
@@ -24,6 +29,12 @@ class PreProcessResult:
filtered: pl.DataFrame
@dc.dataclass(slots=True, kw_only=True)
class SqlInsertStmts:
delete: str
insert: str
class QualityPsm(enum.StrEnum):
FEHLEND = enum.auto()
UNPLAUSIBEL = enum.auto()
@@ -453,3 +464,194 @@ def load_order_level_from_internal_database() -> pl.DataFrame:
}
return data.with_columns(**list_col_parse_conds)
# // (4) post-process results
USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
def aggregate_production_orders(
data: pl.LazyFrame,
) -> pl.LazyFrame:
data = data.select(
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_early)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_late)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.std(ddof=1)
.alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
pl.col("Import-Ist_Anzahl_Aenderungen")
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
pl.col("Tage_zu_letzter_PSM_Historie")
.list.explode()
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
pl.col("Durchlaufzeit_Anzahl_Tage")
.mean()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
)
return data
def aggregate_suppliers(
data: pl.LazyFrame,
) -> pl.LazyFrame:
data = data.group_by("Konfektionär").agg(
(
(
~(filter_date_deviation_early | filter_date_deviation_late)
& (pl.col("Import-Ist_Anzahl_Aenderungen") == 0)
).mean()
* 100
)
.round(4, mode="half_away_from_zero")
.alias("QUOTE_ERSTBESTAETIGUNG"),
((~(filter_date_deviation_early | filter_date_deviation_late)).mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("PROZENT_LIEFERTREUE"),
(filter_date_deviation_early.mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"),
(filter_date_deviation_late.mean() * 100)
.round(4, mode="half_away_from_zero")
.alias("ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_early)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.filter(filter_date_deviation_late)
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
pl.col("Terminabweichung_Anzahl_Tage")
.std(ddof=1)
.alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
pl.col("Import-Ist_Anzahl_Aenderungen")
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
pl.col("Tage_zu_letzter_PSM_Historie")
.list.explode()
.mean()
.abs()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
pl.col("Durchlaufzeit_Anzahl_Tage")
.mean()
.round(mode="half_away_from_zero")
.cast(pl.Int64)
.alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
pl.col("Prod-Qualitaet_Historie")
.list.explode()
.mean()
.round(4, mode="half_away_from_zero")
.alias("MITTLERER_QUALITAETSSCORE_PSM"),
)
return data
# // (5) external database
def oracle_prepare_KPI_aggregate(
data: pl.LazyFrame,
rename_schema: dict[str, str] | None = None,
sort_by: str = "",
sort_descending: bool = False,
) -> pl.LazyFrame:
if rename_schema is not None:
data = data.rename(rename_schema)
cols_sorted = ["ID", "AKTUALISIERT_AM"] + [c for c in data.collect_schema().names()]
if sort_by:
data = data.sort(sort_by, descending=sort_descending)
data = data.with_row_index("ID", 1)
data = (
data.with_columns(
pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
)
.select(
pl.col(pl.Boolean).cast(pl.Int8),
pl.all().exclude(pl.Boolean),
)
.select(cols_sorted)
)
return data
def oracle_generate_sql_insert(
table_name: str,
columns: list,
) -> SqlInsertStmts:
spalten_str = ", ".join([f'"{c}"' for c in columns])
platzhalter_str = ", ".join([f":{i}" for i in range(1, len(columns) + 1)])
sql_delete = f'DELETE FROM "{table_name}"'
sql_insert = f'INSERT INTO "{table_name}" ({spalten_str}) VALUES ({platzhalter_str})'
return SqlInsertStmts(delete=sql_delete, insert=sql_insert)
def oracle_load_table_as_polars(
conn: OracleConnection,
table_name: str,
) -> pl.LazyFrame:
stmt = f"SELECT * FROM {table_name}"
odf = conn.fetch_df_all(statement=stmt)
df = cast(pl.DataFrame, pl.from_arrow(odf))
return df.lazy()
def oracle_save_polars(
conn: OracleConnection,
stmts: SqlInsertStmts,
data: pl.DataFrame,
) -> None:
with conn.cursor() as cursor:
cursor.execute(stmts.delete)
cursor.executemany(stmts.insert, data)
conn.commit()