add statistics logging to sales forecast pipeline

This commit is contained in:
Florian Förster 2025-03-27 08:26:35 +01:00
parent 4828b84d79
commit df09732c56
3 changed files with 111 additions and 25 deletions

View File

@ -1,9 +1,10 @@
from __future__ import annotations from __future__ import annotations
import datetime import datetime
import math
from collections.abc import Mapping, Set from collections.abc import Mapping, Set
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, cast from typing import TYPE_CHECKING, Final, TypeAlias, cast
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@ -34,12 +35,15 @@ from delta_barth.types import (
DualDict, DualDict,
ParamSearchXGBRegressor, ParamSearchXGBRegressor,
PipeResult, PipeResult,
SalesForecastStatistics,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from delta_barth.api.common import Session from delta_barth.api.common import Session
from delta_barth.types import Status from delta_barth.types import Status
ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics]
def _parse_api_resp_to_df( def _parse_api_resp_to_df(
resp: SalesPrognosisResponse, resp: SalesPrognosisResponse,
@ -110,7 +114,7 @@ def _preprocess_sales(
resp: SalesPrognosisResponse, resp: SalesPrognosisResponse,
feature_map: Mapping[str, str], feature_map: Mapping[str, str],
target_features: Set[str], target_features: Set[str],
) -> PipeResult[SalesPrognosisResultsExport]: ) -> ForecastPipe:
"""n = 1 """n = 1
Parameters Parameters
@ -127,7 +131,7 @@ def _preprocess_sales(
PipeResult PipeResult
_description_ _description_
""" """
pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS) pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
res = _parse_api_resp_to_df_wrapped(resp) res = _parse_api_resp_to_df_wrapped(resp)
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
@ -149,10 +153,10 @@ def _preprocess_sales(
def _process_sales( def _process_sales(
pipe: PipeResult[SalesPrognosisResultsExport], pipe: ForecastPipe,
min_num_data_points: int, min_num_data_points: int,
base_num_data_points_months: int, base_num_data_points_months: int,
) -> PipeResult[SalesPrognosisResultsExport]: ) -> ForecastPipe:
"""n = 1 """n = 1
Input-Data: Input-Data:
fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"] fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
@ -182,9 +186,13 @@ def _process_sales(
df_firma = data[(data["betrag"] > 0)] df_firma = data[(data["betrag"] > 0)]
df_cust = df_firma.copy() df_cust = df_firma.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust)
if len(df_cust) < min_num_data_points: if len_ds < min_num_data_points:
pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS) status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
@ -216,8 +224,8 @@ def _process_sales(
} }
best_params: BestParametersXGBRegressor | None = None best_params: BestParametersXGBRegressor | None = None
best_score_mae: float = float("inf") best_score_mae: float | None = float("inf")
best_score_r2: float = float("inf") best_score_r2: float | None = None
best_start_year: int | None = None best_start_year: int | None = None
too_few_month_points: bool = True too_few_month_points: bool = True
forecast: pd.DataFrame | None = None forecast: pd.DataFrame | None = None
@ -252,7 +260,6 @@ def _process_sales(
y_pred = rand.best_estimator_.predict(X_test) # type: ignore y_pred = rand.best_estimator_.predict(X_test) # type: ignore
if len(np.unique(y_pred)) != 1: if len(np.unique(y_pred)) != 1:
# pp(y_pred)
error = cast(float, mean_absolute_error(y_test, y_pred)) error = cast(float, mean_absolute_error(y_test, y_pred))
if error < best_score_mae: if error < best_score_mae:
best_params = cast(BestParametersXGBRegressor, rand.best_params_) best_params = cast(BestParametersXGBRegressor, rand.best_params_)
@ -263,31 +270,44 @@ def _process_sales(
forecast = test.copy() forecast = test.copy()
forecast.loc[:, "vorhersage"] = y_pred forecast.loc[:, "vorhersage"] = y_pred
# pp(best_params)
# pp(best_score_mae)
# pp(best_score_r2)
# pp(best_start_year)
if forecast is not None: if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True) forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
# TODO log metrics
if too_few_month_points: if too_few_month_points:
pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS) status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
elif best_params is None: elif best_params is None:
pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST) status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned" assert forecast is not None, "forecast is None, but was attempted to be returned"
pipe.success(forecast, STATUS_HANDLER.SUCCESS) status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status)
stats = SalesForecastStatistics(
status.code,
status.description,
len_ds,
score_mae=best_score_mae,
score_r2=best_score_r2,
best_start_year=best_start_year,
XGB_params=best_params,
)
pipe.stats(stats)
return pipe return pipe
def _postprocess_sales( def _postprocess_sales(
pipe: PipeResult[SalesPrognosisResultsExport], pipe: ForecastPipe,
feature_map: Mapping[str, str], feature_map: Mapping[str, str],
) -> PipeResult[SalesPrognosisResultsExport]: ) -> ForecastPipe:
data = pipe.data data = pipe.data
assert data is not None, "processing not existing pipe result" assert data is not None, "processing not existing pipe result"
# convert features back to original naming # convert features back to original naming
@ -393,7 +413,7 @@ def pipeline_sales_dummy(
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl" data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
assert data_pth.exists(), "sales forecast dummy data not existent" assert data_pth.exists(), "sales forecast dummy data not existent"
data = pd.read_pickle(data_pth) data = pd.read_pickle(data_pth)
pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS) pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
res = _parse_df_to_results_wrapped(data) res = _parse_df_to_results_wrapped(data)
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:

