idea of timedelta based algorithm

This commit is contained in:
Florian Förster 2025-04-11 12:23:05 +02:00
parent 5d1f5199d3
commit e1b375396a

View File

@ -8,6 +8,7 @@ from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast
import dopt_basics.datetime
import numpy as np
import pandas as pd
import scipy.stats
@ -15,6 +16,7 @@ import sqlalchemy as sql
# --- new: for calculating timedelta
from dateutil.relativedelta import relativedelta
from dopt_basics.datetime import TimeUnitsTimedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor
@ -251,26 +253,57 @@ def _process_sales(
# TODO: therefore, stepping with fixed value n does not result in timedelta of n episodes
# Option A: pad data frame with zero values --> could impede forecast algorithm
# Option B: calculate next index based on timedelta
dates = monthly_sum.index
stride = dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS)
dates = cast(pd.DatetimeIndex, monthly_sum.index)
min_date = dates.min()
# print("dates: ", dates)
# ?? --- new: use monthly basis for time windows
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=12)
# starting_date = dates.max() - relativedelta(months=36)
start_index = next(
(i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
)
print("start idx: ", start_index, "length dates: ", len(dates))
# start_index = next(
# (i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
# )
# print("start idx: ", start_index, "length dates: ", len(dates))
for add_year, date_idx in enumerate(range(start_index, -1, -12)):
print("date_idx: ", date_idx)
first_date = dates[date_idx]
print("first date: ", first_date)
def get_index_date(
dates: pd.DatetimeIndex,
starting_date: datetime.datetime | pd.Timestamp,
) -> tuple[pd.Timestamp, bool]:
target, succ = next(
((date, True) for date in dates if date >= starting_date), (dates[-1], False)
)
return target, succ
first_date, succ = get_index_date(dates, starting_date)
if not succ:
# !! return early
...
date_span = first_date - min_date
steps = date_span.days // stride.days
for step in range(steps + 1):
print("step: ", step)
target_date = first_date - step * stride
print("target date: ", target_date)
split_date = dates[-6]
index_date, succ = get_index_date(dates, target_date)
if not succ:
break
if index_date >= split_date:
print("Skip because of date difference")
continue
train = cast(
pd.DataFrame,
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
monthly_sum.loc[index_date:split_date].copy(), # type: ignore
)
print(train)
print("Length train: ", len(train))
@ -284,7 +317,7 @@ def _process_sales(
# ?? --- new: adapted condition to fit new for-loop
# test set size fixed at 6 --> first iteration: baseline - 6 entries
# for each new year 10 new data points needed
if len(train) >= 30 + 10 * add_year:
if len(train) >= 30 + 10 * step:
too_few_month_points = False
rand = RandomizedSearchCV(
@ -308,7 +341,7 @@ def _process_sales(
best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred))
# --- new: use target_date for best_start_year
best_start_year = first_date.year
best_start_year = target_date.year
# --- new: store best_estimator
best_estimator = copy.copy(rand.best_estimator_)