18 Commits

Author SHA1 Message Date
58fd5bd921 bump version 2025-04-16 13:45:46 +02:00
c2757cca26 implement behaviour control by config via setup data path 2025-04-16 13:40:55 +02:00
c46c90f548 basic structure for lazy config loading 2025-04-16 12:23:49 +02:00
fc4d54dc4b add dep: tomli-w 2025-04-16 12:23:37 +02:00
5d53551923 update deps - dopt-basics 2025-04-16 11:56:41 +02:00
6a7f59116f remove unneeded pytest mark 2025-04-16 11:56:21 +02:00
063531a08e major overhaul of forecast pipeline (#21)
includes several aspects:

- harden forecast logic with additional error checks
- fix wrong behaviour
- ensure minimum data viability
- extrapolate for multiple data points into the future

fix #19

Co-authored-by: frasu
Reviewed-on: #21
Co-authored-by: foefl <f.foerster@d-opt.com>
Co-committed-by: foefl <f.foerster@d-opt.com>
2025-04-16 09:24:33 +00:00
6caa087efd re-enable logging 2025-04-10 11:12:57 +02:00
2d48be0009 update gitignore to exclude doc folders 2025-04-10 07:37:23 +02:00
fdb9812ecf add script to bump patch version 2025-04-10 07:13:35 +02:00
9f90aec324 bump version 2025-04-09 09:28:27 +02:00
dc848fd840 increase timeout timespan 2025-04-09 09:27:23 +02:00
a0d189ac9f add logging of pipeline metrics in database 2025-04-04 13:37:05 +02:00
6a418118d2 prepare metrics writing process 2025-04-03 16:05:46 +02:00
5d78fc9e02 added handling for API connectivity errors 2025-04-03 12:51:14 +02:00
b93b070682 adapt C# JSON type 2025-04-03 11:22:00 +02:00
30641103ec rework session management: interface to C# 2025-04-03 09:26:56 +02:00
d1d665e60a base structure for logging and logging management via session, closes #2 2025-03-28 11:31:27 +01:00
24 changed files with 700 additions and 144 deletions

1
.gitignore vendored
View File

@@ -3,6 +3,7 @@ prototypes/
data/ data/
reports/ reports/
*.code-workspace *.code-workspace
docs/
# credentials # credentials
CREDENTIALS* CREDENTIALS*

19
pdm.lock generated
View File

@@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"] groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"] strategy = ["inherit_metadata"]
lock_version = "4.5.0" lock_version = "4.5.0"
content_hash = "sha256:4931e32f8c146a72ad5b0a13c02485ea5ddc727de32fbe7c5e9314bbab05966c" content_hash = "sha256:545c39ef89d18d28a7bca4b08c93e6fb900c42612089300b867a4e0955acd6ab"
[[metadata.targets]] [[metadata.targets]]
requires_python = ">=3.11" requires_python = ">=3.11"
@@ -579,7 +579,7 @@ files = [
[[package]] [[package]]
name = "dopt-basics" name = "dopt-basics"
version = "0.1.2" version = "0.1.3"
requires_python = ">=3.11" requires_python = ">=3.11"
summary = "basic cross-project tools for Python-based d-opt projects" summary = "basic cross-project tools for Python-based d-opt projects"
groups = ["default"] groups = ["default"]
@@ -587,8 +587,8 @@ dependencies = [
"tzdata>=2025.1", "tzdata>=2025.1",
] ]
files = [ files = [
{file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"}, {file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
{file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"}, {file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
] ]
[[package]] [[package]]
@@ -2414,6 +2414,17 @@ files = [
{file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"}, {file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
] ]
[[package]]
name = "tomli-w"
version = "1.2.0"
requires_python = ">=3.9"
summary = "A lil' TOML writer"
groups = ["dev"]
files = [
{file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
{file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
]
[[package]] [[package]]
name = "tomlkit" name = "tomlkit"
version = "0.13.2" version = "0.13.2"

View File

@@ -1,11 +1,11 @@
[project] [project]
name = "delta-barth" name = "delta-barth"
version = "0.5.0" version = "0.5.7"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system" description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [ authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"}, {name = "Florian Förster", email = "f.foerster@d-opt.com"},
] ]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2", "SQLAlchemy>=2.0.39"] dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39"]
requires-python = ">=3.11" requires-python = ">=3.11"
readme = "README.md" readme = "README.md"
license = {text = "LicenseRef-Proprietary"} license = {text = "LicenseRef-Proprietary"}
@@ -44,7 +44,8 @@ filterwarnings = [
] ]
markers = [ markers = [
"api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')", "api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')", "new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
"forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
] ]
log_cli = true log_cli = true
@@ -73,7 +74,7 @@ directory = "reports/coverage"
[tool.bumpversion] [tool.bumpversion]
current_version = "0.5.0" current_version = "0.5.7"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.
@@ -145,6 +146,7 @@ dev = [
"pdoc3>=0.11.5", "pdoc3>=0.11.5",
"bump-my-version>=1.1.1", "bump-my-version>=1.1.1",
"nox>=2025.2.9", "nox>=2025.2.9",
"tomli-w>=1.2.0",
] ]
nb = [ nb = [
"jupyterlab>=4.3.5", "jupyterlab>=4.3.5",

2
scripts/bump_patch.ps1 Normal file
View File

@@ -0,0 +1,2 @@
# bump the patch segment of the project version via bump-my-version
pdm run bump-my-version bump patch
# print the version that is now configured as current
pdm run bump-my-version show current_version

View File

@@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
def status_err() -> str: def status_err() -> str:
status = Status(code=102, description="internal error occurred", message="caused by test") status = Status(
code=102,
description="internal error occurred: 'Limit-Überschreitung'",
message="caused by test",
)
return status.model_dump_json() return status.model_dump_json()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import copy
import datetime import datetime
import math import math
from collections.abc import Mapping, Set from collections.abc import Mapping, Set
@@ -11,6 +12,9 @@ import numpy as np
import pandas as pd import pandas as pd
import scipy.stats import scipy.stats
import sqlalchemy as sql import sqlalchemy as sql
# --- new: for calculating timedelta
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor from xgboost import XGBRegressor
@@ -26,9 +30,9 @@ from delta_barth.api.requests import (
) )
from delta_barth.constants import ( from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS, COL_MAP_SALES_PROGNOSIS,
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH, DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS,
SALES_BASE_NUM_DATAPOINTS_MONTHS,
SALES_MIN_NUM_DATAPOINTS, SALES_MIN_NUM_DATAPOINTS,
) )
from delta_barth.errors import STATUS_HANDLER, wrap_result from delta_barth.errors import STATUS_HANDLER, wrap_result
@@ -110,7 +114,7 @@ def _parse_df_to_results_wrapped(
return _parse_df_to_results(data) return _parse_df_to_results(data)
@wrap_result() @wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped( def _write_sales_forecast_stats_wrapped(
stats: SalesForecastStatistics, stats: SalesForecastStatistics,
) -> None: ) -> None:
@@ -182,16 +186,14 @@ def _process_sales(
PipeResult PipeResult
_description_ _description_
""" """
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
# filter data # filter data
data = pipe.data data = pipe.data
assert data is not None, "processing not existing pipe result" assert data is not None, "processing not existing pipe result"
DATE_FEAT: Final[str] = "buchungs_datum" DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag" SALES_FEAT: Final[str] = "betrag"
df_firma = data[(data["betrag"] > 0)] df_filter = data[(data["betrag"] > 0)]
df_cust = df_firma.copy() df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust) len_ds = len(df_cust)
@@ -205,7 +207,26 @@ def _process_sales(
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
all_month_year_combinations = pd.DataFrame(
[
(year, month)
for year in years
for month in range(1, 13)
if (year < current_year or (year == current_year and month <= current_month))
],
columns=["jahr", "monat"],
)
monthly_sum = pd.merge(
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = ( monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
) )
@@ -214,13 +235,17 @@ def _process_sales(
features = ["jahr", "monat"] features = ["jahr", "monat"]
target = SALES_FEAT target = SALES_FEAT
current_year = datetime.datetime.now().year
first_year = cast(int, df_cust["jahr"].min()) last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
future_dates = pd.date_range(
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
)
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
# Randomized Search # Randomized Search
kfold = KFold(n_splits=5, shuffle=True) kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = { params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=1000), "n_estimators": scipy.stats.poisson(mu=100),
"learning_rate": [0.03, 0.04, 0.05], "learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9), "max_depth": range(2, 9),
"min_child_weight": range(1, 5), "min_child_weight": range(1, 5),
@@ -230,26 +255,40 @@ def _process_sales(
"early_stopping_rounds": [20, 50], "early_stopping_rounds": [20, 50],
} }
best_estimator = None
best_params: BestParametersXGBRegressor | None = None best_params: BestParametersXGBRegressor | None = None
best_score_mae: float | None = float("inf") best_score_mae: float | None = float("inf")
best_score_r2: float | None = None best_score_r2: float | None = None
best_start_year: int | None = None best_start_year: int | None = None
too_few_month_points: bool = True too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
for start_year in range(current_year - 4, first_year - 1, -1): dates = cast(pd.DatetimeIndex, monthly_sum.index)
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=36)
target_index, _ = next(
((i, True) for i, date in enumerate(dates) if date >= starting_date),
(len(dates) - 1, False),
)
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
first_date = dates[date_idx]
split_date = dates[-6]
train = cast( train = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore monthly_sum.loc[first_date:split_date].copy(), # type: ignore
) )
test = cast( test = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore monthly_sum.loc[split_date:].copy(), # type: ignore
) )
X_train, X_test = train[features], test[features] X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target] y_train, y_test = train[target], test[target]
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)): # test set size fixed at 6 --> first iteration: baseline - 6 entries
# for each new year 10 new data points (i.e., sales strictly positive) needed
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
too_few_month_points = False too_few_month_points = False
rand = RandomizedSearchCV( rand = RandomizedSearchCV(
@@ -272,13 +311,21 @@ def _process_sales(
best_params = cast(BestParametersXGBRegressor, rand.best_params_) best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred)) best_score_r2 = cast(float, r2_score(y_test, y_pred))
best_start_year = start_year # --- new: use first_date for best_start_year
print("executed") best_start_year = first_date.year
forecast = test.copy() # --- new: store best_estimator
forecast.loc[:, "vorhersage"] = y_pred best_estimator = copy.copy(rand.best_estimator_)
if best_estimator is not None:
X_future = pd.DataFrame(
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
)
y_future = best_estimator.predict(X_future) # type: ignore
forecast["vorhersage"] = y_future
forecast["jahr"] = forecast.index.year # type: ignore
forecast["monat"] = forecast.index.month # type: ignore
forecast = forecast.reset_index(drop=True)
if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
if too_few_month_points: if too_few_month_points:
@@ -294,7 +341,9 @@ def _process_sales(
pipe.stats(stats) pipe.stats(stats)
return pipe return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned" assert "vorhersage" in forecast.columns, (
"forecast does not contain prognosis values, but was attempted to be returned"
)
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status) pipe.success(forecast, status)
stats = SalesForecastStatistics( stats = SalesForecastStatistics(
@@ -353,6 +402,7 @@ def pipeline_sales_forecast(
company_id: int | None = None, company_id: int | None = None,
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport: ) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data( response, status = get_sales_prognosis_data(
session, session,
company_id=company_id, company_id=company_id,
@@ -382,7 +432,7 @@ def pipeline_sales_forecast(
pipe = _process_sales( pipe = _process_sales(
pipe, pipe,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS, min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS, base_num_data_points_months=SESSION.cfg.forecast.threshold_month_data_points,
) )
if pipe.statistics is not None: if pipe.statistics is not None:
res = _write_sales_forecast_stats_wrapped(pipe.statistics) res = _write_sales_forecast_stats_wrapped(pipe.statistics)
@@ -413,6 +463,8 @@ def pipeline_sales_forecast(
assert pipe.results is not None, "needed export response not set in pipeline" assert pipe.results is not None, "needed export response not set in pipeline"
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
return pipe.results return pipe.results
@@ -422,6 +474,9 @@ def pipeline_sales_dummy(
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport: ) -> SalesPrognosisResultsExport:
"""prototype dummy function for tests by DelBar""" """prototype dummy function for tests by DelBar"""
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
_, _, _ = session, company_id, start_date _, _, _ = session, company_id, start_date
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl" data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
@@ -434,6 +489,8 @@ def pipeline_sales_dummy(
pipe.fail(res.status) pipe.fail(res.status)
return _export_on_fail(res.status) return _export_on_fail(res.status)
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
return SalesPrognosisResultsExport( return SalesPrognosisResultsExport(
response=res.unwrap(), response=res.unwrap(),
status=res.status, status=res.status,

View File

@@ -7,6 +7,7 @@ import requests
from dopt_basics.io import combine_route from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.constants import API_CON_TIMEOUT
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
@@ -55,7 +56,7 @@ def get_sales_prognosis_data(
company_id: int | None = None, company_id: int | None = None,
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]: ) -> tuple[SalesPrognosisResponse, Status]:
resp, status = session.assert_login() _, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS: if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple()) response = SalesPrognosisResponse(daten=tuple())
return response, status return response, status
@@ -67,11 +68,18 @@ def get_sales_prognosis_data(
FirmaId=company_id, FirmaId=company_id,
BuchungsDatum=start_date, BuchungsDatum=start_date,
) )
empty_response = SalesPrognosisResponse(daten=tuple())
try:
resp = requests.get( resp = requests.get(
URL, URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True), params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType] headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: SalesPrognosisResponse response: SalesPrognosisResponse
status: Status status: Status
@@ -79,7 +87,7 @@ def get_sales_prognosis_data(
response = SalesPrognosisResponse(**resp.json()) response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
else: else:
response = SalesPrognosisResponse(daten=tuple()) response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

43
src/delta_barth/config.py Normal file
View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from pathlib import Path
import dopt_basics.configs
from pydantic import BaseModel
class Config(BaseModel):
    """Root model of the application configuration.

    Built from the mapping returned by ``dopt_basics.configs.load_toml``
    (see ``LazyCfgLoader._load``), i.e. each field corresponds to one
    top-level key of the loaded TOML document.
    """

    # ``CfgForecast`` is declared further down in this module; the annotation
    # is a forward reference kept as a string by
    # ``from __future__ import annotations``
    forecast: CfgForecast
class CfgForecast(BaseModel):
    """Configuration values of the ``forecast`` entry of ``Config``."""

    # integer threshold; the sales forecast pipeline passes it on as
    # ``base_num_data_points_months``
    threshold_month_data_points: int
class LazyCfgLoader:
    """Load a TOML configuration file on first access and cache the result.

    Nothing is read from disk on construction. The file is parsed on the
    first call to ``get`` and again on every call to ``reload``.
    """

    def __init__(
        self,
        cfg_path: Path,
    ) -> None:
        """Validate and store the location of the configuration file.

        Parameters
        ----------
        cfg_path : Path
            path to the configuration file, resolved before it is checked

        Raises
        ------
        AssertionError
            if the resolved path does not exist or is not a regular file
        """
        cfg_path = cfg_path.resolve()
        # NOTE: these checks are ``assert`` statements and are therefore
        # skipped when Python runs with ``-O``
        assert cfg_path.exists(), f"config path {cfg_path} seems not to exist"
        assert cfg_path.is_file(), f"config path {cfg_path} seems not to be a file"

        self._path = cfg_path
        # stays ``None`` until the first ``get`` or ``reload``
        self._cfg: Config | None = None

    @property
    def path(self) -> Path:
        """Resolved path of the configuration file."""
        return self._path

    def _load(self) -> Config:
        """Read the TOML file at ``path`` and build a ``Config`` from it."""
        cfg = dopt_basics.configs.load_toml(self.path)

        return Config(**cfg)

    def reload(self) -> None:
        """Re-read the file and replace the cached configuration."""
        self._cfg = self._load()

    def get(self) -> Config:
        """Return the cached configuration, loading it on first use.

        Returns
        -------
        Config
            the configuration object; the same instance on repeated calls
            until ``reload`` is called
        """
        if self._cfg is None:
            self._cfg = self._load()

        return self._cfg

View File

@@ -5,6 +5,7 @@ from typing import Final
from delta_barth.types import DualDict, HttpContentHeaders from delta_barth.types import DualDict, HttpContentHeaders
# ** config # ** config
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
# ** lib path # ** lib path
lib_path = Path(__file__).parent lib_path = Path(__file__).parent
@@ -15,9 +16,9 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** logging # ** logging
ENABLE_LOGGING: Final[bool] = False ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log" LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases # ** databases
@@ -25,6 +26,7 @@ DB_ECHO: Final[bool] = True
# ** error handling # ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400 DEFAULT_API_ERR_CODE: Final[int] = 400
@@ -38,6 +40,8 @@ class KnownDelBarApiErrorCodes(enum.Enum):
COMMON = frozenset((400, 401, 409, 500)) COMMON = frozenset((400, 401, 409, 500))
# ** API
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
# ** API response parsing # ** API response parsing
# ** column mapping [API-Response --> Target-Features] # ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict( COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
@@ -60,4 +64,6 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
# ** Pipelines # ** Pipelines
# ** Forecast # ** Forecast
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36 SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36 # !! now in config
# TODO: remove later, once proven stable
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

View File

@@ -22,8 +22,8 @@ perf_meas = sql.Table(
"performance_measurement", "performance_measurement",
metadata, metadata,
sql.Column("id", sql.Integer, primary_key=True), sql.Column("id", sql.Integer, primary_key=True),
sql.Column("execution_duration", sql.Float),
sql.Column("pipeline_name", sql.String(length=30)), sql.Column("pipeline_name", sql.String(length=30)),
sql.Column("execution_duration", sql.Float),
) )
# ** ---- forecasts # ** ---- forecasts
sf_stats = sql.Table( sf_stats = sql.Table(

View File

@@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28

View File

@@ -6,7 +6,7 @@ from functools import wraps
from typing import Any, Final from typing import Any, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.logging import logger_wrapped_results as logger from delta_barth.logging import logger_status, logger_wrapped_results
from delta_barth.types import DataPipeStates, Status from delta_barth.types import DataPipeStates, Status
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
@@ -53,9 +53,19 @@ class UApiError(Exception):
## ** internal error handling ## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = ( DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"), ("SUCCESS", 0, "Erfolg"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"), (
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"), "CONNECTION_TIMEOUT",
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"), 1,
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
),
(
"CONNECTION_ERROR",
2,
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
),
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
) )
@@ -151,23 +161,32 @@ class StatusHandler:
state: Status, state: Status,
) -> None: ) -> None:
if state == self.SUCCESS: if state == self.SUCCESS:
logger_status.info(
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
)
return return
code = state.code code = state.code
descr = state.description descr = state.description
msg = state.message msg = state.message
exc: Exception
if code < DEFAULT_INTERNAL_ERR_CODE: if code < DEFAULT_INTERNAL_ERR_CODE:
raise _construct_exception(UDataProcessingError, descr, msg) exc = _construct_exception(UDataProcessingError, descr, msg)
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE: elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
raise _construct_exception(UInternalError, descr, msg) exc = _construct_exception(UInternalError, descr, msg)
else: else:
api_err = state.api_server_error api_err = state.api_server_error
assert api_err is not None, ( assert api_err is not None, (
"error code inidcated API error, but no error instance found" "error code inidcated API error, but no error instance found"
) )
add_info = api_err.model_dump(exclude_none=True) add_info = api_err.model_dump(exclude_none=True)
raise _construct_exception(UApiError, descr, msg, add_info) exc = _construct_exception(UApiError, descr, msg, add_info)
logger_status.error(
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
)
raise exc
STATUS_HANDLER: Final[StatusHandler] = StatusHandler() STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
@@ -229,24 +248,24 @@ def wrap_result(
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]: def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
@wraps(func) @wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]: def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
status: ResultWrapper[T] wrapped_result: ResultWrapper[T]
try: try:
res = func(*args, **kwargs) res = func(*args, **kwargs)
status = ResultWrapper( wrapped_result = ResultWrapper(
result=res, exception=None, code_on_error=code_on_error result=res, exception=None, code_on_error=code_on_error
) )
except Exception as err: except Exception as err:
status = ResultWrapper( wrapped_result = ResultWrapper(
result=NotSet(), exception=err, code_on_error=code_on_error result=NotSet(), exception=err, code_on_error=code_on_error
) )
logger.error( logger_wrapped_results.info(
"An exception in routine %s occurred - msg: %s, stack trace:", "[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
func.__name__, func.__name__,
str(err), str(err),
stack_info=True, stack_info=True,
) )
return status return wrapped_result
return wrapper return wrapper

View File

@@ -17,21 +17,24 @@ from delta_barth.constants import (
logging.Formatter.converter = gmtime logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s" LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000" LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
# if not LOG_FILE_FOLDER.exists():
# LOG_FILE_FOLDER.mkdir(parents=True)
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
# ** handlers
NULL_HANDLER = logging.NullHandler()
# ** formatters
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** loggers and configuration # ** loggers and configuration
logger_all = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_stderr) logger_base = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_file) logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_session = logging.getLogger("delta_barth.session") logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG) logger_session.setLevel(logging.DEBUG)
logger_config = logging.getLogger("delta_barth.config")
logger_config.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results") logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
logger_wrapped_results.setLevel(logging.DEBUG) logger_wrapped_results.setLevel(logging.DEBUG)
logger_pipelines = logging.getLogger("delta_barth.pipelines") logger_pipelines = logging.getLogger("delta_barth.pipelines")
@@ -43,18 +46,15 @@ logger_db.setLevel(logging.DEBUG)
def setup_logging( def setup_logging(
logging_dir: Path, logging_dir: Path,
) -> None: ) -> None:
# ** formatters
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** handlers # ** handlers
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
null_handler = logging.NullHandler()
if ENABLE_LOGGING and LOGGING_TO_STDERR: if ENABLE_LOGGING and LOGGING_TO_STDERR:
logger_all_handler_stderr = logging.StreamHandler() logger_all_handler_stderr = logging.StreamHandler()
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR) logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
logger_all_handler_stderr.setFormatter(logger_all_formater) logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover else: # pragma: no cover
logger_all_handler_stderr = null_handler logger_all_handler_stderr = NULL_HANDLER
if ENABLE_LOGGING and LOGGING_TO_FILE: if ENABLE_LOGGING and LOGGING_TO_FILE:
logger_all_handler_file = logging.handlers.RotatingFileHandler( logger_all_handler_file = logging.handlers.RotatingFileHandler(
@@ -65,9 +65,17 @@ def setup_logging(
delay=True, delay=True,
) )
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE) logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
logger_all_handler_file.setFormatter(logger_all_formater) logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover else: # pragma: no cover
logger_all_handler_file = null_handler logger_all_handler_file = NULL_HANDLER
logger_all.addHandler(logger_all_handler_stderr) logger_base.addHandler(logger_all_handler_stderr)
logger_all.addHandler(logger_all_handler_file) logger_base.addHandler(logger_all_handler_file)
def disable_logging() -> None:
    """Detach every handler from the base logger and attach the null handler."""
    # iterate over a snapshot, because ``removeHandler`` mutates
    # ``logger_base.handlers`` while we loop
    handlers = tuple(logger_base.handlers)
    for handler in handlers:
        logger_base.removeHandler(handler)

    logger_base.addHandler(NULL_HANDLER)

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
from typing import Final from typing import Final
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.logging import logger_session as logger
from delta_barth.session import Session from delta_barth.session import Session
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS) SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
@@ -13,9 +14,13 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup(
    data_path: str,
    base_url: str,
) -> None:  # pragma: no cover
    """Configure the module-level session and run its setup procedure.

    Stores ``data_path`` and ``base_url`` on ``SESSION`` before calling
    ``SESSION.setup()``; a log entry is written once setup has returned.
    """
    # at this point: no logging configured
    SESSION.set_data_path(data_path)
    SESSION.set_base_url(base_url=base_url)
    SESSION.setup()

    logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
