fix correct result handling for pipeline

This commit is contained in:
2026-06-16 08:52:49 +02:00
parent 7d6dd86cee
commit 94cdb7ee97
2 changed files with 26 additions and 3 deletions

View File

@@ -21,6 +21,7 @@ def pipeline_KPI_calculation(
conn: OracleConnection, conn: OracleConnection,
) -> int: ) -> int:
return_code: int = 0 return_code: int = 0
export_status_code: int = 0
PIPELINE_NAME: Final[str] = "KPI_calculation" PIPELINE_NAME: Final[str] = "KPI_calculation"
logger_pipeline.info("Start pipeline >%s<", PIPELINE_NAME) logger_pipeline.info("Start pipeline >%s<", PIPELINE_NAME)
@@ -33,12 +34,27 @@ def pipeline_KPI_calculation(
if res_pipe.status != STATUS_HANDLER.SUCCESS: if res_pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipeline.error( logger_pipeline.error(
("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"), (
"[PIPELINE: %s] An immediate pipeline error occurred during the procedure "
"--- Status:\n%s"
),
PIPELINE_NAME, PIPELINE_NAME,
res_pipe.status, res_pipe.status,
stack_info=True, stack_info=True,
) )
return_code = 1 return_code = 1
export_status_code = res_pipe.status.code
internal_pipe_state = res_pipe.unwrap()
if internal_pipe_state != STATUS_HANDLER.SUCCESS:
logger_pipeline.error(
("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"),
PIPELINE_NAME,
internal_pipe_state,
stack_info=True,
)
return_code = 1
export_status_code = internal_pipe_state.code
logger_database.info("Prepare collected metadata...") logger_database.info("Prepare collected metadata...")
dur_sek = (t2 - t1) / 1e9 dur_sek = (t2 - t1) / 1e9
@@ -49,7 +65,7 @@ def pipeline_KPI_calculation(
gestartet_um=start, gestartet_um=start,
beendet_um=stop, beendet_um=stop,
dauer_sek=dur_sek, dauer_sek=dur_sek,
status_code=res_pipe.status.code, status_code=export_status_code,
) )
res_metadata = pipelines.write_metadata(metadata) res_metadata = pipelines.write_metadata(metadata)

View File

@@ -76,6 +76,8 @@ LOWER_BOUND_DATE_DEVIATION: Final[int] = (
UPPER_BOUND_DATE_DEVIATION: Final[int] = ( UPPER_BOUND_DATE_DEVIATION: Final[int] = (
USER_CFG.Datenpipelines_PSM.Terminabweichung_obere_Schranke USER_CFG.Datenpipelines_PSM.Terminabweichung_obere_Schranke
) )
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = ( NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = (
USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig
) )
@@ -117,6 +119,11 @@ def load_PSM_data(
def preprocess_psm( def preprocess_psm(
data: pl.LazyFrame, data: pl.LazyFrame,
) -> PreProcessResult: ) -> PreProcessResult:
if LOWER_BOUND_DATE_DEVIATION > UPPER_BOUND_DATE_DEVIATION:
raise ValueError(
"Lower bound for date deviation must not be greater than upper bound."
)
data = data.rename(RENAMING_SCHEME_PSM) data = data.rename(RENAMING_SCHEME_PSM)
data = data.drop(DROP_COLUMNS, strict=False) data = data.drop(DROP_COLUMNS, strict=False)
REGEX_PATTERN = r"^[\s\-#+/$]+$" REGEX_PATTERN = r"^[\s\-#+/$]+$"
@@ -660,7 +667,7 @@ def oracle_save_polars(
conn.commit() conn.commit()
@wrap_result(code_on_error=1, logger=logger) @wrap_result(code_on_error=1)
def KPI_calculation( def KPI_calculation(
conn: OracleConnection, conn: OracleConnection,
) -> Status: ) -> Status: