Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0eb39deec5 | |||
| 8501f551b2 | |||
| da594fb5ba | |||
| e8f3a7aea8 | |||
| 8936f798ab | |||
| e1b375396a | |||
| 5d1f5199d3 | |||
| f49744ca45 | |||
| 2934326258 | |||
| 4ef8fc5e9d | |||
| 14c4efedf7 | |||
| 2055ee5c8b | |||
| 6caa087efd | |||
| 2d48be0009 | |||
| fdb9812ecf | |||
| 9f90aec324 | |||
| dc848fd840 | |||
| a0d189ac9f | |||
| 6a418118d2 | |||
| 5d78fc9e02 | |||
| b93b070682 | |||
| 30641103ec | |||
| d1d665e60a |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,6 +3,7 @@ prototypes/
|
|||||||
data/
|
data/
|
||||||
reports/
|
reports/
|
||||||
*.code-workspace
|
*.code-workspace
|
||||||
|
docs/
|
||||||
|
|
||||||
# credentials
|
# credentials
|
||||||
CREDENTIALS*
|
CREDENTIALS*
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "delta-barth"
|
name = "delta-barth"
|
||||||
version = "0.5.0"
|
version = "0.5.7dev1"
|
||||||
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
||||||
@@ -73,7 +73,7 @@ directory = "reports/coverage"
|
|||||||
|
|
||||||
|
|
||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.5.0"
|
current_version = "0.5.7dev1"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
|
|||||||
2
scripts/bump_patch.ps1
Normal file
2
scripts/bump_patch.ps1
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
pdm run bump-my-version bump patch
|
||||||
|
pdm run bump-my-version show current_version
|
||||||
@@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
|
|||||||
|
|
||||||
|
|
||||||
def status_err() -> str:
|
def status_err() -> str:
|
||||||
status = Status(code=102, description="internal error occurred", message="caused by test")
|
status = Status(
|
||||||
|
code=102,
|
||||||
|
description="internal error occurred: 'Limit-Überschreitung'",
|
||||||
|
message="caused by test",
|
||||||
|
)
|
||||||
return status.model_dump_json()
|
return status.model_dump_json()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import math
|
import math
|
||||||
from collections.abc import Mapping, Set
|
from collections.abc import Mapping, Set
|
||||||
@@ -7,10 +8,15 @@ from dataclasses import asdict
|
|||||||
from datetime import datetime as Datetime
|
from datetime import datetime as Datetime
|
||||||
from typing import TYPE_CHECKING, Final, TypeAlias, cast
|
from typing import TYPE_CHECKING, Final, TypeAlias, cast
|
||||||
|
|
||||||
|
import dopt_basics.datetime
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import scipy.stats
|
import scipy.stats
|
||||||
import sqlalchemy as sql
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
# --- new: for calculating timedelta
|
||||||
|
from dateutil.relativedelta import relativedelta
|
||||||
|
from dopt_basics.datetime import TimeUnitsTimedelta
|
||||||
from sklearn.metrics import mean_absolute_error, r2_score
|
from sklearn.metrics import mean_absolute_error, r2_score
|
||||||
from sklearn.model_selection import KFold, RandomizedSearchCV
|
from sklearn.model_selection import KFold, RandomizedSearchCV
|
||||||
from xgboost import XGBRegressor
|
from xgboost import XGBRegressor
|
||||||
@@ -26,6 +32,7 @@ from delta_barth.api.requests import (
|
|||||||
)
|
)
|
||||||
from delta_barth.constants import (
|
from delta_barth.constants import (
|
||||||
COL_MAP_SALES_PROGNOSIS,
|
COL_MAP_SALES_PROGNOSIS,
|
||||||
|
DEFAULT_DB_ERR_CODE,
|
||||||
DUMMY_DATA_PATH,
|
DUMMY_DATA_PATH,
|
||||||
FEATURES_SALES_PROGNOSIS,
|
FEATURES_SALES_PROGNOSIS,
|
||||||
SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
SALES_BASE_NUM_DATAPOINTS_MONTHS,
|
||||||
@@ -110,7 +117,7 @@ def _parse_df_to_results_wrapped(
|
|||||||
return _parse_df_to_results(data)
|
return _parse_df_to_results(data)
|
||||||
|
|
||||||
|
|
||||||
@wrap_result()
|
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
|
||||||
def _write_sales_forecast_stats_wrapped(
|
def _write_sales_forecast_stats_wrapped(
|
||||||
stats: SalesForecastStatistics,
|
stats: SalesForecastStatistics,
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -182,16 +189,14 @@ def _process_sales(
|
|||||||
PipeResult
|
PipeResult
|
||||||
_description_
|
_description_
|
||||||
"""
|
"""
|
||||||
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
|
|
||||||
|
|
||||||
# filter data
|
# filter data
|
||||||
data = pipe.data
|
data = pipe.data
|
||||||
assert data is not None, "processing not existing pipe result"
|
assert data is not None, "processing not existing pipe result"
|
||||||
|
|
||||||
DATE_FEAT: Final[str] = "buchungs_datum"
|
DATE_FEAT: Final[str] = "buchungs_datum"
|
||||||
SALES_FEAT: Final[str] = "betrag"
|
SALES_FEAT: Final[str] = "betrag"
|
||||||
df_firma = data[(data["betrag"] > 0)]
|
df_filter = data[(data["betrag"] > 0)]
|
||||||
df_cust = df_firma.copy()
|
df_cust = df_filter.copy()
|
||||||
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
|
||||||
len_ds = len(df_cust)
|
len_ds = len(df_cust)
|
||||||
|
|
||||||
@@ -205,7 +210,18 @@ def _process_sales(
|
|||||||
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
|
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
|
||||||
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
|
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
|
||||||
|
|
||||||
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
|
current_year = datetime.now().year
|
||||||
|
current_month = datetime.now().month
|
||||||
|
years = range(df_cust["jahr"].min(), current_year + 1)
|
||||||
|
|
||||||
|
old_monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
|
||||||
|
|
||||||
|
all_month_year_combinations = pd.DataFrame(
|
||||||
|
[(year, month) for year in years for month in range(1, 13) if (year < current_year or (year == current_year and month <= current_month))], columns=["jahr", "monat"]
|
||||||
|
)
|
||||||
|
|
||||||
|
monthly_sum = pd.merge(all_month_year_combinations, old_monthly_sum, on=["jahr", "monat"], how="left")
|
||||||
|
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
|
||||||
monthly_sum[DATE_FEAT] = (
|
monthly_sum[DATE_FEAT] = (
|
||||||
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
|
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
|
||||||
)
|
)
|
||||||
@@ -214,13 +230,17 @@ def _process_sales(
|
|||||||
|
|
||||||
features = ["jahr", "monat"]
|
features = ["jahr", "monat"]
|
||||||
target = SALES_FEAT
|
target = SALES_FEAT
|
||||||
current_year = datetime.datetime.now().year
|
|
||||||
first_year = cast(int, df_cust["jahr"].min())
|
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
|
||||||
|
future_dates = pd.date_range(
|
||||||
|
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
|
||||||
|
)
|
||||||
|
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
|
||||||
|
|
||||||
# Randomized Search
|
# Randomized Search
|
||||||
kfold = KFold(n_splits=5, shuffle=True)
|
kfold = KFold(n_splits=5, shuffle=True)
|
||||||
params: ParamSearchXGBRegressor = {
|
params: ParamSearchXGBRegressor = {
|
||||||
"n_estimators": scipy.stats.poisson(mu=1000),
|
"n_estimators": scipy.stats.poisson(mu=100),
|
||||||
"learning_rate": [0.03, 0.04, 0.05],
|
"learning_rate": [0.03, 0.04, 0.05],
|
||||||
"max_depth": range(2, 9),
|
"max_depth": range(2, 9),
|
||||||
"min_child_weight": range(1, 5),
|
"min_child_weight": range(1, 5),
|
||||||
@@ -230,26 +250,68 @@ def _process_sales(
|
|||||||
"early_stopping_rounds": [20, 50],
|
"early_stopping_rounds": [20, 50],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
best_estimator = None
|
||||||
best_params: BestParametersXGBRegressor | None = None
|
best_params: BestParametersXGBRegressor | None = None
|
||||||
best_score_mae: float | None = float("inf")
|
best_score_mae: float | None = float("inf")
|
||||||
best_score_r2: float | None = None
|
best_score_r2: float | None = None
|
||||||
best_start_year: int | None = None
|
best_start_year: int | None = None
|
||||||
too_few_month_points: bool = True
|
too_few_month_points: bool = True
|
||||||
forecast: pd.DataFrame | None = None
|
|
||||||
|
stride = dopt_basics.datetime.timedelta_from_val(365, TimeUnitsTimedelta.DAYS)
|
||||||
|
dates = cast(pd.DatetimeIndex, monthly_sum.index)
|
||||||
|
min_date = dates.min()
|
||||||
|
|
||||||
|
# baseline: 3 years - 36 months
|
||||||
|
starting_date = datetime.datetime.now() - relativedelta(months=36)
|
||||||
|
|
||||||
|
def get_index_date(
|
||||||
|
dates: pd.DatetimeIndex,
|
||||||
|
starting_date: datetime.datetime | pd.Timestamp,
|
||||||
|
) -> tuple[pd.Timestamp, bool]:
|
||||||
|
target, succ = next(
|
||||||
|
((date, True) for date in dates if date >= starting_date), (dates[-1], False)
|
||||||
|
)
|
||||||
|
return target, succ
|
||||||
|
|
||||||
|
first_date, succ = get_index_date(dates, starting_date)
|
||||||
|
if not succ:
|
||||||
|
# !! return early
|
||||||
|
...
|
||||||
|
|
||||||
|
date_span = first_date - min_date
|
||||||
|
steps = date_span.days // stride.days
|
||||||
|
|
||||||
|
for step in range(steps + 1):
|
||||||
|
print("step: ", step)
|
||||||
|
target_date = first_date - step * stride
|
||||||
|
print("target date: ", target_date)
|
||||||
|
split_date = dates[-6]
|
||||||
|
|
||||||
|
index_date, succ = get_index_date(dates, target_date)
|
||||||
|
|
||||||
|
if not succ:
|
||||||
|
break
|
||||||
|
|
||||||
|
if index_date >= split_date:
|
||||||
|
print("Skip because of date difference")
|
||||||
|
continue
|
||||||
|
|
||||||
for start_year in range(current_year - 4, first_year - 1, -1):
|
|
||||||
train = cast(
|
train = cast(
|
||||||
pd.DataFrame,
|
pd.DataFrame,
|
||||||
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
|
monthly_sum.loc[index_date:split_date].copy(), # type: ignore
|
||||||
)
|
)
|
||||||
|
print(train)
|
||||||
|
print("Length train: ", len(train))
|
||||||
test = cast(
|
test = cast(
|
||||||
pd.DataFrame,
|
pd.DataFrame,
|
||||||
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
|
monthly_sum.loc[split_date:].copy(), # type: ignore
|
||||||
)
|
)
|
||||||
X_train, X_test = train[features], test[features]
|
X_train, X_test = train[features], test[features]
|
||||||
y_train, y_test = train[target], test[target]
|
y_train, y_test = train[target], test[target]
|
||||||
|
|
||||||
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
|
# test set size fixed at 6 --> first iteration: baseline - 6 entries
|
||||||
|
# for each new year 10 new data points (i.e., sales strictly positive) needed
|
||||||
|
if len(train[train[SALES_FEAT] > 0]) >= 30 + 10 * step:
|
||||||
too_few_month_points = False
|
too_few_month_points = False
|
||||||
|
|
||||||
rand = RandomizedSearchCV(
|
rand = RandomizedSearchCV(
|
||||||
@@ -272,13 +334,22 @@ def _process_sales(
|
|||||||
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
|
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
|
||||||
best_score_mae = error
|
best_score_mae = error
|
||||||
best_score_r2 = cast(float, r2_score(y_test, y_pred))
|
best_score_r2 = cast(float, r2_score(y_test, y_pred))
|
||||||
best_start_year = start_year
|
# --- new: use target_date for best_start_year
|
||||||
print("executed")
|
best_start_year = target_date.year
|
||||||
forecast = test.copy()
|
# --- new: store best_estimator
|
||||||
forecast.loc[:, "vorhersage"] = y_pred
|
best_estimator = copy.copy(rand.best_estimator_)
|
||||||
|
|
||||||
|
# ?? --- new: use best_estimator to calculate future values and store them in forecast
|
||||||
|
if best_estimator is not None:
|
||||||
|
X_future = pd.DataFrame(
|
||||||
|
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
|
||||||
|
)
|
||||||
|
y_future = best_estimator.predict(X_future) # type: ignore
|
||||||
|
forecast["vorhersage"] = y_future
|
||||||
|
forecast["jahr"] = forecast.index.year # type: ignore
|
||||||
|
forecast["monat"] = forecast.index.month # type: ignore
|
||||||
|
forecast = forecast.reset_index(drop=True)
|
||||||
|
|
||||||
if forecast is not None:
|
|
||||||
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
|
|
||||||
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
|
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
|
||||||
|
|
||||||
if too_few_month_points:
|
if too_few_month_points:
|
||||||
@@ -294,7 +365,9 @@ def _process_sales(
|
|||||||
pipe.stats(stats)
|
pipe.stats(stats)
|
||||||
return pipe
|
return pipe
|
||||||
|
|
||||||
assert forecast is not None, "forecast is None, but was attempted to be returned"
|
assert "vorhersage" in forecast.columns, (
|
||||||
|
"forecast does not contain prognosis values, but was attempted to be returned"
|
||||||
|
)
|
||||||
status = STATUS_HANDLER.SUCCESS
|
status = STATUS_HANDLER.SUCCESS
|
||||||
pipe.success(forecast, status)
|
pipe.success(forecast, status)
|
||||||
stats = SalesForecastStatistics(
|
stats = SalesForecastStatistics(
|
||||||
@@ -353,6 +426,7 @@ def pipeline_sales_forecast(
|
|||||||
company_id: int | None = None,
|
company_id: int | None = None,
|
||||||
start_date: Datetime | None = None,
|
start_date: Datetime | None = None,
|
||||||
) -> SalesPrognosisResultsExport:
|
) -> SalesPrognosisResultsExport:
|
||||||
|
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
|
||||||
response, status = get_sales_prognosis_data(
|
response, status = get_sales_prognosis_data(
|
||||||
session,
|
session,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
@@ -413,6 +487,8 @@ def pipeline_sales_forecast(
|
|||||||
|
|
||||||
assert pipe.results is not None, "needed export response not set in pipeline"
|
assert pipe.results is not None, "needed export response not set in pipeline"
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
|
||||||
|
|
||||||
return pipe.results
|
return pipe.results
|
||||||
|
|
||||||
|
|
||||||
@@ -422,6 +498,9 @@ def pipeline_sales_dummy(
|
|||||||
start_date: Datetime | None = None,
|
start_date: Datetime | None = None,
|
||||||
) -> SalesPrognosisResultsExport:
|
) -> SalesPrognosisResultsExport:
|
||||||
"""prototype dummy function for tests by DelBar"""
|
"""prototype dummy function for tests by DelBar"""
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
|
||||||
|
|
||||||
_, _, _ = session, company_id, start_date
|
_, _, _ = session, company_id, start_date
|
||||||
|
|
||||||
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
|
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
|
||||||
@@ -434,6 +513,8 @@ def pipeline_sales_dummy(
|
|||||||
pipe.fail(res.status)
|
pipe.fail(res.status)
|
||||||
return _export_on_fail(res.status)
|
return _export_on_fail(res.status)
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
|
||||||
|
|
||||||
return SalesPrognosisResultsExport(
|
return SalesPrognosisResultsExport(
|
||||||
response=res.unwrap(),
|
response=res.unwrap(),
|
||||||
status=res.status,
|
status=res.status,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import requests
|
|||||||
from dopt_basics.io import combine_route
|
from dopt_basics.io import combine_route
|
||||||
from pydantic import BaseModel, PositiveInt, SkipValidation
|
from pydantic import BaseModel, PositiveInt, SkipValidation
|
||||||
|
|
||||||
|
from delta_barth.constants import API_CON_TIMEOUT
|
||||||
from delta_barth.errors import STATUS_HANDLER
|
from delta_barth.errors import STATUS_HANDLER
|
||||||
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
|
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
|
||||||
|
|
||||||
@@ -55,7 +56,7 @@ def get_sales_prognosis_data(
|
|||||||
company_id: int | None = None,
|
company_id: int | None = None,
|
||||||
start_date: Datetime | None = None,
|
start_date: Datetime | None = None,
|
||||||
) -> tuple[SalesPrognosisResponse, Status]:
|
) -> tuple[SalesPrognosisResponse, Status]:
|
||||||
resp, status = session.assert_login()
|
_, status = session.assert_login()
|
||||||
if status != STATUS_HANDLER.SUCCESS:
|
if status != STATUS_HANDLER.SUCCESS:
|
||||||
response = SalesPrognosisResponse(daten=tuple())
|
response = SalesPrognosisResponse(daten=tuple())
|
||||||
return response, status
|
return response, status
|
||||||
@@ -67,11 +68,18 @@ def get_sales_prognosis_data(
|
|||||||
FirmaId=company_id,
|
FirmaId=company_id,
|
||||||
BuchungsDatum=start_date,
|
BuchungsDatum=start_date,
|
||||||
)
|
)
|
||||||
resp = requests.get(
|
empty_response = SalesPrognosisResponse(daten=tuple())
|
||||||
URL,
|
try:
|
||||||
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
|
resp = requests.get(
|
||||||
headers=session.headers, # type: ignore[argumentType]
|
URL,
|
||||||
)
|
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
|
||||||
|
headers=session.headers, # type: ignore[argumentType]
|
||||||
|
timeout=API_CON_TIMEOUT,
|
||||||
|
)
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||||
|
except requests.exceptions.RequestException:
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||||
|
|
||||||
response: SalesPrognosisResponse
|
response: SalesPrognosisResponse
|
||||||
status: Status
|
status: Status
|
||||||
@@ -79,7 +87,7 @@ def get_sales_prognosis_data(
|
|||||||
response = SalesPrognosisResponse(**resp.json())
|
response = SalesPrognosisResponse(**resp.json())
|
||||||
status = STATUS_HANDLER.SUCCESS
|
status = STATUS_HANDLER.SUCCESS
|
||||||
else:
|
else:
|
||||||
response = SalesPrognosisResponse(daten=tuple())
|
response = empty_response
|
||||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||||
status = STATUS_HANDLER.api_error(err)
|
status = STATUS_HANDLER.api_error(err)
|
||||||
|
|
||||||
|
|||||||
@@ -15,9 +15,9 @@ assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
|
|||||||
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
|
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
|
||||||
|
|
||||||
# ** logging
|
# ** logging
|
||||||
ENABLE_LOGGING: Final[bool] = False
|
ENABLE_LOGGING: Final[bool] = True
|
||||||
LOGGING_TO_FILE: Final[bool] = True
|
LOGGING_TO_FILE: Final[bool] = True
|
||||||
LOGGING_TO_STDERR: Final[bool] = True
|
LOGGING_TO_STDERR: Final[bool] = False
|
||||||
LOG_FILENAME: Final[str] = "dopt-delbar.log"
|
LOG_FILENAME: Final[str] = "dopt-delbar.log"
|
||||||
|
|
||||||
# ** databases
|
# ** databases
|
||||||
@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
|
|||||||
|
|
||||||
# ** error handling
|
# ** error handling
|
||||||
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
|
||||||
|
DEFAULT_DB_ERR_CODE: Final[int] = 150
|
||||||
DEFAULT_API_ERR_CODE: Final[int] = 400
|
DEFAULT_API_ERR_CODE: Final[int] = 400
|
||||||
|
|
||||||
|
|
||||||
@@ -38,6 +39,8 @@ class KnownDelBarApiErrorCodes(enum.Enum):
|
|||||||
COMMON = frozenset((400, 401, 409, 500))
|
COMMON = frozenset((400, 401, 409, 500))
|
||||||
|
|
||||||
|
|
||||||
|
# ** API
|
||||||
|
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
|
||||||
# ** API response parsing
|
# ** API response parsing
|
||||||
# ** column mapping [API-Response --> Target-Features]
|
# ** column mapping [API-Response --> Target-Features]
|
||||||
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
|
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ perf_meas = sql.Table(
|
|||||||
"performance_measurement",
|
"performance_measurement",
|
||||||
metadata,
|
metadata,
|
||||||
sql.Column("id", sql.Integer, primary_key=True),
|
sql.Column("id", sql.Integer, primary_key=True),
|
||||||
sql.Column("execution_duration", sql.Float),
|
|
||||||
sql.Column("pipeline_name", sql.String(length=30)),
|
sql.Column("pipeline_name", sql.String(length=30)),
|
||||||
|
sql.Column("execution_duration", sql.Float),
|
||||||
)
|
)
|
||||||
# ** ---- forecasts
|
# ** ---- forecasts
|
||||||
sf_stats = sql.Table(
|
sf_stats = sql.Table(
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from functools import wraps
|
|||||||
from typing import Any, Final
|
from typing import Any, Final
|
||||||
|
|
||||||
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
|
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
|
||||||
from delta_barth.logging import logger_wrapped_results as logger
|
from delta_barth.logging import logger_status, logger_wrapped_results
|
||||||
from delta_barth.types import DataPipeStates, Status
|
from delta_barth.types import DataPipeStates, Status
|
||||||
|
|
||||||
if t.TYPE_CHECKING:
|
if t.TYPE_CHECKING:
|
||||||
@@ -53,9 +53,19 @@ class UApiError(Exception):
|
|||||||
## ** internal error handling
|
## ** internal error handling
|
||||||
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
|
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
|
||||||
("SUCCESS", 0, "Erfolg"),
|
("SUCCESS", 0, "Erfolg"),
|
||||||
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
|
(
|
||||||
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
|
"CONNECTION_TIMEOUT",
|
||||||
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
|
1,
|
||||||
|
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"CONNECTION_ERROR",
|
||||||
|
2,
|
||||||
|
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
|
||||||
|
),
|
||||||
|
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
|
||||||
|
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
|
||||||
|
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -151,23 +161,32 @@ class StatusHandler:
|
|||||||
state: Status,
|
state: Status,
|
||||||
) -> None:
|
) -> None:
|
||||||
if state == self.SUCCESS:
|
if state == self.SUCCESS:
|
||||||
|
logger_status.info(
|
||||||
|
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
code = state.code
|
code = state.code
|
||||||
descr = state.description
|
descr = state.description
|
||||||
msg = state.message
|
msg = state.message
|
||||||
|
|
||||||
|
exc: Exception
|
||||||
if code < DEFAULT_INTERNAL_ERR_CODE:
|
if code < DEFAULT_INTERNAL_ERR_CODE:
|
||||||
raise _construct_exception(UDataProcessingError, descr, msg)
|
exc = _construct_exception(UDataProcessingError, descr, msg)
|
||||||
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
|
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
|
||||||
raise _construct_exception(UInternalError, descr, msg)
|
exc = _construct_exception(UInternalError, descr, msg)
|
||||||
else:
|
else:
|
||||||
api_err = state.api_server_error
|
api_err = state.api_server_error
|
||||||
assert api_err is not None, (
|
assert api_err is not None, (
|
||||||
"error code inidcated API error, but no error instance found"
|
"error code inidcated API error, but no error instance found"
|
||||||
)
|
)
|
||||||
add_info = api_err.model_dump(exclude_none=True)
|
add_info = api_err.model_dump(exclude_none=True)
|
||||||
raise _construct_exception(UApiError, descr, msg, add_info)
|
exc = _construct_exception(UApiError, descr, msg, add_info)
|
||||||
|
|
||||||
|
logger_status.error(
|
||||||
|
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
|
||||||
|
)
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
|
||||||
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
|
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
|
||||||
@@ -229,24 +248,24 @@ def wrap_result(
|
|||||||
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
|
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
|
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
|
||||||
status: ResultWrapper[T]
|
wrapped_result: ResultWrapper[T]
|
||||||
try:
|
try:
|
||||||
res = func(*args, **kwargs)
|
res = func(*args, **kwargs)
|
||||||
status = ResultWrapper(
|
wrapped_result = ResultWrapper(
|
||||||
result=res, exception=None, code_on_error=code_on_error
|
result=res, exception=None, code_on_error=code_on_error
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
status = ResultWrapper(
|
wrapped_result = ResultWrapper(
|
||||||
result=NotSet(), exception=err, code_on_error=code_on_error
|
result=NotSet(), exception=err, code_on_error=code_on_error
|
||||||
)
|
)
|
||||||
logger.error(
|
logger_wrapped_results.info(
|
||||||
"An exception in routine %s occurred - msg: %s, stack trace:",
|
"[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
|
||||||
func.__name__,
|
func.__name__,
|
||||||
str(err),
|
str(err),
|
||||||
stack_info=True,
|
stack_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
return status
|
return wrapped_result
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|||||||
@@ -17,21 +17,22 @@ from delta_barth.constants import (
|
|||||||
logging.Formatter.converter = gmtime
|
logging.Formatter.converter = gmtime
|
||||||
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
|
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
|
||||||
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
|
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
|
||||||
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
|
|
||||||
# if not LOG_FILE_FOLDER.exists():
|
|
||||||
# LOG_FILE_FOLDER.mkdir(parents=True)
|
|
||||||
|
|
||||||
|
|
||||||
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
|
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
|
||||||
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
|
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
|
||||||
|
# ** handlers
|
||||||
|
NULL_HANDLER = logging.NullHandler()
|
||||||
|
# ** formatters
|
||||||
|
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
|
||||||
|
|
||||||
# ** loggers and configuration
|
# ** loggers and configuration
|
||||||
logger_all = logging.getLogger("delta_barth")
|
|
||||||
# logger_all.addHandler(logger_all_handler_stderr)
|
logger_base = logging.getLogger("delta_barth")
|
||||||
# logger_all.addHandler(logger_all_handler_file)
|
logger_status = logging.getLogger("delta_barth.status")
|
||||||
|
logger_status.setLevel(logging.DEBUG)
|
||||||
logger_session = logging.getLogger("delta_barth.session")
|
logger_session = logging.getLogger("delta_barth.session")
|
||||||
logger_session.setLevel(logging.DEBUG)
|
logger_session.setLevel(logging.DEBUG)
|
||||||
|
logger_management = logging.getLogger("delta_barth.management")
|
||||||
|
logger_management.setLevel(logging.DEBUG)
|
||||||
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
|
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
|
||||||
logger_wrapped_results.setLevel(logging.DEBUG)
|
logger_wrapped_results.setLevel(logging.DEBUG)
|
||||||
logger_pipelines = logging.getLogger("delta_barth.pipelines")
|
logger_pipelines = logging.getLogger("delta_barth.pipelines")
|
||||||
@@ -43,18 +44,15 @@ logger_db.setLevel(logging.DEBUG)
|
|||||||
def setup_logging(
|
def setup_logging(
|
||||||
logging_dir: Path,
|
logging_dir: Path,
|
||||||
) -> None:
|
) -> None:
|
||||||
# ** formatters
|
|
||||||
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
|
|
||||||
|
|
||||||
# ** handlers
|
# ** handlers
|
||||||
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
|
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
|
||||||
null_handler = logging.NullHandler()
|
|
||||||
if ENABLE_LOGGING and LOGGING_TO_STDERR:
|
if ENABLE_LOGGING and LOGGING_TO_STDERR:
|
||||||
logger_all_handler_stderr = logging.StreamHandler()
|
logger_all_handler_stderr = logging.StreamHandler()
|
||||||
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
|
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
|
||||||
logger_all_handler_stderr.setFormatter(logger_all_formater)
|
logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
logger_all_handler_stderr = null_handler
|
logger_all_handler_stderr = NULL_HANDLER
|
||||||
|
|
||||||
if ENABLE_LOGGING and LOGGING_TO_FILE:
|
if ENABLE_LOGGING and LOGGING_TO_FILE:
|
||||||
logger_all_handler_file = logging.handlers.RotatingFileHandler(
|
logger_all_handler_file = logging.handlers.RotatingFileHandler(
|
||||||
@@ -65,9 +63,17 @@ def setup_logging(
|
|||||||
delay=True,
|
delay=True,
|
||||||
)
|
)
|
||||||
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
|
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
|
||||||
logger_all_handler_file.setFormatter(logger_all_formater)
|
logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
logger_all_handler_file = null_handler
|
logger_all_handler_file = NULL_HANDLER
|
||||||
|
|
||||||
logger_all.addHandler(logger_all_handler_stderr)
|
logger_base.addHandler(logger_all_handler_stderr)
|
||||||
logger_all.addHandler(logger_all_handler_file)
|
logger_base.addHandler(logger_all_handler_file)
|
||||||
|
|
||||||
|
|
||||||
|
def disable_logging() -> None:
|
||||||
|
handlers = tuple(logger_base.handlers)
|
||||||
|
for handler in handlers:
|
||||||
|
logger_base.removeHandler(handler)
|
||||||
|
|
||||||
|
logger_base.addHandler(NULL_HANDLER)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from __future__ import annotations
|
|||||||
from typing import Final
|
from typing import Final
|
||||||
|
|
||||||
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
|
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
|
||||||
|
from delta_barth.logging import logger_session as logger
|
||||||
from delta_barth.session import Session
|
from delta_barth.session import Session
|
||||||
|
|
||||||
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
||||||
@@ -13,9 +14,13 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
|||||||
|
|
||||||
def setup(
|
def setup(
|
||||||
data_path: str,
|
data_path: str,
|
||||||
|
base_url: str,
|
||||||
) -> None: # pragma: no cover
|
) -> None: # pragma: no cover
|
||||||
|
# at this point: no logging configured
|
||||||
SESSION.set_data_path(data_path)
|
SESSION.set_data_path(data_path)
|
||||||
|
SESSION.set_base_url(base_url=base_url)
|
||||||
SESSION.setup()
|
SESSION.setup()
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
|
||||||
|
|
||||||
|
|
||||||
def set_credentials(
|
def set_credentials(
|
||||||
@@ -24,25 +29,33 @@ def set_credentials(
|
|||||||
database: str,
|
database: str,
|
||||||
mandant: str,
|
mandant: str,
|
||||||
) -> None: # pragma: no cover
|
) -> None: # pragma: no cover
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
|
||||||
SESSION.set_credentials(
|
SESSION.set_credentials(
|
||||||
username=username,
|
username=username,
|
||||||
password=password,
|
password=password,
|
||||||
database=database,
|
database=database,
|
||||||
mandant=mandant,
|
mandant=mandant,
|
||||||
)
|
)
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
|
||||||
|
|
||||||
|
|
||||||
|
# ** not part of external API, only internal
|
||||||
def get_credentials() -> str: # pragma: no cover
|
def get_credentials() -> str: # pragma: no cover
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
|
||||||
creds = SESSION.creds
|
creds = SESSION.creds
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")
|
||||||
return creds.model_dump_json()
|
return creds.model_dump_json()
|
||||||
|
|
||||||
|
|
||||||
# ** legacy: not part of external API
|
|
||||||
def set_base_url(
|
def set_base_url(
|
||||||
base_url: str,
|
base_url: str,
|
||||||
) -> None: # pragma: no cover
|
) -> None: # pragma: no cover
|
||||||
SESSION.set_base_url(base_url=base_url)
|
SESSION.set_base_url(base_url=base_url)
|
||||||
|
|
||||||
|
|
||||||
|
def get_data_path() -> str: # pragma: no cover
|
||||||
|
return str(SESSION.data_path)
|
||||||
|
|
||||||
|
|
||||||
def get_base_url() -> str: # pragma: no cover
|
def get_base_url() -> str: # pragma: no cover
|
||||||
return SESSION.base_url
|
return SESSION.base_url
|
||||||
|
|||||||
@@ -1,20 +1,83 @@
|
|||||||
"""collection of configured data pipelines, intended to be invoked from C#"""
|
"""collection of configured data pipelines, intended to be invoked from C#"""
|
||||||
|
|
||||||
|
import time
|
||||||
from datetime import datetime as Datetime
|
from datetime import datetime as Datetime
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
from delta_barth import databases as db
|
||||||
from delta_barth.analysis import forecast
|
from delta_barth.analysis import forecast
|
||||||
|
from delta_barth.constants import DEFAULT_DB_ERR_CODE
|
||||||
|
from delta_barth.errors import STATUS_HANDLER, wrap_result
|
||||||
|
from delta_barth.logging import logger_pipelines as logger
|
||||||
from delta_barth.management import SESSION
|
from delta_barth.management import SESSION
|
||||||
from delta_barth.types import JsonExportResponse
|
from delta_barth.types import JsonExportResponse, PipelineMetrics
|
||||||
|
|
||||||
|
|
||||||
|
def _write_performance_metrics(
|
||||||
|
pipeline_name: str,
|
||||||
|
time_start: int,
|
||||||
|
time_end: int,
|
||||||
|
) -> PipelineMetrics:
|
||||||
|
if time_end < time_start:
|
||||||
|
raise ValueError("Ending time smaller than starting time")
|
||||||
|
execution_duration = (time_end - time_start) / 1e9
|
||||||
|
metrics = PipelineMetrics(
|
||||||
|
pipeline_name=pipeline_name,
|
||||||
|
execution_duration=execution_duration,
|
||||||
|
)
|
||||||
|
|
||||||
|
with SESSION.db_engine.begin() as con:
|
||||||
|
con.execute(sql.insert(db.perf_meas).values(**metrics))
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
|
||||||
|
|
||||||
|
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
|
||||||
|
def _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name: str,
|
||||||
|
time_start: int,
|
||||||
|
time_end: int,
|
||||||
|
) -> PipelineMetrics:
|
||||||
|
return _write_performance_metrics(pipeline_name, time_start, time_end)
|
||||||
|
|
||||||
|
|
||||||
def pipeline_sales_forecast(
|
def pipeline_sales_forecast(
|
||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
PIPELINE_NAME: Final[str] = "sales_forecast"
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
|
||||||
|
t_start = time.perf_counter_ns()
|
||||||
result = forecast.pipeline_sales_forecast(
|
result = forecast.pipeline_sales_forecast(
|
||||||
SESSION, company_id=company_id, start_date=start_date
|
SESSION, company_id=company_id, start_date=start_date
|
||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
t_end = time.perf_counter_ns()
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
|
||||||
|
res = _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name=PIPELINE_NAME,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
if res.status != STATUS_HANDLER.SUCCESS:
|
||||||
|
logger.error(
|
||||||
|
(
|
||||||
|
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
|
||||||
|
"pipeline metrics to database: %s"
|
||||||
|
),
|
||||||
|
PIPELINE_NAME,
|
||||||
|
res.status,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
metrics = res.unwrap()
|
||||||
|
logger.info(
|
||||||
|
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
|
||||||
|
PIPELINE_NAME,
|
||||||
|
metrics["execution_duration"],
|
||||||
|
)
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|
||||||
@@ -23,11 +86,38 @@ def pipeline_sales_forecast_dummy(
|
|||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
|
||||||
|
t_start = time.perf_counter_ns()
|
||||||
result = forecast.pipeline_sales_dummy(
|
result = forecast.pipeline_sales_dummy(
|
||||||
SESSION,
|
SESSION,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
start_date=start_date,
|
start_date=start_date,
|
||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
t_end = time.perf_counter_ns()
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
|
||||||
|
res = _write_performance_metrics_wrapped(
|
||||||
|
pipeline_name=PIPELINE_NAME,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
if res.status != STATUS_HANDLER.SUCCESS:
|
||||||
|
logger.error(
|
||||||
|
(
|
||||||
|
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
|
||||||
|
"pipeline metrics to database: %s"
|
||||||
|
),
|
||||||
|
PIPELINE_NAME,
|
||||||
|
res.status,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
metrics = res.unwrap()
|
||||||
|
logger.info(
|
||||||
|
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
|
||||||
|
PIPELINE_NAME,
|
||||||
|
metrics["execution_duration"],
|
||||||
|
)
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from delta_barth.api.common import (
|
|||||||
LoginResponse,
|
LoginResponse,
|
||||||
validate_credentials,
|
validate_credentials,
|
||||||
)
|
)
|
||||||
from delta_barth.constants import DB_ECHO
|
from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
|
||||||
from delta_barth.errors import STATUS_HANDLER
|
from delta_barth.errors import STATUS_HANDLER
|
||||||
from delta_barth.logging import logger_session as logger
|
from delta_barth.logging import logger_session as logger
|
||||||
from delta_barth.types import DelBarApiError, Status
|
from delta_barth.types import DelBarApiError, Status
|
||||||
@@ -42,6 +42,7 @@ class Session:
|
|||||||
db_folder: str = "data",
|
db_folder: str = "data",
|
||||||
logging_folder: str = "logs",
|
logging_folder: str = "logs",
|
||||||
) -> None:
|
) -> None:
|
||||||
|
self._setup: bool = False
|
||||||
self._data_path: Path | None = None
|
self._data_path: Path | None = None
|
||||||
self._db_path: Path | None = None
|
self._db_path: Path | None = None
|
||||||
self._db_folder = db_folder
|
self._db_folder = db_folder
|
||||||
@@ -55,8 +56,12 @@ class Session:
|
|||||||
self._logged_in: bool = False
|
self._logged_in: bool = False
|
||||||
|
|
||||||
def setup(self) -> None:
|
def setup(self) -> None:
|
||||||
self._setup_db_management()
|
# at this point: no logging configured
|
||||||
|
assert not self._setup, "tried to setup session twice"
|
||||||
self._setup_logging()
|
self._setup_logging()
|
||||||
|
self._setup_db_management()
|
||||||
|
self._setup = True
|
||||||
|
logger.info("[SESSION] Setup procedure successful")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def data_path(self) -> Path:
|
def data_path(self) -> Path:
|
||||||
@@ -70,7 +75,7 @@ class Session:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def db_path(self) -> Path:
|
def db_path(self) -> Path:
|
||||||
if self._db_path is not None:
|
if self._db_path is not None and self._setup:
|
||||||
return self._db_path
|
return self._db_path
|
||||||
|
|
||||||
db_root = (self.data_path / self._db_folder).resolve()
|
db_root = (self.data_path / self._db_folder).resolve()
|
||||||
@@ -80,9 +85,14 @@ class Session:
|
|||||||
self._db_path = db_path
|
self._db_path = db_path
|
||||||
return self._db_path
|
return self._db_path
|
||||||
|
|
||||||
|
def _setup_db_management(self) -> None:
|
||||||
|
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
|
||||||
|
db.metadata.create_all(self._db_engine)
|
||||||
|
logger.info("[SESSION] Successfully setup DB management")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def logging_dir(self) -> Path:
|
def logging_dir(self) -> Path:
|
||||||
if self._logging_dir is not None:
|
if self._logging_dir is not None and self._setup:
|
||||||
return self._logging_dir
|
return self._logging_dir
|
||||||
|
|
||||||
logging_dir = self.data_path / self._logging_folder
|
logging_dir = self.data_path / self._logging_folder
|
||||||
@@ -91,15 +101,13 @@ class Session:
|
|||||||
self._logging_dir = logging_dir
|
self._logging_dir = logging_dir
|
||||||
return self._logging_dir
|
return self._logging_dir
|
||||||
|
|
||||||
def _setup_db_management(self) -> None:
|
|
||||||
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
|
|
||||||
db.metadata.create_all(self._db_engine)
|
|
||||||
logger.info("[SESSION] Successfully setup DB management")
|
|
||||||
|
|
||||||
def _setup_logging(self) -> None:
|
def _setup_logging(self) -> None:
|
||||||
delta_barth.logging.setup_logging(self.logging_dir)
|
delta_barth.logging.setup_logging(self.logging_dir)
|
||||||
logger.info("[SESSION] Successfully setup logging")
|
logger.info("[SESSION] Successfully setup logging")
|
||||||
|
|
||||||
|
def disable_logging(self) -> None:
|
||||||
|
delta_barth.logging.disable_logging()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def creds(self) -> ApiCredentials:
|
def creds(self) -> ApiCredentials:
|
||||||
assert self._creds is not None, "accessed credentials not set"
|
assert self._creds is not None, "accessed credentials not set"
|
||||||
@@ -110,6 +118,7 @@ class Session:
|
|||||||
path: str,
|
path: str,
|
||||||
):
|
):
|
||||||
self._data_path = validate_path(path)
|
self._data_path = validate_path(path)
|
||||||
|
self._setup = False
|
||||||
|
|
||||||
def set_credentials(
|
def set_credentials(
|
||||||
self,
|
self,
|
||||||
@@ -182,11 +191,18 @@ class Session:
|
|||||||
databaseName=self.creds.database,
|
databaseName=self.creds.database,
|
||||||
mandantName=self.creds.mandant,
|
mandantName=self.creds.mandant,
|
||||||
)
|
)
|
||||||
resp = requests.put(
|
empty_response = LoginResponse(token="")
|
||||||
URL,
|
try:
|
||||||
login_req.model_dump_json(),
|
resp = requests.put(
|
||||||
headers=self.headers, # type: ignore
|
URL,
|
||||||
)
|
login_req.model_dump_json(),
|
||||||
|
headers=self.headers, # type: ignore
|
||||||
|
timeout=API_CON_TIMEOUT,
|
||||||
|
)
|
||||||
|
except requests.exceptions.Timeout: # pragma: no cover
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||||
|
except requests.exceptions.RequestException: # pragma: no cover
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||||
|
|
||||||
response: LoginResponse
|
response: LoginResponse
|
||||||
status: Status
|
status: Status
|
||||||
@@ -195,7 +211,7 @@ class Session:
|
|||||||
status = STATUS_HANDLER.pipe_states.SUCCESS
|
status = STATUS_HANDLER.pipe_states.SUCCESS
|
||||||
self._add_session_token(response.token)
|
self._add_session_token(response.token)
|
||||||
else:
|
else:
|
||||||
response = LoginResponse(token="")
|
response = empty_response
|
||||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||||
status = STATUS_HANDLER.api_error(err)
|
status = STATUS_HANDLER.api_error(err)
|
||||||
|
|
||||||
@@ -207,12 +223,17 @@ class Session:
|
|||||||
ROUTE: Final[str] = "user/logout"
|
ROUTE: Final[str] = "user/logout"
|
||||||
URL: Final = combine_route(self.base_url, ROUTE)
|
URL: Final = combine_route(self.base_url, ROUTE)
|
||||||
|
|
||||||
resp = requests.put(
|
try:
|
||||||
URL,
|
resp = requests.put(
|
||||||
headers=self.headers, # type: ignore
|
URL,
|
||||||
)
|
headers=self.headers, # type: ignore
|
||||||
|
timeout=API_CON_TIMEOUT,
|
||||||
|
)
|
||||||
|
except requests.exceptions.Timeout: # pragma: no cover
|
||||||
|
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||||
|
except requests.exceptions.RequestException: # pragma: no cover
|
||||||
|
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||||
|
|
||||||
response = None
|
|
||||||
status: Status
|
status: Status
|
||||||
if resp.status_code == 200:
|
if resp.status_code == 200:
|
||||||
status = STATUS_HANDLER.SUCCESS
|
status = STATUS_HANDLER.SUCCESS
|
||||||
@@ -221,7 +242,7 @@ class Session:
|
|||||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||||
status = STATUS_HANDLER.api_error(err)
|
status = STATUS_HANDLER.api_error(err)
|
||||||
|
|
||||||
return response, status
|
return None, status
|
||||||
|
|
||||||
def assert_login(
|
def assert_login(
|
||||||
self,
|
self,
|
||||||
@@ -237,11 +258,18 @@ class Session:
|
|||||||
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
|
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
|
||||||
URL: Final = combine_route(self.base_url, ROUTE)
|
URL: Final = combine_route(self.base_url, ROUTE)
|
||||||
params: dict[str, int] = {"FirmaId": 999999}
|
params: dict[str, int] = {"FirmaId": 999999}
|
||||||
resp = requests.get(
|
empty_response = LoginResponse(token="")
|
||||||
URL,
|
try:
|
||||||
params=params,
|
resp = requests.get(
|
||||||
headers=self.headers, # type: ignore
|
URL,
|
||||||
)
|
params=params,
|
||||||
|
headers=self.headers, # type: ignore
|
||||||
|
timeout=API_CON_TIMEOUT,
|
||||||
|
)
|
||||||
|
except requests.exceptions.Timeout: # pragma: no cover
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
|
||||||
|
except requests.exceptions.RequestException: # pragma: no cover
|
||||||
|
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
|
||||||
|
|
||||||
response: LoginResponse
|
response: LoginResponse
|
||||||
status: Status
|
status: Status
|
||||||
@@ -252,7 +280,7 @@ class Session:
|
|||||||
self._remove_session_token()
|
self._remove_session_token()
|
||||||
response, status = self.login()
|
response, status = self.login()
|
||||||
else:
|
else:
|
||||||
response = LoginResponse(token="")
|
response = empty_response
|
||||||
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
err = DelBarApiError(status_code=resp.status_code, **resp.json())
|
||||||
status = STATUS_HANDLER.api_error(err)
|
status = STATUS_HANDLER.api_error(err)
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
|
|||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class DataPipeStates:
|
class DataPipeStates:
|
||||||
SUCCESS: Status
|
SUCCESS: Status
|
||||||
|
CONNECTION_TIMEOUT: Status
|
||||||
|
CONNECTION_ERROR: Status
|
||||||
TOO_FEW_POINTS: Status
|
TOO_FEW_POINTS: Status
|
||||||
TOO_FEW_MONTH_POINTS: Status
|
TOO_FEW_MONTH_POINTS: Status
|
||||||
NO_RELIABLE_FORECAST: Status
|
NO_RELIABLE_FORECAST: Status
|
||||||
@@ -139,7 +141,13 @@ class Statistics:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# ** forecasts
|
# ** ---- performance
|
||||||
|
class PipelineMetrics(t.TypedDict):
|
||||||
|
pipeline_name: str
|
||||||
|
execution_duration: float
|
||||||
|
|
||||||
|
|
||||||
|
# ** ---- forecasts
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class CustomerDataSalesForecast:
|
class CustomerDataSalesForecast:
|
||||||
order: list[int] = field(default_factory=list)
|
order: list[int] = field(default_factory=list)
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
from datetime import datetime as Datetime
|
from datetime import datetime as Datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import requests
|
||||||
|
|
||||||
from delta_barth.api import requests as requests_
|
from delta_barth.api import requests as requests_
|
||||||
|
from delta_barth.api.common import LoginResponse
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.api_con_required
|
@pytest.mark.api_con_required
|
||||||
@@ -94,3 +96,31 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
|
|||||||
assert status.api_server_error.message == json["message"]
|
assert status.api_server_error.message == json["message"]
|
||||||
assert status.api_server_error.code == json["code"]
|
assert status.api_server_error.code == json["code"]
|
||||||
assert status.api_server_error.hints == json["hints"]
|
assert status.api_server_error.hints == json["hints"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
|
||||||
|
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
|
||||||
|
|
||||||
|
def assert_login():
|
||||||
|
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
|
||||||
|
|
||||||
|
session.assert_login = assert_login
|
||||||
|
|
||||||
|
resp, status = requests_.get_sales_prognosis_data(session, None, None)
|
||||||
|
assert resp is not None
|
||||||
|
assert len(resp.daten) == 0
|
||||||
|
assert status.code == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
|
||||||
|
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
|
||||||
|
|
||||||
|
def assert_login():
|
||||||
|
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
|
||||||
|
|
||||||
|
session.assert_login = assert_login
|
||||||
|
|
||||||
|
resp, status = requests_.get_sales_prognosis_data(session, None, None)
|
||||||
|
assert resp is not None
|
||||||
|
assert len(resp.daten) == 0
|
||||||
|
assert status.code == 2
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import tomllib
|
import tomllib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, cast
|
from typing import cast
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@@ -95,7 +95,7 @@ def mock_put():
|
|||||||
yield mock
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture(scope="function")
|
||||||
def mock_get():
|
def mock_get():
|
||||||
with patch("requests.get") as mock:
|
with patch("requests.get") as mock:
|
||||||
yield mock
|
yield mock
|
||||||
|
|||||||
@@ -3,20 +3,44 @@ import json
|
|||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
import delta_barth.pipelines
|
import delta_barth.pipelines
|
||||||
|
from delta_barth import databases as db
|
||||||
from delta_barth import pipelines as pl
|
from delta_barth import pipelines as pl
|
||||||
from delta_barth.errors import STATUS_HANDLER
|
from delta_barth.errors import STATUS_HANDLER
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_performance_metrics(session):
|
||||||
|
pipe_name = "test_pipe"
|
||||||
|
t_start = 20_000_000_000
|
||||||
|
t_end = 30_000_000_000
|
||||||
|
|
||||||
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
|
metrics = pl._write_performance_metrics(
|
||||||
|
pipeline_name=pipe_name,
|
||||||
|
time_start=t_start,
|
||||||
|
time_end=t_end,
|
||||||
|
)
|
||||||
|
assert metrics["pipeline_name"] == pipe_name
|
||||||
|
assert metrics["execution_duration"] == 10
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
|
metrics = ret.all()[-1]
|
||||||
|
assert metrics.pipeline_name == pipe_name
|
||||||
|
assert metrics.execution_duration == 10
|
||||||
|
|
||||||
|
|
||||||
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
|
||||||
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
|
||||||
with patch(
|
with patch(
|
||||||
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
"delta_barth.analysis.forecast.get_sales_prognosis_data",
|
||||||
) as mock:
|
) as mock:
|
||||||
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
|
||||||
importlib.reload(delta_barth.pipelines)
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
json_export = pl.pipeline_sales_forecast(None, None)
|
json_export = pl.pipeline_sales_forecast(None, None)
|
||||||
|
|
||||||
assert isinstance(json_export, str)
|
assert isinstance(json_export, str)
|
||||||
parsed_resp = json.loads(json_export)
|
parsed_resp = json.loads(json_export)
|
||||||
@@ -27,9 +51,18 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
|
|||||||
assert "code" in parsed_resp["status"]
|
assert "code" in parsed_resp["status"]
|
||||||
assert parsed_resp["status"]["code"] == 0
|
assert parsed_resp["status"]["code"] == 0
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
def test_sales_prognosis_pipeline_dummy():
|
metrics = ret.all()[-1]
|
||||||
json_export = pl.pipeline_sales_forecast_dummy(None, None)
|
assert metrics.pipeline_name == "sales_forecast"
|
||||||
|
assert metrics.execution_duration > 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.new
|
||||||
|
def test_sales_prognosis_pipeline_dummy(session):
|
||||||
|
with patch("delta_barth.pipelines.SESSION", session):
|
||||||
|
json_export = pl.pipeline_sales_forecast_dummy(None, None)
|
||||||
|
|
||||||
assert isinstance(json_export, str)
|
assert isinstance(json_export, str)
|
||||||
parsed_resp = json.loads(json_export)
|
parsed_resp = json.loads(json_export)
|
||||||
@@ -43,3 +76,10 @@ def test_sales_prognosis_pipeline_dummy():
|
|||||||
assert entry["vorhersage"] == pytest.approx(47261.058594)
|
assert entry["vorhersage"] == pytest.approx(47261.058594)
|
||||||
assert "code" in parsed_resp["status"]
|
assert "code" in parsed_resp["status"]
|
||||||
assert parsed_resp["status"]["code"] == 0
|
assert parsed_resp["status"]["code"] == 0
|
||||||
|
|
||||||
|
with session.db_engine.begin() as con:
|
||||||
|
ret = con.execute(sql.select(db.perf_meas))
|
||||||
|
|
||||||
|
metrics = ret.all()[-1]
|
||||||
|
assert metrics.pipeline_name == "sales_forecast_dummy"
|
||||||
|
assert metrics.execution_duration > 0
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from unittest.mock import patch
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import delta_barth.session
|
import delta_barth.session
|
||||||
|
from delta_barth import logging
|
||||||
from delta_barth.constants import (
|
from delta_barth.constants import (
|
||||||
DEFAULT_API_ERR_CODE,
|
DEFAULT_API_ERR_CODE,
|
||||||
HTTP_BASE_CONTENT_HEADERS,
|
HTTP_BASE_CONTENT_HEADERS,
|
||||||
@@ -55,6 +56,8 @@ def test_session_setup_db_management(tmp_path):
|
|||||||
assert db_path.parent == target_db_dir
|
assert db_path.parent == target_db_dir
|
||||||
assert not db_path.exists()
|
assert not db_path.exists()
|
||||||
session.setup()
|
session.setup()
|
||||||
|
db_path2 = session.db_path
|
||||||
|
assert db_path2 == db_path
|
||||||
assert session._db_engine is not None
|
assert session._db_engine is not None
|
||||||
assert db_path.exists()
|
assert db_path.exists()
|
||||||
|
|
||||||
@@ -66,6 +69,30 @@ def test_session_setup_logging(tmp_path):
|
|||||||
foldername: str = "logging_test"
|
foldername: str = "logging_test"
|
||||||
target_log_dir = tmp_path / foldername
|
target_log_dir = tmp_path / foldername
|
||||||
|
|
||||||
|
session = delta_barth.session.Session(
|
||||||
|
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
||||||
|
)
|
||||||
|
session.set_data_path(str_path)
|
||||||
|
log_dir = session.logging_dir
|
||||||
|
|
||||||
|
assert log_dir.exists()
|
||||||
|
assert log_dir == target_log_dir
|
||||||
|
# write file
|
||||||
|
target_file = target_log_dir / LOG_FILENAME
|
||||||
|
assert not target_file.exists()
|
||||||
|
session.setup() # calls setup code for logging
|
||||||
|
log_dir2 = session.logging_dir
|
||||||
|
assert log_dir2 == log_dir
|
||||||
|
assert target_file.exists()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("delta_barth.logging.ENABLE_LOGGING", True)
|
||||||
|
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
|
||||||
|
def test_session_disable_logging(tmp_path):
|
||||||
|
str_path = str(tmp_path)
|
||||||
|
foldername: str = "logging_test"
|
||||||
|
target_log_dir = tmp_path / foldername
|
||||||
|
|
||||||
session = delta_barth.session.Session(
|
session = delta_barth.session.Session(
|
||||||
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
||||||
)
|
)
|
||||||
@@ -78,6 +105,21 @@ def test_session_setup_logging(tmp_path):
|
|||||||
assert not target_file.exists()
|
assert not target_file.exists()
|
||||||
session.setup() # calls setup code for logging
|
session.setup() # calls setup code for logging
|
||||||
assert target_file.exists()
|
assert target_file.exists()
|
||||||
|
# provoke entry
|
||||||
|
msg = "this is a test"
|
||||||
|
logging.logger_base.critical(msg)
|
||||||
|
session.disable_logging()
|
||||||
|
with open(target_file, "r") as file:
|
||||||
|
content = file.readlines()
|
||||||
|
last_line = content[-1]
|
||||||
|
assert msg in last_line.lower()
|
||||||
|
# log new entry which should not be added as logging is disabled
|
||||||
|
msg = "this is a second test"
|
||||||
|
logging.logger_base.critical(msg)
|
||||||
|
with open(target_file, "r") as file:
|
||||||
|
content = file.readlines()
|
||||||
|
last_line = content[-1]
|
||||||
|
assert msg not in last_line.lower()
|
||||||
|
|
||||||
|
|
||||||
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
|
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
|
||||||
|
|||||||
Reference in New Issue
Block a user