def set_credentials( def set_credentials(
@@ -24,25 +29,33 @@ def set_credentials(
database: str, database: str,
mandant: str, mandant: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
SESSION.set_credentials( SESSION.set_credentials(
username=username, username=username,
password=password, password=password,
database=database, database=database,
mandant=mandant, mandant=mandant,
) )
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
# ** not part of external API, only internal
def get_credentials() -> str:  # pragma: no cover
    """Return the credentials of the current session serialised as JSON."""
    logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
    session_creds = SESSION.creds
    logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")

    return session_creds.model_dump_json()
# ** legacy: not part of external API
def set_base_url(
    base_url: str,
) -> None:  # pragma: no cover
    """Forward ``base_url`` to the current session."""
    SESSION.set_base_url(base_url=base_url)
def get_data_path() -> str:  # pragma: no cover
    """Return the data path of the current session as a string."""
    session_data_path = SESSION.data_path
    return str(session_data_path)
def get_base_url() -> str:  # pragma: no cover
    """Return the base URL stored on the current session."""
    session_base_url = SESSION.base_url
    return session_base_url

View File

@@ -1,20 +1,83 @@
"""collection of configured data pipelines, intended to be invoked from C#""" """collection of configured data pipelines, intended to be invoked from C#"""
import time
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import Final
import sqlalchemy as sql
from delta_barth import databases as db
from delta_barth.analysis import forecast from delta_barth.analysis import forecast
from delta_barth.constants import DEFAULT_DB_ERR_CODE
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse from delta_barth.types import JsonExportResponse, PipelineMetrics
def _write_performance_metrics(
    pipeline_name: str,
    time_start: int,
    time_end: int,
) -> PipelineMetrics:
    """Store the execution duration of a pipeline run in the database.

    The duration is the difference of both timestamps divided by ``1e9``
    (callers in this module pass ``time.perf_counter_ns()`` values, so the
    stored value is in seconds). The record is inserted into ``db.perf_meas``
    via the session's database engine and returned to the caller.

    Raises:
        ValueError: if ``time_end`` is smaller than ``time_start``.
    """
    elapsed = time_end - time_start
    if elapsed < 0:
        raise ValueError("Ending time smaller than starting time")

    metrics = PipelineMetrics(
        pipeline_name=pipeline_name,
        execution_duration=elapsed / 1e9,
    )

    with SESSION.db_engine.begin() as con:
        insert_stmt = sql.insert(db.perf_meas).values(**metrics)
        con.execute(insert_stmt)

    return metrics
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
    pipeline_name: str,
    time_start: int,
    time_end: int,
) -> PipelineMetrics:
    """Call ``_write_performance_metrics`` under the ``wrap_result`` decorator.

    Callers in this module inspect ``.status`` of the returned object and
    obtain the metrics via ``.unwrap()``.
    """
    return _write_performance_metrics(
        pipeline_name,
        time_start,
        time_end,
    )
def pipeline_sales_forecast(
    company_id: int | None,
    start_date: Datetime | None,
) -> JsonExportResponse:
    """Run the main sales forecast pipeline and return its result as JSON.

    The run (forecast plus JSON serialisation) is timed with
    ``time.perf_counter_ns`` and the measured duration is written to the
    database afterwards. A failed metrics write is only logged; the export
    is returned in either case.
    """
    PIPELINE_NAME: Final[str] = "sales_forecast"

    logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
    ns_begin = time.perf_counter_ns()
    pipe_result = forecast.pipeline_sales_forecast(
        SESSION,
        company_id=company_id,
        start_date=start_date,
    )
    export = JsonExportResponse(pipe_result.model_dump_json())
    ns_finish = time.perf_counter_ns()
    logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")

    logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
    write_result = _write_performance_metrics_wrapped(
        pipeline_name=PIPELINE_NAME,
        time_start=ns_begin,
        time_end=ns_finish,
    )
    if write_result.status != STATUS_HANDLER.SUCCESS:
        # metrics are auxiliary: report the problem, but still deliver the export
        error_template = (
            "[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
            "pipeline metrics to database: %s"
        )
        logger.error(error_template, PIPELINE_NAME, write_result.status)
        return export

    written_metrics = write_result.unwrap()
    logger.info(
        "[METRICS] Pipeline: >%s< - Execution time: %.6f",
        PIPELINE_NAME,
        written_metrics["execution_duration"],
    )

    return export
@@ -23,11 +86,38 @@ def pipeline_sales_forecast_dummy(
company_id: int | None, company_id: int | None,
start_date: Datetime | None, start_date: Datetime | None,
) -> JsonExportResponse: ) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_dummy( result = forecast.pipeline_sales_dummy(
SESSION, SESSION,
company_id=company_id, company_id=company_id,
start_date=start_date, start_date=start_date,
) )
export = JsonExportResponse(result.model_dump_json()) export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export return export

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import shutil
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Final from typing import TYPE_CHECKING, Final
@@ -14,12 +15,19 @@ from delta_barth.api.common import (
LoginResponse, LoginResponse,
validate_credentials, validate_credentials,
) )
from delta_barth.constants import DB_ECHO from delta_barth.config import LazyCfgLoader
from delta_barth.constants import (
API_CON_TIMEOUT,
CFG_FILENAME,
DB_ECHO,
LIB_PATH,
)
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING: if TYPE_CHECKING:
from delta_barth.config import Config
from delta_barth.types import ApiCredentials, HttpContentHeaders from delta_barth.types import ApiCredentials, HttpContentHeaders
@@ -41,13 +49,19 @@ class Session:
base_headers: HttpContentHeaders, base_headers: HttpContentHeaders,
db_folder: str = "data", db_folder: str = "data",
logging_folder: str = "logs", logging_folder: str = "logs",
cfg_folder: str = "config",
) -> None: ) -> None:
self._setup: bool = False
self._data_path: Path | None = None self._data_path: Path | None = None
self._db_path: Path | None = None self._db_path: Path | None = None
self._db_folder = db_folder self._db_folder = db_folder
self._db_engine: sql.Engine | None = None self._db_engine: sql.Engine | None = None
self._logging_dir: Path | None = None self._logging_dir: Path | None = None
self._logging_folder = logging_folder self._logging_folder = logging_folder
self._cfg_path: Path | None = None
self._cfg_folder = cfg_folder
self._cfg_loader: LazyCfgLoader | None = None
self._cfg: Config | None = None
self._creds: ApiCredentials | None = None self._creds: ApiCredentials | None = None
self._base_url: str | None = None self._base_url: str | None = None
self._headers = base_headers self._headers = base_headers
@@ -55,14 +69,45 @@ class Session:
self._logged_in: bool = False self._logged_in: bool = False
def setup(self) -> None:
    """Run the one-time setup: logging, then config, then DB management.

    Asserts that the session has not been set up already and marks it as
    set up once all steps have finished.
    """
    # at this point: no logging configured
    assert not self._setup, "tried to setup session twice"

    self._setup_logging()
    self._setup_config()
    self._setup_db_management()
    self._setup = True

    logger.info("[SESSION] Setup procedure successful")
