diff --git a/prototypes/05-1_metadata.py b/prototypes/05-1_metadata.py new file mode 100644 index 0000000..dab0712 --- /dev/null +++ b/prototypes/05-1_metadata.py @@ -0,0 +1,49 @@ +# %% +import dataclasses as dc +import importlib +import time +from pathlib import Path + +import polars as pl +import sqlalchemy as sql +from dopt_basics import datetime as dopt_dt + +import wattanalyse +from wattanalyse import constants, db, pipelines + +importlib.reload(wattanalyse) +importlib.reload(constants) +importlib.reload(db) + +PROJECT_BASE = Path(__file__).parents[1] +DATA_PTH = PROJECT_BASE / "data" +assert DATA_PTH.exists() + +# %% +start = dopt_dt.current_time_tz() +t1 = time.perf_counter_ns() + +time.sleep(1.5) + +t2 = time.perf_counter_ns() +dur_sek = (t2 - t1) / 1e9 +dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS) + +stop = start + dur + +print(f"Started: {start}\nDuration: {dur} sek\nEnded: {stop}") +# %% +metadata = db.InternMetadataInsertEntry( + pipeline_name="test", + gestartet_um=start, + beendet_um=stop, + dauer_sek=dur_sek, + status_code=0, +) + +# %% +res = pipelines.write_metadata(metadata) + +# %% +res.status +# %% diff --git a/src/wattanalyse/db.py b/src/wattanalyse/db.py index 058be1e..405cd00 100644 --- a/src/wattanalyse/db.py +++ b/src/wattanalyse/db.py @@ -1,8 +1,10 @@ +import dataclasses as dc +import datetime from typing import Final import polars as pl import sqlalchemy as sql -from sqlalchemy import Column, Table +from sqlalchemy import Column, Table, TypeDecorator from wattanalyse import constants @@ -10,12 +12,65 @@ assert constants.Config.DB_PATH_INTERNAL.parent.exists(), ( "database parent folder does not exists" ) + +class UTCDateTime(TypeDecorator): + """Safely coerces naive datetimes from SQLite into timezone-aware UTC.""" + + impl = sql.DateTime + cache_ok = True + + def process_bind_param(self, value, dialect): + """Runs when saving to the database.""" + if value is not None: + if value.tzinfo is None: + value = value.replace(tzinfo=datetime.timezone.utc) + else: + value = value.astimezone(datetime.timezone.utc) + return value + + def process_result_value(self, value, dialect): + """Runs when fetching from the database.""" + if value is not None and value.tzinfo is None: + value = value.replace(tzinfo=datetime.timezone.utc) + return value + + DB_URI: Final[str] = f"sqlite:///{constants.Config.DB_PATH_INTERNAL}" ENGINE_INTERNAL: Final[sql.Engine] = sql.create_engine(DB_URI) MD_INTERNAL = sql.MetaData() MD_EXTERNAL = sql.MetaData() +# // internal database +intern_metadata_t: Table = Table( + "Metadaten", + MD_INTERNAL, + Column("ID", sql.Integer, primary_key=True, autoincrement=True), + Column("pipeline_name", sql.String, nullable=False), + Column("gestartet_um", UTCDateTime, nullable=False), + Column("beendet_um", UTCDateTime, nullable=False), + Column("dauer_sek", sql.Float, nullable=False), + Column("status_code", sql.Integer, nullable=False), +) + +intern_metadata_t_schema: dict[str, type[pl.DataType] | pl.DataType] = { + "ID": pl.UInt64, + "pipeline_name": pl.String, + "gestartet_um": pl.Datetime(time_zone=datetime.timezone.utc), + "beendet_um": pl.Datetime(time_zone=datetime.timezone.utc), + "dauer_sek": pl.Float64, + "status_code": pl.Int16, +} + + +@dc.dataclass(slots=True, kw_only=True) +class InternMetadataInsertEntry: + pipeline_name: str + gestartet_um: datetime.datetime + beendet_um: datetime.datetime + dauer_sek: float + status_code: int + intern_prod_order_t: Table = Table( "Produktionsauftrag-Einzelsicht", diff --git a/src/wattanalyse/external_interface.py b/src/wattanalyse/external_interface.py index 9f533c8..3474a66 100644 --- a/src/wattanalyse/external_interface.py +++ b/src/wattanalyse/external_interface.py @@ -1,29 +1,66 @@ +import time + +import dopt_basics.datetime as dopt_dt import oracledb from dopt_basics.result_pattern import STATUS_HANDLER -from wattanalyse import pipelines +from wattanalyse import db, pipelines from wattanalyse.constants import USER_CFG -from wattanalyse.logging import logger_pipeline +from wattanalyse.logging import logger_database, logger_pipeline ORACLE_CONN = oracledb.connect( - user=USER_CFG.Datenbank.NUTZER, - password=USER_CFG.Datenbank.PASSWORT, - host=USER_CFG.Datenbank.HOST, - port=USER_CFG.Datenbank.PORT, - service_name=USER_CFG.Datenbank.SERVICE_NAME, + user=USER_CFG.Datenbank.Nutzer, + password=USER_CFG.Datenbank.Passwort, + host=USER_CFG.Datenbank.Host, + port=USER_CFG.Datenbank.Port, + service_name=USER_CFG.Datenbank.Service_Name, ) def pipeline_KPI_calculation() -> None: logger_pipeline.info("Start pipeline >KPI_calculation<") - res = pipelines.KPI_calculation(ORACLE_CONN) + + start = dopt_dt.current_time_tz() + t1 = time.perf_counter_ns() + + res_pipe = pipelines.KPI_calculation(ORACLE_CONN) + + t2 = time.perf_counter_ns() + + if res_pipe.status != STATUS_HANDLER.SUCCESS: + logger_pipeline.error( + ( + "[PIPELINE: KPI Calculation] An error occurred during the " + "procedure --- Status:\n%s" + ), + res_pipe.status, + stack_info=True, + ) + + dur_sek = (t2 - t1) / 1e9 + dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS) + stop = start + dur + metadata = db.InternMetadataInsertEntry( + pipeline_name="test", + gestartet_um=start, + beendet_um=stop, + dauer_sek=dur_sek, + status_code=res_pipe.status.code, + ) + res = pipelines.write_metadata(metadata) if res.status != STATUS_HANDLER.SUCCESS: - logger_pipeline.error( - "An error occurred during the procedure --- Status:\n%s", + logger_database.error( + ( + "[INTERNAL DB] An error occurred while writing the metadata to the internal " + "database --- Status:\n%s" + ), res.status, stack_info=True, ) - return logger_pipeline.info("Pipeline >KPI_calculation< ended successfully") + + +if __name__ == "__main__": + pipeline_KPI_calculation() diff --git a/src/wattanalyse/pipelines.py b/src/wattanalyse/pipelines.py index 5b76d07..7bc25e0 100644 --- a/src/wattanalyse/pipelines.py +++ b/src/wattanalyse/pipelines.py @@ -760,3 +760,11 @@ def KPI_calculation( logger.info("Successfully saved KPI tables to external database") return STATUS_HANDLER.SUCCESS + + +@wrap_result(code_on_error=200) +def write_metadata(metadata: db.InternMetadataInsertEntry) -> None: + stmt = sql.insert(db.intern_metadata_t) + + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(stmt, dc.asdict(metadata))