View File

@ -14,6 +14,7 @@ __all__ = ["DualDict"]
# ** Pipeline state management # ** Pipeline state management
StatusDescription: t.TypeAlias = tuple[str, int, str] StatusDescription: t.TypeAlias = tuple[str, int, str]
R = t.TypeVar("R", bound="ExportResponse") R = t.TypeVar("R", bound="ExportResponse")
S = t.TypeVar("S", bound="Statistics")
class IError(t.Protocol): class IError(t.Protocol):
@ -47,10 +48,11 @@ class DataPipeStates:
@dataclass(slots=True) @dataclass(slots=True)
class PipeResult(t.Generic[R]): class PipeResult(t.Generic[R, S]):
data: pd.DataFrame | None data: pd.DataFrame | None
status: Status status: Status
results: R | None = None results: R | None = None
statistics: S | None = None
def success( def success(
self, self,
@ -77,6 +79,12 @@ class PipeResult(t.Generic[R]):
self.status = response.status self.status = response.status
self.results = response self.results = response
def stats(
self,
statistics: S,
) -> None:
self.statistics = statistics
JsonExportResponse = t.NewType("JsonExportResponse", str) JsonExportResponse = t.NewType("JsonExportResponse", str)
JsonResponse = t.NewType("JsonResponse", str) JsonResponse = t.NewType("JsonResponse", str)
@ -121,6 +129,11 @@ HttpContentHeaders = t.TypedDict(
) )
# ** statistics
class Statistics:
pass
# ** forecasts # ** forecasts
@dataclass(slots=True) @dataclass(slots=True)
class CustomerDataSalesForecast: class CustomerDataSalesForecast:
@ -140,6 +153,17 @@ class ParamSearchXGBRegressor(t.TypedDict):
early_stopping_rounds: Sequence[int] early_stopping_rounds: Sequence[int]
@dataclass(slots=True, eq=False)
class SalesForecastStatistics(Statistics):
status_code: int
status_dscr: str
length_dataset: int
score_mae: float | None = None
score_r2: float | None = None
best_start_year: int | None = None
XGB_params: BestParametersXGBRegressor | None = None
class BestParametersXGBRegressor(t.TypedDict): class BestParametersXGBRegressor(t.TypedDict):
n_estimators: int n_estimators: int
learning_rate: float learning_rate: float

View File

@ -1,4 +1,3 @@
import importlib
from datetime import datetime as Datetime from datetime import datetime as Datetime
from unittest.mock import patch from unittest.mock import patch
@ -7,7 +6,6 @@ import pandas as pd
import pytest import pytest
from pydantic import ValidationError from pydantic import ValidationError
import delta_barth.analysis.forecast
from delta_barth.analysis import forecast as fc from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
@ -160,6 +158,7 @@ def test_preprocess_sales_FailOnTargetFeature(
assert pipe.results is None assert pipe.results is None
@pytest.mark.new
def test_process_sales_Success(sales_data_real_preproc): def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -172,8 +171,17 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.SUCCESS assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is not None assert pipe.data is not None
assert pipe.results is None assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.SUCCESS.code
assert pipe.statistics.status_dscr == STATUS_HANDLER.SUCCESS.description
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is not None
assert pipe.statistics.score_r2 is not None
assert pipe.statistics.best_start_year is not None
assert pipe.statistics.XGB_params is not None
@pytest.mark.new
def test_process_sales_FailTooFewPoints(sales_data_real_preproc): def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
data = data.iloc[:20, :] data = data.iloc[:20, :]
@ -188,8 +196,19 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None assert pipe.data is None
assert pipe.results is None assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.code
assert (
pipe.statistics.status_dscr == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.XGB_params is None
@pytest.mark.new
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -203,8 +222,20 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
assert pipe.data is None assert pipe.data is None
assert pipe.results is None assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.XGB_params is None
@pytest.mark.new
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
data["betrag"] = 10000 data["betrag"] = 10000
@ -237,6 +268,17 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
assert pipe.data is None assert pipe.data is None
assert pipe.results is None assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.XGB_params is None
def test_postprocess_sales_Success( def test_postprocess_sales_Success(