"""Tests for the sales forecast pipeline in delta_barth.analysis.forecast."""

import datetime
from datetime import datetime as Datetime
from unittest.mock import patch

import numpy as np
import pandas as pd
import pytest
import sqlalchemy as sql
from pydantic import ValidationError

from delta_barth import databases as db
from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import (
    SalesPrognosisResponse,
    SalesPrognosisResponseEntry,
)
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import (
    BestParametersXGBRegressor,
    DualDict,
    PipeResult,
    SalesForecastStatistics,
)


@pytest.fixture(scope="function")
def feature_map() -> DualDict[str, str]:
    return DualDict(
        artikelId="artikel_refid",
        firmaId="firma_refid",
        betrag="betrag",
        menge="menge",
        buchungsDatum="buchungs_datum",
    )


@pytest.fixture(scope="module")
def target_features() -> frozenset[str]:
    return frozenset(
        (
            "firma_refid",
            "betrag",
            "buchungs_datum",
        )
    )


@pytest.fixture(scope="function")
def valid_df() -> pd.DataFrame:
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": [
            Datetime(2024, 1, 1),
            Datetime(2024, 6, 1),
            Datetime(2024, 10, 26),
        ],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_df() -> pd.DataFrame:
    data = {
        "artikelId": [1, 2, 3],
        "warengruppeId": [1, 2, 3],
        "firmaId": [100, 200, 300],
        "betrag": [1200.25, 1500.50, 1750.75],
        "menge": [100, 200, 300],
        "buchungsDatum": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def valid_results() -> pd.DataFrame:
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": [1200.25, 1500.50, 1750.75],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def invalid_results() -> pd.DataFrame:
    data = {
        "jahr": [2023, 2023, 2024],
        "monat": [1, 2, 3],
        "betrag": [100, 200, 300],
        "vorhersage": ["test", "test2", "test3"],
    }
    return pd.DataFrame(data)


@pytest.fixture(scope="function")
def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
    # map API column names to the internal feature names expected by the pipeline
    data = sales_data_real.copy()
    data_feats = data.columns
    mapped_feats: list[str] = []
    for feat in data_feats:
        if feat in feature_map:
            mapped_feats.append(feature_map[feat])
        else:
            mapped_feats.append(feat)
    data.columns = mapped_feats
    return data


def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
    resp = exmpl_api_sales_prognosis_resp
    df = fc._parse_api_resp_to_df(resp)
    features = set(SalesPrognosisResponseEntry.__annotations__.keys())
    assert all(col in features for col in df.columns)


def test_parse_api_resp_to_df_empty():
    resp = SalesPrognosisResponse(daten=tuple())
    df = fc._parse_api_resp_to_df(resp)
    features = set(SalesPrognosisResponseEntry.__annotations__.keys())
    assert all(col in features for col in df.columns)


def test_parse_df_to_results_ValidData(valid_results):
    ret = fc._parse_df_to_results(valid_results)
    assert len(ret.daten) > 0


def test_parse_df_to_results_InvalidData(invalid_results):
    with pytest.raises(ValidationError):
        _ = fc._parse_df_to_results(invalid_results)


def test_write_sales_forecast_stats_small(session):
    eng = session.db_engine
    code = 0
    descr = "Test case to write stats"
    length = 32
    stats = SalesForecastStatistics(code, descr, length)

    # execute
    with patch("delta_barth.analysis.forecast.SESSION", session):
        fc._write_sales_forecast_stats(stats)

    # read
    with eng.begin() as conn:
        res = conn.execute(sql.select(db.sf_stats))
        inserted = tuple(res.mappings())[0]
        data = dict(**inserted)
        del data["id"]
        result = SalesForecastStatistics(**data)

    assert result.status_code == code
    assert result.status_dscr == descr
    assert result.length_dataset == length
    assert result.score_mae is None
    assert result.score_r2 is None
    assert result.best_start_year is None
    assert result.xgb_params is None


def test_write_sales_forecast_stats_large(session):
    eng = session.db_engine
    code = 0
    descr = "Test case to write stats"
    length = 32
    score_mae = 3.54
    score_r2 = 0.56
    best_start_year = 2020
    xgb_params = BestParametersXGBRegressor(
        n_estimators=2,
        learning_rate=0.3,
        max_depth=2,
        min_child_weight=5,
        gamma=0.5,
        subsample=0.8,
        colsample_bytree=5.25,
        early_stopping_rounds=5,
    )
    stats = SalesForecastStatistics(
        code,
        descr,
        length,
        score_mae,
        score_r2,
        best_start_year,
        xgb_params,
    )

    # execute
    with patch("delta_barth.analysis.forecast.SESSION", session):
        fc._write_sales_forecast_stats(stats)

    # read
    with eng.begin() as conn:
        res_stats = conn.execute(sql.select(db.sf_stats))
        res_xgb = conn.execute(sql.select(db.sf_XGB))

        # reconstruct best XGB parameters
        inserted_xgb = tuple(res_xgb.mappings())[0]
        data_xgb = dict(**inserted_xgb)
        del data_xgb["id"]
        xgb_stats = BestParametersXGBRegressor(**data_xgb)

        # reconstruct other statistics
        inserted = tuple(res_stats.mappings())[0]
        data_inserted = dict(**inserted)
        stats_id_fk = data_inserted["id"]  # foreign key in XGB parameters
        del data_inserted["id"]
        stats = SalesForecastStatistics(**data_inserted, xgb_params=xgb_stats)

    assert stats.status_code == code
    assert stats.status_dscr == descr
    assert stats.length_dataset == length
    assert stats.score_mae == pytest.approx(score_mae)
    assert stats.score_r2 == pytest.approx(score_r2)
    assert stats.best_start_year == best_start_year
    assert stats.xgb_params is not None
    # compare xgb_stats
    assert stats.xgb_params["forecast_id"] == stats_id_fk  # type: ignore
    assert stats.xgb_params["n_estimators"] == 2
    assert stats.xgb_params["learning_rate"] == pytest.approx(0.3)
    assert stats.xgb_params["max_depth"] == 2
    assert stats.xgb_params["min_child_weight"] == 5
    assert stats.xgb_params["gamma"] == pytest.approx(0.5)
    assert stats.xgb_params["subsample"] == pytest.approx(0.8)
    assert stats.xgb_params["colsample_bytree"] == pytest.approx(5.25)
    assert stats.xgb_params["early_stopping_rounds"] == 5


def test_preprocess_sales_Success(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    resp = exmpl_api_sales_prognosis_resp
    pipe = fc._preprocess_sales(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    df = pipe.data
    assert len(df.columns) == 6
    assert any(feat not in df.columns for feat in feature_map.keys())


def test_preprocess_sales_FailOnTargetFeature(
    exmpl_api_sales_prognosis_resp,
    feature_map,
    target_features,
):
    resp = exmpl_api_sales_prognosis_resp
    target_features = {"not_known_feature", "test2"}
    pipe = fc._preprocess_sales(
        resp,
        feature_map=feature_map,
        target_features=target_features,
    )

    assert pipe.status.code != 0
    assert pipe.data is None
    assert pipe.results is None


@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc):
    data = sales_data_real_preproc.copy()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(
        pipe,
        min_num_data_points=36,
        base_num_data_points_months=1,
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is not None
    assert pipe.results is None
    assert pipe.statistics is not None
    assert pipe.statistics.status_code == STATUS_HANDLER.SUCCESS.code
    assert pipe.statistics.status_dscr == STATUS_HANDLER.SUCCESS.description
    assert pipe.statistics.length_dataset is not None
    assert pipe.statistics.score_mae is not None
    assert pipe.statistics.score_r2 is not None
    assert pipe.statistics.best_start_year is not None
    assert pipe.statistics.xgb_params is not None


@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
    data = sales_data_real_preproc.copy()
    data = data.iloc[:20, :]
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(
        pipe,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    assert pipe.data is None
    assert pipe.results is None
    assert pipe.statistics is not None
    assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.code
    assert (
        pipe.statistics.status_dscr == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.description
    )
    assert pipe.statistics.length_dataset is not None
    assert pipe.statistics.score_mae is None
    assert pipe.statistics.score_r2 is None
    assert pipe.statistics.best_start_year is None
    assert pipe.statistics.xgb_params is None


@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
    data = sales_data_real_preproc.copy()
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(
        pipe,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
    assert pipe.data is None
    assert pipe.results is None
    assert pipe.statistics is not None
    assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.code
    assert (
        pipe.statistics.status_dscr
        == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.description
    )
    assert pipe.statistics.length_dataset is not None
    assert pipe.statistics.score_mae is None
    assert pipe.statistics.score_r2 is None
    assert pipe.statistics.best_start_year is None
    assert pipe.statistics.xgb_params is None


@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
    # prepare fake data: pad the real sales data with constant monthly rows up to today
    df = sales_data_real_preproc.copy()
    f_dates = "buchungs_datum"
    end = datetime.datetime.now()
    start = df[f_dates].max()
    fake_dates = pd.date_range(start, end, freq="MS")
    fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
    fake_df = pd.DataFrame(fake_data, columns=df.columns)
    enhanced_df = pd.concat((df, fake_df), ignore_index=True)
    data = enhanced_df.copy()
    data["betrag"] = 10000
    data = data.iloc[:20000, :]
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)

    # stub for RandomizedSearchCV whose best estimator always predicts a constant value
    class PatchSearchCV:
        def __init__(self, *args, **kwargs) -> None:
            class Predictor:
                def predict(self, *args, **kwargs):
                    return np.array([1, 1, 1, 1], dtype=np.float64)

            self.best_estimator_ = Predictor()

        def fit(self, *args, **kwargs):
            pass

    with patch(
        "delta_barth.analysis.forecast.RandomizedSearchCV",
        new=PatchSearchCV,
    ):
        pipe = fc._process_sales(
            pipe,
            min_num_data_points=1,
            base_num_data_points_months=1,
        )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
    assert pipe.data is None
    assert pipe.results is None
    assert pipe.statistics is not None
    assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.code
    assert (
        pipe.statistics.status_dscr
        == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.description
    )
    assert pipe.statistics.length_dataset is not None
    assert pipe.statistics.score_mae is None
    assert pipe.statistics.score_r2 is None
    assert pipe.statistics.best_start_year is None
    assert pipe.statistics.xgb_params is None


def test_postprocess_sales_Success(
    valid_results,
):
    data = valid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )

    assert pipe.status == STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is not None


def test_postprocess_sales_FailValidation(
    invalid_results,
):
    data = invalid_results
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )

    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.data is None
    assert pipe.results is None
    assert "ValidationError" in pipe.status.description


def test_export_on_fail():
    status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    res = fc._export_on_fail(status)

    assert res.response is not None
    assert len(res.response.daten) == 0
    assert res.status is not None
    assert res.status.code == status.code
    assert res.status.description == status.description


def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
    with (
        patch(
            "delta_barth.analysis.forecast.get_sales_prognosis_data",
        ) as get_mock,
        patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
    ):
        get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
        sess_mock.cfg.forecast.threshold_month_data_points = 1
        result = fc.pipeline_sales_forecast(None)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0


def test_pipeline_sales_prognosis_dummy():
    result = fc.pipeline_sales_dummy(None)  # type: ignore

    assert result.status == STATUS_HANDLER.SUCCESS
    assert len(result.response.daten) > 0
    entry = result.response.daten[0]
    assert entry.jahr == 2022
    assert entry.monat == 11
    assert entry.vorhersage == pytest.approx(47261.058594)