prepare metrics writing process

Florian Förster 2025-04-03 16:05:46 +02:00
parent 5d78fc9e02
commit 6a418118d2
7 changed files with 102 additions and 8 deletions

View File

@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.5.4"
+version = "0.5.5dev1"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"

 [tool.bumpversion]
-current_version = "0.5.4"
+current_version = "0.5.5dev1"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.
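Note: the bumped version "0.5.5dev1" only round-trips through bump-my-version if the parse pattern also captures a pre-release segment, and the hunk above is cut off before that part. A minimal sketch of such a pattern; the patch and dev groups are a hypothetical continuation, the real pyproject.toml may differ after the minor group:

import re

# hypothetical continuation after the <minor> group; the actual pattern
# in pyproject.toml is not fully shown in this diff
VERSION_PATTERN = re.compile(
    r"""(?x)
    (?P<major>0|[1-9]\d*)\.
    (?P<minor>0|[1-9]\d*)\.
    (?P<patch>0|[1-9]\d*)
    (?:(?P<pre_label>dev)(?P<pre_number>\d+))?
    """
)

match = VERSION_PATTERN.fullmatch("0.5.5dev1")
assert match is not None
assert match["patch"] == "5"
assert match["pre_number"] == "1"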

View File

@@ -26,6 +26,7 @@ from delta_barth.api.requests import (
 )
 from delta_barth.constants import (
     COL_MAP_SALES_PROGNOSIS,
+    DEFAULT_DB_ERR_CODE,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
     SALES_BASE_NUM_DATAPOINTS_MONTHS,
@@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
     return _parse_df_to_results(data)


-@wrap_result()
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
 def _write_sales_forecast_stats_wrapped(
     stats: SalesForecastStatistics,
 ) -> None:
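Note: wrap_result and STATUS_HANDLER live in delta_barth.errors, which this commit does not touch. Judging from the call sites in the pipelines module below (res.status, res.unwrap()), the decorator behaves roughly like this sketch; the Result class, the SUCCESS == 0 convention, and the default error code are assumptions, not the actual implementation:

import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(slots=True)
class Result:
    status: int          # assumed: 0 mirrors STATUS_HANDLER.SUCCESS
    value: Any = None

    def unwrap(self) -> Any:
        # assumed semantics: only successful results may be unwrapped
        if self.status != 0:
            raise RuntimeError(f"cannot unwrap failed result (status={self.status})")
        return self.value


def wrap_result(code_on_error: int = 100) -> Callable:
    # 100 matches DEFAULT_INTERNAL_ERR_CODE from the constants diff,
    # but the real default is an assumption
    def decorator(func: Callable[..., Any]) -> Callable[..., Result]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Result:
            try:
                return Result(status=0, value=func(*args, **kwargs))
            except Exception:
                return Result(status=code_on_error)

        return wrapper

    return decorator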

View File

@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True

 # ** error handling
 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
+DEFAULT_DB_ERR_CODE: Final[int] = 150
 DEFAULT_API_ERR_CODE: Final[int] = 400

View File

@@ -22,8 +22,8 @@ perf_meas = sql.Table(
     "performance_measurement",
     metadata,
     sql.Column("id", sql.Integer, primary_key=True),
-    sql.Column("execution_duration", sql.Float),
     sql.Column("pipeline_name", sql.String(length=30)),
+    sql.Column("execution_duration", sql.Float),
 )

 # ** ---- forecasts
 sf_stats = sql.Table(
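Note: the hunk above reorders the columns so that pipeline_name precedes execution_duration. For reference, writing a row into this table with plain SQLAlchemy Core looks like the sketch below; the in-memory SQLite engine is only for illustration:

import sqlalchemy as sql

metadata = sql.MetaData()
perf_meas = sql.Table(
    "performance_measurement",
    metadata,
    sql.Column("id", sql.Integer, primary_key=True),
    sql.Column("pipeline_name", sql.String(length=30)),
    sql.Column("execution_duration", sql.Float),
)

engine = sql.create_engine("sqlite:///:memory:")  # illustration only
metadata.create_all(engine)

# engine.begin() opens a transaction and commits on success,
# mirroring the SESSION.db_engine.begin() usage in the pipelines module
with engine.begin() as con:
    con.execute(
        sql.insert(perf_meas).values(
            pipeline_name="sales_forecast",
            execution_duration=0.123456,
        )
    )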

View File

@@ -1,24 +1,83 @@
 """collection of configured data pipelines, intended to be invoked from C#"""

+import time
 from datetime import datetime as Datetime
+from typing import Final
+
+import sqlalchemy as sql

+from delta_barth import databases as db
 from delta_barth.analysis import forecast
+from delta_barth.constants import DEFAULT_DB_ERR_CODE
+from delta_barth.errors import STATUS_HANDLER, wrap_result
 from delta_barth.logging import logger_pipelines as logger
 from delta_barth.management import SESSION
-from delta_barth.types import JsonExportResponse
+from delta_barth.types import JsonExportResponse, PipelineMetrics
+
+
+def _write_performance_metrics(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    if time_end < time_start:
+        raise ValueError("Ending time smaller than starting time")
+    # perf_counter_ns() deltas are integer nanoseconds; divide by 1e9 for seconds
+    execution_duration = (time_end - time_start) / 1e9
+    metrics = PipelineMetrics(
+        pipeline_name=pipeline_name,
+        execution_duration=execution_duration,
+    )
+    with SESSION.db_engine.begin() as con:
+        con.execute(sql.insert(db.perf_meas).values(**metrics))
+    return metrics
+
+
+@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
+def _write_performance_metrics_wrapped(
+    pipeline_name: str,
+    time_start: int,
+    time_end: int,
+) -> PipelineMetrics:
+    return _write_performance_metrics(pipeline_name, time_start, time_end)
+
+
 def pipeline_sales_forecast(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast"
     logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_forecast(
         SESSION, company_id=company_id, start_date=start_date
     )
     export = JsonExportResponse(result.model_dump_json())
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
     return export
@@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
+    PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
     logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
+    t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_dummy(
         SESSION,
         company_id=company_id,
         start_date=start_date,
     )
     export = JsonExportResponse(result.model_dump_json())
+    t_end = time.perf_counter_ns()
     logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
+    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
+    res = _write_performance_metrics_wrapped(
+        pipeline_name=PIPELINE_NAME,
+        time_start=t_start,
+        time_end=t_end,
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        logger.error(
+            (
+                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
+                "pipeline metrics to database: %s"
+            ),
+            PIPELINE_NAME,
+            res.status,
+        )
+    else:
+        metrics = res.unwrap()
+        logger.info(
+            "[METRICS] Pipeline: >%s< - Execution time: %.6f",
+            PIPELINE_NAME,
+            metrics["execution_duration"],
+        )
     return export
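Note: time.perf_counter_ns() returns integer nanoseconds, so dividing the delta by 1e9 yields seconds. A quick self-contained check of the conversion used in _write_performance_metrics:

import time

t_start = time.perf_counter_ns()
time.sleep(0.05)
t_end = time.perf_counter_ns()

duration_s = (t_end - t_start) / 1e9  # integer nanoseconds -> float seconds
assert 0.04 < duration_s < 1.0
print(f"execution took {duration_s:.6f} s")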

View File

@@ -141,7 +141,13 @@ class Statistics:
     pass

-# ** forecasts
+# ** ---- performance
+class PipelineMetrics(t.TypedDict):
+    pipeline_name: str
+    execution_duration: float
+
+
+# ** ---- forecasts
 @dataclass(slots=True)
 class CustomerDataSalesForecast:
     order: list[int] = field(default_factory=list)
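Note: as a TypedDict, PipelineMetrics is a plain dict at runtime, which is why the pipelines module can splat it into sql.insert(...).values(**metrics) and read metrics["execution_duration"] by key:

import typing as t


class PipelineMetrics(t.TypedDict):
    pipeline_name: str
    execution_duration: float


metrics = PipelineMetrics(pipeline_name="sales_forecast", execution_duration=0.42)
assert isinstance(metrics, dict)              # plain dict at runtime
assert metrics["execution_duration"] == 0.42  # key access as in the logger call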

View File

@@ -9,6 +9,9 @@ from delta_barth import pipelines as pl
+from delta_barth.errors import STATUS_HANDLER
+
+def test_write_performance_metrics(session): ...

 @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
 def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     with patch(
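Note: test_write_performance_metrics is still a stub in this commit. One way it could be filled in later, assuming the session fixture points pl.SESSION at a throwaway test database (the fixture itself is not part of this diff, so everything below is a sketch):

import sqlalchemy as sql

from delta_barth import databases as db
from delta_barth import pipelines as pl


def test_write_performance_metrics(session):
    # hypothetical: relies on the session fixture wiring pl.SESSION
    # to a test database
    metrics = pl._write_performance_metrics(
        pipeline_name="test_pipeline",
        time_start=0,
        time_end=1_500_000_000,  # 1.5 s expressed in nanoseconds
    )
    assert metrics["execution_duration"] == 1.5
    with pl.SESSION.db_engine.begin() as con:
        rows = con.execute(sql.select(db.perf_meas)).all()
    assert len(rows) == 1
    assert rows[0].pipeline_name == "test_pipeline"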