add performance metrics logging to database #16

Merged
foefl merged 2 commits from performance_metrics into main 2025-04-04 11:40:11 +00:00
7 changed files with 144 additions and 13 deletions

View File

@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.5.4"
+version = "0.5.5"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
 [tool.bumpversion]
-current_version = "0.5.4"
+current_version = "0.5.5"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
    (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -26,6 +26,7 @@ from delta_barth.api.requests import (
 )
 from delta_barth.constants import (
     COL_MAP_SALES_PROGNOSIS,
+    DEFAULT_DB_ERR_CODE,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
     SALES_BASE_NUM_DATAPOINTS_MONTHS,
@@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
     return _parse_df_to_results(data)
 
 
-@wrap_result()
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
 def _write_sales_forecast_stats_wrapped(
     stats: SalesForecastStatistics,
 ) -> None:
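
Note: `wrap_result` itself is not touched by this diff, so the following is a hypothetical sketch only, not the project's actual implementation. The usage above and in the pipelines below suggests `code_on_error` tags any caught exception with the given status code on a Result-style object exposing `.status` and `.unwrap()`:

```python
# Hypothetical sketch of the decorator pattern -- NOT the real wrap_result.
# Assumes: a success status distinct from the error codes, and unwrap()
# returning the payload only on success.
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class _Result:
    status: int  # hypothetical convention: 0 = success, else code_on_error
    value: Any = None

    def unwrap(self) -> Any:
        if self.status != 0:
            raise RuntimeError(f"unwrap() on failed result, code {self.status}")
        return self.value


def wrap_result(code_on_error: int = 100) -> Callable:
    def deco(fn: Callable) -> Callable:
        def inner(*args: Any, **kwargs: Any) -> _Result:
            try:
                return _Result(status=0, value=fn(*args, **kwargs))
            except Exception:
                return _Result(status=code_on_error)
        return inner
    return deco
```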

View File

@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
 
 # ** error handling
 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
+DEFAULT_DB_ERR_CODE: Final[int] = 150
 DEFAULT_API_ERR_CODE: Final[int] = 400

View File

@@ -22,8 +22,8 @@ perf_meas = sql.Table(
     "performance_measurement",
     metadata,
     sql.Column("id", sql.Integer, primary_key=True),
-    sql.Column("execution_duration", sql.Float),
     sql.Column("pipeline_name", sql.String(length=30)),
+    sql.Column("execution_duration", sql.Float),
 )
 
 # ** ---- forecasts
 sf_stats = sql.Table(
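
Note: for reference, the reordered `performance_measurement` table round-trips like this. A minimal sketch using an in-memory SQLite engine in place of the real `SESSION.db_engine`:

```python
# Minimal sketch: insert and read back one metrics row.
# Uses an in-memory SQLite engine; production code binds to SESSION.db_engine.
import sqlalchemy as sql

metadata = sql.MetaData()
perf_meas = sql.Table(
    "performance_measurement",
    metadata,
    sql.Column("id", sql.Integer, primary_key=True),
    sql.Column("pipeline_name", sql.String(length=30)),
    sql.Column("execution_duration", sql.Float),
)

engine = sql.create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as con:  # begin() commits on scope exit
    con.execute(
        sql.insert(perf_meas).values(pipeline_name="demo", execution_duration=0.5)
    )
    row = con.execute(sql.select(perf_meas)).one()
assert (row.pipeline_name, row.execution_duration) == ("demo", 0.5)
```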

View File

@@ -1,24 +1,83 @@
 """collection of configured data pipelines, intended to be invoked from C#"""
 
+import time
 from datetime import datetime as Datetime
+from typing import Final
+
+import sqlalchemy as sql
+
+from delta_barth import databases as db
 from delta_barth.analysis import forecast
+from delta_barth.constants import DEFAULT_DB_ERR_CODE
 from delta_barth.errors import STATUS_HANDLER, wrap_result
 from delta_barth.logging import logger_pipelines as logger
 from delta_barth.management import SESSION
-from delta_barth.types import JsonExportResponse
+from delta_barth.types import JsonExportResponse, PipelineMetrics
+
+
+def _write_performance_metrics(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    if time_end < time_start:
+        raise ValueError("Ending time smaller than starting time")
+    execution_duration = (time_end - time_start) / 1e9
+    metrics = PipelineMetrics(
+        pipeline_name=pipeline_name,
+        execution_duration=execution_duration,
+    )
+    with SESSION.db_engine.begin() as con:
+        con.execute(sql.insert(db.perf_meas).values(**metrics))
+    return metrics
+
+
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
+def _write_performance_metrics_wrapped(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    return _write_performance_metrics(pipeline_name, time_start, time_end)
+
+
 def pipeline_sales_forecast(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast"
     logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_forecast(
         SESSION, company_id=company_id, start_date=start_date
     )
     export = JsonExportResponse(result.model_dump_json())
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
     return export
@@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
     logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_dummy(
         SESSION,
         company_id=company_id,
         start_date=start_date,
     )
     export = JsonExportResponse(result.model_dump_json())
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
     return export
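
Note: the timing logic in both pipelines is the same pattern. `time.perf_counter_ns()` returns integer nanoseconds from a monotonic clock, and dividing the difference by `1e9` converts to float seconds; the test values further down (`30_000_000_000 - 20_000_000_000`) yield exactly 10 s. A minimal sketch:

```python
# Minimal sketch of the measurement performed in each pipeline.
import time

t_start = time.perf_counter_ns()  # integer nanoseconds, monotonic clock
time.sleep(0.05)                  # stands in for the pipeline work
t_end = time.perf_counter_ns()

execution_duration = (t_end - t_start) / 1e9  # nanoseconds -> seconds
assert t_end >= t_start  # mirrors the ValueError guard in _write_performance_metrics
print(f"execution_duration: {execution_duration:.6f} s")  # ~0.050000
```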

View File

@@ -141,7 +141,13 @@ class Statistics:
     pass
 
 
 # ** forecasts
+# ** ---- performance
+class PipelineMetrics(t.TypedDict):
+    pipeline_name: str
+    execution_duration: float
+
+# ** ---- forecasts
 @dataclass(slots=True)
 class CustomerDataSalesForecast:
     order: list[int] = field(default_factory=list)
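
Note: since `PipelineMetrics` is a `TypedDict`, it is a plain `dict` at runtime, which is what lets `_write_performance_metrics` expand it with `**metrics` into `.values(...)` and index it as `metrics["execution_duration"]`. A self-contained sketch:

```python
# Minimal sketch: a TypedDict is an ordinary dict at runtime.
from typing import TypedDict


class PipelineMetrics(TypedDict):
    pipeline_name: str
    execution_duration: float


metrics = PipelineMetrics(pipeline_name="sales_forecast", execution_duration=1.25)
assert isinstance(metrics, dict)
assert metrics["execution_duration"] == 1.25
# hence: sql.insert(db.perf_meas).values(**metrics) receives both columns
```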

View File

@@ -3,19 +3,43 @@ import json
 from unittest.mock import patch
 
 import pytest
+import sqlalchemy as sql
 
+import delta_barth.pipelines
+from delta_barth import databases as db
 from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER
 
 
+def test_write_performance_metrics(session):
+    pipe_name = "test_pipe"
+    t_start = 20_000_000_000
+    t_end = 30_000_000_000
+    with patch("delta_barth.pipelines.SESSION", session):
+        metrics = pl._write_performance_metrics(
+            pipeline_name=pipe_name,
+            time_start=t_start,
+            time_end=t_end,
+        )
+    assert metrics["pipeline_name"] == pipe_name
+    assert metrics["execution_duration"] == 10
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
+        metrics = ret.all()[-1]
+    assert metrics.pipeline_name == pipe_name
+    assert metrics.execution_duration == 10
+
+
 @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
-def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
+def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
     with patch(
         "delta_barth.analysis.forecast.get_sales_prognosis_data",
     ) as mock:
         mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
+        importlib.reload(delta_barth.pipelines)
+        with patch("delta_barth.pipelines.SESSION", session):
+            json_export = pl.pipeline_sales_forecast(None, None)
     assert isinstance(json_export, str)
@@ -27,8 +51,17 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     assert "code" in parsed_resp["status"]
     assert parsed_resp["status"]["code"] == 0
 
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
+        metrics = ret.all()[-1]
+    assert metrics.pipeline_name == "sales_forecast"
+    assert metrics.execution_duration > 0
 
 
-def test_sales_prognosis_pipeline_dummy():
+@pytest.mark.new
+def test_sales_prognosis_pipeline_dummy(session):
+    with patch("delta_barth.pipelines.SESSION", session):
+        json_export = pl.pipeline_sales_forecast_dummy(None, None)
     assert isinstance(json_export, str)
@@ -43,3 +76,10 @@ def test_sales_prognosis_pipeline_dummy():
     assert entry["vorhersage"] == pytest.approx(47261.058594)
     assert "code" in parsed_resp["status"]
     assert parsed_resp["status"]["code"] == 0
+
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
+        metrics = ret.all()[-1]
+    assert metrics.pipeline_name == "sales_forecast_dummy"
+    assert metrics.execution_duration > 0
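
Note: these tests rely on a `session` fixture that is not part of this diff (presumably defined in `conftest.py`). A hypothetical sketch of the minimum the tests require from it — an object exposing a `db_engine` bound to a throwaway database with the schema already created:

```python
# Hypothetical sketch only -- the real fixture is defined elsewhere.
# Assumes delta_barth.databases exposes the shared `metadata` object.
import pytest
import sqlalchemy as sql

from delta_barth import databases as db


class _StubSession:
    """Bare-minimum stand-in: the tests only touch .db_engine."""

    def __init__(self, engine):
        self.db_engine = engine


@pytest.fixture
def session():
    engine = sql.create_engine("sqlite://")  # fresh in-memory DB per test
    db.metadata.create_all(engine)
    return _StubSession(engine)
```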