Compare commits


38 Commits
v0.5.5 ... main

Author SHA1 Message Date
e14a8a2036 bump version to 0.5.12 2025-07-02 09:47:14 +02:00
7bd3322733 trigger manual hook of pip-system-certs to use system cert stores, related to #34 2025-07-02 09:36:59 +02:00
ccce2c703e add dep to use system cert stores for request verification 2025-06-26 14:45:21 +02:00
7df35ed05d fix build script 2025-05-15 12:14:19 +02:00
b2e22f7353 release v0.5.10 2025-05-15 12:00:24 +02:00
559ef90d61 update build script 2025-05-15 11:55:09 +02:00
fcd85a609d add new build script for improved automation 2025-05-15 11:52:44 +02:00
3e14a8660e add date plausibility filter, fixes #31 2025-05-14 09:13:49 +02:00
33760bd764 new filter date for request test case in forecast 2025-05-14 09:13:23 +02:00
3011ca46cd hint for insertion of date plausibility check, related to #31 2025-05-07 09:16:11 +02:00
9881070425 add new dep 2025-05-07 08:58:39 +02:00
ce24fd8126 use session as variable in sales forecast pipeline 2025-05-07 07:09:19 +02:00
2ce5b74fa4 bump version to 0.5.9 2025-04-30 16:31:46 +02:00
e57d39c416 add login check for first request execution 2025-04-30 15:44:08 +02:00
77b4bd9700 requests: add breakout if response is not auth related, fix #28 2025-04-30 14:54:31 +02:00
690431472c add option for multiple company IDs, related to #26 2025-04-30 14:46:23 +02:00
248b811786 include login assertion in requests to eliminate unnecessary calls to the API 2025-04-30 08:42:44 +02:00
453490c0f5 basic logic for relogin in request 2025-04-30 07:53:25 +02:00
1d63469be9 prepare insertion of retry and assertion of login state 2025-04-29 16:09:08 +02:00
67406b5690 remove irrelevant statements 2025-04-29 14:13:10 +02:00
daaf48f1db new dev version 2025-04-29 14:11:08 +02:00
d754a94f98 disable logging from SQLAlchemy, fix #24 2025-04-29 14:10:09 +02:00
1447752970 remove unneeded code 2025-04-17 11:57:25 +02:00
4072b97012 add env setup for runtime to enable multiprocessing parameter search by joblib, closes #23 2025-04-17 11:55:01 +02:00
a1057fc78b enhanced config handling 2025-04-17 11:53:59 +02:00
214659c7f1 prepare fix for joblib support in C# environment, related to #23 2025-04-16 18:14:13 +02:00
58fd5bd921 bump version 2025-04-16 13:45:46 +02:00
c2757cca26 implement behaviour control by config via setup data path 2025-04-16 13:40:55 +02:00
c46c90f548 basic structure for lazy config loading 2025-04-16 12:23:49 +02:00
fc4d54dc4b add dep: tomli-w 2025-04-16 12:23:37 +02:00
5d53551923 update deps - dopt-basics 2025-04-16 11:56:41 +02:00
6a7f59116f remove unneeded pytest mark 2025-04-16 11:56:21 +02:00
063531a08e major overhaul of forecast pipeline (#21)
includes several aspects:

- harden forecast logic with additional error checks
- fix wrong behaviour
- ensure minimum data viability
- extrapolate for multiple data points into the future

fix #19

Co-authored-by: frasu
Reviewed-on: #21
Co-authored-by: foefl <f.foerster@d-opt.com>
Co-committed-by: foefl <f.foerster@d-opt.com>
2025-04-16 09:24:33 +00:00
6caa087efd re-enable logging 2025-04-10 11:12:57 +02:00
2d48be0009 update gitignore to exclude doc folders 2025-04-10 07:37:23 +02:00
fdb9812ecf add script to bump patch version 2025-04-10 07:13:35 +02:00
9f90aec324 bump version 2025-04-09 09:28:27 +02:00
dc848fd840 increase timeout timespan 2025-04-09 09:27:23 +02:00
23 changed files with 773 additions and 259 deletions

.gitignore vendored

@@ -3,6 +3,7 @@ prototypes/
 data/
 reports/
 *.code-workspace
+docs/

 # credentials
 CREDENTIALS*

pdm.lock generated

@@ -5,7 +5,7 @@
 groups = ["default", "dev", "lint", "nb", "tests"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:4931e32f8c146a72ad5b0a13c02485ea5ddc727de32fbe7c5e9314bbab05966c"
+content_hash = "sha256:f2a2abd891603796228b21bfeb7a00fd998964fe9303a9e4e5971f63925261e8"

 [[metadata.targets]]
 requires_python = ">=3.11"
@@ -579,7 +579,7 @@ files = [

 [[package]]
 name = "dopt-basics"
-version = "0.1.2"
+version = "0.1.3"
 requires_python = ">=3.11"
 summary = "basic cross-project tools for Python-based d-opt projects"
 groups = ["default"]
@@ -587,8 +587,19 @@ dependencies = [
     "tzdata>=2025.1",
 ]
 files = [
-    {file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"},
-    {file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"},
+    {file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
+    {file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
+]
+
+[[package]]
+name = "et-xmlfile"
+version = "2.0.0"
+requires_python = ">=3.8"
+summary = "An implementation of lxml.xmlfile for the standard library"
+groups = ["dev"]
+files = [
+    {file = "et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa"},
+    {file = "et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54"},
 ]

 [[package]]
@@ -1450,6 +1461,20 @@ files = [
     {file = "nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:362aed5963fb9ea2ed2f264409baae30143498fd0e5c503aeaa1badd88cdc54a"},
 ]

+[[package]]
+name = "openpyxl"
+version = "3.1.5"
+requires_python = ">=3.8"
+summary = "A Python library to read/write Excel 2010 xlsx/xlsm files"
+groups = ["dev"]
+dependencies = [
+    "et-xmlfile",
+]
+files = [
+    {file = "openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2"},
+    {file = "openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050"},
+]
+
 [[package]]
 name = "overrides"
 version = "7.7.0"
@@ -1571,6 +1596,31 @@ files = [
     {file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"},
 ]

+[[package]]
+name = "pip"
+version = "25.1.1"
+requires_python = ">=3.9"
+summary = "The PyPA recommended tool for installing Python packages."
+groups = ["default"]
+files = [
+    {file = "pip-25.1.1-py3-none-any.whl", hash = "sha256:2913a38a2abf4ea6b64ab507bd9e967f3b53dc1ede74b01b0931e1ce548751af"},
+    {file = "pip-25.1.1.tar.gz", hash = "sha256:3de45d411d308d5054c2168185d8da7f9a2cd753dbac8acbfa88a8909ecd9077"},
+]
+
+[[package]]
+name = "pip-system-certs"
+version = "5.2"
+requires_python = ">=3.10"
+summary = "Automatically configures Python to use system certificates via truststore"
+groups = ["default"]
+dependencies = [
+    "pip>=24.2",
+]
+files = [
+    {file = "pip_system_certs-5.2-py3-none-any.whl", hash = "sha256:e6ef3e106d4d02313e33955c2bcc4c2b143b2da07ef91e28a6805a0c1c512126"},
+    {file = "pip_system_certs-5.2.tar.gz", hash = "sha256:80b776b5cf17191bf99d313699b7fce2fdb84eb7bbb225fd134109a82706406f"},
+]
+
 [[package]]
 name = "platformdirs"
 version = "4.3.6"
@@ -1623,7 +1673,7 @@ name = "psutil"
 version = "7.0.0"
 requires_python = ">=3.6"
 summary = "Cross-platform lib for process and system monitoring in Python. NOTE: the syntax of this script MUST be kept compatible with Python 2.7."
-groups = ["nb"]
+groups = ["default", "nb"]
 files = [
     {file = "psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25"},
     {file = "psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da"},
@@ -2414,6 +2464,17 @@ files = [
     {file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
 ]

+[[package]]
+name = "tomli-w"
+version = "1.2.0"
+requires_python = ">=3.9"
+summary = "A lil' TOML writer"
+groups = ["dev"]
+files = [
+    {file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
+    {file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
+]
+
 [[package]]
 name = "tomlkit"
 version = "0.13.2"
@@ -2600,8 +2661,8 @@ files = [

 [[package]]
 name = "xgboost"
-version = "2.1.4"
-requires_python = ">=3.8"
+version = "3.0.0"
+requires_python = ">=3.10"
 summary = "XGBoost Python Package"
 groups = ["default"]
 dependencies = [
@@ -2610,12 +2671,12 @@ dependencies = [
     "scipy",
 ]
 files = [
-    {file = "xgboost-2.1.4-py3-none-macosx_10_15_x86_64.macosx_11_0_x86_64.macosx_12_0_x86_64.whl", hash = "sha256:78d88da184562deff25c820d943420342014dd55e0f4c017cc4563c2148df5ee"},
-    {file = "xgboost-2.1.4-py3-none-macosx_12_0_arm64.whl", hash = "sha256:523db01d4e74b05c61a985028bde88a4dd380eadc97209310621996d7d5d14a7"},
-    {file = "xgboost-2.1.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:57c7e98111aceef4b689d7d2ce738564a1f7fe44237136837a47847b8b33bade"},
-    {file = "xgboost-2.1.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f1343a512e634822eab30d300bfc00bf777dc869d881cc74854b42173cfcdb14"},
-    {file = "xgboost-2.1.4-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:d366097d0db047315736f46af852feaa907f6d7371716af741cdce488ae36d20"},
-    {file = "xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:8df6da72963969ab2bf49a520c3e147b1e15cbeddd3aa0e3e039b3532c739339"},
-    {file = "xgboost-2.1.4-py3-none-win_amd64.whl", hash = "sha256:8bbfe4fedc151b83a52edbf0de945fd94358b09a81998f2945ad330fd5f20cd6"},
-    {file = "xgboost-2.1.4.tar.gz", hash = "sha256:ab84c4bbedd7fae1a26f61e9dd7897421d5b08454b51c6eb072abc1d346d08d7"},
+    {file = "xgboost-3.0.0-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:ed8cffd7998bd9431c3b0287a70bec8e45c09b43c9474d9dfd261627713bd890"},
+    {file = "xgboost-3.0.0-py3-none-macosx_12_0_arm64.whl", hash = "sha256:314104bd3a1426a40f0c9662eef40e9ab22eb7a8068a42a8d198ce40412db75c"},
+    {file = "xgboost-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:72c3405e8dfc37048f9fe339a058fa12b9f0f03bc31d3e56f0887eed2ed2baa1"},
+    {file = "xgboost-3.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:72d39e74649e9b628c4221111aa6a8caa860f2e853b25480424403ee61085126"},
+    {file = "xgboost-3.0.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:7bdee5787f86b83bebd75e2c96caf854760788e5f4203d063da50db5bf0efc5f"},
+    {file = "xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:61c7e391e373b8a5312503525c0689f83ef1912a1236377022865ab340f465a4"},
+    {file = "xgboost-3.0.0-py3-none-win_amd64.whl", hash = "sha256:0ea74e97f95b1eddfd27a46b7f22f72ec5a5322e1dc7cb41c9c23fb580763df9"},
+    {file = "xgboost-3.0.0.tar.gz", hash = "sha256:45e95416df6f6f01d9a62e60cf09fc57e5ee34697f3858337c796fac9ce3b9ed"},
 ]

pyproject.toml

@@ -1,11 +1,11 @@
 [project]
 name = "delta-barth"
-version = "0.5.5"
+version = "0.5.12"
 description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
 authors = [
     {name = "Florian Förster", email = "f.foerster@d-opt.com"},
 ]
-dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2", "SQLAlchemy>=2.0.39"]
+dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39", "psutil>=7.0.0", "pip-system-certs>=5.2"]
 requires-python = ">=3.11"
 readme = "README.md"
 license = {text = "LicenseRef-Proprietary"}
@@ -44,7 +44,8 @@ filterwarnings = [
 ]
 markers = [
     "api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
-    "new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
+    "new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
+    "forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
 ]

 log_cli = true
@@ -73,7 +74,7 @@ directory = "reports/coverage"

 [tool.bumpversion]
-current_version = "0.5.5"
+current_version = "0.5.12"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.
@@ -145,6 +146,8 @@ dev = [
     "pdoc3>=0.11.5",
     "bump-my-version>=1.1.1",
     "nox>=2025.2.9",
+    "tomli-w>=1.2.0",
+    "openpyxl>=3.1.5",
 ]
 nb = [
     "jupyterlab>=4.3.5",

scripts/build.ps1

@@ -1 +1,73 @@
-pdm build -d build/
+pdm build --no-sdist -d build/
+
+# Configuration
+$sourceDir = ".\build"
+$destDir = "..\01_releases\runtime"
+$packagePrefix = "delta_barth-"
+$packageSuffix = "-py3-none-any.whl"
+
+# Ensure destination exists
+if (-not (Test-Path $destDir)) {
+    New-Item -ItemType Directory -Path $destDir | Out-Null
+}
+
+# === Build Regex Pattern ===
+$escapedSuffix = [regex]::Escape($packageSuffix)
+# Match versions like 1.2.3 or 1.2.3.beta or 1.2.3.beta1
+# Capture the full version as one string, including the optional pre-release after a dot
+$pattern = "^$packagePrefix(?<version>\d+\.\d+\.\d+(?:\.[a-zA-Z0-9\-]+)?)$escapedSuffix$"
+
+Write-Host "Using pattern: $pattern"
+
+# === Get and Filter Files ===
+$allFiles = Get-ChildItem -Path $sourceDir -File
+$matchingFiles = @()
+
+foreach ($file in $allFiles) {
+    if ($file.Name -match $pattern) {
+        $version = $Matches['version']
+        $matchingFiles += [PSCustomObject]@{
+            File    = $file
+            Version = $version
+        }
+        Write-Host "Matched: $($file.Name) -> Version: $version"
+    } else {
+        Write-Host "No match: $($file.Name)"
+    }
+}
+
+if ($matchingFiles.Count -eq 0) {
+    Write-Host "No matching package files found."
+    return
+}
+
+# === Convert version strings to sortable format ===
+function Convert-VersionForSort($v) {
+    # Split by dot: e.g., 1.2.3.beta -> [1, 2, 3, "beta"]
+    $parts = $v -split '\.'
+    $major = [int]$parts[0]
+    $minor = [int]$parts[1]
+    $patch = [int]$parts[2]
+    $pre = if ($parts.Count -gt 3) { $parts[3] } else { "~" }  # "~" to ensure stable > prerelease
+    return [PSCustomObject]@{
+        Major = $major
+        Minor = $minor
+        Patch = $patch
+        Pre   = $pre
+    }
+}
+
+# === Sort by semantic version + pre-release ===
+$latest = $matchingFiles | Sort-Object {
+    Convert-VersionForSort $_.Version
+} -Descending | Select-Object -First 1
+
+# === Copy and rename to .zip ===
+$baseName = [System.IO.Path]::GetFileNameWithoutExtension($latest.File.Name)
+$newFileName = "$baseName.zip"
+$destPath = Join-Path $destDir $newFileName
+
+Copy-Item -Path $latest.File.FullName -Destination $destPath
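The pre-release handling in Convert-VersionForSort hinges on "~" sorting after any alphanumeric tag in ASCII, so a stable build outranks its own pre-releases. The same ordering idea as a standalone Python sketch (illustrative names only, not part of this repository):

def version_key(version: str) -> tuple[int, int, int, str]:
    # "1.2.3" -> (1, 2, 3, "~"); "1.2.3.beta" -> (1, 2, 3, "beta")
    parts = version.split(".")
    major, minor, patch = (int(p) for p in parts[:3])
    pre = parts[3] if len(parts) > 3 else "~"  # "~" (0x7E) sorts after letters and digits
    return (major, minor, patch, pre)

wheels = ["1.2.3.beta", "1.2.3", "1.2.2"]
print(max(wheels, key=version_key))  # -> 1.2.3: the stable build beats its pre-release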

scripts/bump_patch.ps1 Normal file

@@ -0,0 +1,2 @@
+pdm run bump-my-version bump patch
+pdm run bump-my-version show current_version

src/delta_barth/__init__.py

@@ -0,0 +1,3 @@
+import pip_system_certs.wrapt_requests
+
+pip_system_certs.wrapt_requests.inject_truststore()
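For context on what this manual hook achieves (related to #34): after inject_truststore(), requests validates TLS connections against the operating system's certificate store rather than only certifi's bundled CA list. A tiny sketch; the URL is a made-up placeholder for an internal endpoint signed by a company CA:

import pip_system_certs.wrapt_requests
import requests

pip_system_certs.wrapt_requests.inject_truststore()

# succeeds when the server certificate chains to a CA in the OS trust store,
# even if that CA is missing from certifi's bundle
resp = requests.get("https://erp.example.internal/api/health", timeout=5.0)  # placeholder URL
print(resp.status_code)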

src/delta_barth/_env.py Normal file

@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+from typing import Final
+
+from dopt_basics import io
+
+PY_RUNTIME_FOLDER: Final[str] = "python"
+
+
+def prepare_env(
+    lib_path: Path,
+) -> Path | None:
+    pyrt_folder = io.search_folder_path(
+        starting_path=lib_path,
+        stop_folder_name=PY_RUNTIME_FOLDER,
+        return_inclusive=True,
+    )
+    if pyrt_folder is None:
+        return None
+
+    pth_interpreter = pyrt_folder / "python.exe"
+    if not pth_interpreter.exists():
+        raise FileNotFoundError(
+            f"dopt-delta-barth seems to be deployed in a standalone runtime, "
+            f"but the interpreter was not found under: {pth_interpreter}"
+        )
+
+    setattr(sys, "executable", str(pth_interpreter))
+    setattr(sys, "_base_executable", str(pth_interpreter))
+
+    return pyrt_folder
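A usage sketch for prepare_env, assuming the standalone-runtime layout it searches for (an ancestor folder named "python" that contains python.exe); the real call site is in constants.py further down:

from pathlib import Path

from delta_barth._env import prepare_env

lib_path = Path(__file__).parent  # inside the package this is the installed delta_barth folder
runtime = prepare_env(lib_path)
if runtime is None:
    print("no standalone runtime found; running from a regular environment")
else:
    print(f"sys.executable patched to {runtime / 'python.exe'}")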

src/delta_barth/analysis/forecast.py

@@ -1,5 +1,6 @@
 from __future__ import annotations

+import copy
 import datetime
 import math
 from collections.abc import Mapping, Set
@@ -7,10 +8,12 @@ from dataclasses import asdict
 from datetime import datetime as Datetime
 from typing import TYPE_CHECKING, Final, TypeAlias, cast

+import joblib
 import numpy as np
 import pandas as pd
 import scipy.stats
 import sqlalchemy as sql
+from dateutil.relativedelta import relativedelta
 from sklearn.metrics import mean_absolute_error, r2_score
 from sklearn.model_selection import KFold, RandomizedSearchCV
 from xgboost import XGBRegressor
@@ -29,7 +32,7 @@ from delta_barth.constants import (
     DEFAULT_DB_ERR_CODE,
     DUMMY_DATA_PATH,
     FEATURES_SALES_PROGNOSIS,
-    SALES_BASE_NUM_DATAPOINTS_MONTHS,
+    MAX_NUM_WORKERS,
     SALES_MIN_NUM_DATAPOINTS,
 )
 from delta_barth.errors import STATUS_HANDLER, wrap_result
@@ -183,16 +186,17 @@ def _process_sales(
     PipeResult
         _description_
     """
-    # cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
     # filter data
     data = pipe.data
     assert data is not None, "processing not existing pipe result"

     DATE_FEAT: Final[str] = "buchungs_datum"
     SALES_FEAT: Final[str] = "betrag"
-    df_firma = data[(data["betrag"] > 0)]
-    df_cust = df_firma.copy()
+    data[DATE_FEAT] = pd.to_datetime(data[DATE_FEAT], errors="coerce")
+    data = data.dropna(subset=["buchungs_datum"])
+    df_filter = data[(data["betrag"] > 0)]
+    df_cust = df_filter.copy()
     df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()

     len_ds = len(df_cust)
@@ -206,7 +210,26 @@ def _process_sales(
     df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
     df_cust["monat"] = df_cust[DATE_FEAT].dt.month

-    monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
+    monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
+
+    current_year = datetime.datetime.now().year
+    current_month = datetime.datetime.now().month
+    years = range(df_cust["jahr"].min(), current_year + 1)
+    all_month_year_combinations = pd.DataFrame(
+        [
+            (year, month)
+            for year in years
+            for month in range(1, 13)
+            if (year < current_year or (year == current_year and month <= current_month))
+        ],
+        columns=["jahr", "monat"],
+    )
+    monthly_sum = pd.merge(
+        all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
+    )
+    monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
+
     monthly_sum[DATE_FEAT] = (
         monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
     )
@@ -215,13 +238,17 @@ def _process_sales(

     features = ["jahr", "monat"]
     target = SALES_FEAT
-    current_year = datetime.datetime.now().year
-    first_year = cast(int, df_cust["jahr"].min())
+    last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y")
+    future_dates = pd.date_range(
+        start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
+    )
+    forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")

     # Randomized Search
     kfold = KFold(n_splits=5, shuffle=True)
     params: ParamSearchXGBRegressor = {
-        "n_estimators": scipy.stats.poisson(mu=1000),
+        "n_estimators": scipy.stats.poisson(mu=100),
         "learning_rate": [0.03, 0.04, 0.05],
         "max_depth": range(2, 9),
         "min_child_weight": range(1, 5),
@@ -231,39 +258,57 @@ def _process_sales(
         "early_stopping_rounds": [20, 50],
     }

+    best_estimator = None
     best_params: BestParametersXGBRegressor | None = None
     best_score_mae: float | None = float("inf")
     best_score_r2: float | None = None
     best_start_year: int | None = None
     too_few_month_points: bool = True
-    forecast: pd.DataFrame | None = None

-    for start_year in range(current_year - 4, first_year - 1, -1):
+    dates = cast(pd.DatetimeIndex, monthly_sum.index)
+    # baseline: 3 years - 36 months
+    starting_date = datetime.datetime.now() - relativedelta(months=36)
+    target_index, _ = next(
+        ((i, True) for i, date in enumerate(dates) if date >= starting_date),
+        (len(dates) - 1, False),
+    )
+    for add_year, date_idx in enumerate(range(target_index, -1, -12)):
+        first_date = dates[date_idx]
+        split_date = dates[-6]
         train = cast(
             pd.DataFrame,
-            monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(),  # type: ignore
+            monthly_sum.loc[first_date:split_date].copy(),  # type: ignore
         )
         test = cast(
             pd.DataFrame,
-            monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(),  # type: ignore
+            monthly_sum.loc[split_date:].copy(),  # type: ignore
         )
         X_train, X_test = train[features], test[features]
         y_train, y_test = train[target], test[target]

-        if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
+        # test set size fixed at 6 --> first iteration: baseline - 6 entries
+        # for each new year 10 new data points (i.e., sales strictly positive) needed
+        if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
             too_few_month_points = False
+            with joblib.parallel_config(backend="loky"):
                 rand = RandomizedSearchCV(
                     XGBRegressor(),
                     params,
                     scoring="neg_mean_absolute_error",
                     cv=kfold,
-                    n_jobs=-1,
+                    n_jobs=MAX_NUM_WORKERS,
                     n_iter=100,
                     verbose=0,
                 )
                 rand.fit(
-                    X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
+                    X_train,
+                    y_train,
+                    eval_set=[(X_train, y_train), (X_test, y_test)],
+                    verbose=0,
                 )

             y_pred = rand.best_estimator_.predict(X_test)  # type: ignore
@@ -273,13 +318,21 @@ def _process_sales(
                 best_params = cast(BestParametersXGBRegressor, rand.best_params_)
                 best_score_mae = error
                 best_score_r2 = cast(float, r2_score(y_test, y_pred))
-                best_start_year = start_year
-                print("executed")
-                forecast = test.copy()
-                forecast.loc[:, "vorhersage"] = y_pred
+                # --- new: use first_date for best_start_year
+                best_start_year = first_date.year
+                # --- new: store best_estimator
+                best_estimator = copy.copy(rand.best_estimator_)

-    if forecast is not None:
-        forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
+    if best_estimator is not None:
+        X_future = pd.DataFrame(
+            {"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates
+        )
+        y_future = best_estimator.predict(X_future)  # type: ignore
+        forecast["vorhersage"] = y_future
+        forecast["jahr"] = forecast.index.year  # type: ignore
+        forecast["monat"] = forecast.index.month  # type: ignore
+        forecast = forecast.reset_index(drop=True)

     best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None

     if too_few_month_points:
@@ -295,7 +348,9 @@ def _process_sales(
         pipe.stats(stats)
         return pipe

-    assert forecast is not None, "forecast is None, but was attempted to be returned"
+    assert "vorhersage" in forecast.columns, (
+        "forecast does not contain prognosis values, but was attempted to be returned"
+    )
     status = STATUS_HANDLER.SUCCESS
     pipe.success(forecast, status)
     stats = SalesForecastStatistics(
@@ -351,13 +406,13 @@ def _export_on_fail(

 def pipeline_sales_forecast(
     session: Session,
-    company_id: int | None = None,
+    company_ids: list[int] | None = None,
     start_date: Datetime | None = None,
 ) -> SalesPrognosisResultsExport:
     logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
     response, status = get_sales_prognosis_data(
         session,
-        company_id=company_id,
+        company_ids=company_ids,
         start_date=start_date,
     )
     if status != STATUS_HANDLER.SUCCESS:
@@ -384,7 +439,7 @@ def pipeline_sales_forecast(
     pipe = _process_sales(
         pipe,
         min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
-        base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
+        base_num_data_points_months=session.cfg.forecast.threshold_month_data_points,
     )
     if pipe.statistics is not None:
         res = _write_sales_forecast_stats_wrapped(pipe.statistics)
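One technique from the hunks above reads well in isolation: the forecast now builds a complete year-month grid up to the current month, left-merges the aggregated sums onto it, and fills missing months with 0 so that gaps count as zero sales. A self-contained sketch of that gap-filling step with made-up data (column names mirror the diff; the data is invented):

import datetime

import pandas as pd

# made-up bookings: two months with sales, one empty month in between
df = pd.DataFrame(
    {
        "buchungs_datum": pd.to_datetime(["2024-01-15", "2024-03-02"]),
        "betrag": [100.0, 250.0],
    }
)
df["jahr"] = df["buchungs_datum"].dt.year
df["monat"] = df["buchungs_datum"].dt.month
sums = df.groupby(["jahr", "monat"])["betrag"].sum().reset_index()

now = datetime.datetime.now()
grid = pd.DataFrame(
    [
        (y, m)
        for y in range(int(df["jahr"].min()), now.year + 1)
        for m in range(1, 13)
        if (y < now.year or m <= now.month)
    ],
    columns=["jahr", "monat"],
)
monthly = grid.merge(sums, on=["jahr", "monat"], how="left")
monthly["betrag"] = monthly["betrag"].fillna(0)  # months without bookings count as zero sales
print(monthly.head(4))  # 2024-02 appears with betrag == 0.0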

src/delta_barth/api/requests.py

@@ -7,18 +7,20 @@ import requests
 from dopt_basics.io import combine_route
 from pydantic import BaseModel, PositiveInt, SkipValidation

-from delta_barth.constants import API_CON_TIMEOUT
+from delta_barth.constants import API_CON_TIMEOUT, MAX_LOGIN_RETRIES
 from delta_barth.errors import STATUS_HANDLER
 from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status

 if TYPE_CHECKING:
+    from requests import Response
+
     from delta_barth.session import Session


 # ** sales data
 # ** import
 class SalesPrognosisRequestP(BaseModel):
-    FirmaId: SkipValidation[int | None]
+    FirmaIds: SkipValidation[list[int] | None]
     BuchungsDatum: SkipValidation[Datetime | None]
@@ -53,29 +55,37 @@ class SalesPrognosisResultsExport(ExportResponse):

 def get_sales_prognosis_data(
     session: Session,
-    company_id: int | None = None,
+    company_ids: list[int] | None = None,
     start_date: Datetime | None = None,
 ) -> tuple[SalesPrognosisResponse, Status]:
-    _, status = session.assert_login()
-    if status != STATUS_HANDLER.SUCCESS:
-        response = SalesPrognosisResponse(daten=tuple())
-        return response, status
     ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
     URL: Final = combine_route(session.base_url, ROUTE)
     sales_prog_req = SalesPrognosisRequestP(
-        FirmaId=company_id,
+        FirmaIds=company_ids,
         BuchungsDatum=start_date,
     )
     empty_response = SalesPrognosisResponse(daten=tuple())
+    if not session.logged_in:
+        _, status = session.login()
+        if status != STATUS_HANDLER.SUCCESS:
+            return empty_response, status
+
+    resp: Response | None = None
     try:
+        for attempt in range(1, (MAX_LOGIN_RETRIES + 1)):
             resp = requests.get(
                 URL,
                 params=sales_prog_req.model_dump(mode="json", exclude_none=True),
                 headers=session.headers,  # type: ignore[argumentType]
                 timeout=API_CON_TIMEOUT,
             )
+            if resp.status_code == 401:
+                _, status = session.relogin()
+                if status != STATUS_HANDLER.SUCCESS and attempt == MAX_LOGIN_RETRIES:
+                    return empty_response, status
+                continue
+            break
     except requests.exceptions.Timeout:
         return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
     except requests.exceptions.RequestException:
@@ -83,6 +93,7 @@ def get_sales_prognosis_data(

     response: SalesPrognosisResponse
     status: Status
+    assert resp is not None, "tried to use not defined response"
     if resp.status_code == 200:
         response = SalesPrognosisResponse(**resp.json())
         status = STATUS_HANDLER.SUCCESS
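The retry loop above re-authenticates on HTTP 401 at most MAX_LOGIN_RETRIES times and breaks out on any other response, which is the breakout added for #28. Stripped of the project types, the control flow is roughly the following (function and callback names are invented for illustration):

from collections.abc import Callable

import requests

MAX_LOGIN_RETRIES = 2  # mirrors delta_barth.constants


def get_with_relogin(
    url: str,
    headers: dict[str, str],
    relogin: Callable[[], bool],
) -> requests.Response:
    resp: requests.Response | None = None
    for attempt in range(1, MAX_LOGIN_RETRIES + 1):
        resp = requests.get(url, headers=headers, timeout=20.0)
        if resp.status_code == 401:
            ok = relogin()  # refresh the session token, then retry
            if not ok and attempt == MAX_LOGIN_RETRIES:
                break  # give up: hand the failed-auth response back to the caller
            continue
        break  # any non-401 outcome ends the loop
    assert resp is not None, "tried to use not defined response"
    return resp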

src/delta_barth/config.py Normal file

@@ -0,0 +1,43 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+import dopt_basics.configs
+from pydantic import BaseModel
+
+
+class Config(BaseModel):
+    forecast: CfgForecast
+
+
+class CfgForecast(BaseModel):
+    threshold_month_data_points: int
+
+
+class LazyCfgLoader:
+    def __init__(
+        self,
+        cfg_path: Path,
+    ) -> None:
+        cfg_path = cfg_path.resolve()
+        assert cfg_path.exists(), f"config path {cfg_path} seems not to exist"
+        assert cfg_path.is_file(), f"config path {cfg_path} seems not to be a file"
+        self._path = cfg_path
+        self._cfg: Config | None = None
+
+    @property
+    def path(self) -> Path:
+        return self._path
+
+    def _load(self) -> Config:
+        cfg = dopt_basics.configs.load_toml(self.path)
+        return Config(**cfg)
+
+    def reload(self) -> None:
+        self._cfg = self._load()
+
+    def get(self) -> Config:
+        if self._cfg is None:
+            self._cfg = self._load()
+        return self._cfg
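A short usage sketch for the loader; the path here is a hypothetical deployment location (Session resolves the real one under data_path/config, see session.py below), and the file must already exist since the constructor asserts it:

from pathlib import Path

from delta_barth.config import LazyCfgLoader

loader = LazyCfgLoader(Path("data/config/dopt-cfg.toml"))  # hypothetical path
cfg = loader.get()  # parsed on first access, cached afterwards
print(cfg.forecast.threshold_month_data_points)

# after the TOML was edited on disk:
loader.reload()
print(loader.get().forecast.threshold_month_data_points)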

src/delta_barth/constants.py

@@ -1,10 +1,19 @@
+from __future__ import annotations
+
 import enum
 from pathlib import Path
 from typing import Final

+import psutil
+
+import delta_barth._env
 from delta_barth.types import DualDict, HttpContentHeaders

 # ** config
+CFG_FILENAME: Final[str] = "dopt-cfg.toml"
+CFG_HOT_RELOAD: Final[bool] = True
+
+cpu_count = psutil.cpu_count(logical=False)
+MAX_NUM_WORKERS: Final[int] = (cpu_count - 1) if cpu_count is not None else 3

 # ** lib path
 lib_path = Path(__file__).parent
@@ -13,15 +22,16 @@ LIB_PATH: Final[Path] = lib_path
 dummy_data_pth = LIB_PATH / "_dummy_data"
 assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
 DUMMY_DATA_PATH: Final[Path] = dummy_data_pth

-# ** logging
-ENABLE_LOGGING: Final[bool] = False
-LOGGING_TO_FILE: Final[bool] = True
-LOGGING_TO_STDERR: Final[bool] = True
-LOG_FILENAME: Final[str] = "dopt-delbar.log"
+# ** runtime and deployment status
+RUNTIME_PATH: Final[Path | None] = delta_barth._env.prepare_env(LIB_PATH)
+deployment_status: bool = False
+if RUNTIME_PATH is not None:
+    deployment_status = True
+DEPLOYMENT_STATUS: Final[bool] = deployment_status

 # ** databases
-DB_ECHO: Final[bool] = True
+DB_ECHO: Final[bool] = False

 # ** error handling
 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
@@ -40,7 +50,9 @@ class KnownDelBarApiErrorCodes(enum.Enum):

 # ** API
-API_CON_TIMEOUT: Final[float] = 1.0  # secs to response
+API_CON_TIMEOUT: Final[float] = 20.0  # secs to response
+MAX_LOGIN_RETRIES: Final[int] = 2

 # ** API response parsing
 # ** column mapping [API-Response --> Target-Features]
 COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
@@ -63,4 +75,6 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(

 # ** Pipelines
 # ** Forecast
 SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
-SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
+# !! now in config
+# TODO remove later till proven stable
+# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

src/delta_barth/dopt-cfg.toml Normal file

@@ -0,0 +1,2 @@
+[forecast]
+threshold_month_data_points = 28

src/delta_barth/logging.py

@@ -6,14 +6,13 @@ from pathlib import Path
 from time import gmtime
 from typing import Final

-from delta_barth.constants import (
-    ENABLE_LOGGING,
-    LOG_FILENAME,
-    LOGGING_TO_FILE,
-    LOGGING_TO_STDERR,
-)
-
 # ** config
+# ** logging
+ENABLE_LOGGING: Final[bool] = True
+LOGGING_TO_FILE: Final[bool] = True
+LOGGING_TO_STDERR: Final[bool] = False
+LOG_FILENAME: Final[str] = "dopt-delbar.log"

 logging.Formatter.converter = gmtime
 LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
 LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
@@ -31,6 +30,8 @@ logger_status = logging.getLogger("delta_barth.status")
 logger_status.setLevel(logging.DEBUG)
 logger_session = logging.getLogger("delta_barth.session")
 logger_session.setLevel(logging.DEBUG)
+logger_config = logging.getLogger("delta_barth.config")
+logger_config.setLevel(logging.DEBUG)
 logger_management = logging.getLogger("delta_barth.management")
 logger_management.setLevel(logging.DEBUG)
 logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")

src/delta_barth/pipelines.py

@@ -44,14 +44,14 @@ def _write_performance_metrics_wrapped(

 def pipeline_sales_forecast(
-    company_id: int | None,
+    company_ids: list[int] | None,
     start_date: Datetime | None,
 ) -> JsonExportResponse:
     PIPELINE_NAME: Final[str] = "sales_forecast"
     logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
     t_start = time.perf_counter_ns()
     result = forecast.pipeline_sales_forecast(
-        SESSION, company_id=company_id, start_date=start_date
+        SESSION, company_ids=company_ids, start_date=start_date
     )
     export = JsonExportResponse(result.model_dump_json())
     t_end = time.perf_counter_ns()

src/delta_barth/session.py

@@ -1,5 +1,6 @@
 from __future__ import annotations

+import shutil
 from pathlib import Path
 from typing import TYPE_CHECKING, Final
@@ -14,12 +15,20 @@ from delta_barth.api.common import (
     LoginResponse,
     validate_credentials,
 )
-from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
+from delta_barth.config import LazyCfgLoader
+from delta_barth.constants import (
+    API_CON_TIMEOUT,
+    CFG_FILENAME,
+    CFG_HOT_RELOAD,
+    DB_ECHO,
+    LIB_PATH,
+)
 from delta_barth.errors import STATUS_HANDLER
 from delta_barth.logging import logger_session as logger
 from delta_barth.types import DelBarApiError, Status

 if TYPE_CHECKING:
+    from delta_barth.config import Config
     from delta_barth.types import ApiCredentials, HttpContentHeaders
@@ -41,6 +50,7 @@ class Session:
         base_headers: HttpContentHeaders,
         db_folder: str = "data",
         logging_folder: str = "logs",
+        cfg_folder: str = "config",
     ) -> None:
         self._setup: bool = False
         self._data_path: Path | None = None
@@ -49,6 +59,10 @@ class Session:
         self._db_engine: sql.Engine | None = None
         self._logging_dir: Path | None = None
         self._logging_folder = logging_folder
+        self._cfg_path: Path | None = None
+        self._cfg_folder = cfg_folder
+        self._cfg_loader: LazyCfgLoader | None = None
+        self._cfg: Config | None = None
         self._creds: ApiCredentials | None = None
         self._base_url: str | None = None
         self._headers = base_headers
@@ -59,6 +73,7 @@ class Session:
         # at this point: no logging configured
         assert not self._setup, "tried to setup session twice"
         self._setup_logging()
+        self._setup_config()
         self._setup_db_management()
         self._setup = True
         logger.info("[SESSION] Setup procedure successful")
@@ -68,6 +83,39 @@ class Session:
         assert self._data_path is not None, "accessed data path not set"
         return self._data_path

+    @property
+    def cfg_path(self) -> Path:
+        if self._cfg_path is not None and self._setup:
+            return self._cfg_path
+
+        root = (self.data_path / self._cfg_folder).resolve()
+        cfg_path = root / CFG_FILENAME
+        if not root.exists():
+            root.mkdir(parents=False)
+        self._cfg_path = cfg_path
+
+        return self._cfg_path
+
+    @property
+    def cfg(self) -> Config:
+        assert self._cfg is not None, "tried to access not set config from session"
+        if CFG_HOT_RELOAD:
+            self.reload_cfg()
+        return self._cfg
+
+    def _setup_config(self) -> None:
+        if not self.cfg_path.exists():
+            src_cfg = LIB_PATH / CFG_FILENAME
+            shutil.copyfile(src_cfg, self.cfg_path)
+        self._cfg_loader = LazyCfgLoader(self.cfg_path)
+        self._cfg = self._cfg_loader.get()
+        logger.info("[SESSION] Successfully read and setup config")
+
+    def reload_cfg(self) -> None:
+        assert self._cfg_loader is not None, "tried reloading with no CFG loader intialised"
+        self._cfg_loader.reload()
+        self._cfg = self._cfg_loader.get()
+
     @property
     def db_engine(self) -> sql.Engine:
         assert self._db_engine is not None, "accessed database engine not set"
@@ -78,10 +126,10 @@ class Session:
         if self._db_path is not None and self._setup:
             return self._db_path

-        db_root = (self.data_path / self._db_folder).resolve()
-        db_path = db_root / "dopt-data.db"
-        if not db_root.exists():
-            db_root.mkdir(parents=False)
+        root = (self.data_path / self._db_folder).resolve()
+        db_path = root / "dopt-data.db"
+        if not root.exists():
+            root.mkdir(parents=False)
         self._db_path = db_path

         return self._db_path
@@ -244,44 +292,11 @@ class Session:

         return None, status

-    def assert_login(
+    def relogin(
         self,
     ) -> tuple[LoginResponse, Status]:
-        # check if login token is still valid
-        # re-login if necessary
         if self.session_token is None:
             return self.login()
-        # use known endpoint which requires a valid token in its header
-        # evaluate the response to decide if:
-        # current token is still valid, token is not valid, other errors occurred
-        ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
-        URL: Final = combine_route(self.base_url, ROUTE)
-        params: dict[str, int] = {"FirmaId": 999999}
-        empty_response = LoginResponse(token="")
-        try:
-            resp = requests.get(
-                URL,
-                params=params,
-                headers=self.headers,  # type: ignore
-                timeout=API_CON_TIMEOUT,
-            )
-        except requests.exceptions.Timeout:  # pragma: no cover
-            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
-        except requests.exceptions.RequestException:  # pragma: no cover
-            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
-
-        response: LoginResponse
-        status: Status
-        if resp.status_code == 200:
-            response = LoginResponse(token=self.session_token)
-            status = STATUS_HANDLER.SUCCESS
-        elif resp.status_code == 401:
-            self._remove_session_token()
-            response, status = self.login()
-        else:
-            response = empty_response
-            err = DelBarApiError(status_code=resp.status_code, **resp.json())
-            status = STATUS_HANDLER.api_error(err)
-
-        return response, status
+        self._remove_session_token()
+        return self.login()
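The hot-reload behaviour rests on the cfg property calling reload_cfg() on every access while CFG_HOT_RELOAD is set, so edits to the deployed TOML take effect without a restart. A generic, self-contained sketch of that property pattern (names invented for illustration; this is not the Session API):

from __future__ import annotations

import tomllib
from pathlib import Path

HOT_RELOAD = True  # stand-in for delta_barth.constants.CFG_HOT_RELOAD


class CfgHolder:
    """Illustrative stand-in for the Session config handling above."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._cfg: dict | None = None

    @property
    def cfg(self) -> dict:
        # with hot reload enabled, every access re-reads the file from disk
        if HOT_RELOAD or self._cfg is None:
            with open(self._path, "rb") as f:
                self._cfg = tomllib.load(f)
        return self._cfg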

tests/_test_data/dopt-cfg.toml Normal file

@@ -0,0 +1,2 @@
+[forecast]
+threshold_month_data_points = 28

tests/analysis/test_forecast.py

@@ -1,3 +1,4 @@
+import datetime
 from datetime import datetime as Datetime
 from unittest.mock import patch
@@ -255,6 +256,7 @@ def test_preprocess_sales_FailOnTargetFeature(
     assert pipe.results is None


+@pytest.mark.forecast
 def test_process_sales_Success(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -277,6 +279,29 @@ def test_process_sales_Success(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is not None


+@pytest.mark.forecast
+def test_process_sales_InvalidDates(sales_data_real_preproc):
+    false_date = Datetime(2519, 6, 30)
+    data = sales_data_real_preproc.copy()
+    data = data.iloc[:20, :]
+    data["buchungs_datum"] = data["buchungs_datum"].astype(object)
+    data.at[0, "buchungs_datum"] = false_date
+    assert data["buchungs_datum"].dtype.char == "O"
+    assert len(data) == 20
+
+    pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
+    pipe = fc._process_sales(
+        pipe,
+        min_num_data_points=36,
+        base_num_data_points_months=36,
+    )
+
+    assert pipe.status != STATUS_HANDLER.SUCCESS
+    assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
+    assert pipe.data is None
+    assert pipe.results is None
+    assert pipe.statistics is not None
+
+
+@pytest.mark.forecast
 def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     data = data.iloc[:20, :]
@@ -303,6 +328,7 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is None


+@pytest.mark.forecast
 def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
     data = sales_data_real_preproc.copy()
     pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@@ -329,8 +355,19 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
     assert pipe.statistics.xgb_params is None


+@pytest.mark.forecast
 def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
-    data = sales_data_real_preproc.copy()
+    # prepare fake data
+    df = sales_data_real_preproc.copy()
+    f_dates = "buchungs_datum"
+    end = datetime.datetime.now()
+    start = df[f_dates].max()
+    fake_dates = pd.date_range(start, end, freq="MS")
+    fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
+    fake_df = pd.DataFrame(fake_data, columns=df.columns)
+    enhanced_df = pd.concat((df, fake_df), ignore_index=True)
+
+    data = enhanced_df.copy()
     data["betrag"] = 10000
     print(data["betrag"])
     data = data.iloc[:20000, :]
@@ -340,7 +377,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
         def __init__(self, *args, **kwargs) -> None:
             class Predictor:
                 def predict(self, *args, **kwargs):
-                    return np.array([1, 1, 1, 1])
+                    return np.array([1, 1, 1, 1], dtype=np.float64)

             self.best_estimator_ = Predictor()
@@ -354,7 +391,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
     pipe = fc._process_sales(
         pipe,
         min_num_data_points=1,
-        base_num_data_points_months=-100,
+        base_num_data_points_months=1,
     )

     assert pipe.status != STATUS_HANDLER.SUCCESS
@@ -415,27 +452,20 @@ def test_export_on_fail():
     assert res.status.description == status.description


-@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
+@patch("delta_barth.session.CFG_HOT_RELOAD", False)
 def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
-    with patch(
-        "delta_barth.analysis.forecast.get_sales_prognosis_data",
-    ) as mock:
-        mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
-        with patch("delta_barth.analysis.forecast.SESSION", session):
-            result = fc.pipeline_sales_forecast(None)  # type: ignore
-            print(result)
-            assert result.status == STATUS_HANDLER.SUCCESS
-            assert len(result.response.daten) > 0
-
-
-@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
-def test_pipeline_sales_forecast_FailDbWrite(exmpl_api_sales_prognosis_resp):
-    with patch(
-        "delta_barth.analysis.forecast.get_sales_prognosis_data",
-    ) as mock:
-        mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
-        result = fc.pipeline_sales_forecast(None)  # type: ignore
-        print(result)
+    assert session.cfg.forecast.threshold_month_data_points is not None
+    date = Datetime(2023, 8, 15)
+    company_ids = [5661, 1027, 1024]
+    with (
+        patch(
+            "delta_barth.analysis.forecast.get_sales_prognosis_data",
+        ) as get_mock,
+        patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
+    ):
+        get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
+        sess_mock.cfg.forecast.threshold_month_data_points = 1
+        result = fc.pipeline_sales_forecast(session, company_ids, date)  # type: ignore
         assert result.status == STATUS_HANDLER.SUCCESS
         assert len(result.response.daten) > 0

tests/api/test_requests.py

@@ -4,44 +4,41 @@ import pytest
 import requests

 from delta_barth.api import requests as requests_
-from delta_barth.api.common import LoginResponse


 @pytest.mark.api_con_required
 def test_get_sales_prognosis_data_Success(session):
-    resp, status = session.login()
+    # do not login: let routine do it
     # test without company ID
-    assert status.code == 0
-    date = Datetime(2022, 6, 1)
+    date = Datetime(2023, 12, 15)
     resp, status = requests_.get_sales_prognosis_data(session, None, date)
     assert status.code == 0
     assert len(resp.daten) > 0
-    date = Datetime(2030, 1, 1)
+    date = Datetime(2520, 1, 1)
     resp, status = requests_.get_sales_prognosis_data(session, None, date)
     assert status.code == 0
     assert len(resp.daten) == 0
     # test with company ID
     assert status.code == 0
-    date = Datetime(2022, 6, 1)
-    company_id = 1024
-    resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
+    date = Datetime(2023, 8, 15)
+    company_ids = [5661, 1027]
+    resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
     assert status.code == 0
     assert len(resp.daten) > 0
-    date = Datetime(2030, 1, 1)
-    resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
+    date = Datetime(2520, 1, 1)
+    resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
     assert status.code == 0
     assert len(resp.daten) == 0
     # test with non-existent company ID
     assert status.code == 0
     date = Datetime(2022, 6, 1)
-    company_id = 1000024
-    resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
+    company_ids = [1000024]
+    resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
+    # TODO check if this behaviour is still considered "successful"
     assert status.code == 0
     assert len(resp.daten) == 0
     # test without date
-    company_id = 1024
-    resp, status = requests_.get_sales_prognosis_data(session, company_id, None)
+    company_ids = [1024]
+    resp, status = requests_.get_sales_prognosis_data(session, company_ids, None)
     assert status.code == 0
     assert len(resp.daten) > 0
     # test without filters
@@ -54,12 +51,11 @@ def test_get_sales_prognosis_data_Success(session):

 @pytest.mark.api_con_required
-def test_get_sales_prognosis_data_FailLogin(session, mock_get):
-    session.login()
-    code = 500
+def test_get_sales_prognosis_data_NoAuth(session, mock_get):
+    code = 401
     json = {
         "message": "ServerError",
-        "code": "TestExternalServerError",
+        "code": "TestFailAuth",
         "hints": "TestCase",
     }
     mock_get.return_value.status_code = code
@@ -76,6 +72,36 @@ def test_get_sales_prognosis_data_NoAuth(session, mock_get):
     assert status.api_server_error.hints == json["hints"]


+def test_get_sales_prognosis_data_FailLogin(session, mock_get, mock_put):
+    code = 401
+    json = {
+        "message": "ServerError",
+        "code": "TestFailAuth",
+        "hints": "TestCase",
+    }
+    mock_get.return_value.status_code = code
+    mock_get.return_value.json.return_value = json
+
+    code_put = 500
+    json_put = {
+        "message": "ServerError",
+        "code": "TestUnknownError",
+        "hints": "TestCase",
+    }
+    mock_put.return_value.status_code = code_put
+    mock_put.return_value.json.return_value = json_put
+
+    resp, status = requests_.get_sales_prognosis_data(session, None, None)
+
+    assert resp is not None
+    assert len(resp.daten) == 0
+    assert status.code == 400
+    assert status.api_server_error is not None
+    assert status.api_server_error.status_code == code_put
+    assert status.api_server_error.message == json_put["message"]
+    assert status.api_server_error.code == json_put["code"]
+    assert status.api_server_error.hints == json_put["hints"]
+
+
 @pytest.mark.api_con_required
 def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
     code = 405
@@ -101,11 +127,6 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
 def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
     mock_get.side_effect = requests.exceptions.Timeout("Test timeout")

-    def assert_login():
-        return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
-
-    session.assert_login = assert_login
-
     resp, status = requests_.get_sales_prognosis_data(session, None, None)
     assert resp is not None
     assert len(resp.daten) == 0
@@ -115,11 +136,6 @@ def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
 def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
     mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")

-    def assert_login():
-        return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
-
-    session.assert_login = assert_login
-
     resp, status = requests_.get_sales_prognosis_data(session, None, None)
     assert resp is not None
     assert len(resp.daten) == 0

tests/conftest.py

@@ -8,6 +8,7 @@ from unittest.mock import patch

 import pandas as pd
 import pytest
+import tomli_w

 import delta_barth.session
 from delta_barth.api.requests import SalesPrognosisResponse
@@ -33,6 +34,28 @@ def api_base_url(credentials) -> str:
     return credentials["base_url"]


+@pytest.fixture(scope="session")
+def pth_dummy_cfg() -> Path:
+    pwd = Path.cwd()
+    assert "barth" in pwd.parent.name.lower(), "not in project root directory"
+    data_pth = pwd / "./tests/_test_data/dopt-cfg.toml"
+    assert data_pth.exists(), "file to dummy CFG not found"
+
+    return data_pth
+
+
+@pytest.fixture(scope="function")
+def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
+    with open(pth_dummy_cfg, "rb") as file:
+        cfg_data = tomllib.load(file)
+    target = tmp_path / "dummy_cfg.toml"
+    target.touch()
+    with open(target, "wb") as file:
+        tomli_w.dump(cfg_data, file)
+
+    return target
+
+
 @pytest.fixture(scope="session")
 def sales_data_real() -> pd.DataFrame:
     pwd = Path.cwd()

tests/test_config.py Normal file

@ -0,0 +1,40 @@
+import tomllib
+
+import tomli_w
+
+from delta_barth import config
+
+
+def test_CfgLoader_Init(pth_cfg):
+    loader = config.LazyCfgLoader(pth_cfg)
+    assert loader.path == pth_cfg
+    assert loader._cfg is None
+
+
+def test_CfgLoader_Get(pth_cfg):
+    loader = config.LazyCfgLoader(pth_cfg)
+    parsed_cfg = loader.get()
+    assert isinstance(parsed_cfg, config.Config)
+    assert parsed_cfg.forecast.threshold_month_data_points == 28
+
+
+def test_CfgLoader_Reload(pth_cfg):
+    loader = config.LazyCfgLoader(pth_cfg)
+    parsed_cfg = loader.get()
+    assert isinstance(parsed_cfg, config.Config)
+    assert parsed_cfg.forecast.threshold_month_data_points == 28
+    # modify config and reload
+    with open(pth_cfg, "rb") as file:
+        cfg_data = tomllib.load(file)
+    cfg_data["forecast"]["threshold_month_data_points"] = 30
+    with open(pth_cfg, "wb") as file:
+        tomli_w.dump(cfg_data, file)
+    assert parsed_cfg.forecast.threshold_month_data_points == 28
+    loader.reload()
+    parsed_cfg = loader.get()
+    assert isinstance(parsed_cfg, config.Config)
+    assert parsed_cfg.forecast.threshold_month_data_points == 30
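
Taken together, these tests pin down the loader's contract: the path is stored, parsing is deferred until the first get() (so _cfg starts as None), the parsed Config is cached, and reload() drops the cache so the next get() picks up file edits. A minimal sketch of a class that would satisfy them — the Config/ForecastCfg dataclasses and the from_dict constructor are assumptions for illustration, not the project's confirmed API:

import tomllib
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ForecastCfg:  # hypothetical; field name taken from the assertions
    threshold_month_data_points: int = 28


@dataclass
class Config:  # hypothetical stand-in for delta_barth.config.Config
    forecast: ForecastCfg = field(default_factory=ForecastCfg)

    @classmethod
    def from_dict(cls, raw: dict) -> "Config":
        return cls(forecast=ForecastCfg(**raw.get("forecast", {})))


class LazyCfgLoader:
    """Parses the TOML config lazily and caches the result."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._cfg: Config | None = None  # nothing parsed until first get()

    def get(self) -> Config:
        # parse on first access; subsequent calls return the cached object
        if self._cfg is None:
            with open(self.path, "rb") as file:
                self._cfg = Config.from_dict(tomllib.load(file))
        return self._cfg

    def reload(self) -> None:
        # drop the cache; the next get() re-reads the file
        self._cfg = None

Note how test_CfgLoader_Reload leans on the cache: the already-parsed object still reports 28 after the file is rewritten, and only reload() plus a fresh get() surfaces 30.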

tests/test_env.py (new file, +49)

@@ -0,0 +1,49 @@
+import importlib
+import sys
+from unittest.mock import patch
+
+import pytest
+
+import delta_barth.constants
+from delta_barth import _env
+
+
+@patch("delta_barth._env.PY_RUNTIME_FOLDER", "test123456")
+def test_prepare_env_NoRuntimeFolder(tmp_path):
+    ret = _env.prepare_env(tmp_path)
+    assert ret is None
+
+
+@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
+def test_prepare_env_FailNoInterpreter(tmp_path_factory):
+    mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
+    mocked_lib_pth.mkdir(parents=True, exist_ok=True)
+    with pytest.raises(FileNotFoundError):
+        _ = _env.prepare_env(mocked_lib_pth)
+
+
+@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
+def test_prepare_env_Success(tmp_path_factory):
+    mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
+    mocked_lib_pth.mkdir(parents=True, exist_ok=True)
+    rt_path = mocked_lib_pth.parents[1]
+    mocked_interpreter = rt_path / "python.exe"
+    mocked_interpreter.touch()
+    assert mocked_interpreter.exists()
+
+    ret = _env.prepare_env(mocked_lib_pth)
+    assert ret == rt_path
+    # sys attributes
+    executable = getattr(sys, "executable")
+    assert executable == str(mocked_interpreter)
+    base_executable = getattr(sys, "_base_executable")
+    assert base_executable == str(mocked_interpreter)
+
+    class MockPath:
+        def __init__(self, *args, **kwargs):
+            self.parent = mocked_lib_pth
+
+    with patch("pathlib.Path", MockPath):
+        (mocked_lib_pth / "_dummy_data").mkdir(exist_ok=True)
+        importlib.reload(delta_barth.constants)
+        assert delta_barth.constants.DEPLOYMENT_STATUS
+        assert delta_barth.constants.RUNTIME_PATH == rt_path
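
The three cases spell out the contract of _env.prepare_env: given a library path, return None when no ancestor folder matches PY_RUNTIME_FOLDER, raise FileNotFoundError when the runtime folder exists but holds no python.exe, and otherwise point sys.executable/sys._base_executable at the embedded interpreter and return the runtime path. A sketch along those lines; the ancestor walk is an assumption inferred from parents[1] in the success case:

import sys
from pathlib import Path

PY_RUNTIME_FOLDER = "base"  # name of the embedded-runtime folder (patched in the tests)


def prepare_env(lib_path: Path) -> Path | None:
    # find the nearest ancestor whose name matches the runtime folder
    runtime = next((p for p in lib_path.parents if p.name == PY_RUNTIME_FOLDER), None)
    if runtime is None:
        return None  # not running inside an embedded runtime
    interpreter = runtime / "python.exe"
    if not interpreter.exists():
        raise FileNotFoundError(f"no interpreter found at {interpreter}")
    # make child processes (e.g. multiprocessing workers) use this interpreter
    sys.executable = str(interpreter)
    sys._base_executable = str(interpreter)
    return runtime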


@@ -1,17 +1,16 @@
-import importlib
import json
+from datetime import datetime as Datetime
from unittest.mock import patch

import pytest
import sqlalchemy as sql

-import delta_barth.pipelines
from delta_barth import databases as db
from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
-def test_write_performance_metrics(session):
+def test_write_performance_metrics_Success(session):
    pipe_name = "test_pipe"
    t_start = 20_000_000_000
    t_end = 30_000_000_000
@@ -33,14 +32,34 @@ def test_write_performance_metrics(session):
    assert metrics.execution_duration == 10
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) def test_write_performance_metrics_FailStartingTime(session):
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session): pipe_name = "test_pipe"
with patch( t_start = 30_000_000_000
"delta_barth.analysis.forecast.get_sales_prognosis_data", t_end = 20_000_000_000
) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
with patch("delta_barth.pipelines.SESSION", session): with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast(None, None) with pytest.raises(ValueError):
_ = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.pipelines.SESSION", session),
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(company_ids, date)
assert isinstance(json_export, str) assert isinstance(json_export, str)
parsed_resp = json.loads(json_export) parsed_resp = json.loads(json_export)
@@ -59,7 +78,6 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
    assert metrics.execution_duration > 0

-@pytest.mark.new
def test_sales_prognosis_pipeline_dummy(session):
    with patch("delta_barth.pipelines.SESSION", session):
        json_export = pl.pipeline_sales_forecast_dummy(None, None)
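
The renamed _Success case and the new _FailStartingTime case together fix the helper's semantics: timestamps arrive as nanosecond counters (a 10_000_000_000 ns delta yields execution_duration == 10), and a start time after the end time must raise ValueError before anything is written. A sketch of that validation; the PerfMetrics record is a hypothetical stand-in for the SQLAlchemy row actually persisted:

from dataclasses import dataclass


@dataclass
class PerfMetrics:  # hypothetical stand-in for the persisted metrics row
    pipeline_name: str
    execution_duration: float  # seconds


def _write_performance_metrics(
    pipeline_name: str, time_start: int, time_end: int
) -> PerfMetrics:
    # inputs are perf_counter_ns-style nanosecond timestamps
    if time_start > time_end:
        raise ValueError("time_start lies after time_end")
    duration_s = (time_end - time_start) / 1_000_000_000  # ns -> s
    return PerfMetrics(pipeline_name=pipeline_name, execution_duration=duration_s)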


@@ -1,15 +1,18 @@
+import tomllib
from pathlib import Path
from unittest.mock import patch

import pytest
+import tomli_w

+import delta_barth.config
import delta_barth.session
from delta_barth import logging
from delta_barth.constants import (
    DEFAULT_API_ERR_CODE,
    HTTP_BASE_CONTENT_HEADERS,
-    LOG_FILENAME,
)
+from delta_barth.logging import LOG_FILENAME
def test_validate_path_Success():
@@ -62,8 +65,82 @@ def test_session_setup_db_management(tmp_path):
    assert db_path.exists()
+def test_session_setup_config(tmp_path):
+    str_path = str(tmp_path)
+    foldername: str = "cfg_test"
+    target_cfg_dir = tmp_path / foldername
+    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
+    session.set_data_path(str_path)
+    cfg_path = session.cfg_path
+    assert cfg_path.parent.exists()
+    assert cfg_path.parent == target_cfg_dir
+    assert not cfg_path.exists()
+
+    session.setup()
+    cfg_path2 = session.cfg_path
+    assert cfg_path2 == cfg_path
+    assert session._cfg is not None
+    assert cfg_path.exists()
+    assert session.cfg.forecast.threshold_month_data_points == 28
+
+
+@patch("delta_barth.session.CFG_HOT_RELOAD", False)
+def test_session_reload_config_NoHotReload(tmp_path):
+    str_path = str(tmp_path)
+    foldername: str = "cfg_test"
+    target_cfg_dir = tmp_path / foldername
+    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
+    session.set_data_path(str_path)
+    cfg_path = session.cfg_path
+    assert cfg_path.parent.exists()
+    assert cfg_path.parent == target_cfg_dir
+    assert not cfg_path.exists()
+
+    session.setup()
+    assert cfg_path.exists()
+    parsed_cfg = session.cfg
+    assert isinstance(parsed_cfg, delta_barth.config.Config)
+
+    # modify config and reload
+    with open(cfg_path, "rb") as file:
+        cfg_data = tomllib.load(file)
+    cfg_data["forecast"]["threshold_month_data_points"] = 30
+    with open(cfg_path, "wb") as file:
+        tomli_w.dump(cfg_data, file)
+    assert session.cfg.forecast.threshold_month_data_points == 28
+
+    session.reload_cfg()
+    reload_cfg = session.cfg
+    assert isinstance(reload_cfg, delta_barth.config.Config)
+    assert reload_cfg.forecast.threshold_month_data_points == 30
+
+
+@patch("delta_barth.session.CFG_HOT_RELOAD", True)
+def test_session_reload_config_HotReload(tmp_path):
+    str_path = str(tmp_path)
+    foldername: str = "cfg_test"
+    target_cfg_dir = tmp_path / foldername
+    session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
+    session.set_data_path(str_path)
+    cfg_path = session.cfg_path
+    assert cfg_path.parent.exists()
+    assert cfg_path.parent == target_cfg_dir
+    assert not cfg_path.exists()
+
+    session.setup()
+    assert cfg_path.exists()
+    parsed_cfg = session.cfg
+    assert isinstance(parsed_cfg, delta_barth.config.Config)
+
+    # modify config and reload
+    with open(cfg_path, "rb") as file:
+        cfg_data = tomllib.load(file)
+    cfg_data["forecast"]["threshold_month_data_points"] = 30
+    with open(cfg_path, "wb") as file:
+        tomli_w.dump(cfg_data, file)
+    assert session.cfg.forecast.threshold_month_data_points == 30
@patch("delta_barth.logging.ENABLE_LOGGING", True) @patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True) @patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
def test_session_setup_logging(tmp_path): def test_session_setup_logging(tmp_path):
str_path = str(tmp_path) str_path = str(tmp_path)
foldername: str = "logging_test" foldername: str = "logging_test"
@@ -237,11 +314,11 @@ def test_login_logout_FailApiServer(session, mock_put):
@pytest.mark.api_con_required
-def test_assert_login_SuccessLoggedOut(session):
+def test_relogin_SuccessLoggedOut(session):
    assert session.session_token is None
    assert session._creds is not None

    # test logged out state
-    resp, status = session.assert_login()
+    resp, status = session.relogin()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
@@ -250,74 +327,17 @@ def test_assert_login_SuccessLoggedOut(session):
@pytest.mark.api_con_required
-def test_assert_login_SuccessStillLoggedIn(session):
+def test_relogin_SuccessStillLoggedIn(session):
    assert session.session_token is None
    assert session._creds is not None

    resp, status = session.login()
-    resp, status = session.assert_login()
+    old_token = session.session_token
+    assert old_token is not None
+    resp, status = session.relogin()
    assert resp is not None
    assert status.code == 0
    assert session.session_token is not None
+    assert session.session_token != old_token

    resp, status = session.logout()
    assert status.code == 0
-@pytest.mark.api_con_required
-def test_assert_login_ReloginNoValidAuth(session, mock_get):
-    code = 401
-    json = {
-        "message": "AuthentificationError",
-        "code": "TestAssertLoginAfter",
-        "hints": "TestCase",
-    }
-    mock_get.return_value.status_code = code
-    mock_get.return_value.json.return_value = json
-
-    resp, status = session.login()
-    resp, status = session.assert_login()
-    assert resp is not None
-    assert status.code == 0
-    assert session.session_token is not None
-
-    resp, status = session.logout()
-    assert status.code == 0
-
-
-@pytest.mark.api_con_required
-def test_assert_login_ReloginWrongToken(session):
-    # triggers code 401
-    assert session.session_token is None
-    assert session._creds is not None
-
-    _, status = session.login()
-    assert status.code == 0
-    session._session_token = "WRONGTOKEN"
-
-    resp, status = session.assert_login()
-    assert resp is not None
-    assert status.code == 0
-    assert session.session_token is not None
-
-    resp, status = session.logout()
-    assert status.code == 0
-
-
-@pytest.mark.api_con_required
-def test_assert_login_FailApiServer(session, mock_get):
-    code = 500
-    json = {
-        "message": "ServerError",
-        "code": "TestExternalServerError",
-        "hints": "TestCase",
-    }
-    mock_get.return_value.status_code = code
-    mock_get.return_value.json.return_value = json
-
-    resp, status = session.login()
-    resp, status = session.assert_login()
-    assert resp is not None
-    assert not resp.token
-    assert status.code == 400
-    assert status.api_server_error is not None
-    assert status.api_server_error.status_code == code
-    assert status.api_server_error.message == json["message"]
-    assert status.api_server_error.code == json["code"]
-    assert status.api_server_error.hints == json["hints"]
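
The rename from assert_login to relogin matches the behaviour the two surviving tests demand: unlike the removed assert_login variants, which could keep a still-valid session, relogin must always produce a fresh token (session_token != old_token) whether or not a session already exists. A minimal sketch of that semantics; the real method wraps the API's login/logout endpoints and the status handling shown above:

class SessionSketch:  # hypothetical reduction of delta_barth.session.Session
    def __init__(self) -> None:
        self.session_token: str | None = None

    def login(self):
        ...  # PUT credentials, store the returned token, return (resp, status)

    def logout(self):
        ...  # invalidate the current token server-side

    def relogin(self):
        # always re-authenticate: discard any existing session first so the
        # caller is guaranteed a fresh token, as asserted in the tests above
        if self.session_token is not None:
            self.logout()
        return self.login()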