diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py index 3f3fecb..f5340ed 100644 --- a/src/delta_barth/analysis/forecast.py +++ b/src/delta_barth/analysis/forecast.py @@ -72,14 +72,14 @@ def _parse_df_to_results( @wrap_result() def _parse_api_resp_to_df_wrapped( resp: SalesPrognosisResponse, -) -> pd.DataFrame: # pragma: no cover +) -> pd.DataFrame: return _parse_api_resp_to_df(resp) @wrap_result() def _parse_df_to_api_resp_wrapped( data: pd.DataFrame, -) -> SalesPrognosisResponse: # pragma: no cover +) -> SalesPrognosisResponse: return _parse_df_to_api_resp(data) @@ -109,7 +109,7 @@ def _parse_df_to_results_wrapped( # TODO set min number of data points as constant, not parameter -def _preprocess_sales_per_customer( +def _preprocess_sales( resp: SalesPrognosisResponse, feature_map: Mapping[str, str], target_features: Set[str], @@ -151,9 +151,8 @@ def _preprocess_sales_per_customer( return pipe -def _process_sales_per_customer( +def _process_sales( pipe: PipeResult[SalesPrognosisResultsExport], - # company_id: int, min_num_data_points: int = 100, ) -> PipeResult[SalesPrognosisResultsExport]: """n = 1 @@ -174,11 +173,9 @@ def _process_sales_per_customer( PipeResult _description_ """ + # cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast() - cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast() # filter data - # TODO change away from nested DataFrames: just use "f_umsatz_fakt" - # TODO with strong type checks data = pipe.data assert data is not None, "processing not existing pipe result" data = data.copy() @@ -208,48 +205,46 @@ def _process_sales_per_customer( if len(df_cust) < min_num_data_points: pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS) return pipe - else: - # Entwicklung der Umsätze: definierte Zeiträume Monat - df_cust["jahr"] = df_cust[DATE_FEAT].dt.year - df_cust["monat"] = df_cust[DATE_FEAT].dt.month - monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() - monthly_sum[DATE_FEAT] = ( - monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) - ) - monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y") - monthly_sum = monthly_sum.set_index(DATE_FEAT) + # Entwicklung der Umsätze: definierte Zeiträume Monat + df_cust["jahr"] = df_cust[DATE_FEAT].dt.year + df_cust["monat"] = df_cust[DATE_FEAT].dt.month - train = monthly_sum.iloc[:-5].copy() - test = monthly_sum.iloc[-5:].copy() + monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() + monthly_sum[DATE_FEAT] = ( + monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) + ) + monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y") + monthly_sum = monthly_sum.set_index(DATE_FEAT) - features = ["jahr", "monat"] - target = SALES_FEAT + train = monthly_sum.iloc[:-5].copy() + test = monthly_sum.iloc[-5:].copy() - X_train, y_train = train[features], train[target] - X_test, y_test = test[features], test[target] + features = ["jahr", "monat"] + target = SALES_FEAT - reg = XGBRegressor( - base_score=0.5, - booster="gbtree", - n_estimators=1000, - early_stopping_rounds=50, - objective="reg:squarederror", - max_depth=3, - learning_rate=0.01, - ) - reg.fit( - X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100 - ) + X_train, y_train = train[features], train[target] + X_test, y_test = test[features], test[target] - test.loc[:, "vorhersage"] = reg.predict(X_test) - test = test.reset_index(drop=True) + reg = XGBRegressor( + base_score=0.5, + booster="gbtree", + n_estimators=1000, + early_stopping_rounds=50, + objective="reg:squarederror", + max_depth=3, + learning_rate=0.01, + ) + reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100) - pipe.success(test, STATUS_HANDLER.SUCCESS) - return pipe + test.loc[:, "vorhersage"] = reg.predict(X_test) + test = test.reset_index(drop=True) + + pipe.success(test, STATUS_HANDLER.SUCCESS) + return pipe -def _postprocess_sales_per_customer( +def _postprocess_sales( pipe: PipeResult[SalesPrognosisResultsExport], feature_map: Mapping[str, str], ) -> PipeResult[SalesPrognosisResultsExport]: @@ -287,7 +282,7 @@ def _export_on_fail( return SalesPrognosisResultsExport(response=response, status=status) -def pipeline( +def pipeline_sales( session: Session, company_id: int | None = None, start_date: Datetime | None = None, @@ -300,7 +295,7 @@ def pipeline( if status != STATUS_HANDLER.SUCCESS: return _export_on_fail(status) - pipe = _preprocess_sales_per_customer( + pipe = _preprocess_sales( response, feature_map=COL_MAP_SALES_PROGNOSIS, target_features=FEATURES_SALES_PROGNOSIS, @@ -308,14 +303,14 @@ def pipeline( if pipe.status != STATUS_HANDLER.SUCCESS: return _export_on_fail(pipe.status) - pipe = _process_sales_per_customer( + pipe = _process_sales( pipe, min_num_data_points=MIN_NUMBER_DATAPOINTS, ) if pipe.status != STATUS_HANDLER.SUCCESS: return _export_on_fail(pipe.status) - pipe = _postprocess_sales_per_customer( + pipe = _postprocess_sales( pipe, feature_map=DualDict(), ) diff --git a/src/delta_barth/pipelines.py b/src/delta_barth/pipelines.py index 280d7c3..996a8eb 100644 --- a/src/delta_barth/pipelines.py +++ b/src/delta_barth/pipelines.py @@ -11,7 +11,7 @@ def pipeline_sales_forecast( company_id: int | None, start_date: Datetime | None, ) -> tuple[JsonResponse, JsonStatus]: - result = forecast.pipeline(SESSION, company_id=company_id, start_date=start_date) + result = forecast.pipeline_sales(SESSION, company_id=company_id, start_date=start_date) response = JsonResponse(result.response.model_dump_json()) status = JsonStatus(result.status.model_dump_json()) diff --git a/tests/analysis/test_forecast.py b/tests/analysis/test_forecast.py index f7554ee..7b65e4b 100644 --- a/tests/analysis/test_forecast.py +++ b/tests/analysis/test_forecast.py @@ -135,13 +135,13 @@ def test_parse_df_to_results_InvalidData(invalid_results): _ = fc._parse_df_to_results(invalid_results) -def test_preprocess_sales_per_customer_Success( +def test_preprocess_sales_Success( exmpl_api_sales_prognosis_resp, feature_map, target_features, ): resp = exmpl_api_sales_prognosis_resp - pipe = fc._preprocess_sales_per_customer( + pipe = fc._preprocess_sales( resp, feature_map=feature_map, target_features=target_features, @@ -153,14 +153,14 @@ def test_preprocess_sales_per_customer_Success( assert any(feat not in df.columns for feat in feature_map.keys()) -def test_preprocess_sales_per_customer_FailOnTargetFeature( +def test_preprocess_sales_FailOnTargetFeature( exmpl_api_sales_prognosis_resp, feature_map, target_features, ): resp = exmpl_api_sales_prognosis_resp target_features = {"not_known_feature", "test2"} - pipe = fc._preprocess_sales_per_customer( + pipe = fc._preprocess_sales( resp, feature_map=feature_map, target_features=target_features, @@ -170,23 +170,23 @@ def test_preprocess_sales_per_customer_FailOnTargetFeature( assert pipe.results is None -def test_sales_per_customer_Success(sales_data_real_preproc): +def test_process_sales_Success(sales_data_real_preproc): data = sales_data_real_preproc.copy() # fc._preprocess_sales_per_customer() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) - pipe = fc._process_sales_per_customer(pipe) + pipe = fc._process_sales(pipe) assert pipe.status == STATUS_HANDLER.SUCCESS assert pipe.data is not None assert pipe.results is None -def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc): +def test_process_sales_FailTooFewPoints(sales_data_real_preproc): data = sales_data_real_preproc.copy() data = data.iloc[:20, :] # fc._preprocess_sales_per_customer() pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) - pipe = fc._process_sales_per_customer(pipe) + pipe = fc._process_sales(pipe) assert pipe.status != STATUS_HANDLER.SUCCESS assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS @@ -194,13 +194,13 @@ def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc): assert pipe.results is None -def test_postprocess_sales_per_customer_Success( +def test_postprocess_sales_Success( valid_results, ): data = valid_results pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) - pipe = fc._postprocess_sales_per_customer( + pipe = fc._postprocess_sales( pipe, feature_map=DualDict(), ) @@ -209,13 +209,13 @@ def test_postprocess_sales_per_customer_Success( assert pipe.results is not None -def test_postprocess_sales_per_customer_FailValidation( +def test_postprocess_sales_FailValidation( invalid_results, ): data = invalid_results pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) - pipe = fc._postprocess_sales_per_customer( + pipe = fc._postprocess_sales( pipe, feature_map=DualDict(), ) @@ -225,7 +225,7 @@ def test_postprocess_sales_per_customer_FailValidation( assert "ValidationError" in pipe.status.description -def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): +def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp): def mock_request(*args, **kwargs): return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS @@ -234,7 +234,7 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): new=mock_request, ): importlib.reload(delta_barth.analysis.forecast) - result = fc.pipeline(None) # type: ignore + result = fc.pipeline_sales(None) # type: ignore assert result.status == STATUS_HANDLER.SUCCESS assert len(result.response.daten) > 0 diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py index 5ec6a3f..4c75266 100644 --- a/tests/test_pipelines.py +++ b/tests/test_pipelines.py @@ -2,14 +2,11 @@ import importlib import json from unittest.mock import patch -import pytest - import delta_barth.pipelines from delta_barth import pipelines as pl from delta_barth.errors import STATUS_HANDLER -@pytest.mark.new def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): def mock_request(*args, **kwargs): return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS