add performance metrics logging to database #16
@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "delta-barth"
|
name = "delta-barth"
|
||||||
version = "0.5.4"
|
version = "0.5.5"
|
||||||
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
||||||
@ -73,7 +73,7 @@ directory = "reports/coverage"
|
|||||||
|
|
||||||
|
|
||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.5.4"
|
current_version = "0.5.5"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
|
|||||||
@ -26,6 +26,7 @@ from delta_barth.api.requests import (
|
|||||||
)
|
)
|
||||||
from delta_barth.constants import (
|
from delta_barth.constants import (
|
||||||
COL_MAP_SALES_PROGNOSIS,
|
COL_MAP_SALES_PROGNOSIS,
|
||||||
|
DEFAULT_DB_ERR_CODE,
|
||||||
DUMMY_DATA_PATH,
|
DUMMY_DATA_PATH,
|
||||||
FEATURES_SALES_PROGNOSIS,
|
FEATURES_SALES_PROGNOSIS,
|
||||||
SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
||||||
@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
|
|||||||
return _parse_df_to_results(data)
|
return _parse_df_to_results(data)
|
||||||
|
|
||||||
|
|
||||||
@wrap_result()
|
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
|
||||||
def _write_sales_forecast_stats_wrapped(
|
def _write_sales_forecast_stats_wrapped(
|
||||||
stats: SalesForecastStatistics,
|
stats: SalesForecastStatistics,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|||||||
@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
|
|||||||
|
|
||||||
# ** error handling
|
# ** error handling
|
||||||
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
||||||
|
DEFAULT_DB_ERR_CODE: Final[int] = 150
|
||||||
DEFAULT_API_ERR_CODE: Final[int] = 400
|
DEFAULT_API_ERR_CODE: Final[int] = 400
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -22,8 +22,8 @@ perf_meas = sql.Table(
|
|||||||
"performance_measurement",
|
"performance_measurement",
|
||||||
metadata,
|
metadata,
|
||||||
sql.Column("id", sql.Integer, primary_key=True),
|
sql.Column("id", sql.Integer, primary_key=True),
|
||||||
sql.Column("execution_duration", sql.Float),
|
|
||||||
sql.Column("pipeline_name", sql.String(length=30)),
|
sql.Column("pipeline_name", sql.String(length=30)),
|
||||||
|
sql.Column("execution_duration", sql.Float),
|
||||||
)
|
)
|
||||||
# ** ---- forecasts
|
# ** ---- forecasts
|
||||||
sf_stats = sql.Table(
|
sf_stats = sql.Table(
|
||||||
|
|||||||
@ -1,24 +1,83 @@
|
|||||||
"""collection of configured data pipelines, intended to be invoked from C#"""
|
"""collection of configured data pipelines, intended to be invoked from C#"""
|
||||||
|
|
||||||
|
import time
|
||||||
from datetime import datetime as Datetime
|
from datetime import datetime as Datetime
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
from delta_barth import databases as db
|
||||||
from delta_barth.analysis import forecast
|
from delta_barth.analysis import forecast
|
||||||
|
from delta_barth.constants import DEFAULT_DB_ERR_CODE
|
||||||
|
from delta_barth.errors import STATUS_HANDLER, wrap_result
|
||||||
from delta_barth.logging import logger_pipelines as logger
|
from delta_barth.logging import logger_pipelines as logger
|
||||||
from delta_barth.management import SESSION
|
from delta_barth.management import SESSION
|
||||||
from delta_barth.types import JsonExportResponse
|
from delta_barth.types import JsonExportResponse, PipelineMetrics
|
||||||
|
|
||||||
|
|
||||||
|
def _write_performance_metrics(
|
||||||
|
pipeline_name: str,
|
||||||
|
time_start: int,
|
||||||
|
time_end: int,
|
||||||
|
) -> PipelineMetrics:
|
||||||
|
if time_end < time_start:
|
||||||
|
raise ValueError("Ending time smaller than starting time")
|
||||||
|
execution_duration = (time_end - time_start) / 1e9
|
||||||
|
metrics = PipelineMetrics(
|
||||||
|
pipeline_name=pipeline_name,
|
||||||
|
execution_duration=execution_duration,
|
||||||
|
)
|
||||||
|
|
||||||
|
with SESSION.db_engine.begin() as con:
|
||||||
|
con.execute(sql.insert(db.perf_meas).values(**metrics))
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
|
||||||
|
|
||||||
|
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
|
||||||
|
def _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name: str,
|
||||||
|
time_start: int,
|
||||||
|
time_end: int,
|
||||||
|
) -> PipelineMetrics:
|
||||||
|
return _write_performance_metrics(pipeline_name, time_start, time_end)
|
||||||
|
|
||||||
|
|
||||||
def pipeline_sales_forecast(
|
def pipeline_sales_forecast(
|
||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
PIPELINE_NAME: Final[str] = "sales_forecast"
|
||||||
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
|
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
|
||||||
|
t_start = time.perf_counter_ns()
|
||||||
result = forecast.pipeline_sales_forecast(
|
result = forecast.pipeline_sales_forecast(
|
||||||
SESSION, company_id=company_id, start_date=start_date
|
SESSION, company_id=company_id, start_date=start_date
|
||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
t_end = time.perf_counter_ns()
|
||||||
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
|
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
|
||||||
|
res = _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name=PIPELINE_NAME,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
if res.status != STATUS_HANDLER.SUCCESS:
|
||||||
|
logger.error(
|
||||||
|
(
|
||||||
|
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
|
||||||
|
"pipeline metrics to database: %s"
|
||||||
|
),
|
||||||
|
PIPELINE_NAME,
|
||||||
|
res.status,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
metrics = res.unwrap()
|
||||||
|
logger.info(
|
||||||
|
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
|
||||||
|
PIPELINE_NAME,
|
||||||
|
metrics["execution_duration"],
|
||||||
|
)
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|
||||||
@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
|
|||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
|
||||||
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
|
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
|
||||||
|
t_start = time.perf_counter_ns()
|
||||||
result = forecast.pipeline_sales_dummy(
|
result = forecast.pipeline_sales_dummy(
|
||||||
SESSION,
|
SESSION,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
start_date=start_date,
|
start_date=start_date,
|
||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
t_end = time.perf_counter_ns()
|
||||||
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
|
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
|
||||||
|
res = _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name=PIPELINE_NAME,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
if res.status != STATUS_HANDLER.SUCCESS:
|
||||||
|
logger.error(
|
||||||
|
(
|
||||||
|
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
|
||||||
|
"pipeline metrics to database: %s"
|
||||||
|
),
|
||||||
|
PIPELINE_NAME,
|
||||||
|
res.status,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
metrics = res.unwrap()
|
||||||
|
logger.info(
|
||||||
|
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
|
||||||
|
PIPELINE_NAME,
|
||||||
|
metrics["execution_duration"],
|
||||||
|
)
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|||||||
@ -141,7 +141,13 @@ class Statistics:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# ** forecasts
|
# ** ---- performance
|
||||||
|
class PipelineMetrics(t.TypedDict):
|
||||||
|
pipeline_name: str
|
||||||
|
execution_duration: float
|
||||||
|
|
||||||
|
|
||||||
|
# ** ---- forecasts
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class CustomerDataSalesForecast:
|
class CustomerDataSalesForecast:
|
||||||
order: list[int] = field(default_factory=list)
|
order: list[int] = field(default_factory=list)
|
||||||
|
|||||||
@ -3,19 +3,43 @@ import json
|
|||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
import delta_barth.pipelines
|
import delta_barth.pipelines
|
||||||
|
from delta_barth import databases as db
|
||||||
from delta_barth import pipelines as pl
|
from delta_barth import pipelines as pl
|
||||||
from delta_barth.errors import STATUS_HANDLER
|
from delta_barth.errors import STATUS_HANDLER
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_performance_metrics(session):
|
||||||
|
pipe_name = "test_pipe"
|
||||||
|
t_start = 20_000_000_000
|
||||||
|
t_end = 30_000_000_000
|
||||||
|
|
||||||
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
|
metrics = pl._write_performance_metrics(
|
||||||
|
pipeline_name=pipe_name,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
assert metrics["pipeline_name"] == pipe_name
|
||||||
|
assert metrics["execution_duration"] == 10
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
|
metrics = ret.all()[-1]
|
||||||
|
assert metrics.pipeline_name == pipe_name
|
||||||
|
assert metrics.execution_duration == 10
|
||||||
|
|
||||||
|
|
||||||
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
||||||
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
|
||||||
with patch(
|
with patch(
|
||||||
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
||||||
) as mock:
|
) as mock:
|
||||||
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
||||||
importlib.reload(delta_barth.pipelines)
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
json_export = pl.pipeline_sales_forecast(None, None)
|
json_export = pl.pipeline_sales_forecast(None, None)
|
||||||
|
|
||||||
assert isinstance(json_export, str)
|
assert isinstance(json_export, str)
|
||||||
@ -27,8 +51,17 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
|||||||
assert "code" in parsed_resp["status"]
|
assert "code" in parsed_resp["status"]
|
||||||
assert parsed_resp["status"]["code"] == 0
|
assert parsed_resp["status"]["code"] == 0
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
def test_sales_prognosis_pipeline_dummy():
|
metrics = ret.all()[-1]
|
||||||
|
assert metrics.pipeline_name == "sales_forecast"
|
||||||
|
assert metrics.execution_duration > 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.new
|
||||||
|
def test_sales_prognosis_pipeline_dummy(session):
|
||||||
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
json_export = pl.pipeline_sales_forecast_dummy(None, None)
|
json_export = pl.pipeline_sales_forecast_dummy(None, None)
|
||||||
|
|
||||||
assert isinstance(json_export, str)
|
assert isinstance(json_export, str)
|
||||||
@ -43,3 +76,10 @@ def test_sales_prognosis_pipeline_dummy():
|
|||||||
assert entry["vorhersage"] == pytest.approx(47261.058594)
|
assert entry["vorhersage"] == pytest.approx(47261.058594)
|
||||||
assert "code" in parsed_resp["status"]
|
assert "code" in parsed_resp["status"]
|
||||||
assert parsed_resp["status"]["code"] == 0
|
assert parsed_resp["status"]["code"] == 0
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
|
metrics = ret.all()[-1]
|
||||||
|
assert metrics.pipeline_name == "sales_forecast_dummy"
|
||||||
|
assert metrics.execution_duration > 0
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user