diff --git a/pyproject.toml b/pyproject.toml index 0974385..cf56355 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "delta-barth" -version = "0.5.7dev1" +version = "0.5.7dev2" description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system" authors = [ {name = "Florian Förster", email = "f.foerster@d-opt.com"}, @@ -44,7 +44,8 @@ filterwarnings = [ ] markers = [ "api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')", - "new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')", + "new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')", + "forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')" ] log_cli = true @@ -73,7 +74,7 @@ directory = "reports/coverage" [tool.bumpversion] -current_version = "0.5.7dev1" +current_version = "0.5.7dev2" parse = """(?x) (?P0|[1-9]\\d*)\\. (?P0|[1-9]\\d*)\\. diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py index aa8e59c..39bb13f 100644 --- a/src/delta_barth/analysis/forecast.py +++ b/src/delta_barth/analysis/forecast.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import datetime import math from collections.abc import Mapping, Set @@ -11,6 +12,9 @@ import numpy as np import pandas as pd import scipy.stats import sqlalchemy as sql + +# --- new: for calculating timedelta +from dateutil.relativedelta import relativedelta from sklearn.metrics import mean_absolute_error, r2_score from sklearn.model_selection import KFold, RandomizedSearchCV from xgboost import XGBRegressor @@ -183,16 +187,14 @@ def _process_sales( PipeResult _description_ """ - # cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast() - # filter data data = pipe.data assert data is not None, "processing not existing pipe result" DATE_FEAT: Final[str] = "buchungs_datum" SALES_FEAT: Final[str] = "betrag" - df_firma = data[(data["betrag"] > 0)] - df_cust = df_firma.copy() + df_filter = data[(data["betrag"] > 0)] + df_cust = df_filter.copy() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() len_ds = len(df_cust) @@ -206,7 +208,26 @@ def _process_sales( df_cust["jahr"] = df_cust[DATE_FEAT].dt.year df_cust["monat"] = df_cust[DATE_FEAT].dt.month - monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() + monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() + + current_year = datetime.datetime.now().year + current_month = datetime.datetime.now().month + years = range(df_cust["jahr"].min(), current_year + 1) + + all_month_year_combinations = pd.DataFrame( + [ + (year, month) + for year in years + for month in range(1, 13) + if (year < current_year or (year == current_year and month <= current_month)) + ], + columns=["jahr", "monat"], + ) + + monthly_sum = pd.merge( + all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left" + ) + monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0) monthly_sum[DATE_FEAT] = ( monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) ) @@ -215,13 +236,17 @@ def _process_sales( features = ["jahr", "monat"] target = SALES_FEAT - current_year = datetime.datetime.now().year - first_year = cast(int, df_cust["jahr"].min()) + + last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y") + future_dates = pd.date_range( + start=last_date + pd.DateOffset(months=1), periods=6, freq="MS" + ) + forecast = pd.DataFrame({"datum": future_dates}).set_index("datum") # Randomized Search kfold = KFold(n_splits=5, shuffle=True) params: ParamSearchXGBRegressor = { - "n_estimators": scipy.stats.poisson(mu=1000), + "n_estimators": scipy.stats.poisson(mu=100), "learning_rate": [0.03, 0.04, 0.05], "max_depth": range(2, 9), "min_child_weight": range(1, 5), @@ -231,26 +256,40 @@ def _process_sales( "early_stopping_rounds": [20, 50], } + best_estimator = None best_params: BestParametersXGBRegressor | None = None best_score_mae: float | None = float("inf") best_score_r2: float | None = None best_start_year: int | None = None too_few_month_points: bool = True - forecast: pd.DataFrame | None = None - for start_year in range(current_year - 4, first_year - 1, -1): + dates = cast(pd.DatetimeIndex, monthly_sum.index) + # baseline: 3 years - 36 months + starting_date = datetime.datetime.now() - relativedelta(months=36) + + target_index, _ = next( + ((i, True) for i, date in enumerate(dates) if date >= starting_date), + (len(dates) - 1, False), + ) + + for add_year, date_idx in enumerate(range(target_index, -1, -12)): + first_date = dates[date_idx] + split_date = dates[-6] + train = cast( pd.DataFrame, - monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore + monthly_sum.loc[first_date:split_date].copy(), # type: ignore ) test = cast( pd.DataFrame, - monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore + monthly_sum.loc[split_date:].copy(), # type: ignore ) X_train, X_test = train[features], test[features] y_train, y_test = train[target], test[target] - if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)): + # test set size fixed at 6 --> first iteration: baseline - 6 entries + # for each new year 10 new data points (i.e., sales strictly positive) needed + if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year): too_few_month_points = False rand = RandomizedSearchCV( @@ -273,13 +312,21 @@ def _process_sales( best_params = cast(BestParametersXGBRegressor, rand.best_params_) best_score_mae = error best_score_r2 = cast(float, r2_score(y_test, y_pred)) - best_start_year = start_year - print("executed") - forecast = test.copy() - forecast.loc[:, "vorhersage"] = y_pred + # --- new: use first_date for best_start_year + best_start_year = first_date.year + # --- new: store best_estimator + best_estimator = copy.copy(rand.best_estimator_) + + if best_estimator is not None: + X_future = pd.DataFrame( + {"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates + ) + y_future = best_estimator.predict(X_future) # type: ignore + forecast["vorhersage"] = y_future + forecast["jahr"] = forecast.index.year # type: ignore + forecast["monat"] = forecast.index.month # type: ignore + forecast = forecast.reset_index(drop=True) - if forecast is not None: - forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True) best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None if too_few_month_points: @@ -295,7 +342,9 @@ def _process_sales( pipe.stats(stats) return pipe - assert forecast is not None, "forecast is None, but was attempted to be returned" + assert "vorhersage" in forecast.columns, ( + "forecast does not contain prognosis values, but was attempted to be returned" + ) status = STATUS_HANDLER.SUCCESS pipe.success(forecast, status) stats = SalesForecastStatistics( diff --git a/src/delta_barth/constants.py b/src/delta_barth/constants.py index 45c74ba..5f163ae 100644 --- a/src/delta_barth/constants.py +++ b/src/delta_barth/constants.py @@ -17,7 +17,7 @@ DUMMY_DATA_PATH: Final[Path] = dummy_data_pth # ** logging ENABLE_LOGGING: Final[bool] = True LOGGING_TO_FILE: Final[bool] = True -LOGGING_TO_STDERR: Final[bool] = True +LOGGING_TO_STDERR: Final[bool] = False LOG_FILENAME: Final[str] = "dopt-delbar.log" # ** databases @@ -40,7 +40,7 @@ class KnownDelBarApiErrorCodes(enum.Enum): # ** API -API_CON_TIMEOUT: Final[float] = 5.0 # secs to response +API_CON_TIMEOUT: Final[float] = 10.0 # secs to response # ** API response parsing # ** column mapping [API-Response --> Target-Features] COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict( diff --git a/tests/analysis/test_forecast.py b/tests/analysis/test_forecast.py index d467ca0..cc2cd39 100644 --- a/tests/analysis/test_forecast.py +++ b/tests/analysis/test_forecast.py @@ -1,4 +1,6 @@ +import datetime from datetime import datetime as Datetime +from pathlib import Path from unittest.mock import patch import numpy as np @@ -255,6 +257,7 @@ def test_preprocess_sales_FailOnTargetFeature( assert pipe.results is None +@pytest.mark.forecast def test_process_sales_Success(sales_data_real_preproc): data = sales_data_real_preproc.copy() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) @@ -277,6 +280,7 @@ def test_process_sales_Success(sales_data_real_preproc): assert pipe.statistics.xgb_params is not None +@pytest.mark.forecast def test_process_sales_FailTooFewPoints(sales_data_real_preproc): data = sales_data_real_preproc.copy() data = data.iloc[:20, :] @@ -303,6 +307,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc): assert pipe.statistics.xgb_params is None +@pytest.mark.forecast def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): data = sales_data_real_preproc.copy() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) @@ -329,8 +334,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): assert pipe.statistics.xgb_params is None +@pytest.mark.forecast def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): - data = sales_data_real_preproc.copy() + # prepare fake data + df = sales_data_real_preproc.copy() + f_dates = "buchungs_datum" + end = datetime.datetime.now() + start = df[f_dates].max() + fake_dates = pd.date_range(start, end, freq="MS") + fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates] + fake_df = pd.DataFrame(fake_data, columns=df.columns) + enhanced_df = pd.concat((df, fake_df), ignore_index=True) + + data = enhanced_df.copy() data["betrag"] = 10000 print(data["betrag"]) data = data.iloc[:20000, :] @@ -340,7 +356,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): def __init__(self, *args, **kwargs) -> None: class Predictor: def predict(self, *args, **kwargs): - return np.array([1, 1, 1, 1]) + return np.array([1, 1, 1, 1], dtype=np.float64) self.best_estimator_ = Predictor() @@ -354,7 +370,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): pipe = fc._process_sales( pipe, min_num_data_points=1, - base_num_data_points_months=-100, + base_num_data_points_months=1, ) assert pipe.status != STATUS_HANDLER.SUCCESS diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py index b1387b7..e1e8b1e 100644 --- a/tests/test_pipelines.py +++ b/tests/test_pipelines.py @@ -1,17 +1,15 @@ -import importlib import json from unittest.mock import patch import pytest import sqlalchemy as sql -import delta_barth.pipelines from delta_barth import databases as db from delta_barth import pipelines as pl from delta_barth.errors import STATUS_HANDLER -def test_write_performance_metrics(session): +def test_write_performance_metrics_Success(session): pipe_name = "test_pipe" t_start = 20_000_000_000 t_end = 30_000_000_000 @@ -33,6 +31,20 @@ def test_write_performance_metrics(session): assert metrics.execution_duration == 10 +def test_write_performance_metrics_FailStartingTime(session): + pipe_name = "test_pipe" + t_start = 30_000_000_000 + t_end = 20_000_000_000 + + with patch("delta_barth.pipelines.SESSION", session): + with pytest.raises(ValueError): + _ = pl._write_performance_metrics( + pipeline_name=pipe_name, + time_start=t_start, + time_end=t_end, + ) + + @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session): with patch( diff --git a/tests/test_session.py b/tests/test_session.py index 31cd51d..bf28c92 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -64,6 +64,7 @@ def test_session_setup_db_management(tmp_path): @patch("delta_barth.logging.ENABLE_LOGGING", True) @patch("delta_barth.logging.LOGGING_TO_FILE", True) +@patch("delta_barth.logging.LOGGING_TO_STDERR", True) def test_session_setup_logging(tmp_path): str_path = str(tmp_path) foldername: str = "logging_test"