Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 58fd5bd921 | |||
| c2757cca26 | |||
| c46c90f548 | |||
| fc4d54dc4b | |||
| 5d53551923 | |||
| 6a7f59116f | |||
| 063531a08e | |||
| 6caa087efd | |||
| 2d48be0009 | |||
| fdb9812ecf | |||
| 9f90aec324 | |||
| dc848fd840 | |||
| a0d189ac9f | |||
| 6a418118d2 | |||
| 5d78fc9e02 | |||
| b93b070682 | |||
| 30641103ec |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,6 +3,7 @@ prototypes/
|
||||
data/
|
||||
reports/
|
||||
*.code-workspace
|
||||
docs/
|
||||
|
||||
# credentials
|
||||
CREDENTIALS*
|
||||
|
||||
19
pdm.lock
generated
19
pdm.lock
generated
@@ -5,7 +5,7 @@
|
||||
groups = ["default", "dev", "lint", "nb", "tests"]
|
||||
strategy = ["inherit_metadata"]
|
||||
lock_version = "4.5.0"
|
||||
content_hash = "sha256:4931e32f8c146a72ad5b0a13c02485ea5ddc727de32fbe7c5e9314bbab05966c"
|
||||
content_hash = "sha256:545c39ef89d18d28a7bca4b08c93e6fb900c42612089300b867a4e0955acd6ab"
|
||||
|
||||
[[metadata.targets]]
|
||||
requires_python = ">=3.11"
|
||||
@@ -579,7 +579,7 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "dopt-basics"
|
||||
version = "0.1.2"
|
||||
version = "0.1.3"
|
||||
requires_python = ">=3.11"
|
||||
summary = "basic cross-project tools for Python-based d-opt projects"
|
||||
groups = ["default"]
|
||||
@@ -587,8 +587,8 @@ dependencies = [
|
||||
"tzdata>=2025.1",
|
||||
]
|
||||
files = [
|
||||
{file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"},
|
||||
{file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"},
|
||||
{file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
|
||||
{file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2414,6 +2414,17 @@ files = [
|
||||
{file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tomli-w"
|
||||
version = "1.2.0"
|
||||
requires_python = ">=3.9"
|
||||
summary = "A lil' TOML writer"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
|
||||
{file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tomlkit"
|
||||
version = "0.13.2"
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
[project]
|
||||
name = "delta-barth"
|
||||
version = "0.5.1"
|
||||
version = "0.5.7"
|
||||
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
||||
authors = [
|
||||
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
||||
]
|
||||
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2", "SQLAlchemy>=2.0.39"]
|
||||
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39"]
|
||||
requires-python = ">=3.11"
|
||||
readme = "README.md"
|
||||
license = {text = "LicenseRef-Proprietary"}
|
||||
@@ -44,7 +44,8 @@ filterwarnings = [
|
||||
]
|
||||
markers = [
|
||||
"api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
|
||||
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
|
||||
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
|
||||
"forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
|
||||
]
|
||||
log_cli = true
|
||||
|
||||
@@ -73,7 +74,7 @@ directory = "reports/coverage"
|
||||
|
||||
|
||||
[tool.bumpversion]
|
||||
current_version = "0.5.1"
|
||||
current_version = "0.5.7"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
@@ -145,6 +146,7 @@ dev = [
|
||||
"pdoc3>=0.11.5",
|
||||
"bump-my-version>=1.1.1",
|
||||
"nox>=2025.2.9",
|
||||
"tomli-w>=1.2.0",
|
||||
]
|
||||
nb = [
|
||||
"jupyterlab>=4.3.5",
|
||||
|
||||
2
scripts/bump_patch.ps1
Normal file
2
scripts/bump_patch.ps1
Normal file
@@ -0,0 +1,2 @@
|
||||
pdm run bump-my-version bump patch
|
||||
pdm run bump-my-version show current_version
|
||||
@@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
|
||||
|
||||
|
||||
def status_err() -> str:
|
||||
status = Status(code=102, description="internal error occurred", message="caused by test")
|
||||
status = Status(
|
||||
code=102,
|
||||
description="internal error occurred: 'Limit-Überschreitung'",
|
||||
message="caused by test",
|
||||
)
|
||||
return status.model_dump_json()
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import math
|
||||
from collections.abc import Mapping, Set
|
||||
@@ -11,6 +12,9 @@ import numpy as np
|
||||
import pandas as pd
|
||||
import scipy.stats
|
||||
import sqlalchemy as sql
|
||||
|
||||
# --- new: for calculating timedelta
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from sklearn.metrics import mean_absolute_error, r2_score
|
||||
from sklearn.model_selection import KFold, RandomizedSearchCV
|
||||
from xgboost import XGBRegressor
|
||||
@@ -26,9 +30,9 @@ from delta_barth.api.requests import (
|
||||
)
|
||||
from delta_barth.constants import (
|
||||
COL_MAP_SALES_PROGNOSIS,
|
||||
DEFAULT_DB_ERR_CODE,
|
||||
DUMMY_DATA_PATH,
|
||||
FEATURES_SALES_PROGNOSIS,
|
||||
SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
||||
SALES_MIN_NUM_DATAPOINTS,
|
||||
)
|
||||
from delta_barth.errors import STATUS_HANDLER, wrap_result
|
||||
@@ -110,7 +114,7 @@ def _parse_df_to_results_wrapped(
|
||||
return _parse_df_to_results(data)
|
||||
|
||||
|
||||
@wrap_result()
|
||||
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
|
||||
def _write_sales_forecast_stats_wrapped(
|
||||
stats: SalesForecastStatistics,
|
||||
) -> None:
|
||||
@@ -182,16 +186,14 @@ def _process_sales(
|
||||
PipeResult
|
||||
_description_
|
||||
"""
|
||||
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
|
||||
|
||||
# filter data
|
||||
data = pipe.data
|
||||
assert data is not None, "processing not existing pipe result"
|
||||
|
||||
DATE_FEAT: Final[str] = "buchungs_datum"
|
||||
SALES_FEAT: Final[str] = "betrag"
|
||||
df_firma = data[(data["betrag"] > 0)]
|
||||
df_cust = df_firma.copy()
|
||||
df_filter = data[(data["betrag"] > 0)]
|
||||
df_cust = df_filter.copy()
|
||||
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
||||
len_ds = len(df_cust)
|
||||
|
||||
@@ -205,7 +207,26 @@ def _process_sales(
|
||||
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
|
||||
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
|
||||
|
||||
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
|
||||
monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
|
||||
|
||||
current_year = datetime.datetime.now().year
|
||||
current_month = datetime.datetime.now().month
|
||||
years = range(df_cust["jahr"].min(), current_year + 1)
|
||||
|
||||
all_month_year_combinations = pd.DataFrame(
|
||||
[
|
||||
(year, month)
|
||||
for year in years
|
||||
for month in range(1, 13)
|
||||
if (year < current_year or (year == current_year and month <= current_month))
|
||||
],
|
||||
columns=["jahr", "monat"],
|
||||
)
|
||||
|
||||
monthly_sum = pd.merge(
|
||||
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
|
||||
)
|
||||
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
|
||||
monthly_sum[DATE_FEAT] = (
|
||||
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
|
||||
)
|
||||
@@ -214,13 +235,17 @@ def _process_sales(
|
||||
|
||||
features = ["jahr", "monat"]
|
||||
target = SALES_FEAT
|
||||
current_year = datetime.datetime.now().year
|
||||
first_year = cast(int, df_cust["jahr"].min())
|
||||
|
||||
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
|
||||
future_dates = pd.date_range(
|
||||
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
|
||||
)
|
||||
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
|
||||
|
||||
# Randomized Search
|
||||
kfold = KFold(n_splits=5, shuffle=True)
|
||||
params: ParamSearchXGBRegressor = {
|
||||
"n_estimators": scipy.stats.poisson(mu=1000),
|
||||
"n_estimators": scipy.stats.poisson(mu=100),
|
||||
"learning_rate": [0.03, 0.04, 0.05],
|
||||
"max_depth": range(2, 9),
|
||||
"min_child_weight": range(1, 5),
|
||||
@@ -230,26 +255,40 @@ def _process_sales(
|
||||
"early_stopping_rounds": [20, 50],
|
||||
}
|
||||
|
||||
best_estimator = None
|
||||
best_params: BestParametersXGBRegressor | None = None
|
||||
best_score_mae: float | None = float("inf")
|
||||
best_score_r2: float | None = None
|
||||
best_start_year: int | None = None
|
||||
too_few_month_points: bool = True
|
||||
forecast: pd.DataFrame | None = None
|
||||
|
||||
for start_year in range(current_year - 4, first_year - 1, -1):
|
||||
dates = cast(pd.DatetimeIndex, monthly_sum.index)
|
||||
# baseline: 3 years - 36 months
|
||||
starting_date = datetime.datetime.now() - relativedelta(months=36)
|
||||
|
||||
target_index, _ = next(
|
||||
((i, True) for i, date in enumerate(dates) if date >= starting_date),
|
||||
(len(dates) - 1, False),
|
||||
)
|
||||
|
||||
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
|
||||
first_date = dates[date_idx]
|
||||
split_date = dates[-6]
|
||||
|
||||
train = cast(
|
||||
pd.DataFrame,
|
||||
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
|
||||
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
|
||||
)
|
||||
test = cast(
|
||||
pd.DataFrame,
|
||||
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
|
||||
monthly_sum.loc[split_date:].copy(), # type: ignore
|
||||
)
|
||||
X_train, X_test = train[features], test[features]
|
||||
y_train, y_test = train[target], test[target]
|
||||
|
||||
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
|
||||
# test set size fixed at 6 --> first iteration: baseline - 6 entries
|
||||
# for each new year 10 new data points (i.e., sales strictly positive) needed
|
||||
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
|
||||
too_few_month_points = False
|
||||
|
||||
rand = RandomizedSearchCV(
|
||||
@@ -272,13 +311,21 @@ def _process_sales(
|
||||
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
|
||||
best_score_mae = error
|
||||
best_score_r2 = cast(float, r2_score(y_test, y_pred))
|
||||
best_start_year = start_year
|
||||
print("executed")
|
||||
forecast = test.copy()
|
||||
forecast.loc[:, "vorhersage"] = y_pred
|
||||
# --- new: use first_date for best_start_year
|
||||
best_start_year = first_date.year
|
||||
# --- new: store best_estimator
|
||||
best_estimator = copy.copy(rand.best_estimator_)
|
||||
|
||||
if best_estimator is not None:
|
||||
X_future = pd.DataFrame(
|
||||
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
|
||||
)
|
||||
y_future = best_estimator.predict(X_future) # type: ignore
|
||||
forecast["vorhersage"] = y_future
|
||||
forecast["jahr"] = forecast.index.year # type: ignore
|
||||
forecast["monat"] = forecast.index.month # type: ignore
|
||||
forecast = forecast.reset_index(drop=True)
|
||||
|
||||
if forecast is not None:
|
||||
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
|
||||
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
|
||||
|
||||
if too_few_month_points:
|
||||
@@ -294,7 +341,9 @@ def _process_sales(
|
||||
pipe.stats(stats)
|
||||
return pipe
|
||||
|
||||
assert forecast is not None, "forecast is None, but was attempted to be returned"
|
||||
assert "vorhersage" in forecast.columns, (
|
||||
"forecast does not contain prognosis values, but was attempted to be returned"
|
||||
)
|
||||
status = STATUS_HANDLER.SUCCESS
|
||||
pipe.success(forecast, status)
|
||||
stats = SalesForecastStatistics(
|
||||
@@ -383,7 +432,7 @@ def pipeline_sales_forecast(
|
||||
pipe = _process_sales(
|
||||
pipe,
|
||||
min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
|
||||
base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
||||
base_num_data_points_months=SESSION.cfg.forecast.threshold_month_data_points,
|
||||
)
|
||||
if pipe.statistics is not None:
|
||||
res = _write_sales_forecast_stats_wrapped(pipe.statistics)
|
||||
|
||||
@@ -7,6 +7,7 @@ import requests
|
||||
from dopt_basics.io import combine_route
|
||||
from pydantic import BaseModel, PositiveInt, SkipValidation
|
||||
|
||||
from delta_barth.constants import API_CON_TIMEOUT
|
||||
from delta_barth.errors import STATUS_HANDLER
|
||||
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
|
||||
|
||||
@@ -55,7 +56,7 @@ def get_sales_prognosis_data(
|
||||
company_id: int | None = None,
|
||||
start_date: Datetime | None = None,
|
||||
) -> tuple[SalesPrognosisResponse, Status]:
|
||||
resp, status = session.assert_login()
|
||||
_, status = session.assert_login()
|
||||
if status != STATUS_HANDLER.SUCCESS:
|
||||
response = SalesPrognosisResponse(daten=tuple())
|
||||
return response, status
|
||||
@@ -67,11 +68,18 @@ def get_sales_prognosis_data(
|
||||
FirmaId=company_id,
|
||||
BuchungsDatum=start_date,
|
||||
)
|
||||
empty_response = SalesPrognosisResponse(daten=tuple())
|
||||
try:
|
||||
resp = requests.get(
|
||||
URL,
|
||||
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
|
||||
headers=session.headers, # type: ignore[argumentType]
|
||||
timeout=API_CON_TIMEOUT,
|
||||
)
|
||||
except requests.exceptions.Timeout:
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||
except requests.exceptions.RequestException:
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||
|
||||
response: SalesPrognosisResponse
|
||||
status: Status
|
||||
@@ -79,7 +87,7 @@ def get_sales_prognosis_data(
|
||||
response = SalesPrognosisResponse(**resp.json())
|
||||
status = STATUS_HANDLER.SUCCESS
|
||||
else:
|
||||
response = SalesPrognosisResponse(daten=tuple())
|
||||
response = empty_response
|
||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||
status = STATUS_HANDLER.api_error(err)
|
||||
|
||||
|
||||
43
src/delta_barth/config.py
Normal file
43
src/delta_barth/config.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import dopt_basics.configs
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
# Top-level configuration model; ``LazyCfgLoader._load`` builds it from the parsed TOML file.
class Config(BaseModel):
|
||||
# settings of the forecast pipeline (see ``CfgForecast``)
forecast: CfgForecast
|
||||
|
||||
|
||||
# Settings consumed by the sales forecast pipeline.
class CfgForecast(BaseModel):
|
||||
# handed to ``_process_sales`` as ``base_num_data_points_months``
threshold_month_data_points: int
|
||||
|
||||
|
||||
class LazyCfgLoader:
    """Load a TOML configuration file on demand and cache the parsed result.

    Construction only checks that the file exists; the file is parsed the
    first time ``get`` is called or whenever ``reload`` is called.
    """

    def __init__(
        self,
        cfg_path: Path,
    ) -> None:
        """Remember the resolved location of the configuration file.

        Parameters
        ----------
        cfg_path : Path
            location of the TOML file, must point to an existing regular file

        Raises
        ------
        AssertionError
            if the resolved path does not exist or is not a file
        """
        resolved = cfg_path.resolve()
        assert resolved.exists(), f"config path {resolved} seems not to exist"
        assert resolved.is_file(), f"config path {resolved} seems not to be a file"
        self._path = resolved
        # stays ``None`` until the first ``get`` or ``reload``
        self._cfg: Config | None = None

    @property
    def path(self) -> Path:
        """Resolved path of the configuration file."""
        return self._path

    def _load(self) -> Config:
        """Parse the TOML file and build a ``Config`` model from its content."""
        raw = dopt_basics.configs.load_toml(self.path)
        return Config(**raw)

    def reload(self) -> None:
        """Re-read the file and replace the cached configuration."""
        self._cfg = self._load()

    def get(self) -> Config:
        """Return the cached configuration, loading it first if necessary."""
        cached = self._cfg
        if cached is not None:
            return cached
        self._cfg = self._load()
        return self._cfg
|
||||
@@ -5,6 +5,7 @@ from typing import Final
|
||||
from delta_barth.types import DualDict, HttpContentHeaders
|
||||
|
||||
# ** config
|
||||
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
|
||||
|
||||
# ** lib path
|
||||
lib_path = Path(__file__).parent
|
||||
@@ -15,9 +16,9 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
|
||||
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
|
||||
|
||||
# ** logging
|
||||
ENABLE_LOGGING: Final[bool] = False
|
||||
ENABLE_LOGGING: Final[bool] = True
|
||||
LOGGING_TO_FILE: Final[bool] = True
|
||||
LOGGING_TO_STDERR: Final[bool] = True
|
||||
LOGGING_TO_STDERR: Final[bool] = False
|
||||
LOG_FILENAME: Final[str] = "dopt-delbar.log"
|
||||
|
||||
# ** databases
|
||||
@@ -25,6 +26,7 @@ DB_ECHO: Final[bool] = True
|
||||
|
||||
# ** error handling
|
||||
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
||||
DEFAULT_DB_ERR_CODE: Final[int] = 150
|
||||
DEFAULT_API_ERR_CODE: Final[int] = 400
|
||||
|
||||
|
||||
@@ -38,6 +40,8 @@ class KnownDelBarApiErrorCodes(enum.Enum):
|
||||
COMMON = frozenset((400, 401, 409, 500))
|
||||
|
||||
|
||||
# ** API
|
||||
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
|
||||
# ** API response parsing
|
||||
# ** column mapping [API-Response --> Target-Features]
|
||||
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
|
||||
@@ -60,4 +64,6 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
|
||||
# ** Pipelines
|
||||
# ** Forecast
|
||||
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
|
||||
SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
|
||||
# !! now in config
|
||||
# TODO: remove once the config-based value has proven stable
|
||||
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
|
||||
|
||||
@@ -22,8 +22,8 @@ perf_meas = sql.Table(
|
||||
"performance_measurement",
|
||||
metadata,
|
||||
sql.Column("id", sql.Integer, primary_key=True),
|
||||
sql.Column("execution_duration", sql.Float),
|
||||
sql.Column("pipeline_name", sql.String(length=30)),
|
||||
sql.Column("execution_duration", sql.Float),
|
||||
)
|
||||
# ** ---- forecasts
|
||||
sf_stats = sql.Table(
|
||||
|
||||
2
src/delta_barth/dopt-cfg.toml
Normal file
2
src/delta_barth/dopt-cfg.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
[forecast]
|
||||
threshold_month_data_points = 28
|
||||
@@ -53,9 +53,19 @@ class UApiError(Exception):
|
||||
## ** internal error handling
|
||||
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
|
||||
("SUCCESS", 0, "Erfolg"),
|
||||
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
|
||||
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
|
||||
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
|
||||
(
|
||||
"CONNECTION_TIMEOUT",
|
||||
1,
|
||||
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
|
||||
),
|
||||
(
|
||||
"CONNECTION_ERROR",
|
||||
2,
|
||||
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
|
||||
),
|
||||
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
|
||||
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
|
||||
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -31,6 +31,8 @@ logger_status = logging.getLogger("delta_barth.status")
|
||||
logger_status.setLevel(logging.DEBUG)
|
||||
logger_session = logging.getLogger("delta_barth.session")
|
||||
logger_session.setLevel(logging.DEBUG)
|
||||
logger_config = logging.getLogger("delta_barth.config")
|
||||
logger_config.setLevel(logging.DEBUG)
|
||||
logger_management = logging.getLogger("delta_barth.management")
|
||||
logger_management.setLevel(logging.DEBUG)
|
||||
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
|
||||
|
||||
@@ -14,9 +14,11 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
||||
|
||||
def setup(
|
||||
data_path: str,
|
||||
base_url: str,
|
||||
) -> None: # pragma: no cover
|
||||
# at this point: no logging configured
|
||||
SESSION.set_data_path(data_path)
|
||||
SESSION.set_base_url(base_url=base_url)
|
||||
SESSION.setup()
|
||||
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
|
||||
|
||||
@@ -37,6 +39,7 @@ def set_credentials(
|
||||
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
|
||||
|
||||
|
||||
# ** not part of external API, only internal
|
||||
def get_credentials() -> str: # pragma: no cover
|
||||
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
|
||||
creds = SESSION.creds
|
||||
@@ -44,12 +47,15 @@ def get_credentials() -> str: # pragma: no cover
|
||||
return creds.model_dump_json()
|
||||
|
||||
|
||||
# ** legacy: not part of external API
|
||||
def set_base_url(
|
||||
base_url: str,
|
||||
) -> None: # pragma: no cover
|
||||
SESSION.set_base_url(base_url=base_url)
|
||||
|
||||
|
||||
def get_data_path() -> str: # pragma: no cover
|
||||
return str(SESSION.data_path)
|
||||
|
||||
|
||||
def get_base_url() -> str: # pragma: no cover
|
||||
return SESSION.base_url
|
||||
|
||||
@@ -1,24 +1,83 @@
|
||||
"""collection of configured data pipelines, intended to be invoked from C#"""
|
||||
|
||||
import time
|
||||
from datetime import datetime as Datetime
|
||||
from typing import Final
|
||||
|
||||
import sqlalchemy as sql
|
||||
|
||||
from delta_barth import databases as db
|
||||
from delta_barth.analysis import forecast
|
||||
from delta_barth.constants import DEFAULT_DB_ERR_CODE
|
||||
from delta_barth.errors import STATUS_HANDLER, wrap_result
|
||||
from delta_barth.logging import logger_pipelines as logger
|
||||
from delta_barth.management import SESSION
|
||||
from delta_barth.types import JsonExportResponse
|
||||
from delta_barth.types import JsonExportResponse, PipelineMetrics
|
||||
|
||||
|
||||
def _write_performance_metrics(
    pipeline_name: str,
    time_start: int,
    time_end: int,
) -> PipelineMetrics:
    """Store the execution time of one pipeline run in the ``perf_meas`` table.

    Parameters
    ----------
    pipeline_name : str
        name under which the measurement is stored
    time_start : int
        start timestamp; callers in this module pass ``time.perf_counter_ns()``
    time_end : int
        end timestamp taken from the same clock as ``time_start``

    Returns
    -------
    PipelineMetrics
        the record that was inserted (pipeline name and duration)

    Raises
    ------
    ValueError
        if ``time_end`` lies before ``time_start``
    """
    if time_end < time_start:
        raise ValueError("Ending time smaller than starting time")

    # dividing the nanosecond difference by 1e9 yields seconds
    metrics = PipelineMetrics(
        pipeline_name=pipeline_name,
        execution_duration=(time_end - time_start) / 1e9,
    )

    # ``begin()`` wraps the insert in a transaction scoped to the ``with`` block
    with SESSION.db_engine.begin() as con:
        stmt = sql.insert(db.perf_meas).values(**metrics)
        con.execute(stmt)

    return metrics
|
||||
|
||||
|
||||
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
    pipeline_name: str,
    time_start: int,
    time_end: int,
) -> PipelineMetrics:
    """Call ``_write_performance_metrics`` under the ``wrap_result`` decorator.

    Callers in this module read ``.status`` on the returned object and call
    ``.unwrap()`` to obtain the metrics.
    NOTE(review): presumably ``wrap_result`` turns raised exceptions into a
    status carrying ``DEFAULT_DB_ERR_CODE`` - confirm in ``delta_barth.errors``.
    """
    return _write_performance_metrics(
        pipeline_name=pipeline_name,
        time_start=time_start,
        time_end=time_end,
    )
|
||||
|
||||
|
||||
def pipeline_sales_forecast(
    company_id: int | None,
    start_date: Datetime | None,
) -> JsonExportResponse:
    """Run the main sales forecast pipeline and record how long it took.

    Parameters
    ----------
    company_id : int | None
        forwarded unchanged to ``forecast.pipeline_sales_forecast``
    start_date : Datetime | None
        forwarded unchanged to ``forecast.pipeline_sales_forecast``

    Returns
    -------
    JsonExportResponse
        JSON dump of the pipeline result; it is returned regardless of whether
        the performance metrics could be written
    """
    PIPELINE_NAME: Final[str] = "sales_forecast"

    logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
    started_ns = time.perf_counter_ns()
    result = forecast.pipeline_sales_forecast(
        SESSION, company_id=company_id, start_date=start_date
    )
    export = JsonExportResponse(result.model_dump_json())
    # the measured span includes serialising the result to JSON
    finished_ns = time.perf_counter_ns()
    logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")

    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
    write_result = _write_performance_metrics_wrapped(
        pipeline_name=PIPELINE_NAME,
        time_start=started_ns,
        time_end=finished_ns,
    )

    # a failed metrics write is only logged, the forecast is still delivered
    if write_result.status != STATUS_HANDLER.SUCCESS:
        logger.error(
            (
                "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
                "pipeline metrics to database: %s"
            ),
            PIPELINE_NAME,
            write_result.status,
        )
        return export

    written = write_result.unwrap()
    logger.info(
        "[METRICS] Pipeline: >%s< - Execution time: %.6f",
        PIPELINE_NAME,
        written["execution_duration"],
    )
    return export
|
||||
|
||||
@@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
|
||||
company_id: int | None,
|
||||
start_date: Datetime | None,
|
||||
) -> JsonExportResponse:
|
||||
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
|
||||
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
|
||||
t_start = time.perf_counter_ns()
|
||||
result = forecast.pipeline_sales_dummy(
|
||||
SESSION,
|
||||
company_id=company_id,
|
||||
start_date=start_date,
|
||||
)
|
||||
export = JsonExportResponse(result.model_dump_json())
|
||||
|
||||
t_end = time.perf_counter_ns()
|
||||
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
|
||||
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
|
||||
res = _write_performance_metrics_wrapped(
|
||||
pipeline_name=PIPELINE_NAME,
|
||||
time_start=t_start,
|
||||
time_end=t_end,
|
||||
)
|
||||
if res.status != STATUS_HANDLER.SUCCESS:
|
||||
logger.error(
|
||||
(
|
||||
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
|
||||
"pipeline metrics to database: %s"
|
||||
),
|
||||
PIPELINE_NAME,
|
||||
res.status,
|
||||
)
|
||||
else:
|
||||
metrics = res.unwrap()
|
||||
logger.info(
|
||||
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
|
||||
PIPELINE_NAME,
|
||||
metrics["execution_duration"],
|
||||
)
|
||||
|
||||
return export
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Final
|
||||
|
||||
@@ -14,12 +15,19 @@ from delta_barth.api.common import (
|
||||
LoginResponse,
|
||||
validate_credentials,
|
||||
)
|
||||
from delta_barth.constants import DB_ECHO
|
||||
from delta_barth.config import LazyCfgLoader
|
||||
from delta_barth.constants import (
|
||||
API_CON_TIMEOUT,
|
||||
CFG_FILENAME,
|
||||
DB_ECHO,
|
||||
LIB_PATH,
|
||||
)
|
||||
from delta_barth.errors import STATUS_HANDLER
|
||||
from delta_barth.logging import logger_session as logger
|
||||
from delta_barth.types import DelBarApiError, Status
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from delta_barth.config import Config
|
||||
from delta_barth.types import ApiCredentials, HttpContentHeaders
|
||||
|
||||
|
||||
@@ -41,6 +49,7 @@ class Session:
|
||||
base_headers: HttpContentHeaders,
|
||||
db_folder: str = "data",
|
||||
logging_folder: str = "logs",
|
||||
cfg_folder: str = "config",
|
||||
) -> None:
|
||||
self._setup: bool = False
|
||||
self._data_path: Path | None = None
|
||||
@@ -49,6 +58,10 @@ class Session:
|
||||
self._db_engine: sql.Engine | None = None
|
||||
self._logging_dir: Path | None = None
|
||||
self._logging_folder = logging_folder
|
||||
self._cfg_path: Path | None = None
|
||||
self._cfg_folder = cfg_folder
|
||||
self._cfg_loader: LazyCfgLoader | None = None
|
||||
self._cfg: Config | None = None
|
||||
self._creds: ApiCredentials | None = None
|
||||
self._base_url: str | None = None
|
||||
self._headers = base_headers
|
||||
@@ -59,6 +72,7 @@ class Session:
|
||||
# at this point: no logging configured
|
||||
assert not self._setup, "tried to setup session twice"
|
||||
self._setup_logging()
|
||||
self._setup_config()
|
||||
self._setup_db_management()
|
||||
self._setup = True
|
||||
logger.info("[SESSION] Setup procedure successful")
|
||||
@@ -68,6 +82,32 @@ class Session:
|
||||
assert self._data_path is not None, "accessed data path not set"
|
||||
return self._data_path
|
||||
|
||||
@property
def cfg_path(self) -> Path:
    """Path of the session's config file below the data path.

    Once the session is set up the stored path is returned as is. Before
    that, the path is derived from ``data_path`` and the config folder name,
    and the folder is created if it does not exist yet (without creating
    missing parents).
    """
    cached = self._cfg_path
    if self._setup and cached is not None:
        return cached

    cfg_dir = (self.data_path / self._cfg_folder).resolve()
    if not cfg_dir.exists():
        cfg_dir.mkdir(parents=False)
    self._cfg_path = cfg_dir / CFG_FILENAME
    return self._cfg_path
|
||||
|
||||
@property
def cfg(self) -> Config:
    """Configuration stored on the session.

    Raises
    ------
    AssertionError
        if no configuration has been set yet
    """
    loaded = self._cfg
    assert loaded is not None, "tried to access not set config from session"
    return loaded
|
||||
|
||||
def _setup_config(self) -> None:
    """Ensure a config file exists at ``cfg_path`` and load it into the session.

    If no file is present, the one shipped next to the library
    (``LIB_PATH / CFG_FILENAME``) is copied to ``cfg_path`` first.
    """
    target = self.cfg_path
    if not target.exists():
        shutil.copyfile(LIB_PATH / CFG_FILENAME, target)

    loader = LazyCfgLoader(target)
    self._cfg_loader = loader
    self._cfg = loader.get()
    logger.info("[SESSION] Successfully read and setup config")
|
||||
|
||||
@property
|
||||
def db_engine(self) -> sql.Engine:
|
||||
assert self._db_engine is not None, "accessed database engine not set"
|
||||
@@ -78,10 +118,10 @@ class Session:
|
||||
if self._db_path is not None and self._setup:
|
||||
return self._db_path
|
||||
|
||||
db_root = (self.data_path / self._db_folder).resolve()
|
||||
db_path = db_root / "dopt-data.db"
|
||||
if not db_root.exists():
|
||||
db_root.mkdir(parents=False)
|
||||
root = (self.data_path / self._db_folder).resolve()
|
||||
db_path = root / "dopt-data.db"
|
||||
if not root.exists():
|
||||
root.mkdir(parents=False)
|
||||
self._db_path = db_path
|
||||
return self._db_path
|
||||
|
||||
@@ -191,11 +231,18 @@ class Session:
|
||||
databaseName=self.creds.database,
|
||||
mandantName=self.creds.mandant,
|
||||
)
|
||||
empty_response = LoginResponse(token="")
|
||||
try:
|
||||
resp = requests.put(
|
||||
URL,
|
||||
login_req.model_dump_json(),
|
||||
headers=self.headers, # type: ignore
|
||||
timeout=API_CON_TIMEOUT,
|
||||
)
|
||||
except requests.exceptions.Timeout: # pragma: no cover
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||
except requests.exceptions.RequestException: # pragma: no cover
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||
|
||||
response: LoginResponse
|
||||
status: Status
|
||||
@@ -204,7 +251,7 @@ class Session:
|
||||
status = STATUS_HANDLER.pipe_states.SUCCESS
|
||||
self._add_session_token(response.token)
|
||||
else:
|
||||
response = LoginResponse(token="")
|
||||
response = empty_response
|
||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||
status = STATUS_HANDLER.api_error(err)
|
||||
|
||||
@@ -216,12 +263,17 @@ class Session:
|
||||
ROUTE: Final[str] = "user/logout"
|
||||
URL: Final = combine_route(self.base_url, ROUTE)
|
||||
|
||||
try:
|
||||
resp = requests.put(
|
||||
URL,
|
||||
headers=self.headers, # type: ignore
|
||||
timeout=API_CON_TIMEOUT,
|
||||
)
|
||||
except requests.exceptions.Timeout: # pragma: no cover
|
||||
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||
except requests.exceptions.RequestException: # pragma: no cover
|
||||
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||
|
||||
response = None
|
||||
status: Status
|
||||
if resp.status_code == 200:
|
||||
status = STATUS_HANDLER.SUCCESS
|
||||
@@ -230,7 +282,7 @@ class Session:
|
||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||
status = STATUS_HANDLER.api_error(err)
|
||||
|
||||
return response, status
|
||||
return None, status
|
||||
|
||||
def assert_login(
|
||||
self,
|
||||
@@ -246,11 +298,18 @@ class Session:
|
||||
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
|
||||
URL: Final = combine_route(self.base_url, ROUTE)
|
||||
params: dict[str, int] = {"FirmaId": 999999}
|
||||
empty_response = LoginResponse(token="")
|
||||
try:
|
||||
resp = requests.get(
|
||||
URL,
|
||||
params=params,
|
||||
headers=self.headers, # type: ignore
|
||||
timeout=API_CON_TIMEOUT,
|
||||
)
|
||||
except requests.exceptions.Timeout: # pragma: no cover
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||
except requests.exceptions.RequestException: # pragma: no cover
|
||||
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||
|
||||
response: LoginResponse
|
||||
status: Status
|
||||
@@ -261,7 +320,7 @@ class Session:
|
||||
self._remove_session_token()
|
||||
response, status = self.login()
|
||||
else:
|
||||
response = LoginResponse(token="")
|
||||
response = empty_response
|
||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||
status = STATUS_HANDLER.api_error(err)
|
||||
|
||||
|
||||
@@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
|
||||
@dataclass(slots=True)
|
||||
class DataPipeStates:
|
||||
SUCCESS: Status
|
||||
CONNECTION_TIMEOUT: Status
|
||||
CONNECTION_ERROR: Status
|
||||
TOO_FEW_POINTS: Status
|
||||
TOO_FEW_MONTH_POINTS: Status
|
||||
NO_RELIABLE_FORECAST: Status
|
||||
@@ -139,7 +141,13 @@ class Statistics:
|
||||
pass
|
||||
|
||||
|
||||
# ** forecasts
|
||||
# ** ---- performance
|
||||
class PipelineMetrics(t.TypedDict):
|
||||
pipeline_name: str
|
||||
execution_duration: float
|
||||
|
||||
|
||||
# ** ---- forecasts
|
||||
@dataclass(slots=True)
|
||||
class CustomerDataSalesForecast:
|
||||
order: list[int] = field(default_factory=list)
|
||||
|
||||
2
tests/_test_data/dopt-cfg.toml
Normal file
2
tests/_test_data/dopt-cfg.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
[forecast]
|
||||
threshold_month_data_points = 28
|
||||
@@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
from datetime import datetime as Datetime
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -255,6 +256,7 @@ def test_preprocess_sales_FailOnTargetFeature(
|
||||
assert pipe.results is None
|
||||
|
||||
|
||||
@pytest.mark.forecast
|
||||
def test_process_sales_Success(sales_data_real_preproc):
|
||||
data = sales_data_real_preproc.copy()
|
||||
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
|
||||
@@ -277,6 +279,7 @@ def test_process_sales_Success(sales_data_real_preproc):
|
||||
assert pipe.statistics.xgb_params is not None
|
||||
|
||||
|
||||
@pytest.mark.forecast
|
||||
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
|
||||
data = sales_data_real_preproc.copy()
|
||||
data = data.iloc[:20, :]
|
||||
@@ -303,6 +306,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
|
||||
assert pipe.statistics.xgb_params is None
|
||||
|
||||
|
||||
@pytest.mark.forecast
|
||||
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
|
||||
data = sales_data_real_preproc.copy()
|
||||
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
|
||||
@@ -329,8 +333,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
|
||||
assert pipe.statistics.xgb_params is None
|
||||
|
||||
|
||||
@pytest.mark.forecast
|
||||
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
|
||||
data = sales_data_real_preproc.copy()
|
||||
# prepare fake data
|
||||
df = sales_data_real_preproc.copy()
|
||||
f_dates = "buchungs_datum"
|
||||
end = datetime.datetime.now()
|
||||
start = df[f_dates].max()
|
||||
fake_dates = pd.date_range(start, end, freq="MS")
|
||||
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
|
||||
fake_df = pd.DataFrame(fake_data, columns=df.columns)
|
||||
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
|
||||
|
||||
data = enhanced_df.copy()
|
||||
data["betrag"] = 10000
|
||||
print(data["betrag"])
|
||||
data = data.iloc[:20000, :]
|
||||
@@ -340,7 +355,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
class Predictor:
|
||||
def predict(self, *args, **kwargs):
|
||||
return np.array([1, 1, 1, 1])
|
||||
return np.array([1, 1, 1, 1], dtype=np.float64)
|
||||
|
||||
self.best_estimator_ = Predictor()
|
||||
|
||||
@@ -354,7 +369,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
|
||||
pipe = fc._process_sales(
|
||||
pipe,
|
||||
min_num_data_points=1,
|
||||
base_num_data_points_months=-100,
|
||||
base_num_data_points_months=1,
|
||||
)
|
||||
|
||||
assert pipe.status != STATUS_HANDLER.SUCCESS
|
||||
@@ -415,27 +430,16 @@ def test_export_on_fail():
|
||||
assert res.status.description == status.description
|
||||
|
||||
|
||||
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
||||
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
|
||||
with patch(
|
||||
with (
|
||||
patch(
|
||||
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
||||
) as mock:
|
||||
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
|
||||
with patch("delta_barth.analysis.forecast.SESSION", session):
|
||||
) as get_mock,
|
||||
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
|
||||
):
|
||||
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
|
||||
sess_mock.cfg.forecast.threshold_month_data_points = 1
|
||||
result = fc.pipeline_sales_forecast(None) # type: ignore
|
||||
print(result)
|
||||
assert result.status == STATUS_HANDLER.SUCCESS
|
||||
assert len(result.response.daten) > 0
|
||||
|
||||
|
||||
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
||||
def test_pipeline_sales_forecast_FailDbWrite(exmpl_api_sales_prognosis_resp):
|
||||
with patch(
|
||||
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
||||
) as mock:
|
||||
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
|
||||
result = fc.pipeline_sales_forecast(None) # type: ignore
|
||||
print(result)
|
||||
assert result.status == STATUS_HANDLER.SUCCESS
|
||||
assert len(result.response.daten) > 0
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
from datetime import datetime as Datetime
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from delta_barth.api import requests as requests_
|
||||
from delta_barth.api.common import LoginResponse
|
||||
|
||||
|
||||
@pytest.mark.api_con_required
|
||||
@@ -94,3 +96,31 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
|
||||
assert status.api_server_error.message == json["message"]
|
||||
assert status.api_server_error.code == json["code"]
|
||||
assert status.api_server_error.hints == json["hints"]
|
||||
|
||||
|
||||
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
|
||||
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
|
||||
|
||||
def assert_login():
|
||||
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
|
||||
|
||||
session.assert_login = assert_login
|
||||
|
||||
resp, status = requests_.get_sales_prognosis_data(session, None, None)
|
||||
assert resp is not None
|
||||
assert len(resp.daten) == 0
|
||||
assert status.code == 1
|
||||
|
||||
|
||||
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
|
||||
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
|
||||
|
||||
def assert_login():
|
||||
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
|
||||
|
||||
session.assert_login = assert_login
|
||||
|
||||
resp, status = requests_.get_sales_prognosis_data(session, None, None)
|
||||
assert resp is not None
|
||||
assert len(resp.daten) == 0
|
||||
assert status.code == 2
|
||||
|
||||
@@ -8,6 +8,7 @@ from unittest.mock import patch
|
||||
|
||||
import pandas as pd
|
||||
import pytest
|
||||
import tomli_w
|
||||
|
||||
import delta_barth.session
|
||||
from delta_barth.api.requests import SalesPrognosisResponse
|
||||
@@ -33,6 +34,28 @@ def api_base_url(credentials) -> str:
|
||||
return credentials["base_url"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pth_dummy_cfg() -> Path:
|
||||
pwd = Path.cwd()
|
||||
assert "barth" in pwd.parent.name.lower(), "not in project root directory"
|
||||
data_pth = pwd / "./tests/_test_data/dopt-cfg.toml"
|
||||
assert data_pth.exists(), "file to dummy CFG not found"
|
||||
return data_pth
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
|
||||
with open(pth_dummy_cfg, "rb") as file:
|
||||
cfg_data = tomllib.load(file)
|
||||
|
||||
target = tmp_path / "dummy_cfg.toml"
|
||||
target.touch()
|
||||
with open(target, "wb") as file:
|
||||
tomli_w.dump(cfg_data, file)
|
||||
|
||||
return target
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def sales_data_real() -> pd.DataFrame:
|
||||
pwd = Path.cwd()
|
||||
@@ -95,7 +118,7 @@ def mock_put():
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_get():
|
||||
with patch("requests.get") as mock:
|
||||
yield mock
|
||||
|
||||
40
tests/test_config.py
Normal file
40
tests/test_config.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import tomllib
|
||||
|
||||
import tomli_w
|
||||
|
||||
from delta_barth import config
|
||||
|
||||
|
||||
def test_CfgLoader_Init(pth_cfg):
|
||||
loader = config.LazyCfgLoader(pth_cfg)
|
||||
|
||||
assert loader.path == pth_cfg
|
||||
assert loader._cfg is None
|
||||
|
||||
|
||||
def test_CfgLoader_Get(pth_cfg):
|
||||
loader = config.LazyCfgLoader(pth_cfg)
|
||||
|
||||
parsed_cfg = loader.get()
|
||||
assert isinstance(parsed_cfg, config.Config)
|
||||
assert parsed_cfg.forecast.threshold_month_data_points == 28
|
||||
|
||||
|
||||
def test_CfgLoader_Reload(pth_cfg):
|
||||
loader = config.LazyCfgLoader(pth_cfg)
|
||||
|
||||
parsed_cfg = loader.get()
|
||||
assert isinstance(parsed_cfg, config.Config)
|
||||
assert parsed_cfg.forecast.threshold_month_data_points == 28
|
||||
# modify config and reload
|
||||
with open(pth_cfg, "rb") as file:
|
||||
cfg_data = tomllib.load(file)
|
||||
cfg_data["forecast"]["threshold_month_data_points"] = 30
|
||||
with open(pth_cfg, "wb") as file:
|
||||
tomli_w.dump(cfg_data, file)
|
||||
|
||||
assert parsed_cfg.forecast.threshold_month_data_points == 28
|
||||
loader.reload()
|
||||
parsed_cfg = loader.get()
|
||||
assert isinstance(parsed_cfg, config.Config)
|
||||
assert parsed_cfg.forecast.threshold_month_data_points == 30
|
||||
@@ -1,21 +1,60 @@
|
||||
import importlib
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import sqlalchemy as sql
|
||||
|
||||
import delta_barth.pipelines
|
||||
from delta_barth import databases as db
|
||||
from delta_barth import pipelines as pl
|
||||
from delta_barth.errors import STATUS_HANDLER
|
||||
|
||||
|
||||
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
||||
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
||||
with patch(
|
||||
def test_write_performance_metrics_Success(session):
|
||||
pipe_name = "test_pipe"
|
||||
t_start = 20_000_000_000
|
||||
t_end = 30_000_000_000
|
||||
|
||||
with patch("delta_barth.pipelines.SESSION", session):
|
||||
metrics = pl._write_performance_metrics(
|
||||
pipeline_name=pipe_name,
|
||||
time_start=t_start,
|
||||
time_end=t_end,
|
||||
)
|
||||
assert metrics["pipeline_name"] == pipe_name
|
||||
assert metrics["execution_duration"] == 10
|
||||
|
||||
with session.db_engine.begin() as con:
|
||||
ret = con.execute(sql.select(db.perf_meas))
|
||||
|
||||
metrics = ret.all()[-1]
|
||||
assert metrics.pipeline_name == pipe_name
|
||||
assert metrics.execution_duration == 10
|
||||
|
||||
|
||||
def test_write_performance_metrics_FailStartingTime(session):
|
||||
pipe_name = "test_pipe"
|
||||
t_start = 30_000_000_000
|
||||
t_end = 20_000_000_000
|
||||
|
||||
with patch("delta_barth.pipelines.SESSION", session):
|
||||
with pytest.raises(ValueError):
|
||||
_ = pl._write_performance_metrics(
|
||||
pipeline_name=pipe_name,
|
||||
time_start=t_start,
|
||||
time_end=t_end,
|
||||
)
|
||||
|
||||
|
||||
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
|
||||
with (
|
||||
patch(
|
||||
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
||||
) as mock:
|
||||
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
||||
importlib.reload(delta_barth.pipelines)
|
||||
) as get_mock,
|
||||
patch("delta_barth.pipelines.SESSION", session),
|
||||
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
|
||||
):
|
||||
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
||||
sess_mock.cfg.forecast.threshold_month_data_points = 1
|
||||
json_export = pl.pipeline_sales_forecast(None, None)
|
||||
|
||||
assert isinstance(json_export, str)
|
||||
@@ -27,8 +66,16 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
||||
assert "code" in parsed_resp["status"]
|
||||
assert parsed_resp["status"]["code"] == 0
|
||||
|
||||
with session.db_engine.begin() as con:
|
||||
ret = con.execute(sql.select(db.perf_meas))
|
||||
|
||||
def test_sales_prognosis_pipeline_dummy():
|
||||
metrics = ret.all()[-1]
|
||||
assert metrics.pipeline_name == "sales_forecast"
|
||||
assert metrics.execution_duration > 0
|
||||
|
||||
|
||||
def test_sales_prognosis_pipeline_dummy(session):
|
||||
with patch("delta_barth.pipelines.SESSION", session):
|
||||
json_export = pl.pipeline_sales_forecast_dummy(None, None)
|
||||
|
||||
assert isinstance(json_export, str)
|
||||
@@ -43,3 +90,10 @@ def test_sales_prognosis_pipeline_dummy():
|
||||
assert entry["vorhersage"] == pytest.approx(47261.058594)
|
||||
assert "code" in parsed_resp["status"]
|
||||
assert parsed_resp["status"]["code"] == 0
|
||||
|
||||
with session.db_engine.begin() as con:
|
||||
ret = con.execute(sql.select(db.perf_meas))
|
||||
|
||||
metrics = ret.all()[-1]
|
||||
assert metrics.pipeline_name == "sales_forecast_dummy"
|
||||
assert metrics.execution_duration > 0
|
||||
|
||||
@@ -62,8 +62,27 @@ def test_session_setup_db_management(tmp_path):
|
||||
assert db_path.exists()
|
||||
|
||||
|
||||
def test_session_setup_config(tmp_path, pth_cfg):
|
||||
str_path = str(tmp_path)
|
||||
foldername: str = "cfg_test"
|
||||
target_cfg_dir = tmp_path / foldername
|
||||
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
|
||||
session.set_data_path(str_path)
|
||||
cfg_path = session.cfg_path
|
||||
assert cfg_path.parent.exists()
|
||||
assert cfg_path.parent == target_cfg_dir
|
||||
assert not cfg_path.exists()
|
||||
session.setup()
|
||||
cfg_path2 = session.cfg_path
|
||||
assert cfg_path2 == cfg_path
|
||||
assert session._cfg is not None
|
||||
assert cfg_path.exists()
|
||||
assert session.cfg.forecast.threshold_month_data_points == 28
|
||||
|
||||
|
||||
@patch("delta_barth.logging.ENABLE_LOGGING", True)
|
||||
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
|
||||
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
|
||||
def test_session_setup_logging(tmp_path):
|
||||
str_path = str(tmp_path)
|
||||
foldername: str = "logging_test"
|
||||
|
||||
Reference in New Issue
Block a user