"""Tests for the sales forecast helpers and pipeline in ``delta_barth.analysis.forecast``."""

import importlib
from datetime import datetime as Datetime
from unittest.mock import patch

import pandas as pd
import pytest
from pydantic import ValidationError

from delta_barth.analysis import forecast as fc
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DualDict, PipeResult


@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    """Return the mapping from API field names to internal column names."""
    return DualDict(
        artikelId="artikel_refid",
        firmaId="firma_refid",
        betrag="betrag",
        menge="menge",
        buchungsDatum="buchungs_datum",
    )


@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    """Return the internal column names requested as target features."""
    return frozenset(
        (
            "firma_refid",
            "betrag",
            "buchungs_datum",
        )
    )


@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    """Return a three-row sales frame whose booking dates are datetimes."""
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": [Datetime(2024, 1, 1), Datetime(2024, 6, 1), Datetime(2024, 10, 26)],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    """Return a sales frame whose booking dates are plain, non-date strings."""
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    """Return a three-row results frame with numeric forecast values."""
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": [1200.25, 1500.50, 1750.75],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    """Return a results frame whose forecast column holds strings."""
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    """Return a copy of ``sales_data_real`` with columns renamed via ``feature_map``.

    Columns that have no entry in ``feature_map`` keep their original name.
    ``sales_data_real`` is a fixture defined outside this file.
    """
    data = sales_data_real.copy()
    data.columns = [
        feature_map[feat] if feat in feature_map else feat for feat in data.columns
    ]

    return data


def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    """The parsed example response contains only the known sales columns."""
    resp = exmpl_api_sales_prognosis_resp
    df = fc._parse_api_resp_to_df(resp)
    features = {
        "artikelId",
        "warengruppeId",
        "firmaId",
        "betrag",
        "menge",
        "buchungsDatum",
    }
    assert all(col in features for col in df.columns)


def test_parse_df_to_api_resp_ValidData(valid_df):
    """A valid sales frame converts to a response with at least one entry."""
    ret = fc._parse_df_to_api_resp(valid_df)
    assert len(ret.daten) > 0


def test_parse_df_to_api_resp_InvalidData(invalid_df):
    """String booking dates make the conversion raise ``ValidationError``."""
    with pytest.raises(ValidationError):
        _ = fc._parse_df_to_api_resp(invalid_df)


def test_parse_df_to_results_ValidData(valid_results):
    """A valid results frame converts to a result object with at least one entry."""
    ret = fc._parse_df_to_results(valid_results)
    assert len(ret.daten) > 0


def test_parse_df_to_results_InvalidData(invalid_results):
    """String forecast values make the conversion raise ``ValidationError``."""
    with pytest.raises(ValidationError):
        _ = fc._parse_df_to_results(invalid_results)


def test_preprocess_sales_per_customer_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Preprocessing the example response succeeds and yields five columns."""
    resp = exmpl_api_sales_prognosis_resp
    pipe = fc._preprocess_sales_per_customer(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )
    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    df = pipe.data
    assert len(df.columns) == 5
    # at least one of the original API field names must be gone after the mapping
    assert any(feat not in df.columns for feat in feature_map.keys())


def test_preprocess_sales_per_customer_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    """Unknown target features produce a non-zero status and no data or results."""
    resp = exmpl_api_sales_prognosis_resp
    # deliberately override the fixture with feature names absent from ``feature_map``
    target_features = {"not_known_feature", "test2"}
    pipe = fc._preprocess_sales_per_customer(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )
    assert pipe.status.code != 0
    assert pipe.data is None
    assert pipe.results is None


def test_sales_per_customer_Success(sales_data_real_preproc):
    """Processing the renamed real sales data succeeds and leaves data, not results."""
    data = sales_data_real_preproc.copy()
    # the pipe is built by hand here instead of via fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales_per_customer(pipe)

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    assert pipe.results is None


def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc):
    """Processing only the first 20 rows ends in the ``TOO_FEW_POINTS`` state."""
    data = sales_data_real_preproc.copy()
    data = data.iloc[:20, :]
    # the pipe is built by hand here instead of via fc._preprocess_sales_per_customer()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales_per_customer(pipe)

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert pipe.data is None
    assert pipe.results is None


def test_postprocess_sales_per_customer_Success(
    valid_results,
):
    """Postprocessing valid results succeeds and moves them from data to results."""
    data = valid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)

    pipe = fc._postprocess_sales_per_customer(
        pipe,
        feature_map=DualDict(),
    )
    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is not None


def test_postprocess_sales_per_customer_FailValidation(
    invalid_results,
):
    """Invalid results yield a failure status whose description names ``ValidationError``."""
    data = invalid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)

    pipe = fc._postprocess_sales_per_customer(
        pipe,
        feature_map=DualDict(),
    )
    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is None
    assert "ValidationError" in pipe.status.description


def test_sales_prognosis_pipeline(monkeypatch, exmpl_api_sales_prognosis_resp):
    """The full pipeline succeeds when the API request returns the example response."""

    def mock_request(*args, **kwargs):
        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS

    import delta_barth.api.requests

    monkeypatch.setattr(delta_barth.api.requests, "get_sales_prognosis_data", mock_request)
    # NOTE(review): reloading re-executes the module body, which presumably rebinds
    # ``get_sales_prognosis_data`` to the real function again; the ``patch`` context
    # below is what keeps the mock in place while the pipeline runs.
    importlib.reload(delta_barth.api.requests)
    try:
        with patch(
            "delta_barth.api.requests.get_sales_prognosis_data",
            new=mock_request,
        ):
            importlib.reload(delta_barth.analysis.forecast)  # type: ignore
            result = fc.pipeline(None)  # type: ignore
    finally:
        # Reload once more with the patch lifted so the forecast module does not
        # keep a reference to the mock that could leak into later tests.
        importlib.reload(delta_barth.analysis.forecast)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0