@property
def data_path(self) -> Path:
    """Data path of the session; asserts that it has been set."""
    current = self._data_path
    assert current is not None, "accessed data path not set"
    return current
@property
def cfg_path(self) -> Path:
    """Path of the config file below the session's data path.

    The stored path is returned directly once the session is set up.
    Otherwise it is derived from ``data_path`` and the config folder name,
    and the config folder is created if it does not exist yet.
    """
    if self._cfg_path is not None and self._setup:
        return self._cfg_path

    root = (self.data_path / self._cfg_folder).resolve()
    # ``exist_ok=True`` replaces a separate ``exists()`` check, so the folder
    # appearing between check and creation can no longer raise;
    # ``parents=False`` still requires the parent directory to exist
    root.mkdir(parents=False, exist_ok=True)
    self._cfg_path = root / CFG_FILENAME

    return self._cfg_path
@property
def cfg(self) -> Config:
    """Loaded configuration; asserts that it has been set."""
    loaded = self._cfg
    assert loaded is not None, "tried to access not set config from session"
    return loaded
def _setup_config(self) -> None:
    """Load the config, copying the packaged config file first if none exists.

    When there is no file at ``cfg_path`` yet, the file ``CFG_FILENAME`` from
    ``LIB_PATH`` is copied there. The config is then read through a
    ``LazyCfgLoader`` and stored on the session.
    """
    target = self.cfg_path
    if not target.exists():
        shutil.copyfile(LIB_PATH / CFG_FILENAME, target)

    loader = LazyCfgLoader(target)
    self._cfg_loader = loader
    self._cfg = loader.get()

    logger.info("[SESSION] Successfully read and setup config")
