wrapped pipeline

This commit is contained in:
2026-06-11 09:18:09 +02:00
parent 5e15c99520
commit 2a6777becc
3 changed files with 106 additions and 28 deletions

View File

@@ -4,6 +4,7 @@ import os
from pathlib import Path from pathlib import Path
from typing import Final from typing import Final
import oracledb
from dopt_basics import configs from dopt_basics import configs
from dopt_basics import io as io_ from dopt_basics import io as io_
@@ -31,3 +32,6 @@ user_cfg = configs.load_toml(Config.PTH_USER_CFG)
USER_CFG: t.UserConfig = t.UserConfig( USER_CFG: t.UserConfig = t.UserConfig(
Datenbank=t.UserConfig_Datenbank(**user_cfg["Datenbank"]) Datenbank=t.UserConfig_Datenbank(**user_cfg["Datenbank"])
) )
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000

View File

@@ -0,0 +1,29 @@
import oracledb
from dopt_basics.result_pattern import STATUS_HANDLER
from wattanalyse import pipelines
from wattanalyse.constants import USER_CFG
from wattanalyse.logging import logger_pipeline
# Module-level Oracle connection: `oracledb.connect` is called when this module
# is imported, with user, password, host, port and service name all read from
# `USER_CFG.Datenbank`. `pipeline_KPI_calculation` passes this object on to
# `pipelines.KPI_calculation`.
# NOTE(review): no close()/context manager for this connection is visible in
# this view — confirm where its lifetime ends.
ORACLE_CONN = oracledb.connect(
user=USER_CFG.Datenbank.NUTZER,
password=USER_CFG.Datenbank.PASSWORT,
host=USER_CFG.Datenbank.HOST,
port=USER_CFG.Datenbank.PORT,
service_name=USER_CFG.Datenbank.SERVICE_NAME,
)
def pipeline_KPI_calculation() -> None:
    """Run the >KPI_calculation< pipeline on the module-level connection.

    Logs the start of the run, calls ``pipelines.KPI_calculation`` with
    ``ORACLE_CONN`` and then reports the outcome through ``logger_pipeline``:
    an error entry carrying the status (with stack information) when the
    returned status is not ``STATUS_HANDLER.SUCCESS``, otherwise a success
    entry. The function returns ``None`` in both cases.
    """
    logger_pipeline.info("Start pipeline >KPI_calculation<")
    outcome = pipelines.KPI_calculation(ORACLE_CONN)

    # NOTE(review): only the status attribute of the returned object is
    # inspected. ``KPI_calculation`` is annotated to return a ``Status`` of its
    # own; whether a failing inner status ends up in ``outcome.status`` depends
    # on how ``wrap_result`` treats such return values — TODO confirm.
    if outcome.status != STATUS_HANDLER.SUCCESS:
        logger_pipeline.error(
            "An error occurred during the procedure --- Status:\n%s",
            outcome.status,
            stack_info=True,
        )
    else:
        logger_pipeline.info("Pipeline >KPI_calculation< ended successfully")

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Final, cast
import polars as pl import polars as pl
import sqlalchemy as sql import sqlalchemy as sql
from dopt_basics.datastructures import flatten from dopt_basics.datastructures import flatten
from dopt_basics.result_pattern import wrap_result from dopt_basics.result_pattern import STATUS_HANDLER, Status, wrap_result
from wattanalyse import db from wattanalyse import db
from wattanalyse.logging import logger_pipeline as logger from wattanalyse.logging import logger_pipeline as logger
@@ -75,7 +75,8 @@ TAB_NAME_PSM: Final[str] = "EXTERN_PSM"
TAB_NAME_MIS: Final[str] = "EXTERN_MIS" TAB_NAME_MIS: Final[str] = "EXTERN_MIS"
# // (0) load data # // (10) load data
@wrap_result(code_on_error=10)
def load_PSM_data( def load_PSM_data(
conn: OracleConnection, conn: OracleConnection,
) -> pl.LazyFrame: ) -> pl.LazyFrame:
@@ -89,7 +90,8 @@ def load_PSM_data(
return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt) return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt)
# // (1) preprocess # // (20) preprocess
@wrap_result(code_on_error=20)
def preprocess_psm( def preprocess_psm(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> PreProcessResult: ) -> PreProcessResult:
@@ -167,7 +169,8 @@ def preprocess_psm(
return PreProcessResult(data=data, filtered=filtered_data) return PreProcessResult(data=data, filtered=filtered_data)
# // (2) process on order level # // (30) process on order level
@wrap_result(code_on_error=30)
def process_order_level( def process_order_level(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> pl.LazyFrame: ) -> pl.LazyFrame:
@@ -331,7 +334,7 @@ def process_order_level(
return data return data
# // (3) dump order level to internal database # // (40) dump order level to internal database
def _json_default( def _json_default(
value: Any, value: Any,
) -> str: ) -> str:
@@ -349,6 +352,7 @@ def _parse_to_json(
return json.dumps(x.to_list(), default=_json_default) return json.dumps(x.to_list(), default=_json_default)
@wrap_result(code_on_error=41)
def dump_order_level_to_internal_database_staging( def dump_order_level_to_internal_database_staging(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> None: ) -> None:
@@ -392,6 +396,7 @@ def dump_order_level_to_internal_database_staging(
) )
@wrap_result(code_on_error=40)
def dump_order_level_to_internal_database_wipe( def dump_order_level_to_internal_database_wipe(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> None: ) -> None:
@@ -420,6 +425,7 @@ def dump_order_level_to_internal_database_wipe(
# ** load order level data from internal database # ** load order level data from internal database
@wrap_result(code_on_error=49)
def load_order_level_from_internal_database() -> pl.DataFrame: def load_order_level_from_internal_database() -> pl.DataFrame:
data = pl.read_database_uri( data = pl.read_database_uri(
'SELECT * FROM "Produktionsauftrag-Einzelsicht"', 'SELECT * FROM "Produktionsauftrag-Einzelsicht"',
@@ -450,7 +456,7 @@ def load_order_level_from_internal_database() -> pl.DataFrame:
return data.with_columns(**list_col_parse_conds) return data.with_columns(**list_col_parse_conds)
# // (4) post-process results # // (50) post-process results
USE_BOUNDARIES: Final[bool] = False USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr filter_date_deviation_early: pl.Expr
@@ -463,6 +469,7 @@ else:
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
@wrap_result(code_on_error=51)
def aggregate_production_orders( def aggregate_production_orders(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> pl.LazyFrame: ) -> pl.LazyFrame:
@@ -507,6 +514,7 @@ def aggregate_production_orders(
return data return data
@wrap_result(code_on_error=52)
def aggregate_suppliers( def aggregate_suppliers(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> pl.LazyFrame: ) -> pl.LazyFrame:
@@ -574,7 +582,8 @@ def aggregate_suppliers(
return data return data
# // (5) external database # // (60) external database
@wrap_result(code_on_error=60)
def oracle_prepare_KPI_aggregate( def oracle_prepare_KPI_aggregate(
data: pl.LazyFrame, data: pl.LazyFrame,
rename_schema: dict[str, str] | None = None, rename_schema: dict[str, str] | None = None,
@@ -605,6 +614,7 @@ def oracle_prepare_KPI_aggregate(
return data return data
@wrap_result(code_on_error=61)
def oracle_generate_sql_insert( def oracle_generate_sql_insert(
table_name: str, table_name: str,
columns: list, columns: list,
@@ -639,6 +649,7 @@ def oracle_load_table_as_polars(
return df.lazy() return df.lazy()
@wrap_result(code_on_error=62)
def oracle_save_polars( def oracle_save_polars(
conn: OracleConnection, conn: OracleConnection,
stmts: SqlInsertStmts, stmts: SqlInsertStmts,
@@ -652,48 +663,70 @@ def oracle_save_polars(
# TODO wrap this in a metadata tracking call # TODO wrap this in a metadata tracking call
@wrap_result(code_on_error=1, logger=logger) @wrap_result(code_on_error=1, logger=logger)
def run( def KPI_calculation(
conn: OracleConnection, conn: OracleConnection,
) -> None: ) -> Status:
# // (0) Load from external database # // (10) Load from external database
logger.info("Load data from database >load_PSM_data<...") logger.info("Load data from database >load_PSM_data<...")
data = load_PSM_data(conn) res = load_PSM_data(conn)
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
logger.info("Successfully loaded data from database") logger.info("Successfully loaded data from database")
# // (1) preprocess data # // (20) preprocess data
logger.info("Preprocess data (cleansing) >preprocess_psm<...") logger.info("Preprocess data (cleansing) >preprocess_psm<...")
res = preprocess_psm(data) res = preprocess_psm(res.unwrap())
data = res.data if res.status != STATUS_HANDLER.SUCCESS:
return res.status
# data = res.result.data
logger.info("Successfully preprocessed data") logger.info("Successfully preprocessed data")
# // (2) process on order level # // (30) process on order level
logger.info("Process data on order level >process_order_level<...") logger.info("Process data on order level >process_order_level<...")
data = process_order_level(data) res = process_order_level(res.unwrap().data)
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
data = res.unwrap()
logger.info("Successfully processed data on order level") logger.info("Successfully processed data on order level")
# // (3) dump to database (intermediate result) # // (40) dump to database (intermediate result)
logger.info("Save order level data in internal database...") logger.info("Save order level data in internal database...")
dump_order_level_to_internal_database_wipe(data) res = dump_order_level_to_internal_database_wipe(data)
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
logger.info("Successfully saved order level data in internal DB") logger.info("Successfully saved order level data in internal DB")
# // (4) post-process # // (50) post-process
# ** aggregation for orders # ** aggregation for orders
logger.info("Aggregate data with KPI calculation...") logger.info("Aggregate data with KPI calculation...")
logger.info("...production orders...") logger.info("...production orders...")
orders_aggregated = aggregate_production_orders(data) res = aggregate_production_orders(data)
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
orders_aggregated = res.unwrap()
# ** aggregation for suppliers # ** aggregation for suppliers
logger.info("...suppliers...") logger.info("...suppliers...")
suppliers_aggregated = aggregate_suppliers(data) res = aggregate_suppliers(data)
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
suppliers_aggregated = res.unwrap()
logger.info("Successfully aggregated and calculated KPIs") logger.info("Successfully aggregated and calculated KPIs")
# // (5) save to external database # // (60) save to external database
logger.info("Prepare saving data to external database...") logger.info("Prepare saving data to external database...")
logger.info("Prepare production order KPI table for Oracle export...") logger.info("Prepare production order KPI table for Oracle export...")
orders_aggregated = oracle_prepare_KPI_aggregate(orders_aggregated) res = oracle_prepare_KPI_aggregate(orders_aggregated)
stmts_orders = oracle_generate_sql_insert( if res.status != STATUS_HANDLER.SUCCESS:
return res.status
orders_aggregated = res.unwrap()
# TODO add table names as variables
res = oracle_generate_sql_insert(
table_name="KPI_PRODUKTIONSAUFTRAEGE", table_name="KPI_PRODUKTIONSAUFTRAEGE",
columns=orders_aggregated.collect_schema().names(), columns=orders_aggregated.collect_schema().names(),
) )
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
stmts_orders = res.unwrap()
logger.info( logger.info(
"SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s",
stmts_orders.delete, stmts_orders.delete,
@@ -702,14 +735,20 @@ def run(
# ** suppliers # ** suppliers
logger.info("Prepare supplier KPI table for Oracle export...") logger.info("Prepare supplier KPI table for Oracle export...")
suppliers_aggregated = oracle_prepare_KPI_aggregate( res = oracle_prepare_KPI_aggregate(
suppliers_aggregated, suppliers_aggregated,
sort_by="Konfektionaer", sort_by="Konfektionaer",
sort_descending=False, sort_descending=False,
) )
stmts_suppliers = oracle_generate_sql_insert( if res.status != STATUS_HANDLER.SUCCESS:
return res.status
suppliers_aggregated = res.unwrap()
res = oracle_generate_sql_insert(
table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names() table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names()
) )
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
stmts_suppliers = res.unwrap()
logger.info( logger.info(
"SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s",
stmts_suppliers.delete, stmts_suppliers.delete,
@@ -718,6 +757,12 @@ def run(
# ** actual saving procedure # ** actual saving procedure
logger.info("Saving data to external database...") logger.info("Saving data to external database...")
oracle_save_polars(conn, stmts_orders, orders_aggregated.collect()) res = oracle_save_polars(conn, stmts_orders, orders_aggregated.collect())
oracle_save_polars(conn, stmts_suppliers, suppliers_aggregated.collect()) if res.status != STATUS_HANDLER.SUCCESS:
return res.status
res = oracle_save_polars(conn, stmts_suppliers, suppliers_aggregated.collect())
if res.status != STATUS_HANDLER.SUCCESS:
return res.status
logger.info("Successfully saved KPI tables to external database") logger.info("Successfully saved KPI tables to external database")
return STATUS_HANDLER.SUCCESS