# Listing metadata: 261 lines, 7.2 KiB, Python
import importlib
|
|
from datetime import datetime as Datetime
|
|
from unittest.mock import patch
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
import delta_barth.analysis.forecast
|
|
from delta_barth.analysis import forecast as fc
|
|
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
|
|
from delta_barth.errors import STATUS_HANDLER
|
|
from delta_barth.types import DualDict, PipeResult
|
|
|
|
|
|
@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    """Provide the field-name mapping used by the preprocessing tests.

    Returns:
        A new ``DualDict`` whose keys are the camelCase source names and whose
        values are the snake_case names they are mapped to.
    """
    source_to_target = {
        "artikelId": "artikel_refid",
        "firmaId": "firma_refid",
        "betrag": "betrag",
        "menge": "menge",
        "buchungsDatum": "buchungs_datum",
    }
    return DualDict(**source_to_target)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    """Provide the feature names handed to ``_preprocess_sales`` as ``target_features``.

    Returns:
        An immutable set with the three names ``firma_refid``, ``betrag`` and
        ``buchungs_datum``.
    """
    return frozenset({"firma_refid", "betrag", "buchungs_datum"})
|
|
|
|
|
|
@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    """Build a three-row frame whose ``buchungsDatum`` column holds datetimes.

    Returns:
        A new ``DataFrame`` with the columns ``artikelId``, ``warengruppeId``,
        ``firmaId``, ``betrag``, ``menge`` and ``buchungsDatum``.
    """
    booking_dates = [
        Datetime(2024, 1, 1),
        Datetime(2024, 6, 1),
        Datetime(2024, 10, 26),
    ]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": booking_dates,
        }
    )
|
|
|
|
|
|
@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    """Build a three-row frame whose ``buchungsDatum`` column holds plain strings.

    The other columns carry the same values as in ``valid_df``; only the date
    column differs, holding text instead of datetime objects.

    Returns:
        A new ``DataFrame`` with the columns ``artikelId``, ``warengruppeId``,
        ``firmaId``, ``betrag``, ``menge`` and ``buchungsDatum``.
    """
    not_dates = ["test", "test2", "test3"]
    return pd.DataFrame(
        {
            "artikelId": [1, 2, 3],
            "warengruppeId": [1, 2, 3],
            "firmaId": [100, 200, 300],
            "betrag": [1200.25, 1500.50, 1750.75],
            "menge": [100, 200, 300],
            "buchungsDatum": not_dates,
        }
    )
|
|
|
|
|
|
@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    """Build a three-row results frame with numeric ``vorhersage`` values.

    Returns:
        A new ``DataFrame`` with the columns ``jahr``, ``monat``, ``betrag``
        and ``vorhersage``.
    """
    forecasts = [1200.25, 1500.50, 1750.75]
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": forecasts,
        }
    )
|
|
|
|
|
|
@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    """Build a three-row results frame whose ``vorhersage`` column holds strings.

    Apart from ``vorhersage`` the values are the same as in ``valid_results``.

    Returns:
        A new ``DataFrame`` with the columns ``jahr``, ``monat``, ``betrag``
        and ``vorhersage``.
    """
    not_forecasts = ["test", "test2", "test3"]
    return pd.DataFrame(
        {
            "jahr": [2023, 2023, 2024],
            "monat": [1, 2, 3],
            "betrag": [100, 200, 300],
            "vorhersage": not_forecasts,
        }
    )
|
|
|
|
|
|
@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    """Return a copy of ``sales_data_real`` with columns renamed via ``feature_map``.

    Args:
        sales_data_real: Source frame; it is copied, not modified.
        feature_map: Lookup from existing column labels to their new names.

    Returns:
        The copied frame. Columns found in ``feature_map`` carry the mapped
        name, all other columns keep their original label.
    """
    renamed = sales_data_real.copy()
    renamed.columns = [
        feature_map[label] if label in feature_map else label
        for label in renamed.columns
    ]
    return renamed
|
|
|
|
|
|
def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    """Columns parsed from the example response are all annotated entry fields."""
    frame = fc._parse_api_resp_to_df(exmpl_api_sales_prognosis_resp)
    known_fields = set(SalesPrognosisResponseEntry.__annotations__)

    # collecting the offenders gives a readable failure message
    unexpected = [col for col in frame.columns if col not in known_fields]
    assert not unexpected
|
|
|
|
|
|
def test_parse_api_resp_to_df_empty():
    """A response without entries still yields only annotated entry fields as columns."""
    empty_resp = SalesPrognosisResponse(daten=tuple())
    frame = fc._parse_api_resp_to_df(empty_resp)
    known_fields = set(SalesPrognosisResponseEntry.__annotations__)

    # collecting the offenders gives a readable failure message
    unexpected = [col for col in frame.columns if col not in known_fields]
    assert not unexpected
|
|
|
|
|
|
# def test_parse_df_to_api_resp_ValidData(valid_df):
|
|
# ret = fc._parse_df_to_api_resp(valid_df)
|
|
# assert len(ret.daten) > 0
|
|
|
|
|
|
# def test_parse_df_to_api_resp_InvalidData(invalid_df):
|
|
# with pytest.raises(ValidationError):
|
|
# _ = fc._parse_df_to_api_resp(invalid_df)
|
|
|
|
|
|
def test_parse_df_to_results_ValidData(valid_results):
    """Parsing the ``valid_results`` frame produces at least one entry in ``daten``."""
    parsed = fc._parse_df_to_results(valid_results)
    entries = parsed.daten
    assert len(entries) > 0
