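"""Sales forecast pipeline: retrieves sales data through the delta_barth API
layer, aggregates it to monthly sums, selects an XGBoost regressor via
randomized hyper-parameter search, and exports a six-month forecast together
with run statistics."""
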
from __future__ import annotations

import math
from collections.abc import Mapping, Set
from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast

import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
# --- new: for calendar-aware date offsets (month arithmetic)
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

from delta_barth import databases
from delta_barth.analysis import parse
from delta_barth.api.requests import (
    SalesPrognosisResponse,
    SalesPrognosisResponseEntry,
    SalesPrognosisResults,
    SalesPrognosisResultsExport,
    get_sales_prognosis_data,
)
from delta_barth.constants import (
    COL_MAP_SALES_PROGNOSIS,
    DEFAULT_DB_ERR_CODE,
    DUMMY_DATA_PATH,
    FEATURES_SALES_PROGNOSIS,
    SALES_BASE_NUM_DATAPOINTS_MONTHS,
    SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_db, logger_pipelines
from delta_barth.management import SESSION
from delta_barth.types import (
    BestParametersXGBRegressor,
    DualDict,
    ParamSearchXGBRegressor,
    PipeResult,
    SalesForecastStatistics,
)

if TYPE_CHECKING:
    from delta_barth.session import Session
    from delta_barth.types import Status

# result type of a single pipeline step: export payload plus run statistics
ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics]


def _parse_api_resp_to_df(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    """Convert a sales prognosis API response into a DataFrame.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        raw API response whose ``daten`` entries become the rows

    Returns
    -------
    pd.DataFrame
        one row per response entry; empty, but with the expected
        columns, if the response contains no data

    """
    data = resp.model_dump()["daten"]

    if not data:
        target_features = SalesPrognosisResponseEntry.__annotations__.keys()
        data = {feat: [] for feat in target_features}

    return pd.DataFrame(data)


def _parse_df_to_results(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    # one dict per row, keyed by column name
    df_formatted = data.to_dict(orient="records")

    return SalesPrognosisResults(daten=tuple(df_formatted))  # type: ignore


def _write_sales_forecast_stats(
    stats: SalesForecastStatistics,
) -> None:
    stats_db = asdict(stats)
    # the XGBoost parameters are stored in a separate table
    _ = stats_db.pop("xgb_params")
    xgb_params = stats.xgb_params

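    # the statistics row is inserted first so that its generated primary key
    # can serve as the foreign key of the optional XGBoost parameter row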
    with SESSION.db_engine.begin() as conn:
        res = conn.execute(sql.insert(databases.sf_stats).values(stats_db))
        sf_id = cast(int, res.inserted_primary_key[0])  # type: ignore
        if xgb_params is not None:
            xgb_params["forecast_id"] = sf_id
            conn.execute(sql.insert(databases.sf_XGB).values(xgb_params))


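# Thin pass-through wrappers: judging by its usage, ``wrap_result`` wraps the
# return value (or a raised error) into a status-carrying result object, so
# callers can branch on ``res.status`` instead of using try/except.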
@wrap_result()
def _parse_api_resp_to_df_wrapped(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    return _parse_api_resp_to_df(resp)


@wrap_result()
def _parse_df_to_results_wrapped(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    return _parse_df_to_results(data)


@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped(
    stats: SalesForecastStatistics,
) -> None:
    return _write_sales_forecast_stats(stats)


def _preprocess_sales(
    resp: SalesPrognosisResponse,
    feature_map: Mapping[str, str],
    target_features: Set[str],
) -> ForecastPipe:
    """Parse the API response and map its columns to the internal features.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        raw API response to be converted into a DataFrame
    feature_map : Mapping[str, str]
        mapping from API column names to internal feature names
    target_features : Set[str]
        features the resulting DataFrame must contain

    Returns
    -------
    PipeResult
        pipe carrying the preprocessed DataFrame, or the failure status

    """
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_api_resp_to_df_wrapped(resp)

    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe

    df = res.result
    res = parse.process_features_wrapped(
        df,
        feature_map=feature_map,
        target_features=target_features,
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe

    pipe.success(res.unwrap(), res.status)
    return pipe


def _process_sales(
    pipe: ForecastPipe,
    min_num_data_points: int,
    base_num_data_points_months: int,
) -> ForecastPipe:
    """Fit a forecast model on monthly sales sums and predict six months ahead.

    Input data:
    fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
    - data already prefiltered if a customer ID was provided
      ("firma_refid" same for all entries)

    Parameters
    ----------
    pipe : PipeResult
        pipe carrying the preprocessed sales DataFrame
    min_num_data_points : int
        minimum number of raw sales entries required to attempt a forecast
    base_num_data_points_months : int
        baseline number of monthly data points for the training windows

    Returns
    -------
    PipeResult
        pipe carrying the forecast DataFrame and the run statistics

    """
    # filter out non-positive sales amounts
    data = pipe.data
    assert data is not None, "processing non-existing pipe result"

    DATE_FEAT: Final[str] = "buchungs_datum"
    SALES_FEAT: Final[str] = "betrag"
    df_firma = data[data[SALES_FEAT] > 0]
    df_cust = df_firma.copy()
    df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index(drop=True)
    len_ds = len(df_cust)

    if len_ds < min_num_data_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe

    df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
    df_cust["monat"] = df_cust[DATE_FEAT].dt.month

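    # aggregate to one sales sum per calendar month, indexed by month start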
    monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
    monthly_sum[DATE_FEAT] = (
        monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
    )
    monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
    monthly_sum = monthly_sum.set_index(DATE_FEAT)

    features = ["jahr", "monat"]
    target = SALES_FEAT

    # --- new: dates for the six months following the current one; the
    # forecast frame itself is built once a best estimator has been found
    last_date = pd.to_datetime(Datetime.now().strftime("%m.%Y"), format="%m.%Y")
    future_dates = pd.date_range(start=last_date + pd.DateOffset(months=1), periods=6, freq="MS")

    # randomized search over XGBoost hyper-parameters (MAE as selection metric)
    kfold = KFold(n_splits=5, shuffle=True)
    params: ParamSearchXGBRegressor = {
        "n_estimators": scipy.stats.poisson(mu=1000),
        "learning_rate": [0.03, 0.04, 0.05],
        "max_depth": range(2, 9),
        "min_child_weight": range(1, 5),
        "gamma": [1],
        "subsample": [0.8],
        "colsample_bytree": [0.8],
        "early_stopping_rounds": [20, 50],
    }

    # --- new: best_estimator (internal usage only)
    best_estimator: XGBRegressor | None = None

    best_params: BestParametersXGBRegressor | None = None
    best_score_mae: float | None = float("inf")
    best_score_r2: float | None = None
    best_start_year: int | None = None
    too_few_month_points: bool = True
    forecast: pd.DataFrame | None = None

    dates = monthly_sum.index
    # --- new: use a monthly basis for the time windows; the lookback of
    # 36 months is anchored on the current date
    starting_date = Datetime.now() - relativedelta(months=36)
    # starting_date = dates.max() - relativedelta(months=36)
    start_index = next((i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1)

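    # expanding-window model selection: each pass moves the training start a
    # further twelve months into the past, while the final six months always
    # serve as the hold-out set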
    for index, i in enumerate(range(start_index, -1, -12)):
        first_date = dates[i]
        split_date = dates[-6]

        train = cast(
            pd.DataFrame,
            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
        )
        test = cast(
            pd.DataFrame,
            monthly_sum.loc[split_date:].copy(),  # type: ignore
        )
        X_train, X_test = train[features], test[features]
        y_train, y_test = train[target], test[target]

        # --- new: adapted condition to fit the new for-loop: the required
        # number of monthly points grows with every additional year of lookback
        if len(train) >= 30 + 10 * index:
            too_few_month_points = False

            rand = RandomizedSearchCV(
                XGBRegressor(),
                params,
                scoring="neg_mean_absolute_error",
                cv=kfold,
                n_jobs=-1,
                n_iter=100,
                verbose=0,
            )
            rand.fit(
                X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
            )
            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore

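            # a constant prediction vector signals a degenerate fit (e.g. the
            # model collapsed to a single value); skip such candidates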
            if len(np.unique(y_pred)) != 1:
                error = cast(float, mean_absolute_error(y_test, y_pred))
                if error < best_score_mae:
                    best_params = cast(BestParametersXGBRegressor, rand.best_params_)
                    best_score_mae = error
                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
                    # --- new: use first_date for best_start_year
                    best_start_year = first_date.year
                    # --- new: store best_estimator
                    best_estimator = rand.best_estimator_

    # --- new: use best_estimator to calculate the future values; the forecast
    # frame is only built here, so it stays None when no reliable fit was found
    if best_estimator is not None:
        X_future = pd.DataFrame({"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates)
        y_future = best_estimator.predict(X_future)
        forecast = pd.DataFrame({"datum": future_dates.strftime("%m.%Y")}).set_index("datum")
        forecast["vorhersage"] = y_future

    best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None

    if too_few_month_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe
    elif best_params is None:
        status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe

    assert forecast is not None, "forecast must be set before it is returned"
    status = STATUS_HANDLER.SUCCESS
    pipe.success(forecast, status)
    stats = SalesForecastStatistics(
        status.code,
        status.description,
        len_ds,
        score_mae=best_score_mae,
        score_r2=best_score_r2,
        best_start_year=best_start_year,
        xgb_params=best_params,
    )
    pipe.stats(stats)

    return pipe


def _postprocess_sales(
    pipe: ForecastPipe,
    feature_map: Mapping[str, str],
) -> ForecastPipe:
    data = pipe.data
    assert data is not None, "processing non-existing pipe result"
    # convert features back to their original naming
    res = parse.process_features_wrapped(
        data,
        feature_map=feature_map,
        target_features=set(),
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe

    res = _parse_df_to_results_wrapped(res.unwrap())
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe

    export_response = SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )
    pipe.export(export_response)

    return pipe


def _export_on_fail(
    status: Status,
) -> SalesPrognosisResultsExport:
    response = SalesPrognosisResults(daten=tuple())
    return SalesPrognosisResultsExport(response=response, status=status)


def pipeline_sales_forecast(
    session: Session,
    company_id: int | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
    """Run the full sales forecast pipeline: fetch, preprocess, fit, export."""
    logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
    response, status = get_sales_prognosis_data(
        session,
        company_id=company_id,
        start_date=start_date,
    )
    if status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast data retrieval, Status: %s",
            status,
            stack_info=True,
        )
        return _export_on_fail(status)

    pipe = _preprocess_sales(
        response,
        feature_map=COL_MAP_SALES_PROGNOSIS,
        target_features=FEATURES_SALES_PROGNOSIS,
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast preprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)

    pipe = _process_sales(
        pipe,
        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
    )
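    # statistics are persisted regardless of the processing outcome, so that
    # failed runs remain auditable in the database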
    if pipe.statistics is not None:
        res = _write_sales_forecast_stats_wrapped(pipe.statistics)
        if res.status != STATUS_HANDLER.SUCCESS:
            logger_db.error(
                "[DB] Error during write process of sales forecast statistics: %s",
                res.status,
            )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast main processing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)

    pipe = _postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast postprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)

    assert pipe.results is not None, "needed export response not set in pipeline"

    logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")

    return pipe.results


def pipeline_sales_dummy(
    session: Session,
    company_id: int | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
    """Prototype dummy pipeline for tests by DelBar: returns canned data."""

    logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")

    # the dummy ignores its arguments; it only mirrors the real signature
    _, _, _ = session, company_id, start_date

    data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
    assert data_pth.exists(), "sales forecast dummy data does not exist"
    data = pd.read_pickle(data_pth)
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_df_to_results_wrapped(data)

    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return _export_on_fail(res.status)

    logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")

    return SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )
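

# Usage sketch (illustrative only; how a ``Session`` is configured is
# application-specific and outside this module; the company ID is made up):
#
#     export = pipeline_sales_forecast(session, company_id=7)
#     if export.status == STATUS_HANDLER.SUCCESS:
#         for entry in export.response.daten:
#             ...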