Add reworked sales forecast pipeline #8

Merged
foefl merged 3 commits from rework_forecast_pipeline into main 2025-03-26 12:31:00 +00:00
9 changed files with 203 additions and 91 deletions

pdm.lock (generated)

@@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
-content_hash = "sha256:58003d8d013a90ffc3a883bfbb6d307718a03b4103c40ece3c56c5ef78fb27ad"
+content_hash = "sha256:d51351adbafc599b97f8b3c9047ad0c7b8607d47cff5874121f546af04793ee2"
[[metadata.targets]]
requires_python = ">=3.11"
@@ -225,7 +225,7 @@ files = [
[[package]]
name = "bump-my-version"
-version = "0.32.1"
+version = "1.1.1"
requires_python = ">=3.8"
summary = "Version bump your Python project"
groups = ["dev"]
@@ -241,8 +241,8 @@ dependencies = [
    "wcmatch>=8.5.1",
]
files = [
-    {file = "bump_my_version-0.32.1-py3-none-any.whl", hash = "sha256:76605d0f98d0627b4cff972b4fcd56ff5423357047d954421fe6b721304e5ceb"},
-    {file = "bump_my_version-0.32.1.tar.gz", hash = "sha256:cb537096cba01a2832972902bfff9e0e0d6e8f8dc9fe31c742096a331262e2aa"},
+    {file = "bump_my_version-1.1.1-py3-none-any.whl", hash = "sha256:6bd78e20421f6335c1a49d7e85a2f16ae8966897d0a2dd130a0e8b6b55954686"},
+    {file = "bump_my_version-1.1.1.tar.gz", hash = "sha256:2f590e0eabe894196289c296c52170559c09876454514ae8fce5db75bd47289e"},
]
[[package]]

pyproject.toml

@@ -1,6 +1,6 @@
[project]
name = "delta-barth"
-version = "0.3.4"
+version = "0.4.0dev0"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
    {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
[tool.bumpversion]
-current_version = "0.1.1"
+current_version = "0.4.0dev0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -106,7 +106,7 @@ pre_commit_hooks = []
post_commit_hooks = []
[tool.bumpversion.parts.pre_l]
-values = ["dev", "a", "b", "rc", "final"]
+values = ["dev", "rc", "final"]
optional_value = "final"
[[tool.bumpversion.files]]
@@ -143,7 +143,7 @@ lint = [
]
dev = [
    "pdoc3>=0.11.5",
-    "bump-my-version>=0.32.1",
+    "bump-my-version>=1.1.1",
    "nox>=2025.2.9",
]
nb = [
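Note on the `[tool.bumpversion.parts.pre_l]` change: dropping "a" and "b" shortens the pre-release cycle to dev → rc → final, and `optional_value = "final"` means the "final" stage is omitted when the version is serialized (so the cycle reads 0.4.0dev0 → 0.4.0rc0 → 0.4.0). A minimal sketch of that ordering, illustrative only and not bump-my-version's API:

```python
# Mirrors the values list from [tool.bumpversion.parts.pre_l] above; purely illustrative.
PRE_L_VALUES = ["dev", "rc", "final"]

def bump_pre_label(current: str) -> str:
    """Advance the pre-release label one step, saturating at 'final'."""
    idx = PRE_L_VALUES.index(current)
    return PRE_L_VALUES[min(idx + 1, len(PRE_L_VALUES) - 1)]

assert bump_pre_label("dev") == "rc"    # 0.4.0dev0 -> 0.4.0rc0
assert bump_pre_label("rc") == "final"  # 0.4.0rc0  -> 0.4.0 ("final" is not serialized)
```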

delta_barth/analysis/forecast.py

@@ -1,11 +1,15 @@
from __future__ import annotations
+import datetime
from collections.abc import Mapping, Set
from datetime import datetime as Datetime
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING, Final, cast
import numpy as np
import pandas as pd
-from sklearn.metrics import mean_squared_error
+import scipy.stats
+from sklearn.metrics import mean_absolute_error, r2_score
+from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor
from delta_barth.analysis import parse
@@ -20,12 +24,15 @@ from delta_barth.constants import (
    COL_MAP_SALES_PROGNOSIS,
    DUMMY_DATA_PATH,
    FEATURES_SALES_PROGNOSIS,
-    MIN_NUMBER_DATAPOINTS,
+    SALES_BASE_NUM_DATAPOINTS_MONTHS,
+    SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.types import (
+    BestParametersXGBRegressor,
    DualDict,
+    ParamSearchXGBRegressor,
    PipeResult,
)
@@ -58,14 +65,6 @@ def _parse_api_resp_to_df(
    return pd.DataFrame(data)
-# def _parse_df_to_api_resp(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     df_formatted = data.to_dict(orient="records")
-#     return SalesPrognosisResponse(daten=tuple(df_formatted))  # type: ignore
def _parse_df_to_results(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
@@ -81,13 +80,6 @@ def _parse_api_resp_to_df_wrapped(
    return _parse_api_resp_to_df(resp)
-# @wrap_result()
-# def _parse_df_to_api_resp_wrapped(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     return _parse_df_to_api_resp(data)
@wrap_result()
def _parse_df_to_results_wrapped(
    data: pd.DataFrame,
@@ -158,7 +150,8 @@ def _preprocess_sales(
def _process_sales(
    pipe: PipeResult[SalesPrognosisResultsExport],
-    min_num_data_points: int = 100,
+    min_num_data_points: int,
+    base_num_data_points_months: int,
) -> PipeResult[SalesPrognosisResultsExport]:
    """n = 1
    Input-Data:
@@ -183,22 +176,6 @@ def _process_sales(
    # filter data
    data = pipe.data
    assert data is not None, "processing not existing pipe result"
    data = data.copy()
-    # df_firma = data[
-    #     (data["firma_refid"] == company_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
-    # ]
-    # for transaction in df_firma["vorgang_refid"].unique():
-    #     cust_data.order.append(transaction)
-    #     cust_data.date.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
-    #     )
-    #     cust_data.sales.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
-    #     )
-    # df_cust = pd.DataFrame(dc.asdict(cust_data))
-    # df_cust = df_cust.sort_values(by="date").reset_index()
    DATE_FEAT: Final[str] = "buchungs_datum"
    SALES_FEAT: Final[str] = "betrag"
@@ -206,13 +183,10 @@ def _process_sales(
    df_cust = df_firma.copy()
    df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
    # check data availability
-    # TODO rework criteria
    if len(df_cust) < min_num_data_points:
        pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
        return pipe
    # development of sales: defined monthly periods
    df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
    df_cust["monat"] = df_cust[DATE_FEAT].dt.month
@@ -223,31 +197,90 @@ def _process_sales(
    monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
    monthly_sum = monthly_sum.set_index(DATE_FEAT)
-    train = monthly_sum.iloc[:-5].copy()
-    test = monthly_sum.iloc[-5:].copy()
    features = ["jahr", "monat"]
    target = SALES_FEAT
+    current_year = datetime.datetime.now().year
+    first_year = cast(int, df_cust["jahr"].min())
-    X_train, y_train = train[features], train[target]
-    X_test, y_test = test[features], test[target]
+    # randomized hyper-parameter search
+    kfold = KFold(n_splits=5, shuffle=True)
+    params: ParamSearchXGBRegressor = {
+        "n_estimators": scipy.stats.poisson(mu=1000),
+        "learning_rate": [0.03, 0.04, 0.05],
+        "max_depth": range(2, 9),
+        "min_child_weight": range(1, 5),
+        "gamma": [1],
+        "subsample": [0.8],
+        "colsample_bytree": [0.8],
+        "early_stopping_rounds": [20, 50],
+    }
-    reg = XGBRegressor(
-        base_score=0.5,
-        booster="gbtree",
-        n_estimators=1000,
-        early_stopping_rounds=50,
-        objective="reg:squarederror",
-        max_depth=3,
-        learning_rate=0.01,
-    )
-    reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)
+    best_params: BestParametersXGBRegressor | None = None
+    best_score_mae: float = float("inf")
+    best_score_r2: float = float("inf")
+    best_start_year: int | None = None
+    too_few_month_points: bool = True
+    forecast: pd.DataFrame | None = None
-    test.loc[:, "vorhersage"] = reg.predict(X_test)
-    test = test.drop(SALES_FEAT, axis=1)
-    test = test.reset_index(drop=True)
+    for start_year in range(current_year - 4, first_year - 1, -1):
+        train = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(),  # type: ignore
+        )
+        test = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(),  # type: ignore
+        )
+        X_train, X_test = train[features], test[features]
+        y_train, y_test = train[target], test[target]
-    pipe.success(test, STATUS_HANDLER.SUCCESS)
+        if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
+            too_few_month_points = False
+            rand = RandomizedSearchCV(
+                XGBRegressor(),
+                params,
+                scoring="neg_mean_absolute_error",
+                cv=kfold,
+                n_jobs=-1,
+                n_iter=100,
+                verbose=0,
+            )
+            rand.fit(
+                X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
+            )
+            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
+            if len(np.unique(y_pred)) != 1:
+                error = cast(float, mean_absolute_error(y_test, y_pred))
+                if error < best_score_mae:
+                    best_params = cast(BestParametersXGBRegressor, rand.best_params_)
+                    best_score_mae = error
+                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
+                    best_start_year = start_year
+                    forecast = test.copy()
+                    forecast.loc[:, "vorhersage"] = y_pred
+    if forecast is not None:
+        forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
+    # TODO log metrics
+    if too_few_month_points:
+        pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS)
+        return pipe
+    elif best_params is None:
+        pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST)
+        return pipe
+    assert forecast is not None, "forecast is None, but was attempted to be returned"
+    pipe.success(forecast, STATUS_HANDLER.SUCCESS)
    return pipe
@@ -321,7 +354,8 @@ def pipeline_sales(
    pipe = _process_sales(
        pipe,
-        min_num_data_points=MIN_NUMBER_DATAPOINTS,
+        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
+        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger.error(
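Note on the reworked `_process_sales`: instead of one fixed train/test split, it now searches over candidate history windows. For each start year, counting back from `current_year - 4` to the first year with data, it holds out the last 5 monthly aggregates, tunes an `XGBRegressor` via `RandomizedSearchCV`, rejects degenerate constant predictions, and keeps the window and hyper-parameters with the lowest holdout MAE. A condensed, self-contained sketch of that loop on synthetic data (simplified: no early stopping, smaller `n_iter`; not the module's exact code):

```python
import numpy as np
import pandas as pd
import scipy.stats
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

rng = np.random.default_rng(0)
idx = pd.date_range("2018-01-01", "2024-12-01", freq="MS")  # synthetic monthly index
monthly = pd.DataFrame(
    {"jahr": idx.year, "monat": idx.month, "betrag": rng.gamma(2.0, 500.0, len(idx))},
    index=idx,
)

params = {
    "n_estimators": scipy.stats.poisson(mu=1000),  # frozen distribution, sampled per draw
    "learning_rate": [0.03, 0.04, 0.05],
    "max_depth": range(2, 9),
}
best_mae, best_forecast = float("inf"), None
current_year, first_year = 2025, int(monthly["jahr"].min())

for start_year in range(current_year - 4, first_year - 1, -1):
    window = monthly[monthly.index.year >= start_year]
    train, test = window.iloc[:-5], window.iloc[-5:]  # hold out the last 5 months
    # older windows must bring proportionally more monthly points
    if len(train) < 36 + 10 * (current_year - 4 - start_year):
        continue
    search = RandomizedSearchCV(
        XGBRegressor(),
        params,
        scoring="neg_mean_absolute_error",
        cv=KFold(n_splits=5, shuffle=True),
        n_iter=10,
        n_jobs=-1,
    )
    search.fit(train[["jahr", "monat"]], train["betrag"])
    y_pred = search.best_estimator_.predict(test[["jahr", "monat"]])
    if len(np.unique(y_pred)) == 1:
        continue  # constant output -> no reliable forecast from this window
    mae = mean_absolute_error(test["betrag"], y_pred)
    if mae < best_mae:
        best_mae, best_forecast = mae, test.assign(vorhersage=y_pred)

print(f"best MAE: {best_mae:.1f}, forecast found: {best_forecast is not None}")
```

In the real pipeline the per-window threshold comes from `base_num_data_points_months` (`SALES_BASE_NUM_DATAPOINTS_MONTHS`), and the two failure modes map to the new `TOO_FEW_MONTH_POINTS` and `NO_RELIABLE_FORECAST` states.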

delta_barth/constants.py

@@ -15,7 +15,7 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** logging
-ENABLE_LOGGING: Final[bool] = True
+ENABLE_LOGGING: Final[bool] = False
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True
@@ -53,4 +53,8 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
    )
)
MIN_NUMBER_DATAPOINTS: Final[int] = 100
+# ** Pipelines
+# ** Forecast
+SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
+SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
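Note: `SALES_BASE_NUM_DATAPOINTS_MONTHS` feeds the per-window sufficiency check in `_process_sales`: a window starting at `start_year` must contain at least `base + 10 * (current_year - 4 - start_year)` monthly data points, so windows reaching further back have to justify themselves with proportionally more data. A worked example of that formula (the `required_monthly_points` helper is hypothetical, for illustration; the module inlines the expression):

```python
def required_monthly_points(base: int, current_year: int, start_year: int) -> int:
    # threshold used by _process_sales when judging a candidate history window
    return base + 10 * (current_year - 4 - start_year)

# with base = SALES_BASE_NUM_DATAPOINTS_MONTHS = 36 and current_year = 2025:
assert required_monthly_points(36, 2025, 2021) == 36  # newest candidate window
assert required_monthly_points(36, 2025, 2018) == 66  # 3 years older -> +30 points
```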

delta_barth/errors.py

@@ -54,7 +54,8 @@ class UApiError(Exception):
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
    ("SUCCESS", 0, "Erfolg"),
    ("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
-    ("BAD_QUALITY", 2, "Prognosequalität des Modells unzureichend"),
+    ("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
+    ("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
)
@@ -239,8 +240,9 @@ def wrap_result(
                result=NotSet(), exception=err, code_on_error=code_on_error
            )
            logger.error(
-                "An exception in routine %s occurred, stack trace:",
+                "An exception in routine %s occurred - msg: %s, stack trace:",
                func.__name__,
+                str(err),
                stack_info=True,
            )
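Note on the `wrap_result` logging change: the added `%s`/`str(err)` pair puts the exception message into the log line itself, while `stack_info=True` still appends the stack. A standalone demo of the same pattern with plain `logging` (not the project's logger setup):

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("demo")

def routine() -> None:
    raise ValueError("bad input")

try:
    routine()
except Exception as err:
    # the message now carries str(err); stack_info=True appends the current stack
    logger.error(
        "An exception in routine %s occurred - msg: %s, stack trace:",
        routine.__name__,
        str(err),
        stack_info=True,
    )
```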

delta_barth/types.py

@@ -2,6 +2,7 @@ from __future__ import annotations
import enum
import typing as t
+from collections.abc import Sequence
from dataclasses import dataclass, field
import pandas as pd
@@ -41,7 +42,8 @@ class ExportResponse(BaseModel):
class DataPipeStates:
    SUCCESS: Status
    TOO_FEW_POINTS: Status
-    BAD_QUALITY: Status
+    TOO_FEW_MONTH_POINTS: Status
+    NO_RELIABLE_FORECAST: Status
@dataclass(slots=True)
@@ -125,3 +127,25 @@ class CustomerDataSalesForecast:
    order: list[int] = field(default_factory=list)
    date: list[pd.Timestamp] = field(default_factory=list)
    sales: list[float] = field(default_factory=list)
+
+
+class ParamSearchXGBRegressor(t.TypedDict):
+    n_estimators: t.Any
+    learning_rate: Sequence[float]
+    max_depth: range
+    min_child_weight: range
+    gamma: Sequence[float]
+    subsample: Sequence[float]
+    colsample_bytree: Sequence[float]
+    early_stopping_rounds: Sequence[int]
+
+
+class BestParametersXGBRegressor(t.TypedDict):
+    n_estimators: int
+    learning_rate: float
+    max_depth: int
+    min_child_weight: int
+    gamma: float
+    subsample: float
+    colsample_bytree: float
+    early_stopping_rounds: int
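Note: the two TypedDicts describe both ends of the tuning step: `ParamSearchXGBRegressor` is the search-space dict handed to `RandomizedSearchCV` (distributions, lists, ranges; `n_estimators` is `t.Any` because it holds a frozen scipy distribution), while `BestParametersXGBRegressor` is the concrete point read back from `rand.best_params_`. A trimmed sketch of how they line up (stand-in class names and fewer fields than the real definitions):

```python
import typing as t
from collections.abc import Sequence

import scipy.stats

class ParamSearch(t.TypedDict):  # shape of the search space
    n_estimators: t.Any          # frozen scipy distribution -> typed as Any
    learning_rate: Sequence[float]
    max_depth: range

class BestParams(t.TypedDict):   # shape of rand.best_params_
    n_estimators: int
    learning_rate: float
    max_depth: int

space: ParamSearch = {
    "n_estimators": scipy.stats.poisson(mu=1000),
    "learning_rate": [0.03, 0.04, 0.05],
    "max_depth": range(2, 9),
}
# after fitting, the search exposes one sampled value per key:
chosen: BestParams = {"n_estimators": 987, "learning_rate": 0.04, "max_depth": 3}
```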

tests for delta_barth.analysis.forecast

@@ -2,6 +2,7 @@ import importlib
from datetime import datetime as Datetime
from unittest.mock import patch
+import numpy as np
import pandas as pd
import pytest
from pydantic import ValidationError
@@ -114,16 +115,6 @@ def test_parse_api_resp_to_df_empty():
    assert all(col in features for col in df.columns)
-# def test_parse_df_to_api_resp_ValidData(valid_df):
-#     ret = fc._parse_df_to_api_resp(valid_df)
-#     assert len(ret.daten) > 0
-# def test_parse_df_to_api_resp_InvalidData(invalid_df):
-#     with pytest.raises(ValidationError):
-#         _ = fc._parse_df_to_api_resp(invalid_df)
def test_parse_df_to_results_ValidData(valid_results):
    ret = fc._parse_df_to_results(valid_results)
    assert len(ret.daten) > 0
@@ -171,9 +162,12 @@ def test_preprocess_sales_FailOnTargetFeature(
def test_process_sales_Success(sales_data_real_preproc):
    data = sales_data_real_preproc.copy()
-    # fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=1,
+    )
    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
@@ -183,9 +177,12 @@ def test_process_sales_Success(sales_data_real_preproc):
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    data = sales_data_real_preproc.copy()
    data = data.iloc[:20, :]
-    # fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
@@ -193,6 +190,55 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    assert pipe.results is None
+
+
+def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
+    assert pipe.data is None
+    assert pipe.results is None
+
+
+def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    data["betrag"] = 10000
+    data = data.iloc[:20000, :]
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+
+    class PatchSearchCV:
+        def __init__(self, *args, **kwargs) -> None:
+            class Predictor:
+                def predict(self, *args, **kwargs):
+                    return np.array([1, 1, 1, 1])
+
+            self.best_estimator_ = Predictor()
+
+        def fit(self, *args, **kwargs):
+            pass
+
+    with patch(
+        "delta_barth.analysis.forecast.RandomizedSearchCV",
+        new=PatchSearchCV,
+    ):
+        pipe = fc._process_sales(
+            pipe,
+            min_num_data_points=1,
+            base_num_data_points_months=-100,
+        )
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
+    assert pipe.data is None
+    assert pipe.results is None
def test_postprocess_sales_Success(
    valid_results,
):
@@ -234,17 +280,18 @@ def test_export_on_fail():
    assert res.status.description == status.description
+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
    def mock_request(*args, **kwargs):  # pragma: no cover
        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

    with patch(
-        "delta_barth.api.requests.get_sales_prognosis_data",
-        new=mock_request,
-    ):
-        importlib.reload(delta_barth.analysis.forecast)
+        "delta_barth.analysis.forecast.get_sales_prognosis_data",
+    ) as mock:
+        mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
        result = fc.pipeline_sales(None)  # type: ignore
    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0
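Note on `test_process_sales_FailNoReliableForecast`: it swaps `RandomizedSearchCV` for a stub whose best estimator always predicts a constant vector, so the `len(np.unique(y_pred)) != 1` guard rejects every window and the pipeline must end in `NO_RELIABLE_FORECAST`. Importantly, the patch targets the name where it is looked up (`delta_barth.analysis.forecast.RandomizedSearchCV`). The same stub technique in isolation (a generic sketch with hypothetical names, not the project's fixtures):

```python
import numpy as np
from unittest.mock import patch

class ConstantSearchCV:
    """Stands in for RandomizedSearchCV: 'fits' nothing, predicts a constant."""

    def __init__(self, *args, **kwargs) -> None:
        class _Predictor:
            def predict(self, X):
                return np.ones(len(X))

        self.best_estimator_ = _Predictor()

    def fit(self, *args, **kwargs):
        return self

def tune_and_predict(X):
    # imagine this living in a module that resolves RandomizedSearchCV at call time
    from sklearn.model_selection import RandomizedSearchCV

    search = RandomizedSearchCV(None, {})  # args are ignored by the stub
    search.fit(X, None)
    return search.best_estimator_.predict(X)

with patch("sklearn.model_selection.RandomizedSearchCV", new=ConstantSearchCV):
    preds = tune_and_predict(np.zeros((5, 2)))

assert len(np.unique(preds)) == 1  # constant output -> would trigger the guard
```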

tests for delta_barth.errors

@@ -92,7 +92,7 @@ def test_status_handler_raise_for_status_Success(status_hdlr):
def test_status_handler_raise_for_status_PredefinedErrors(status_hdlr):
    # data related errors (predefined)
-    err_status = status_hdlr.pipe_states.BAD_QUALITY
+    err_status = status_hdlr.pipe_states.NO_RELIABLE_FORECAST
    err_descr = err_status.description
    with pytest.raises(errors.UDataProcessingError):
        try:

tests for delta_barth.pipelines

@@ -9,6 +9,7 @@ from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
    with patch(
        "delta_barth.analysis.forecast.get_sales_prognosis_data",