enhanced error handling and metadata handling

This commit is contained in:
2026-06-11 11:45:48 +02:00
parent e1048bb78c
commit 7c35470317
3 changed files with 109 additions and 32 deletions

View File

@@ -47,3 +47,15 @@ res = pipelines.write_metadata(metadata)
# %% # %%
res.status res.status
# %% # %%
res = pipelines.load_metadata_from_internal_database()
df = res.unwrap()
# %%
df
# %%
res = pipelines.delete_metadata_from_internal_database()
res.unwrap()
res = pipelines.load_metadata_from_internal_database()
df = res.unwrap()
# %%
df
# %%

View File

@@ -1,24 +1,41 @@
import sys
import time import time
from typing import Final
import dopt_basics.datetime as dopt_dt import dopt_basics.datetime as dopt_dt
import oracledb import oracledb
from dopt_basics.result_pattern import STATUS_HANDLER from dopt_basics.result_pattern import STATUS_HANDLER
from oracledb.exceptions import OperationalError
from wattanalyse import db, pipelines from wattanalyse import db, pipelines
from wattanalyse.constants import USER_CFG from wattanalyse.constants import USER_CFG
from wattanalyse.logging import logger_database, logger_pipeline from wattanalyse.logging import logger_base, logger_database, logger_pipeline
ORACLE_CONN = oracledb.connect( try:
ORACLE_CONN = oracledb.connect(
user=USER_CFG.Datenbank.Nutzer, user=USER_CFG.Datenbank.Nutzer,
password=USER_CFG.Datenbank.Passwort, password=USER_CFG.Datenbank.Passwort,
host=USER_CFG.Datenbank.Host, host=USER_CFG.Datenbank.Host,
port=USER_CFG.Datenbank.Port, port=USER_CFG.Datenbank.Port,
service_name=USER_CFG.Datenbank.Service_Name, service_name=USER_CFG.Datenbank.Service_Name,
) )
except OperationalError as err:
logger_base.error(
(
"[Oracle Database] Could not establish connection. Check if the database "
"online, fully functional and reachable. Check the configuration parameters.\n"
">>> Exception:\n%s"
),
err,
stack_info=True,
)
sys.exit(1)
def pipeline_KPI_calculation() -> None: def pipeline_KPI_calculation() -> int:
logger_pipeline.info("Start pipeline >KPI_calculation<") return_code: int = 0
PIPELINE_NAME: Final[str] = "KPI_calculation"
logger_pipeline.info("Start pipeline >%s<", PIPELINE_NAME)
start = dopt_dt.current_time_tz() start = dopt_dt.current_time_tz()
t1 = time.perf_counter_ns() t1 = time.perf_counter_ns()
@@ -29,38 +46,62 @@ def pipeline_KPI_calculation() -> None:
if res_pipe.status != STATUS_HANDLER.SUCCESS: if res_pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipeline.error( logger_pipeline.error(
( ("[PIPELINE: %s] An error occurred during the procedure --- Status:\n%s"),
"[PIPELINE: KPI Calculation] An error occurred during the " PIPELINE_NAME,
"procedure --- Status:\n%s"
),
res_pipe.status, res_pipe.status,
stack_info=True, stack_info=True,
) )
return_code = 1
logger_database.info("Prepare collected metadata...")
dur_sek = (t2 - t1) / 1e9 dur_sek = (t2 - t1) / 1e9
dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS) dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS)
stop = start + dur stop = start + dur
metadata = db.InternMetadataInsertEntry( metadata = db.InternMetadataInsertEntry(
pipeline_name="test", pipeline_name=PIPELINE_NAME,
gestartet_um=start, gestartet_um=start,
beendet_um=stop, beendet_um=stop,
dauer_sek=dur_sek, dauer_sek=dur_sek,
status_code=res_pipe.status.code, status_code=res_pipe.status.code,
) )
res = pipelines.write_metadata(metadata) res_metadata = pipelines.write_metadata(metadata)
if res.status != STATUS_HANDLER.SUCCESS: if res_metadata.status != STATUS_HANDLER.SUCCESS:
logger_database.error( logger_database.error(
( (
"[INTERNAL DB] An error occurred while writing the metadata to the internal " "[INTERNAL DB] An error occurred while writing the metadata to the internal "
"database --- Status:\n%s" "database --- Status:\n%s"
), ),
res.status, res_metadata.status,
stack_info=True, stack_info=True,
) )
return_code = 1
return return_code
logger_pipeline.info("Pipeline >KPI_calculation< ended successfully") logger_database.info("Successfully saved metadata to database")
logger_pipeline.info("Pipeline >%s< ended successfully", PIPELINE_NAME)
logger_pipeline.info(
"Pipeline >%s<: execution duration was %.4f seconds",
PIPELINE_NAME,
metadata.dauer_sek,
)
return return_code
if __name__ == "__main__": if __name__ == "__main__":
pipeline_KPI_calculation() try:
code = pipeline_KPI_calculation()
sys.exit(code)
except Exception as err:
logger_base.error(
(
"[BASE ERROR] An unexpected and unwrapped error occurred during the "
"execution of the pipeline function.\n>>> Exception:\n%s"
),
err,
stack_info=True,
)
sys.exit(1)
finally:
ORACLE_CONN.close()

