From 7c354703174121402d682a33303a19f305804c32 Mon Sep 17 00:00:00 2001 From: foefl Date: Thu, 11 Jun 2026 11:45:48 +0200 Subject: [PATCH] enhanced error handling and metadata handling --- prototypes/05-1_metadata.py | 12 ++++ src/wattanalyse/external_interface.py | 81 ++++++++++++++++++++------- src/wattanalyse/pipelines.py | 48 ++++++++++++---- 3 files changed, 109 insertions(+), 32 deletions(-) diff --git a/prototypes/05-1_metadata.py b/prototypes/05-1_metadata.py index dab0712..5af2301 100644 --- a/prototypes/05-1_metadata.py +++ b/prototypes/05-1_metadata.py @@ -47,3 +47,15 @@ res = pipelines.write_metadata(metadata) # %% res.status # %% +res = pipelines.load_metadata_from_internal_database() +df = res.unwrap() +# %% +df +# %% +res = pipelines.delete_metadata_from_internal_database() +res.unwrap() +res = pipelines.load_metadata_from_internal_database() +df = res.unwrap() +# %% +df +# %% diff --git a/src/wattanalyse/external_interface.py b/src/wattanalyse/external_interface.py index 3474a66..b3513eb 100644 --- a/src/wattanalyse/external_interface.py +++ b/src/wattanalyse/external_interface.py @@ -1,24 +1,41 @@ +import sys import time +from typing import Final import dopt_basics.datetime as dopt_dt import oracledb from dopt_basics.result_pattern import STATUS_HANDLER +from oracledb.exceptions import OperationalError from wattanalyse import db, pipelines from wattanalyse.constants import USER_CFG -from wattanalyse.logging import logger_database, logger_pipeline +from wattanalyse.logging import logger_base, logger_database, logger_pipeline -ORACLE_CONN = oracledb.connect( - user=USER_CFG.Datenbank.Nutzer, - password=USER_CFG.Datenbank.Passwort, - host=USER_CFG.Datenbank.Host, - port=USER_CFG.Datenbank.Port, - service_name=USER_CFG.Datenbank.Service_Name, -) +try: + ORACLE_CONN = oracledb.connect( + user=USER_CFG.Datenbank.Nutzer, + password=USER_CFG.Datenbank.Passwort, + host=USER_CFG.Datenbank.Host, + port=USER_CFG.Datenbank.Port, + service_name=USER_CFG.Datenbank.Service_Name, + ) +except OperationalError as err: + logger_base.error( + ( + "[Oracle Database] Could not establish connection. Check if the database " + "online, fully functional and reachable. Check the configuration parameters.\n" + ">>> Exception:\n%s" + ), + err, + stack_info=True, + ) + sys.exit(1) -def pipeline_KPI_calculation() -> None: - logger_pipeline.info("Start pipeline >KPI_calculation<") +def pipeline_KPI_calculation() -> int: + return_code: int = 0 + PIPELINE_NAME: Final[str] = "KPI_calculation" + logger_pipeline.info("Start pipeline >%s<", PIPELINE_NAME) start = dopt_dt.current_time_tz() t1 = time.perf_counter_ns() @@ -29,38 +46,62 @@ def pipeline_KPI_calculation() -> None: if res_pipe.status != STATUS_HANDLER.SUCCESS: logger_pipeline.error( - ( - "[PIPELINE: KPI Calculation] An error occurred during the " - "procedure --- Status:\n%s" - ), + ("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"), + PIPELINE_NAME, res_pipe.status, stack_info=True, ) + return_code = 1 + logger_database.info("Prepare collected metadata...") dur_sek = (t2 - t1) / 1e9 dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS) stop = start + dur metadata = db.InternMetadataInsertEntry( - pipeline_name="test", + pipeline_name=PIPELINE_NAME, gestartet_um=start, beendet_um=stop, dauer_sek=dur_sek, status_code=res_pipe.status.code, ) - res = pipelines.write_metadata(metadata) + res_metadata = pipelines.write_metadata(metadata) - if res.status != STATUS_HANDLER.SUCCESS: + if res_metadata.status != STATUS_HANDLER.SUCCESS: logger_database.error( ( "[INTERNAL DB] An error occurred while writing the metadata to the internal " "database --- Status:\n%s" ), - res.status, + res_metadata.status, stack_info=True, ) + return_code = 1 + return return_code - logger_pipeline.info("Pipeline >KPI_calculation< ended successfully") + logger_database.info("Successfully saved metadata to database") + logger_pipeline.info("Pipeline >%s< ended successfully", PIPELINE_NAME) + logger_pipeline.info( + "Pipeline >%s<: execution duration was %.4f seconds", + PIPELINE_NAME, + metadata.dauer_sek, + ) + + return return_code if __name__ == "__main__": - pipeline_KPI_calculation() + try: + code = pipeline_KPI_calculation() + sys.exit(code) + except Exception as err: + logger_base.error( + ( + "[BASE ERROR] An unexpected and unwrapped error occurred during the " + "execution of the pipeline function.\n>>> Exception:\n%s" + ), + err, + stack_info=True, + ) + sys.exit(1) + finally: + ORACLE_CONN.close() diff --git a/src/wattanalyse/pipelines.py b/src/wattanalyse/pipelines.py index 7bc25e0..5b32aba 100644 --- a/src/wattanalyse/pipelines.py +++ b/src/wattanalyse/pipelines.py @@ -4,6 +4,7 @@ import dataclasses as dc import datetime import json import warnings +from pprint import pformat from typing import TYPE_CHECKING, Any, Final, cast import polars as pl @@ -13,6 +14,7 @@ from dopt_basics.result_pattern import STATUS_HANDLER, Status, wrap_result from wattanalyse import db from wattanalyse.constants import USER_CFG, QualityPsm +from wattanalyse.logging import logger_database from wattanalyse.logging import logger_pipeline as logger from wattanalyse.types import SqlInsertStmts, SqlStatement @@ -65,8 +67,10 @@ UPPER_BOUND_DATE_DEVIATION: Final[int] = ( NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = ( USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig ) -TAB_NAME_PSM: Final[str] = USER_CFG.Datenbank.Tabellenname_Produktionsstandmeldung -TAB_NAME_MIS: Final[str] = USER_CFG.Datenbank.Tabellenname_MIS_Auftraege +TAB_NAME_IMPORT_PSM: Final[str] = USER_CFG.Datenbank.Tabellenname_Produktionsstandmeldung +TAB_NAME_IMPORT_MIS: Final[str] = USER_CFG.Datenbank.Tabellenname_MIS_Auftraege +TAB_NAME_EXPORT_ORDERS: Final[str] = USER_CFG.Datenbank.Tabellenname_KPI_Auftraege +TAB_NAME_EXPORT_SUPPLIERS: Final[str] = USER_CFG.Datenbank.Tabellenname_KPI_Konfektionaere USE_BOUNDARIES: Final[bool] = ( USER_CFG.Datenpipelines_PSM.Nutze_Schranken_Terminabweichung_KPI_Berechnung @@ -87,9 +91,9 @@ def load_PSM_data( conn: OracleConnection, ) -> pl.LazyFrame: stmt = f""" - SELECT t1.* FROM "{TAB_NAME_PSM}" t1 + SELECT t1.* FROM "{TAB_NAME_IMPORT_PSM}" t1 WHERE EXISTS( - SELECT 1 FROM "{TAB_NAME_MIS}" t2 + SELECT 1 FROM "{TAB_NAME_IMPORT_MIS}" t2 WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" ) """ @@ -655,7 +659,6 @@ def oracle_save_polars( conn.commit() -# TODO wrap this in a metadata tracking call @wrap_result(code_on_error=1, logger=logger) def KPI_calculation( conn: OracleConnection, @@ -713,16 +716,15 @@ def KPI_calculation( if res.status != STATUS_HANDLER.SUCCESS: return res.status orders_aggregated = res.unwrap() - # TODO add table names as variables res = oracle_generate_sql_insert( - table_name="KPI_PRODUKTIONSAUFTRAEGE", + table_name=TAB_NAME_EXPORT_ORDERS, columns=orders_aggregated.collect_schema().names(), ) if res.status != STATUS_HANDLER.SUCCESS: return res.status stmts_orders = res.unwrap() logger.info( - "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", + "SQL Statemens:\n--- DELETE: %s\n--- INSERT: %s", stmts_orders.delete, stmts_orders.insert, ) @@ -738,13 +740,14 @@ def KPI_calculation( return res.status suppliers_aggregated = res.unwrap() res = oracle_generate_sql_insert( - table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names() + table_name=TAB_NAME_EXPORT_SUPPLIERS, + columns=suppliers_aggregated.collect_schema().names(), ) if res.status != STATUS_HANDLER.SUCCESS: return res.status stmts_suppliers = res.unwrap() logger.info( - "SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", + "SQL Statemens:\n--- DELETE: %s\n--- INSERT: %s", stmts_suppliers.delete, stmts_suppliers.insert, ) @@ -763,8 +766,29 @@ def KPI_calculation( @wrap_result(code_on_error=200) -def write_metadata(metadata: db.InternMetadataInsertEntry) -> None: +def write_metadata( + metadata: db.InternMetadataInsertEntry, +) -> None: stmt = sql.insert(db.intern_metadata_t) + metadata_insert = dc.asdict(metadata) + logger_database.info( + "Trying to save the following metadata to the internal database:\n%s", + pformat(metadata_insert), + ) with db.ENGINE_INTERNAL.begin() as conn: - conn.execute(stmt, dc.asdict(metadata)) + conn.execute(stmt, metadata_insert) + + +@wrap_result(code_on_error=201) +def load_metadata_from_internal_database() -> pl.DataFrame: + with db.ENGINE_INTERNAL.connect() as conn: + res = conn.execute(sql.select(db.intern_metadata_t)) + + return pl.DataFrame(res.fetchall()) + + +@wrap_result(code_on_error=202) +def delete_metadata_from_internal_database() -> None: + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(sql.delete(db.intern_metadata_t))