add new reworked sales forecast pipeline #8

Merged
foefl merged 3 commits from rework_forecast_pipeline into main 2025-03-26 12:31:00 +00:00
6 changed files with 189 additions and 81 deletions
Showing only changes of commit 6deda74a80 - Show all commits

View File

@ -1,11 +1,15 @@
from __future__ import annotations
import datetime
from collections.abc import Mapping, Set
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final
from typing import TYPE_CHECKING, Final, cast
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
import scipy.stats
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor
from delta_barth.analysis import parse
@ -20,12 +24,15 @@ from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS,
DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS,
MIN_NUMBER_DATAPOINTS,
SALES_BASE_NUM_DATAPOINTS_MONTHS,
SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.types import (
BestParametersXGBRegressor,
DualDict,
ParamSearchXGBRegressor,
PipeResult,
)
@ -58,14 +65,6 @@ def _parse_api_resp_to_df(
return pd.DataFrame(data)
# def _parse_df_to_api_resp(
# data: pd.DataFrame,
# ) -> SalesPrognosisResponse:
# df_formatted = data.to_dict(orient="records")
# return SalesPrognosisResponse(daten=tuple(df_formatted)) # type: ignore
def _parse_df_to_results(
data: pd.DataFrame,
) -> SalesPrognosisResults:
@ -81,13 +80,6 @@ def _parse_api_resp_to_df_wrapped(
return _parse_api_resp_to_df(resp)
# @wrap_result()
# def _parse_df_to_api_resp_wrapped(
# data: pd.DataFrame,
# ) -> SalesPrognosisResponse:
# return _parse_df_to_api_resp(data)
@wrap_result()
def _parse_df_to_results_wrapped(
data: pd.DataFrame,
@ -158,7 +150,8 @@ def _preprocess_sales(
def _process_sales(
pipe: PipeResult[SalesPrognosisResultsExport],
min_num_data_points: int = 100,
min_num_data_points: int,
base_num_data_points_months: int,
) -> PipeResult[SalesPrognosisResultsExport]:
"""n = 1
Input-Data:
@ -183,22 +176,6 @@ def _process_sales(
# filter data
data = pipe.data
assert data is not None, "processing not existing pipe result"
data = data.copy()
# df_firma = data[
# (data["firma_refid"] == company_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
# ]
# for transaction in df_firma["vorgang_refid"].unique():
# cust_data.order.append(transaction)
# cust_data.date.append(
# df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
# )
# cust_data.sales.append(
# df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
# )
# df_cust = pd.DataFrame(dc.asdict(cust_data))
# df_cust = df_cust.sort_values(by="date").reset_index()
DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag"
@ -206,13 +183,10 @@ def _process_sales(
df_cust = df_firma.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
# check data availability
# TODO rework criteria
if len(df_cust) < min_num_data_points:
pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
return pipe
# Entwicklung der Umsätze: definierte Zeiträume Monat
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
@ -223,31 +197,90 @@ def _process_sales(
monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
monthly_sum = monthly_sum.set_index(DATE_FEAT)
train = monthly_sum.iloc[:-5].copy()
test = monthly_sum.iloc[-5:].copy()
features = ["jahr", "monat"]
target = SALES_FEAT
current_year = datetime.datetime.now().year
first_year = cast(int, df_cust["jahr"].min())
X_train, y_train = train[features], train[target]
X_test, y_test = test[features], test[target]
# Randomized Search
kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=1000),
"learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9),
"min_child_weight": range(1, 5),
"gamma": [1],
"subsample": [0.8],
"colsample_bytree": [0.8],
"early_stopping_rounds": [20, 50],
}
reg = XGBRegressor(
base_score=0.5,
booster="gbtree",
n_estimators=1000,
early_stopping_rounds=50,
objective="reg:squarederror",
max_depth=3,
learning_rate=0.01,
)
reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)
best_params: BestParametersXGBRegressor | None = None
best_score_mae: float = float("inf")
best_score_r2: float = float("inf")
best_start_year: int | None = None
too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
test.loc[:, "vorhersage"] = reg.predict(X_test)
test = test.drop(SALES_FEAT, axis=1)
test = test.reset_index(drop=True)
for start_year in range(current_year - 4, first_year - 1, -1):
train = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
)
test = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
)
X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target]
pipe.success(test, STATUS_HANDLER.SUCCESS)
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
too_few_month_points = False
rand = RandomizedSearchCV(
XGBRegressor(),
params,
scoring="neg_mean_absolute_error",
cv=kfold,
n_jobs=-1,
n_iter=100,
verbose=0,
)
rand.fit(
X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
)
y_pred = rand.best_estimator_.predict(X_test) # type: ignore
if len(np.unique(y_pred)) != 1:
# pp(y_pred)
error = cast(float, mean_absolute_error(y_test, y_pred))
if error < best_score_mae:
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred))
best_start_year = start_year
print("executed")
forecast = test.copy()
forecast.loc[:, "vorhersage"] = y_pred
# pp(best_params)
# pp(best_score_mae)
# pp(best_score_r2)
# pp(best_start_year)
if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
# TODO log metrics
if too_few_month_points:
pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS)
return pipe
elif best_params is None:
pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST)
return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned"
pipe.success(forecast, STATUS_HANDLER.SUCCESS)
return pipe
@ -321,7 +354,8 @@ def pipeline_sales(
pipe = _process_sales(
pipe,
min_num_data_points=MIN_NUMBER_DATAPOINTS,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
)
if pipe.status != STATUS_HANDLER.SUCCESS:
logger.error(

View File

@ -54,7 +54,8 @@ class UApiError(Exception):
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
("BAD_QUALITY", 2, "Prognosequalität des Modells unzureichend"),
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
)
@ -239,8 +240,9 @@ def wrap_result(
result=NotSet(), exception=err, code_on_error=code_on_error
)
logger.error(
"An exception in routine %s occurred, stack trace:",
"An exception in routine %s occurred - msg: %s, stack trace:",
func.__name__,
str(err),
stack_info=True,
)

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import enum
import typing as t
from collections.abc import Sequence
from dataclasses import dataclass, field
import pandas as pd
@ -41,7 +42,8 @@ class ExportResponse(BaseModel):
class DataPipeStates:
SUCCESS: Status
TOO_FEW_POINTS: Status
BAD_QUALITY: Status
TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status
@dataclass(slots=True)
@ -125,3 +127,25 @@ class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list)
date: list[pd.Timestamp] = field(default_factory=list)
sales: list[float] = field(default_factory=list)
class ParamSearchXGBRegressor(t.TypedDict):
    """Typed layout of the hyper-parameter search space for an XGBoost regressor.

    Each key is a parameter name; each value describes the candidate
    values that a parameter search may pick from.
    """

    # left as ``Any`` so that non-sequence candidates can be supplied
    # (e.g. a frozen ``scipy.stats`` distribution)
    n_estimators: t.Any
    learning_rate: Sequence[float]
    # integer-valued parameters are given as ``range`` objects
    max_depth: range
    min_child_weight: range
    gamma: Sequence[float]
    subsample: Sequence[float]
    colsample_bytree: Sequence[float]
    early_stopping_rounds: Sequence[int]
class BestParametersXGBRegressor(t.TypedDict):
    """Typed layout of one concrete hyper-parameter choice for an XGBoost regressor.

    Holds a single scalar per parameter, mirroring the keys of the
    corresponding search-space definition.
    """

    n_estimators: int
    learning_rate: float
    max_depth: int
    min_child_weight: int
    gamma: float
    subsample: float
    colsample_bytree: float
    early_stopping_rounds: int

View File

@ -2,6 +2,7 @@ import importlib
from datetime import datetime as Datetime
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
from pydantic import ValidationError
@ -114,16 +115,6 @@ def test_parse_api_resp_to_df_empty():
assert all(col in features for col in df.columns)
# def test_parse_df_to_api_resp_ValidData(valid_df):
# ret = fc._parse_df_to_api_resp(valid_df)
# assert len(ret.daten) > 0
# def test_parse_df_to_api_resp_InvalidData(invalid_df):
# with pytest.raises(ValidationError):
# _ = fc._parse_df_to_api_resp(invalid_df)
def test_parse_df_to_results_ValidData(valid_results):
ret = fc._parse_df_to_results(valid_results)
assert len(ret.daten) > 0
@ -171,9 +162,12 @@ def test_preprocess_sales_FailOnTargetFeature(
def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
# fc._preprocess_sales_per_customer()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(pipe)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=1,
)
assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is not None
@ -183,9 +177,12 @@ def test_process_sales_Success(sales_data_real_preproc):
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
# fc._preprocess_sales_per_customer()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(pipe)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
@ -193,6 +190,55 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.results is None
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
    """``_process_sales`` must fail with TOO_FEW_MONTH_POINTS for this fixture.

    The raw data-point minimum and the monthly base threshold are both set
    to 36; the expectation is that the fixture passes the first check but
    not the monthly one.
    """
    sales_df = sales_data_real_preproc.copy()
    initial = PipeResult(sales_df, STATUS_HANDLER.SUCCESS)

    outcome = fc._process_sales(
        initial,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )

    # the run must not succeed, and it must fail with exactly this state
    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
    # a failed pipe carries neither data nor results
    assert outcome.data is None
    assert outcome.results is None
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
    """``_process_sales`` must fail with NO_RELIABLE_FORECAST when no model is accepted.

    ``RandomizedSearchCV`` is replaced by a stub whose estimator always
    predicts the same value, so that no candidate produces a usable,
    non-constant forecast.
    """
    data = sales_data_real_preproc.copy()
    data["betrag"] = 10000
    data = data.iloc[:20000, :]
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)

    class ConstantPredictor:
        """Estimator stand-in that returns one repeated value."""

        def predict(self, *args, **kwargs):
            return np.array([1, 1, 1, 1])

    class PatchSearchCV:
        """Minimal replacement for ``RandomizedSearchCV``: no fitting, constant output."""

        def __init__(self, *args, **kwargs) -> None:
            self.best_estimator_ = ConstantPredictor()

        def fit(self, *args, **kwargs) -> None:
            # training is skipped on purpose; the stub only has to expose
            # ``best_estimator_``
            pass

    with patch(
        "delta_barth.analysis.forecast.RandomizedSearchCV",
        new=PatchSearchCV,
    ):
        # thresholds are chosen very low; intended to keep both data-point
        # checks from triggering so the forecast-quality path is reached
        pipe = fc._process_sales(
            pipe,
            min_num_data_points=1,
            base_num_data_points_months=-100,
        )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
    assert pipe.data is None
    assert pipe.results is None
def test_postprocess_sales_Success(
valid_results,
):
@ -234,17 +280,18 @@ def test_export_on_fail():
assert res.status.description == status.description
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
def mock_request(*args, **kwargs): # pragma: no cover
return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
with patch(
"delta_barth.api.requests.get_sales_prognosis_data",
new=mock_request,
):
importlib.reload(delta_barth.analysis.forecast)
"delta_barth.analysis.forecast.get_sales_prognosis_data",
# new=mock_request,
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
result = fc.pipeline_sales(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0

View File

@ -92,7 +92,7 @@ def test_status_handler_raise_for_status_Success(status_hdlr):
def test_status_handler_raise_for_status_PredefinedErrors(status_hdlr):
# data related errors (predefined)
err_status = status_hdlr.pipe_states.BAD_QUALITY
err_status = status_hdlr.pipe_states.NO_RELIABLE_FORECAST
err_descr = err_status.description
with pytest.raises(errors.UDataProcessingError):
try:

View File

@ -9,6 +9,7 @@ from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",