construct base pipeline with "run" function

This commit is contained in:
2026-06-10 16:48:03 +02:00
parent b66d5a4921
commit 5e15c99520
6 changed files with 844 additions and 50 deletions

View File

@@ -51,49 +51,74 @@ data_mis = data_mis.drop("ID", strict=False)
data_psm = data_psm.drop("ID", strict=False)
# %%
data_psm.height
# %%
data_psm.join(data_mis, on=["PA", "PA Pos"], how="semi")
# // (0) Load from external database
data_psm = external_code.load_PSM_data(conn)
data_psm.collect()
# %%
#
tab_name_psm = "EXTERN_PSM"
tab_name_mis = "EXTERN_MIS"
stmt = f"""
SELECT t1.* FROM "{tab_name_psm}" t1
WHERE EXISTS(
SELECT 1 FROM "{tab_name_mis}" t2
WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos"
)
"""
# test = external_code.oracle_load_table_as_polars(
# conn, db.extern_prod_order_t_schema, "", None
# ).collect()
test = external_code.oracle_load_table_as_polars(
conn, db.extern_prod_order_t_schema, tab_name_psm, stmt
).collect()
# %%
# data_psm = external_code.load_PSM_data(conn).collect()
# %%
# // preprocess data
# TODO: add check with MIS data if the orders are relevant
# // (1) preprocess data
tmp = data_psm.clone()
res = external_code.preprocess_psm(tmp.lazy())
tmp = res.data
tmp = tmp.collect()
tmp
# %%
tmp = tmp.rename({"PA_Pos": "PA Pos"})
# %%
tmp.join(data_mis, on=["PA", "PA Pos"], how="semi")
tmp_show = tmp.collect()
tmp_show
# %%
res.filtered
# // (2) process on order level
tmp = external_code.process_order_level(tmp)
tmp.collect()
# %%
# // (3) dump to database (intermediate result)
external_code.dump_order_level_to_internal_database_wipe(tmp)
# %%
# // (4) post-process
# ** aggregation for orders
aggregate_orders = external_code.aggregate_production_orders(tmp)
print(aggregate_orders.collect())
# ** aggregation for suppliers
aggregate_suppliers = external_code.aggregate_suppliers(tmp)
print(aggregate_suppliers.collect())
# %%
# // (5) save to external database
# ** orders
aggregate_orders = external_code.oracle_prepare_KPI_aggregate(aggregate_orders)
print(aggregate_orders.head().collect())
stmts_orders = external_code.oracle_generate_sql_insert(
table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=aggregate_orders.collect_schema().names()
)
print(f"SQL DELETE: {stmts_orders.delete}\nSQL Insert: {stmts_orders.insert}")
# ** suppliers
aggregate_suppliers = external_code.oracle_prepare_KPI_aggregate(
aggregate_suppliers,
sort_by="Konfektionaer",
sort_descending=False,
)
print(aggregate_suppliers.head().collect())
stmts_suppliers = external_code.oracle_generate_sql_insert(
table_name="KPI_KONFEKTIONAERE", columns=aggregate_suppliers.collect_schema().names()
)
print(f"SQL DELETE: {stmts_suppliers.delete}\nSQL Insert: {stmts_suppliers.insert}")
# %%
# ** actual saving procedure
external_code.oracle_save_polars(conn, stmts_orders, aggregate_orders.collect())
external_code.oracle_save_polars(conn, stmts_suppliers, aggregate_suppliers.collect())
# %%
print(f"Shape Aggregate Production Orders: {aggregate_orders.collect().shape}")
print(f"Shape Aggregate Suppliers: {aggregate_suppliers.collect().shape}")
# %%
# // try loading
loaded_orders = external_code.oracle_load_table_as_polars(
conn, db.extern_results_prod_orders_t_schema, table_name="KPI_PRODUKTIONSAUFTRAEGE"
)
loaded_orders.collect()
# %%
loaded_suppliers = external_code.oracle_load_table_as_polars(
conn, db.extern_results_suppliers_t_schema, table_name="KPI_KONFEKTIONAERE"
)
loaded_suppliers.collect()
# %%
tmp = data_psm.clone()
tmp = external_code.aggregate_production_orders(tmp.lazy()).collect()
print(tmp)
tmp = external_code.oracle_prepare_KPI_aggregate(tmp.lazy()).collect()
print(tmp)

