# delta-barth-py/tests/analysis/test_forecast.py
#
# 308 lines
# 8.6 KiB
# Python

import importlib
from datetime import datetime as Datetime
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
from pydantic import ValidationError
import delta_barth.analysis.forecast
from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DualDict, PipeResult
@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    """Provide a fresh ``DualDict`` that maps source column names to target names.

    The keys are the column names as they appear in the raw data (for
    example ``artikelId``); the values are the names used after renaming
    (for example ``artikel_refid``).
    """
    column_names = {
        "artikelId": "artikel_refid",
        "firmaId": "firma_refid",
        "betrag": "betrag",
        "menge": "menge",
        "buchungsDatum": "buchungs_datum",
    }
    return DualDict(**column_names)
@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    """Provide the immutable set of feature names passed as ``target_features``."""
    return frozenset({"firma_refid", "betrag", "buchungs_datum"})
@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    """Build a three-row sales frame whose ``buchungsDatum`` column holds datetimes."""
    booking_dates = [
        Datetime(2024, 1, 1),
        Datetime(2024, 6, 1),
        Datetime(2024, 10, 26),
    ]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": booking_dates,
        }
    )
@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    """Build a three-row sales frame whose ``buchungsDatum`` column holds plain strings.

    All other columns carry the same values as in ``valid_df``; only the
    date column contains non-date text.
    """
    non_date_values = ["test", "test2", "test3"]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": non_date_values,
        }
    )
@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    """Build a three-row results frame with numeric ``vorhersage`` values."""
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": [1200.25, 1500.50, 1750.75],
        }
    )
@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    """Build a three-row results frame whose ``vorhersage`` column holds strings."""
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": ["test", "test2", "test3"],
        }
    )
@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    """Return a copy of ``sales_data_real`` with columns renamed via ``feature_map``.

    Columns that have an entry in ``feature_map`` receive the mapped name;
    every other column keeps its original name. The input frame itself is
    not modified because the renaming is applied to a copy.
    """
    renamed = sales_data_real.copy()
    renamed.columns = [
        feature_map[col] if col in feature_map else col for col in renamed.columns
    ]
    return renamed
def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    """Every column of the parsed frame is an annotated entry attribute."""
    parsed = fc._parse_api_resp_to_df(exmpl_api_sales_prognosis_resp)
    known_fields = set(SalesPrognosisResponseEntry.__annotations__)
    # no column may appear that is not declared on the response entry
    assert set(parsed.columns).issubset(known_fields)
def test_parse_api_resp_to_df_empty():
    """Parsing a response without entries still yields only known columns."""
    empty_response = SalesPrognosisResponse(daten=())
    parsed = fc._parse_api_resp_to_df(empty_response)
    known_fields = set(SalesPrognosisResponseEntry.__annotations__)
    assert set(parsed.columns).issubset(known_fields)
def test_parse_df_to_results_ValidData(valid_results):
    """A well-formed results frame is converted into a non-empty ``daten``."""
    parsed = fc._parse_df_to_results(valid_results)
    num_entries = len(parsed.daten)
    assert num_entries > 0
def test_parse_df_to_results_InvalidData(invalid_results):
    """A results frame with string forecasts raises pydantic's ``ValidationError``."""
    with pytest.raises(ValidationError):
        fc._parse_df_to_results(invalid_results)
def test_preprocess_sales_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing the example response succeeds and yields a six-column frame."""
    result = fc._preprocess_sales(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=target_features,
    )
    assert result.status == STATUS_HANDLER.SUCCESS

    frame = result.data
    assert frame is not None
    assert len(frame.columns) == 6
    # at least one of the original (unmapped) names must be gone from the columns
    assert any(key not in frame.columns for key in feature_map.keys())
def test_preprocess_sales_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing with unknown target features ends in a non-success status."""
    # NOTE(review): the ``target_features`` fixture is requested but its value
    # is not used; the call below receives this replacement set instead.
    unknown_targets = {"not_known_feature", "test2"}
    result = fc._preprocess_sales(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=unknown_targets,
    )
    assert result.status.code != 0
    assert result.data is None
    assert result.results is None
