diff --git a/pdm.lock b/pdm.lock
index de8ca95..ef7827f 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "dev", "lint", "nb", "tests"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:58003d8d013a90ffc3a883bfbb6d307718a03b4103c40ece3c56c5ef78fb27ad"
+content_hash = "sha256:d51351adbafc599b97f8b3c9047ad0c7b8607d47cff5874121f546af04793ee2"

 [[metadata.targets]]
 requires_python = ">=3.11"
@@ -225,7 +225,7 @@ files = [

 [[package]]
 name = "bump-my-version"
-version = "0.32.1"
+version = "1.1.1"
 requires_python = ">=3.8"
 summary = "Version bump your Python project"
 groups = ["dev"]
@@ -241,8 +241,8 @@ dependencies = [
     "wcmatch>=8.5.1",
 ]
 files = [
-    {file = "bump_my_version-0.32.1-py3-none-any.whl", hash = "sha256:76605d0f98d0627b4cff972b4fcd56ff5423357047d954421fe6b721304e5ceb"},
-    {file = "bump_my_version-0.32.1.tar.gz", hash = "sha256:cb537096cba01a2832972902bfff9e0e0d6e8f8dc9fe31c742096a331262e2aa"},
+    {file = "bump_my_version-1.1.1-py3-none-any.whl", hash = "sha256:6bd78e20421f6335c1a49d7e85a2f16ae8966897d0a2dd130a0e8b6b55954686"},
+    {file = "bump_my_version-1.1.1.tar.gz", hash = "sha256:2f590e0eabe894196289c296c52170559c09876454514ae8fce5db75bd47289e"},
 ]

 [[package]]
diff --git a/pyproject.toml b/pyproject.toml
index 4c4fbd1..31bc993 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.3.4"
+version = "0.4.0dev0"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"


 [tool.bumpversion]
-current_version = "0.1.1"
+current_version = "0.4.0dev0"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
    (?P<minor>0|[1-9]\\d*)\\.
@@ -106,7 +106,7 @@ pre_commit_hooks = []
 post_commit_hooks = []

 [tool.bumpversion.parts.pre_l]
-values = ["dev", "a", "b", "rc", "final"]
+values = ["dev", "rc", "final"]
 optional_value = "final"

 [[tool.bumpversion.files]]
@@ -143,7 +143,7 @@ lint = [
 ]
 dev = [
     "pdoc3>=0.11.5",
-    "bump-my-version>=0.32.1",
+    "bump-my-version>=1.1.1",
     "nox>=2025.2.9",
 ]
 nb = [
diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py
index e15cf71..d494c15 100644
--- a/src/delta_barth/analysis/forecast.py
+++ b/src/delta_barth/analysis/forecast.py
@@ -1,11 +1,15 @@
 from __future__ import annotations

+import datetime
 from collections.abc import Mapping, Set
 from datetime import datetime as Datetime
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING, Final, cast

+import numpy as np
 import pandas as pd
-from sklearn.metrics import mean_squared_error
+import scipy.stats
+from sklearn.metrics import mean_absolute_error, r2_score
+from sklearn.model_selection import KFold, RandomizedSearchCV
 from xgboost import XGBRegressor

 from delta_barth.analysis import parse
@@ -20,12 +24,15 @@ from delta_barth.constants import (
     COL_MAP_SALES_PROGNOSIS,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
-    MIN_NUMBER_DATAPOINTS,
+    SALES_BASE_NUM_DATAPOINTS_MONTHS,
+    SALES_MIN_NUM_DATAPOINTS,
 )
 from delta_barth.errors import STATUS_HANDLER, wrap_result
 from delta_barth.logging import logger_pipelines as logger
 from delta_barth.types import (
+    BestParametersXGBRegressor,
     DualDict,
+    ParamSearchXGBRegressor,
     PipeResult,
 )
@@ -58,14 +65,6 @@ def _parse_api_resp_to_df(
     return pd.DataFrame(data)


-# def _parse_df_to_api_resp(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     df_formatted = data.to_dict(orient="records")
-
-#     return SalesPrognosisResponse(daten=tuple(df_formatted))  # type: ignore
-
-
 def _parse_df_to_results(
     data: pd.DataFrame,
 ) -> SalesPrognosisResults:
@@ -81,13 +80,6 @@ def _parse_api_resp_to_df_wrapped(
     return _parse_api_resp_to_df(resp)


-# @wrap_result()
-# def _parse_df_to_api_resp_wrapped(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     return _parse_df_to_api_resp(data)
-
-
 @wrap_result()
 def _parse_df_to_results_wrapped(
     data: pd.DataFrame,
@@ -158,7 +150,8 @@ def _preprocess_sales(

 def _process_sales(
     pipe: PipeResult[SalesPrognosisResultsExport],
-    min_num_data_points: int = 100,
+    min_num_data_points: int,
+    base_num_data_points_months: int,
 ) -> PipeResult[SalesPrognosisResultsExport]:
     """n = 1
     Input-Data:
@@ -183,22 +176,6 @@ def _process_sales(
     # filter data
     data = pipe.data
     assert data is not None, "processing not existing pipe result"
-    data = data.copy()
-    # df_firma = data[
-    #     (data["firma_refid"] == company_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
-    # ]
-
-    # for transaction in df_firma["vorgang_refid"].unique():
-    #     cust_data.order.append(transaction)
-    #     cust_data.date.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
-    #     )
-    #     cust_data.sales.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
-    #     )
-
-    # df_cust = pd.DataFrame(dc.asdict(cust_data))
-    # df_cust = df_cust.sort_values(by="date").reset_index()

     DATE_FEAT: Final[str] = "buchungs_datum"
     SALES_FEAT: Final[str] = "betrag"
     df_firma =
     df_cust = df_firma.copy()
     df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()

-    # check data availability
-    # TODO rework criteria
     if len(df_cust) < min_num_data_points:
         pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
         return pipe

-    # Entwicklung der Umsätze: definierte Zeiträume Monat
     df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
     df_cust["monat"] = df_cust[DATE_FEAT].dt.month
@@ -223,31 +197,90 @@ def _process_sales(
     monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
     monthly_sum = monthly_sum.set_index(DATE_FEAT)

-    train = monthly_sum.iloc[:-5].copy()
-    test = monthly_sum.iloc[-5:].copy()
-
     features = ["jahr", "monat"]
     target = SALES_FEAT
+    current_year = datetime.datetime.now().year
+    first_year = cast(int, df_cust["jahr"].min())

-    X_train, y_train = train[features], train[target]
-    X_test, y_test = test[features], test[target]
+    # Randomized Search
+    kfold = KFold(n_splits=5, shuffle=True)
+    params: ParamSearchXGBRegressor = {
+        "n_estimators": scipy.stats.poisson(mu=1000),
+        "learning_rate": [0.03, 0.04, 0.05],
+        "max_depth": range(2, 9),
+        "min_child_weight": range(1, 5),
+        "gamma": [1],
+        "subsample": [0.8],
+        "colsample_bytree": [0.8],
+        "early_stopping_rounds": [20, 50],
+    }

-    reg = XGBRegressor(
-        base_score=0.5,
-        booster="gbtree",
-        n_estimators=1000,
-        early_stopping_rounds=50,
-        objective="reg:squarederror",
-        max_depth=3,
-        learning_rate=0.01,
-    )
-    reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)
+    best_params: BestParametersXGBRegressor | None = None
+    best_score_mae: float = float("inf")
+    best_score_r2: float = float("inf")
+    best_start_year: int | None = None
+    too_few_month_points: bool = True
+    forecast: pd.DataFrame | None = None

-    test.loc[:, "vorhersage"] = reg.predict(X_test)
-    test = test.drop(SALES_FEAT, axis=1)
-    test = test.reset_index(drop=True)
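+    # Search over candidate training windows: start four years back and widen
+    # the window one year further into the past each iteration, down to the
+    # first year present in the data; the last 5 monthly sums are always held
+    # out as the test horizon.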
+    for start_year in range(current_year - 4, first_year - 1, -1):
+        train = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(),  # type: ignore
+        )
+        test = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(),  # type: ignore
+        )
+        X_train, X_test = train[features], test[features]
+        y_train, y_test = train[target], test[target]

-    pipe.success(test, STATUS_HANDLER.SUCCESS)
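+        # Adaptive data requirement: the shortest window needs at least
+        # base_num_data_points_months monthly sums; each additional year of
+        # look-back raises the requirement by 10 further points.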
+        if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
+            too_few_month_points = False
+
+            rand = RandomizedSearchCV(
+                XGBRegressor(),
+                params,
+                scoring="neg_mean_absolute_error",
+                cv=kfold,
+                n_jobs=-1,
+                n_iter=100,
+                verbose=0,
+            )
+            rand.fit(
+                X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
+            )
+            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
+
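+            # A constant prediction over the whole test horizon indicates a
+            # degenerate fit; only non-constant forecasts compete for the
+            # lowest MAE.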
+            if len(np.unique(y_pred)) != 1:
+                error = cast(float, mean_absolute_error(y_test, y_pred))
+                if error < best_score_mae:
+                    best_params = cast(BestParametersXGBRegressor, rand.best_params_)
+                    best_score_mae = error
+                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
+                    best_start_year = start_year
+                    forecast = test.copy()
+                    forecast.loc[:, "vorhersage"] = y_pred
+
+    if forecast is not None:
+        forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
+
+    # TODO log metrics
+
+    if too_few_month_points:
+        pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS)
+        return pipe
+    elif best_params is None:
+        pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST)
+        return pipe
+
+    assert forecast is not None, "forecast is None, but was attempted to be returned"
+    pipe.success(forecast, STATUS_HANDLER.SUCCESS)
     return pipe
@@ -321,7 +354,8 @@ def pipeline_sales(
     pipe = _process_sales(
         pipe,
-        min_num_data_points=MIN_NUMBER_DATAPOINTS,
+        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
+        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
     )
     if pipe.status != STATUS_HANDLER.SUCCESS:
         logger.error(
diff --git a/src/delta_barth/constants.py b/src/delta_barth/constants.py
index 022295f..b613d9d 100644
--- a/src/delta_barth/constants.py
+++ b/src/delta_barth/constants.py
@@ -15,7 +15,7 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
 DUMMY_DATA_PATH: Final[Path] = dummy_data_pth

 # ** logging
-ENABLE_LOGGING: Final[bool] = True
+ENABLE_LOGGING: Final[bool] = False
 LOGGING_TO_FILE: Final[bool] = True
 LOGGING_TO_STDERR: Final[bool] = True
@@ -53,4 +53,8 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
     )
 )

-MIN_NUMBER_DATAPOINTS: Final[int] = 100
+
+# ** Pipelines
+# ** Forecast
+SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
+SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
diff --git a/src/delta_barth/errors.py b/src/delta_barth/errors.py
index 3c7e834..4c126f7 100644
--- a/src/delta_barth/errors.py
+++ b/src/delta_barth/errors.py
@@ -54,7 +54,8 @@ class UApiError(Exception):
 DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
     ("SUCCESS", 0, "Erfolg"),
     ("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
-    ("BAD_QUALITY", 2, "Prognosequalität des Modells unzureichend"),
+    ("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
+    ("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
 )


@@ -239,8 +240,9 @@ def wrap_result(
                 result=NotSet(), exception=err, code_on_error=code_on_error
             )
             logger.error(
-                "An exception in routine %s occurred, stack trace:",
+                "An exception in routine %s occurred - msg: %s, stack trace:",
                 func.__name__,
+                str(err),
                 stack_info=True,
             )
diff --git a/src/delta_barth/types.py b/src/delta_barth/types.py
index d6652fc..c871d2e 100644
--- a/src/delta_barth/types.py
+++ b/src/delta_barth/types.py
@@ -2,6 +2,7 @@ from __future__ import annotations

 import enum
 import typing as t
+from collections.abc import Sequence
 from dataclasses import dataclass, field

 import pandas as pd
@@ -41,7 +42,8 @@ class ExportResponse(BaseModel):
 class DataPipeStates:
     SUCCESS: Status
     TOO_FEW_POINTS: Status
-    BAD_QUALITY: Status
+    TOO_FEW_MONTH_POINTS: Status
+    NO_RELIABLE_FORECAST: Status


 @dataclass(slots=True)
@@ -125,3 +127,25 @@ class CustomerDataSalesForecast:
     order: list[int] = field(default_factory=list)
     date: list[pd.Timestamp] = field(default_factory=list)
     sales: list[float] = field(default_factory=list)
+
+
+class ParamSearchXGBRegressor(t.TypedDict):
+    n_estimators: t.Any
+    learning_rate: Sequence[float]
+    max_depth: range
+    min_child_weight: range
+    gamma: Sequence[float]
+    subsample: Sequence[float]
+    colsample_bytree: Sequence[float]
+    early_stopping_rounds: Sequence[int]
+
+
+class BestParametersXGBRegressor(t.TypedDict):
+    n_estimators: int
+    learning_rate: float
+    max_depth: int
+    min_child_weight: int
+    gamma: float
+    subsample: float
+    colsample_bytree: float
+    early_stopping_rounds: int
diff --git a/tests/analysis/test_forecast.py b/tests/analysis/test_forecast.py
index feeb613..da95e52 100644
--- a/tests/analysis/test_forecast.py
+++ b/tests/analysis/test_forecast.py
@@ -2,6 +2,7 @@ import importlib
 from datetime import datetime as Datetime
 from unittest.mock import patch

+import numpy as np
 import pandas as pd
 import pytest
 from pydantic import ValidationError
@@ -114,16 +115,6 @@ def test_parse_api_resp_to_df_empty():
     assert all(col in features for col in df.columns)


-# def test_parse_df_to_api_resp_ValidData(valid_df):
-#     ret = fc._parse_df_to_api_resp(valid_df)
-#     assert len(ret.daten) > 0
-
-
-# def test_parse_df_to_api_resp_InvalidData(invalid_df):
-#     with pytest.raises(ValidationError):
-#         _ = fc._parse_df_to_api_resp(invalid_df)
-
-
 def test_parse_df_to_results_ValidData(valid_results):
     ret = fc._parse_df_to_results(valid_results)
     assert len(ret.daten) > 0
@@ -171,9 +162,12 @@ def test_preprocess_sales_FailOnTargetFeature(

 def test_process_sales_Success(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
-    # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=1,
+    )

     assert pipe.status == STATUS_HANDLER.SUCCESS
     assert pipe.data is not None
@@ -183,9 +177,12 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     data = data.iloc[:20, :]
-    # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )

     assert pipe.status != STATUS_HANDLER.SUCCESS
     assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
@@ -193,6 +190,55 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     assert pipe.results is None


+def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
+    assert pipe.data is None
+    assert pipe.results is None
+
+
+def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    data["betrag"] = 10000
+    data = data.iloc[:20000, :]
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+
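+    # Stub for RandomizedSearchCV whose best estimator always predicts a
+    # constant value, forcing the degenerate-fit branch and thus the
+    # NO_RELIABLE_FORECAST status.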
+    class PatchSearchCV:
+        def __init__(self, *args, **kwargs) -> None:
+            class Predictor:
+                def predict(self, *args, **kwargs):
+                    return np.array([1, 1, 1, 1])
+
+            self.best_estimator_ = Predictor()
+
+        def fit(self, *args, **kwargs):
+            pass
+
+    with patch(
+        "delta_barth.analysis.forecast.RandomizedSearchCV",
+        new=PatchSearchCV,
+    ):
+        pipe = fc._process_sales(
+            pipe,
+            min_num_data_points=1,
+            base_num_data_points_months=-100,
+        )
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
+    assert pipe.data is None
+    assert pipe.results is None
+
+
 def test_postprocess_sales_Success(
     valid_results,
 ):
@@ -234,17 +280,18 @@ def test_export_on_fail():
     assert res.status.description == status.description


+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
 def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
     def mock_request(*args, **kwargs):  # pragma: no cover
         return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

     with patch(
-        "delta_barth.api.requests.get_sales_prognosis_data",
-        new=mock_request,
-    ):
-        importlib.reload(delta_barth.analysis.forecast)
+        "delta_barth.analysis.forecast.get_sales_prognosis_data",
+    ) as mock:
+        mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
         result = fc.pipeline_sales(None)  # type: ignore

     assert result.status == STATUS_HANDLER.SUCCESS
     assert len(result.response.daten) > 0
diff --git a/tests/test_errors.py b/tests/test_errors.py
index 31e2c88..4b1d6f3 100644
--- a/tests/test_errors.py
+++ b/tests/test_errors.py
@@ -92,7 +92,7 @@ def test_status_handler_raise_for_status_Success(status_hdlr):

 def test_status_handler_raise_for_status_PredefinedErrors(status_hdlr):
     # data related errors (predefined)
-    err_status = status_hdlr.pipe_states.BAD_QUALITY
+    err_status = status_hdlr.pipe_states.NO_RELIABLE_FORECAST
     err_descr = err_status.description
     with pytest.raises(errors.UDataProcessingError):
         try:
diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py
index 7c7080a..6451a53 100644
--- a/tests/test_pipelines.py
+++ b/tests/test_pipelines.py
@@ -9,6 +9,7 @@ from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER


+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
 def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     with patch(
         "delta_barth.analysis.forecast.get_sales_prognosis_data",