prepare enhanced pipeline

This commit is contained in:
2026-06-10 13:32:39 +02:00
parent 59148aaaf3
commit b66d5a4921
2 changed files with 133 additions and 42 deletions

View File

@@ -4,7 +4,8 @@ import dataclasses as dc
import datetime
import enum
import json
from typing import TYPE_CHECKING, Any, Final, cast
import warnings
from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast
import polars as pl
import sqlalchemy as sql
@@ -13,6 +14,7 @@ from wattanalyse import db
if TYPE_CHECKING:
from oracledb import Connection as OracleConnection
from polars._typing import SchemaDict
# 1. cleanup obtained new data
# ~~2. load data from internal database~~
@@ -22,6 +24,8 @@ if TYPE_CHECKING:
# 4. post-process results
# 5. write to external database
SqlStatement: TypeAlias = str
@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
@@ -65,6 +69,22 @@ PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]
LOWER_BOUND_DATE_DEVIATION: Final[int] = 0
UPPER_BOUND_DATE_DEVIATION: Final[int] = 0
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4
TAB_NAME_PSM: Final[str] = "EXTERN_PSM"
TAB_NAME_MIS: Final[str] = "EXTERN_MIS"
# // (0) load data
def load_PSM_data(
conn: OracleConnection,
) -> pl.LazyFrame:
stmt = f"""
SELECT t1.* FROM "{TAB_NAME_PSM}" t1
WHERE EXISTS(
SELECT 1 FROM "{TAB_NAME_MIS}" t2
WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos"
)
"""
return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt)
# // (1) preprocess
@@ -306,44 +326,6 @@ def process_order_level(
)
)
# data = (
# data.with_columns(
# pl.when(
# (pl.col("Liefertermin_Ist").is_not_null())
# & (pl.col("Liefertermin_Soll").is_not_null())
# )
# .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days())
# .otherwise(None)
# .alias("Terminabweichung_Anzahl_Tage")
# )
# .with_columns(
# pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION)
# .then(pl.lit(True))
# .otherwise(pl.lit(False))
# .alias("Terminunterschreitung"),
# pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION)
# .then(pl.lit(True))
# .otherwise(pl.lit(False))
# .alias("Terminüberschreitung"),
# pl.when(
# (pl.col("Liefertermin_Ist").is_not_null())
# & (pl.col("Prod-Start").is_not_null())
# )
# .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days())
# .otherwise(None)
# .alias("Durchlaufzeit_Anzahl_Tage"),
# )
# .with_columns(
# pl.when(
# (pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null())
# & (pl.col("Durchlaufzeit_Anzahl_Tage") < 0)
# )
# .then(None)
# .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage"))
# .alias("Durchlaufzeit_Anzahl_Tage")
# )
# )
return data
@@ -637,11 +619,21 @@ def oracle_generate_sql_insert(
def oracle_load_table_as_polars(
conn: OracleConnection,
table_name: str,
schema: SchemaDict | None,
table_name: str | None = None,
stmt: SqlStatement | None = None,
) -> pl.LazyFrame:
stmt = f"SELECT * FROM {table_name}"
if not any((table_name, stmt)):
raise ValueError("Table name or SQL statement must be provided")
if all((table_name, stmt)):
warnings.warn(
"Table name and SQL statement provided. In this case, the statement is used."
)
if not stmt:
stmt = f"SELECT * FROM {table_name}"
odf = conn.fetch_df_all(statement=stmt)
df = cast(pl.DataFrame, pl.from_arrow(odf))
df = cast(pl.DataFrame, pl.from_arrow(odf, schema_overrides=schema))
return df.lazy()