Add reworked sales forecast pipeline #8

Merged
foefl merged 3 commits from rework_forecast_pipeline into main 2025-03-26 12:31:00 +00:00
9 changed files with 203 additions and 91 deletions

pdm.lock generated
View File

@@ -5,7 +5,7 @@
 groups = ["default", "dev", "lint", "nb", "tests"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:58003d8d013a90ffc3a883bfbb6d307718a03b4103c40ece3c56c5ef78fb27ad"
+content_hash = "sha256:d51351adbafc599b97f8b3c9047ad0c7b8607d47cff5874121f546af04793ee2"
 
 [[metadata.targets]]
 requires_python = ">=3.11"
@@ -225,7 +225,7 @@ files = [
 [[package]]
 name = "bump-my-version"
-version = "0.32.1"
+version = "1.1.1"
 requires_python = ">=3.8"
 summary = "Version bump your Python project"
 groups = ["dev"]
@@ -241,8 +241,8 @@ dependencies = [
     "wcmatch>=8.5.1",
 ]
 files = [
-    {file = "bump_my_version-0.32.1-py3-none-any.whl", hash = "sha256:76605d0f98d0627b4cff972b4fcd56ff5423357047d954421fe6b721304e5ceb"},
-    {file = "bump_my_version-0.32.1.tar.gz", hash = "sha256:cb537096cba01a2832972902bfff9e0e0d6e8f8dc9fe31c742096a331262e2aa"},
+    {file = "bump_my_version-1.1.1-py3-none-any.whl", hash = "sha256:6bd78e20421f6335c1a49d7e85a2f16ae8966897d0a2dd130a0e8b6b55954686"},
+    {file = "bump_my_version-1.1.1.tar.gz", hash = "sha256:2f590e0eabe894196289c296c52170559c09876454514ae8fce5db75bd47289e"},
 ]
 
 [[package]]

pyproject.toml
View File

@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.3.4"
+version = "0.4.0dev0"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
 [tool.bumpversion]
-current_version = "0.1.1"
+current_version = "0.4.0dev0"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.
@@ -106,7 +106,7 @@ pre_commit_hooks = []
 post_commit_hooks = []
 
 [tool.bumpversion.parts.pre_l]
-values = ["dev", "a", "b", "rc", "final"]
+values = ["dev", "rc", "final"]
 optional_value = "final"
 
 [[tool.bumpversion.files]]
@@ -143,7 +143,7 @@ lint = [
 ]
 dev = [
     "pdoc3>=0.11.5",
-    "bump-my-version>=0.32.1",
+    "bump-my-version>=1.1.1",
     "nox>=2025.2.9",
 ]
 nb = [

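Editorial note on the version scheme: with the pre-release ladder trimmed to dev → rc → final, a release now moves e.g. 0.4.0dev0 → 0.4.0rc0 → 0.4.0. A quick hedged check of that ordering, using the third-party packaging library (not part of this PR):

```python
# Illustrative only: PEP 440 ordering of the trimmed pre-release ladder.
from packaging.version import Version

assert Version("0.4.0dev0") < Version("0.4.0rc0") < Version("0.4.0")
```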
delta_barth/analysis/forecast.py
View File

@@ -1,11 +1,15 @@
 from __future__ import annotations
 
+import datetime
 from collections.abc import Mapping, Set
 from datetime import datetime as Datetime
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING, Final, cast
 
+import numpy as np
 import pandas as pd
-from sklearn.metrics import mean_squared_error
+import scipy.stats
+from sklearn.metrics import mean_absolute_error, r2_score
+from sklearn.model_selection import KFold, RandomizedSearchCV
 from xgboost import XGBRegressor
 
 from delta_barth.analysis import parse
@@ -20,12 +24,15 @@ from delta_barth.constants import (
     COL_MAP_SALES_PROGNOSIS,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
-    MIN_NUMBER_DATAPOINTS,
+    SALES_BASE_NUM_DATAPOINTS_MONTHS,
+    SALES_MIN_NUM_DATAPOINTS,
 )
 from delta_barth.errors import STATUS_HANDLER, wrap_result
 from delta_barth.logging import logger_pipelines as logger
 from delta_barth.types import (
+    BestParametersXGBRegressor,
     DualDict,
+    ParamSearchXGBRegressor,
     PipeResult,
 )
@@ -58,14 +65,6 @@
     return pd.DataFrame(data)
 
 
-# def _parse_df_to_api_resp(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     df_formatted = data.to_dict(orient="records")
-#     return SalesPrognosisResponse(daten=tuple(df_formatted))  # type: ignore
-
-
 def _parse_df_to_results(
     data: pd.DataFrame,
 ) -> SalesPrognosisResults:
@@ -81,13 +80,6 @@
     return _parse_api_resp_to_df(resp)
 
 
-# @wrap_result()
-# def _parse_df_to_api_resp_wrapped(
-#     data: pd.DataFrame,
-# ) -> SalesPrognosisResponse:
-#     return _parse_df_to_api_resp(data)
-
-
 @wrap_result()
 def _parse_df_to_results_wrapped(
     data: pd.DataFrame,
@@ -158,7 +150,8 @@
 def _process_sales(
     pipe: PipeResult[SalesPrognosisResultsExport],
-    min_num_data_points: int = 100,
+    min_num_data_points: int,
+    base_num_data_points_months: int,
 ) -> PipeResult[SalesPrognosisResultsExport]:
     """n = 1
     Input-Data:
@@ -183,22 +176,6 @@
     # filter data
     data = pipe.data
     assert data is not None, "processing not existing pipe result"
-    data = data.copy()
-
-    # df_firma = data[
-    #     (data["firma_refid"] == company_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
-    # ]
-    # for transaction in df_firma["vorgang_refid"].unique():
-    #     cust_data.order.append(transaction)
-    #     cust_data.date.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
-    #     )
-    #     cust_data.sales.append(
-    #         df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
-    #     )
-    # df_cust = pd.DataFrame(dc.asdict(cust_data))
-    # df_cust = df_cust.sort_values(by="date").reset_index()
 
     DATE_FEAT: Final[str] = "buchungs_datum"
     SALES_FEAT: Final[str] = "betrag"
@@ -206,13 +183,10 @@
     df_cust = df_firma.copy()
     df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
 
-    # check data availability
-    # TODO rework criteria
     if len(df_cust) < min_num_data_points:
         pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
         return pipe
 
-    # Entwicklung der Umsätze: definierte Zeiträume Monat
     df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
     df_cust["monat"] = df_cust[DATE_FEAT].dt.month
@@ -223,31 +197,90 @@
     monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
     monthly_sum = monthly_sum.set_index(DATE_FEAT)
 
-    train = monthly_sum.iloc[:-5].copy()
-    test = monthly_sum.iloc[-5:].copy()
-
     features = ["jahr", "monat"]
     target = SALES_FEAT
 
-    X_train, y_train = train[features], train[target]
-    X_test, y_test = test[features], test[target]
-
-    reg = XGBRegressor(
-        base_score=0.5,
-        booster="gbtree",
-        n_estimators=1000,
-        early_stopping_rounds=50,
-        objective="reg:squarederror",
-        max_depth=3,
-        learning_rate=0.01,
-    )
-    reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)
-
-    test.loc[:, "vorhersage"] = reg.predict(X_test)
-    test = test.drop(SALES_FEAT, axis=1)
-    test = test.reset_index(drop=True)
-
-    pipe.success(test, STATUS_HANDLER.SUCCESS)
+    current_year = datetime.datetime.now().year
+    first_year = cast(int, df_cust["jahr"].min())
+
+    # Randomized Search
+    kfold = KFold(n_splits=5, shuffle=True)
+    params: ParamSearchXGBRegressor = {
+        "n_estimators": scipy.stats.poisson(mu=1000),
+        "learning_rate": [0.03, 0.04, 0.05],
+        "max_depth": range(2, 9),
+        "min_child_weight": range(1, 5),
+        "gamma": [1],
+        "subsample": [0.8],
+        "colsample_bytree": [0.8],
+        "early_stopping_rounds": [20, 50],
+    }
+
+    best_params: BestParametersXGBRegressor | None = None
+    best_score_mae: float = float("inf")
+    best_score_r2: float = float("inf")
+    best_start_year: int | None = None
+    too_few_month_points: bool = True
+    forecast: pd.DataFrame | None = None
+
+    for start_year in range(current_year - 4, first_year - 1, -1):
+        train = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(),  # type: ignore
+        )
+        test = cast(
+            pd.DataFrame,
+            monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(),  # type: ignore
+        )
+        X_train, X_test = train[features], test[features]
+        y_train, y_test = train[target], test[target]
+
+        if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
+            too_few_month_points = False
+            rand = RandomizedSearchCV(
+                XGBRegressor(),
+                params,
+                scoring="neg_mean_absolute_error",
+                cv=kfold,
+                n_jobs=-1,
+                n_iter=100,
+                verbose=0,
+            )
+            rand.fit(
+                X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
+            )
+            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
+            if len(np.unique(y_pred)) != 1:
+                error = cast(float, mean_absolute_error(y_test, y_pred))
+                if error < best_score_mae:
+                    best_params = cast(BestParametersXGBRegressor, rand.best_params_)
+                    best_score_mae = error
+                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
+                    best_start_year = start_year
+                    forecast = test.copy()
+                    forecast.loc[:, "vorhersage"] = y_pred
+
+    if forecast is not None:
+        forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
+
+    # TODO log metrics
+    if too_few_month_points:
+        pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS)
+        return pipe
+    elif best_params is None:
+        pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST)
+        return pipe
+
+    assert forecast is not None, "forecast is None, but was attempted to be returned"
+    pipe.success(forecast, STATUS_HANDLER.SUCCESS)
     return pipe
@@ -321,7 +354,8 @@
     pipe = _process_sales(
         pipe,
-        min_num_data_points=MIN_NUMBER_DATAPOINTS,
+        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
+        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
     )
     if pipe.status != STATUS_HANDLER.SUCCESS:
         logger.error(

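Reviewer note: the core of this PR replaces the single hard-coded XGBRegressor fit with a randomized hyperparameter search that is retried over successively earlier start years, keeping the training window with the lowest hold-out MAE and rejecting constant predictions. A self-contained sketch of that idea follows; the synthetic `monthly` frame, `N_HOLDOUT`, the fixed `current_year`, and the reduced search space are illustrative stand-ins, and early stopping via `eval_set` is omitted for brevity.

```python
# Minimal sketch of the expanding-window randomized search in _process_sales.
# Everything below is illustrative; only the structure mirrors the PR.
import numpy as np
import pandas as pd
import scipy.stats
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

rng = np.random.default_rng(0)
idx = pd.date_range("2018-01-01", "2024-12-01", freq="MS")  # monthly aggregates
monthly = pd.DataFrame(
    {"jahr": idx.year, "monat": idx.month, "betrag": rng.gamma(2.0, 500.0, len(idx))},
    index=idx,
)

params = {  # reduced stand-in for ParamSearchXGBRegressor
    "n_estimators": scipy.stats.poisson(mu=1000),  # distributions are sampled per draw
    "learning_rate": [0.03, 0.04, 0.05],
    "max_depth": range(2, 9),
}
N_HOLDOUT = 5  # the pipeline holds out the last 5 months as its test window
current_year = 2024  # fixed here so the example is reproducible

best_mae, best_forecast = float("inf"), None
for start_year in range(current_year - 4, int(monthly.index.year.min()) - 1, -1):
    window = monthly[monthly.index.year >= start_year]
    train, test = window.iloc[:-N_HOLDOUT], window.iloc[-N_HOLDOUT:]
    search = RandomizedSearchCV(
        XGBRegressor(),
        params,
        scoring="neg_mean_absolute_error",
        cv=KFold(n_splits=5, shuffle=True),
        n_iter=10,
    )
    search.fit(train[["jahr", "monat"]], train["betrag"])
    y_pred = search.best_estimator_.predict(test[["jahr", "monat"]])
    if len(np.unique(y_pred)) == 1:
        continue  # constant predictions are treated as unreliable, as in the PR
    mae = mean_absolute_error(test["betrag"], y_pred)
    if mae < best_mae:
        best_mae, best_forecast = mae, test.assign(vorhersage=y_pred)
```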
delta_barth/constants.py
View File

@@ -15,7 +15,7 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
 DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
 
 # ** logging
-ENABLE_LOGGING: Final[bool] = True
+ENABLE_LOGGING: Final[bool] = False
 LOGGING_TO_FILE: Final[bool] = True
 LOGGING_TO_STDERR: Final[bool] = True
@@ -53,4 +53,8 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
     )
 )
 
-MIN_NUMBER_DATAPOINTS: Final[int] = 100
+# ** Pipelines
+# ** Forecast
+SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
+SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

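Note on the two new constants: `SALES_MIN_NUM_DATAPOINTS` gates the raw transaction count before aggregation, while `SALES_BASE_NUM_DATAPOINTS_MONTHS` feeds the per-window check in `_process_sales`, `len(train) >= base + 10 * (current_year - 4 - start_year)`, so windows reaching further back must contain more monthly points. A worked example of that requirement (the year 2025 is assumed purely for illustration):

```python
# Illustrative only: required monthly data points per candidate start year,
# using the formula from _process_sales with an assumed current_year of 2025.
SALES_BASE_NUM_DATAPOINTS_MONTHS = 36

current_year = 2025
for start_year in range(current_year - 4, current_year - 9, -1):
    required = SALES_BASE_NUM_DATAPOINTS_MONTHS + 10 * (current_year - 4 - start_year)
    print(f"{start_year}: at least {required} months of training data")
# 2021: at least 36 months ... 2017: at least 76 months
```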
delta_barth/errors.py
View File

@@ -54,7 +54,8 @@ class UApiError(Exception):
 DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
     ("SUCCESS", 0, "Erfolg"),
     ("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
-    ("BAD_QUALITY", 2, "Prognosequalität des Modells unzureichend"),
+    ("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
+    ("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
 )
@@ -239,8 +240,9 @@
                 result=NotSet(), exception=err, code_on_error=code_on_error
             )
             logger.error(
-                "An exception in routine %s occurred, stack trace:",
+                "An exception in routine %s occurred - msg: %s, stack trace:",
                 func.__name__,
+                str(err),
                 stack_info=True,
             )

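Aside on the logging tweak: the exception text travels as an extra `%s` argument instead of being interpolated into the format string, so the message template stays constant and the interpolation is deferred to the logging framework. A self-contained sketch of the same pattern (logger name and routine name are made up here):

```python
# Sketch of the pattern used in wrap_result's error path.
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("delta_barth.pipelines")  # illustrative name

try:
    raise ValueError("bad input")
except ValueError as err:
    logger.error(
        "An exception in routine %s occurred - msg: %s, stack trace:",
        "pipeline_sales",  # stands in for func.__name__
        str(err),
        stack_info=True,
    )
```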
delta_barth/types.py
View File

@@ -2,6 +2,7 @@ from __future__ import annotations
 import enum
 import typing as t
+from collections.abc import Sequence
 from dataclasses import dataclass, field
 
 import pandas as pd
@@ -41,7 +42,8 @@ class ExportResponse(BaseModel):
 class DataPipeStates:
     SUCCESS: Status
     TOO_FEW_POINTS: Status
-    BAD_QUALITY: Status
+    TOO_FEW_MONTH_POINTS: Status
+    NO_RELIABLE_FORECAST: Status
 
 
 @dataclass(slots=True)
@@ -125,3 +127,25 @@ class CustomerDataSalesForecast:
     order: list[int] = field(default_factory=list)
     date: list[pd.Timestamp] = field(default_factory=list)
     sales: list[float] = field(default_factory=list)
+
+
+class ParamSearchXGBRegressor(t.TypedDict):
+    n_estimators: t.Any
+    learning_rate: Sequence[float]
+    max_depth: range
+    min_child_weight: range
+    gamma: Sequence[float]
+    subsample: Sequence[float]
+    colsample_bytree: Sequence[float]
+    early_stopping_rounds: Sequence[int]
+
+
+class BestParametersXGBRegressor(t.TypedDict):
+    n_estimators: int
+    learning_rate: float
+    max_depth: int
+    min_child_weight: int
+    gamma: float
+    subsample: float
+    colsample_bytree: float
+    early_stopping_rounds: int

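The two TypedDicts give the search space passed to RandomizedSearchCV and the `rand.best_params_` cast a shape a type checker can verify. A trimmed, self-contained illustration (three keys only; `ParamSearch` is an abbreviated stand-in, not the PR's class):

```python
# A type checker flags a wrong key or value type in the search-space literal.
import typing as t
from collections.abc import Sequence

import scipy.stats


class ParamSearch(t.TypedDict):
    n_estimators: t.Any  # frozen scipy distribution has no precise public type
    learning_rate: Sequence[float]
    max_depth: range


params: ParamSearch = {
    "n_estimators": scipy.stats.poisson(mu=1000),
    "learning_rate": [0.03, 0.04, 0.05],
    "max_depth": range(2, 9),
}
```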
View File

@@ -2,6 +2,7 @@ import importlib
 from datetime import datetime as Datetime
 from unittest.mock import patch
 
+import numpy as np
 import pandas as pd
 import pytest
 from pydantic import ValidationError
@@ -114,16 +115,6 @@ def test_parse_api_resp_to_df_empty():
     assert all(col in features for col in df.columns)
 
 
-# def test_parse_df_to_api_resp_ValidData(valid_df):
-#     ret = fc._parse_df_to_api_resp(valid_df)
-#     assert len(ret.daten) > 0
-
-
-# def test_parse_df_to_api_resp_InvalidData(invalid_df):
-#     with pytest.raises(ValidationError):
-#         _ = fc._parse_df_to_api_resp(invalid_df)
-
-
 def test_parse_df_to_results_ValidData(valid_results):
     ret = fc._parse_df_to_results(valid_results)
     assert len(ret.daten) > 0
@@ -171,9 +162,12 @@ def test_preprocess_sales_FailOnTargetFeature(
 def test_process_sales_Success(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
-    # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=1,
+    )
 
     assert pipe.status == STATUS_HANDLER.SUCCESS
     assert pipe.data is not None
@@ -183,9 +177,12 @@ def test_process_sales_Success(sales_data_real_preproc):
 def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     data = data.iloc[:20, :]
-    # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales(pipe)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
 
     assert pipe.status != STATUS_HANDLER.SUCCESS
     assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
@@ -193,6 +190,55 @@
     assert pipe.results is None
 
 
+def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
+    assert pipe.data is None
+    assert pipe.results is None
+
+
+def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    data["betrag"] = 10000
+    data = data.iloc[:20000, :]
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+
+    class PatchSearchCV:
+        def __init__(self, *args, **kwargs) -> None:
+            class Predictor:
+                def predict(self, *args, **kwargs):
+                    return np.array([1, 1, 1, 1])
+
+            self.best_estimator_ = Predictor()
+
+        def fit(self, *args, **kwargs) -> None:
+            pass
+
+    with patch(
+        "delta_barth.analysis.forecast.RandomizedSearchCV",
+        new=PatchSearchCV,
+    ):
+        pipe = fc._process_sales(
+            pipe,
+            min_num_data_points=1,
+            base_num_data_points_months=-100,
+        )
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
+    assert pipe.data is None
+    assert pipe.results is None
 
 
 def test_postprocess_sales_Success(
     valid_results,
 ):
@@ -234,17 +280,18 @@ def test_export_on_fail():
     assert res.status.description == status.description
 
 
+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
 def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
     def mock_request(*args, **kwargs):  # pragma: no cover
         return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
 
     with patch(
-        "delta_barth.api.requests.get_sales_prognosis_data",
-        new=mock_request,
-    ):
-        importlib.reload(delta_barth.analysis.forecast)
+        "delta_barth.analysis.forecast.get_sales_prognosis_data",
+    ) as mock:
+        mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
         result = fc.pipeline_sales(None)  # type: ignore
 
     assert result.status == STATUS_HANDLER.SUCCESS
     assert len(result.response.daten) > 0

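Why the `@patch(..., 1)` decorator now suffices where the old test needed `importlib.reload`: the constant is read from the `delta_barth.analysis.forecast` module namespace when the pipeline runs, so swapping the module attribute for the duration of the test is enough. A self-contained illustration with a stand-in module:

```python
# `mod` stands in for delta_barth.analysis.forecast; the attribute is looked
# up at call time, so patch() takes effect without reloading anything.
import sys
import types
from unittest.mock import patch

mod = types.ModuleType("mod")
mod.THRESHOLD = 36
mod.check = lambda n: n >= mod.THRESHOLD
sys.modules["mod"] = mod

with patch("mod.THRESHOLD", 1):
    assert mod.check(5)      # patched value is visible inside the context
assert not mod.check(5)      # original value restored on exit
```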
View File

@@ -92,7 +92,7 @@ def test_status_handler_raise_for_status_Success(status_hdlr):
 def test_status_handler_raise_for_status_PredefinedErrors(status_hdlr):
     # data related errors (predefined)
-    err_status = status_hdlr.pipe_states.BAD_QUALITY
+    err_status = status_hdlr.pipe_states.NO_RELIABLE_FORECAST
     err_descr = err_status.description
 
     with pytest.raises(errors.UDataProcessingError):
         try:

View File

@@ -9,6 +9,7 @@ from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER
 
 
+@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
 def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     with patch(
         "delta_barth.analysis.forecast.get_sales_prognosis_data",