From 94cdb7ee9726a94afc39dc0442149f091005f997 Mon Sep 17 00:00:00 2001 From: foefl Date: Tue, 16 Jun 2026 08:52:49 +0200 Subject: [PATCH] fix correct result handling for pipeline --- src/wattanalyse/external_interface.py | 20 ++++++++++++++++++-- src/wattanalyse/pipelines.py | 9 ++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/wattanalyse/external_interface.py b/src/wattanalyse/external_interface.py index fa397e6..cbd5df4 100644 --- a/src/wattanalyse/external_interface.py +++ b/src/wattanalyse/external_interface.py @@ -21,6 +21,7 @@ def pipeline_KPI_calculation( conn: OracleConnection, ) -> int: return_code: int = 0 + export_status_code: int = 0 PIPELINE_NAME: Final[str] = "KPI_calculation" logger_pipeline.info("Start pipeline >%s<", PIPELINE_NAME) @@ -33,12 +34,27 @@ def pipeline_KPI_calculation( if res_pipe.status != STATUS_HANDLER.SUCCESS: logger_pipeline.error( - ("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"), + ( + "[PIPELINE: %s] An immediate pipeline error occurred during the procedure " + "--- Status:\n%s" + ), PIPELINE_NAME, res_pipe.status, stack_info=True, ) return_code = 1 + export_status_code = res_pipe.status.code + + internal_pipe_state = res_pipe.unwrap() + if internal_pipe_state != STATUS_HANDLER.SUCCESS: + logger_pipeline.error( + ("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"), + PIPELINE_NAME, + internal_pipe_state, + stack_info=True, + ) + return_code = 1 + export_status_code = internal_pipe_state.code logger_database.info("Prepare collected metadata...") dur_sek = (t2 - t1) / 1e9 @@ -49,7 +65,7 @@ def pipeline_KPI_calculation( gestartet_um=start, beendet_um=stop, dauer_sek=dur_sek, - status_code=res_pipe.status.code, + status_code=export_status_code, ) res_metadata = pipelines.write_metadata(metadata) diff --git a/src/wattanalyse/pipelines.py b/src/wattanalyse/pipelines.py index 8b6c5d3..d8cd0f4 100644 --- a/src/wattanalyse/pipelines.py +++ b/src/wattanalyse/pipelines.py @@ -76,6 +76,8 @@ LOWER_BOUND_DATE_DEVIATION: Final[int] = ( UPPER_BOUND_DATE_DEVIATION: Final[int] = ( USER_CFG.Datenpipelines_PSM.Terminabweichung_obere_Schranke ) + + NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = ( USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig ) @@ -117,6 +119,11 @@ def load_PSM_data( def preprocess_psm( data: pl.LazyFrame, ) -> PreProcessResult: + if LOWER_BOUND_DATE_DEVIATION > UPPER_BOUND_DATE_DEVIATION: + raise ValueError( + "Lower bound for date deviation must not be greater than upper bound." + ) + data = data.rename(RENAMING_SCHEME_PSM) data = data.drop(DROP_COLUMNS, strict=False) REGEX_PATTERN = r"^[\s\-#+/$]+$" @@ -660,7 +667,7 @@ def oracle_save_polars( conn.commit() -@wrap_result(code_on_error=1, logger=logger) +@wrap_result(code_on_error=1) def KPI_calculation( conn: OracleConnection, ) -> Status: