diff --git a/src/wattanalyse/constants.py b/src/wattanalyse/constants.py index 51a8712..19561f5 100644 --- a/src/wattanalyse/constants.py +++ b/src/wattanalyse/constants.py @@ -4,6 +4,7 @@ import os from pathlib import Path from typing import Final +import oracledb from dopt_basics import configs from dopt_basics import io as io_ @@ -31,3 +32,6 @@ user_cfg = configs.load_toml(Config.PTH_USER_CFG) USER_CFG: t.UserConfig = t.UserConfig( Datenbank=t.UserConfig_Datenbank(**user_cfg["Datenbank"]) ) + +oracledb.defaults.arraysize = 1000 +oracledb.defaults.prefetchrows = 1000 diff --git a/src/wattanalyse/external_interface.py b/src/wattanalyse/external_interface.py new file mode 100644 index 0000000..9f533c8 --- /dev/null +++ b/src/wattanalyse/external_interface.py @@ -0,0 +1,29 @@ +import oracledb +from dopt_basics.result_pattern import STATUS_HANDLER + +from wattanalyse import pipelines +from wattanalyse.constants import USER_CFG +from wattanalyse.logging import logger_pipeline + +ORACLE_CONN = oracledb.connect( + user=USER_CFG.Datenbank.NUTZER, + password=USER_CFG.Datenbank.PASSWORT, + host=USER_CFG.Datenbank.HOST, + port=USER_CFG.Datenbank.PORT, + service_name=USER_CFG.Datenbank.SERVICE_NAME, +) + + +def pipeline_KPI_calculation() -> None: + logger_pipeline.info("Start pipeline >KPI_calculation<") + res = pipelines.KPI_calculation(ORACLE_CONN) + + if res.status != STATUS_HANDLER.SUCCESS: + logger_pipeline.error( + "An error occurred during the procedure --- Status:\n%s", + res.status, + stack_info=True, + ) + return + + logger_pipeline.info("Pipeline >KPI_calculation< ended successfully") diff --git a/src/wattanalyse/pipeline.py b/src/wattanalyse/pipelines.py similarity index 90% rename from src/wattanalyse/pipeline.py rename to src/wattanalyse/pipelines.py index 8cdfa0c..edc32ec 100644 --- a/src/wattanalyse/pipeline.py +++ b/src/wattanalyse/pipelines.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Final, cast import polars as pl import sqlalchemy as sql from dopt_basics.datastructures import flatten -from dopt_basics.result_pattern import wrap_result +from dopt_basics.result_pattern import STATUS_HANDLER, Status, wrap_result from wattanalyse import db from wattanalyse.logging import logger_pipeline as logger @@ -75,7 +75,8 @@ TAB_NAME_PSM: Final[str] = "EXTERN_PSM" TAB_NAME_MIS: Final[str] = "EXTERN_MIS" -# // (0) load data +# // (10) load data +@wrap_result(code_on_error=10) def load_PSM_data( conn: OracleConnection, ) -> pl.LazyFrame: @@ -89,7 +90,8 @@ def load_PSM_data( return oracle_load_table_as_polars(conn, db.extern_prod_order_t_schema, None, stmt) -# // (1) preprocess +# // (20) preprocess +@wrap_result(code_on_error=20) def preprocess_psm( data: pl.LazyFrame, ) -> PreProcessResult: @@ -167,7 +169,8 @@ def preprocess_psm( return PreProcessResult(data=data, filtered=filtered_data) -# // (2) process on order level +# // (30) process on order level +@wrap_result(code_on_error=30) def process_order_level( data: pl.LazyFrame, ) -> pl.LazyFrame: @@ -331,7 +334,7 @@ def process_order_level( return data -# // (3) dump order level to internal database +# // (40) dump order level to internal database def _json_default( value: Any, ) -> str: @@ -349,6 +352,7 @@ def _parse_to_json( return json.dumps(x.to_list(), default=_json_default) +@wrap_result(code_on_error=41) def dump_order_level_to_internal_database_staging( data: pl.LazyFrame, ) -> None: @@ -392,6 +396,7 @@ def dump_order_level_to_internal_database_staging( ) +@wrap_result(code_on_error=40) def dump_order_level_to_internal_database_wipe( data: pl.LazyFrame, ) -> None: @@ -420,6 +425,7 @@ def dump_order_level_to_internal_database_wipe( # ** load order level data from internal database +@wrap_result(code_on_error=49) def load_order_level_from_internal_database() -> pl.DataFrame: data = pl.read_database_uri( 'SELECT * FROM "Produktionsauftrag-Einzelsicht"', @@ -450,7 +456,7 @@ def load_order_level_from_internal_database() -> pl.DataFrame: return data.with_columns(**list_col_parse_conds) -# // (4) post-process results +# // (50) post-process results USE_BOUNDARIES: Final[bool] = False filter_date_deviation_early: pl.Expr @@ -463,6 +469,7 @@ else: filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0 +@wrap_result(code_on_error=51) def aggregate_production_orders( data: pl.LazyFrame, ) -> pl.LazyFrame: @@ -507,6 +514,7 @@ def aggregate_production_orders( return data +@wrap_result(code_on_error=52) def aggregate_suppliers( data: pl.LazyFrame, ) -> pl.LazyFrame: @@ -574,7 +582,8 @@ def aggregate_suppliers( return data -# // (5) external database +# // (60) external database +@wrap_result(code_on_error=60) def oracle_prepare_KPI_aggregate( data: pl.LazyFrame, rename_schema: dict[str, str] | None = None, @@ -605,6 +614,7 @@ def oracle_prepare_KPI_aggregate( return data +@wrap_result(code_on_error=61) def oracle_generate_sql_insert( table_name: str, columns: list, @@ -639,6 +649,7 @@ def oracle_load_table_as_polars( return df.lazy() +@wrap_result(code_on_error=62) def oracle_save_polars( conn: OracleConnection, stmts: SqlInsertStmts, @@ -652,48 +663,70 @@ def oracle_save_polars( # TODO wrap this in a metadata tracking call @wrap_result(code_on_error=1, logger=logger) -def run( +def KPI_calculation( conn: OracleConnection, -) -> None: - # // (0) Load from external database +) -> Status: + # // (10) Load from external database logger.info("Load data from database >load_PSM_data<...") - data = load_PSM_data(conn) + res = load_PSM_data(conn) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status logger.info("Successfully loaded data from database") - # // (1) preprocess data + # // (20) preprocess data logger.info("Preprocess data (cleansing) >preprocess_psm<...") - res = preprocess_psm(data) - data = res.data + res = preprocess_psm(res.unwrap()) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + # data = res.result.data logger.info("Successfully preprocessed data") - # // (2) process on order level + # // (30) process on order level logger.info("Process data on order level >process_order_level<...") - data = process_order_level(data) + res = process_order_level(res.unwrap().data) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + data = res.unwrap() logger.info("Successfully processed data on order level") - # // (3) dump to database (intermediate result) + # // (40) dump to database (intermediate result) logger.info("Save order level data in internal database...") - dump_order_level_to_internal_database_wipe(data) + res = dump_order_level_to_internal_database_wipe(data) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status logger.info("Successfully saved order level data in internal DB") - # // (4) post-process + # // (50) post-process # ** aggregation for orders logger.info("Aggregate data with KPI calculation...") logger.info("...production orders...") - orders_aggregated = aggregate_production_orders(data) + res = aggregate_production_orders(data) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + orders_aggregated = res.unwrap() # ** aggregation for suppliers logger.info("...suppliers...") - suppliers_aggregated = aggregate_suppliers(data) + res = aggregate_suppliers(data) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + suppliers_aggregated = res.unwrap() logger.info("Successfully aggregated and calculated KPIs") - # // (5) save to external database + # // (60) save to external database logger.info("Prepare saving data to external database...") logger.info("Prepare production order KPI table for Oracle export...") - orders_aggregated = oracle_prepare_KPI_aggregate(orders_aggregated) - stmts_orders = oracle_generate_sql_insert( + res = oracle_prepare_KPI_aggregate(orders_aggregated) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + orders_aggregated = res.unwrap() + # TODO add table names as variables + res = oracle_generate_sql_insert( table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=orders_aggregated.collect_schema().names(), ) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + stmts_orders = res.unwrap() logger.info( "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", stmts_orders.delete, @@ -702,14 +735,20 @@ def run( # ** suppliers logger.info("Prepare supplier KPI table for Oracle export...") - suppliers_aggregated = oracle_prepare_KPI_aggregate( + res = oracle_prepare_KPI_aggregate( suppliers_aggregated, sort_by="Konfektionaer", sort_descending=False, ) - stmts_suppliers = oracle_generate_sql_insert( + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + suppliers_aggregated = res.unwrap() + res = oracle_generate_sql_insert( table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names() ) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + stmts_suppliers = res.unwrap() logger.info( "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", stmts_suppliers.delete, @@ -718,6 +757,12 @@ def run( # ** actual saving procedure logger.info("Saving data to external database...") - oracle_save_polars(conn, stmts_orders, orders_aggregated.collect()) - oracle_save_polars(conn, stmts_suppliers, suppliers_aggregated.collect()) + res = oracle_save_polars(conn, stmts_orders, orders_aggregated.collect()) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status + res = oracle_save_polars(conn, stmts_suppliers, suppliers_aggregated.collect()) + if res.status != STATUS_HANDLER.SUCCESS: + return res.status logger.info("Successfully saved KPI tables to external database") + + return STATUS_HANDLER.SUCCESS