from __future__ import annotations

import copy
import datetime
import math
from collections.abc import Mapping, Set
from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast

import joblib
import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

from delta_barth import databases
from delta_barth.analysis import parse
from delta_barth.api.requests import (
    SalesPrognosisResponse,
    SalesPrognosisResponseEntry,
    SalesPrognosisResults,
    SalesPrognosisResultsExport,
    get_sales_prognosis_data,
)
from delta_barth.constants import (
    COL_MAP_SALES_PROGNOSIS,
    DEFAULT_DB_ERR_CODE,
    DUMMY_DATA_PATH,
    FEATURES_SALES_PROGNOSIS,
    MAX_NUM_WORKERS,
    SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_db, logger_pipelines
from delta_barth.management import SESSION
from delta_barth.types import (
    BestParametersXGBRegressor,
    DualDict,
    ParamSearchXGBRegressor,
    PipeResult,
    SalesForecastStatistics,
)

if TYPE_CHECKING:
    from delta_barth.session import Session
    from delta_barth.types import Status

ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics]


def _parse_api_resp_to_df(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    """Convert a sales prognosis API response into a DataFrame.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        API response whose ``daten`` entries become the rows of the frame.

    Returns
    -------
    pd.DataFrame
        One row per response entry; an empty frame with the expected
        columns if the response contains no data.
    """
    data = resp.model_dump()["daten"]
    if not data:
        target_features = SalesPrognosisResponseEntry.__annotations__.keys()
        data = {feat: [] for feat in target_features}
    return pd.DataFrame(data)


def _parse_df_to_results(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    df_formatted = data.to_dict(orient="records")
    return SalesPrognosisResults(daten=tuple(df_formatted))  # type: ignore


def _write_sales_forecast_stats(
    stats: SalesForecastStatistics,
) -> None:
    stats_db = asdict(stats)
    _ = stats_db.pop("xgb_params")
    xgb_params = stats.xgb_params
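    # insert the statistics row first; the XGBoost parameters (if any) are
    # written afterwards, linked to it via the generated forecast id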
    with SESSION.db_engine.begin() as conn:
        res = conn.execute(sql.insert(databases.sf_stats).values(stats_db))
        sf_id = cast(int, res.inserted_primary_key[0])  # type: ignore
        if xgb_params is not None:
            xgb_params["forecast_id"] = sf_id
            conn.execute(sql.insert(databases.sf_XGB).values(xgb_params))


@wrap_result()
def _parse_api_resp_to_df_wrapped(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    return _parse_api_resp_to_df(resp)


@wrap_result()
def _parse_df_to_results_wrapped(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    return _parse_df_to_results(data)


@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped(
    stats: SalesForecastStatistics,
) -> None:
    return _write_sales_forecast_stats(stats)


def _preprocess_sales(
    resp: SalesPrognosisResponse,
    feature_map: Mapping[str, str],
    target_features: Set[str],
) -> ForecastPipe:
    """Parse the API response and map its columns to internal feature names.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        Raw API response with the sales data.
    feature_map : Mapping[str, str]
        Mapping from API column names to internal feature names.
    target_features : Set[str]
        Features the processed DataFrame must contain.

    Returns
    -------
    ForecastPipe
        Pipe result holding the processed DataFrame, or a failure status.
    """
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_api_resp_to_df_wrapped(resp)
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    df = res.result
    res = parse.process_features_wrapped(
        df,
        feature_map=feature_map,
        target_features=target_features,
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    pipe.success(res.unwrap(), res.status)
    return pipe


def _process_sales(
    pipe: ForecastPipe,
    min_num_data_points: int,
    base_num_data_points_months: int,
) -> ForecastPipe:
    """Aggregate sales per month, fit an XGBoost model, and forecast 6 months.

    Input data:
        fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
        - data is already prefiltered if a customer ID was provided
          ("firma_refid" is the same for all entries)

    Parameters
    ----------
    pipe : ForecastPipe
        Pipe result holding the preprocessed sales data.
    min_num_data_points : int
        Minimum number of sales entries required to attempt a forecast.
    base_num_data_points_months : int
        Minimum number of months with strictly positive sales required in
        the baseline training window.

    Returns
    -------
    ForecastPipe
        Pipe result holding the forecast and run statistics, or a failure
        status.
    """
    # filter data
    data = pipe.data
    assert data is not None, "attempted to process a pipe result without data"
    DATE_FEAT: Final[str] = "buchungs_datum"
    SALES_FEAT: Final[str] = "betrag"
    data[DATE_FEAT] = pd.to_datetime(data[DATE_FEAT], errors="coerce")
    data = data.dropna(subset=[DATE_FEAT])
    df_filter = data[data[SALES_FEAT] > 0]
    df_cust = df_filter.copy()
    df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index(drop=True)
    len_ds = len(df_cust)
    if len_ds < min_num_data_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe
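    # aggregate sales per calendar month; months without any bookings are
    # added with a sum of zero so the model sees a gapless monthly series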
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
all_month_year_combinations = pd.DataFrame(
[
(year, month)
for year in years
for month in range(1, 13)
if (year < current_year or (year == current_year and month <= current_month))
],
columns=["jahr", "monat"],
)
monthly_sum = pd.merge(
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
)
monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
monthly_sum = monthly_sum.set_index(DATE_FEAT)
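    # forecast horizon: the six calendar months following the current one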
    features = ["jahr", "monat"]
    target = SALES_FEAT
    last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
    future_dates = pd.date_range(
        start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
    )
    forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
    # randomized hyperparameter search
    kfold = KFold(n_splits=5, shuffle=True)
    params: ParamSearchXGBRegressor = {
        "n_estimators": scipy.stats.poisson(mu=100),
        "learning_rate": [0.03, 0.04, 0.05],
        "max_depth": range(2, 9),
        "min_child_weight": range(1, 5),
        "gamma": [1],
        "subsample": [0.8],
        "colsample_bytree": [0.8],
        "early_stopping_rounds": [20, 50],
    }
    best_estimator = None
    best_params: BestParametersXGBRegressor | None = None
    best_score_mae: float | None = float("inf")
    best_score_r2: float | None = None
    best_start_year: int | None = None
    too_few_month_points: bool = True
    dates = cast(pd.DatetimeIndex, monthly_sum.index)
    # baseline training window: 3 years (36 months) back from today
    starting_date = datetime.datetime.now() - relativedelta(months=36)
    target_index, _ = next(
        ((i, True) for i, date in enumerate(dates) if date >= starting_date),
        (len(dates) - 1, False),
    )
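    # expanding-window search: start from the 36-month baseline and extend
    # the training window by one year per iteration, keeping the model with
    # the lowest MAE on the fixed 6-month test window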
    for add_year, date_idx in enumerate(range(target_index, -1, -12)):
        first_date = dates[date_idx]
        split_date = dates[-6]
        train = cast(
            pd.DataFrame,
            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
        )
        test = cast(
            pd.DataFrame,
            monthly_sum.loc[split_date:].copy(),  # type: ignore
        )
        X_train, X_test = train[features], test[features]
        y_train, y_test = train[target], test[target]
        # the test window is fixed at the last 6 months, so the first
        # iteration trains on the baseline minus those 6 entries; every
        # additional year must contribute 10 new data points (i.e., months
        # with strictly positive sales)
        if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
            too_few_month_points = False
            with joblib.parallel_config(backend="loky"):
                rand = RandomizedSearchCV(
                    XGBRegressor(),
                    params,
                    scoring="neg_mean_absolute_error",
                    cv=kfold,
                    n_jobs=MAX_NUM_WORKERS,
                    n_iter=100,
                    verbose=0,
                )
                rand.fit(
                    X_train,
                    y_train,
                    eval_set=[(X_train, y_train), (X_test, y_test)],
                    verbose=0,
                )
            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
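            # discard models that collapse to a constant prediction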
            if len(np.unique(y_pred)) != 1:
                error = cast(float, mean_absolute_error(y_test, y_pred))
                if error < best_score_mae:
                    best_params = cast(BestParametersXGBRegressor, rand.best_params_)
                    best_score_mae = error
                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
                    # start year of the best-performing training window
                    best_start_year = first_date.year
                    # keep the fitted estimator for the final forecast
                    best_estimator = copy.copy(rand.best_estimator_)
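    # predict the six future months with the best model found (if any)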
    if best_estimator is not None:
        X_future = pd.DataFrame(
            {"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
        )
        y_future = best_estimator.predict(X_future)  # type: ignore
        forecast["vorhersage"] = y_future
        forecast["jahr"] = forecast.index.year  # type: ignore
        forecast["monat"] = forecast.index.month  # type: ignore
        forecast = forecast.reset_index(drop=True)
    best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
    if too_few_month_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe
    elif best_params is None:
        status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe
    assert "vorhersage" in forecast.columns, (
        "forecast is about to be returned without prognosis values"
    )
    status = STATUS_HANDLER.SUCCESS
    pipe.success(forecast, status)
    stats = SalesForecastStatistics(
        status.code,
        status.description,
        len_ds,
        score_mae=best_score_mae,
        score_r2=best_score_r2,
        best_start_year=best_start_year,
        xgb_params=best_params,
    )
    pipe.stats(stats)
    return pipe


def _postprocess_sales(
    pipe: ForecastPipe,
    feature_map: Mapping[str, str],
) -> ForecastPipe:
    data = pipe.data
    assert data is not None, "attempted to postprocess a pipe result without data"
    # convert features back to the original naming
    res = parse.process_features_wrapped(
        data,
        feature_map=feature_map,
        target_features=set(),
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    res = _parse_df_to_results_wrapped(res.unwrap())
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    export_response = SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )
    pipe.export(export_response)
    return pipe


def _export_on_fail(
    status: Status,
) -> SalesPrognosisResultsExport:
    response = SalesPrognosisResults(daten=tuple())
    return SalesPrognosisResultsExport(response=response, status=status)


def pipeline_sales_forecast(
    session: Session,
    company_ids: list[int] | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
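    """Run the full sales forecast pipeline: fetch, preprocess, model, export."""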
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
company_ids=company_ids,
start_date=start_date,
)
if status != STATUS_HANDLER.SUCCESS:
logger_pipelines.error(
"Error during sales forecast data retrieval, Status: %s",
status,
stack_info=True,
)
return _export_on_fail(status)
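    # map the API columns onto the internal feature set used by the model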
    pipe = _preprocess_sales(
        response,
        feature_map=COL_MAP_SALES_PROGNOSIS,
        target_features=FEATURES_SALES_PROGNOSIS,
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast preprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
    pipe = _process_sales(
        pipe,
        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
        base_num_data_points_months=session.cfg.forecast.threshold_month_data_points,
    )
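    # statistics are persisted regardless of whether the modelling step
    # succeeded, so failed runs remain traceable in the database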
    if pipe.statistics is not None:
        res = _write_sales_forecast_stats_wrapped(pipe.statistics)
        if res.status != STATUS_HANDLER.SUCCESS:
            logger_db.error(
                "[DB] Error during write process of sales forecast statistics: %s",
                res.status,
            )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast main processing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
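    # final step: convert the forecast frame into the export response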
    pipe = _postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast postprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
    assert pipe.results is not None, "export response was not set in the pipeline"
    logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
    return pipe.results


def pipeline_sales_dummy(
    session: Session,
    company_id: int | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
    """Prototype dummy pipeline for tests by DelBar."""
    logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
    _, _, _ = session, company_id, start_date
    data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
    assert data_pth.exists(), "sales forecast dummy data does not exist"
    data = pd.read_pickle(data_pth)
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_df_to_results_wrapped(data)
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return _export_on_fail(res.status)
    logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
    return SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )