major overhaul of forecast pipeline #21
@ -1,8 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
# --- new: for calculating timedelta
|
|
||||||
from dateutil.relativedelta import relativedelta
|
|
||||||
import math
|
import math
|
||||||
from collections.abc import Mapping, Set
|
from collections.abc import Mapping, Set
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
@ -13,6 +12,9 @@ import numpy as np
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import scipy.stats
|
import scipy.stats
|
||||||
import sqlalchemy as sql
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
# --- new: for calculating timedelta
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
from sklearn.metrics import mean_absolute_error, r2_score
|
from sklearn.metrics import mean_absolute_error, r2_score
|
||||||
from sklearn.model_selection import KFold, RandomizedSearchCV
|
from sklearn.model_selection import KFold, RandomizedSearchCV
|
||||||
from xgboost import XGBRegressor
|
from xgboost import XGBRegressor
|
||||||
@ -185,16 +187,14 @@ def _process_sales(
|
|||||||
PipeResult
|
PipeResult
|
||||||
_description_
|
_description_
|
||||||
"""
|
"""
|
||||||
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
|
|
||||||
|
|
||||||
# filter data
|
# filter data
|
||||||
data = pipe.data
|
data = pipe.data
|
||||||
assert data is not None, "processing not existing pipe result"
|
assert data is not None, "processing not existing pipe result"
|
||||||
|
|
||||||
DATE_FEAT: Final[str] = "buchungs_datum"
|
DATE_FEAT: Final[str] = "buchungs_datum"
|
||||||
SALES_FEAT: Final[str] = "betrag"
|
SALES_FEAT: Final[str] = "betrag"
|
||||||
df_firma = data[(data["betrag"] > 0)]
|
df_filter = data[(data["betrag"] > 0)]
|
||||||
df_cust = df_firma.copy()
|
df_cust = df_filter.copy()
|
||||||
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
||||||
len_ds = len(df_cust)
|
len_ds = len(df_cust)
|
||||||
|
|
||||||
@ -218,9 +218,11 @@ def _process_sales(
|
|||||||
features = ["jahr", "monat"]
|
features = ["jahr", "monat"]
|
||||||
target = SALES_FEAT
|
target = SALES_FEAT
|
||||||
|
|
||||||
# --- new: dates and forecast
|
# ?? --- new: dates and forecast
|
||||||
last_date = pd.to_datetime(datetime.now().strftime("%m.%Y"), format="%m.%Y")
|
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
|
||||||
future_dates = pd.date_range(start=last_date + pd.DateOffset(months=1), periods=6, freq="MS")
|
future_dates = pd.date_range(
|
||||||
|
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
|
||||||
|
)
|
||||||
forecast = pd.DataFrame({"datum": future_dates.strftime("%m.%Y")}).set_index("datum")
|
forecast = pd.DataFrame({"datum": future_dates.strftime("%m.%Y")}).set_index("datum")
|
||||||
|
|
||||||
# Randomized Search
|
# Randomized Search
|
||||||
@ -236,30 +238,42 @@ def _process_sales(
|
|||||||
"early_stopping_rounds": [20, 50],
|
"early_stopping_rounds": [20, 50],
|
||||||
}
|
}
|
||||||
|
|
||||||
# --- new: best_estimator (internal usage only)
|
# ?? --- new: best_estimator (internal usage only)
|
||||||
best_estimator = None
|
best_estimator = None
|
||||||
|
|
||||||
best_params: BestParametersXGBRegressor | None = None
|
best_params: BestParametersXGBRegressor | None = None
|
||||||
best_score_mae: float | None = float("inf")
|
best_score_mae: float | None = float("inf")
|
||||||
best_score_r2: float | None = None
|
best_score_r2: float | None = None
|
||||||
best_start_year: int | None = None
|
best_start_year: int | None = None
|
||||||
too_few_month_points: bool = True
|
too_few_month_points: bool = True
|
||||||
forecast: pd.DataFrame | None = None
|
# forecast: pd.DataFrame | None = None
|
||||||
|
# TODO: write routine to pad missing values in datetime row
|
||||||
|
# TODO problem: continuous timeline expected, but values can be empty for multiple months
|
||||||
|
# TODO: therefore, stepping with fixed value n does not result in timedelta of n episodes
|
||||||
|
# Option A: pad data frame with zero values --> could impede forecast algorithm
|
||||||
|
# Option B: calculate next index based on timedelta
|
||||||
dates = monthly_sum.index
|
dates = monthly_sum.index
|
||||||
# --- new: use monthly basis for time windows
|
# print("dates: ", dates)
|
||||||
starting_date = datetime.now() - relativedelta(months=36)
|
# ?? --- new: use monthly basis for time windows
|
||||||
|
# baseline: 3 years - 36 months
|
||||||
|
starting_date = datetime.datetime.now() - relativedelta(months=12)
|
||||||
# starting_date = dates.max() - relativedelta(months=36)
|
# starting_date = dates.max() - relativedelta(months=36)
|
||||||
start_index = next((i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1)
|
start_index = next(
|
||||||
|
(i for i, date in enumerate(dates) if date >= starting_date), len(dates) - 1
|
||||||
|
)
|
||||||
|
print("start idx: ", start_index, "length dates: ", len(dates))
|
||||||
|
|
||||||
for index, i in enumerate(range(start_index, -1, -12)):
|
for add_year, date_idx in enumerate(range(start_index, -1, -12)):
|
||||||
first_date = dates[i]
|
print("date_idx: ", date_idx)
|
||||||
|
first_date = dates[date_idx]
|
||||||
|
print("first date: ", first_date)
|
||||||
split_date = dates[-6]
|
split_date = dates[-6]
|
||||||
|
|
||||||
train = cast(
|
train = cast(
|
||||||
pd.DataFrame,
|
pd.DataFrame,
|
||||||
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
|
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
|
||||||
)
|
)
|
||||||
|
print(train)
|
||||||
|
print("Length train: ", len(train))
|
||||||
test = cast(
|
test = cast(
|
||||||
pd.DataFrame,
|
pd.DataFrame,
|
||||||
monthly_sum.loc[split_date:].copy(), # type: ignore
|
monthly_sum.loc[split_date:].copy(), # type: ignore
|
||||||
@ -267,8 +281,10 @@ def _process_sales(
|
|||||||
X_train, X_test = train[features], test[features]
|
X_train, X_test = train[features], test[features]
|
||||||
y_train, y_test = train[target], test[target]
|
y_train, y_test = train[target], test[target]
|
||||||
|
|
||||||
# --- new: adapted condition to fit new for-loop
|
# ?? --- new: adapted condition to fit new for-loop
|
||||||
if len(train) >= 30 + 10 * index:
|
# test set size fixed at 6 --> first iteration: baseline - 6 entries
|
||||||
|
# for each new year 10 new data points needed
|
||||||
|
if len(train) >= 30 + 10 * add_year:
|
||||||
too_few_month_points = False
|
too_few_month_points = False
|
||||||
|
|
||||||
rand = RandomizedSearchCV(
|
rand = RandomizedSearchCV(
|
||||||
@ -294,12 +310,14 @@ def _process_sales(
|
|||||||
# --- new: use first_date for best_start_year
|
# --- new: use first_date for best_start_year
|
||||||
best_start_year = first_date.year
|
best_start_year = first_date.year
|
||||||
# --- new: store best_estimator
|
# --- new: store best_estimator
|
||||||
best_estimator = rand.best_estimator_
|
best_estimator = copy.copy(rand.best_estimator_)
|
||||||
|
|
||||||
# --- new: use best_estimator to calculate future values and store them in forecast
|
# ?? --- new: use best_estimator to calculate future values and store them in forecast
|
||||||
if best_estimator is not None:
|
if best_estimator is not None:
|
||||||
X_future = pd.DataFrame({"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates)
|
X_future = pd.DataFrame(
|
||||||
y_future = best_estimator.predict(X_future)
|
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
|
||||||
|
)
|
||||||
|
y_future = best_estimator.predict(X_future) # type: ignore
|
||||||
forecast["vorhersage"] = y_future
|
forecast["vorhersage"] = y_future
|
||||||
|
|
||||||
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
|
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
|
||||||
@ -317,7 +335,9 @@ def _process_sales(
|
|||||||
pipe.stats(stats)
|
pipe.stats(stats)
|
||||||
return pipe
|
return pipe
|
||||||
|
|
||||||
assert forecast is not None, "forecast is None, but was attempted to be returned"
|
assert "vorhersage" in forecast.columns, (
|
||||||
|
"forecast does not contain prognosis values, but was attempted to be returned"
|
||||||
|
)
|
||||||
status = STATUS_HANDLER.SUCCESS
|
status = STATUS_HANDLER.SUCCESS
|
||||||
pipe.success(forecast, status)
|
pipe.success(forecast, status)
|
||||||
stats = SalesForecastStatistics(
|
stats = SalesForecastStatistics(
|
||||||
|
|||||||
@ -17,7 +17,7 @@ DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
|
|||||||
# ** logging
|
# ** logging
|
||||||
ENABLE_LOGGING: Final[bool] = True
|
ENABLE_LOGGING: Final[bool] = True
|
||||||
LOGGING_TO_FILE: Final[bool] = True
|
LOGGING_TO_FILE: Final[bool] = True
|
||||||
LOGGING_TO_STDERR: Final[bool] = True
|
LOGGING_TO_STDERR: Final[bool] = False
|
||||||
LOG_FILENAME: Final[str] = "dopt-delbar.log"
|
LOG_FILENAME: Final[str] = "dopt-delbar.log"
|
||||||
|
|
||||||
# ** databases
|
# ** databases
|
||||||
@ -40,7 +40,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
|
|||||||
|
|
||||||
|
|
||||||
# ** API
|
# ** API
|
||||||
API_CON_TIMEOUT: Final[float] = 5.0 # secs to response
|
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
|
||||||
# ** API response parsing
|
# ** API response parsing
|
||||||
# ** column mapping [API-Response --> Target-Features]
|
# ** column mapping [API-Response --> Target-Features]
|
||||||
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
|
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user