@property @property
def db_engine(self) -> sql.Engine: def db_engine(self) -> sql.Engine:
assert self._db_engine is not None, "accessed database engine not set" assert self._db_engine is not None, "accessed database engine not set"
@@ -70,19 +115,24 @@ class Session:
@property
def db_path(self) -> Path:
    """Path of the database file below the session's data path.

    The stored path is returned directly once the session is set up.
    Otherwise it is derived from ``data_path`` and the database folder name,
    and the database folder is created if it does not exist yet.
    """
    if self._db_path is not None and self._setup:
        return self._db_path

    root = (self.data_path / self._db_folder).resolve()
    # ``exist_ok=True`` replaces a separate ``exists()`` check, so the folder
    # appearing between check and creation can no longer raise;
    # ``parents=False`` still requires the parent directory to exist
    root.mkdir(parents=False, exist_ok=True)
    self._db_path = root / "dopt-data.db"

    return self._db_path
def _setup_db_management(self) -> None:
    """Create the database engine for ``db_path`` and create all tables."""
    engine = db.get_engine(self.db_path, echo=DB_ECHO)
    self._db_engine = engine
    db.metadata.create_all(engine)

    logger.info("[SESSION] Successfully setup DB management")
@property @property
def logging_dir(self) -> Path: def logging_dir(self) -> Path:
if self._logging_dir is not None: if self._logging_dir is not None and self._setup:
return self._logging_dir return self._logging_dir
logging_dir = self.data_path / self._logging_folder logging_dir = self.data_path / self._logging_folder
@@ -91,15 +141,13 @@ class Session:
self._logging_dir = logging_dir self._logging_dir = logging_dir
return self._logging_dir return self._logging_dir
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
def _setup_logging(self) -> None:
    """Set up the package logging with the session's logging directory."""
    log_dir = self.logging_dir
    delta_barth.logging.setup_logging(log_dir)

    logger.info("[SESSION] Successfully setup logging")
