major overhaul for integrated pipeline format
This commit is contained in:
@@ -2,31 +2,96 @@ from __future__ import annotations
|
||||
|
||||
import dataclasses as dc
|
||||
from collections.abc import Mapping, Set
|
||||
from typing import TYPE_CHECKING
|
||||
from datetime import datetime as Datetime
|
||||
from typing import TYPE_CHECKING, Final
|
||||
|
||||
import pandas as pd
|
||||
from sklearn.metrics import mean_squared_error
|
||||
from xgboost import XGBRegressor
|
||||
|
||||
from delta_barth.analysis import parse
|
||||
from delta_barth.constants import COL_MAP_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS
|
||||
from delta_barth.errors import STATUS_HANDLER
|
||||
from delta_barth.types import CustomerDataSalesForecast, DataPipeStates, PipeResult
|
||||
from delta_barth.api.requests import (
|
||||
SalesPrognosisResponse,
|
||||
SalesPrognosisResults,
|
||||
SalesPrognosisResultsExport,
|
||||
get_sales_prognosis_data,
|
||||
)
|
||||
from delta_barth.constants import (
|
||||
COL_MAP_SALES_PROGNOSIS,
|
||||
FEATURES_SALES_PROGNOSIS,
|
||||
MIN_NUMBER_DATAPOINTS,
|
||||
)
|
||||
from delta_barth.errors import STATUS_HANDLER, wrap_result
|
||||
from delta_barth.types import (
|
||||
CustomerDataSalesForecast,
|
||||
DataPipeStates,
|
||||
DualDict,
|
||||
ExportResponse,
|
||||
PipeResult,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from delta_barth.api.requests import SalesPrognosisResponse
|
||||
|
||||
# TODO check pandera for DataFrame validation
|
||||
from delta_barth.api.common import Session
|
||||
from delta_barth.types import Status
|
||||
|
||||
|
||||
def parse_api_resp_to_df(
|
||||
def _parse_api_resp_to_df(
|
||||
resp: SalesPrognosisResponse,
|
||||
) -> pd.DataFrame:
|
||||
"""n >= 2
|
||||
|
||||
Parameters
|
||||
----------
|
||||
resp : SalesPrognosisResponse
|
||||
_description_
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
_description_
|
||||
"""
|
||||
data = resp.model_dump()["daten"]
|
||||
|
||||
return pd.DataFrame(data)
|
||||
|
||||
|
||||
def _parse_df_to_api_resp(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResponse:
|
||||
df_formatted = data.to_dict(orient="records")
|
||||
|
||||
return SalesPrognosisResponse(daten=tuple(df_formatted)) # type: ignore
|
||||
|
||||
|
||||
def _parse_df_to_results(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResults:
|
||||
df_formatted = data.to_dict(orient="records")
|
||||
|
||||
return SalesPrognosisResults(daten=tuple(df_formatted)) # type: ignore
|
||||
|
||||
|
||||
@wrap_result()
|
||||
def _parse_api_resp_to_df_wrapped(
|
||||
resp: SalesPrognosisResponse,
|
||||
) -> pd.DataFrame: # pragma: no cover
|
||||
return _parse_api_resp_to_df(resp)
|
||||
|
||||
|
||||
@wrap_result()
|
||||
def _parse_df_to_api_resp_wrapped(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResponse: # pragma: no cover
|
||||
return _parse_df_to_api_resp(data)
|
||||
|
||||
|
||||
@wrap_result()
|
||||
def _parse_df_to_results_wrapped(
|
||||
data: pd.DataFrame,
|
||||
) -> SalesPrognosisResults:
|
||||
return _parse_df_to_results(data)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Input:
|
||||
# DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp
|
||||
@@ -44,83 +109,124 @@ def parse_api_resp_to_df(
|
||||
|
||||
# TODO: check usage of separate exception and handle it in API function
|
||||
# TODO set min number of data points as constant, not parameter
|
||||
def preprocess_sales_per_customer(
|
||||
|
||||
|
||||
def _preprocess_sales_per_customer(
|
||||
resp: SalesPrognosisResponse,
|
||||
feature_map: Mapping[str, str],
|
||||
target_features: Set[str],
|
||||
) -> pd.DataFrame:
|
||||
df = parse_api_resp_to_df(resp)
|
||||
df = parse.preprocess_features(
|
||||
) -> PipeResult[SalesPrognosisResultsExport]:
|
||||
"""n = 1
|
||||
|
||||
Parameters
|
||||
----------
|
||||
resp : SalesPrognosisResponse
|
||||
_description_
|
||||
feature_map : Mapping[str, str]
|
||||
_description_
|
||||
target_features : Set[str]
|
||||
_description_
|
||||
|
||||
Returns
|
||||
-------
|
||||
PipeResult
|
||||
_description_
|
||||
"""
|
||||
pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS)
|
||||
res = _parse_api_resp_to_df_wrapped(resp)
|
||||
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
return pipe
|
||||
|
||||
df = res.result
|
||||
res = parse.process_features_wrapped(
|
||||
df,
|
||||
feature_map=feature_map,
|
||||
target_features=target_features,
|
||||
)
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
return pipe
|
||||
|
||||
return df
|
||||
pipe.success(res.unwrap(), res.status)
|
||||
return pipe
|
||||
|
||||
|
||||
def sales_per_customer(
|
||||
data: pd.DataFrame,
|
||||
customer_id: int,
|
||||
def _process_sales_per_customer(
|
||||
pipe: PipeResult[SalesPrognosisResultsExport],
|
||||
# company_id: int,
|
||||
min_num_data_points: int = 100,
|
||||
) -> PipeResult:
|
||||
"""_summary_
|
||||
) -> PipeResult[SalesPrognosisResultsExport]:
|
||||
"""n = 1
|
||||
Input-Data:
|
||||
fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
|
||||
- data already prefiltered if customer ID was provided
|
||||
("firma_refid" same for all entries)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
df : pd.DataFrame
|
||||
Input DF: table "f_umsatz_fakt"
|
||||
kunde : int
|
||||
customer ID (FK "firma_ref_ID")
|
||||
pipe : PipeResult
|
||||
_description_
|
||||
min_num_data_points : int, optional
|
||||
minimum number of data points to obtain result, by default 100
|
||||
_description_, by default 100
|
||||
|
||||
Returns
|
||||
-------
|
||||
FcResult
|
||||
PipeResult
|
||||
_description_
|
||||
"""
|
||||
|
||||
cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
|
||||
# filter data
|
||||
# TODO change away from nested DataFrames: just use "f_umsatz_fakt"
|
||||
# TODO with strong type checks
|
||||
data = pipe.data
|
||||
assert data is not None, "processing not existing pipe result"
|
||||
data = data.copy()
|
||||
df_firma = data[
|
||||
(data["firma_refid"] == customer_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
|
||||
]
|
||||
# df_firma = data[
|
||||
# (data["firma_refid"] == company_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0)
|
||||
# ]
|
||||
|
||||
for transaction in df_firma["vorgang_refid"].unique():
|
||||
cust_data.order.append(transaction)
|
||||
cust_data.date.append(
|
||||
df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
|
||||
)
|
||||
cust_data.sales.append(
|
||||
df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
|
||||
)
|
||||
# for transaction in df_firma["vorgang_refid"].unique():
|
||||
# cust_data.order.append(transaction)
|
||||
# cust_data.date.append(
|
||||
# df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0]
|
||||
# )
|
||||
# cust_data.sales.append(
|
||||
# df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum()
|
||||
# )
|
||||
|
||||
df_cust = pd.DataFrame(dc.asdict(cust_data))
|
||||
df_cust = df_cust.sort_values(by="date").reset_index()
|
||||
# df_cust = pd.DataFrame(dc.asdict(cust_data))
|
||||
# df_cust = df_cust.sort_values(by="date").reset_index()
|
||||
|
||||
DATE_FEAT: Final[str] = "buchungs_datum"
|
||||
SALES_FEAT: Final[str] = "betrag"
|
||||
df_firma = data[(data["betrag"] > 0)]
|
||||
df_cust = df_firma.copy()
|
||||
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
||||
|
||||
# check data availability
|
||||
if len(df_cust) < min_num_data_points:
|
||||
return PipeResult(status=STATUS_HANDLER.pipe_states.TOO_FEW_POINTS, data=None)
|
||||
pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
|
||||
return pipe
|
||||
else:
|
||||
# Entwicklung der Umsätze: definierte Zeiträume Monat
|
||||
df_cust["year"] = df_cust["date"].dt.year
|
||||
df_cust["month"] = df_cust["date"].dt.month
|
||||
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
|
||||
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
|
||||
|
||||
monthly_sum = df_cust.groupby(["year", "month"])["sales"].sum().reset_index()
|
||||
monthly_sum["date"] = (
|
||||
monthly_sum["month"].astype(str) + "." + monthly_sum["year"].astype(str)
|
||||
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
|
||||
monthly_sum[DATE_FEAT] = (
|
||||
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
|
||||
)
|
||||
monthly_sum["date"] = pd.to_datetime(monthly_sum["date"], format="%m.%Y")
|
||||
monthly_sum = monthly_sum.set_index("date")
|
||||
monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
|
||||
monthly_sum = monthly_sum.set_index(DATE_FEAT)
|
||||
|
||||
train = monthly_sum.iloc[:-5].copy()
|
||||
test = monthly_sum.iloc[-5:].copy()
|
||||
|
||||
features = ["year", "month"]
|
||||
target = "sales"
|
||||
features = ["jahr", "monat"]
|
||||
target = SALES_FEAT
|
||||
|
||||
X_train, y_train = train[features], train[target]
|
||||
X_test, y_test = test[features], test[target]
|
||||
@@ -138,8 +244,86 @@ def sales_per_customer(
|
||||
X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100
|
||||
)
|
||||
|
||||
test.loc[:, "prediction"] = reg.predict(X_test)
|
||||
test.loc[:, "vorhersage"] = reg.predict(X_test)
|
||||
test = test.reset_index(drop=True)
|
||||
|
||||
# umsetzung, prognose
|
||||
return PipeResult(status=STATUS_HANDLER.pipe_states.SUCCESS, data=test)
|
||||
pipe.success(test, STATUS_HANDLER.SUCCESS)
|
||||
return pipe
|
||||
|
||||
|
||||
def _postprocess_sales_per_customer(
|
||||
pipe: PipeResult[SalesPrognosisResultsExport],
|
||||
feature_map: Mapping[str, str],
|
||||
) -> PipeResult[SalesPrognosisResultsExport]:
|
||||
data = pipe.data
|
||||
assert data is not None, "processing not existing pipe result"
|
||||
# convert features back to original naming
|
||||
res = parse.process_features_wrapped(
|
||||
data,
|
||||
feature_map=feature_map,
|
||||
target_features=set(),
|
||||
)
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
return pipe
|
||||
|
||||
# res = _parse_df_to_api_resp_wrapped(res.result)
|
||||
res = _parse_df_to_results_wrapped(res.unwrap())
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
pipe.fail(res.status)
|
||||
return pipe
|
||||
|
||||
export_response = SalesPrognosisResultsExport(
|
||||
response=res.unwrap(),
|
||||
status=res.status,
|
||||
)
|
||||
pipe.export(export_response)
|
||||
|
||||
return pipe
|
||||
|
||||
|
||||
def _export_on_fail(
|
||||
status: Status,
|
||||
) -> SalesPrognosisResultsExport:
|
||||
response = SalesPrognosisResults(daten=tuple())
|
||||
return SalesPrognosisResultsExport(response=response, status=status)
|
||||
|
||||
|
||||
def pipeline(
|
||||
session: Session,
|
||||
company_id: int | None = None,
|
||||
start_date: Datetime | None = None,
|
||||
) -> SalesPrognosisResultsExport:
|
||||
response, status = get_sales_prognosis_data(
|
||||
session,
|
||||
company_id=company_id,
|
||||
start_date=start_date,
|
||||
)
|
||||
if status != STATUS_HANDLER.SUCCESS:
|
||||
return _export_on_fail(status)
|
||||
|
||||
pipe = _preprocess_sales_per_customer(
|
||||
response,
|
||||
feature_map=COL_MAP_SALES_PROGNOSIS,
|
||||
target_features=FEATURES_SALES_PROGNOSIS,
|
||||
)
|
||||
if pipe.status != STATUS_HANDLER.SUCCESS:
|
||||
return _export_on_fail(pipe.status)
|
||||
|
||||
pipe = _process_sales_per_customer(
|
||||
pipe,
|
||||
min_num_data_points=MIN_NUMBER_DATAPOINTS,
|
||||
)
|
||||
if pipe.status != STATUS_HANDLER.SUCCESS:
|
||||
return _export_on_fail(pipe.status)
|
||||
|
||||
pipe = _postprocess_sales_per_customer(
|
||||
pipe,
|
||||
feature_map=DualDict(),
|
||||
)
|
||||
if pipe.status != STATUS_HANDLER.SUCCESS:
|
||||
return _export_on_fail(pipe.status)
|
||||
|
||||
assert pipe.results is not None, "needed export response not set in pipeline"
|
||||
|
||||
return pipe.results
|
||||
|
||||
Reference in New Issue
Block a user