from __future__ import annotations

import copy
import datetime
import math
from collections.abc import Mapping, Set
from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast

import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
from dateutil.relativedelta import relativedelta  # monthly time-window arithmetic
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

from delta_barth import databases
from delta_barth.analysis import parse
from delta_barth.api.requests import (
    SalesPrognosisResponse,
    SalesPrognosisResponseEntry,
    SalesPrognosisResults,
    SalesPrognosisResultsExport,
    get_sales_prognosis_data,
)
from delta_barth.constants import (
    COL_MAP_SALES_PROGNOSIS,
    DEFAULT_DB_ERR_CODE,
    DUMMY_DATA_PATH,
    FEATURES_SALES_PROGNOSIS,
    SALES_BASE_NUM_DATAPOINTS_MONTHS,
    SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_db, logger_pipelines
from delta_barth.management import SESSION
from delta_barth.types import (
    BestParametersXGBRegressor,
    DualDict,
    ParamSearchXGBRegressor,
    PipeResult,
    SalesForecastStatistics,
)

if TYPE_CHECKING:
    from delta_barth.session import Session
    from delta_barth.types import Status

ForecastPipe: TypeAlias = PipeResult[
    SalesPrognosisResultsExport, SalesForecastStatistics
]


def _parse_api_resp_to_df(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    """Convert an API sales-prognosis response into a DataFrame.

    If the response carries no entries, an empty frame with the columns of
    ``SalesPrognosisResponseEntry`` is returned so downstream steps can rely
    on a stable schema.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        raw response from the sales prognosis endpoint

    Returns
    -------
    pd.DataFrame
        one row per response entry
    """
    data = resp.model_dump()["daten"]
    if not data:
        target_features = SalesPrognosisResponseEntry.__annotations__.keys()
        data = {feat: [] for feat in target_features}
    return pd.DataFrame(data)


def _parse_df_to_results(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    df_formatted = data.to_dict(orient="records")
    return SalesPrognosisResults(daten=tuple(df_formatted))  # type: ignore


def _write_sales_forecast_stats(
    stats: SalesForecastStatistics,
) -> None:
    # the XGBoost parameters live in their own table, linked via forecast_id
    stats_db = asdict(stats)
    _ = stats_db.pop("xgb_params")
    xgb_params = stats.xgb_params
    with SESSION.db_engine.begin() as conn:
        res = conn.execute(sql.insert(databases.sf_stats).values(stats_db))
        sf_id = cast(int, res.inserted_primary_key[0])  # type: ignore
        if xgb_params is not None:
            xgb_params["forecast_id"] = sf_id
            conn.execute(sql.insert(databases.sf_XGB).values(xgb_params))


@wrap_result()
def _parse_api_resp_to_df_wrapped(
    resp: SalesPrognosisResponse,
) -> pd.DataFrame:
    return _parse_api_resp_to_df(resp)


@wrap_result()
def _parse_df_to_results_wrapped(
    data: pd.DataFrame,
) -> SalesPrognosisResults:
    return _parse_df_to_results(data)


@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped(
    stats: SalesForecastStatistics,
) -> None:
    return _write_sales_forecast_stats(stats)
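

# Illustrative only: a minimal round-trip check for the two parse helpers above,
# assuming the annotated fields of SalesPrognosisResponseEntry are exactly the
# frame columns. This helper is hypothetical and not wired into the pipeline.
def _roundtrip_columns_ok(resp: SalesPrognosisResponse) -> bool:
    expected = set(SalesPrognosisResponseEntry.__annotations__.keys())
    return set(_parse_api_resp_to_df(resp).columns) == expected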


def _preprocess_sales(
    resp: SalesPrognosisResponse,
    feature_map: Mapping[str, str],
    target_features: Set[str],
) -> ForecastPipe:
    """Parse the API response into a frame and rename/reduce its features.

    Parameters
    ----------
    resp : SalesPrognosisResponse
        raw response from the sales prognosis endpoint
    feature_map : Mapping[str, str]
        mapping from API column names to internal feature names
    target_features : Set[str]
        features to keep for the forecast

    Returns
    -------
    ForecastPipe
        pipe carrying the preprocessed frame, or the failure status
    """
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_api_resp_to_df_wrapped(resp)
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    df = res.result
    res = parse.process_features_wrapped(
        df,
        feature_map=feature_map,
        target_features=target_features,
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    pipe.success(res.unwrap(), res.status)
    return pipe
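

# Sketch for "Option A" of the padding TODO inside _process_sales below: reindex
# the monthly aggregate onto a complete month grid and zero-fill the gaps.
# Hypothetical helper, not called anywhere yet; whether zero-filling helps or
# distorts the regressor is exactly the open question raised in that TODO.
def _pad_missing_months(monthly_sum: pd.DataFrame, sales_feat: str) -> pd.DataFrame:
    full_index = pd.date_range(
        monthly_sum.index.min(), monthly_sum.index.max(), freq="MS"
    )
    padded = monthly_sum.reindex(full_index)
    padded[sales_feat] = padded[sales_feat].fillna(0.0)
    padded["jahr"] = padded.index.year
    padded["monat"] = padded.index.month
    return padded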


def _process_sales(
    pipe: ForecastPipe,
    min_num_data_points: int,
    base_num_data_points_months: int,
) -> ForecastPipe:
    """Aggregate sales per month, fit XGBoost regressors on expanding time
    windows, and forecast the next six months.

    Input data fields: ["artikel_refid", "firma_refid", "betrag", "menge",
    "buchungs_datum"]; the data is already prefiltered if a customer ID was
    provided ("firma_refid" is then the same for all entries).

    Parameters
    ----------
    pipe : ForecastPipe
        pipe carrying the preprocessed sales frame
    min_num_data_points : int
        minimum number of raw data points required to attempt a forecast
    base_num_data_points_months : int
        baseline time-window length in months (training plus test window)

    Returns
    -------
    ForecastPipe
        pipe carrying the six-month forecast, or the failure status
    """
    # filter data
    data = pipe.data
    assert data is not None, "processing not existing pipe result"
    DATE_FEAT: Final[str] = "buchungs_datum"
    SALES_FEAT: Final[str] = "betrag"
    df_filter = data[data[SALES_FEAT] > 0]
    df_cust = df_filter.copy()
    df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
    len_ds = len(df_cust)
    if len_ds < min_num_data_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe

    # aggregate sales per calendar month, indexed by month start
    df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
    df_cust["monat"] = df_cust[DATE_FEAT].dt.month
    monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
    monthly_sum[DATE_FEAT] = (
        monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
    )
    monthly_sum[DATE_FEAT] = pd.to_datetime(monthly_sum[DATE_FEAT], format="%m.%Y")
    monthly_sum = monthly_sum.set_index(DATE_FEAT)
    features = ["jahr", "monat"]
    target = SALES_FEAT

    # forecast horizon: the six months following the current month
    last_date = pd.to_datetime(
        datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y"
    )
    future_dates = pd.date_range(
        start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
    )
    forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")

    # randomized hyperparameter search
    kfold = KFold(n_splits=5, shuffle=True)
    params: ParamSearchXGBRegressor = {
        "n_estimators": scipy.stats.poisson(mu=1000),
        "learning_rate": [0.03, 0.04, 0.05],
        "max_depth": range(2, 9),
        "min_child_weight": range(1, 5),
        "gamma": [1],
        "subsample": [0.8],
        "colsample_bytree": [0.8],
        "early_stopping_rounds": [20, 50],
    }

    # best_estimator is used internally only; best_params are persisted as stats
    best_estimator = None
    best_params: BestParametersXGBRegressor | None = None
    best_score_mae: float | None = float("inf")
    best_score_r2: float | None = None
    best_start_year: int | None = None
    too_few_month_points: bool = True

    # TODO: write routine to pad missing values in datetime row
    # TODO problem: continuous timeline expected, but values can be empty for
    #   multiple months; therefore, stepping with fixed value n does not result
    #   in a timedelta of n episodes
    #   Option A: pad data frame with zero values --> could impede forecast
    #     algorithm (see the _pad_missing_months sketch above)
    #   Option B: calculate next index based on timedelta
    dates = monthly_sum.index

    # time windows on a monthly basis; baseline window: 3 years = 36 months
    # (base_num_data_points_months)
    starting_date = dates.max() - relativedelta(months=base_num_data_points_months)
    start_index = next(
        (i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
    )
    logger_pipelines.debug(
        "start index: %d, number of monthly data points: %d", start_index, len(dates)
    )
    # the last six months always form the test set
    split_date = dates[-6]
    for add_year, date_idx in enumerate(range(start_index, -1, -12)):
        first_date = dates[date_idx]
        train = cast(
            pd.DataFrame,
            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
        )
        test = cast(
            pd.DataFrame,
            monthly_sum.loc[split_date:].copy(),  # type: ignore
        )
        X_train, X_test = train[features], test[features]
        y_train, y_test = train[target], test[target]
        # test set size is fixed at 6 --> first iteration: baseline - 6 entries;
        # each additional year of history requires 10 more data points
        if len(train) >= (base_num_data_points_months - 6) + 10 * add_year:
            too_few_month_points = False
            rand = RandomizedSearchCV(
                XGBRegressor(),
                params,
                scoring="neg_mean_absolute_error",
                cv=kfold,
                n_jobs=-1,
                n_iter=100,
                verbose=0,
            )
            rand.fit(
                X_train,
                y_train,
                eval_set=[(X_train, y_train), (X_test, y_test)],
                verbose=0,
            )
            y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
            if len(np.unique(y_pred)) != 1:
                error = cast(float, mean_absolute_error(y_test, y_pred))
                if error < best_score_mae:
                    best_params = cast(
                        BestParametersXGBRegressor, rand.best_params_
                    )
                    best_score_mae = error
                    best_score_r2 = cast(float, r2_score(y_test, y_pred))
                    best_start_year = first_date.year
                    best_estimator = copy.copy(rand.best_estimator_)
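
    # Only the training window with the lowest test MAE and a non-constant
    # prediction vector survives the loop; a constant prediction vector is
    # treated as a degenerate fit and skipped above.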
    # use the selected estimator to predict the six future months
    if best_estimator is not None:
        X_future = pd.DataFrame(
            {"jahr": future_dates.year, "monat": future_dates.month},
            index=future_dates,
        )
        y_future = best_estimator.predict(X_future)  # type: ignore
        forecast["vorhersage"] = y_future
        forecast["jahr"] = forecast.index.year  # type: ignore
        forecast["monat"] = forecast.index.month  # type: ignore
        forecast = forecast.reset_index(drop=True)

    best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
    if too_few_month_points:
        status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe
    elif best_params is None:
        status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
        pipe.fail(status)
        stats = SalesForecastStatistics(status.code, status.description, len_ds)
        pipe.stats(stats)
        return pipe

    assert "vorhersage" in forecast.columns, (
        "forecast does not contain prognosis values, but was attempted to be returned"
    )
    status = STATUS_HANDLER.SUCCESS
    pipe.success(forecast, status)
    stats = SalesForecastStatistics(
        status.code,
        status.description,
        len_ds,
        score_mae=best_score_mae,
        score_r2=best_score_r2,
        best_start_year=best_start_year,
        xgb_params=best_params,
    )
    pipe.stats(stats)
    return pipe


def _postprocess_sales(
    pipe: ForecastPipe,
    feature_map: Mapping[str, str],
) -> ForecastPipe:
    data = pipe.data
    assert data is not None, "processing not existing pipe result"
    # convert features back to original naming
    res = parse.process_features_wrapped(
        data,
        feature_map=feature_map,
        target_features=set(),
    )
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    res = _parse_df_to_results_wrapped(res.unwrap())
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return pipe
    export_response = SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )
    pipe.export(export_response)
    return pipe


def _export_on_fail(
    status: Status,
) -> SalesPrognosisResultsExport:
    response = SalesPrognosisResults(daten=tuple())
    return SalesPrognosisResultsExport(response=response, status=status)


def pipeline_sales_forecast(
    session: Session,
    company_id: int | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
    logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
    response, status = get_sales_prognosis_data(
        session,
        company_id=company_id,
        start_date=start_date,
    )
    if status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast data retrieval, Status: %s",
            status,
            stack_info=True,
        )
        return _export_on_fail(status)
    pipe = _preprocess_sales(
        response,
        feature_map=COL_MAP_SALES_PROGNOSIS,
        target_features=FEATURES_SALES_PROGNOSIS,
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast preprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
    pipe = _process_sales(
        pipe,
        min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
    )
    # statistics are written even if processing failed, so that failed runs
    # (too few points, no reliable forecast) remain auditable in the database
    if pipe.statistics is not None:
        res = _write_sales_forecast_stats_wrapped(pipe.statistics)
        if res.status != STATUS_HANDLER.SUCCESS:
            logger_db.error(
                "[DB] Error during write process of sales forecast statistics: %s",
                res.status,
            )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast main processing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
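    # package the forecast frame into the export response; the empty DualDict
    # means no column renaming is applied in this step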
    pipe = _postprocess_sales(
        pipe,
        feature_map=DualDict(),
    )
    if pipe.status != STATUS_HANDLER.SUCCESS:
        logger_pipelines.error(
            "Error during sales forecast postprocessing, Status: %s",
            pipe.status,
            stack_info=True,
        )
        return _export_on_fail(pipe.status)
    assert pipe.results is not None, "needed export response not set in pipeline"
    logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
    return pipe.results


def pipeline_sales_dummy(
    session: Session,
    company_id: int | None = None,
    start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
    """Prototype dummy function for tests by DelBar."""
    logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
    _, _, _ = session, company_id, start_date
    data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
    assert data_pth.exists(), "sales forecast dummy data not existent"
    data = pd.read_pickle(data_pth)
    pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
    res = _parse_df_to_results_wrapped(data)
    if res.status != STATUS_HANDLER.SUCCESS:
        pipe.fail(res.status)
        return _export_on_fail(res.status)
    logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
    return SalesPrognosisResultsExport(
        response=res.unwrap(),
        status=res.status,
    )
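

# Hypothetical smoke test, assuming the module-level SESSION object satisfies
# the Session interface (pipeline_sales_dummy ignores its arguments anyway and
# only needs the pickled example data, not a live database).
if __name__ == "__main__":
    export = pipeline_sales_dummy(SESSION)  # type: ignore[arg-type]
    print(export.status)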