add dummy data pipeline
This commit is contained in:
BIN
src/delta_barth/_dummy_data/exmp_sales_prognosis_output.pkl
Normal file
BIN
src/delta_barth/_dummy_data/exmp_sales_prognosis_output.pkl
Normal file
Binary file not shown.
@@ -1,6 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses as dc
|
||||
from collections.abc import Mapping, Set
|
||||
from datetime import datetime as Datetime
|
||||
from typing import TYPE_CHECKING, Final
|
||||
@@ -18,6 +17,7 @@ from delta_barth.api.requests import (
|
||||
)
|
||||
from delta_barth.constants import (
|
||||
COL_MAP_SALES_PROGNOSIS,
|
||||
DUMMY_DATA_PATH,
|
||||
FEATURES_SALES_PROGNOSIS,
|
||||
MIN_NUMBER_DATAPOINTS,
|
||||
)
|
||||
@@ -52,12 +52,12 @@ def _parse_api_resp_to_df(
|
||||
return pd.DataFrame(data)
|
||||
|
||||
|
||||
def _parse_df_to_api_resp(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResponse:
|
||||
df_formatted = data.to_dict(orient="records")
|
||||
# def _parse_df_to_api_resp(
|
||||
# data: pd.DataFrame,
|
||||
# ) -> SalesPrognosisResponse:
|
||||
# df_formatted = data.to_dict(orient="records")
|
||||
|
||||
return SalesPrognosisResponse(daten=tuple(df_formatted)) # type: ignore
|
||||
# return SalesPrognosisResponse(daten=tuple(df_formatted)) # type: ignore
|
||||
|
||||
|
||||
def _parse_df_to_results(
|
||||
@@ -75,11 +75,11 @@ def _parse_api_resp_to_df_wrapped(
|
||||
return _parse_api_resp_to_df(resp)
|
||||
|
||||
|
||||
@wrap_result()
|
||||
def _parse_df_to_api_resp_wrapped(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResponse:
|
||||
return _parse_df_to_api_resp(data)
|
||||
# @wrap_result()
|
||||
# def _parse_df_to_api_resp_wrapped(
|
||||
# data: pd.DataFrame,
|
||||
# ) -> SalesPrognosisResponse:
|
||||
# return _parse_df_to_api_resp(data)
|
||||
|
||||
|
||||
@wrap_result()
|
||||
@@ -261,7 +261,6 @@ def _postprocess_sales(
|
||||
pipe.fail(res.status)
|
||||
return pipe
|
||||
|
||||
# res = _parse_df_to_api_resp_wrapped(res.result)
|
||||
res = _parse_df_to_results_wrapped(res.unwrap())
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
@@ -327,6 +326,21 @@ def pipeline_sales_dummy(
|
||||
session: Session,
|
||||
company_id: int | None = None,
|
||||
start_date: Datetime | None = None,
|
||||
) -> SalesPrognosisResultsExport: # pragma: no cover
|
||||
) -> SalesPrognosisResultsExport:
|
||||
"""prototype dummy function for tests by DelBar"""
|
||||
...
|
||||
_, _, _ = session, company_id, start_date
|
||||
|
||||
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
|
||||
assert data_pth.exists(), "sales forecast dummy data not existent"
|
||||
data = pd.read_pickle(data_pth)
|
||||
pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS)
|
||||
res = _parse_df_to_results_wrapped(data)
|
||||
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
return _export_on_fail(res.status)
|
||||
|
||||
return SalesPrognosisResultsExport(
|
||||
response=res.unwrap(),
|
||||
status=res.status,
|
||||
)
|
||||
|
||||
@@ -1,8 +1,17 @@
|
||||
import enum
|
||||
from pathlib import Path
|
||||
from typing import Final
|
||||
|
||||
from delta_barth.types import DualDict, HttpContentHeaders
|
||||
|
||||
# ** lib path
|
||||
lib_path = Path(__file__).parent
|
||||
assert lib_path is not None, "lib path not resolved"
|
||||
LIB_PATH: Final[Path] = lib_path
|
||||
dummy_data_pth = LIB_PATH / "_dummy_data"
|
||||
assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
|
||||
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
|
||||
|
||||
# ** error handling
|
||||
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
||||
DEFAULT_API_ERR_CODE: Final[int] = 400
|
||||
|
||||
@@ -16,3 +16,18 @@ def pipeline_sales_forecast(
|
||||
status = JsonStatus(result.status.model_dump_json())
|
||||
|
||||
return response, status
|
||||
|
||||
|
||||
def pipeline_sales_forecast_dummy(
|
||||
company_id: int | None,
|
||||
start_date: Datetime | None,
|
||||
) -> tuple[JsonResponse, JsonStatus]:
|
||||
result = forecast.pipeline_sales_dummy(
|
||||
SESSION,
|
||||
company_id=company_id,
|
||||
start_date=start_date,
|
||||
)
|
||||
response = JsonResponse(result.response.model_dump_json())
|
||||
status = JsonStatus(result.status.model_dump_json())
|
||||
|
||||
return response, status
|
||||
|
||||
Reference in New Issue
Block a user