12 Commits

Author SHA1 Message Date
58fd5bd921 bump version 2025-04-16 13:45:46 +02:00
c2757cca26 implement behaviour control by config via setup data path 2025-04-16 13:40:55 +02:00
c46c90f548 basic structure for lazy config loading 2025-04-16 12:23:49 +02:00
fc4d54dc4b add dep: tomli-w 2025-04-16 12:23:37 +02:00
5d53551923 update deps - dopt-basics 2025-04-16 11:56:41 +02:00
6a7f59116f remove unneeded pytest mark 2025-04-16 11:56:21 +02:00
063531a08e major overhaul of forecast pipeline (#21)
includes several aspects:

- harden forecast logic with additional error checks
- fix wrong behaviour
- ensure minimum data viability
- extrapolate for multiple data points into the future

fix #19

Co-authored-by: frasu
Reviewed-on: #21
Co-authored-by: foefl <f.foerster@d-opt.com>
Co-committed-by: foefl <f.foerster@d-opt.com>
2025-04-16 09:24:33 +00:00
6caa087efd re-enable logging 2025-04-10 11:12:57 +02:00
2d48be0009 update gitignore to exclude doc folders 2025-04-10 07:37:23 +02:00
fdb9812ecf add script to bump patch version 2025-04-10 07:13:35 +02:00
9f90aec324 bump version 2025-04-09 09:28:27 +02:00
dc848fd840 increase timeout timespan 2025-04-09 09:27:23 +02:00
16 changed files with 329 additions and 73 deletions

1
.gitignore vendored
View File

@@ -3,6 +3,7 @@ prototypes/
data/
reports/
*.code-workspace
docs/
# credentials
CREDENTIALS*

19
pdm.lock generated
View File

@@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:4931e32f8c146a72ad5b0a13c02485ea5ddc727de32fbe7c5e9314bbab05966c"
content_hash = "sha256:545c39ef89d18d28a7bca4b08c93e6fb900c42612089300b867a4e0955acd6ab"
[[metadata.targets]]
requires_python = ">=3.11"
@@ -579,7 +579,7 @@ files = [
[[package]]
name = "dopt-basics"
version = "0.1.2"
version = "0.1.3"
requires_python = ">=3.11"
summary = "basic cross-project tools for Python-based d-opt projects"
groups = ["default"]
@@ -587,8 +587,8 @@ dependencies = [
"tzdata>=2025.1",
]
files = [
{file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"},
{file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"},
{file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
{file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
]
[[package]]
@@ -2414,6 +2414,17 @@ files = [
{file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
]
[[package]]
name = "tomli-w"
version = "1.2.0"
requires_python = ">=3.9"
summary = "A lil' TOML writer"
groups = ["dev"]
files = [
{file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
{file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
]
[[package]]
name = "tomlkit"
version = "0.13.2"

View File

@@ -1,11 +1,11 @@
[project]
name = "delta-barth"
version = "0.5.5"
version = "0.5.7"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2", "SQLAlchemy>=2.0.39"]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39"]
requires-python = ">=3.11"
readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
@@ -44,7 +44,8 @@ filterwarnings = [
]
markers = [
"api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
"forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
]
log_cli = true
@@ -73,7 +74,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.5.5"
current_version = "0.5.7"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -145,6 +146,7 @@ dev = [
"pdoc3>=0.11.5",
"bump-my-version>=1.1.1",
"nox>=2025.2.9",
"tomli-w>=1.2.0",
]
nb = [
"jupyterlab>=4.3.5",

2
scripts/bump_patch.ps1 Normal file
View File

@@ -0,0 +1,2 @@
pdm run bump-my-version bump patch
pdm run bump-my-version show current_version

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import copy
import datetime
import math
from collections.abc import Mapping, Set
@@ -11,6 +12,9 @@ import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
# --- new: for calculating timedelta
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor
@@ -29,7 +33,6 @@ from delta_barth.constants import (
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS,
SALES_BASE_NUM_DATAPOINTS_MONTHS,
SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
@@ -183,16 +186,14 @@ def _process_sales(
PipeResult
_description_
"""
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
# filter data
data = pipe.data
assert data is not None, "processing not existing pipe result"
DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag"
df_firma = data[(data["betrag"] > 0)]
df_cust = df_firma.copy()
df_filter = data[(data["betrag"] > 0)]
df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust)
@@ -206,7 +207,26 @@ def _process_sales(
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
all_month_year_combinations = pd.DataFrame(
[
(year, month)
for year in years
for month in range(1, 13)
if (year < current_year or (year == current_year and month <= current_month))
],
columns=["jahr", "monat"],
)
monthly_sum = pd.merge(
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
)
@@ -215,13 +235,17 @@ def _process_sales(
features = ["jahr", "monat"]
target = SALES_FEAT
current_year = datetime.datetime.now().year
first_year = cast(int, df_cust["jahr"].min())
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
future_dates = pd.date_range(
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
)
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
# Randomized Search
kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=1000),
"n_estimators": scipy.stats.poisson(mu=100),
"learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9),
"min_child_weight": range(1, 5),
@@ -231,26 +255,40 @@ def _process_sales(
"early_stopping_rounds": [20, 50],
}
best_estimator = None
best_params: BestParametersXGBRegressor | None = None
best_score_mae: float | None = float("inf")
best_score_r2: float | None = None
best_start_year: int | None = None
too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
for start_year in range(current_year - 4, first_year - 1, -1):
dates = cast(pd.DatetimeIndex, monthly_sum.index)
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=36)
target_index, _ = next(
((i, True) for i, date in enumerate(dates) if date >= starting_date),
(len(dates) - 1, False),
)
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
first_date = dates[date_idx]
split_date = dates[-6]
train = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
)
test = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
monthly_sum.loc[split_date:].copy(), # type: ignore
)
X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target]
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
# test set size fixed at 6 --> first iteration: baseline - 6 entries
# for each new year 10 new data points (i.e., sales strictly positive) needed
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
too_few_month_points = False
rand = RandomizedSearchCV(
@@ -273,13 +311,21 @@ def _process_sales(
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred))
best_start_year = start_year
print("executed")
forecast = test.copy()
forecast.loc[:, "vorhersage"] = y_pred
# --- new: use first_date for best_start_year
best_start_year = first_date.year
# --- new: store best_estimator
best_estimator = copy.copy(rand.best_estimator_)
if best_estimator is not None:
X_future = pd.DataFrame(
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
)
y_future = best_estimator.predict(X_future) # type: ignore
forecast["vorhersage"] = y_future
forecast["jahr"] = forecast.index.year # type: ignore
forecast["monat"] = forecast.index.month # type: ignore
forecast = forecast.reset_index(drop=True)
if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
if too_few_month_points:
@@ -295,7 +341,9 @@ def _process_sales(
pipe.stats(stats)
return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned"
assert "vorhersage" in forecast.columns, (
"forecast does not contain prognosis values, but was attempted to be returned"
)
status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status)
stats = SalesForecastStatistics(
@@ -384,7 +432,7 @@ def pipeline_sales_forecast(
pipe = _process_sales(
pipe,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
base_num_data_points_months=SESSION.cfg.forecast.threshold_month_data_points,
)
if pipe.statistics is not None:
res = _write_sales_forecast_stats_wrapped(pipe.statistics)

43
src/delta_barth/config.py Normal file
View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from pathlib import Path
import dopt_basics.configs
from pydantic import BaseModel
class Config(BaseModel):
forecast: CfgForecast
class CfgForecast(BaseModel):
threshold_month_data_points: int
class LazyCfgLoader:
def __init__(
self,
cfg_path: Path,
) -> None:
cfg_path = cfg_path.resolve()
assert cfg_path.exists(), f"config path {cfg_path} seems not to exist"
assert cfg_path.is_file(), f"config path {cfg_path} seems not to be a file"
self._path = cfg_path
self._cfg: Config | None = None
@property
def path(self) -> Path:
return self._path
def _load(self) -> Config:
cfg = dopt_basics.configs.load_toml(self.path)
return Config(**cfg)
def reload(self) -> None:
self._cfg = self._load()
def get(self) -> Config:
if self._cfg is None:
self._cfg = self._load()
return self._cfg

View File

@@ -5,6 +5,7 @@ from typing import Final
from delta_barth.types import DualDict, HttpContentHeaders
# ** config
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
# ** lib path
lib_path = Path(__file__).parent
@@ -15,9 +16,9 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** logging
ENABLE_LOGGING: Final[bool] = False
ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases
@@ -40,7 +41,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
# ** API
API_CON_TIMEOUT: Final[float] = 1.0 # secs to response
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
# ** API response parsing
# ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
@@ -63,4 +64,6 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
# ** Pipelines
# ** Forecast
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
# !! now in config
# TODO remove later till proven stable
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

View File

@@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28

View File

@@ -31,6 +31,8 @@ logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG)
logger_config = logging.getLogger("delta_barth.config")
logger_config.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Final
@@ -14,12 +15,19 @@ from delta_barth.api.common import (
LoginResponse,
validate_credentials,
)
from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
from delta_barth.config import LazyCfgLoader
from delta_barth.constants import (
API_CON_TIMEOUT,
CFG_FILENAME,
DB_ECHO,
LIB_PATH,
)
from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING:
from delta_barth.config import Config
from delta_barth.types import ApiCredentials, HttpContentHeaders
@@ -41,6 +49,7 @@ class Session:
base_headers: HttpContentHeaders,
db_folder: str = "data",
logging_folder: str = "logs",
cfg_folder: str = "config",
) -> None:
self._setup: bool = False
self._data_path: Path | None = None
@@ -49,6 +58,10 @@ class Session:
self._db_engine: sql.Engine | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._cfg_path: Path | None = None
self._cfg_folder = cfg_folder
self._cfg_loader: LazyCfgLoader | None = None
self._cfg: Config | None = None
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
@@ -59,6 +72,7 @@ class Session:
# at this point: no logging configured
assert not self._setup, "tried to setup session twice"
self._setup_logging()
self._setup_config()
self._setup_db_management()
self._setup = True
logger.info("[SESSION] Setup procedure successful")
@@ -68,6 +82,32 @@ class Session:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def cfg_path(self) -> Path:
if self._cfg_path is not None and self._setup:
return self._cfg_path
root = (self.data_path / self._cfg_folder).resolve()
cfg_path = root / CFG_FILENAME
if not root.exists():
root.mkdir(parents=False)
self._cfg_path = cfg_path
return self._cfg_path
@property
def cfg(self) -> Config:
assert self._cfg is not None, "tried to access not set config from session"
return self._cfg
def _setup_config(self) -> None:
if not self.cfg_path.exists():
src_cfg = LIB_PATH / CFG_FILENAME
shutil.copyfile(src_cfg, self.cfg_path)
self._cfg_loader = LazyCfgLoader(self.cfg_path)
self._cfg = self._cfg_loader.get()
logger.info("[SESSION] Successfully read and setup config")
@property
def db_engine(self) -> sql.Engine:
assert self._db_engine is not None, "accessed database engine not set"
@@ -78,10 +118,10 @@ class Session:
if self._db_path is not None and self._setup:
return self._db_path
db_root = (self.data_path / self._db_folder).resolve()
db_path = db_root / "dopt-data.db"
if not db_root.exists():
db_root.mkdir(parents=False)
root = (self.data_path / self._db_folder).resolve()
db_path = root / "dopt-data.db"
if not root.exists():
root.mkdir(parents=False)
self._db_path = db_path
return self._db_path

View File

@@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28

View File

@@ -1,3 +1,4 @@
import datetime
from datetime import datetime as Datetime
from unittest.mock import patch
@@ -255,6 +256,7 @@ def test_preprocess_sales_FailOnTargetFeature(
assert pipe.results is None
@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -277,6 +279,7 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
@@ -303,6 +306,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -329,8 +333,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
# prepare fake data
df = sales_data_real_preproc.copy()
f_dates = "buchungs_datum"
end = datetime.datetime.now()
start = df[f_dates].max()
fake_dates = pd.date_range(start, end, freq="MS")
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
fake_df = pd.DataFrame(fake_data, columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
data = enhanced_df.copy()
data["betrag"] = 10000
print(data["betrag"])
data = data.iloc[:20000, :]
@@ -340,7 +355,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
def __init__(self, *args, **kwargs) -> None:
class Predictor:
def predict(self, *args, **kwargs):
return np.array([1, 1, 1, 1])
return np.array([1, 1, 1, 1], dtype=np.float64)
self.best_estimator_ = Predictor()
@@ -354,7 +369,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
pipe = fc._process_sales(
pipe,
min_num_data_points=1,
base_num_data_points_months=-100,
base_num_data_points_months=1,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
@@ -415,27 +430,16 @@ def test_export_on_fail():
assert res.status.description == status.description
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
with patch("delta_barth.analysis.forecast.SESSION", session):
result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_forecast_FailDbWrite(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0

View File

@@ -8,6 +8,7 @@ from unittest.mock import patch
import pandas as pd
import pytest
import tomli_w
import delta_barth.session
from delta_barth.api.requests import SalesPrognosisResponse
@@ -33,6 +34,28 @@ def api_base_url(credentials) -> str:
return credentials["base_url"]
@pytest.fixture(scope="session")
def pth_dummy_cfg() -> Path:
pwd = Path.cwd()
assert "barth" in pwd.parent.name.lower(), "not in project root directory"
data_pth = pwd / "./tests/_test_data/dopt-cfg.toml"
assert data_pth.exists(), "file to dummy CFG not found"
return data_pth
@pytest.fixture(scope="function")
def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
with open(pth_dummy_cfg, "rb") as file:
cfg_data = tomllib.load(file)
target = tmp_path / "dummy_cfg.toml"
target.touch()
with open(target, "wb") as file:
tomli_w.dump(cfg_data, file)
return target
@pytest.fixture(scope="session")
def sales_data_real() -> pd.DataFrame:
pwd = Path.cwd()

40
tests/test_config.py Normal file
View File

@@ -0,0 +1,40 @@
import tomllib
import tomli_w
from delta_barth import config
def test_CfgLoader_Init(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
assert loader.path == pth_cfg
assert loader._cfg is None
def test_CfgLoader_Get(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
def test_CfgLoader_Reload(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
# modify config and reload
with open(pth_cfg, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(pth_cfg, "wb") as file:
tomli_w.dump(cfg_data, file)
assert parsed_cfg.forecast.threshold_month_data_points == 28
loader.reload()
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 30

View File

@@ -1,17 +1,15 @@
import importlib
import json
from unittest.mock import patch
import pytest
import sqlalchemy as sql
import delta_barth.pipelines
from delta_barth import databases as db
from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
def test_write_performance_metrics(session):
def test_write_performance_metrics_Success(session):
pipe_name = "test_pipe"
t_start = 20_000_000_000
t_end = 30_000_000_000
@@ -33,14 +31,31 @@ def test_write_performance_metrics(session):
assert metrics.execution_duration == 10
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast(None, None)
def test_write_performance_metrics_FailStartingTime(session):
pipe_name = "test_pipe"
t_start = 30_000_000_000
t_end = 20_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
with pytest.raises(ValueError):
_ = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.pipelines.SESSION", session),
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)
@@ -59,7 +74,6 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
assert metrics.execution_duration > 0
@pytest.mark.new
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None)

View File

@@ -62,8 +62,27 @@ def test_session_setup_db_management(tmp_path):
assert db_path.exists()
def test_session_setup_config(tmp_path, pth_cfg):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
cfg_path2 = session.cfg_path
assert cfg_path2 == cfg_path
assert session._cfg is not None
assert cfg_path.exists()
assert session.cfg.forecast.threshold_month_data_points == 28
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"