refactoring: shorten sales-forecast function names, drop stale comments and dead code

Florian Förster 2025-03-14 06:58:48 +01:00
parent 405e9a9c3c
commit 02ec161a30
4 changed files with 55 additions and 63 deletions

View File

@@ -72,14 +72,14 @@ def _parse_df_to_results(
 @wrap_result()
 def _parse_api_resp_to_df_wrapped(
     resp: SalesPrognosisResponse,
-) -> pd.DataFrame:  # pragma: no cover
+) -> pd.DataFrame:
     return _parse_api_resp_to_df(resp)


 @wrap_result()
 def _parse_df_to_api_resp_wrapped(
     data: pd.DataFrame,
-) -> SalesPrognosisResponse:  # pragma: no cover
+) -> SalesPrognosisResponse:
     return _parse_df_to_api_resp(data)
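The wrap_result() decorator itself is not part of this diff; the hunk only drops the "# pragma: no cover" markers from the wrapped parsers. For orientation, a minimal hypothetical sketch of such a result-wrapping decorator could look like the following (the WrappedResult container and every name in it are assumptions for illustration, not the project's actual implementation):

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class WrappedResult(Generic[T]):
    # hypothetical container: either a value or the exception that occurred
    value: T | None
    error: Exception | None


def wrap_result() -> Callable[[Callable[..., T]], Callable[..., WrappedResult[T]]]:
    # hypothetical decorator factory: catch exceptions and hand back a result object
    def decorator(func: Callable[..., T]) -> Callable[..., WrappedResult[T]]:
        def wrapper(*args, **kwargs) -> WrappedResult[T]:
            try:
                return WrappedResult(value=func(*args, **kwargs), error=None)
            except Exception as exc:
                return WrappedResult(value=None, error=exc)

        return wrapper

    return decorator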
@@ -109,7 +109,7 @@ def _parse_df_to_results_wrapped(
 # TODO set min number of data points as constant, not parameter
-def _preprocess_sales_per_customer(
+def _preprocess_sales(
     resp: SalesPrognosisResponse,
     feature_map: Mapping[str, str],
     target_features: Set[str],
@@ -151,9 +151,8 @@ def _preprocess_sales_per_customer(
     return pipe


-def _process_sales_per_customer(
+def _process_sales(
     pipe: PipeResult[SalesPrognosisResultsExport],
-    # company_id: int,
     min_num_data_points: int = 100,
 ) -> PipeResult[SalesPrognosisResultsExport]:
     """n = 1
@@ -174,11 +173,9 @@ def _process_sales_per_customer(
     PipeResult
         _description_
     """
-    # cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
+    cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
     # filter data
-    # TODO change away from nested DataFrames: just use "f_umsatz_fakt"
-    # TODO with strong type checks
     data = pipe.data
     assert data is not None, "processing not existing pipe result"
     data = data.copy()
@@ -208,7 +205,7 @@ def _process_sales_per_customer(
     if len(df_cust) < min_num_data_points:
         pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
         return pipe
-    else:
-        # sales development over defined monthly periods
-        df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
-        df_cust["monat"] = df_cust[DATE_FEAT].dt.month
+
+    # sales development over defined monthly periods
+    df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
+    df_cust["monat"] = df_cust[DATE_FEAT].dt.month
@@ -238,9 +235,7 @@ def _process_sales_per_customer(
         max_depth=3,
         learning_rate=0.01,
     )
-    reg.fit(
-        X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100
-    )
+    reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)

     test.loc[:, "vorhersage"] = reg.predict(X_test)
     test = test.reset_index(drop=True)
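The estimator built with max_depth=3 and learning_rate=0.01 and trained via fit(..., eval_set=..., verbose=100) matches the scikit-learn-style API of xgboost's XGBRegressor. Assuming that is the regressor behind reg, a self-contained sketch of the training step looks roughly like this (the synthetic data and the n_estimators value are illustrative assumptions, not values from the diff):

import numpy as np
import pandas as pd
from xgboost import XGBRegressor

rng = np.random.default_rng(0)
X = pd.DataFrame({"jahr": rng.integers(2018, 2025, 500), "monat": rng.integers(1, 13, 500)})
y = rng.normal(1000.0, 250.0, 500)  # synthetic monthly sales figures

X_train, X_test = X.iloc[:400], X.iloc[400:]
y_train, y_test = y[:400], y[400:]

reg = XGBRegressor(
    max_depth=3,         # as in the diff
    learning_rate=0.01,  # as in the diff
    n_estimators=1000,   # assumption: not visible in the hunk
)
# eval_set reports metrics on both splits during boosting; verbose=100 prints every 100 rounds
reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100)
pred = reg.predict(X_test)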
@@ -249,7 +244,7 @@ def _process_sales_per_customer(
     return pipe


-def _postprocess_sales_per_customer(
+def _postprocess_sales(
     pipe: PipeResult[SalesPrognosisResultsExport],
     feature_map: Mapping[str, str],
 ) -> PipeResult[SalesPrognosisResultsExport]:
@@ -287,7 +282,7 @@ def _export_on_fail(
     return SalesPrognosisResultsExport(response=response, status=status)


-def pipeline(
+def pipeline_sales(
     session: Session,
     company_id: int | None = None,
     start_date: Datetime | None = None,
@@ -300,7 +295,7 @@ def pipeline(
     if status != STATUS_HANDLER.SUCCESS:
         return _export_on_fail(status)

-    pipe = _preprocess_sales_per_customer(
+    pipe = _preprocess_sales(
         response,
         feature_map=COL_MAP_SALES_PROGNOSIS,
         target_features=FEATURES_SALES_PROGNOSIS,
@@ -308,14 +303,14 @@ def pipeline(
     if pipe.status != STATUS_HANDLER.SUCCESS:
         return _export_on_fail(pipe.status)

-    pipe = _process_sales_per_customer(
+    pipe = _process_sales(
         pipe,
         min_num_data_points=MIN_NUMBER_DATAPOINTS,
     )
     if pipe.status != STATUS_HANDLER.SUCCESS:
         return _export_on_fail(pipe.status)

-    pipe = _postprocess_sales_per_customer(
+    pipe = _postprocess_sales(
         pipe,
         feature_map=DualDict(),
     )
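The pipeline body above follows an early-exit pattern: each stage takes the previous PipeResult, and the caller returns via _export_on_fail as soon as any stage reports a non-success status. A stripped-down, hypothetical sketch of that flow follows; PipeResult, the status values, and the stage bodies here are simplified stand-ins, not the project's real types:

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")

SUCCESS = "SUCCESS"


@dataclass
class PipeResult(Generic[T]):
    # simplified stand-in for the project's PipeResult
    data: T | None
    status: str = SUCCESS

    def fail(self, status: str) -> None:
        self.status = status
        self.data = None


def preprocess(raw: list[int]) -> PipeResult[list[int]]:
    # keep only usable records
    return PipeResult(data=[x for x in raw if x >= 0])


def process(pipe: PipeResult[list[int]], min_num_data_points: int = 3) -> PipeResult[list[int]]:
    assert pipe.data is not None
    if len(pipe.data) < min_num_data_points:
        pipe.fail("TOO_FEW_POINTS")
        return pipe
    pipe.data = [x * 2 for x in pipe.data]
    return pipe


def pipeline(raw: list[int]) -> PipeResult[list[int]]:
    pipe = preprocess(raw)
    if pipe.status != SUCCESS:
        return pipe  # early exit, mirroring _export_on_fail in the real code
    pipe = process(pipe)
    if pipe.status != SUCCESS:
        return pipe
    return pipe


print(pipeline([1, 2, -3]).status)    # TOO_FEW_POINTS: bails out after process()
print(pipeline([1, 2, 3, 4]).status)  # SUCCESS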

View File

@@ -11,7 +11,7 @@ def pipeline_sales_forecast(
     company_id: int | None,
     start_date: Datetime | None,
 ) -> tuple[JsonResponse, JsonStatus]:
-    result = forecast.pipeline(SESSION, company_id=company_id, start_date=start_date)
+    result = forecast.pipeline_sales(SESSION, company_id=company_id, start_date=start_date)
     response = JsonResponse(result.response.model_dump_json())
     status = JsonStatus(result.status.model_dump_json())
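The wrapper turns the forecast result into JSON payloads via pydantic's model_dump_json(). A minimal standalone illustration of that serialization step; the model classes and fields below are invented for the example and only loosely mirror the response structure visible in the tests (daten, jahr, monat, vorhersage):

from pydantic import BaseModel


class SalesPoint(BaseModel):
    # hypothetical record shape, not the project's actual schema
    jahr: int
    monat: int
    vorhersage: float


class Response(BaseModel):
    daten: list[SalesPoint]


resp = Response(daten=[SalesPoint(jahr=2025, monat=3, vorhersage=1234.5)])
payload = resp.model_dump_json()  # '{"daten":[{"jahr":2025,"monat":3,"vorhersage":1234.5}]}'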

View File

@@ -135,13 +135,13 @@ def test_parse_df_to_results_InvalidData(invalid_results):
         _ = fc._parse_df_to_results(invalid_results)


-def test_preprocess_sales_per_customer_Success(
+def test_preprocess_sales_Success(
     exmpl_api_sales_prognosis_resp,
     feature_map,
     target_features,
 ):
     resp = exmpl_api_sales_prognosis_resp
-    pipe = fc._preprocess_sales_per_customer(
+    pipe = fc._preprocess_sales(
         resp,
         feature_map=feature_map,
         target_features=target_features,
@@ -153,14 +153,14 @@ def test_preprocess_sales_per_customer_Success(
     assert any(feat not in df.columns for feat in feature_map.keys())


-def test_preprocess_sales_per_customer_FailOnTargetFeature(
+def test_preprocess_sales_FailOnTargetFeature(
     exmpl_api_sales_prognosis_resp,
     feature_map,
     target_features,
 ):
     resp = exmpl_api_sales_prognosis_resp
     target_features = {"not_known_feature", "test2"}
-    pipe = fc._preprocess_sales_per_customer(
+    pipe = fc._preprocess_sales(
         resp,
         feature_map=feature_map,
         target_features=target_features,
@@ -170,23 +170,23 @@ def test_preprocess_sales_per_customer_FailOnTargetFeature(
     assert pipe.results is None


-def test_sales_per_customer_Success(sales_data_real_preproc):
+def test_process_sales_Success(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales_per_customer(pipe)
+    pipe = fc._process_sales(pipe)

     assert pipe.status == STATUS_HANDLER.SUCCESS
     assert pipe.data is not None
     assert pipe.results is None


-def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc):
+def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     data = data.iloc[:20, :]
     # fc._preprocess_sales_per_customer()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._process_sales_per_customer(pipe)
+    pipe = fc._process_sales(pipe)

     assert pipe.status != STATUS_HANDLER.SUCCESS
     assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
@@ -194,13 +194,13 @@ def test_sales_per_customer_FailTooFewPoints(sales_data_real_preproc):
     assert pipe.results is None


-def test_postprocess_sales_per_customer_Success(
+def test_postprocess_sales_Success(
     valid_results,
 ):
     data = valid_results
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._postprocess_sales_per_customer(
+    pipe = fc._postprocess_sales(
         pipe,
         feature_map=DualDict(),
     )
@@ -209,13 +209,13 @@ def test_postprocess_sales_per_customer_Success(
     assert pipe.results is not None


-def test_postprocess_sales_per_customer_FailValidation(
+def test_postprocess_sales_FailValidation(
     invalid_results,
 ):
     data = invalid_results
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
-    pipe = fc._postprocess_sales_per_customer(
+    pipe = fc._postprocess_sales(
         pipe,
         feature_map=DualDict(),
     )
@@ -225,7 +225,7 @@ def test_postprocess_sales_per_customer_FailValidation(
     assert "ValidationError" in pipe.status.description


-def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
+def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
     def mock_request(*args, **kwargs):
         return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
@@ -234,7 +234,7 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
         new=mock_request,
     ):
         importlib.reload(delta_barth.analysis.forecast)
-        result = fc.pipeline(None)  # type: ignore
+        result = fc.pipeline_sales(None)  # type: ignore

     assert result.status == STATUS_HANDLER.SUCCESS
     assert len(result.response.daten) > 0
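The renamed pipeline test patches the request function with unittest.mock.patch and then reloads the forecast module so that names bound at import time pick up the mock. A self-contained sketch of the same patch-and-reload pattern, using throwaway placeholder modules written to a temp directory (demo_api and demo_consumer are inventions for this illustration, not project modules):

import importlib
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch

# create two tiny modules on disk: demo_api provides fetch(); demo_consumer binds it at import time
tmp = Path(tempfile.mkdtemp())
(tmp / "demo_api.py").write_text("def fetch():\n    return 'real response'\n")
(tmp / "demo_consumer.py").write_text("from demo_api import fetch\n\ndef run():\n    return fetch()\n")
sys.path.insert(0, str(tmp))

import demo_consumer  # noqa: E402  (binds the real fetch at import time)


def mock_fetch():
    return "mocked response"


with patch("demo_api.fetch", new=mock_fetch):
    importlib.reload(demo_consumer)  # re-executes the import, so run() now sees the mock
    assert demo_consumer.run() == "mocked response"

Without the reload, demo_consumer.run() would keep calling the fetch object it bound at the original import, which is why the project's test reloads delta_barth.analysis.forecast inside the patch context.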

View File

@@ -2,14 +2,11 @@ import importlib
 import json
 from unittest.mock import patch

-import pytest
-
 import delta_barth.pipelines
 from delta_barth import pipelines as pl
 from delta_barth.errors import STATUS_HANDLER


-@pytest.mark.new
 def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
     def mock_request(*args, **kwargs):
         return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
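The @pytest.mark.new marker and the now-unused pytest import are dropped from this test module. For context, such a custom marker only works cleanly if it is registered with pytest; a minimal, hypothetical conftest.py registration would look like this (the marker description is an assumption):

# conftest.py (hypothetical): register the custom marker so --strict-markers accepts it
def pytest_configure(config):
    config.addinivalue_line("markers", "new: marks tests covering newly added functionality")

Tests carrying the marker could then be selected with "pytest -m new"; once the marker is removed, as in this commit, neither the registration nor the pytest import is needed here.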