View File

@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast
import polars as pl
import sqlalchemy as sql
from dopt_basics.datastructures import flatten
from wattanalyse import db
@@ -30,7 +31,13 @@ SqlStatement: TypeAlias = str
@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
data: pl.LazyFrame
filtered: pl.DataFrame
filtered: pl.LazyFrame
DROP_COLUMNS: Final[list[str]] = cast(
list[str],
list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))),
)
@dc.dataclass(slots=True, kw_only=True)
@@ -51,7 +58,7 @@ PSM_SCORES: dict[QualityPsm, int] = {
QualityPsm.PLAUSIBEL: 2,
}
RENAMING_SCHEME: dict[str, str] = {
RENAMING_SCHEME_PSM: dict[str, str] = {
"PA Pos": "PA_Pos",
"PSM gemeldet am": "Meldezeitpunkt_Historie",
"Import Ist": "Import-Ist_Historie",
@@ -62,6 +69,8 @@ RENAMING_SCHEME: dict[str, str] = {
"Fertigware aus Nähband": "Prod-EP30_Historie",
"Teile kontrolliert": "Prod-EP40_Historie",
"Teile verpackt in Karton": "Prod-EP50_Historie",
"Konfektionär": "Konfektionaer",
"Lieferantnr.": "Konfektionaer_ID",
}
PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]
@@ -91,7 +100,8 @@ def load_PSM_data(
def preprocess_psm(
data: pl.LazyFrame,
) -> PreProcessResult:
data = data.rename(RENAMING_SCHEME)
data = data.rename(RENAMING_SCHEME_PSM)
data = data.drop(DROP_COLUMNS, strict=False)
REGEX_PATTERN = r"^[\s\-#+/$]+$"
data = data.with_columns(
pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN))
@@ -99,7 +109,7 @@ def preprocess_psm(
.otherwise(pl.col(pl.String))
.name.keep()
)
data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t"))
data = data.with_columns(pl.col("Konfektionaer").str.strip_chars(" \n\t"))
filtered_data = pl.LazyFrame(schema=data.collect_schema())
# drop duplicates
@@ -161,7 +171,7 @@ def preprocess_psm(
filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))])
data = data.filter(~pl.any_horizontal(cond))
return PreProcessResult(data=data, filtered=filtered_data.collect())
return PreProcessResult(data=data, filtered=filtered_data)
# // (2) process on order level
@@ -169,7 +179,6 @@ def process_order_level(
data: pl.LazyFrame,
) -> pl.LazyFrame:
# ** renaming
# data = data.rename(RENAMING_SCHEME) # TODO delete, done in pre-processing
data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False)
# ** plausibility check of order quantities
@@ -272,7 +281,7 @@ def process_order_level(
# whole aggregates see DB schema
data = (
data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False)
.group_by(PRIM_KEYS + ["Konfektionär"])
.group_by(PRIM_KEYS + ["Konfektionaer", "Konfektionaer_ID"])
.agg(
pl.col("Meldezeitpunkt_Historie"),
pl.col("Liefertermin_Soll").drop_nulls().first(),
@@ -508,7 +517,7 @@ def aggregate_production_orders(
def aggregate_suppliers(
data: pl.LazyFrame,
) -> pl.LazyFrame:
data = data.group_by("Konfektionär").agg(
data = data.group_by(["Konfektionaer", "Konfektionaer_ID"]).agg(
(
(
~(filter_date_deviation_early | filter_date_deviation_late)
@@ -573,8 +582,6 @@ def aggregate_suppliers(
# // (5) external database
def oracle_prepare_KPI_aggregate(
data: pl.LazyFrame,
rename_schema: dict[str, str] | None = None,
@@ -599,6 +606,7 @@ def oracle_prepare_KPI_aggregate(
pl.all().exclude(pl.Boolean),
)
.select(cols_sorted)
.select(pl.all().name.to_uppercase())
)
return data