Compare commits

..

No commits in common. "main" and "v0.5.7" have entirely different histories.
main ... v0.5.7

16 changed files with 198 additions and 456 deletions

74
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:f2a2abd891603796228b21bfeb7a00fd998964fe9303a9e4e5971f63925261e8"
content_hash = "sha256:545c39ef89d18d28a7bca4b08c93e6fb900c42612089300b867a4e0955acd6ab"
[[metadata.targets]]
requires_python = ">=3.11"
@ -591,17 +591,6 @@ files = [
{file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
requires_python = ">=3.8"
summary = "An implementation of lxml.xmlfile for the standard library"
groups = ["dev"]
files = [
{file = "et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa"},
{file = "et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54"},
]
[[package]]
name = "execnet"
version = "2.1.1"
@ -1461,20 +1450,6 @@ files = [
{file = "nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:362aed5963fb9ea2ed2f264409baae30143498fd0e5c503aeaa1badd88cdc54a"},
]
[[package]]
name = "openpyxl"
version = "3.1.5"
requires_python = ">=3.8"
summary = "A Python library to read/write Excel 2010 xlsx/xlsm files"
groups = ["dev"]
dependencies = [
"et-xmlfile",
]
files = [
{file = "openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2"},
{file = "openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050"},
]
[[package]]
name = "overrides"
version = "7.7.0"
@ -1596,31 +1571,6 @@ files = [
{file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"},
]
[[package]]
name = "pip"
version = "25.1.1"
requires_python = ">=3.9"
summary = "The PyPA recommended tool for installing Python packages."
groups = ["default"]
files = [
{file = "pip-25.1.1-py3-none-any.whl", hash = "sha256:2913a38a2abf4ea6b64ab507bd9e967f3b53dc1ede74b01b0931e1ce548751af"},
{file = "pip-25.1.1.tar.gz", hash = "sha256:3de45d411d308d5054c2168185d8da7f9a2cd753dbac8acbfa88a8909ecd9077"},
]
[[package]]
name = "pip-system-certs"
version = "5.2"
requires_python = ">=3.10"
summary = "Automatically configures Python to use system certificates via truststore"
groups = ["default"]
dependencies = [
"pip>=24.2",
]
files = [
{file = "pip_system_certs-5.2-py3-none-any.whl", hash = "sha256:e6ef3e106d4d02313e33955c2bcc4c2b143b2da07ef91e28a6805a0c1c512126"},
{file = "pip_system_certs-5.2.tar.gz", hash = "sha256:80b776b5cf17191bf99d313699b7fce2fdb84eb7bbb225fd134109a82706406f"},
]
[[package]]
name = "platformdirs"
version = "4.3.6"
@ -1673,7 +1623,7 @@ name = "psutil"
version = "7.0.0"
requires_python = ">=3.6"
summary = "Cross-platform lib for process and system monitoring in Python. NOTE: the syntax of this script MUST be kept compatible with Python 2.7."
groups = ["default", "nb"]
groups = ["nb"]
files = [
{file = "psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25"},
{file = "psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da"},
@ -2661,8 +2611,8 @@ files = [
[[package]]
name = "xgboost"
version = "3.0.0"
requires_python = ">=3.10"
version = "2.1.4"
requires_python = ">=3.8"
summary = "XGBoost Python Package"
groups = ["default"]
dependencies = [
@ -2671,12 +2621,12 @@ dependencies = [
"scipy",
]
files = [
{file = "xgboost-3.0.0-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:ed8cffd7998bd9431c3b0287a70bec8e45c09b43c9474d9dfd261627713bd890"},
{file = "xgboost-3.0.0-py3-none-macosx_12_0_arm64.whl", hash = "sha256:314104bd3a1426a40f0c9662eef40e9ab22eb7a8068a42a8d198ce40412db75c"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:72c3405e8dfc37048f9fe339a058fa12b9f0f03bc31d3e56f0887eed2ed2baa1"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:72d39e74649e9b628c4221111aa6a8caa860f2e853b25480424403ee61085126"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:7bdee5787f86b83bebd75e2c96caf854760788e5f4203d063da50db5bf0efc5f"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:61c7e391e373b8a5312503525c0689f83ef1912a1236377022865ab340f465a4"},
{file = "xgboost-3.0.0-py3-none-win_amd64.whl", hash = "sha256:0ea74e97f95b1eddfd27a46b7f22f72ec5a5322e1dc7cb41c9c23fb580763df9"},
{file = "xgboost-3.0.0.tar.gz", hash = "sha256:45e95416df6f6f01d9a62e60cf09fc57e5ee34697f3858337c796fac9ce3b9ed"},
{file = "xgboost-2.1.4-py3-none-macosx_10_15_x86_64.macosx_11_0_x86_64.macosx_12_0_x86_64.whl", hash = "sha256:78d88da184562deff25c820d943420342014dd55e0f4c017cc4563c2148df5ee"},
{file = "xgboost-2.1.4-py3-none-macosx_12_0_arm64.whl", hash = "sha256:523db01d4e74b05c61a985028bde88a4dd380eadc97209310621996d7d5d14a7"},
{file = "xgboost-2.1.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:57c7e98111aceef4b689d7d2ce738564a1f7fe44237136837a47847b8b33bade"},
{file = "xgboost-2.1.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f1343a512e634822eab30d300bfc00bf777dc869d881cc74854b42173cfcdb14"},
{file = "xgboost-2.1.4-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:d366097d0db047315736f46af852feaa907f6d7371716af741cdce488ae36d20"},
{file = "xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:8df6da72963969ab2bf49a520c3e147b1e15cbeddd3aa0e3e039b3532c739339"},
{file = "xgboost-2.1.4-py3-none-win_amd64.whl", hash = "sha256:8bbfe4fedc151b83a52edbf0de945fd94358b09a81998f2945ad330fd5f20cd6"},
{file = "xgboost-2.1.4.tar.gz", hash = "sha256:ab84c4bbedd7fae1a26f61e9dd7897421d5b08454b51c6eb072abc1d346d08d7"},
]

View File

@ -1,11 +1,11 @@
[project]
name = "delta-barth"
version = "0.5.12"
version = "0.5.7"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39", "psutil>=7.0.0", "pip-system-certs>=5.2"]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39"]
requires-python = ">=3.11"
readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
@ -74,7 +74,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.5.12"
current_version = "0.5.7"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@ -147,7 +147,6 @@ dev = [
"bump-my-version>=1.1.1",
"nox>=2025.2.9",
"tomli-w>=1.2.0",
"openpyxl>=3.1.5",
]
nb = [
"jupyterlab>=4.3.5",

View File

@ -1,73 +1 @@
pdm build --no-sdist -d build/
# Configuration
$sourceDir = ".\build"
$destDir = "..\01_releases\runtime"
$packagePrefix = "delta_barth-"
$packageSuffix = "-py3-none-any.whl"
# Ensure destination exists
if (-not (Test-Path $destDir)) {
New-Item -ItemType Directory -Path $destDir | Out-Null
}
# === Build Regex Pattern ===
$escapedSuffix = [regex]::Escape($packageSuffix)
# Match versions like 1.2.3 or 1.2.3.beta or 1.2.3.beta1
# Capture the full version as one string, including the optional pre-release after a dot
$pattern = "^$packagePrefix(?<version>\d+\.\d+\.\d+(?:\.[a-zA-Z0-9\-]+)?)$escapedSuffix$"
Write-Host "Using pattern: $pattern"
# === Get and Filter Files ===
$allFiles = Get-ChildItem -Path $sourceDir -File
$matchingFiles = @()
foreach ($file in $allFiles) {
if ($file.Name -match $pattern) {
$version = $Matches['version']
$matchingFiles += [PSCustomObject]@{
File = $file
Version = $version
}
Write-Host "Matched: $($file.Name) -> Version: $version"
} else {
Write-Host "No match: $($file.Name)"
}
}
if ($matchingFiles.Count -eq 0) {
Write-Host "No matching package files found."
return
}
# === Convert version strings to sortable format ===
function Convert-VersionForSort($v) {
    # Parse "major.minor.patch[.pre]" (e.g. "1.2.3" or "1.2.3.beta1") into an
    # object whose fields are meant to be sortable: numeric components as [int],
    # the optional pre-release tag as a string.
    # Split by dot: e.g., 1.2.3.beta -> [1, 2, 3, "beta"]
    $parts = $v -split '\.'
    $major = [int]$parts[0]
    $minor = [int]$parts[1]
    $patch = [int]$parts[2]
    # "~" sorts after alphanumeric pre-release tags, so a stable release
    # (no 4th component) ranks above any pre-release of the same version
    $pre = if ($parts.Count -gt 3) { $parts[3] } else { "~" } # "~" to ensure stable > prerelease
    # NOTE(review): the caller sorts with a script block returning this object;
    # Sort-Object compares such a key via its string form, which would order
    # "10" before "9" — confirm the intended numeric ordering actually holds.
    return [PSCustomObject]@{
        Major = $major
        Minor = $minor
        Patch = $patch
        Pre = $pre
    }
}
# === Sort by semantic version + pre-release ===
$latest = $matchingFiles | Sort-Object {
Convert-VersionForSort $_.Version
} -Descending | Select-Object -First 1
# === Copy and rename to .zip ===
$baseName = [System.IO.Path]::GetFileNameWithoutExtension($latest.File.Name)
$newFileName = "$baseName.zip"
$destPath = Join-Path $destDir $newFileName
Copy-Item -Path $latest.File.FullName -Destination $destPath
pdm build -d build/

View File

@ -1,3 +0,0 @@
import pip_system_certs.wrapt_requests
pip_system_certs.wrapt_requests.inject_truststore()

View File

@ -1,33 +0,0 @@
from __future__ import annotations
import sys
from pathlib import Path
from typing import Final
from dopt_basics import io
PY_RUNTIME_FOLDER: Final[str] = "python"
def prepare_env(
    lib_path: Path,
) -> Path | None:
    """Detect a standalone Python runtime around *lib_path* and rebind ``sys``
    to its bundled interpreter.

    Uses ``dopt_basics.io.search_folder_path`` to look for a folder named
    ``PY_RUNTIME_FOLDER`` ("python") starting from ``lib_path`` (presumably
    searching upward through the parents — confirm against ``dopt_basics``).

    Returns:
        The runtime folder when found, else ``None`` (not deployed in a
        standalone runtime).

    Raises:
        FileNotFoundError: a runtime folder exists but contains no
            ``python.exe`` interpreter.
    """
    pyrt_folder = io.search_folder_path(
        starting_path=lib_path,
        stop_folder_name=PY_RUNTIME_FOLDER,
        return_inclusive=True,
    )
    if pyrt_folder is None:
        # no runtime folder: running from a normal (non-bundled) install
        return None
    # Windows-only layout: the bundled interpreter sits in the runtime root
    pth_interpreter = pyrt_folder / "python.exe"
    if not pth_interpreter.exists():
        raise FileNotFoundError(
            f"dopt-delta-barth seems to be deployed in a standalone runtime, "
            f"but the interpreter was not found under: {pth_interpreter}"
        )
    # setattr avoids linter complaints about assigning the private
    # ``sys._base_executable`` attribute directly
    setattr(sys, "executable", str(pth_interpreter))
    setattr(sys, "_base_executable", str(pth_interpreter))
    return pyrt_folder

View File

@ -8,11 +8,12 @@ from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast
import joblib
import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
# --- new: for calculating timedelta
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
@ -32,7 +33,6 @@ from delta_barth.constants import (
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS,
MAX_NUM_WORKERS,
SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
@ -192,9 +192,6 @@ def _process_sales(
DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag"
data[DATE_FEAT] = pd.to_datetime(data[DATE_FEAT], errors="coerce")
data = data.dropna(subset=["buchungs_datum"])
df_filter = data[(data["betrag"] > 0)]
df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
@ -294,22 +291,18 @@ def _process_sales(
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
too_few_month_points = False
with joblib.parallel_config(backend="loky"):
rand = RandomizedSearchCV(
XGBRegressor(),
params,
scoring="neg_mean_absolute_error",
cv=kfold,
n_jobs=MAX_NUM_WORKERS,
n_iter=100,
verbose=0,
)
rand.fit(
X_train,
y_train,
eval_set=[(X_train, y_train), (X_test, y_test)],
verbose=0,
)
rand = RandomizedSearchCV(
XGBRegressor(),
params,
scoring="neg_mean_absolute_error",
cv=kfold,
n_jobs=-1,
n_iter=100,
verbose=0,
)
rand.fit(
X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
)
y_pred = rand.best_estimator_.predict(X_test) # type: ignore
if len(np.unique(y_pred)) != 1:
@ -406,13 +399,13 @@ def _export_on_fail(
def pipeline_sales_forecast(
session: Session,
company_ids: list[int] | None = None,
company_id: int | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
company_ids=company_ids,
company_id=company_id,
start_date=start_date,
)
if status != STATUS_HANDLER.SUCCESS:
@ -439,7 +432,7 @@ def pipeline_sales_forecast(
pipe = _process_sales(
pipe,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=session.cfg.forecast.threshold_month_data_points,
base_num_data_points_months=SESSION.cfg.forecast.threshold_month_data_points,
)
if pipe.statistics is not None:
res = _write_sales_forecast_stats_wrapped(pipe.statistics)

View File

@ -7,20 +7,18 @@ import requests
from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.constants import API_CON_TIMEOUT, MAX_LOGIN_RETRIES
from delta_barth.constants import API_CON_TIMEOUT
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
if TYPE_CHECKING:
from requests import Response
from delta_barth.session import Session
# ** sales data
# ** import
class SalesPrognosisRequestP(BaseModel):
FirmaIds: SkipValidation[list[int] | None]
FirmaId: SkipValidation[int | None]
BuchungsDatum: SkipValidation[Datetime | None]
@ -55,37 +53,29 @@ class SalesPrognosisResultsExport(ExportResponse):
def get_sales_prognosis_data(
session: Session,
company_ids: list[int] | None = None,
company_id: int | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
_, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple())
return response, status
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
FirmaIds=company_ids,
FirmaId=company_id,
BuchungsDatum=start_date,
)
empty_response = SalesPrognosisResponse(daten=tuple())
if not session.logged_in:
_, status = session.login()
if status != STATUS_HANDLER.SUCCESS:
return empty_response, status
resp: Response | None = None
try:
for attempt in range(1, (MAX_LOGIN_RETRIES + 1)):
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
)
if resp.status_code == 401:
_, status = session.relogin()
if status != STATUS_HANDLER.SUCCESS and attempt == MAX_LOGIN_RETRIES:
return empty_response, status
continue
break
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException:
@ -93,7 +83,6 @@ def get_sales_prognosis_data(
response: SalesPrognosisResponse
status: Status
assert resp is not None, "tried to use not defined response"
if resp.status_code == 200:
response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS

View File

@ -1,19 +1,11 @@
from __future__ import annotations
import enum
from pathlib import Path
from typing import Final
import psutil
import delta_barth._env
from delta_barth.types import DualDict, HttpContentHeaders
# ** config
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
CFG_HOT_RELOAD: Final[bool] = True
cpu_count = psutil.cpu_count(logical=False)
MAX_NUM_WORKERS: Final[int] = (cpu_count - 1) if cpu_count is not None else 3
# ** lib path
lib_path = Path(__file__).parent
@ -22,16 +14,15 @@ LIB_PATH: Final[Path] = lib_path
dummy_data_pth = LIB_PATH / "_dummy_data"
assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** runtime and deployment status
RUNTIME_PATH: Final[Path | None] = delta_barth._env.prepare_env(LIB_PATH)
deployment_status: bool = False
if RUNTIME_PATH is not None:
deployment_status = True
DEPLOYMENT_STATUS: Final[bool] = deployment_status
# ** logging
ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases
DB_ECHO: Final[bool] = False
DB_ECHO: Final[bool] = True
# ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
@ -50,9 +41,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
# ** API
API_CON_TIMEOUT: Final[float] = 20.0 # secs to response
MAX_LOGIN_RETRIES: Final[int] = 2
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
# ** API response parsing
# ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(

View File

@ -6,13 +6,14 @@ from pathlib import Path
from time import gmtime
from typing import Final
# ** config
# ** logging
ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log"
from delta_barth.constants import (
ENABLE_LOGGING,
LOG_FILENAME,
LOGGING_TO_FILE,
LOGGING_TO_STDERR,
)
# ** config
logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"

View File

@ -44,14 +44,14 @@ def _write_performance_metrics_wrapped(
def pipeline_sales_forecast(
company_ids: list[int] | None,
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast(
SESSION, company_ids=company_ids, start_date=start_date
SESSION, company_id=company_id, start_date=start_date
)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()

View File

@ -19,7 +19,6 @@ from delta_barth.config import LazyCfgLoader
from delta_barth.constants import (
API_CON_TIMEOUT,
CFG_FILENAME,
CFG_HOT_RELOAD,
DB_ECHO,
LIB_PATH,
)
@ -98,8 +97,6 @@ class Session:
@property
def cfg(self) -> Config:
assert self._cfg is not None, "tried to access not set config from session"
if CFG_HOT_RELOAD:
self.reload_cfg()
return self._cfg
def _setup_config(self) -> None:
@ -111,11 +108,6 @@ class Session:
self._cfg = self._cfg_loader.get()
logger.info("[SESSION] Successfully read and setup config")
def reload_cfg(self) -> None:
assert self._cfg_loader is not None, "tried reloading with no CFG loader intialised"
self._cfg_loader.reload()
self._cfg = self._cfg_loader.get()
@property
def db_engine(self) -> sql.Engine:
assert self._db_engine is not None, "accessed database engine not set"
@ -292,11 +284,44 @@ class Session:
return None, status
def relogin(
def assert_login(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
self._remove_session_token()
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
empty_response = LoginResponse(token="")
try:
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(token=self.session_token)
status = STATUS_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else:
response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status

View File

@ -279,28 +279,6 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_InvalidDates(sales_data_real_preproc):
    """A booking date that cannot be represented after datetime coercion is
    dropped by ``_process_sales``; with only 20 rows the sample then falls
    below ``min_num_data_points`` and the pipeline reports TOO_FEW_POINTS.
    """
    # year 2519 lies outside pandas' ns-Timestamp range, so
    # pd.to_datetime(..., errors="coerce") presumably turns it into NaT
    # — TODO confirm; note 20 rows < 36 would trigger TOO_FEW_POINTS anyway
    false_date = Datetime(2519, 6, 30)
    data = sales_data_real_preproc.copy()
    data = data.iloc[:20, :]
    # object dtype so a plain datetime object can be stored in the column
    data["buchungs_datum"] = data["buchungs_datum"].astype(object)
    data.at[0, "buchungs_datum"] = false_date
    assert data["buchungs_datum"].dtype.char == "O"
    assert len(data) == 20
    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
    pipe = fc._process_sales(
        pipe,
        min_num_data_points=36,
        base_num_data_points_months=36,
    )
    assert pipe.status != STATUS_HANDLER.SUCCESS
    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
    # failure path clears data/results but still exports statistics
    assert pipe.data is None
    assert pipe.results is None
    assert pipe.statistics is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
@ -452,11 +430,7 @@ def test_export_on_fail():
assert res.status.description == status.description
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
assert session.cfg.forecast.threshold_month_data_points is not None
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
@ -465,7 +439,7 @@ def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp,
):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(session, company_ids, date) # type: ignore
result = fc.pipeline_sales_forecast(None) # type: ignore
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0

View File

@ -4,41 +4,44 @@ import pytest
import requests
from delta_barth.api import requests as requests_
from delta_barth.api.common import LoginResponse
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_Success(session):
# do not login: let routine do it
resp, status = session.login()
# test without company ID
date = Datetime(2023, 12, 15)
assert status.code == 0
date = Datetime(2022, 6, 1)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2520, 1, 1)
date = Datetime(2030, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with company ID
assert status.code == 0
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
date = Datetime(2022, 6, 1)
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2520, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
date = Datetime(2030, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with non-existent company ID
assert status.code == 0
date = Datetime(2022, 6, 1)
company_ids = [1000024]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
company_id = 1000024
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
# TODO check if this behaviour is still considered "successful"
assert status.code == 0
assert len(resp.daten) == 0
# test without date
company_ids = [1024]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, None)
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_id, None)
assert status.code == 0
assert len(resp.daten) > 0
# test without filters
@ -51,11 +54,12 @@ def test_get_sales_prognosis_data_Success(session):
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_NoAuth(session, mock_get):
code = 401
def test_get_sales_prognosis_data_FailLogin(session, mock_get):
session.login()
code = 500
json = {
"message": "ServerError",
"code": "TestFailAuth",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
@ -72,36 +76,6 @@ def test_get_sales_prognosis_data_NoAuth(session, mock_get):
assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailLogin(session, mock_get, mock_put):
code = 401
json = {
"message": "ServerError",
"code": "TestFailAuth",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
code_put = 500
json_put = {
"message": "ServerError",
"code": "TestUnknownError",
"hints": "TestCase",
}
mock_put.return_value.status_code = code_put
mock_put.return_value.json.return_value = json_put
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code_put
assert status.api_server_error.message == json_put["message"]
assert status.api_server_error.code == json_put["code"]
assert status.api_server_error.hints == json_put["hints"]
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
code = 405
@ -127,6 +101,11 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
@ -136,6 +115,11 @@ def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0

View File

@ -1,49 +0,0 @@
import importlib
import sys
from unittest.mock import patch
import pytest
import delta_barth.constants
from delta_barth import _env
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "test123456")
def test_prepare_env_NoRuntimeFolder(tmp_path):
ret = _env.prepare_env(tmp_path)
assert ret is None
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_FailNoInterpreter(tmp_path_factory):
mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
mocked_lib_pth.mkdir(parents=True, exist_ok=True)
with pytest.raises(FileNotFoundError):
_ = _env.prepare_env(mocked_lib_pth)
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_Success(tmp_path_factory):
    """``prepare_env`` locates the runtime folder, rewires ``sys.executable``
    and ``sys._base_executable``, and a reload of ``delta_barth.constants``
    then reports deployment status and runtime path accordingly."""
    mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
    mocked_lib_pth.mkdir(parents=True, exist_ok=True)
    # runtime root is the "base" folder (patched PY_RUNTIME_FOLDER name),
    # two levels above lib/
    rt_path = mocked_lib_pth.parents[1]
    mocked_interpreter = rt_path / "python.exe"
    mocked_interpreter.touch()
    assert mocked_interpreter.exists()
    ret = _env.prepare_env(mocked_lib_pth)
    assert ret == rt_path
    # sys attributes
    executable = getattr(sys, "executable")
    assert executable == str(mocked_interpreter)
    base_executable = getattr(sys, "_base_executable")
    assert base_executable == str(mocked_interpreter)

    class MockPath:
        # stand-in for pathlib.Path so the reloaded constants module
        # resolves its LIB_PATH to the mocked lib folder
        def __init__(self, *args, **kwargs):
            self.parent = mocked_lib_pth

    with patch("pathlib.Path", MockPath):
        # constants asserts its dummy-data folder exists at import time
        (mocked_lib_pth / "_dummy_data").mkdir(exist_ok=True)
        importlib.reload(delta_barth.constants)
        assert delta_barth.constants.DEPLOYMENT_STATUS
        assert delta_barth.constants.RUNTIME_PATH == rt_path

View File

@ -1,5 +1,4 @@
import json
from datetime import datetime as Datetime
from unittest.mock import patch
import pytest
@ -46,10 +45,7 @@ def test_write_performance_metrics_FailStartingTime(session):
)
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
@ -59,7 +55,7 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monke
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(company_ids, date)
json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)

View File

@ -1,18 +1,15 @@
import tomllib
from pathlib import Path
from unittest.mock import patch
import pytest
import tomli_w
import delta_barth.config
import delta_barth.session
from delta_barth import logging
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
LOG_FILENAME,
)
from delta_barth.logging import LOG_FILENAME
def test_validate_path_Success():
@ -65,7 +62,7 @@ def test_session_setup_db_management(tmp_path):
assert db_path.exists()
def test_session_setup_config(tmp_path):
def test_session_setup_config(tmp_path, pth_cfg):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
@ -83,61 +80,6 @@ def test_session_setup_config(tmp_path):
assert session.cfg.forecast.threshold_month_data_points == 28
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_session_reload_config_NoHotReload(tmp_path):
    """With hot reload disabled, ``session.cfg`` keeps serving the cached
    config after the file changes on disk; an explicit ``reload_cfg()`` is
    required to pick the change up."""
    str_path = str(tmp_path)
    foldername: str = "cfg_test"
    target_cfg_dir = tmp_path / foldername
    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
    session.set_data_path(str_path)
    cfg_path = session.cfg_path
    assert cfg_path.parent.exists()
    assert cfg_path.parent == target_cfg_dir
    # config file is only written during setup()
    assert not cfg_path.exists()
    session.setup()
    assert cfg_path.exists()
    parsed_cfg = session.cfg
    assert isinstance(parsed_cfg, delta_barth.config.Config)
    # modify config and reload
    with open(cfg_path, "rb") as file:
        cfg_data = tomllib.load(file)
    cfg_data["forecast"]["threshold_month_data_points"] = 30
    with open(cfg_path, "wb") as file:
        tomli_w.dump(cfg_data, file)
    # hot reload disabled -> still the default value (28) from the cache
    assert session.cfg.forecast.threshold_month_data_points == 28
    session.reload_cfg()
    reload_cfg = session.cfg
    assert isinstance(reload_cfg, delta_barth.config.Config)
    assert reload_cfg.forecast.threshold_month_data_points == 30
@patch("delta_barth.session.CFG_HOT_RELOAD", True)
def test_session_reload_config_HotReload(tmp_path):
    """With hot reload enabled, ``session.cfg`` reflects an on-disk config
    change immediately, without an explicit ``reload_cfg()`` call."""
    str_path = str(tmp_path)
    foldername: str = "cfg_test"
    target_cfg_dir = tmp_path / foldername
    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
    session.set_data_path(str_path)
    cfg_path = session.cfg_path
    assert cfg_path.parent.exists()
    assert cfg_path.parent == target_cfg_dir
    # config file is only written during setup()
    assert not cfg_path.exists()
    session.setup()
    assert cfg_path.exists()
    parsed_cfg = session.cfg
    assert isinstance(parsed_cfg, delta_barth.config.Config)
    # modify config and reload
    with open(cfg_path, "rb") as file:
        cfg_data = tomllib.load(file)
    cfg_data["forecast"]["threshold_month_data_points"] = 30
    with open(cfg_path, "wb") as file:
        tomli_w.dump(cfg_data, file)
    # hot reload: the cfg property re-reads the file on access
    assert session.cfg.forecast.threshold_month_data_points == 30
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
@ -314,11 +256,11 @@ def test_login_logout_FailApiServer(session, mock_put):
@pytest.mark.api_con_required
def test_relogin_SuccessLoggedOut(session):
def test_assert_login_SuccessLoggedOut(session):
assert session.session_token is None
assert session._creds is not None
# test logged out state
resp, status = session.relogin()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
@ -327,17 +269,74 @@ def test_relogin_SuccessLoggedOut(session):
@pytest.mark.api_con_required
def test_relogin_SuccessStillLoggedIn(session):
def test_assert_login_SuccessStillLoggedIn(session):
assert session.session_token is None
assert session._creds is not None
resp, status = session.login()
old_token = session.session_token
assert old_token is not None
resp, status = session.relogin()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
assert session.session_token != old_token
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginNoValidAuth(session, mock_get):
code = 401
json = {
"message": "AuthentificationError",
"code": "TestAssertLoginAfter",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginWrongToken(session):
    """``assert_login`` probes a token-protected endpoint; an invalid token
    makes the API answer 401, which triggers a transparent re-login."""
    # triggers code 401
    assert session.session_token is None
    assert session._creds is not None
    _, status = session.login()
    assert status.code == 0
    # corrupt the stored token so the probe request is rejected
    session._session_token = "WRONGTOKEN"
    resp, status = session.assert_login()
    assert resp is not None
    assert status.code == 0
    # re-login succeeded and a (new) valid token is in place
    assert session.session_token is not None
    resp, status = session.logout()
    assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_FailApiServer(session, mock_get):
code = 500
json = {
"message": "ServerError",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]