diff --git a/pyproject.toml b/pyproject.toml
index 1bae764..908fdd2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.5.4"
+version = "0.5.5"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
 
 
 [tool.bumpversion]
-current_version = "0.5.4"
+current_version = "0.5.5"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.
diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py
index d2a221c..aa8e59c 100644
--- a/src/delta_barth/analysis/forecast.py
+++ b/src/delta_barth/analysis/forecast.py
@@ -26,6 +26,7 @@ from delta_barth.api.requests import (
 )
 from delta_barth.constants import (
     COL_MAP_SALES_PROGNOSIS,
+    DEFAULT_DB_ERR_CODE,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
     SALES_BASE_NUM_DATAPOINTS_MONTHS,
@@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
     return _parse_df_to_results(data)
 
 
-@wrap_result()
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
 def _write_sales_forecast_stats_wrapped(
     stats: SalesForecastStatistics,
 ) -> None:
diff --git a/src/delta_barth/constants.py b/src/delta_barth/constants.py
index 412007a..e00964e 100644
--- a/src/delta_barth/constants.py
+++ b/src/delta_barth/constants.py
@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
 
 # ** error handling
 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
+DEFAULT_DB_ERR_CODE: Final[int] = 150
 DEFAULT_API_ERR_CODE: Final[int] = 400
 
 
diff --git a/src/delta_barth/databases.py b/src/delta_barth/databases.py
index 6756040..95ff075 100644
--- a/src/delta_barth/databases.py
+++ b/src/delta_barth/databases.py
@@ -22,8 +22,8 @@ perf_meas = sql.Table(
     "performance_measurement",
     metadata,
     sql.Column("id", sql.Integer, primary_key=True),
-    sql.Column("execution_duration", sql.Float),
     sql.Column("pipeline_name", sql.String(length=30)),
+    sql.Column("execution_duration", sql.Float),
 )
 # ** ---- forecasts
 sf_stats = sql.Table(
diff --git a/src/delta_barth/pipelines.py b/src/delta_barth/pipelines.py
index 70733ac..a63597c 100644
--- a/src/delta_barth/pipelines.py
+++ b/src/delta_barth/pipelines.py
@@ -1,24 +1,83 @@
 """collection of configured data pipelines, intended to be invoked from C#"""
 
+import time
 from datetime import datetime as Datetime
+from typing import Final
 
+import sqlalchemy as sql
+
+from delta_barth import databases as db
 from delta_barth.analysis import forecast
+from delta_barth.constants import DEFAULT_DB_ERR_CODE
+from delta_barth.errors import STATUS_HANDLER, wrap_result
 from delta_barth.logging import logger_pipelines as logger
 from delta_barth.management import SESSION
-from delta_barth.types import JsonExportResponse
+from delta_barth.types import JsonExportResponse, PipelineMetrics
+
+
+def _write_performance_metrics(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    if time_end < time_start:
+        raise ValueError("Ending time smaller than starting time")
+    execution_duration = (time_end - time_start) / 1e9
+    metrics = PipelineMetrics(
+        pipeline_name=pipeline_name,
+        execution_duration=execution_duration,
+    )
+
+    with SESSION.db_engine.begin() as con:
+        con.execute(sql.insert(db.perf_meas).values(**metrics))
+
+    return metrics
+
+
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
+def _write_performance_metrics_wrapped(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    return _write_performance_metrics(pipeline_name, time_start, time_end)
 
 
 def pipeline_sales_forecast(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast"
     logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_forecast(
         SESSION, company_id=company_id, start_date=start_date
     )
     export = JsonExportResponse(result.model_dump_json())
-
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
 
     return export
 
@@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
     logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_dummy(
         SESSION,
         company_id=company_id,
         start_date=start_date,
     )
     export = JsonExportResponse(result.model_dump_json())
-
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
 
     return export
diff --git a/src/delta_barth/types.py b/src/delta_barth/types.py
index 0e3e11d..636cbb9 100644
--- a/src/delta_barth/types.py
+++ b/src/delta_barth/types.py
@@ -141,7 +141,13 @@ class Statistics:
     pass
 
 
-# ** forecasts
+# ** ---- performance
+class PipelineMetrics(t.TypedDict):
+    pipeline_name: str
+    execution_duration: float
+
+
+# ** ---- forecasts
 @dataclass(slots=True)
 class CustomerDataSalesForecast:
     order: list[int] = field(default_factory=list)
diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py
index 6451a53..b1387b7 100644
--- a/tests/test_pipelines.py
+++ b/tests/test_pipelines.py
@@ -3,20 +3,44 @@ import json
 from unittest.mock import patch
 
 import pytest
+import sqlalchemy as sql
 
 import delta_barth.pipelines
+from delta_barth import databases as db
 from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER
 
 
+def test_write_performance_metrics(session):
+    pipe_name = "test_pipe"
+    t_start = 20_000_000_000
+    t_end = 30_000_000_000
+
+    with patch("delta_barth.pipelines.SESSION", session):
+        metrics = pl._write_performance_metrics(
+            pipeline_name=pipe_name,
+            time_start=t_start,
+            time_end=t_end,
+        )
+    assert metrics["pipeline_name"] == pipe_name
+    assert metrics["execution_duration"] == 10
+
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
+
+    metrics = ret.all()[-1]
+    assert metrics.pipeline_name == pipe_name
+    assert metrics.execution_duration == 10
+
+
 @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
-def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
+def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
     with patch(
         "delta_barth.analysis.forecast.get_sales_prognosis_data",
     ) as mock:
         mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
-        importlib.reload(delta_barth.pipelines)
-        json_export = pl.pipeline_sales_forecast(None, None)
+        with patch("delta_barth.pipelines.SESSION", session):
+            json_export = pl.pipeline_sales_forecast(None, None)
 
     assert isinstance(json_export, str)
     parsed_resp = json.loads(json_export)
@@ -27,9 +51,18 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     assert "code" in parsed_resp["status"]
     assert parsed_resp["status"]["code"] == 0
 
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
 
-def test_sales_prognosis_pipeline_dummy():
-    json_export = pl.pipeline_sales_forecast_dummy(None, None)
+    metrics = ret.all()[-1]
+    assert metrics.pipeline_name == "sales_forecast"
+    assert metrics.execution_duration > 0
+
+
+@pytest.mark.new
+def test_sales_prognosis_pipeline_dummy(session):
+    with patch("delta_barth.pipelines.SESSION", session):
+        json_export = pl.pipeline_sales_forecast_dummy(None, None)
 
     assert isinstance(json_export, str)
     parsed_resp = json.loads(json_export)
@@ -43,3 +76,10 @@
         assert entry["vorhersage"] == pytest.approx(47261.058594)
     assert "code" in parsed_resp["status"]
     assert parsed_resp["status"]["code"] == 0
+
+    with session.db_engine.begin() as con:
+        ret = con.execute(sql.select(db.perf_meas))
+
+    metrics = ret.all()[-1]
+    assert metrics.pipeline_name == "sales_forecast_dummy"
+    assert metrics.execution_duration > 0