|
|
|
|
|
|
def test_parse_df_to_results_InvalidData(invalid_results):
    """Parsing the ``invalid_results`` frame must raise a ``ValidationError``."""
    with pytest.raises(ValidationError):
        fc._parse_df_to_results(invalid_results)
|
|
|
|
|
|
def test_preprocess_sales_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing the example response succeeds and yields a five-column frame."""
    outcome = fc._preprocess_sales(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert outcome.status == STATUS_HANDLER.SUCCESS
    assert outcome.data is not None

    frame = outcome.data
    assert len(frame.columns) == 5
    # at least one source-side name must no longer appear as a column
    assert not all(name in frame.columns for name in feature_map.keys())
|
|
|
|
|
|
def test_preprocess_sales_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing with target features missing from the data reports a failure."""
    # the ``target_features`` fixture value is not used; these names replace it
    unknown_targets = {"not_known_feature", "test2"}

    outcome = fc._preprocess_sales(
        exmpl_api_sales_prognosis_resp,
        feature_map=feature_map,
        target_features=unknown_targets,
    )

    assert outcome.status.code != 0
    assert outcome.data is None
    assert outcome.results is None
|
|
|
|
|
|
def test_process_sales_Success(sales_data_real_preproc):
    """Processing the full preprocessed data set succeeds and keeps ``data`` populated."""
    pipe_in = PipeResult(sales_data_real_preproc.copy(), STATUS_HANDLER.SUCCESS)

    pipe_out = fc._process_sales(pipe_in)

    assert pipe_out.status == STATUS_HANDLER.SUCCESS
    assert pipe_out.data is not None
    assert pipe_out.results is None
|
|
|
|
|
|
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    """Processing only the first 20 rows ends in the ``TOO_FEW_POINTS`` state."""
    # keep just the leading 20 rows of a private copy
    truncated = sales_data_real_preproc.copy().iloc[:20, :]
    pipe_in = PipeResult(truncated, STATUS_HANDLER.SUCCESS)

    pipe_out = fc._process_sales(pipe_in)

    assert pipe_out.status != STATUS_HANDLER.SUCCESS
    assert pipe_out.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert pipe_out.data is None
    assert pipe_out.results is None
|
|
|
|
|
|
def test_postprocess_sales_Success(
    valid_results,
):
    """Post-processing the ``valid_results`` frame succeeds and fills ``results``."""
    pipe_in = PipeResult(valid_results, STATUS_HANDLER.SUCCESS)

    pipe_out = fc._postprocess_sales(pipe_in, feature_map=DualDict())

    assert pipe_out.status == STATUS_HANDLER.SUCCESS
    assert pipe_out.data is None
    assert pipe_out.results is not None
|
|
|
|
|
|
def test_postprocess_sales_FailValidation(
    invalid_results,
):
    """Post-processing the ``invalid_results`` frame reports a validation failure."""
    pipe_in = PipeResult(invalid_results, STATUS_HANDLER.SUCCESS)

    pipe_out = fc._postprocess_sales(pipe_in, feature_map=DualDict())

    assert pipe_out.status != STATUS_HANDLER.SUCCESS
    assert pipe_out.data is None
    assert pipe_out.results is None
    # the status text is expected to name the exception type
    assert "ValidationError" in pipe_out.status.description
|
|
|
|
|
|
def test_export_on_fail():
    """Exporting a failure status gives an empty response that carries that status."""
    failure = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS

    exported = fc._export_on_fail(failure)

    # response exists but holds no entries
    assert exported.response is not None
    assert len(exported.response.daten) == 0

    # code and description are passed through from the input status
    assert exported.status is not None
    assert exported.status.code == failure.code
    assert exported.status.description == failure.description
|
|
|
|
|
|
def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
    """Run ``pipeline_sales`` with the data request replaced by a stub.

    The stub returns the example response together with the success status.
    The forecast module is reloaded while the patch is active and reloaded
    once more after the patch has been removed.

    Args:
        exmpl_api_sales_prognosis_resp: Example response returned by the stub.
    """

    def mock_request(*args, **kwargs):  # pragma: no cover
        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

    try:
        with patch(
            "delta_barth.api.requests.get_sales_prognosis_data",
            new=mock_request,
        ):
            # NOTE(review): presumably the forecast module binds the request
            # function at import time, hence the reload under the patch
            importlib.reload(delta_barth.analysis.forecast)
            result = fc.pipeline_sales(None)  # type: ignore
    finally:
        # reload with the patch gone so the stub does not stay bound inside
        # the forecast module for tests that run afterwards
        importlib.reload(delta_barth.analysis.forecast)

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0
|
|
|
|
|
|
def test_pipeline_sales_prognosis_dummy():
    """The dummy pipeline succeeds and its first entry matches the pinned values."""
    result = fc.pipeline_sales_dummy(None)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    entries = result.response.daten
    assert len(entries) > 0

    first = entries[0]
    assert (first.jahr, first.monat) == (2022, 11)
    assert first.vorhersage == pytest.approx(47261.058594)
|