From df09732c561ab95554ee8711587071c01edb5019 Mon Sep 17 00:00:00 2001 From: foefl Date: Thu, 27 Mar 2025 08:26:35 +0100 Subject: [PATCH] add statistics logging to sales forecast pipeline --- src/delta_barth/analysis/forecast.py | 64 ++++++++++++++++++---------- src/delta_barth/types.py | 26 ++++++++++- tests/analysis/test_forecast.py | 46 +++++++++++++++++++- 3 files changed, 111 insertions(+), 25 deletions(-) diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py index d494c15..a3dbe56 100644 --- a/src/delta_barth/analysis/forecast.py +++ b/src/delta_barth/analysis/forecast.py @@ -1,9 +1,10 @@ from __future__ import annotations import datetime +import math from collections.abc import Mapping, Set from datetime import datetime as Datetime -from typing import TYPE_CHECKING, Final, cast +from typing import TYPE_CHECKING, Final, TypeAlias, cast import numpy as np import pandas as pd @@ -34,12 +35,15 @@ from delta_barth.types import ( DualDict, ParamSearchXGBRegressor, PipeResult, + SalesForecastStatistics, ) if TYPE_CHECKING: from delta_barth.api.common import Session from delta_barth.types import Status +ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics] + def _parse_api_resp_to_df( resp: SalesPrognosisResponse, @@ -110,7 +114,7 @@ def _preprocess_sales( resp: SalesPrognosisResponse, feature_map: Mapping[str, str], target_features: Set[str], -) -> PipeResult[SalesPrognosisResultsExport]: +) -> ForecastPipe: """n = 1 Parameters @@ -127,7 +131,7 @@ def _preprocess_sales( PipeResult _description_ """ - pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS) + pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS) res = _parse_api_resp_to_df_wrapped(resp) if res.status != STATUS_HANDLER.SUCCESS: @@ -149,10 +153,10 @@ def _preprocess_sales( def _process_sales( - pipe: PipeResult[SalesPrognosisResultsExport], + pipe: ForecastPipe, min_num_data_points: int, base_num_data_points_months: int, -) -> PipeResult[SalesPrognosisResultsExport]: +) -> ForecastPipe: """n = 1 Input-Data: fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"] @@ -182,9 +186,13 @@ def _process_sales( df_firma = data[(data["betrag"] > 0)] df_cust = df_firma.copy() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() + len_ds = len(df_cust) - if len(df_cust) < min_num_data_points: - pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS) + if len_ds < min_num_data_points: + status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS + pipe.fail(status) + stats = SalesForecastStatistics(status.code, status.description, len_ds) + pipe.stats(stats) return pipe df_cust["jahr"] = df_cust[DATE_FEAT].dt.year @@ -216,8 +224,8 @@ def _process_sales( } best_params: BestParametersXGBRegressor | None = None - best_score_mae: float = float("inf") - best_score_r2: float = float("inf") + best_score_mae: float | None = float("inf") + best_score_r2: float | None = None best_start_year: int | None = None too_few_month_points: bool = True forecast: pd.DataFrame | None = None @@ -252,7 +260,6 @@ def _process_sales( y_pred = rand.best_estimator_.predict(X_test) # type: ignore if len(np.unique(y_pred)) != 1: - # pp(y_pred) error = cast(float, mean_absolute_error(y_test, y_pred)) if error < best_score_mae: best_params = cast(BestParametersXGBRegressor, rand.best_params_) @@ -263,31 +270,44 @@ def _process_sales( forecast = test.copy() forecast.loc[:, "vorhersage"] = y_pred - # pp(best_params) - # pp(best_score_mae) - # pp(best_score_r2) - # pp(best_start_year) if forecast is not None: forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True) - - # TODO log metrics + best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None if too_few_month_points: - pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS) + status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS + pipe.fail(status) + stats = SalesForecastStatistics(status.code, status.description, len_ds) + pipe.stats(stats) return pipe elif best_params is None: - pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST) + status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST + pipe.fail(status) + stats = SalesForecastStatistics(status.code, status.description, len_ds) + pipe.stats(stats) return pipe assert forecast is not None, "forecast is None, but was attempted to be returned" - pipe.success(forecast, STATUS_HANDLER.SUCCESS) + status = STATUS_HANDLER.SUCCESS + pipe.success(forecast, status) + stats = SalesForecastStatistics( + status.code, + status.description, + len_ds, + score_mae=best_score_mae, + score_r2=best_score_r2, + best_start_year=best_start_year, + XGB_params=best_params, + ) + pipe.stats(stats) + return pipe def _postprocess_sales( - pipe: PipeResult[SalesPrognosisResultsExport], + pipe: ForecastPipe, feature_map: Mapping[str, str], -) -> PipeResult[SalesPrognosisResultsExport]: +) -> ForecastPipe: data = pipe.data assert data is not None, "processing not existing pipe result" # convert features back to original naming @@ -393,7 +413,7 @@ def pipeline_sales_dummy( data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl" assert data_pth.exists(), "sales forecast dummy data not existent" data = pd.read_pickle(data_pth) - pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS) + pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS) res = _parse_df_to_results_wrapped(data) if res.status != STATUS_HANDLER.SUCCESS: diff --git a/src/delta_barth/types.py b/src/delta_barth/types.py index c871d2e..43a0c13 100644 --- a/src/delta_barth/types.py +++ b/src/delta_barth/types.py @@ -14,6 +14,7 @@ __all__ = ["DualDict"] # ** Pipeline state management StatusDescription: t.TypeAlias = tuple[str, int, str] R = t.TypeVar("R", bound="ExportResponse") +S = t.TypeVar("S", bound="Statistics") class IError(t.Protocol): @@ -47,10 +48,11 @@ class DataPipeStates: @dataclass(slots=True) -class PipeResult(t.Generic[R]): +class PipeResult(t.Generic[R, S]): data: pd.DataFrame | None status: Status results: R | None = None + statistics: S | None = None def success( self, @@ -77,6 +79,12 @@ class PipeResult(t.Generic[R]): self.status = response.status self.results = response + def stats( + self, + statistics: S, + ) -> None: + self.statistics = statistics + JsonExportResponse = t.NewType("JsonExportResponse", str) JsonResponse = t.NewType("JsonResponse", str) @@ -121,6 +129,11 @@ HttpContentHeaders = t.TypedDict( ) +# ** statistics +class Statistics: + pass + + # ** forecasts @dataclass(slots=True) class CustomerDataSalesForecast: @@ -140,6 +153,17 @@ class ParamSearchXGBRegressor(t.TypedDict): early_stopping_rounds: Sequence[int] +@dataclass(slots=True, eq=False) +class SalesForecastStatistics(Statistics): + status_code: int + status_dscr: str + length_dataset: int + score_mae: float | None = None + score_r2: float | None = None + best_start_year: int | None = None + XGB_params: BestParametersXGBRegressor | None = None + + class BestParametersXGBRegressor(t.TypedDict): n_estimators: int learning_rate: float diff --git a/tests/analysis/test_forecast.py b/tests/analysis/test_forecast.py index da95e52..5548578 100644 --- a/tests/analysis/test_forecast.py +++ b/tests/analysis/test_forecast.py @@ -1,4 +1,3 @@ -import importlib from datetime import datetime as Datetime from unittest.mock import patch @@ -7,7 +6,6 @@ import pandas as pd import pytest from pydantic import ValidationError -import delta_barth.analysis.forecast from delta_barth.analysis import forecast as fc from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry from delta_barth.errors import STATUS_HANDLER @@ -160,6 +158,7 @@ def test_preprocess_sales_FailOnTargetFeature( assert pipe.results is None +@pytest.mark.new def test_process_sales_Success(sales_data_real_preproc): data = sales_data_real_preproc.copy() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) @@ -172,8 +171,17 @@ def test_process_sales_Success(sales_data_real_preproc): assert pipe.status == STATUS_HANDLER.SUCCESS assert pipe.data is not None assert pipe.results is None + assert pipe.statistics is not None + assert pipe.statistics.status_code == STATUS_HANDLER.SUCCESS.code + assert pipe.statistics.status_dscr == STATUS_HANDLER.SUCCESS.description + assert pipe.statistics.length_dataset is not None + assert pipe.statistics.score_mae is not None + assert pipe.statistics.score_r2 is not None + assert pipe.statistics.best_start_year is not None + assert pipe.statistics.XGB_params is not None +@pytest.mark.new def test_process_sales_FailTooFewPoints(sales_data_real_preproc): data = sales_data_real_preproc.copy() data = data.iloc[:20, :] @@ -188,8 +196,19 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc): assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS assert pipe.data is None assert pipe.results is None + assert pipe.statistics is not None + assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.code + assert ( + pipe.statistics.status_dscr == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.description + ) + assert pipe.statistics.length_dataset is not None + assert pipe.statistics.score_mae is None + assert pipe.statistics.score_r2 is None + assert pipe.statistics.best_start_year is None + assert pipe.statistics.XGB_params is None +@pytest.mark.new def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): data = sales_data_real_preproc.copy() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) @@ -203,8 +222,20 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS assert pipe.data is None assert pipe.results is None + assert pipe.statistics is not None + assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.code + assert ( + pipe.statistics.status_dscr + == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.description + ) + assert pipe.statistics.length_dataset is not None + assert pipe.statistics.score_mae is None + assert pipe.statistics.score_r2 is None + assert pipe.statistics.best_start_year is None + assert pipe.statistics.XGB_params is None +@pytest.mark.new def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): data = sales_data_real_preproc.copy() data["betrag"] = 10000 @@ -237,6 +268,17 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST assert pipe.data is None assert pipe.results is None + assert pipe.statistics is not None + assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.code + assert ( + pipe.statistics.status_dscr + == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.description + ) + assert pipe.statistics.length_dataset is not None + assert pipe.statistics.score_mae is None + assert pipe.statistics.score_r2 is None + assert pipe.statistics.best_start_year is None + assert pipe.statistics.XGB_params is None def test_postprocess_sales_Success(