delta-barth-py/tests/analysis/test_forecast.py

482 lines
15 KiB
Python

import datetime
from datetime import datetime as Datetime
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
import sqlalchemy as sql
from pydantic import ValidationError
from delta_barth import databases as db
from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import (
BestParametersXGBRegressor,
DualDict,
PipeResult,
SalesForecastStatistics,
)
@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
return DualDict(
artikelId="artikel_refid",
firmaId="firma_refid",
betrag="betrag",
menge="menge",
buchungsDatum="buchungs_datum",
)
@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
return frozenset(
(
"firma_refid",
"betrag",
"buchungs_datum",
)
)
@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
data = {
"artikelId": [1, 2, 3],
"warengruppeId": [1, 2, 3],
"firmaId": [100, 200, 300],
"betrag": [1200.25, 1500.50, 1750.75],
"menge": [100, 200, 300],
"buchungsDatum": [Datetime(2024, 1, 1), Datetime(2024, 6, 1), Datetime(2024, 10, 26)],
}
return pd.DataFrame(data)
@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
data = {
"artikelId": [1, 2, 3],
"warengruppeId": [1, 2, 3],
"firmaId": [100, 200, 300],
"betrag": [1200.25, 1500.50, 1750.75],
"menge": [100, 200, 300],
"buchungsDatum": ["test", "test2", "test3"],
}
return pd.DataFrame(data)
@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
data = {
"jahr": [2023, 2023, 2024],
"monat": [1, 2, 3],
"betrag": [100, 200, 300],
"vorhersage": [1200.25, 1500.50, 1750.75],
}
return pd.DataFrame(data)
@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
data = {
"jahr": [2023, 2023, 2024],
"monat": [1, 2, 3],
"betrag": [100, 200, 300],
"vorhersage": ["test", "test2", "test3"],
}
return pd.DataFrame(data)
@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
data = sales_data_real.copy()
data_feats = data.columns
mapped_feats: list[str] = []
for feat in data_feats:
if feat in feature_map:
mapped_feats.append(feature_map[feat])
else:
mapped_feats.append(feat)
data.columns = mapped_feats
return data
def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
resp = exmpl_api_sales_prognosis_resp
df = fc._parse_api_resp_to_df(resp)
features = set(SalesPrognosisResponseEntry.__annotations__.keys())
assert all(col in features for col in df.columns)
def test_parse_api_resp_to_df_empty():
resp = SalesPrognosisResponse(daten=tuple())
df = fc._parse_api_resp_to_df(resp)
features = set(SalesPrognosisResponseEntry.__annotations__.keys())
assert all(col in features for col in df.columns)
def test_parse_df_to_results_ValidData(valid_results):
ret = fc._parse_df_to_results(valid_results)
assert len(ret.daten) > 0
def test_parse_df_to_results_InvalidData(invalid_results):
with pytest.raises(ValidationError):
_ = fc._parse_df_to_results(invalid_results)
def test_write_sales_forecast_stats_small(session):
eng = session.db_engine
code = 0
descr = "Test case to write stats"
length = 32
stats = SalesForecastStatistics(code, descr, length)
# execute
with patch("delta_barth.analysis.forecast.SESSION", session):
fc._write_sales_forecast_stats(stats)
# read
with eng.begin() as conn:
res = conn.execute(sql.select(db.sf_stats))
inserted = tuple(res.mappings())[0]
data = dict(**inserted)
del data["id"]
result = SalesForecastStatistics(**data)
assert result.status_code == code
assert result.status_dscr == descr
assert result.length_dataset == length
assert result.score_mae is None
assert result.score_r2 is None
assert result.best_start_year is None
assert result.xgb_params is None
def test_write_sales_forecast_stats_large(session):
eng = session.db_engine
code = 0
descr = "Test case to write stats"
length = 32
score_mae = 3.54
score_r2 = 0.56
best_start_year = 2020
xgb_params = BestParametersXGBRegressor(
n_estimators=2,
learning_rate=0.3,
max_depth=2,
min_child_weight=5,
gamma=0.5,
subsample=0.8,
colsample_bytree=5.25,
early_stopping_rounds=5,
)
stats = SalesForecastStatistics(
code,
descr,
length,
score_mae,
score_r2,
best_start_year,
xgb_params,
)
# execute
with patch("delta_barth.analysis.forecast.SESSION", session):
fc._write_sales_forecast_stats(stats)
# read
with eng.begin() as conn:
res_stats = conn.execute(sql.select(db.sf_stats))
res_xgb = conn.execute(sql.select(db.sf_XGB))
# reconstruct best XGB parameters
inserted_xgb = tuple(res_xgb.mappings())[0]
data_xgb = dict(**inserted_xgb)
del data_xgb["id"]
xgb_stats = BestParametersXGBRegressor(**data_xgb)
# reconstruct other statistics
inserted = tuple(res_stats.mappings())[0]
data_inserted = dict(**inserted)
stats_id_fk = data_inserted["id"] # foreign key in XGB parameters
del data_inserted["id"]
stats = SalesForecastStatistics(**data_inserted, xgb_params=xgb_stats)
assert stats.status_code == code
assert stats.status_dscr == descr
assert stats.length_dataset == length
assert stats.score_mae == pytest.approx(score_mae)
assert stats.score_r2 == pytest.approx(score_r2)
assert stats.best_start_year == best_start_year
assert stats.xgb_params is not None
# compare xgb_stats
assert stats.xgb_params["forecast_id"] == stats_id_fk # type: ignore
assert stats.xgb_params["n_estimators"] == 2
assert stats.xgb_params["learning_rate"] == pytest.approx(0.3)
assert stats.xgb_params["max_depth"] == 2
assert stats.xgb_params["min_child_weight"] == 5
assert stats.xgb_params["gamma"] == pytest.approx(0.5)
assert stats.xgb_params["subsample"] == pytest.approx(0.8)
assert stats.xgb_params["colsample_bytree"] == pytest.approx(5.25)
assert stats.xgb_params["early_stopping_rounds"] == 5
def test_preprocess_sales_Success(
exmpl_api_sales_prognosis_resp,
feature_map,
target_features,
):
resp = exmpl_api_sales_prognosis_resp
pipe = fc._preprocess_sales(
resp,
feature_map=feature_map,
target_features=target_features,
)
assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is not None
df = pipe.data
assert len(df.columns) == 6
assert any(feat not in df.columns for feat in feature_map.keys())
def test_preprocess_sales_FailOnTargetFeature(
exmpl_api_sales_prognosis_resp,
feature_map,
target_features,
):
resp = exmpl_api_sales_prognosis_resp
target_features = {"not_known_feature", "test2"}
pipe = fc._preprocess_sales(
resp,
feature_map=feature_map,
target_features=target_features,
)
assert pipe.status.code != 0
assert pipe.data is None
assert pipe.results is None
@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=1,
)
assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is not None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.SUCCESS.code
assert pipe.statistics.status_dscr == STATUS_HANDLER.SUCCESS.description
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is not None
assert pipe.statistics.score_r2 is not None
assert pipe.statistics.best_start_year is not None
assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_InvalidDates(sales_data_real_preproc):
false_date = Datetime(2519, 6, 30)
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
data["buchungs_datum"] = data["buchungs_datum"].astype(object)
data.at[0, "buchungs_datum"] = false_date
assert data["buchungs_datum"].dtype.char == "O"
assert len(data) == 20
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.code
assert (
pipe.statistics.status_dscr == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
# prepare fake data
df = sales_data_real_preproc.copy()
f_dates = "buchungs_datum"
end = datetime.datetime.now()
start = df[f_dates].max()
fake_dates = pd.date_range(start, end, freq="MS")
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
fake_df = pd.DataFrame(fake_data, columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
data = enhanced_df.copy()
data["betrag"] = 10000
print(data["betrag"])
data = data.iloc[:20000, :]
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
class PatchSearchCV:
def __init__(self, *args, **kwargs) -> None:
class Predictor:
def predict(self, *args, **kwargs):
return np.array([1, 1, 1, 1], dtype=np.float64)
self.best_estimator_ = Predictor()
def fit(*args, **kwargs):
pass
with patch(
"delta_barth.analysis.forecast.RandomizedSearchCV",
new=PatchSearchCV,
):
pipe = fc._process_sales(
pipe,
min_num_data_points=1,
base_num_data_points_months=1,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
def test_postprocess_sales_Success(
valid_results,
):
data = valid_results
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._postprocess_sales(
pipe,
feature_map=DualDict(),
)
assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is None
assert pipe.results is not None
def test_postprocess_sales_FailValidation(
invalid_results,
):
data = invalid_results
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._postprocess_sales(
pipe,
feature_map=DualDict(),
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.data is None
assert pipe.results is None
assert "ValidationError" in pipe.status.description
def test_export_on_fail():
status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
res = fc._export_on_fail(status)
assert res.response is not None
assert len(res.response.daten) == 0
assert res.status is not None
assert res.status.code == status.code
assert res.status.description == status.description
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
assert session.cfg.forecast.threshold_month_data_points is not None
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(session, company_ids, date) # type: ignore
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0
def test_pipeline_sales_prognosis_dummy():
result = fc.pipeline_sales_dummy(None) # type: ignore
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0
entry = result.response.daten[0]
assert entry.jahr == 2022
assert entry.monat == 11
assert entry.vorhersage == pytest.approx(47261.058594)