diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py
index 55e42ac..19ad3d2 100644
--- a/src/delta_barth/analysis/forecast.py
+++ b/src/delta_barth/analysis/forecast.py
@@ -8,6 +8,7 @@ from dataclasses import asdict
 from datetime import datetime as Datetime
 from typing import TYPE_CHECKING, Final, TypeAlias, cast
 
+import dopt_basics.datetime
 import numpy as np
 import pandas as pd
 import scipy.stats
@@ -15,6 +16,7 @@ import sqlalchemy as sql
 
 # --- new: for calculating timedelta
 from dateutil.relativedelta import relativedelta
+from dopt_basics.datetime import TimeUnitsTimedelta
 from sklearn.metrics import mean_absolute_error, r2_score
 from sklearn.model_selection import KFold, RandomizedSearchCV
 from xgboost import XGBRegressor
@@ -251,26 +253,57 @@ def _process_sales(
     # TODO: therefore, stepping with fixed value n does not result in timedelta of n episodes
     # Option A: pad data frame with zero values --> could impede forecast algorithm
     # Option B: calculate next index based on timedelta
-    dates = monthly_sum.index
+    stride = dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS)
+
+    dates = cast(pd.DatetimeIndex, monthly_sum.index)
+
+    min_date = dates.min()
+    # print("dates: ", dates)
 
     # ?? --- new: use monthly basis for time windows
     # baseline: 3 years - 36 months
     starting_date = datetime.datetime.now() - relativedelta(months=12)
     # starting_date = dates.max() - relativedelta(months=36)
-    start_index = next(
-        (i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
-    )
-    print("start idx: ", start_index, "length dates: ", len(dates))
+    # start_index = next(
+    #     (i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
+    # )
+    # print("start idx: ", start_index, "length dates: ", len(dates))
 
-    for add_year, date_idx in enumerate(range(start_index, -1, -12)):
-        print("date_idx: ", date_idx)
-        first_date = dates[date_idx]
-        print("first date: ", first_date)
+    def get_index_date(
+        dates: pd.DatetimeIndex,
+        starting_date: datetime.datetime | pd.Timestamp,
+    ) -> tuple[pd.Timestamp, bool]:
+        target, succ = next(
+            ((date, True) for date in dates if date >= starting_date), (dates[-1], False)
+        )
+        return target, succ
+
+    first_date, succ = get_index_date(dates, starting_date)
+    if not succ:
+        # !! return early
+        ...
+
+    date_span = first_date - min_date
+    steps = date_span.days // stride.days
+
+    for step in range(steps + 1):
+        print("step: ", step)
+        target_date = first_date - step * stride
+        print("target date: ", target_date)
 
         split_date = dates[-6]
+        index_date, succ = get_index_date(dates, target_date)
+
+        if not succ:
+            break
+
+        if index_date >= split_date:
+            print("Skip because of date difference")
+            continue
+
         train = cast(
             pd.DataFrame,
-            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
+            monthly_sum.loc[index_date:split_date].copy(),  # type: ignore
         )
         print(train)
         print("Length train: ", len(train))
@@ -284,7 +317,7 @@ def _process_sales(
         # ?? --- new: adapted condition to fit new for-loop
         # test set size fixed at 6 --> first iteration: baseline - 6 entries
        # for each new year 10 new data points needed
-        if len(train) >= 30 + 10 * add_year:
+        if len(train) >= 30 + 10 * step:
             too_few_month_points = False
 
             rand = RandomizedSearchCV(
@@ -308,7 +341,7 @@ def _process_sales(
                 best_score_mae = error
                 best_score_r2 = cast(float, r2_score(y_test, y_pred))
                 # --- new: use first_date for best_start_year
-                best_start_year = first_date.year
+                best_start_year = target_date.year
 
                 # --- new: store best_estimator
                 best_estimator = copy.copy(rand.best_estimator_)
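
For review context: the large hunk above replaces index-based stepping (fixed -12 offsets into `dates`) with timedelta-based stepping, so gaps in the monthly index no longer shift the training windows. The sketch below reproduces that window-selection logic in isolation, under stated assumptions: the monthly frame is synthetic, `datetime.timedelta(days=365)` stands in for `dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS)`, and the baseline start is anchored to `dates.max()` instead of `datetime.datetime.now()` so the output is deterministic. It is not the production code path.

import datetime

import pandas as pd

# Synthetic stand-in for the monthly sales aggregate built in _process_sales.
monthly_sum = pd.DataFrame(
    {"sales": range(48)},
    index=pd.date_range("2021-01-01", periods=48, freq="MS"),
)

# Stand-in for dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS).
stride = datetime.timedelta(days=365)


def get_index_date(
    dates: pd.DatetimeIndex,
    starting_date: datetime.datetime | pd.Timestamp,
) -> tuple[pd.Timestamp, bool]:
    # Snap to the first index entry at or after `starting_date`; fall back to the
    # last entry and report failure when no such entry exists.
    return next(
        ((date, True) for date in dates if date >= starting_date),
        (dates[-1], False),
    )


dates = pd.DatetimeIndex(monthly_sum.index)
split_date = dates[-6]  # last 6 monthly entries are reserved for testing

# Baseline window start: 12 months before the newest data point (assumption; the
# patch itself uses datetime.datetime.now() - relativedelta(months=12)).
first_date, ok = get_index_date(dates, dates.max() - pd.DateOffset(months=12))
if ok:
    steps = (first_date - dates.min()).days // stride.days
    for step in range(steps + 1):
        # Each step pushes the window start back by one stride (~1 year).
        index_date, ok = get_index_date(dates, first_date - step * stride)
        if not ok:
            break
        if index_date >= split_date:
            continue  # window start would overlap the test split
        train = monthly_sum.loc[index_date:split_date]
        print(step, index_date.date(), "->", split_date.date(), "rows:", len(train))

With the synthetic 48-month index this prints three training windows, each starting one stride earlier than the previous, which is the expanding-window behaviour the `steps + 1` loop is intended to produce.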
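
The `30 + 10 * step` gate and the best-score bookkeeping from the last two hunks can be exercised the same stand-alone way. A minimal sketch, assuming placeholder train/test arrays and an illustrative parameter grid; the real `param_distributions`, feature construction, and `too_few_month_points` handling live elsewhere in `_process_sales`:

import numpy as np
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

rng = np.random.default_rng(0)
# Placeholder features/targets; the real ones come from the monthly window above.
X_train, y_train = rng.normal(size=(40, 3)), rng.normal(size=40)
X_test, y_test = rng.normal(size=(6, 3)), rng.normal(size=6)

step = 1                        # year-stride counter from the patched for-loop
best_score_mae = float("inf")
best_score_r2 = float("-inf")

# Gate from the diff: 30 baseline points plus 10 extra points per additional stride.
if len(X_train) >= 30 + 10 * step:
    rand = RandomizedSearchCV(
        XGBRegressor(objective="reg:squarederror"),
        param_distributions={  # illustrative grid, not the project's configuration
            "n_estimators": [50, 100, 200],
            "max_depth": [2, 3, 4],
            "learning_rate": [0.05, 0.1, 0.3],
        },
        n_iter=10,
        cv=KFold(n_splits=5, shuffle=True, random_state=0),
        random_state=0,
    )
    rand.fit(X_train, y_train)
    y_pred = rand.best_estimator_.predict(X_test)
    error = mean_absolute_error(y_test, y_pred)
    if error < best_score_mae:
        # Mirrors the bookkeeping in the final hunk: keep the MAE/R2 of the best
        # window; the patch additionally records target_date.year as best_start_year.
        best_score_mae = error
        best_score_r2 = r2_score(y_test, y_pred)
    print(best_score_mae, best_score_r2)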