View File

@@ -4,6 +4,7 @@ import dataclasses as dc
import datetime import datetime
import json import json
import warnings import warnings
from pprint import pformat
from typing import TYPE_CHECKING, Any, Final, cast from typing import TYPE_CHECKING, Any, Final, cast
import polars as pl import polars as pl
@@ -13,6 +14,7 @@ from dopt_basics.result_pattern import STATUS_HANDLER, Status, wrap_result
from wattanalyse import db from wattanalyse import db
from wattanalyse.constants import USER_CFG, QualityPsm from wattanalyse.constants import USER_CFG, QualityPsm
from wattanalyse.logging import logger_database
from wattanalyse.logging import logger_pipeline as logger from wattanalyse.logging import logger_pipeline as logger
from wattanalyse.types import SqlInsertStmts, SqlStatement from wattanalyse.types import SqlInsertStmts, SqlStatement
@@ -65,8 +67,10 @@ UPPER_BOUND_DATE_DEVIATION: Final[int] = (
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = ( NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = (
USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig
) )
TAB_NAME_PSM: Final[str] = USER_CFG.Datenbank.Tabellenname_Produktionsstandmeldung TAB_NAME_IMPORT_PSM: Final[str] = USER_CFG.Datenbank.Tabellenname_Produktionsstandmeldung
TAB_NAME_MIS: Final[str] = USER_CFG.Datenbank.Tabellenname_MIS_Auftraege TAB_NAME_IMPORT_MIS: Final[str] = USER_CFG.Datenbank.Tabellenname_MIS_Auftraege
TAB_NAME_EXPORT_ORDERS: Final[str] = USER_CFG.Datenbank.Tabellenname_KPI_Auftraege
TAB_NAME_EXPORT_SUPPLIERS: Final[str] = USER_CFG.Datenbank.Tabellenname_KPI_Konfektionaere
USE_BOUNDARIES: Final[bool] = ( USE_BOUNDARIES: Final[bool] = (
USER_CFG.Datenpipelines_PSM.Nutze_Schranken_Terminabweichung_KPI_Berechnung USER_CFG.Datenpipelines_PSM.Nutze_Schranken_Terminabweichung_KPI_Berechnung
@@ -87,9 +91,9 @@ def load_PSM_data(
conn: OracleConnection, conn: OracleConnection,
) -> pl.LazyFrame: ) -> pl.LazyFrame:
stmt = f""" stmt = f"""
SELECT t1.* FROM "{TAB_NAME_PSM}" t1 SELECT t1.* FROM "{TAB_NAME_IMPORT_PSM}" t1
WHERE EXISTS( WHERE EXISTS(
SELECT 1 FROM "{TAB_NAME_MIS}" t2 SELECT 1 FROM "{TAB_NAME_IMPORT_MIS}" t2
WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos" WHERE t1."PA" = t2."PA" AND t1."PA Pos" = t2."PA Pos"
) )
""" """
@@ -655,7 +659,6 @@ def oracle_save_polars(
conn.commit() conn.commit()
# TODO wrap this in a metadata tracking call
@wrap_result(code_on_error=1, logger=logger) @wrap_result(code_on_error=1, logger=logger)
def KPI_calculation( def KPI_calculation(
conn: OracleConnection, conn: OracleConnection,
@@ -713,16 +716,15 @@ def KPI_calculation(
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
return res.status return res.status
orders_aggregated = res.unwrap() orders_aggregated = res.unwrap()
# TODO add table names as variables
res = oracle_generate_sql_insert( res = oracle_generate_sql_insert(
table_name="KPI_PRODUKTIONSAUFTRAEGE", table_name=TAB_NAME_EXPORT_ORDERS,
columns=orders_aggregated.collect_schema().names(), columns=orders_aggregated.collect_schema().names(),
) )
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
return res.status return res.status
stmts_orders = res.unwrap() stmts_orders = res.unwrap()
logger.info( logger.info(
"SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", "SQL Statemens:\n--- DELETE: %s\n--- INSERT: %s",
stmts_orders.delete, stmts_orders.delete,
stmts_orders.insert, stmts_orders.insert,
) )
@@ -738,13 +740,14 @@ def KPI_calculation(
return res.status return res.status
suppliers_aggregated = res.unwrap() suppliers_aggregated = res.unwrap()
res = oracle_generate_sql_insert( res = oracle_generate_sql_insert(
table_name="KPI_KONFEKTIONAERE", columns=suppliers_aggregated.collect_schema().names() table_name=TAB_NAME_EXPORT_SUPPLIERS,
columns=suppliers_aggregated.collect_schema().names(),
) )
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
return res.status return res.status
stmts_suppliers = res.unwrap() stmts_suppliers = res.unwrap()
logger.info( logger.info(
"SQL Statemens:\n--- DELETE: %s\n---INSERT: %s", "SQL Statemens:\n--- DELETE: %s\n--- INSERT: %s",
stmts_suppliers.delete, stmts_suppliers.delete,
stmts_suppliers.insert, stmts_suppliers.insert,
) )
@@ -763,8 +766,29 @@ def KPI_calculation(
@wrap_result(code_on_error=200) @wrap_result(code_on_error=200)
def write_metadata(metadata: db.InternMetadataInsertEntry) -> None: def write_metadata(
metadata: db.InternMetadataInsertEntry,
) -> None:
stmt = sql.insert(db.intern_metadata_t) stmt = sql.insert(db.intern_metadata_t)
metadata_insert = dc.asdict(metadata)
logger_database.info(
"Trying to save the following metadata to the internal database:\n%s",
pformat(metadata_insert),
)
with db.ENGINE_INTERNAL.begin() as conn: with db.ENGINE_INTERNAL.begin() as conn:
conn.execute(stmt, dc.asdict(metadata)) conn.execute(stmt, metadata_insert)
@wrap_result(code_on_error=201)
def load_metadata_from_internal_database() -> pl.DataFrame:
with db.ENGINE_INTERNAL.connect() as conn:
res = conn.execute(sql.select(db.intern_metadata_t))
return pl.DataFrame(res.fetchall())
@wrap_result(code_on_error=202)
def delete_metadata_from_internal_database() -> None:
with db.ENGINE_INTERNAL.begin() as conn:
conn.execute(sql.delete(db.intern_metadata_t))