Compare commits

...

12 Commits

2 changed files with 94 additions and 22 deletions

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import copy
import datetime import datetime
import math import math
from collections.abc import Mapping, Set from collections.abc import Mapping, Set
@ -7,10 +8,15 @@ from dataclasses import asdict
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast from typing import TYPE_CHECKING, Final, TypeAlias, cast
import dopt_basics.datetime
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import scipy.stats import scipy.stats
import sqlalchemy as sql import sqlalchemy as sql
# --- new: for calculating timedelta
from dateutil.relativedelta import relativedelta
from dopt_basics.datetime import TimeUnitsTimedelta
from sklearn.metrics import mean_absolute_error, r2_score from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor from xgboost import XGBRegressor
@ -183,16 +189,14 @@ def _process_sales(
PipeResult PipeResult
_description_ _description_
""" """
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
# filter data # filter data
data = pipe.data data = pipe.data
assert data is not None, "processing not existing pipe result" assert data is not None, "processing not existing pipe result"
DATE_FEAT: Final[str] = "buchungs_datum" DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag" SALES_FEAT: Final[str] = "betrag"
df_firma = data[(data["betrag"] > 0)] df_filter = data[(data["betrag"] > 0)]
df_cust = df_firma.copy() df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust) len_ds = len(df_cust)
@ -206,7 +210,18 @@ def _process_sales(
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() current_year = datetime.now().year
current_month = datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
old_monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
all_month_year_combinations = pd.DataFrame(
[(year, month) for year in years for month in range(1, 13) if (year < current_year or (year == current_year and month <= current_month))], columns=["jahr", "monat"]
)
monthly_sum = pd.merge(all_month_year_combinations, old_monthly_sum, on=["jahr", "monat"], how="left")
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = ( monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
) )
@ -215,13 +230,17 @@ def _process_sales(
features = ["jahr", "monat"] features = ["jahr", "monat"]
target = SALES_FEAT target = SALES_FEAT
current_year = datetime.datetime.now().year
first_year = cast(int, df_cust["jahr"].min()) last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
future_dates = pd.date_range(
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
)
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
# Randomized Search # Randomized Search
kfold = KFold(n_splits=5, shuffle=True) kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = { params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=1000), "n_estimators": scipy.stats.poisson(mu=100),
"learning_rate": [0.03, 0.04, 0.05], "learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9), "max_depth": range(2, 9),
"min_child_weight": range(1, 5), "min_child_weight": range(1, 5),
@ -231,26 +250,68 @@ def _process_sales(
"early_stopping_rounds": [20, 50], "early_stopping_rounds": [20, 50],
} }
best_estimator = None
best_params: BestParametersXGBRegressor | None = None best_params: BestParametersXGBRegressor | None = None
best_score_mae: float | None = float("inf") best_score_mae: float | None = float("inf")
best_score_r2: float | None = None best_score_r2: float | None = None
best_start_year: int | None = None best_start_year: int | None = None
too_few_month_points: bool = True too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
for start_year in range(current_year - 4, first_year - 1, -1): stride = dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS)
dates = cast(pd.DatetimeIndex, monthly_sum.index)
min_date = dates.min()
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=36)
def get_index_date(
dates: pd.DatetimeIndex,
starting_date: datetime.datetime | pd.Timestamp,
) -> tuple[pd.Timestamp, bool]:
target, succ = next(
((date, True) for date in dates if date >= starting_date), (dates[-1], False)
)
return target, succ
first_date, succ = get_index_date(dates, starting_date)
if not succ:
# !! return early
...
date_span = first_date - min_date
steps = date_span.days // stride.days
for step in range(steps + 1):
print("step: ", step)
target_date = first_date - step * stride
print("target date: ", target_date)
split_date = dates[-6]
index_date, succ = get_index_date(dates, target_date)
if not succ:
break
if index_date >= split_date:
print("Skip because of date difference")
continue
train = cast( train = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore monthly_sum.loc[index_date:split_date].copy(), # type: ignore
) )
print(train)
print("Length train: ", len(train))
test = cast( test = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore monthly_sum.loc[split_date:].copy(), # type: ignore
) )
X_train, X_test = train[features], test[features] X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target] y_train, y_test = train[target], test[target]
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)): # test set size fixed at 6 --> first iteration: baseline - 6 entries
# for each new year 10 new data points (i.e., sales strictly positive) needed
if len(train[train[SALES_FEAT] > 0]) >= 30 + 10 * step:
too_few_month_points = False too_few_month_points = False
rand = RandomizedSearchCV( rand = RandomizedSearchCV(
@ -273,13 +334,22 @@ def _process_sales(
best_params = cast(BestParametersXGBRegressor, rand.best_params_) best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred)) best_score_r2 = cast(float, r2_score(y_test, y_pred))
best_start_year = start_year # --- new: use target_date for best_start_year
print("executed") best_start_year = target_date.year
forecast = test.copy() # --- new: store best_estimator
forecast.loc[:, "vorhersage"] = y_pred best_estimator = copy.copy(rand.best_estimator_)
# ?? --- new: use best_estimator to calculate future values and store them in forecast
if best_estimator is not None:
X_future = pd.DataFrame(
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
)
y_future = best_estimator.predict(X_future) # type: ignore
forecast["vorhersage"] = y_future
forecast["jahr"] = forecast.index.year # type: ignore
forecast["monat"] = forecast.index.month # type: ignore
forecast = forecast.reset_index(drop=True)
if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
if too_few_month_points: if too_few_month_points:
@ -295,7 +365,9 @@ def _process_sales(
pipe.stats(stats) pipe.stats(stats)
return pipe return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned" assert "vorhersage" in forecast.columns, (
"forecast does not contain prognosis values, but was attempted to be returned"
)
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status) pipe.success(forecast, status)
stats = SalesForecastStatistics( stats = SalesForecastStatistics(

View File

@ -17,7 +17,7 @@ DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** logging # ** logging
ENABLE_LOGGING: Final[bool] = True ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log" LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases # ** databases
@ -40,7 +40,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
# ** API # ** API
API_CON_TIMEOUT: Final[float] = 5.0 # secs to response API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
# ** API response parsing # ** API response parsing
# ** column mapping [API-Response --> Target-Features] # ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict( COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(