def disable_logging(self) -> None:
    """Delegate to ``delta_barth.logging.disable_logging``."""
    delta_barth.logging.disable_logging()
    return None
@property @property
def creds(self) -> ApiCredentials: def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set" assert self._creds is not None, "accessed credentials not set"
@@ -110,6 +158,7 @@ class Session:
path: str, path: str,
): ):
self._data_path = validate_path(path) self._data_path = validate_path(path)
self._setup = False
def set_credentials( def set_credentials(
self, self,
@@ -182,11 +231,18 @@ class Session:
databaseName=self.creds.database, databaseName=self.creds.database,
mandantName=self.creds.mandant, mandantName=self.creds.mandant,
) )
empty_response = LoginResponse(token="")
try:
resp = requests.put( resp = requests.put(
URL, URL,
login_req.model_dump_json(), login_req.model_dump_json(),
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -195,7 +251,7 @@ class Session:
status = STATUS_HANDLER.pipe_states.SUCCESS status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token) self._add_session_token(response.token)
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
@@ -207,12 +263,17 @@ class Session:
ROUTE: Final[str] = "user/logout" ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
try:
resp = requests.put( resp = requests.put(
URL, URL,
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response = None
status: Status status: Status
if resp.status_code == 200: if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
@@ -221,7 +282,7 @@ class Session:
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
return response, status return None, status
def assert_login( def assert_login(
self, self,
@@ -237,11 +298,18 @@ class Session:
ROUTE: Final[str] = "verkauf/umsatzprognosedaten" ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999} params: dict[str, int] = {"FirmaId": 999999}
empty_response = LoginResponse(token="")
try:
resp = requests.get( resp = requests.get(
URL, URL,
params=params, params=params,
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -252,7 +320,7 @@ class Session:
self._remove_session_token() self._remove_session_token()
response, status = self.login() response, status = self.login()
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

View File

@@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
@dataclass(slots=True) @dataclass(slots=True)
class DataPipeStates: class DataPipeStates:
SUCCESS: Status SUCCESS: Status
CONNECTION_TIMEOUT: Status
CONNECTION_ERROR: Status
TOO_FEW_POINTS: Status TOO_FEW_POINTS: Status
TOO_FEW_MONTH_POINTS: Status TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status NO_RELIABLE_FORECAST: Status
@@ -139,7 +141,13 @@ class Statistics:
pass pass
# ** forecasts # ** ---- performance
class PipelineMetrics(t.TypedDict):
    """Performance record of a single pipeline run."""

    # name identifying the pipeline
    pipeline_name: str
    # duration of the run
    execution_duration: float
# ** ---- forecasts
@dataclass(slots=True) @dataclass(slots=True)
class CustomerDataSalesForecast: class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list) order: list[int] = field(default_factory=list)

View File

@@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28

View File

@@ -1,3 +1,4 @@
import datetime
from datetime import datetime as Datetime from datetime import datetime as Datetime
from unittest.mock import patch from unittest.mock import patch
@@ -255,6 +256,7 @@ def test_preprocess_sales_FailOnTargetFeature(
assert pipe.results is None assert pipe.results is None
@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc): def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -277,6 +279,7 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.statistics.xgb_params is not None assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc): def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
data = data.iloc[:20, :] data = data.iloc[:20, :]
@@ -303,6 +306,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc): def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy() data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS) pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -329,8 +333,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc): def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
data = sales_data_real_preproc.copy() # prepare fake data
df = sales_data_real_preproc.copy()
f_dates = "buchungs_datum"
end = datetime.datetime.now()
start = df[f_dates].max()
fake_dates = pd.date_range(start, end, freq="MS")
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
fake_df = pd.DataFrame(fake_data, columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
data = enhanced_df.copy()
data["betrag"] = 10000 data["betrag"] = 10000
print(data["betrag"]) print(data["betrag"])
data = data.iloc[:20000, :] data = data.iloc[:20000, :]
@@ -340,7 +355,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
def __init__(self, *args, **kwargs) -> None: def __init__(self, *args, **kwargs) -> None:
class Predictor: class Predictor:
def predict(self, *args, **kwargs): def predict(self, *args, **kwargs):
return np.array([1, 1, 1, 1]) return np.array([1, 1, 1, 1], dtype=np.float64)
self.best_estimator_ = Predictor() self.best_estimator_ = Predictor()
@@ -354,7 +369,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
pipe = fc._process_sales( pipe = fc._process_sales(
pipe, pipe,
min_num_data_points=1, min_num_data_points=1,
base_num_data_points_months=-100, base_num_data_points_months=1,
) )
assert pipe.status != STATUS_HANDLER.SUCCESS assert pipe.status != STATUS_HANDLER.SUCCESS
@@ -415,27 +430,16 @@ def test_export_on_fail():
assert res.status.description == status.description assert res.status.description == status.description
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session): def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
with patch( with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data", "delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock: ) as get_mock,
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
with patch("delta_barth.analysis.forecast.SESSION", session): ):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(None) # type: ignore result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_forecast_FailDbWrite(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0 assert len(result.response.daten) > 0

