add metadata tracking

This commit is contained in:
2026-06-11 10:43:30 +02:00
parent 7f864bc76b
commit 83b2610ee8
4 changed files with 161 additions and 12 deletions

View File

@@ -0,0 +1,49 @@
# %%
import dataclasses as dc
import importlib
import time
from pathlib import Path
import polars as pl
import sqlalchemy as sql
from dopt_basics import datetime as dopt_dt
import wattanalyse
from wattanalyse import constants, db, pipelines
importlib.reload(wattanalyse)
importlib.reload(constants)
importlib.reload(db)
PROJECT_BASE = Path(__file__).parents[1]
DATA_PTH = PROJECT_BASE / "data"
assert DATA_PTH.exists()
# %%
start = dopt_dt.current_time_tz()
t1 = time.perf_counter_ns()
time.sleep(1.5)
t2 = time.perf_counter_ns()
dur_sek = (t2 - t1) / 1e9
dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS)
stop = start + dur
print(f"Started: {start}\nDuration: {dur} sek\nEnded: {stop}")
# %%
metadata = db.InternMetadataInsertEntry(
pipeline_name="test",
gestartet_um=start,
beendet_um=stop,
dauer_sek=dur_sek,
status_code=0,
)
# %%
res = pipelines.write_metadata(metadata)
# %%
res.status
# %%

View File

@@ -1,8 +1,10 @@
import dataclasses as dc
import datetime
from typing import Final from typing import Final
import polars as pl import polars as pl
import sqlalchemy as sql import sqlalchemy as sql
from sqlalchemy import Column, Table from sqlalchemy import Column, Table, TypeDecorator
from wattanalyse import constants from wattanalyse import constants
@@ -10,12 +12,65 @@ assert constants.Config.DB_PATH_INTERNAL.parent.exists(), (
"database parent folder does not exists" "database parent folder does not exists"
) )
class UTCDateTime(TypeDecorator):
"""Safely coerces naive datetimes from SQLite into timezone-aware UTC."""
impl = sql.DateTime
cache_ok = True
def process_bind_param(self, value, dialect):
"""Runs when saving to the database."""
if value is not None:
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
else:
value = value.astimezone(datetime.timezone.utc)
return value
def process_result_value(self, value, dialect):
"""Runs when fetching from the database."""
if value is not None and value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
return value
DB_URI: Final[str] = f"sqlite:///{constants.Config.DB_PATH_INTERNAL}" DB_URI: Final[str] = f"sqlite:///{constants.Config.DB_PATH_INTERNAL}"
ENGINE_INTERNAL: Final[sql.Engine] = sql.create_engine(DB_URI) ENGINE_INTERNAL: Final[sql.Engine] = sql.create_engine(DB_URI)
MD_INTERNAL = sql.MetaData() MD_INTERNAL = sql.MetaData()
MD_EXTERNAL = sql.MetaData() MD_EXTERNAL = sql.MetaData()
# // internal database
intern_metadata_t: Table = Table(
"Metadaten",
MD_INTERNAL,
Column("ID", sql.Integer, primary_key=True, autoincrement=True),
Column("pipeline_name", sql.String, nullable=False),
Column("gestartet_um", UTCDateTime, nullable=False),
Column("beendet_um", UTCDateTime, nullable=False),
Column("dauer_sek", sql.Float, nullable=False),
Column("status_code", sql.Integer, nullable=False),
)
intern_metadata_t_schema: dict[str, type[pl.DataType] | pl.DataType] = {
"ID": pl.UInt64,
"pipeline_name": pl.String,
"gestartet_um": pl.Datetime(time_zone=datetime.timezone.utc),
"beendet_um": pl.Datetime(time_zone=datetime.timezone.utc),
"dauer_sek": pl.Float64,
"status_code": pl.Int16,
}
@dc.dataclass(slots=True, kw_only=True)
class InternMetadataInsertEntry:
pipeline_name: str
gestartet_um: datetime.datetime
beendet_um: datetime.datetime
dauer_sek: float
status_code: int
intern_prod_order_t: Table = Table( intern_prod_order_t: Table = Table(
"Produktionsauftrag-Einzelsicht", "Produktionsauftrag-Einzelsicht",

View File

@@ -1,29 +1,66 @@
import time
import dopt_basics.datetime as dopt_dt
import oracledb import oracledb
from dopt_basics.result_pattern import STATUS_HANDLER from dopt_basics.result_pattern import STATUS_HANDLER
from wattanalyse import pipelines from wattanalyse import db, pipelines
from wattanalyse.constants import USER_CFG from wattanalyse.constants import USER_CFG
from wattanalyse.logging import logger_pipeline from wattanalyse.logging import logger_database, logger_pipeline
ORACLE_CONN = oracledb.connect( ORACLE_CONN = oracledb.connect(
user=USER_CFG.Datenbank.NUTZER, user=USER_CFG.Datenbank.Nutzer,
password=USER_CFG.Datenbank.PASSWORT, password=USER_CFG.Datenbank.Passwort,
host=USER_CFG.Datenbank.HOST, host=USER_CFG.Datenbank.Host,
port=USER_CFG.Datenbank.PORT, port=USER_CFG.Datenbank.Port,
service_name=USER_CFG.Datenbank.SERVICE_NAME, service_name=USER_CFG.Datenbank.Service_Name,
) )
def pipeline_KPI_calculation() -> None: def pipeline_KPI_calculation() -> None:
logger_pipeline.info("Start pipeline >KPI_calculation<") logger_pipeline.info("Start pipeline >KPI_calculation<")
res = pipelines.KPI_calculation(ORACLE_CONN)
start = dopt_dt.current_time_tz()
t1 = time.perf_counter_ns()
res_pipe = pipelines.KPI_calculation(ORACLE_CONN)
t2 = time.perf_counter_ns()
if res_pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipeline.error(
(
"[PIPELINE: KPI Calculation] An error occurred during the "
"procedure --- Status:\n%s"
),
res_pipe.status,
stack_info=True,
)
dur_sek = (t2 - t1) / 1e9
dur = dopt_dt.timedelta_from_val(dur_sek, dopt_dt.TimeUnitsTimedelta.SECONDS)
stop = start + dur
metadata = db.InternMetadataInsertEntry(
pipeline_name="test",
gestartet_um=start,
beendet_um=stop,
dauer_sek=dur_sek,
status_code=res_pipe.status.code,
)
res = pipelines.write_metadata(metadata)
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
logger_pipeline.error( logger_database.error(
"An error occurred during the procedure --- Status:\n%s", (
"[INTERNAL DB] An error occurred while writing the metadata to the internal "
"database --- Status:\n%s"
),
res.status, res.status,
stack_info=True, stack_info=True,
) )
return
logger_pipeline.info("Pipeline >KPI_calculation< ended successfully") logger_pipeline.info("Pipeline >KPI_calculation< ended successfully")
if __name__ == "__main__":
pipeline_KPI_calculation()

View File

@@ -760,3 +760,11 @@ def KPI_calculation(
logger.info("Successfully saved KPI tables to external database") logger.info("Successfully saved KPI tables to external database")
return STATUS_HANDLER.SUCCESS return STATUS_HANDLER.SUCCESS
@wrap_result(code_on_error=200)
def write_metadata(metadata: db.InternMetadataInsertEntry) -> None:
stmt = sql.insert(db.intern_metadata_t)
with db.ENGINE_INTERNAL.begin() as conn:
conn.execute(stmt, dc.asdict(metadata))