major overhaul of forecast pipeline (#21)

includes several aspects:

- harden forecast logic with additional error checks
- fix incorrect behaviour (e.g., months without sales are now counted as zero instead of being dropped)
- ensure a minimum amount of viable data before training
- extrapolate multiple data points (a six-month horizon) into the future; see the sketch below
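A minimal sketch of the reworked flow, for orientation before the diffs: aggregate bookings per month, fit on `(jahr, monat)`, then predict six month starts ahead. The helper name `sketch_forecast` and the fixed hyperparameters are illustrative assumptions, not pipeline code; it assumes a frame with the `buchungs_datum` and `betrag` columns used below.

```python
# Illustrative sketch only: monthly aggregation -> model on (jahr, monat)
# -> six months extrapolated ahead. Not the exact pipeline code.
import datetime

import pandas as pd
from xgboost import XGBRegressor


def sketch_forecast(df: pd.DataFrame) -> pd.DataFrame:
    df = df.assign(jahr=df["buchungs_datum"].dt.year, monat=df["buchungs_datum"].dt.month)
    monthly = df.groupby(["jahr", "monat"])["betrag"].sum().reset_index()
    model = XGBRegressor(n_estimators=100)
    model.fit(monthly[["jahr", "monat"]], monthly["betrag"])
    # first day of the month after the current one, six month starts ahead
    start = pd.Timestamp(datetime.datetime.now().strftime("%Y-%m-01")) + pd.DateOffset(months=1)
    future = pd.date_range(start=start, periods=6, freq="MS")
    X_future = pd.DataFrame({"jahr": future.year, "monat": future.month})
    return X_future.assign(vorhersage=model.predict(X_future))
```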

fix #19

Co-authored-by: frasu
Reviewed-on: #21
Co-authored-by: foefl <f.foerster@d-opt.com>
Co-committed-by: foefl <f.foerster@d-opt.com>
Florian Förster authored and committed on 2025-04-16 09:24:33 +00:00
parent 6caa087efd
commit 063531a08e
6 changed files with 110 additions and 31 deletions

View File

@@ -1,6 +1,6 @@
 [project]
 name = "delta-barth"
-version = "0.5.7dev1"
+version = "0.5.7dev2"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -44,7 +44,8 @@ filterwarnings = [
 ]
 markers = [
     "api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
-    "new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
+    "new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
+    "forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
 ]
 log_cli = true
@@ -73,7 +74,7 @@ directory = "reports/coverage"
 [tool.bumpversion]
-current_version = "0.5.7dev1"
+current_version = "0.5.7dev2"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,5 +1,6 @@
 from __future__ import annotations
+import copy
 import datetime
 import math
 from collections.abc import Mapping, Set
@@ -11,6 +12,9 @@ import numpy as np
 import pandas as pd
 import scipy.stats
 import sqlalchemy as sql
+# --- new: for calculating timedelta
+from dateutil.relativedelta import relativedelta
 from sklearn.metrics import mean_absolute_error, r2_score
 from sklearn.model_selection import KFold, RandomizedSearchCV
 from xgboost import XGBRegressor
@@ -183,16 +187,14 @@ def _process_sales(
     PipeResult
         _description_
     """
-    # cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
     # filter data
     data = pipe.data
     assert data is not None, "processing not existing pipe result"
     DATE_FEAT: Final[str] = "buchungs_datum"
     SALES_FEAT: Final[str] = "betrag"
-    df_firma = data[(data["betrag"] > 0)]
-    df_cust = df_firma.copy()
+    df_filter = data[(data["betrag"] > 0)]
+    df_cust = df_filter.copy()
     df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
     len_ds = len(df_cust)
@@ -206,7 +208,26 @@ def _process_sales(
     df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
     df_cust["monat"] = df_cust[DATE_FEAT].dt.month
-    monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
+    monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
+    current_year = datetime.datetime.now().year
+    current_month = datetime.datetime.now().month
+    years = range(df_cust["jahr"].min(), current_year + 1)
+    all_month_year_combinations = pd.DataFrame(
+        [
+            (year, month)
+            for year in years
+            for month in range(1, 13)
+            if (year < current_year or (year == current_year and month <= current_month))
+        ],
+        columns=["jahr", "monat"],
+    )
+    monthly_sum = pd.merge(
+        all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
+    )
+    monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
     monthly_sum[DATE_FEAT] = (
         monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
     )
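The hunk above closes a gap where months without bookings silently vanished from the monthly aggregate, skewing the training data. The same grid-completion idea in isolation, assuming a monthly frame with `jahr`/`monat`/`betrag` columns (`complete_month_grid` is a hypothetical helper, not pipeline code):

```python
# Standalone sketch: every (jahr, monat) pair up to the current month appears
# exactly once; months with no bookings become explicit zeros.
import datetime

import pandas as pd


def complete_month_grid(monthly: pd.DataFrame, sales_col: str = "betrag") -> pd.DataFrame:
    now = datetime.datetime.now()
    grid = pd.DataFrame(
        [
            (year, month)
            for year in range(int(monthly["jahr"].min()), now.year + 1)
            for month in range(1, 13)
            if (year, month) <= (now.year, now.month)  # same bound as the diff's condition
        ],
        columns=["jahr", "monat"],
    )
    merged = grid.merge(monthly, on=["jahr", "monat"], how="left")
    merged[sales_col] = merged[sales_col].fillna(0)
    return merged
```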
@@ -215,13 +236,17 @@ def _process_sales(
     features = ["jahr", "monat"]
     target = SALES_FEAT
-    current_year = datetime.datetime.now().year
-    first_year = cast(int, df_cust["jahr"].min())
+    last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
+    future_dates = pd.date_range(
+        start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
+    )
+    forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
     # Randomized Search
     kfold = KFold(n_splits=5, shuffle=True)
     params: ParamSearchXGBRegressor = {
-        "n_estimators": scipy.stats.poisson(mu=1000),
+        "n_estimators": scipy.stats.poisson(mu=100),
         "learning_rate": [0.03, 0.04, 0.05],
         "max_depth": range(2, 9),
         "min_child_weight": range(1, 5),
@@ -231,26 +256,40 @@ def _process_sales(
         "early_stopping_rounds": [20, 50],
     }
+    best_estimator = None
     best_params: BestParametersXGBRegressor | None = None
     best_score_mae: float | None = float("inf")
     best_score_r2: float | None = None
     best_start_year: int | None = None
     too_few_month_points: bool = True
-    forecast: pd.DataFrame | None = None
-    for start_year in range(current_year - 4, first_year - 1, -1):
+    dates = cast(pd.DatetimeIndex, monthly_sum.index)
+    # baseline: 3 years - 36 months
+    starting_date = datetime.datetime.now() - relativedelta(months=36)
+    target_index, _ = next(
+        ((i, True) for i, date in enumerate(dates) if date >= starting_date),
+        (len(dates) - 1, False),
+    )
+    for add_year, date_idx in enumerate(range(target_index, -1, -12)):
+        first_date = dates[date_idx]
+        split_date = dates[-6]
         train = cast(
             pd.DataFrame,
-            monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(),  # type: ignore
+            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
         )
         test = cast(
             pd.DataFrame,
-            monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(),  # type: ignore
+            monthly_sum.loc[split_date:].copy(),  # type: ignore
        )
         X_train, X_test = train[features], test[features]
         y_train, y_test = train[target], test[target]
-        if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
+        # test set size fixed at 6 --> first iteration: baseline - 6 entries
+        # for each new year 10 new data points (i.e., sales strictly positive) needed
+        if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
             too_few_month_points = False
             rand = RandomizedSearchCV(
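The window selection above replaces the old per-calendar-year loop: training starts 36 months back (or at the oldest point) and widens by twelve entries per iteration, while the test span stays fixed at the last six months. The index arithmetic in isolation, with a made-up monthly index:

```python
# Window arithmetic only; the dates here are fabricated for demonstration.
import datetime

import pandas as pd
from dateutil.relativedelta import relativedelta

dates = pd.date_range("2019-01-01", periods=76, freq="MS")
baseline = datetime.datetime.now() - relativedelta(months=36)
# first index at or after the 36-month baseline, else the last index
target_index = next((i for i, d in enumerate(dates) if d >= baseline), len(dates) - 1)
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
    # widening train span, fixed six-entry test span at the end
    print(add_year, dates[date_idx], "->", dates[-6])
```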
@@ -273,13 +312,21 @@ def _process_sales(
                 best_params = cast(BestParametersXGBRegressor, rand.best_params_)
                 best_score_mae = error
                 best_score_r2 = cast(float, r2_score(y_test, y_pred))
-                best_start_year = start_year
-                print("executed")
-                forecast = test.copy()
-                forecast.loc[:, "vorhersage"] = y_pred
+                # --- new: use first_date for best_start_year
+                best_start_year = first_date.year
+                # --- new: store best_estimator
+                best_estimator = copy.copy(rand.best_estimator_)
+    if best_estimator is not None:
+        X_future = pd.DataFrame(
+            {"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
+        )
+        y_future = best_estimator.predict(X_future)  # type: ignore
+        forecast["vorhersage"] = y_future
+        forecast["jahr"] = forecast.index.year  # type: ignore
+        forecast["monat"] = forecast.index.month  # type: ignore
+        forecast = forecast.reset_index(drop=True)
-    if forecast is not None:
-        forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
     best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
     if too_few_month_points:
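Instead of returning test-set predictions as the "forecast", the best estimator found across all windows is kept (copied, so later search iterations cannot overwrite it) and applied once to the future features. The shape of that final step, with a throwaway stand-in estimator in place of the copied `rand.best_estimator_`:

```python
# Stand-in for the extrapolation step; the pipeline applies the copied
# rand.best_estimator_ here, this sketch fits a dummy model instead.
import numpy as np
import pandas as pd
from xgboost import XGBRegressor

future_dates = pd.date_range("2025-05-01", periods=6, freq="MS")
X_future = pd.DataFrame({"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates)
est = XGBRegressor(n_estimators=10).fit(X_future, np.arange(6.0))  # dummy fit
forecast = pd.DataFrame(index=future_dates)
forecast["vorhersage"] = est.predict(X_future)
forecast["jahr"] = forecast.index.year
forecast["monat"] = forecast.index.month
print(forecast.reset_index(drop=True))
```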
@@ -295,7 +342,9 @@ def _process_sales(
         pipe.stats(stats)
         return pipe
-    assert forecast is not None, "forecast is None, but was attempted to be returned"
+    assert "vorhersage" in forecast.columns, (
+        "forecast does not contain prognosis values, but was attempted to be returned"
+    )
     status = STATUS_HANDLER.SUCCESS
     pipe.success(forecast, status)
     stats = SalesForecastStatistics(

View File

@@ -17,7 +17,7 @@ DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
 # ** logging
 ENABLE_LOGGING: Final[bool] = True
 LOGGING_TO_FILE: Final[bool] = True
-LOGGING_TO_STDERR: Final[bool] = True
+LOGGING_TO_STDERR: Final[bool] = False
 LOG_FILENAME: Final[str] = "dopt-delbar.log"
 # ** databases
@@ -40,7 +40,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
 # ** API
-API_CON_TIMEOUT: Final[float] = 5.0  # secs to response
+API_CON_TIMEOUT: Final[float] = 10.0  # secs to response
 # ** API response parsing
 # ** column mapping [API-Response --> Target-Features]
 COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(

View File

@@ -1,4 +1,6 @@
+import datetime
 from datetime import datetime as Datetime
+from pathlib import Path
 from unittest.mock import patch

 import numpy as np
@@ -255,6 +257,7 @@ def test_preprocess_sales_FailOnTargetFeature(
     assert pipe.results is None

+@pytest.mark.forecast
 def test_process_sales_Success(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -277,6 +280,7 @@ def test_process_sales_Success(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is not None

+@pytest.mark.forecast
 def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     data = data.iloc[:20, :]
@@ -303,6 +307,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is None

+@pytest.mark.forecast
 def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -329,8 +334,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is None

+@pytest.mark.forecast
 def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
-    data = sales_data_real_preproc.copy()
+    # prepare fake data
+    df = sales_data_real_preproc.copy()
+    f_dates = "buchungs_datum"
+    end = datetime.datetime.now()
+    start = df[f_dates].max()
+    fake_dates = pd.date_range(start, end, freq="MS")
+    fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
+    fake_df = pd.DataFrame(fake_data, columns=df.columns)
+    enhanced_df = pd.concat((df, fake_df), ignore_index=True)
+    data = enhanced_df.copy()
     data["betrag"] = 10000
     print(data["betrag"])
     data = data.iloc[:20000, :]
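The reworked test pads the real fixture with synthetic monthly rows up to the present, so the data-volume guards pass and only the reliability check can trip. The padding pattern on its own, with a two-column placeholder frame standing in for the six-column fixture (values are placeholders; only the date-last column layout matters):

```python
# Padding pattern in isolation: one synthetic row per month start between the
# last real booking date and now, appended to the original frame.
import datetime

import pandas as pd

df = pd.DataFrame({"betrag": [100.0], "buchungs_datum": [pd.Timestamp("2024-01-01")]})
fake_dates = pd.date_range(df["buchungs_datum"].max(), datetime.datetime.now(), freq="MS")
fake_df = pd.DataFrame([(10.0, d) for d in fake_dates], columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
print(len(enhanced_df))  # original rows plus one synthetic row per month
```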
@@ -340,7 +356,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
         def __init__(self, *args, **kwargs) -> None:
             class Predictor:
                 def predict(self, *args, **kwargs):
-                    return np.array([1, 1, 1, 1])
+                    return np.array([1, 1, 1, 1], dtype=np.float64)

             self.best_estimator_ = Predictor()
@@ -354,7 +370,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
     pipe = fc._process_sales(
         pipe,
         min_num_data_points=1,
-        base_num_data_points_months=-100,
+        base_num_data_points_months=1,
     )
     assert pipe.status != STATUS_HANDLER.SUCCESS

View File

@@ -1,17 +1,15 @@
-import importlib
 import json
 from unittest.mock import patch

 import pytest
 import sqlalchemy as sql

-import delta_barth.pipelines
 from delta_barth import databases as db
 from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER

-def test_write_performance_metrics(session):
+def test_write_performance_metrics_Success(session):
     pipe_name = "test_pipe"
     t_start = 20_000_000_000
     t_end = 30_000_000_000
@@ -33,6 +31,20 @@ def test_write_performance_metrics_Success(session):
     assert metrics.execution_duration == 10

+def test_write_performance_metrics_FailStartingTime(session):
+    pipe_name = "test_pipe"
+    t_start = 30_000_000_000
+    t_end = 20_000_000_000
+    with patch("delta_barth.pipelines.SESSION", session):
+        with pytest.raises(ValueError):
+            _ = pl._write_performance_metrics(
+                pipeline_name=pipe_name,
+                time_start=t_start,
+                time_end=t_end,
+            )
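The new failure test pins down that `_write_performance_metrics` raises `ValueError` when the start time lies after the end time. The function body is not part of this diff; a plausible guard matching the asserted behaviour might look like the following (hypothetical, for illustration only):

```python
# Hypothetical shape of the guard the new test exercises; the actual
# _write_performance_metrics implementation is not shown in this commit.
def _check_time_order(time_start: int, time_end: int) -> None:
    if time_end < time_start:
        raise ValueError(
            f"time_start ({time_start}) must not be later than time_end ({time_end})"
        )
```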
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session): def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
with patch( with patch(

View File

@@ -64,6 +64,7 @@ def test_session_setup_db_management(tmp_path):
 @patch("delta_barth.logging.ENABLE_LOGGING", True)
 @patch("delta_barth.logging.LOGGING_TO_FILE", True)
+@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
 def test_session_setup_logging(tmp_path):
     str_path = str(tmp_path)
     foldername: str = "logging_test"