def test_process_sales_Success(sales_data_real_preproc):
    """Processing the full preprocessed data succeeds and leaves ``results`` unset."""
    initial = PipeResult(sales_data_real_preproc.copy(), STATUS_HANDLER.SUCCESS)
    outcome = fc._process_sales(
        initial,
        min_num_data_points=36,
        base_num_data_points_months=1,
    )
    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert outcome.data is not None
    assert outcome.results is None
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    """Only 20 input rows against a minimum of 36 yields ``TOO_FEW_POINTS``."""
    # keep just the first 20 rows so the minimum of 36 data points is not met
    truncated = sales_data_real_preproc.copy().iloc[:20, :]
    initial = PipeResult(truncated, STATUS_HANDLER.SUCCESS)
    outcome = fc._process_sales(
        initial,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )
    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert outcome.data is None
    assert outcome.results is None
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
    """Requiring 36 base months on the full data yields ``TOO_FEW_MONTH_POINTS``."""
    initial = PipeResult(sales_data_real_preproc.copy(), STATUS_HANDLER.SUCCESS)
    outcome = fc._process_sales(
        initial,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )
    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
    assert outcome.data is None
    assert outcome.results is None
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
    """A stubbed estimator with fixed predictions yields ``NO_RELIABLE_FORECAST``.

    Setup:
      * every ``betrag`` value is overwritten with 10000 and the frame is
        limited to its first 20000 rows,
      * ``RandomizedSearchCV`` in the forecast module is replaced by a stub
        whose ``best_estimator_`` always predicts ``[1, 1, 1, 1]`` and whose
        ``fit`` does nothing.

    The test then expects the pipe to report ``NO_RELIABLE_FORECAST`` with
    neither data nor results attached.
    """
    data = sales_data_real_preproc.copy()
    data["betrag"] = 10000
    data = data.iloc[:20000, :]
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)

    class ConstantPredictor:
        """Estimator stand-in that ignores its input."""

        def predict(self, *args, **kwargs):
            return np.array([1, 1, 1, 1])

    class PatchSearchCV:
        """Replacement for ``RandomizedSearchCV`` that skips any real search."""

        def __init__(self, *args, **kwargs) -> None:
            self.best_estimator_ = ConstantPredictor()

        def fit(self, *args, **kwargs) -> None:
            # nothing to train: the predictor above is already final
            pass

    with patch(
        "delta_barth.analysis.forecast.RandomizedSearchCV",
        new=PatchSearchCV,
    ):
        pipe = fc._process_sales(
            pipe,
            min_num_data_points=1,
            base_num_data_points_months=-100,
        )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
    assert pipe.data is None
    assert pipe.results is None
def test_postprocess_sales_Success(
    valid_results,
):
    """Post-processing a valid results frame moves the payload into ``results``."""
    initial = PipeResult(valid_results, STATUS_HANDLER.SUCCESS)
    outcome = fc._postprocess_sales(
        initial,
        feature_map=DualDict(),
    )
    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert outcome.data is None
    assert outcome.results is not None
def test_postprocess_sales_FailValidation(
    invalid_results,
):
    """Post-processing an invalid frame reports a validation failure in the status."""
    initial = PipeResult(invalid_results, STATUS_HANDLER.SUCCESS)
    outcome = fc._postprocess_sales(
        initial,
        feature_map=DualDict(),
    )
    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.data is None
    assert outcome.results is None
    # the status description must name the exception type
    assert "ValidationError" in outcome.status.description
def test_export_on_fail():
    """``_export_on_fail`` returns an empty response that carries the given status."""
    failure = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    exported = fc._export_on_fail(failure)

    assert exported.response is not None
    assert len(exported.response.daten) == 0

    assert exported.status is not None
    assert exported.status.code == failure.code
    assert exported.status.description == failure.description
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
    """Run the complete sales pipeline against a mocked data request.

    ``get_sales_prognosis_data`` in the forecast module is replaced by a mock
    returning the example response together with a success status, and the
    module constant ``SALES_BASE_NUM_DATAPOINTS_MONTHS`` is patched to 1 for
    the duration of the test.
    """
    with patch(
        "delta_barth.analysis.forecast.get_sales_prognosis_data",
        return_value=(exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS),
    ):
        # NOTE(review): ``None`` stands in for the pipeline's first argument;
        # presumably it is only forwarded to the mocked request - confirm.
        result = fc.pipeline_sales(None)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0
def test_pipeline_sales_prognosis_dummy():
    """The dummy pipeline succeeds and its first entry has the expected values."""
    outcome = fc.pipeline_sales_dummy(None)  # type: ignore
    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert len(outcome.response.daten) > 0

    first_entry = outcome.response.daten[0]
    assert first_entry.jahr == 2022
    assert first_entry.monat == 11
    # float comparison with tolerance
    assert first_entry.vorhersage == pytest.approx(47261.058594)