# delta-barth-py/tests/analysis/test_forecast.py
# (245 lines, 6.6 KiB, Python)

import importlib
from datetime import datetime as Datetime
from pathlib import Path
from unittest.mock import patch
import pandas as pd
import pytest
from pydantic import ValidationError
from delta_barth.analysis import forecast as fc
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DualDict, PipeResult
@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    """Provide a ``DualDict`` pairing camelCase column names with snake_case ones."""
    name_pairs = {
        "artikelId": "artikel_refid",
        "firmaId": "firma_refid",
        "betrag": "betrag",
        "menge": "menge",
        "buchungsDatum": "buchungs_datum",
    }
    # Unpacking keeps the original keyword-argument construction and order.
    return DualDict(**name_pairs)
@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    """Provide the immutable set of feature names handed to preprocessing."""
    names = ("firma_refid", "betrag", "buchungs_datum")
    return frozenset(names)
@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    """Build a three-row frame whose ``buchungsDatum`` column holds datetimes."""
    booking_dates = [
        Datetime(2024, 1, 1),
        Datetime(2024, 6, 1),
        Datetime(2024, 10, 26),
    ]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": booking_dates,
        }
    )
@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    """Build a three-row frame whose ``buchungsDatum`` column holds plain strings."""
    # Strings instead of datetimes are what make this frame "invalid".
    not_dates = ["test", "test2", "test3"]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": not_dates,
        }
    )
@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    """Build a three-row result frame with numeric ``vorhersage`` values."""
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": [1200.25, 1500.50, 1750.75],
        }
    )
@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    """Build a three-row result frame whose ``vorhersage`` column holds strings."""
    # Non-numeric predictions are what make this frame "invalid".
    bad_predictions = ["test", "test2", "test3"]
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": bad_predictions,
        }
    )
@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    """Return a copy of ``sales_data_real`` with columns renamed via ``feature_map``.

    Columns that have an entry in ``feature_map`` take the mapped name; all
    other columns keep their original name.
    """
    renamed = sales_data_real.copy()
    renamed.columns = [
        feature_map[col] if col in feature_map else col
        for col in renamed.columns
    ]
    return renamed
def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    """Every column of the parsed frame must be one of the known field names."""
    known_fields = {
        "artikelId",
        "warengruppeId",
        "firmaId",
        "betrag",
        "menge",
        "buchungsDatum",
    }
    parsed = fc._parse_api_resp_to_df(exmpl_api_sales_prognosis_resp)
    unexpected = [col for col in parsed.columns if col not in known_fields]
    assert not unexpected
def test_parse_df_to_api_resp_ValidData(valid_df):
    """A valid frame converts into a response object with at least one entry."""
    response = fc._parse_df_to_api_resp(valid_df)
    entry_count = len(response.daten)
    assert entry_count > 0
def test_parse_df_to_api_resp_InvalidData(invalid_df):
    """Converting a frame with string dates must raise ``ValidationError``."""
    with pytest.raises(ValidationError):
        fc._parse_df_to_api_resp(invalid_df)
def test_parse_df_to_results_ValidData(valid_results):
    """A valid result frame converts into an object with at least one entry."""
    parsed = fc._parse_df_to_results(valid_results)
    entry_count = len(parsed.daten)
    assert entry_count > 0
def test_parse_df_to_results_InvalidData(invalid_results):
    """Converting a frame with string predictions must raise ``ValidationError``."""
    with pytest.raises(ValidationError):
        fc._parse_df_to_results(invalid_results)
def test_preprocess_sales_per_customer_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing the example response succeeds and yields five columns."""
    pipe = fc._preprocess_sales_per_customer(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None

    columns = pipe.data.columns
    assert len(columns) == 5
    # At least one original (unmapped) key must no longer appear as a column.
    missing_keys = [feat for feat in feature_map.keys() if feat not in columns]
    assert len(missing_keys) > 0
def test_preprocess_sales_per_customer_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing with unknown target features reports a non-zero status."""
    # Deliberately ignore the fixture value and pass feature names that are
    # not part of the mapped columns.
    unknown_targets = {"not_known_feature", "test2"}
    pipe = fc._preprocess_sales_per_customer(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=unknown_targets,
    )

    assert pipe.status.code != 0
    assert pipe.data is None
    assert pipe.results is None
def test_sales_per_customer_Success(sales_data_real_preproc):
    """Processing the full renamed sales data succeeds and keeps ``data`` set."""
    # The preprocessing step is not called here; the fixture already supplies
    # a frame with renamed columns, wrapped manually in a successful result.
    frame = sales_data_real_preproc.copy()
    initial = PipeResult(frame, STATUS_HANDLER.SUCCESS)

    outcome = fc._process_sales_per_customer(initial)

    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert outcome.data is not None
    assert outcome.results is None
def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc):
    """Processing only the first 20 rows must end in ``TOO_FEW_POINTS``."""
    # The preprocessing step is not called here; the fixture frame is
    # truncated and wrapped manually in a successful result.
    truncated = sales_data_real_preproc.copy().iloc[:20, :]
    initial = PipeResult(truncated, STATUS_HANDLER.SUCCESS)

    outcome = fc._process_sales_per_customer(initial)

    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert outcome.data is None
    assert outcome.results is None
def test_postprocess_sales_per_customer_Success(
    valid_results,
):
    """Postprocessing a valid result frame moves the payload into ``results``."""
    initial = PipeResult(valid_results, STATUS_HANDLER.SUCCESS)

    outcome = fc._postprocess_sales_per_customer(
        initial,
        feature_map=DualDict(),
    )

    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert outcome.data is None
    assert outcome.results is not None
def test_postprocess_sales_per_customer_FailValidation(
    invalid_results,
):
    """Postprocessing an invalid frame reports a ``ValidationError`` status."""
    initial = PipeResult(invalid_results, STATUS_HANDLER.SUCCESS)

    outcome = fc._postprocess_sales_per_customer(
        initial,
        feature_map=DualDict(),
    )

    assert outcome.status != STATUS_HANDLER.SUCCESS
    assert outcome.data is None
    assert outcome.results is None
    # The failure is expected to be reported through the status description.
    assert "ValidationError" in outcome.status.description
def test_sales_prognosis_pipeline(monkeypatch, exmpl_api_sales_prognosis_resp):
    """Run the complete forecast pipeline against a stubbed data request.

    ``get_sales_prognosis_data`` is replaced by a stub returning the example
    response with a success status. The forecast module is reloaded while the
    patch is active so that it picks up the stub, and it is reloaded once more
    after the patch has been reverted so the stub does not stay bound inside
    the module and leak into tests that run afterwards.
    """

    def mock_request(*args, **kwargs):
        # Same return shape as used by the pipeline: (response, status).
        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

    import delta_barth.api.requests

    monkeypatch.setattr(delta_barth.api.requests, "get_sales_prognosis_data", mock_request)
    # NOTE(review): reloading re-executes the module and presumably rebinds
    # the attribute that was just monkeypatched - confirm whether the
    # ``setattr`` above is still required.
    importlib.reload(delta_barth.api.requests)

    try:
        with patch(
            "delta_barth.api.requests.get_sales_prognosis_data",
            new=mock_request,
        ):
            # Reload under the active patch so the module binds the stub.
            importlib.reload(delta_barth.analysis.forecast)  # type: ignore
            result = fc.pipeline(None)  # type: ignore
    finally:
        # The patch is reverted at this point; reload again so the forecast
        # module is bound to the real request function for subsequent tests.
        importlib.reload(delta_barth.analysis.forecast)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0