"""Tests for the sales forecast helpers in ``delta_barth.analysis.forecast``."""

import importlib
from datetime import datetime as Datetime
from unittest.mock import patch

import pandas as pd
import pytest
from pydantic import ValidationError

import delta_barth.analysis.forecast
from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DualDict, PipeResult


@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    """Return the mapping from source column names to renamed column names."""
    return DualDict(
        artikelId="artikel_refid",
        firmaId="firma_refid",
        betrag="betrag",
        menge="menge",
        buchungsDatum="buchungs_datum",
    )


@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    """Return the set of (renamed) feature names requested from preprocessing."""
    return frozenset(
        (
            "firma_refid",
            "betrag",
            "buchungs_datum",
        )
    )


@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    """Return a three-row sales frame whose ``buchungsDatum`` holds datetimes."""
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": [
            Datetime(2024, 1, 1),
            Datetime(2024, 6, 1),
            Datetime(2024, 10, 26),
        ],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    """Return a three-row sales frame whose ``buchungsDatum`` holds plain strings."""
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    """Return a three-row results frame with numeric ``vorhersage`` values."""
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": [1200.25, 1500.50, 1750.75],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    """Return a three-row results frame whose ``vorhersage`` holds strings."""
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    """Return a copy of ``sales_data_real`` with columns renamed via ``feature_map``.

    Columns that have no entry in ``feature_map`` keep their original name.
    """
    data = sales_data_real.copy()
    data.columns = [
        feature_map[feat] if feat in feature_map else feat for feat in data.columns
    ]

    return data


def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    """Parsed columns must all be annotated fields of the response entry model."""
    resp = exmpl_api_sales_prognosis_resp
    df = fc._parse_api_resp_to_df(resp)
    features = set(SalesPrognosisResponseEntry.__annotations__.keys())
    assert all(col in features for col in df.columns)


def test_parse_api_resp_to_df_empty():
    """An empty response must still only yield columns known to the entry model."""
    resp = SalesPrognosisResponse(daten=tuple())
    df = fc._parse_api_resp_to_df(resp)
    features = set(SalesPrognosisResponseEntry.__annotations__.keys())
    assert all(col in features for col in df.columns)


# def test_parse_df_to_api_resp_ValidData(valid_df):
#     ret = fc._parse_df_to_api_resp(valid_df)
#     assert len(ret.daten) > 0


# def test_parse_df_to_api_resp_InvalidData(invalid_df):
#     with pytest.raises(ValidationError):
#         _ = fc._parse_df_to_api_resp(invalid_df)


def test_parse_df_to_results_ValidData(valid_results):
    """A valid results frame is converted into a non-empty ``daten`` collection."""
    ret = fc._parse_df_to_results(valid_results)
    assert len(ret.daten) > 0


def test_parse_df_to_results_InvalidData(invalid_results):
    """String values in ``vorhersage`` must raise a pydantic ``ValidationError``."""
    with pytest.raises(ValidationError):
        _ = fc._parse_df_to_results(invalid_results)


def test_preprocess_sales_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing the example response succeeds and returns a data frame."""
    resp = exmpl_api_sales_prognosis_resp
    pipe = fc._preprocess_sales(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    df = pipe.data
    assert len(df.columns) == 6
    # at least one source name must be gone; some names map onto themselves
    assert any(feat not in df.columns for feat in feature_map.keys())


def test_preprocess_sales_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Requesting unknown target features yields a non-zero status and no payload."""
    resp = exmpl_api_sales_prognosis_resp
    # deliberately replace the fixture value with feature names that do not exist
    target_features = {"not_known_feature", "test2"}
    pipe = fc._preprocess_sales(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert pipe.status.code != 0
    assert pipe.data is None
    assert pipe.results is None


def test_process_sales_Success(sales_data_real_preproc):
    """Processing the renamed real sales data succeeds and keeps ``data`` set."""
    data = sales_data_real_preproc.copy()
    # fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(pipe)

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    assert pipe.results is None


def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    """Only the first 20 rows are passed on; the status must be TOO_FEW_POINTS."""
    data = sales_data_real_preproc.copy()
    data = data.iloc[:20, :]
    # fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(pipe)

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert pipe.data is None
    assert pipe.results is None


def test_postprocess_sales_Success(
    valid_results,
):
    """Postprocessing valid results moves the payload from ``data`` to ``results``."""
    data = valid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is not None


def test_postprocess_sales_FailValidation(
    invalid_results,
):
    """Invalid results produce a failing status that mentions ``ValidationError``."""
    data = invalid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is None
    assert "ValidationError" in pipe.status.description


def test_export_on_fail():
    """A failure export carries an empty response and mirrors the given status."""
    status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    res = fc._export_on_fail(status)

    assert res.response is not None
    assert len(res.response.daten) == 0
    assert res.status is not None
    assert res.status.code == status.code
    assert res.status.description == status.description


def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
    """The full pipeline succeeds when the API request is replaced by a stub."""

    def mock_request(*args, **kwargs):  # pragma: no cover
        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

    try:
        with patch(
            "delta_barth.api.requests.get_sales_prognosis_data",
            new=mock_request,
        ):
            # reload while the patch is active so the forecast module picks up
            # the stub (presumably it binds the function at import time)
            importlib.reload(delta_barth.analysis.forecast)
            result = fc.pipeline_sales(None)  # type: ignore
    finally:
        # reload once more after the patch is gone; otherwise names bound
        # during the patched reload would keep pointing at the stub and
        # could leak into later tests
        importlib.reload(delta_barth.analysis.forecast)

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0


def test_pipeline_sales_prognosis_dummy():
    """The dummy pipeline succeeds and its first entry has the pinned values."""
    result = fc.pipeline_sales_dummy(None)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0
    entry = result.response.daten[0]
    assert entry.jahr == 2022
    assert entry.monat == 11
    assert entry.vorhersage == pytest.approx(47261.058594)