View File

@@ -1,8 +1,10 @@
from datetime import datetime as Datetime from datetime import datetime as Datetime
import pytest import pytest
import requests
from delta_barth.api import requests as requests_ from delta_barth.api import requests as requests_
from delta_barth.api.common import LoginResponse
@pytest.mark.api_con_required @pytest.mark.api_con_required
@@ -94,3 +96,31 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
assert status.api_server_error.message == json["message"] assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"] assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"] assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
    """A timeout of the GET request yields an empty response and status code 1."""
    mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
    # replace the login check on the session with a stub that reports success
    session.assert_login = lambda: (
        LoginResponse(token=""),
        requests_.STATUS_HANDLER.SUCCESS,
    )

    resp, status = requests_.get_sales_prognosis_data(session, None, None)

    assert resp is not None
    assert len(resp.daten) == 0
    assert status.code == 1
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
    """A generic request exception yields an empty response and status code 2."""
    mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
    # replace the login check on the session with a stub that reports success
    session.assert_login = lambda: (
        LoginResponse(token=""),
        requests_.STATUS_HANDLER.SUCCESS,
    )

    resp, status = requests_.get_sales_prognosis_data(session, None, None)

    assert resp is not None
    assert len(resp.daten) == 0
    assert status.code == 2

View File

