diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py
index 04c5033..5a51092 100644
--- a/src/delta_barth/analysis/forecast.py
+++ b/src/delta_barth/analysis/forecast.py
@@ -2,31 +2,96 @@
 from __future__ import annotations
 
-import dataclasses as dc
 from collections.abc import Mapping, Set
-from typing import TYPE_CHECKING
+from datetime import datetime as Datetime
+from typing import TYPE_CHECKING, Final
 
 import pandas as pd
 from sklearn.metrics import mean_squared_error
 from xgboost import XGBRegressor
 
 from delta_barth.analysis import parse
-from delta_barth.constants import COL_MAP_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS
-from delta_barth.errors import STATUS_HANDLER
-from delta_barth.types import CustomerDataSalesForecast, DataPipeStates, PipeResult
+from delta_barth.api.requests import (
+    SalesPrognosisResponse,
+    SalesPrognosisResults,
+    SalesPrognosisResultsExport,
+    get_sales_prognosis_data,
+)
+from delta_barth.constants import (
+    COL_MAP_SALES_PROGNOSIS,
+    FEATURES_SALES_PROGNOSIS,
+    MIN_NUMBER_DATAPOINTS,
+)
+from delta_barth.errors import STATUS_HANDLER, wrap_result
+from delta_barth.types import (
+    DataPipeStates,
+    DualDict,
+    PipeResult,
+)
 
 if TYPE_CHECKING:
-    from delta_barth.api.requests import SalesPrognosisResponse
-
-# TODO check pandera for DataFrame validation
+    from delta_barth.api.common import Session
+    from delta_barth.types import Status
 
 
-def parse_api_resp_to_df(
+def _parse_api_resp_to_df(
     resp: SalesPrognosisResponse,
 ) -> pd.DataFrame:
+    """Convert a validated sales prognosis API response into a DataFrame.
+
+    Parameters
+    ----------
+    resp : SalesPrognosisResponse
+        validated API response carrying the raw sales records
+
+    Returns
+    -------
+    pd.DataFrame
+        one row per entry of ``resp.daten``
+    """
     data = resp.model_dump()["daten"]
     return pd.DataFrame(data)
 
 
+def _parse_df_to_api_resp(
+    data: pd.DataFrame,
+) -> SalesPrognosisResponse:
+    df_formatted = data.to_dict(orient="records")
+
+    return SalesPrognosisResponse(daten=tuple(df_formatted))  # type: ignore
+
+
+def _parse_df_to_results(
+    data: pd.DataFrame,
+) -> SalesPrognosisResults:
+    df_formatted = data.to_dict(orient="records")
+
+    return SalesPrognosisResults(daten=tuple(df_formatted))  # type: ignore
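+
+
+# The *_wrapped variants route the parsers through ``wrap_result`` so that
+# parsing failures surface as a status on the returned result object instead
+# of raised exceptions; the pipeline stages below branch on that status.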
+@wrap_result()
+def _parse_api_resp_to_df_wrapped(
+    resp: SalesPrognosisResponse,
+) -> pd.DataFrame:  # pragma: no cover
+    return _parse_api_resp_to_df(resp)
+
+
+@wrap_result()
+def _parse_df_to_api_resp_wrapped(
+    data: pd.DataFrame,
+) -> SalesPrognosisResponse:  # pragma: no cover
+    return _parse_df_to_api_resp(data)
+
+
+@wrap_result()
+def _parse_df_to_results_wrapped(
+    data: pd.DataFrame,
+) -> SalesPrognosisResults:
+    return _parse_df_to_results(data)
+
+
 # ------------------------------------------------------------------------------
 # Input:
-#   DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp
+#   DataFrame df with columns from f_umsatz_fakt, firmen, art, v_warengrp
@@ -44,83 +109,124 @@ def parse_api_resp_to_df(
 # TODO: check usage of separate exception and handle it in API function
 # TODO set min number of data points as constant, not parameter
-def preprocess_sales_per_customer(
+
+
+def _preprocess_sales_per_customer(
     resp: SalesPrognosisResponse,
     feature_map: Mapping[str, str],
     target_features: Set[str],
-) -> pd.DataFrame:
-    df = parse_api_resp_to_df(resp)
-    df = parse.preprocess_features(
+) -> PipeResult[SalesPrognosisResultsExport]:
+    """Parse the API response and map its features to internal names.
+
+    Parameters
+    ----------
+    resp : SalesPrognosisResponse
+        validated API response carrying the raw sales records
+    feature_map : Mapping[str, str]
+        mapping of API column names to internal column names
+    target_features : Set[str]
+        internal column names required in the parsed data
+
+    Returns
+    -------
+    PipeResult
+        pipe carrying the preprocessed DataFrame, or a failure status
+    """
+    pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS)
+    res = _parse_api_resp_to_df_wrapped(resp)
+
+    if res.status != STATUS_HANDLER.SUCCESS:
+        pipe.fail(res.status)
+        return pipe
+
+    df = res.unwrap()
+    res = parse.process_features_wrapped(
         df,
         feature_map=feature_map,
         target_features=target_features,
     )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        pipe.fail(res.status)
+        return pipe
 
-    return df
+    pipe.success(res.unwrap(), res.status)
+    return pipe
 
 
-def sales_per_customer(
-    data: pd.DataFrame,
-    customer_id: int,
+def _process_sales_per_customer(
+    pipe: PipeResult[SalesPrognosisResultsExport],
+    # company_id: int,
     min_num_data_points: int = 100,
-) -> PipeResult:
-    """_summary_
+) -> PipeResult[SalesPrognosisResultsExport]:
+    """Aggregate sales per month and predict monthly sales with XGBoost.
+
+    Input data:
+        fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
+        - already prefiltered if a customer ID was provided
+          ("firma_refid" is the same for all entries)
 
     Parameters
     ----------
-    df : pd.DataFrame
-        Input DF: table "f_umsatz_fakt"
-    kunde : int
-        customer ID (FK "firma_ref_ID")
+    pipe : PipeResult
+        pipe carrying the preprocessed sales DataFrame
     min_num_data_points : int, optional
         minimum number of data points to obtain result, by default 100
 
     Returns
     -------
-    FcResult
-        _description_
+    PipeResult
+        pipe carrying the monthly forecast DataFrame, or a failure status
     """
-    cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
     # filter data
     # TODO change away from nested DataFrames: just use "f_umsatz_fakt"
     # TODO with strong type checks
+    data = pipe.data
+    assert data is not None, "pipe carries no data to process"
     data = data.copy()
-    df_firma = data[
-        (data["firma_refid"] == customer_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
-    ]
-
-    for transaction in df_firma["vorgang_refid"].unique():
-        cust_data.order.append(transaction)
-        cust_data.date.append(
-            df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
-        )
-        cust_data.sales.append(
-            df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
-        )
-
-    df_cust = pd.DataFrame(dc.asdict(cust_data))
-    df_cust = df_cust.sort_values(by="date").reset_index()
+    # NOTE company filtering and per-transaction aggregation were dropped:
+    # the API already delivers prefiltered data when a customer ID is given.
+    DATE_FEAT: Final[str] = "buchungs_datum"
+    SALES_FEAT: Final[str] = "betrag"
+    df_firma = data[data[SALES_FEAT] > 0]
+    df_cust = df_firma.copy()
+    df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
 
     # check data availability
     if len(df_cust) < min_num_data_points:
-        return PipeResult(status=STATUS_HANDLER.pipe_states.TOO_FEW_POINTS, data=None)
+        pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
+        return pipe
     else:
-        # Entwicklung der Umsätze: definierte Zeiträume Monat
-        df_cust["year"] = df_cust["date"].dt.year
-        df_cust["month"] = df_cust["date"].dt.month
+        # sales development over defined monthly periods
+        df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
+        df_cust["monat"] = df_cust[DATE_FEAT].dt.month
 
-        monthly_sum = df_cust.groupby(["year", "month"])["sales"].sum().reset_index()
-        monthly_sum["date"] = (
-            monthly_sum["month"].astype(str) + "." + monthly_sum["year"].astype(str)
+        monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
+        monthly_sum[DATE_FEAT] = (
+            monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
         )
-        monthly_sum["date"] = pd.to_datetime(monthly_sum["date"], format="%m.%Y")
-        monthly_sum = monthly_sum.set_index("date")
+        monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
+        monthly_sum = monthly_sum.set_index(DATE_FEAT)
 
         train = monthly_sum.iloc[:-5].copy()
         test = monthly_sum.iloc[-5:].copy()
 
-        features = ["year", "month"]
-        target = "sales"
+        features = ["jahr", "monat"]
+        target = SALES_FEAT
 
         X_train, y_train = train[features], train[target]
         X_test, y_test = test[features], test[target]
@@ -138,8 +244,86 @@ def sales_per_customer(
         X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100
     )
 
-    test.loc[:, "prediction"] = reg.predict(X_test)
+    test.loc[:, "vorhersage"] = reg.predict(X_test)
     test = test.reset_index(drop=True)
 
-    # umsetzung, prognose
-    return PipeResult(status=STATUS_HANDLER.pipe_states.SUCCESS, data=test)
+    pipe.success(test, STATUS_HANDLER.SUCCESS)
+    return pipe
+
+
+def _postprocess_sales_per_customer(
+    pipe: PipeResult[SalesPrognosisResultsExport],
+    feature_map: Mapping[str, str],
+) -> PipeResult[SalesPrognosisResultsExport]:
+    data = pipe.data
+    assert data is not None, "pipe carries no data to process"
+    # convert features back to original naming
+    res = parse.process_features_wrapped(
+        data,
+        feature_map=feature_map,
+        target_features=set(),
+    )
+    if res.status != STATUS_HANDLER.SUCCESS:
+        pipe.fail(res.status)
+        return pipe
+
+    res = _parse_df_to_results_wrapped(res.unwrap())
+    if res.status != STATUS_HANDLER.SUCCESS:
+        pipe.fail(res.status)
+        return pipe
+
+    export_response = SalesPrognosisResultsExport(
+        response=res.unwrap(),
+        status=res.status,
+    )
+    pipe.export(export_response)
+
+    return pipe
+
+
+def _export_on_fail(
+    status: Status,
+) -> SalesPrognosisResultsExport:
+    response = SalesPrognosisResults(daten=tuple())
+    return SalesPrognosisResultsExport(response=response, status=status)
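+
+
+# pipeline() chains the stages fetch -> preprocess -> process -> postprocess;
+# any stage that reports a non-SUCCESS status short-circuits the run through
+# _export_on_fail, which returns an empty result carrying that status.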
+ monthly_sum["year"].astype(str) + monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() + monthly_sum[DATE_FEAT] = ( + monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) ) - monthly_sum["date"] = pd.to_datetime(monthly_sum["date"], format="%m.%Y") - monthly_sum = monthly_sum.set_index("date") + monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y") + monthly_sum = monthly_sum.set_index(DATE_FEAT) train = monthly_sum.iloc[:-5].copy() test = monthly_sum.iloc[-5:].copy() - features = ["year", "month"] - target = "sales" + features = ["jahr", "monat"] + target = SALES_FEAT X_train, y_train = train[features], train[target] X_test, y_test = test[features], test[target] @@ -138,8 +244,86 @@ def sales_per_customer( X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100 ) - test.loc[:, "prediction"] = reg.predict(X_test) + test.loc[:, "vorhersage"] = reg.predict(X_test) test = test.reset_index(drop=True) - # umsetzung, prognose - return PipeResult(status=STATUS_HANDLER.pipe_states.SUCCESS, data=test) + pipe.success(test, STATUS_HANDLER.SUCCESS) + return pipe + + +def _postprocess_sales_per_customer( + pipe: PipeResult[SalesPrognosisResultsExport], + feature_map: Mapping[str, str], +) -> PipeResult[SalesPrognosisResultsExport]: + data = pipe.data + assert data is not None, "processing not existing pipe result" + # convert features back to original naming + res = parse.process_features_wrapped( + data, + feature_map=feature_map, + target_features=set(), + ) + if res.status != STATUS_HANDLER.SUCCESS: + pipe.fail(res.status) + return pipe + + # res = _parse_df_to_api_resp_wrapped(res.result) + res = _parse_df_to_results_wrapped(res.unwrap()) + if res.status != STATUS_HANDLER.SUCCESS: + pipe.fail(res.status) + return pipe + + export_response = SalesPrognosisResultsExport( + response=res.unwrap(), + status=res.status, + ) + pipe.export(export_response) + + return pipe + + +def _export_on_fail( + status: Status, +) -> SalesPrognosisResultsExport: + response = SalesPrognosisResults(daten=tuple()) + return SalesPrognosisResultsExport(response=response, status=status) + + +def pipeline( + session: Session, + company_id: int | None = None, + start_date: Datetime | None = None, +) -> SalesPrognosisResultsExport: + response, status = get_sales_prognosis_data( + session, + company_id=company_id, + start_date=start_date, + ) + if status != STATUS_HANDLER.SUCCESS: + return _export_on_fail(status) + + pipe = _preprocess_sales_per_customer( + response, + feature_map=COL_MAP_SALES_PROGNOSIS, + target_features=FEATURES_SALES_PROGNOSIS, + ) + if pipe.status != STATUS_HANDLER.SUCCESS: + return _export_on_fail(pipe.status) + + pipe = _process_sales_per_customer( + pipe, + min_num_data_points=MIN_NUMBER_DATAPOINTS, + ) + if pipe.status != STATUS_HANDLER.SUCCESS: + return _export_on_fail(pipe.status) + + pipe = _postprocess_sales_per_customer( + pipe, + feature_map=DualDict(), + ) + if pipe.status != STATUS_HANDLER.SUCCESS: + return _export_on_fail(pipe.status) + + assert pipe.results is not None, "needed export response not set in pipeline" + + return pipe.results diff --git a/tests/_test_data/exmp_sales_prognosis_ouput.pkl b/tests/_test_data/exmp_sales_prognosis_ouput.pkl new file mode 100644 index 0000000..0ddfa5c Binary files /dev/null and b/tests/_test_data/exmp_sales_prognosis_ouput.pkl differ diff --git a/tests/analysis/test_forecast.py b/tests/analysis/test_forecast.py index 5ee2112..38cf4e2 100644 --- 
+@pytest.fixture(scope="function")
+def sales_data_real_preproc(sales_data_real, feature_map) -> pd.DataFrame:
+    data = sales_data_real.copy()
+    data_feats = data.columns
+    mapped_feats: list[str] = []
+
+    for feat in data_feats:
+        if feat in feature_map:
+            mapped_feats.append(feature_map[feat])
+        else:
+            mapped_feats.append(feat)
+
+    data.columns = mapped_feats
+
+    return data
 
 
 def test_parse_api_resp_to_df(exmpl_api_sales_prognosis_resp):
     resp = exmpl_api_sales_prognosis_resp
-    df = fc.parse_api_resp_to_df(resp)
+    df = fc._parse_api_resp_to_df(resp)
     features = set(
         (
             "artikelId",
@@ -33,26 +115,131 @@
     assert all(col in features for col in df.columns)
 
 
-def test_preprocess_sales_per_customer(exmpl_api_sales_prognosis_resp):
+def test_parse_df_to_api_resp_ValidData(valid_df):
+    ret = fc._parse_df_to_api_resp(valid_df)
+    assert len(ret.daten) > 0
+
+
+def test_parse_df_to_api_resp_InvalidData(invalid_df):
+    with pytest.raises(ValidationError):
+        _ = fc._parse_df_to_api_resp(invalid_df)
+
+
+def test_parse_df_to_results_ValidData(valid_results):
+    ret = fc._parse_df_to_results(valid_results)
+    assert len(ret.daten) > 0
+
+
+def test_parse_df_to_results_InvalidData(invalid_results):
+    with pytest.raises(ValidationError):
+        _ = fc._parse_df_to_results(invalid_results)
+
+
+def test_preprocess_sales_per_customer_Success(
+    exmpl_api_sales_prognosis_resp,
+    feature_map,
+    target_features,
+):
     resp = exmpl_api_sales_prognosis_resp
-    feat_mapping: dict[str, str] = {
-        "artikelId": "artikel_refid",
-        "firmaId": "firma_refid",
-        "betrag": "betrag",
-        "menge": "menge",
-        "buchungsDatum": "buchungs_datum",
-    }
-    target_features: frozenset[str] = frozenset(
-        (
-            "firma_refid",
-            "betrag",
-            "buchungs_datum",
-        )
-    )
-    df = fc.preprocess_sales_per_customer(
+    pipe = fc._preprocess_sales_per_customer(
         resp,
-        feature_map=feat_mapping,
+        feature_map=feature_map,
         target_features=target_features,
     )
+    assert pipe.status == STATUS_HANDLER.SUCCESS
+    assert pipe.data is not None
+    df = pipe.data
     assert len(df.columns) == 5
-    assert any(feat not in df.columns for feat in feat_mapping.keys())
+    assert any(feat not in df.columns for feat in feature_map.keys())
+
+
+def test_preprocess_sales_per_customer_FailOnTargetFeature(
+    exmpl_api_sales_prognosis_resp,
+    feature_map,
+):
+    resp = exmpl_api_sales_prognosis_resp
+    target_features = {"not_known_feature", "test2"}
+    pipe = fc._preprocess_sales_per_customer(
+        resp,
+        feature_map=feature_map,
+        target_features=target_features,
+    )
+    assert pipe.status.code != 0
+    assert pipe.data is None
+    assert pipe.results is None
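+
+
+# The process stage is driven with a hand-built PipeResult below; in the
+# real pipeline the pipe comes from _preprocess_sales_per_customer.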
+def test_process_sales_per_customer_Success(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales_per_customer(pipe)
+
+    assert pipe.status == STATUS_HANDLER.SUCCESS
+    assert pipe.data is not None
+    assert pipe.results is None
+
+
+def test_process_sales_per_customer_FailTooFewPoints(sales_data_real_preproc):
+    data = sales_data_real_preproc.copy()
+    data = data.iloc[:20, :]
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales_per_customer(pipe)
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
+    assert pipe.data is None
+    assert pipe.results is None
+
+
+def test_postprocess_sales_per_customer_Success(
+    valid_results,
+):
+    data = valid_results
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+
+    pipe = fc._postprocess_sales_per_customer(
+        pipe,
+        feature_map=DualDict(),
+    )
+    assert pipe.status == STATUS_HANDLER.SUCCESS
+    assert pipe.data is None
+    assert pipe.results is not None
+
+
+def test_postprocess_sales_per_customer_FailValidation(
+    invalid_results,
+):
+    data = invalid_results
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+
+    pipe = fc._postprocess_sales_per_customer(
+        pipe,
+        feature_map=DualDict(),
+    )
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.data is None
+    assert pipe.results is None
+    assert "ValidationError" in pipe.status.description
+
+
+@pytest.mark.new
+def test_sales_prognosis_pipeline(monkeypatch, exmpl_api_sales_prognosis_resp):
+    def mock_request(*args, **kwargs):
+        return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
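+
+    # pipeline() binds get_sales_prognosis_data at import time via
+    # ``from delta_barth.api.requests import ...``, so the name has to be
+    # patched on the forecast module itself; monkeypatch undoes this on teardown.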
+    monkeypatch.setattr(fc, "get_sales_prognosis_data", mock_request)
+    result = fc.pipeline(None)  # type: ignore
+
+    assert result.status == STATUS_HANDLER.SUCCESS
+    assert len(result.response.daten) > 0
diff --git a/tests/conftest.py b/tests/conftest.py
index 580d573..33b4670 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -45,7 +45,7 @@ def _cvt_str_ts(value: str) -> Any:
 
 
 @pytest.fixture(scope="session")
-def sales_data() -> pd.DataFrame:
+def sales_data_db_export() -> pd.DataFrame:
     pwd = Path.cwd()
     assert "barth" in pwd.parent.name.lower(), "not in project root directory"
     data_pth = pwd / "./tests/_test_data/swm_f_umsatz_fakt.csv"
@@ -63,6 +63,22 @@
     return data
 
 
+@pytest.fixture(scope="session")
+def sales_data_real() -> pd.DataFrame:
+    pwd = Path.cwd()
+    assert "barth" in pwd.parent.name.lower(), "not in project root directory"
+    data_pth = pwd / "./tests/_test_data/exmp_sales_prognosis_resp.json"
+    assert data_pth.exists(), "file with API sales data not found"
+
+    with open(data_pth, "r") as file:
+        data = json.load(file)
+
+    parsed = SalesPrognosisResponse(**data)
+    data = parsed.model_dump()["daten"]
+
+    return pd.DataFrame(data)
+
+
 @pytest.fixture(scope="session")
 def exmpl_api_sales_prognosis_resp() -> SalesPrognosisResponse:
     pwd = Path.cwd()
@@ -74,3 +90,13 @@
     data = json.load(file)
 
     return SalesPrognosisResponse(**data)
+
+
+@pytest.fixture(scope="session")
+def exmpl_api_sales_prognosis_output() -> pd.DataFrame:
+    pwd = Path.cwd()
+    assert "barth" in pwd.parent.name.lower(), "not in project root directory"
+    data_pth = pwd / "./tests/_test_data/exmp_sales_prognosis_ouput.pkl"
+    assert data_pth.exists(), "file with example sales prognosis output not found"
+
+    return pd.read_pickle(data_pth)