Compare commits


43 Commits
v0.5.1 ... main

SHA1 Message Date
e14a8a2036 bump version to 0.5.12 2025-07-02 09:47:14 +02:00
7bd3322733 trigger manual hook of pip-system-certs to use system cert stores, related to #34 2025-07-02 09:36:59 +02:00
ccce2c703e add dep to use system cert stores for request verification 2025-06-26 14:45:21 +02:00
7df35ed05d fix build script 2025-05-15 12:14:19 +02:00
b2e22f7353 release v0.5.10 2025-05-15 12:00:24 +02:00
559ef90d61 update build script 2025-05-15 11:55:09 +02:00
fcd85a609d add new build script for improved automation 2025-05-15 11:52:44 +02:00
3e14a8660e add date plausibility filter, fixes #31 2025-05-14 09:13:49 +02:00
33760bd764 new filter date for request test case in forecast 2025-05-14 09:13:23 +02:00
3011ca46cd hint for insertion of date plausibility check, related to #31 2025-05-07 09:16:11 +02:00
9881070425 add new dep 2025-05-07 08:58:39 +02:00
ce24fd8126 use session as variable in sales forecast pipeline 2025-05-07 07:09:19 +02:00
2ce5b74fa4 bump version to 0.5.9 2025-04-30 16:31:46 +02:00
e57d39c416 add login check for first request execution 2025-04-30 15:44:08 +02:00
77b4bd9700 requests: add breakout if response is not auth related, fix #28 2025-04-30 14:54:31 +02:00
690431472c add option for multiple company IDs, related to #26 2025-04-30 14:46:23 +02:00
248b811786 include login assertion in requests to eliminate unnecessary calls to the API 2025-04-30 08:42:44 +02:00
453490c0f5 basic logic for relogin in request 2025-04-30 07:53:25 +02:00
1d63469be9 prepare insertion of retry and assertion of login state 2025-04-29 16:09:08 +02:00
67406b5690 remove irrelevant statements 2025-04-29 14:13:10 +02:00
daaf48f1db new dev version 2025-04-29 14:11:08 +02:00
d754a94f98 disable logging from SQLAlchemy, fix #24 2025-04-29 14:10:09 +02:00
1447752970 remove unneeded code 2025-04-17 11:57:25 +02:00
4072b97012 add env setup for runtime to enable multiprocessing parameter search by joblib, closes #23 2025-04-17 11:55:01 +02:00
a1057fc78b enhanced config handling 2025-04-17 11:53:59 +02:00
214659c7f1 prepare fix for joblib support in C# environment, related to #23 2025-04-16 18:14:13 +02:00
58fd5bd921 bump version 2025-04-16 13:45:46 +02:00
c2757cca26 implement behaviour control by config via setup data path 2025-04-16 13:40:55 +02:00
c46c90f548 basic structure for lazy config loading 2025-04-16 12:23:49 +02:00
fc4d54dc4b add dep: tomli-w 2025-04-16 12:23:37 +02:00
5d53551923 update deps - dopt-basics 2025-04-16 11:56:41 +02:00
6a7f59116f remove unneeded pytest mark 2025-04-16 11:56:21 +02:00
063531a08e major overhaul of forecast pipeline (#21)
includes several aspects:

- harden forecast logic with additional error checks
- fix wrong behaviour
- ensure minimum data viability
- extrapolate for multiple data points into the future

fix #19

Co-authored-by: frasu
Reviewed-on: #21
Co-authored-by: foefl <f.foerster@d-opt.com>
Co-committed-by: foefl <f.foerster@d-opt.com>
2025-04-16 09:24:33 +00:00
6caa087efd re-enable logging 2025-04-10 11:12:57 +02:00
2d48be0009 update gitignore to exclude doc folders 2025-04-10 07:37:23 +02:00
fdb9812ecf add script to bump patch version 2025-04-10 07:13:35 +02:00
9f90aec324 bump version 2025-04-09 09:28:27 +02:00
dc848fd840 increase timeout timespan 2025-04-09 09:27:23 +02:00
a0d189ac9f add logging of pipeline metrics in database 2025-04-04 13:37:05 +02:00
6a418118d2 prepare metrics writing process 2025-04-03 16:05:46 +02:00
5d78fc9e02 added handling for API connectivity errors 2025-04-03 12:51:14 +02:00
b93b070682 adapt C# JSON type 2025-04-03 11:22:00 +02:00
30641103ec rework session management: interface to C# 2025-04-03 09:26:56 +02:00
28 changed files with 989 additions and 263 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@ prototypes/
data/
reports/
*.code-workspace
docs/
# credentials
CREDENTIALS*

91
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:4931e32f8c146a72ad5b0a13c02485ea5ddc727de32fbe7c5e9314bbab05966c"
content_hash = "sha256:f2a2abd891603796228b21bfeb7a00fd998964fe9303a9e4e5971f63925261e8"
[[metadata.targets]]
requires_python = ">=3.11"
@ -579,7 +579,7 @@ files = [
[[package]]
name = "dopt-basics"
version = "0.1.2"
version = "0.1.3"
requires_python = ">=3.11"
summary = "basic cross-project tools for Python-based d-opt projects"
groups = ["default"]
@ -587,8 +587,19 @@ dependencies = [
"tzdata>=2025.1",
]
files = [
{file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"},
{file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"},
{file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
{file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
requires_python = ">=3.8"
summary = "An implementation of lxml.xmlfile for the standard library"
groups = ["dev"]
files = [
{file = "et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa"},
{file = "et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54"},
]
[[package]]
@ -1450,6 +1461,20 @@ files = [
{file = "nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:362aed5963fb9ea2ed2f264409baae30143498fd0e5c503aeaa1badd88cdc54a"},
]
[[package]]
name = "openpyxl"
version = "3.1.5"
requires_python = ">=3.8"
summary = "A Python library to read/write Excel 2010 xlsx/xlsm files"
groups = ["dev"]
dependencies = [
"et-xmlfile",
]
files = [
{file = "openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2"},
{file = "openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050"},
]
[[package]]
name = "overrides"
version = "7.7.0"
@ -1571,6 +1596,31 @@ files = [
{file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"},
]
[[package]]
name = "pip"
version = "25.1.1"
requires_python = ">=3.9"
summary = "The PyPA recommended tool for installing Python packages."
groups = ["default"]
files = [
{file = "pip-25.1.1-py3-none-any.whl", hash = "sha256:2913a38a2abf4ea6b64ab507bd9e967f3b53dc1ede74b01b0931e1ce548751af"},
{file = "pip-25.1.1.tar.gz", hash = "sha256:3de45d411d308d5054c2168185d8da7f9a2cd753dbac8acbfa88a8909ecd9077"},
]
[[package]]
name = "pip-system-certs"
version = "5.2"
requires_python = ">=3.10"
summary = "Automatically configures Python to use system certificates via truststore"
groups = ["default"]
dependencies = [
"pip>=24.2",
]
files = [
{file = "pip_system_certs-5.2-py3-none-any.whl", hash = "sha256:e6ef3e106d4d02313e33955c2bcc4c2b143b2da07ef91e28a6805a0c1c512126"},
{file = "pip_system_certs-5.2.tar.gz", hash = "sha256:80b776b5cf17191bf99d313699b7fce2fdb84eb7bbb225fd134109a82706406f"},
]
[[package]]
name = "platformdirs"
version = "4.3.6"
@ -1623,7 +1673,7 @@ name = "psutil"
version = "7.0.0"
requires_python = ">=3.6"
summary = "Cross-platform lib for process and system monitoring in Python. NOTE: the syntax of this script MUST be kept compatible with Python 2.7."
groups = ["nb"]
groups = ["default", "nb"]
files = [
{file = "psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25"},
{file = "psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da"},
@ -2414,6 +2464,17 @@ files = [
{file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
]
[[package]]
name = "tomli-w"
version = "1.2.0"
requires_python = ">=3.9"
summary = "A lil' TOML writer"
groups = ["dev"]
files = [
{file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
{file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
]
[[package]]
name = "tomlkit"
version = "0.13.2"
@ -2600,8 +2661,8 @@ files = [
[[package]]
name = "xgboost"
version = "2.1.4"
requires_python = ">=3.8"
version = "3.0.0"
requires_python = ">=3.10"
summary = "XGBoost Python Package"
groups = ["default"]
dependencies = [
@ -2610,12 +2671,12 @@ dependencies = [
"scipy",
]
files = [
{file = "xgboost-2.1.4-py3-none-macosx_10_15_x86_64.macosx_11_0_x86_64.macosx_12_0_x86_64.whl", hash = "sha256:78d88da184562deff25c820d943420342014dd55e0f4c017cc4563c2148df5ee"},
{file = "xgboost-2.1.4-py3-none-macosx_12_0_arm64.whl", hash = "sha256:523db01d4e74b05c61a985028bde88a4dd380eadc97209310621996d7d5d14a7"},
{file = "xgboost-2.1.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:57c7e98111aceef4b689d7d2ce738564a1f7fe44237136837a47847b8b33bade"},
{file = "xgboost-2.1.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f1343a512e634822eab30d300bfc00bf777dc869d881cc74854b42173cfcdb14"},
{file = "xgboost-2.1.4-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:d366097d0db047315736f46af852feaa907f6d7371716af741cdce488ae36d20"},
{file = "xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:8df6da72963969ab2bf49a520c3e147b1e15cbeddd3aa0e3e039b3532c739339"},
{file = "xgboost-2.1.4-py3-none-win_amd64.whl", hash = "sha256:8bbfe4fedc151b83a52edbf0de945fd94358b09a81998f2945ad330fd5f20cd6"},
{file = "xgboost-2.1.4.tar.gz", hash = "sha256:ab84c4bbedd7fae1a26f61e9dd7897421d5b08454b51c6eb072abc1d346d08d7"},
{file = "xgboost-3.0.0-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:ed8cffd7998bd9431c3b0287a70bec8e45c09b43c9474d9dfd261627713bd890"},
{file = "xgboost-3.0.0-py3-none-macosx_12_0_arm64.whl", hash = "sha256:314104bd3a1426a40f0c9662eef40e9ab22eb7a8068a42a8d198ce40412db75c"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:72c3405e8dfc37048f9fe339a058fa12b9f0f03bc31d3e56f0887eed2ed2baa1"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:72d39e74649e9b628c4221111aa6a8caa860f2e853b25480424403ee61085126"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:7bdee5787f86b83bebd75e2c96caf854760788e5f4203d063da50db5bf0efc5f"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:61c7e391e373b8a5312503525c0689f83ef1912a1236377022865ab340f465a4"},
{file = "xgboost-3.0.0-py3-none-win_amd64.whl", hash = "sha256:0ea74e97f95b1eddfd27a46b7f22f72ec5a5322e1dc7cb41c9c23fb580763df9"},
{file = "xgboost-3.0.0.tar.gz", hash = "sha256:45e95416df6f6f01d9a62e60cf09fc57e5ee34697f3858337c796fac9ce3b9ed"},
]

View File

@ -1,11 +1,11 @@
[project]
name = "delta-barth"
version = "0.5.1"
version = "0.5.12"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2", "SQLAlchemy>=2.0.39"]
dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39", "psutil>=7.0.0", "pip-system-certs>=5.2"]
requires-python = ">=3.11"
readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
@ -44,7 +44,8 @@ filterwarnings = [
]
markers = [
"api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
"new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
"forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
]
log_cli = true
@ -73,7 +74,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.5.1"
current_version = "0.5.12"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@ -145,6 +146,8 @@ dev = [
"pdoc3>=0.11.5",
"bump-my-version>=1.1.1",
"nox>=2025.2.9",
"tomli-w>=1.2.0",
"openpyxl>=3.1.5",
]
nb = [
"jupyterlab>=4.3.5",

View File

@ -1 +1,73 @@
pdm build -d build/
pdm build --no-sdist -d build/
# Configuration
$sourceDir = ".\build"
$destDir = "..\01_releases\runtime"
$packagePrefix = "delta_barth-"
$packageSuffix = "-py3-none-any.whl"
# Ensure destination exists
if (-not (Test-Path $destDir)) {
New-Item -ItemType Directory -Path $destDir | Out-Null
}
# === Build Regex Pattern ===
$escapedSuffix = [regex]::Escape($packageSuffix)
# Match versions like 1.2.3 or 1.2.3.beta or 1.2.3.beta1
# Capture the full version as one string, including the optional pre-release after a dot
$pattern = "^$packagePrefix(?<version>\d+\.\d+\.\d+(?:\.[a-zA-Z0-9\-]+)?)$escapedSuffix$"
Write-Host "Using pattern: $pattern"
# === Get and Filter Files ===
$allFiles = Get-ChildItem -Path $sourceDir -File
$matchingFiles = @()
foreach ($file in $allFiles) {
if ($file.Name -match $pattern) {
$version = $Matches['version']
$matchingFiles += [PSCustomObject]@{
File = $file
Version = $version
}
Write-Host "Matched: $($file.Name) -> Version: $version"
} else {
Write-Host "No match: $($file.Name)"
}
}
if ($matchingFiles.Count -eq 0) {
Write-Host "No matching package files found."
return
}
# === Convert version strings to sortable format ===
function Convert-VersionForSort($v) {
# Split by dot: e.g., 1.2.3.beta -> [1, 2, 3, "beta"]
$parts = $v -split '\.'
$major = [int]$parts[0]
$minor = [int]$parts[1]
$patch = [int]$parts[2]
$pre = if ($parts.Count -gt 3) { $parts[3] } else { "~" } # "~" to ensure stable > prerelease
return [PSCustomObject]@{
Major = $major
Minor = $minor
Patch = $patch
Pre = $pre
}
}
# === Sort by semantic version + pre-release ===
$latest = $matchingFiles | Sort-Object {
Convert-VersionForSort $_.Version
} -Descending | Select-Object -First 1
# === Copy and rename to .zip ===
$baseName = [System.IO.Path]::GetFileNameWithoutExtension($latest.File.Name)
$newFileName = "$baseName.zip"
$destPath = Join-Path $destDir $newFileName
Copy-Item -Path $latest.File.FullName -Destination $destPath

2
scripts/bump_patch.ps1 Normal file
View File

@ -0,0 +1,2 @@
pdm run bump-my-version bump patch
pdm run bump-my-version show current_version

View File

@ -0,0 +1,3 @@
import pip_system_certs.wrapt_requests
pip_system_certs.wrapt_requests.inject_truststore()
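
Once inject_truststore() has run (the two lines above), plain requests calls verify TLS against the operating system's certificate store via truststore instead of the bundled certifi CAs. A minimal sketch of the effect, with an illustrative URL:

import pip_system_certs.wrapt_requests
import requests

# manual hook: route certificate verification through the system trust store
pip_system_certs.wrapt_requests.inject_truststore()

# any subsequent request is verified against the OS certificate store
resp = requests.get("https://erp.example.internal/api/health", timeout=20.0)  # illustrative URL
print(resp.status_code)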

View File

@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
def status_err() -> str:
status = Status(code=102, description="internal error occurred", message="caused by test")
status = Status(
code=102,
description="internal error occurred: 'Limit-Überschreitung'",
message="caused by test",
)
return status.model_dump_json()

33
src/delta_barth/_env.py Normal file
View File

@ -0,0 +1,33 @@
from __future__ import annotations
import sys
from pathlib import Path
from typing import Final
from dopt_basics import io
PY_RUNTIME_FOLDER: Final[str] = "python"
def prepare_env(
lib_path: Path,
) -> Path | None:
pyrt_folder = io.search_folder_path(
starting_path=lib_path,
stop_folder_name=PY_RUNTIME_FOLDER,
return_inclusive=True,
)
if pyrt_folder is None:
return None
pth_interpreter = pyrt_folder / "python.exe"
if not pth_interpreter.exists():
raise FileNotFoundError(
f"dopt-delta-barth seems to be deployed in a standalone runtime, "
f"but the interpreter was not found under: {pth_interpreter}"
)
setattr(sys, "executable", str(pth_interpreter))
setattr(sys, "_base_executable", str(pth_interpreter))
return pyrt_folder
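
A minimal usage sketch for prepare_env, assuming the deployed layout implied above: the package lives somewhere below a folder named "python" that also contains python.exe (all paths here are illustrative):

from pathlib import Path

from delta_barth._env import prepare_env

# illustrative standalone-runtime layout: <install>\python\Lib\site-packages\delta_barth
lib_path = Path(r"C:\erp-plugin\python\Lib\site-packages\delta_barth")

runtime = prepare_env(lib_path)
if runtime is None:
    # no "python" folder above lib_path: regular (non-standalone) environment
    print("not deployed in a standalone runtime")
else:
    # sys.executable and sys._base_executable now point to <runtime>\python.exe
    print(f"standalone runtime at {runtime}")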

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import copy
import datetime
import math
from collections.abc import Mapping, Set
@ -7,10 +8,12 @@ from dataclasses import asdict
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final, TypeAlias, cast
import joblib
import numpy as np
import pandas as pd
import scipy.stats
import sqlalchemy as sql
from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor
@ -26,9 +29,10 @@ from delta_barth.api.requests import (
)
from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS,
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS,
SALES_BASE_NUM_DATAPOINTS_MONTHS,
MAX_NUM_WORKERS,
SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
@ -110,7 +114,7 @@ def _parse_df_to_results_wrapped(
return _parse_df_to_results(data)
@wrap_result()
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped(
stats: SalesForecastStatistics,
) -> None:
@ -182,16 +186,17 @@ def _process_sales(
PipeResult
_description_
"""
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
# filter data
data = pipe.data
assert data is not None, "processing not existing pipe result"
DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag"
df_firma = data[(data["betrag"] > 0)]
df_cust = df_firma.copy()
data[DATE_FEAT] = pd.to_datetime(data[DATE_FEAT], errors="coerce")
data = data.dropna(subset=["buchungs_datum"])
df_filter = data[(data["betrag"] > 0)]
df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust)
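
The plausibility filter above leans on pandas coercion: buchungs_datum values that cannot be represented as a nanosecond Timestamp (for example a year like 2519, as used in the tests further below) become NaT and are dropped. A self-contained sketch with toy data:

import datetime

import pandas as pd

df = pd.DataFrame(
    {
        "buchungs_datum": pd.Series(
            [datetime.datetime(2023, 5, 1), datetime.datetime(2519, 6, 30)], dtype=object
        ),
        "betrag": [120.0, 80.0],
    }
)
# the year 2519 is outside the supported Timestamp range -> coerced to NaT and dropped
df["buchungs_datum"] = pd.to_datetime(df["buchungs_datum"], errors="coerce")
df = df.dropna(subset=["buchungs_datum"])
print(len(df))  # 1 - only the plausible date survives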
@ -205,7 +210,26 @@ def _process_sales(
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
all_month_year_combinations = pd.DataFrame(
[
(year, month)
for year in years
for month in range(1, 13)
if (year < current_year or (year == current_year and month <= current_month))
],
columns=["jahr", "monat"],
)
monthly_sum = pd.merge(
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
)
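
A compact illustration of the gap-filling step above with toy values (column names as in the diff): months without bookings get an explicit row with a sum of 0 instead of silently disappearing from the monthly series.

import pandas as pd

# monthly sums as produced by the groupby - February 2024 is missing entirely
monthly_sum_data_only = pd.DataFrame(
    {"jahr": [2024, 2024], "monat": [1, 3], "betrag": [1500.0, 900.0]}
)
# complete (year, month) grid up to the last month to cover
all_month_year_combinations = pd.DataFrame(
    [(2024, month) for month in range(1, 4)], columns=["jahr", "monat"]
)
monthly_sum = pd.merge(
    all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum["betrag"] = monthly_sum["betrag"].fillna(0)
print(monthly_sum)  # February 2024 now appears with betrag == 0.0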
@ -214,13 +238,17 @@ def _process_sales(
features = ["jahr", "monat"]
target = SALES_FEAT
current_year = datetime.datetime.now().year
first_year = cast(int, df_cust["jahr"].min())
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
future_dates = pd.date_range(
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
)
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
# Randomized Search
kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=1000),
"n_estimators": scipy.stats.poisson(mu=100),
"learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9),
"min_child_weight": range(1, 5),
@ -230,40 +258,58 @@ def _process_sales(
"early_stopping_rounds": [20, 50],
}
best_estimator = None
best_params: BestParametersXGBRegressor | None = None
best_score_mae: float | None = float("inf")
best_score_r2: float | None = None
best_start_year: int | None = None
too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
for start_year in range(current_year - 4, first_year - 1, -1):
dates = cast(pd.DatetimeIndex, monthly_sum.index)
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=36)
target_index, _ = next(
((i, True) for i, date in enumerate(dates) if date >= starting_date),
(len(dates) - 1, False),
)
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
first_date = dates[date_idx]
split_date = dates[-6]
train = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
monthly_sum.loc[first_date:split_date].copy(), # type: ignore
)
test = cast(
pd.DataFrame,
monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
monthly_sum.loc[split_date:].copy(), # type: ignore
)
X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target]
if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
# test set size fixed at 6 --> first iteration: baseline - 6 entries
# for each new year 10 new data points (i.e., sales strictly positive) needed
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
too_few_month_points = False
rand = RandomizedSearchCV(
XGBRegressor(),
params,
scoring="neg_mean_absolute_error",
cv=kfold,
n_jobs=-1,
n_iter=100,
verbose=0,
)
rand.fit(
X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
)
with joblib.parallel_config(backend="loky"):
rand = RandomizedSearchCV(
XGBRegressor(),
params,
scoring="neg_mean_absolute_error",
cv=kfold,
n_jobs=MAX_NUM_WORKERS,
n_iter=100,
verbose=0,
)
rand.fit(
X_train,
y_train,
eval_set=[(X_train, y_train), (X_test, y_test)],
verbose=0,
)
y_pred = rand.best_estimator_.predict(X_test) # type: ignore
if len(np.unique(y_pred)) != 1:
@ -272,13 +318,21 @@ def _process_sales(
best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred))
best_start_year = start_year
print("executed")
forecast = test.copy()
forecast.loc[:, "vorhersage"] = y_pred
# --- new: use first_date for best_start_year
best_start_year = first_date.year
# --- new: store best_estimator
best_estimator = copy.copy(rand.best_estimator_)
if best_estimator is not None:
X_future = pd.DataFrame(
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
)
y_future = best_estimator.predict(X_future) # type: ignore
forecast["vorhersage"] = y_future
forecast["jahr"] = forecast.index.year # type: ignore
forecast["monat"] = forecast.index.month # type: ignore
forecast = forecast.reset_index(drop=True)
if forecast is not None:
forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None
if too_few_month_points:
@ -294,7 +348,9 @@ def _process_sales(
pipe.stats(stats)
return pipe
assert forecast is not None, "forecast is None, but was attempted to be returned"
assert "vorhersage" in forecast.columns, (
"forecast does not contain prognosis values, but was attempted to be returned"
)
status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status)
stats = SalesForecastStatistics(
@ -350,13 +406,13 @@ def _export_on_fail(
def pipeline_sales_forecast(
session: Session,
company_id: int | None = None,
company_ids: list[int] | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
company_id=company_id,
company_ids=company_ids,
start_date=start_date,
)
if status != STATUS_HANDLER.SUCCESS:
@ -383,7 +439,7 @@ def pipeline_sales_forecast(
pipe = _process_sales(
pipe,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
base_num_data_points_months=session.cfg.forecast.threshold_month_data_points,
)
if pipe.statistics is not None:
res = _write_sales_forecast_stats_wrapped(pipe.statistics)

View File

@ -7,17 +7,20 @@ import requests
from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.constants import API_CON_TIMEOUT, MAX_LOGIN_RETRIES
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
if TYPE_CHECKING:
from requests import Response
from delta_barth.session import Session
# ** sales data
# ** import
class SalesPrognosisRequestP(BaseModel):
FirmaId: SkipValidation[int | None]
FirmaIds: SkipValidation[list[int] | None]
BuchungsDatum: SkipValidation[Datetime | None]
@ -52,34 +55,50 @@ class SalesPrognosisResultsExport(ExportResponse):
def get_sales_prognosis_data(
session: Session,
company_id: int | None = None,
company_ids: list[int] | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
resp, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple())
return response, status
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
FirmaId=company_id,
FirmaIds=company_ids,
BuchungsDatum=start_date,
)
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
)
empty_response = SalesPrognosisResponse(daten=tuple())
if not session.logged_in:
_, status = session.login()
if status != STATUS_HANDLER.SUCCESS:
return empty_response, status
resp: Response | None = None
try:
for attempt in range(1, (MAX_LOGIN_RETRIES + 1)):
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
)
if resp.status_code == 401:
_, status = session.relogin()
if status != STATUS_HANDLER.SUCCESS and attempt == MAX_LOGIN_RETRIES:
return empty_response, status
continue
break
except requests.exceptions.Timeout:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: SalesPrognosisResponse
status: Status
assert resp is not None, "tried to use not defined response"
if resp.status_code == 200:
response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS
else:
response = SalesPrognosisResponse(daten=tuple())
response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)

43
src/delta_barth/config.py Normal file
View File

@ -0,0 +1,43 @@
from __future__ import annotations
from pathlib import Path
import dopt_basics.configs
from pydantic import BaseModel
class Config(BaseModel):
forecast: CfgForecast
class CfgForecast(BaseModel):
threshold_month_data_points: int
class LazyCfgLoader:
def __init__(
self,
cfg_path: Path,
) -> None:
cfg_path = cfg_path.resolve()
assert cfg_path.exists(), f"config path {cfg_path} seems not to exist"
assert cfg_path.is_file(), f"config path {cfg_path} seems not to be a file"
self._path = cfg_path
self._cfg: Config | None = None
@property
def path(self) -> Path:
return self._path
def _load(self) -> Config:
cfg = dopt_basics.configs.load_toml(self.path)
return Config(**cfg)
def reload(self) -> None:
self._cfg = self._load()
def get(self) -> Config:
if self._cfg is None:
self._cfg = self._load()
return self._cfg
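
A minimal usage sketch for LazyCfgLoader, assuming a TOML file with the [forecast] table shown further below (the path is illustrative):

from pathlib import Path

from delta_barth.config import LazyCfgLoader

loader = LazyCfgLoader(Path("data/config/dopt-cfg.toml"))  # must exist and be a file
cfg = loader.get()       # parsed on first access, cached afterwards
print(cfg.forecast.threshold_month_data_points)

loader.reload()          # re-read the file, e.g. after it was edited on disk
cfg = loader.get()       # returns the freshly parsed configuration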

View File

@ -1,10 +1,19 @@
from __future__ import annotations
import enum
from pathlib import Path
from typing import Final
import psutil
import delta_barth._env
from delta_barth.types import DualDict, HttpContentHeaders
# ** config
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
CFG_HOT_RELOAD: Final[bool] = True
cpu_count = psutil.cpu_count(logical=False)
MAX_NUM_WORKERS: Final[int] = (cpu_count - 1) if cpu_count is not None else 3
# ** lib path
lib_path = Path(__file__).parent
@ -13,18 +22,20 @@ LIB_PATH: Final[Path] = lib_path
dummy_data_pth = LIB_PATH / "_dummy_data"
assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** runtime and deployment status
RUNTIME_PATH: Final[Path | None] = delta_barth._env.prepare_env(LIB_PATH)
deployment_status: bool = False
if RUNTIME_PATH is not None:
deployment_status = True
DEPLOYMENT_STATUS: Final[bool] = deployment_status
# ** logging
ENABLE_LOGGING: Final[bool] = False
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True
LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases
DB_ECHO: Final[bool] = True
DB_ECHO: Final[bool] = False
# ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400
@ -38,6 +49,10 @@ class KnownDelBarApiErrorCodes(enum.Enum):
COMMON = frozenset((400, 401, 409, 500))
# ** API
API_CON_TIMEOUT: Final[float] = 20.0 # secs to response
MAX_LOGIN_RETRIES: Final[int] = 2
# ** API response parsing
# ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
@ -60,4 +75,6 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
# ** Pipelines
# ** Forecast
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
# !! now in config
# TODO remove later till proven stable
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

View File

@ -22,8 +22,8 @@ perf_meas = sql.Table(
"performance_measurement",
metadata,
sql.Column("id", sql.Integer, primary_key=True),
sql.Column("execution_duration", sql.Float),
sql.Column("pipeline_name", sql.String(length=30)),
sql.Column("execution_duration", sql.Float),
)
# ** ---- forecasts
sf_stats = sql.Table(

View File

@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28
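
With CFG_HOT_RELOAD enabled, edits to the deployed copy of this file are picked up on the next access to session.cfg. A small sketch of such an edit, using the same tomllib/tomli-w pair as the tests (path illustrative):

import tomllib

import tomli_w

CFG = "data/config/dopt-cfg.toml"  # illustrative deployed location

with open(CFG, "rb") as file:
    cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(CFG, "wb") as file:
    tomli_w.dump(cfg_data, file)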

View File

@ -53,9 +53,19 @@ class UApiError(Exception):
## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
(
"CONNECTION_TIMEOUT",
1,
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
),
(
"CONNECTION_ERROR",
2,
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
),
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
)

View File

@ -6,14 +6,13 @@ from pathlib import Path
from time import gmtime
from typing import Final
from delta_barth.constants import (
ENABLE_LOGGING,
LOG_FILENAME,
LOGGING_TO_FILE,
LOGGING_TO_STDERR,
)
# ** config
# ** logging
ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log"
logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
@ -31,6 +30,8 @@ logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG)
logger_config = logging.getLogger("delta_barth.config")
logger_config.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")

View File

@ -14,9 +14,11 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup(
data_path: str,
base_url: str,
) -> None: # pragma: no cover
# at this point: no logging configured
SESSION.set_data_path(data_path)
SESSION.set_base_url(base_url=base_url)
SESSION.setup()
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
@ -37,6 +39,7 @@ def set_credentials(
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
# ** not part of external API, only internal
def get_credentials() -> str: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
creds = SESSION.creds
@ -44,12 +47,15 @@ def get_credentials() -> str: # pragma: no cover
return creds.model_dump_json()
# ** legacy: not part of external API
def set_base_url(
base_url: str,
) -> None: # pragma: no cover
SESSION.set_base_url(base_url=base_url)
def get_data_path() -> str: # pragma: no cover
return str(SESSION.data_path)
def get_base_url() -> str: # pragma: no cover
return SESSION.base_url

View File

@ -1,24 +1,83 @@
"""collection of configured data pipelines, intended to be invoked from C#"""
import time
from datetime import datetime as Datetime
from typing import Final
import sqlalchemy as sql
from delta_barth import databases as db
from delta_barth.analysis import forecast
from delta_barth.constants import DEFAULT_DB_ERR_CODE
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse
from delta_barth.types import JsonExportResponse, PipelineMetrics
def _write_performance_metrics(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
if time_end < time_start:
raise ValueError("Ending time smaller than starting time")
execution_duration = (time_end - time_start) / 1e9
metrics = PipelineMetrics(
pipeline_name=pipeline_name,
execution_duration=execution_duration,
)
with SESSION.db_engine.begin() as con:
con.execute(sql.insert(db.perf_meas).values(**metrics))
return metrics
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
return _write_performance_metrics(pipeline_name, time_start, time_end)
def pipeline_sales_forecast(
company_id: int | None,
company_ids: list[int] | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast(
SESSION, company_id=company_id, start_date=start_date
SESSION, company_ids=company_ids, start_date=start_date
)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export
@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_dummy(
SESSION,
company_id=company_id,
start_date=start_date,
)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export
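
The timing bookkeeping in both pipelines follows the same pattern; a short sketch of the conversion used by _write_performance_metrics (perf_counter_ns returns integer nanoseconds, the stored duration is seconds as a float):

import time

t_start = time.perf_counter_ns()
time.sleep(0.25)  # stand-in for the actual pipeline work
t_end = time.perf_counter_ns()

execution_duration = (t_end - t_start) / 1e9  # nanoseconds -> seconds
print(f"{execution_duration:.3f} s")  # roughly 0.25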

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Final
@ -14,12 +15,20 @@ from delta_barth.api.common import (
LoginResponse,
validate_credentials,
)
from delta_barth.constants import DB_ECHO
from delta_barth.config import LazyCfgLoader
from delta_barth.constants import (
API_CON_TIMEOUT,
CFG_FILENAME,
CFG_HOT_RELOAD,
DB_ECHO,
LIB_PATH,
)
from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING:
from delta_barth.config import Config
from delta_barth.types import ApiCredentials, HttpContentHeaders
@ -41,6 +50,7 @@ class Session:
base_headers: HttpContentHeaders,
db_folder: str = "data",
logging_folder: str = "logs",
cfg_folder: str = "config",
) -> None:
self._setup: bool = False
self._data_path: Path | None = None
@ -49,6 +59,10 @@ class Session:
self._db_engine: sql.Engine | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._cfg_path: Path | None = None
self._cfg_folder = cfg_folder
self._cfg_loader: LazyCfgLoader | None = None
self._cfg: Config | None = None
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
@ -59,6 +73,7 @@ class Session:
# at this point: no logging configured
assert not self._setup, "tried to setup session twice"
self._setup_logging()
self._setup_config()
self._setup_db_management()
self._setup = True
logger.info("[SESSION] Setup procedure successful")
@ -68,6 +83,39 @@ class Session:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def cfg_path(self) -> Path:
if self._cfg_path is not None and self._setup:
return self._cfg_path
root = (self.data_path / self._cfg_folder).resolve()
cfg_path = root / CFG_FILENAME
if not root.exists():
root.mkdir(parents=False)
self._cfg_path = cfg_path
return self._cfg_path
@property
def cfg(self) -> Config:
assert self._cfg is not None, "tried to access not set config from session"
if CFG_HOT_RELOAD:
self.reload_cfg()
return self._cfg
def _setup_config(self) -> None:
if not self.cfg_path.exists():
src_cfg = LIB_PATH / CFG_FILENAME
shutil.copyfile(src_cfg, self.cfg_path)
self._cfg_loader = LazyCfgLoader(self.cfg_path)
self._cfg = self._cfg_loader.get()
logger.info("[SESSION] Successfully read and setup config")
def reload_cfg(self) -> None:
assert self._cfg_loader is not None, "tried reloading with no CFG loader intialised"
self._cfg_loader.reload()
self._cfg = self._cfg_loader.get()
@property
def db_engine(self) -> sql.Engine:
assert self._db_engine is not None, "accessed database engine not set"
@ -78,10 +126,10 @@ class Session:
if self._db_path is not None and self._setup:
return self._db_path
db_root = (self.data_path / self._db_folder).resolve()
db_path = db_root / "dopt-data.db"
if not db_root.exists():
db_root.mkdir(parents=False)
root = (self.data_path / self._db_folder).resolve()
db_path = root / "dopt-data.db"
if not root.exists():
root.mkdir(parents=False)
self._db_path = db_path
return self._db_path
@ -191,11 +239,18 @@ class Session:
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
)
empty_response = LoginResponse(token="")
try:
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse
status: Status
@ -204,7 +259,7 @@ class Session:
status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = LoginResponse(token="")
response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
@ -216,12 +271,17 @@ class Session:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put(
URL,
headers=self.headers, # type: ignore
)
try:
resp = requests.put(
URL,
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response = None
status: Status
if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS
@ -230,39 +290,13 @@ class Session:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
return None, status
def assert_login(
def relogin(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(token=self.session_token)
status = STATUS_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
self._remove_session_token()
return self.login()

View File

@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
@dataclass(slots=True)
class DataPipeStates:
SUCCESS: Status
CONNECTION_TIMEOUT: Status
CONNECTION_ERROR: Status
TOO_FEW_POINTS: Status
TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status
@ -139,7 +141,13 @@ class Statistics:
pass
# ** forecasts
# ** ---- performance
class PipelineMetrics(t.TypedDict):
pipeline_name: str
execution_duration: float
# ** ---- forecasts
@dataclass(slots=True)
class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list)

View File

@ -0,0 +1,2 @@
[forecast]
threshold_month_data_points = 28

View File

@ -1,3 +1,4 @@
import datetime
from datetime import datetime as Datetime
from unittest.mock import patch
@ -255,6 +256,7 @@ def test_preprocess_sales_FailOnTargetFeature(
assert pipe.results is None
@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -277,6 +279,29 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_InvalidDates(sales_data_real_preproc):
false_date = Datetime(2519, 6, 30)
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
data["buchungs_datum"] = data["buchungs_datum"].astype(object)
data.at[0, "buchungs_datum"] = false_date
assert data["buchungs_datum"].dtype.char == "O"
assert len(data) == 20
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
@ -303,6 +328,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -329,8 +355,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
# prepare fake data
df = sales_data_real_preproc.copy()
f_dates = "buchungs_datum"
end = datetime.datetime.now()
start = df[f_dates].max()
fake_dates = pd.date_range(start, end, freq="MS")
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
fake_df = pd.DataFrame(fake_data, columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
data = enhanced_df.copy()
data["betrag"] = 10000
print(data["betrag"])
data = data.iloc[:20000, :]
@ -340,7 +377,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
def __init__(self, *args, **kwargs) -> None:
class Predictor:
def predict(self, *args, **kwargs):
return np.array([1, 1, 1, 1])
return np.array([1, 1, 1, 1], dtype=np.float64)
self.best_estimator_ = Predictor()
@ -354,7 +391,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
pipe = fc._process_sales(
pipe,
min_num_data_points=1,
base_num_data_points_months=-100,
base_num_data_points_months=1,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
@ -415,27 +452,20 @@ def test_export_on_fail():
assert res.status.description == status.description
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
with patch("delta_barth.analysis.forecast.SESSION", session):
result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_forecast_FailDbWrite(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
result = fc.pipeline_sales_forecast(None) # type: ignore
print(result)
assert session.cfg.forecast.threshold_month_data_points is not None
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(session, company_ids, date) # type: ignore
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0

View File

@ -1,45 +1,44 @@
from datetime import datetime as Datetime
import pytest
import requests
from delta_barth.api import requests as requests_
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_Success(session):
resp, status = session.login()
# do not login: let routine do it
# test without company ID
assert status.code == 0
date = Datetime(2022, 6, 1)
date = Datetime(2023, 12, 15)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2030, 1, 1)
date = Datetime(2520, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with company ID
assert status.code == 0
date = Datetime(2022, 6, 1)
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2030, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
date = Datetime(2520, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with non-existent company ID
assert status.code == 0
date = Datetime(2022, 6, 1)
company_id = 1000024
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
# TODO check if this behaviour is still considered "successful"
company_ids = [1000024]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
assert status.code == 0
assert len(resp.daten) == 0
# test without date
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_id, None)
company_ids = [1024]
resp, status = requests_.get_sales_prognosis_data(session, company_ids, None)
assert status.code == 0
assert len(resp.daten) > 0
# test without filters
@ -52,12 +51,11 @@ def test_get_sales_prognosis_data_Success(session):
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_FailLogin(session, mock_get):
session.login()
code = 500
def test_get_sales_prognosis_data_NoAuth(session, mock_get):
code = 401
json = {
"message": "ServerError",
"code": "TestExternalServerError",
"code": "TestFailAuth",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
@ -74,6 +72,36 @@ def test_get_sales_prognosis_data_FailLogin(session, mock_get):
assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailLogin(session, mock_get, mock_put):
code = 401
json = {
"message": "ServerError",
"code": "TestFailAuth",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
code_put = 500
json_put = {
"message": "ServerError",
"code": "TestUnknownError",
"hints": "TestCase",
}
mock_put.return_value.status_code = code_put
mock_put.return_value.json.return_value = json_put
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code_put
assert status.api_server_error.message == json_put["message"]
assert status.api_server_error.code == json_put["code"]
assert status.api_server_error.hints == json_put["hints"]
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
code = 405
@ -94,3 +122,21 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 1
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 2

View File

@ -8,6 +8,7 @@ from unittest.mock import patch
import pandas as pd
import pytest
import tomli_w
import delta_barth.session
from delta_barth.api.requests import SalesPrognosisResponse
@ -33,6 +34,28 @@ def api_base_url(credentials) -> str:
return credentials["base_url"]
@pytest.fixture(scope="session")
def pth_dummy_cfg() -> Path:
pwd = Path.cwd()
assert "barth" in pwd.parent.name.lower(), "not in project root directory"
data_pth = pwd / "./tests/_test_data/dopt-cfg.toml"
assert data_pth.exists(), "file to dummy CFG not found"
return data_pth
@pytest.fixture(scope="function")
def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
with open(pth_dummy_cfg, "rb") as file:
cfg_data = tomllib.load(file)
target = tmp_path / "dummy_cfg.toml"
target.touch()
with open(target, "wb") as file:
tomli_w.dump(cfg_data, file)
return target
@pytest.fixture(scope="session")
def sales_data_real() -> pd.DataFrame:
pwd = Path.cwd()
@ -95,7 +118,7 @@ def mock_put():
yield mock
@pytest.fixture
@pytest.fixture(scope="function")
def mock_get():
with patch("requests.get") as mock:
yield mock

40
tests/test_config.py Normal file
View File

@ -0,0 +1,40 @@
import tomllib
import tomli_w
from delta_barth import config
def test_CfgLoader_Init(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
assert loader.path == pth_cfg
assert loader._cfg is None
def test_CfgLoader_Get(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
def test_CfgLoader_Reload(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
# modify config and reload
with open(pth_cfg, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(pth_cfg, "wb") as file:
tomli_w.dump(cfg_data, file)
assert parsed_cfg.forecast.threshold_month_data_points == 28
loader.reload()
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 30

49
tests/test_env.py Normal file
View File

@ -0,0 +1,49 @@
import importlib
import sys
from unittest.mock import patch
import pytest
import delta_barth.constants
from delta_barth import _env
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "test123456")
def test_prepare_env_NoRuntimeFolder(tmp_path):
ret = _env.prepare_env(tmp_path)
assert ret is None
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_FailNoInterpreter(tmp_path_factory):
mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
mocked_lib_pth.mkdir(parents=True, exist_ok=True)
with pytest.raises(FileNotFoundError):
_ = _env.prepare_env(mocked_lib_pth)
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_Success(tmp_path_factory):
mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
mocked_lib_pth.mkdir(parents=True, exist_ok=True)
rt_path = mocked_lib_pth.parents[1]
mocked_interpreter = rt_path / "python.exe"
mocked_interpreter.touch()
assert mocked_interpreter.exists()
ret = _env.prepare_env(mocked_lib_pth)
assert ret == rt_path
# sys attributes
executable = getattr(sys, "executable")
assert executable == str(mocked_interpreter)
base_executable = getattr(sys, "_base_executable")
assert base_executable == str(mocked_interpreter)
class MockPath:
def __init__(self, *args, **kwargs):
self.parent = mocked_lib_pth
with patch("pathlib.Path", MockPath):
(mocked_lib_pth / "_dummy_data").mkdir(exist_ok=True)
importlib.reload(delta_barth.constants)
assert delta_barth.constants.DEPLOYMENT_STATUS
assert delta_barth.constants.RUNTIME_PATH == rt_path

View File

@ -1,22 +1,65 @@
import importlib
import json
from datetime import datetime as Datetime
from unittest.mock import patch
import pytest
import sqlalchemy as sql
import delta_barth.pipelines
from delta_barth import databases as db
from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
importlib.reload(delta_barth.pipelines)
json_export = pl.pipeline_sales_forecast(None, None)
def test_write_performance_metrics_Success(session):
pipe_name = "test_pipe"
t_start = 20_000_000_000
t_end = 30_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
metrics = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
assert metrics["pipeline_name"] == pipe_name
assert metrics["execution_duration"] == 10
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == pipe_name
assert metrics.execution_duration == 10
def test_write_performance_metrics_FailStartingTime(session):
pipe_name = "test_pipe"
t_start = 30_000_000_000
t_end = 20_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
with pytest.raises(ValueError):
_ = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
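
Note: the two metrics tests fix the helper's contract: timestamps are perf_counter_ns-style integers, a start time after the end time raises ValueError, and each call appends one row to the perf_meas table. A self-contained sketch of a writer with that contract follows; the table definition and the in-memory engine are stand-ins for db.perf_meas and SESSION.db_engine, chosen only to keep the example runnable.

import sqlalchemy as sql

metadata = sql.MetaData()
# minimal stand-in for db.perf_meas with just the columns the tests assert on
perf_meas = sql.Table(
    "perf_meas",
    metadata,
    sql.Column("pipeline_name", sql.String),
    sql.Column("execution_duration", sql.Float),
)
engine = sql.create_engine("sqlite:///:memory:")  # real code: SESSION.db_engine
metadata.create_all(engine)


def _write_performance_metrics(pipeline_name: str, time_start: int, time_end: int) -> dict:
    # times are nanosecond counters; an inverted interval is a caller error
    if time_start > time_end:
        raise ValueError("start time must not be later than end time")
    metrics = {
        "pipeline_name": pipeline_name,
        "execution_duration": (time_end - time_start) / 1_000_000_000,  # ns -> s
    }
    with engine.begin() as con:
        con.execute(sql.insert(perf_meas).values(**metrics))
    return metrics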
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.pipelines.SESSION", session),
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(company_ids, date)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)
@@ -27,9 +70,17 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
def test_sales_prognosis_pipeline_dummy():
json_export = pl.pipeline_sales_forecast_dummy(None, None)
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast"
assert metrics.execution_duration > 0
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)
@@ -43,3 +94,10 @@ def test_sales_prognosis_pipeline_dummy():
assert entry["vorhersage"] == pytest.approx(47261.058594)
assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast_dummy"
assert metrics.execution_duration > 0
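
Note: both pipeline tests finish by checking that a timing row reached perf_meas, which implies the pipelines bracket their work with a perf-counter pair and pass the readings to _write_performance_metrics. A rough sketch of that pattern, reusing the writer sketched further up (the forecast body itself is omitted and assumed):

import json
import time


def pipeline_sales_forecast(company_ids, date) -> str:
    t_start = time.perf_counter_ns()
    # ... fetch data, run the forecast, build the response dict (omitted) ...
    result = {"status": {"code": 0}}
    t_end = time.perf_counter_ns()
    _write_performance_metrics(
        pipeline_name="sales_forecast",
        time_start=t_start,
        time_end=t_end,
    )
    return json.dumps(result)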


@ -1,15 +1,18 @@
import tomllib
from pathlib import Path
from unittest.mock import patch
import pytest
import tomli_w
import delta_barth.config
import delta_barth.session
from delta_barth import logging
from delta_barth.constants import (
    DEFAULT_API_ERR_CODE,
    HTTP_BASE_CONTENT_HEADERS,
    LOG_FILENAME,
)
from delta_barth.logging import LOG_FILENAME


def test_validate_path_Success():
@@ -62,8 +65,82 @@ def test_session_setup_db_management(tmp_path):
    assert db_path.exists()


def test_session_setup_config(tmp_path):
    str_path = str(tmp_path)
    foldername: str = "cfg_test"
    target_cfg_dir = tmp_path / foldername
    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
    session.set_data_path(str_path)
    cfg_path = session.cfg_path
    assert cfg_path.parent.exists()
    assert cfg_path.parent == target_cfg_dir
    assert not cfg_path.exists()
    session.setup()
    cfg_path2 = session.cfg_path
    assert cfg_path2 == cfg_path
    assert session._cfg is not None
    assert cfg_path.exists()
    assert session.cfg.forecast.threshold_month_data_points == 28
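
Note: the setup test above pins the order of responsibilities: set_data_path already creates the config directory, while setup() writes a default TOML file only when none exists and then parses it. A compressed sketch of that flow follows; the file name config.toml, the default dictionary, and the method bodies are assumptions, only the observable effects mirror the assertions (parsing into delta_barth.config.Config is skipped here, a plain dict stands in).

import tomllib
from pathlib import Path

import tomli_w

DEFAULT_CFG = {"forecast": {"threshold_month_data_points": 28}}


class Session:
    def __init__(self, headers: dict, cfg_folder: str = "cfg") -> None:
        self._headers = headers
        self._cfg_folder = cfg_folder
        self._cfg = None
        self.cfg_path: Path | None = None

    def set_data_path(self, data_path: str) -> None:
        # the config directory is created as soon as the data path is known
        cfg_dir = Path(data_path) / self._cfg_folder
        cfg_dir.mkdir(parents=True, exist_ok=True)
        self.cfg_path = cfg_dir / "config.toml"

    def setup(self) -> None:
        # write defaults on first run, then parse whatever is on disk
        if not self.cfg_path.exists():
            with open(self.cfg_path, "wb") as file:
                tomli_w.dump(DEFAULT_CFG, file)
        with open(self.cfg_path, "rb") as file:
            self._cfg = tomllib.load(file)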
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_session_reload_config_NoHotReload(tmp_path):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
assert cfg_path.exists()
parsed_cfg = session.cfg
assert isinstance(parsed_cfg, delta_barth.config.Config)
# modify config and reload
with open(cfg_path, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(cfg_path, "wb") as file:
tomli_w.dump(cfg_data, file)
assert session.cfg.forecast.threshold_month_data_points == 28
session.reload_cfg()
reload_cfg = session.cfg
assert isinstance(reload_cfg, delta_barth.config.Config)
assert reload_cfg.forecast.threshold_month_data_points == 30
@patch("delta_barth.session.CFG_HOT_RELOAD", True)
def test_session_reload_config_HotReload(tmp_path):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
assert cfg_path.exists()
parsed_cfg = session.cfg
assert isinstance(parsed_cfg, delta_barth.config.Config)
# modify config and reload
with open(cfg_path, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(cfg_path, "wb") as file:
tomli_w.dump(cfg_data, file)
assert session.cfg.forecast.threshold_month_data_points == 30
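
Note: taken together, the two reload tests describe the CFG_HOT_RELOAD switch: with the flag off, session.cfg keeps serving the cached object until reload_cfg() is called explicitly; with the flag on, every access re-reads the file. The property could plausibly look like the fragment below (a sketch; _parse_cfg_file is a hypothetical helper standing in for the real parsing code).

CFG_HOT_RELOAD = False  # module-level constant, patched via unittest.mock above


class SessionCfgAccess:
    def reload_cfg(self) -> None:
        # unconditionally re-read and re-parse the TOML file from disk
        self._cfg = self._parse_cfg_file()

    @property
    def cfg(self):
        # hot reload trades a little I/O per access for always-fresh settings
        if CFG_HOT_RELOAD or self._cfg is None:
            self.reload_cfg()
        return self._cfg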
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
@@ -237,11 +314,11 @@ def test_login_logout_FailApiServer(session, mock_put):
@pytest.mark.api_con_required
def test_assert_login_SuccessLoggedOut(session):
def test_relogin_SuccessLoggedOut(session):
    assert session.session_token is None
    assert session._creds is not None
    # test logged out state
    resp, status = session.assert_login()
    resp, status = session.relogin()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
@@ -250,74 +327,17 @@ def test_assert_login_SuccessLoggedOut(session):
@pytest.mark.api_con_required
def test_assert_login_SuccessStillLoggedIn(session):
def test_relogin_SuccessStillLoggedIn(session):
    assert session.session_token is None
    assert session._creds is not None
    resp, status = session.login()
    resp, status = session.assert_login()
    old_token = session.session_token
    assert old_token is not None
    resp, status = session.relogin()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
    assert session.session_token != old_token
    resp, status = session.logout()
    assert status.code == 0
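
Note: the renamed tests read as if relogin() always performs a fresh authentication, discarding any token it still holds, whereas the removed assert_login() tests below exercised re-authentication only when the existing token was no longer accepted. A minimal sketch of the former, assuming the login() method and the private _session_token attribute seen in the tests:

class SessionAuth:
    def relogin(self):
        # force a fresh token regardless of the current login state
        self._session_token = None
        resp, status = self.login()
        return resp, status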
@pytest.mark.api_con_required
def test_assert_login_ReloginNoValidAuth(session, mock_get):
    code = 401
    json = {
        "message": "AuthentificationError",
        "code": "TestAssertLoginAfter",
        "hints": "TestCase",
    }
    mock_get.return_value.status_code = code
    mock_get.return_value.json.return_value = json
    resp, status = session.login()
    resp, status = session.assert_login()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
    resp, status = session.logout()
    assert status.code == 0


@pytest.mark.api_con_required
def test_assert_login_ReloginWrongToken(session):
    # triggers code 401
    assert session.session_token is None
    assert session._creds is not None
    _, status = session.login()
    assert status.code == 0
    session._session_token = "WRONGTOKEN"
    resp, status = session.assert_login()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
    resp, status = session.logout()
    assert status.code == 0


@pytest.mark.api_con_required
def test_assert_login_FailApiServer(session, mock_get):
    code = 500
    json = {
        "message": "ServerError",
        "code": "TestExternalServerError",
        "hints": "TestCase",
    }
    mock_get.return_value.status_code = code
    mock_get.return_value.json.return_value = json
    resp, status = session.login()
    resp, status = session.assert_login()
    assert resp is not None
    assert not resp.token
    assert status.code == 400
    assert status.api_server_error is not None
    assert status.api_server_error.status_code == code
    assert status.api_server_error.message == json["message"]
    assert status.api_server_error.code == json["code"]
    assert status.api_server_error.hints == json["hints"]
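
Note: as a closing remark on the removed server-error test, its assertions imply that a failing API response is folded into the returned status object: the HTTP status code and the message/code/hints fields of the JSON body are preserved under api_server_error, while the top-level code falls back to a default error value (400 here, presumably DEFAULT_API_ERR_CODE). A sketch of that mapping under those assumptions; the dataclass layout is illustrative, not the project's actual types.

from dataclasses import dataclass

DEFAULT_API_ERR_CODE = 400  # matches status.code asserted above


@dataclass
class ApiServerError:
    status_code: int
    message: str
    code: str
    hints: str


@dataclass
class Status:
    code: int = 0
    api_server_error: ApiServerError | None = None


def status_from_response(response) -> Status:
    # any non-2xx response is wrapped into an error status instead of raising
    if 200 <= response.status_code < 300:
        return Status(code=0)
    body = response.json()
    return Status(
        code=DEFAULT_API_ERR_CODE,
        api_server_error=ApiServerError(
            status_code=response.status_code,
            message=body["message"],
            code=body["code"],
            hints=body["hints"],
        ),
    )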