@@ -3,11 +3,12 @@ from __future__ import annotations
import json import json
import tomllib import tomllib
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import cast
from unittest.mock import patch from unittest.mock import patch
import pandas as pd import pandas as pd
import pytest import pytest
import tomli_w
import delta_barth.session import delta_barth.session
from delta_barth.api.requests import SalesPrognosisResponse from delta_barth.api.requests import SalesPrognosisResponse
@@ -33,6 +34,28 @@ def api_base_url(credentials) -> str:
return credentials["base_url"] return credentials["base_url"]
@pytest.fixture(scope="session")
def pth_dummy_cfg() -> Path:
    """Path of the dummy config file shipped with the test data."""
    cwd = Path.cwd()
    assert "barth" in cwd.parent.name.lower(), "not in project root directory"

    cfg_file = cwd / "./tests/_test_data/dopt-cfg.toml"
    assert cfg_file.exists(), "file to dummy CFG not found"

    return cfg_file
@pytest.fixture(scope="function")
def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
    """Per-test copy of the dummy config, re-serialised into ``tmp_path``."""
    with open(pth_dummy_cfg, "rb") as src:
        cfg_data = tomllib.load(src)

    target = tmp_path / "dummy_cfg.toml"
    # opening in "wb" mode creates the file, so no separate touch is needed
    with open(target, "wb") as dst:
        tomli_w.dump(cfg_data, dst)

    return target
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def sales_data_real() -> pd.DataFrame: def sales_data_real() -> pd.DataFrame:
pwd = Path.cwd() pwd = Path.cwd()
@@ -95,7 +118,7 @@ def mock_put():
yield mock yield mock
@pytest.fixture @pytest.fixture(scope="function")
def mock_get(): def mock_get():
with patch("requests.get") as mock: with patch("requests.get") as mock:
yield mock yield mock

40
tests/test_config.py Normal file
View File

@@ -0,0 +1,40 @@
import tomllib
import tomli_w
from delta_barth import config
def test_CfgLoader_Init(pth_cfg):
    """A fresh loader stores the path and has not parsed any config yet."""
    cfg_loader = config.LazyCfgLoader(pth_cfg)

    assert cfg_loader.path == pth_cfg
    assert cfg_loader._cfg is None
def test_CfgLoader_Get(pth_cfg):
    """``get`` returns a parsed ``Config`` holding the value from the file."""
    cfg = config.LazyCfgLoader(pth_cfg).get()

    assert isinstance(cfg, config.Config)
    assert cfg.forecast.threshold_month_data_points == 28
def test_CfgLoader_Reload(pth_cfg):
    """Changes written to the file become visible only after ``reload``."""
    cfg_loader = config.LazyCfgLoader(pth_cfg)
    cfg_before = cfg_loader.get()
    assert isinstance(cfg_before, config.Config)
    assert cfg_before.forecast.threshold_month_data_points == 28

    # modify config and reload
    with open(pth_cfg, "rb") as src:
        raw_cfg = tomllib.load(src)
    raw_cfg["forecast"]["threshold_month_data_points"] = 30
    with open(pth_cfg, "wb") as dst:
        tomli_w.dump(raw_cfg, dst)

    # the already parsed object must not change by itself
    assert cfg_before.forecast.threshold_month_data_points == 28

    cfg_loader.reload()
    cfg_after = cfg_loader.get()
    assert isinstance(cfg_after, config.Config)
    assert cfg_after.forecast.threshold_month_data_points == 30

View File

@@ -1,21 +1,60 @@
import importlib
import json import json
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
import sqlalchemy as sql
import delta_barth.pipelines from delta_barth import databases as db
from delta_barth import pipelines as pl from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) def test_write_performance_metrics_Success(session):
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): pipe_name = "test_pipe"
with patch( t_start = 20_000_000_000
t_end = 30_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
metrics = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
assert metrics["pipeline_name"] == pipe_name
assert metrics["execution_duration"] == 10
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == pipe_name
assert metrics.execution_duration == 10
def test_write_performance_metrics_FailStartingTime(session):
    """An end time before the start time is rejected with ``ValueError``."""
    start_ns = 30_000_000_000
    end_ns = 20_000_000_000

    with patch("delta_barth.pipelines.SESSION", session), pytest.raises(ValueError):
        pl._write_performance_metrics(
            pipeline_name="test_pipe",
            time_start=start_ns,
            time_end=end_ns,
        )
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data", "delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock: ) as get_mock,
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS) patch("delta_barth.pipelines.SESSION", session),
importlib.reload(delta_barth.pipelines) patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(None, None) json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
@@ -27,8 +66,16 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
def test_sales_prognosis_pipeline_dummy(): metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast"
assert metrics.execution_duration > 0
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None) json_export = pl.pipeline_sales_forecast_dummy(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
@@ -43,3 +90,10 @@ def test_sales_prognosis_pipeline_dummy():
assert entry["vorhersage"] == pytest.approx(47261.058594) assert entry["vorhersage"] == pytest.approx(47261.058594)
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast_dummy"
assert metrics.execution_duration > 0

View File

@@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest import pytest
import delta_barth.session import delta_barth.session
from delta_barth import logging
from delta_barth.constants import ( from delta_barth.constants import (
DEFAULT_API_ERR_CODE, DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS, HTTP_BASE_CONTENT_HEADERS,
@@ -55,17 +56,62 @@ def test_session_setup_db_management(tmp_path):
assert db_path.parent == target_db_dir assert db_path.parent == target_db_dir
assert not db_path.exists() assert not db_path.exists()
session.setup() session.setup()
db_path2 = session.db_path
assert db_path2 == db_path
assert session._db_engine is not None assert session._db_engine is not None
assert db_path.exists() assert db_path.exists()
def test_session_setup_config(tmp_path, pth_cfg):
    """Session setup creates the config file in the configured folder and loads it."""
    cfg_folder: str = "cfg_test"
    expected_dir = tmp_path / cfg_folder

    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=cfg_folder)
    session.set_data_path(str(tmp_path))

    # before setup: the folder exists, the config file does not
    path_before = session.cfg_path
    assert path_before.parent.exists()
    assert path_before.parent == expected_dir
    assert not path_before.exists()

    session.setup()

    # after setup: same path, file present and config loaded
    path_after = session.cfg_path
    assert path_after == path_before
    assert session._cfg is not None
    assert path_before.exists()
    assert session.cfg.forecast.threshold_month_data_points == 28
@patch("delta_barth.logging.ENABLE_LOGGING", True) @patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True) @patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
def test_session_setup_logging(tmp_path): def test_session_setup_logging(tmp_path):
str_path = str(tmp_path) str_path = str(tmp_path)
foldername: str = "logging_test" foldername: str = "logging_test"
target_log_dir = tmp_path / foldername target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
log_dir2 = session.logging_dir
assert log_dir2 == log_dir
assert target_file.exists()
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_disable_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session( session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
) )
@@ -78,6 +124,21 @@ def test_session_setup_logging(tmp_path):
assert not target_file.exists() assert not target_file.exists()
session.setup() # calls setup code for logging session.setup() # calls setup code for logging
assert target_file.exists() assert target_file.exists()
# provoke entry
msg = "this is a test"
logging.logger_base.critical(msg)
session.disable_logging()
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg in last_line.lower()
# log new entry which should not be added as logging is disabled
msg = "this is a second test"
logging.logger_base.critical(msg)
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg not in last_line.lower()
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url): def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):