Compare commits


No commits in common. "main" and "v0.4.1" have entirely different histories.
main ... v0.4.1

32 changed files with 773 additions and 1987 deletions

.gitignore (vendored): 1 changed line

@@ -3,7 +3,6 @@ prototypes/
data/
reports/
*.code-workspace
-docs/
# credentials
CREDENTIALS*

pdm.lock (generated): 176 changed lines

@@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
-content_hash = "sha256:f2a2abd891603796228b21bfeb7a00fd998964fe9303a9e4e5971f63925261e8"
+content_hash = "sha256:d51351adbafc599b97f8b3c9047ad0c7b8607d47cff5874121f546af04793ee2"

[[metadata.targets]]
requires_python = ">=3.11"
@@ -579,7 +579,7 @@ files = [
[[package]]
name = "dopt-basics"
-version = "0.1.3"
+version = "0.1.2"
requires_python = ">=3.11"
summary = "basic cross-project tools for Python-based d-opt projects"
groups = ["default"]
@@ -587,19 +587,8 @@ dependencies = [
"tzdata>=2025.1",
]
files = [
-{file = "dopt_basics-0.1.3-py3-none-any.whl", hash = "sha256:974c2b442e47f0f05e66ff821ae48a9b12f7b77a8a3bc06fe8ac232e2bc27608"},
-{file = "dopt_basics-0.1.3.tar.gz", hash = "sha256:22ba30cbd385cb8929cb6a13fe01e253cd7d9617ef637e41609f2468691450e8"},
-]
-
-[[package]]
-name = "et-xmlfile"
-version = "2.0.0"
-requires_python = ">=3.8"
-summary = "An implementation of lxml.xmlfile for the standard library"
-groups = ["dev"]
-files = [
-{file = "et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa"},
-{file = "et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54"},
+{file = "dopt_basics-0.1.2-py3-none-any.whl", hash = "sha256:dae8b7e31197fb173d98c74ed6f227c3dceaadf980139f0852a7f031d2e78b84"},
+{file = "dopt_basics-0.1.2.tar.gz", hash = "sha256:dc54942db95b0608fa44f7b612ee3247dad50d2538ad88a1697b3357a8b05634"},
]

[[package]]
@ -659,51 +648,6 @@ files = [
{file = "fqdn-1.5.1.tar.gz", hash = "sha256:105ed3677e767fb5ca086a0c1f4bb66ebc3c100be518f0e0d755d9eae164d89f"}, {file = "fqdn-1.5.1.tar.gz", hash = "sha256:105ed3677e767fb5ca086a0c1f4bb66ebc3c100be518f0e0d755d9eae164d89f"},
] ]
[[package]]
name = "greenlet"
version = "3.1.1"
requires_python = ">=3.7"
summary = "Lightweight in-process concurrent programming"
groups = ["default"]
marker = "(platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\") and python_version < \"3.14\""
files = [
{file = "greenlet-3.1.1-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e4d333e558953648ca09d64f13e6d8f0523fa705f51cae3f03b5983489958c70"},
{file = "greenlet-3.1.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:09fc016b73c94e98e29af67ab7b9a879c307c6731a2c9da0db5a7d9b7edd1159"},
{file = "greenlet-3.1.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d5e975ca70269d66d17dd995dafc06f1b06e8cb1ec1e9ed54c1d1e4a7c4cf26e"},
{file = "greenlet-3.1.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3b2813dc3de8c1ee3f924e4d4227999285fd335d1bcc0d2be6dc3f1f6a318ec1"},
{file = "greenlet-3.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e347b3bfcf985a05e8c0b7d462ba6f15b1ee1c909e2dcad795e49e91b152c383"},
{file = "greenlet-3.1.1-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9e8f8c9cb53cdac7ba9793c276acd90168f416b9ce36799b9b885790f8ad6c0a"},
{file = "greenlet-3.1.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:62ee94988d6b4722ce0028644418d93a52429e977d742ca2ccbe1c4f4a792511"},
{file = "greenlet-3.1.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:1776fd7f989fc6b8d8c8cb8da1f6b82c5814957264d1f6cf818d475ec2bf6395"},
{file = "greenlet-3.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:48ca08c771c268a768087b408658e216133aecd835c0ded47ce955381105ba39"},
{file = "greenlet-3.1.1-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:4afe7ea89de619adc868e087b4d2359282058479d7cfb94970adf4b55284574d"},
{file = "greenlet-3.1.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f406b22b7c9a9b4f8aa9d2ab13d6ae0ac3e85c9a809bd590ad53fed2bf70dc79"},
{file = "greenlet-3.1.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c3a701fe5a9695b238503ce5bbe8218e03c3bcccf7e204e455e7462d770268aa"},
{file = "greenlet-3.1.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2846930c65b47d70b9d178e89c7e1a69c95c1f68ea5aa0a58646b7a96df12441"},
{file = "greenlet-3.1.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:99cfaa2110534e2cf3ba31a7abcac9d328d1d9f1b95beede58294a60348fba36"},
{file = "greenlet-3.1.1-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1443279c19fca463fc33e65ef2a935a5b09bb90f978beab37729e1c3c6c25fe9"},
{file = "greenlet-3.1.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:b7cede291382a78f7bb5f04a529cb18e068dd29e0fb27376074b6d0317bf4dd0"},
{file = "greenlet-3.1.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:23f20bb60ae298d7d8656c6ec6db134bca379ecefadb0b19ce6f19d1f232a942"},
{file = "greenlet-3.1.1-cp312-cp312-win_amd64.whl", hash = "sha256:7124e16b4c55d417577c2077be379514321916d5790fa287c9ed6f23bd2ffd01"},
{file = "greenlet-3.1.1-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:05175c27cb459dcfc05d026c4232f9de8913ed006d42713cb8a5137bd49375f1"},
{file = "greenlet-3.1.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:935e943ec47c4afab8965954bf49bfa639c05d4ccf9ef6e924188f762145c0ff"},
{file = "greenlet-3.1.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:667a9706c970cb552ede35aee17339a18e8f2a87a51fba2ed39ceeeb1004798a"},
{file = "greenlet-3.1.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b8a678974d1f3aa55f6cc34dc480169d58f2e6d8958895d68845fa4ab566509e"},
{file = "greenlet-3.1.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:efc0f674aa41b92da8c49e0346318c6075d734994c3c4e4430b1c3f853e498e4"},
{file = "greenlet-3.1.1-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0153404a4bb921f0ff1abeb5ce8a5131da56b953eda6e14b88dc6bbc04d2049e"},
{file = "greenlet-3.1.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:275f72decf9932639c1c6dd1013a1bc266438eb32710016a1c742df5da6e60a1"},
{file = "greenlet-3.1.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:c4aab7f6381f38a4b42f269057aee279ab0fc7bf2e929e3d4abfae97b682a12c"},
{file = "greenlet-3.1.1-cp313-cp313-win_amd64.whl", hash = "sha256:b42703b1cf69f2aa1df7d1030b9d77d3e584a70755674d60e710f0af570f3761"},
{file = "greenlet-3.1.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f1695e76146579f8c06c1509c7ce4dfe0706f49c6831a817ac04eebb2fd02011"},
{file = "greenlet-3.1.1-cp313-cp313t-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7876452af029456b3f3549b696bb36a06db7c90747740c5302f74a9e9fa14b13"},
{file = "greenlet-3.1.1-cp313-cp313t-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4ead44c85f8ab905852d3de8d86f6f8baf77109f9da589cb4fa142bd3b57b475"},
{file = "greenlet-3.1.1-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8320f64b777d00dd7ccdade271eaf0cad6636343293a25074cc5566160e4de7b"},
{file = "greenlet-3.1.1-cp313-cp313t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6510bf84a6b643dabba74d3049ead221257603a253d0a9873f55f6a59a65f822"},
{file = "greenlet-3.1.1-cp313-cp313t-musllinux_1_1_aarch64.whl", hash = "sha256:04b013dc07c96f83134b1e99888e7a79979f1a247e2a9f59697fa14b5862ed01"},
{file = "greenlet-3.1.1-cp313-cp313t-musllinux_1_1_x86_64.whl", hash = "sha256:411f015496fec93c1c8cd4e5238da364e1da7a124bcb293f085bf2860c32c6f6"},
{file = "greenlet-3.1.1.tar.gz", hash = "sha256:4ce3ac6cdb6adf7946475d7ef31777c26d94bccc377e070a7986bd2d5c515467"},
]
[[package]] [[package]]
name = "h11" name = "h11"
version = "0.14.0" version = "0.14.0"
@ -1461,20 +1405,6 @@ files = [
{file = "nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:362aed5963fb9ea2ed2f264409baae30143498fd0e5c503aeaa1badd88cdc54a"}, {file = "nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:362aed5963fb9ea2ed2f264409baae30143498fd0e5c503aeaa1badd88cdc54a"},
] ]
[[package]]
name = "openpyxl"
version = "3.1.5"
requires_python = ">=3.8"
summary = "A Python library to read/write Excel 2010 xlsx/xlsm files"
groups = ["dev"]
dependencies = [
"et-xmlfile",
]
files = [
{file = "openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2"},
{file = "openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050"},
]
[[package]] [[package]]
name = "overrides" name = "overrides"
version = "7.7.0" version = "7.7.0"
@ -1596,31 +1526,6 @@ files = [
{file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"}, {file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"},
] ]
[[package]]
name = "pip"
version = "25.1.1"
requires_python = ">=3.9"
summary = "The PyPA recommended tool for installing Python packages."
groups = ["default"]
files = [
{file = "pip-25.1.1-py3-none-any.whl", hash = "sha256:2913a38a2abf4ea6b64ab507bd9e967f3b53dc1ede74b01b0931e1ce548751af"},
{file = "pip-25.1.1.tar.gz", hash = "sha256:3de45d411d308d5054c2168185d8da7f9a2cd753dbac8acbfa88a8909ecd9077"},
]
[[package]]
name = "pip-system-certs"
version = "5.2"
requires_python = ">=3.10"
summary = "Automatically configures Python to use system certificates via truststore"
groups = ["default"]
dependencies = [
"pip>=24.2",
]
files = [
{file = "pip_system_certs-5.2-py3-none-any.whl", hash = "sha256:e6ef3e106d4d02313e33955c2bcc4c2b143b2da07ef91e28a6805a0c1c512126"},
{file = "pip_system_certs-5.2.tar.gz", hash = "sha256:80b776b5cf17191bf99d313699b7fce2fdb84eb7bbb225fd134109a82706406f"},
]
[[package]] [[package]]
name = "platformdirs" name = "platformdirs"
version = "4.3.6" version = "4.3.6"
@@ -1673,7 +1578,7 @@ name = "psutil"
version = "7.0.0"
requires_python = ">=3.6"
summary = "Cross-platform lib for process and system monitoring in Python. NOTE: the syntax of this script MUST be kept compatible with Python 2.7."
-groups = ["default", "nb"]
+groups = ["nb"]
files = [
{file = "psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25"},
{file = "psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da"},
@ -2368,46 +2273,6 @@ files = [
{file = "soupsieve-2.6.tar.gz", hash = "sha256:e2e68417777af359ec65daac1057404a3c8a5455bb8abc36f1a9866ab1a51abb"}, {file = "soupsieve-2.6.tar.gz", hash = "sha256:e2e68417777af359ec65daac1057404a3c8a5455bb8abc36f1a9866ab1a51abb"},
] ]
[[package]]
name = "sqlalchemy"
version = "2.0.39"
requires_python = ">=3.7"
summary = "Database Abstraction Library"
groups = ["default"]
dependencies = [
"greenlet!=0.4.17; (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\") and python_version < \"3.14\"",
"importlib-metadata; python_version < \"3.8\"",
"typing-extensions>=4.6.0",
]
files = [
{file = "sqlalchemy-2.0.39-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a28f9c238f1e143ff42ab3ba27990dfb964e5d413c0eb001b88794c5c4a528a9"},
{file = "sqlalchemy-2.0.39-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:08cf721bbd4391a0e765fe0fe8816e81d9f43cece54fdb5ac465c56efafecb3d"},
{file = "sqlalchemy-2.0.39-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7a8517b6d4005facdbd7eb4e8cf54797dbca100a7df459fdaff4c5123265c1cd"},
{file = "sqlalchemy-2.0.39-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b2de1523d46e7016afc7e42db239bd41f2163316935de7c84d0e19af7e69538"},
{file = "sqlalchemy-2.0.39-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:412c6c126369ddae171c13987b38df5122cb92015cba6f9ee1193b867f3f1530"},
{file = "sqlalchemy-2.0.39-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:6b35e07f1d57b79b86a7de8ecdcefb78485dab9851b9638c2c793c50203b2ae8"},
{file = "sqlalchemy-2.0.39-cp311-cp311-win32.whl", hash = "sha256:3eb14ba1a9d07c88669b7faf8f589be67871d6409305e73e036321d89f1d904e"},
{file = "sqlalchemy-2.0.39-cp311-cp311-win_amd64.whl", hash = "sha256:78f1b79132a69fe8bd6b5d91ef433c8eb40688ba782b26f8c9f3d2d9ca23626f"},
{file = "sqlalchemy-2.0.39-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:c457a38351fb6234781d054260c60e531047e4d07beca1889b558ff73dc2014b"},
{file = "sqlalchemy-2.0.39-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:018ee97c558b499b58935c5a152aeabf6d36b3d55d91656abeb6d93d663c0c4c"},
{file = "sqlalchemy-2.0.39-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5493a8120d6fc185f60e7254fc056a6742f1db68c0f849cfc9ab46163c21df47"},
{file = "sqlalchemy-2.0.39-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b2cf5b5ddb69142511d5559c427ff00ec8c0919a1e6c09486e9c32636ea2b9dd"},
{file = "sqlalchemy-2.0.39-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9f03143f8f851dd8de6b0c10784363712058f38209e926723c80654c1b40327a"},
{file = "sqlalchemy-2.0.39-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:06205eb98cb3dd52133ca6818bf5542397f1dd1b69f7ea28aa84413897380b06"},
{file = "sqlalchemy-2.0.39-cp312-cp312-win32.whl", hash = "sha256:7f5243357e6da9a90c56282f64b50d29cba2ee1f745381174caacc50d501b109"},
{file = "sqlalchemy-2.0.39-cp312-cp312-win_amd64.whl", hash = "sha256:2ed107331d188a286611cea9022de0afc437dd2d3c168e368169f27aa0f61338"},
{file = "sqlalchemy-2.0.39-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:fe193d3ae297c423e0e567e240b4324d6b6c280a048e64c77a3ea6886cc2aa87"},
{file = "sqlalchemy-2.0.39-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:79f4f502125a41b1b3b34449e747a6abfd52a709d539ea7769101696bdca6716"},
{file = "sqlalchemy-2.0.39-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8a10ca7f8a1ea0fd5630f02feb055b0f5cdfcd07bb3715fc1b6f8cb72bf114e4"},
{file = "sqlalchemy-2.0.39-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e6b0a1c7ed54a5361aaebb910c1fa864bae34273662bb4ff788a527eafd6e14d"},
{file = "sqlalchemy-2.0.39-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:52607d0ebea43cf214e2ee84a6a76bc774176f97c5a774ce33277514875a718e"},
{file = "sqlalchemy-2.0.39-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:c08a972cbac2a14810463aec3a47ff218bb00c1a607e6689b531a7c589c50723"},
{file = "sqlalchemy-2.0.39-cp313-cp313-win32.whl", hash = "sha256:23c5aa33c01bd898f879db158537d7e7568b503b15aad60ea0c8da8109adf3e7"},
{file = "sqlalchemy-2.0.39-cp313-cp313-win_amd64.whl", hash = "sha256:4dabd775fd66cf17f31f8625fc0e4cfc5765f7982f94dc09b9e5868182cb71c0"},
{file = "sqlalchemy-2.0.39-py3-none-any.whl", hash = "sha256:a1c6b0a5e3e326a466d809b651c63f278b1256146a377a528b6938a279da334f"},
{file = "sqlalchemy-2.0.39.tar.gz", hash = "sha256:5d2d1fe548def3267b4c70a8568f108d1fed7cbbeccb9cc166e05af2abc25c22"},
]
[[package]] [[package]]
name = "stack-data" name = "stack-data"
version = "0.6.3" version = "0.6.3"
@ -2464,17 +2329,6 @@ files = [
{file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"}, {file = "tinycss2-1.4.0.tar.gz", hash = "sha256:10c0972f6fc0fbee87c3edb76549357415e94548c1ae10ebccdea16fb404a9b7"},
] ]
[[package]]
name = "tomli-w"
version = "1.2.0"
requires_python = ">=3.9"
summary = "A lil' TOML writer"
groups = ["dev"]
files = [
{file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"},
{file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"},
]
[[package]] [[package]]
name = "tomlkit" name = "tomlkit"
version = "0.13.2" version = "0.13.2"
@@ -2661,8 +2515,8 @@ files = [
[[package]]
name = "xgboost"
-version = "3.0.0"
-requires_python = ">=3.10"
+version = "2.1.4"
+requires_python = ">=3.8"
summary = "XGBoost Python Package"
groups = ["default"]
dependencies = [
@ -2671,12 +2525,12 @@ dependencies = [
"scipy", "scipy",
] ]
files = [ files = [
{file = "xgboost-3.0.0-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:ed8cffd7998bd9431c3b0287a70bec8e45c09b43c9474d9dfd261627713bd890"}, {file = "xgboost-2.1.4-py3-none-macosx_10_15_x86_64.macosx_11_0_x86_64.macosx_12_0_x86_64.whl", hash = "sha256:78d88da184562deff25c820d943420342014dd55e0f4c017cc4563c2148df5ee"},
{file = "xgboost-3.0.0-py3-none-macosx_12_0_arm64.whl", hash = "sha256:314104bd3a1426a40f0c9662eef40e9ab22eb7a8068a42a8d198ce40412db75c"}, {file = "xgboost-2.1.4-py3-none-macosx_12_0_arm64.whl", hash = "sha256:523db01d4e74b05c61a985028bde88a4dd380eadc97209310621996d7d5d14a7"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:72c3405e8dfc37048f9fe339a058fa12b9f0f03bc31d3e56f0887eed2ed2baa1"}, {file = "xgboost-2.1.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:57c7e98111aceef4b689d7d2ce738564a1f7fe44237136837a47847b8b33bade"},
{file = "xgboost-3.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:72d39e74649e9b628c4221111aa6a8caa860f2e853b25480424403ee61085126"}, {file = "xgboost-2.1.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f1343a512e634822eab30d300bfc00bf777dc869d881cc74854b42173cfcdb14"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:7bdee5787f86b83bebd75e2c96caf854760788e5f4203d063da50db5bf0efc5f"}, {file = "xgboost-2.1.4-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:d366097d0db047315736f46af852feaa907f6d7371716af741cdce488ae36d20"},
{file = "xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:61c7e391e373b8a5312503525c0689f83ef1912a1236377022865ab340f465a4"}, {file = "xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:8df6da72963969ab2bf49a520c3e147b1e15cbeddd3aa0e3e039b3532c739339"},
{file = "xgboost-3.0.0-py3-none-win_amd64.whl", hash = "sha256:0ea74e97f95b1eddfd27a46b7f22f72ec5a5322e1dc7cb41c9c23fb580763df9"}, {file = "xgboost-2.1.4-py3-none-win_amd64.whl", hash = "sha256:8bbfe4fedc151b83a52edbf0de945fd94358b09a81998f2945ad330fd5f20cd6"},
{file = "xgboost-3.0.0.tar.gz", hash = "sha256:45e95416df6f6f01d9a62e60cf09fc57e5ee34697f3858337c796fac9ce3b9ed"}, {file = "xgboost-2.1.4.tar.gz", hash = "sha256:ab84c4bbedd7fae1a26f61e9dd7897421d5b08454b51c6eb072abc1d346d08d7"},
] ]


@@ -1,11 +1,11 @@
[project]
name = "delta-barth"
-version = "0.5.12"
+version = "0.4.1"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
]
-dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.3", "SQLAlchemy>=2.0.39", "psutil>=7.0.0", "pip-system-certs>=5.2"]
+dependencies = ["scikit-learn>=1.6.1", "pandas>=2.2.3", "xgboost>=2.1.4", "joblib>=1.4.2", "typing-extensions>=4.12.2", "requests>=2.32.3", "pydantic>=2.10.6", "dopt-basics>=0.1.2"]
requires-python = ">=3.11"
readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
@@ -44,8 +44,7 @@ filterwarnings = [
]
markers = [
"api_con_required: tests require an API connection (deselect with '-m \"not api_con_required\"')",
-"new: to test only new tests, usually removed afterwards (deselect with '-m \"not new\"')",
-"forecast: main components of forecast pipeline (deselect with '-m \"not forecast\"')"
+"new: to test only new tests, usually removed afterwards (deselect with '-m \"not quick\"')",
]
log_cli = true
@@ -74,7 +73,7 @@ directory = "reports/coverage"
[tool.bumpversion]
-current_version = "0.5.12"
+current_version = "0.4.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -146,8 +145,6 @@ dev = [
"pdoc3>=0.11.5",
"bump-my-version>=1.1.1",
"nox>=2025.2.9",
-"tomli-w>=1.2.0",
-"openpyxl>=3.1.5",
]
nb = [
"jupyterlab>=4.3.5",


@@ -1,73 +1 @@
-pdm build --no-sdist -d build/
+pdm build -d build/
# Configuration
$sourceDir = ".\build"
$destDir = "..\01_releases\runtime"
$packagePrefix = "delta_barth-"
$packageSuffix = "-py3-none-any.whl"
# Ensure destination exists
if (-not (Test-Path $destDir)) {
New-Item -ItemType Directory -Path $destDir | Out-Null
}
# === Build Regex Pattern ===
$escapedSuffix = [regex]::Escape($packageSuffix)
# Match versions like 1.2.3 or 1.2.3.beta or 1.2.3.beta1
# Capture the full version as one string, including the optional pre-release after a dot
$pattern = "^$packagePrefix(?<version>\d+\.\d+\.\d+(?:\.[a-zA-Z0-9\-]+)?)$escapedSuffix$"
Write-Host "Using pattern: $pattern"
# === Get and Filter Files ===
$allFiles = Get-ChildItem -Path $sourceDir -File
$matchingFiles = @()
foreach ($file in $allFiles) {
if ($file.Name -match $pattern) {
$version = $Matches['version']
$matchingFiles += [PSCustomObject]@{
File = $file
Version = $version
}
Write-Host "Matched: $($file.Name) -> Version: $version"
} else {
Write-Host "No match: $($file.Name)"
}
}
if ($matchingFiles.Count -eq 0) {
Write-Host "No matching package files found."
return
}
# === Convert version strings to sortable format ===
function Convert-VersionForSort($v) {
# Split by dot: e.g., 1.2.3.beta -> [1, 2, 3, "beta"]
$parts = $v -split '\.'
$major = [int]$parts[0]
$minor = [int]$parts[1]
$patch = [int]$parts[2]
$pre = if ($parts.Count -gt 3) { $parts[3] } else { "~" } # "~" to ensure stable > prerelease
return [PSCustomObject]@{
Major = $major
Minor = $minor
Patch = $patch
Pre = $pre
}
}
# === Sort by semantic version + pre-release ===
$latest = $matchingFiles | Sort-Object {
Convert-VersionForSort $_.Version
} -Descending | Select-Object -First 1
# === Copy and rename to .zip ===
$baseName = [System.IO.Path]::GetFileNameWithoutExtension($latest.File.Name)
$newFileName = "$baseName.zip"
$destPath = Join-Path $destDir $newFileName
Copy-Item -Path $latest.File.FullName -Destination $destPath


@ -1,2 +0,0 @@
pdm run bump-my-version bump patch
pdm run bump-my-version show current_version


@ -1,3 +0,0 @@
import pip_system_certs.wrapt_requests
pip_system_certs.wrapt_requests.inject_truststore()


@@ -42,11 +42,7 @@ def delta_barth_api_error() -> str:
def status_err() -> str:
-status = Status(
-code=102,
-description="internal error occurred: 'Limit-Überschreitung'",
-message="caused by test",
-)
+status = Status(code=102, description="internal error occurred", message="caused by test")
return status.model_dump_json()


@ -1,33 +0,0 @@
from __future__ import annotations
import sys
from pathlib import Path
from typing import Final
from dopt_basics import io
PY_RUNTIME_FOLDER: Final[str] = "python"
def prepare_env(
lib_path: Path,
) -> Path | None:
pyrt_folder = io.search_folder_path(
starting_path=lib_path,
stop_folder_name=PY_RUNTIME_FOLDER,
return_inclusive=True,
)
if pyrt_folder is None:
return None
pth_interpreter = pyrt_folder / "python.exe"
if not pth_interpreter.exists():
raise FileNotFoundError(
f"dopt-delta-barth seems to be deployed in a standalone runtime, "
f"but the interpreter was not found under: {pth_interpreter}"
)
setattr(sys, "executable", str(pth_interpreter))
setattr(sys, "_base_executable", str(pth_interpreter))
return pyrt_folder


@@ -1,24 +1,17 @@
from __future__ import annotations

-import copy
import datetime
-import math
from collections.abc import Mapping, Set
-from dataclasses import asdict
from datetime import datetime as Datetime
-from typing import TYPE_CHECKING, Final, TypeAlias, cast
+from typing import TYPE_CHECKING, Final, cast

-import joblib
import numpy as np
import pandas as pd
import scipy.stats
-import sqlalchemy as sql
-from dateutil.relativedelta import relativedelta
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from xgboost import XGBRegressor

-from delta_barth import databases
from delta_barth.analysis import parse
from delta_barth.api.requests import (
SalesPrognosisResponse,
@@ -29,29 +22,24 @@ from delta_barth.api.requests import (
)
from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS,
-DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS,
-MAX_NUM_WORKERS,
+SALES_BASE_NUM_DATAPOINTS_MONTHS,
SALES_MIN_NUM_DATAPOINTS,
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
-from delta_barth.logging import logger_db, logger_pipelines
-from delta_barth.management import SESSION
+from delta_barth.logging import logger_pipelines as logger
from delta_barth.types import (
BestParametersXGBRegressor,
DualDict,
ParamSearchXGBRegressor,
PipeResult,
-SalesForecastStatistics,
)

if TYPE_CHECKING:
-from delta_barth.session import Session
+from delta_barth.api.common import Session
from delta_barth.types import Status

-ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics]

def _parse_api_resp_to_df(
resp: SalesPrognosisResponse,
@@ -85,21 +73,6 @@ def _parse_df_to_results(
return SalesPrognosisResults(daten=tuple(df_formatted)) # type: ignore

-def _write_sales_forecast_stats(
-stats: SalesForecastStatistics,
-) -> None:
-stats_db = asdict(stats)
-_ = stats_db.pop("xgb_params")
-xgb_params = stats.xgb_params
-with SESSION.db_engine.begin() as conn:
-res = conn.execute(sql.insert(databases.sf_stats).values(stats_db))
-sf_id = cast(int, res.inserted_primary_key[0]) # type: ignore
-if xgb_params is not None:
-xgb_params["forecast_id"] = sf_id
-conn.execute(sql.insert(databases.sf_XGB).values(xgb_params))

@wrap_result()
def _parse_api_resp_to_df_wrapped(
resp: SalesPrognosisResponse,

@@ -114,18 +87,30 @@ def _parse_df_to_results_wrapped(
return _parse_df_to_results(data)

-@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
-def _write_sales_forecast_stats_wrapped(
-stats: SalesForecastStatistics,
-) -> None:
-return _write_sales_forecast_stats(stats)
+# ------------------------------------------------------------------------------
+# Input:
+# DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp
+# kunde (muss enthalten sein in df['firmen']['firma_refid'])
+# Output:
+# Integer umsetzung (Prognose möglich): 0 ja, 1 nein (zu wenig Daten verfügbar),
+# 2 nein (Daten nicht für Prognose geeignet)
+# DataFrame test: Jahr, Monat, Vorhersage
+# -------------------------------------------------------------------------------
+# Prognose Umsatz je Firma
+# TODO: check usage of separate exception and handle it in API function
+# TODO set min number of data points as constant, not parameter
def _preprocess_sales(
resp: SalesPrognosisResponse,
feature_map: Mapping[str, str],
target_features: Set[str],
-) -> ForecastPipe:
+) -> PipeResult[SalesPrognosisResultsExport]:
"""n = 1

Parameters
@@ -142,7 +127,7 @@ def _preprocess_sales(
PipeResult
_description_
"""
-pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS)
+pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS)

res = _parse_api_resp_to_df_wrapped(resp)
if res.status != STATUS_HANDLER.SUCCESS:
@@ -164,10 +149,10 @@
def _process_sales(
-pipe: ForecastPipe,
+pipe: PipeResult[SalesPrognosisResultsExport],
min_num_data_points: int,
base_num_data_points_months: int,
-) -> ForecastPipe:
+) -> PipeResult[SalesPrognosisResultsExport]:
"""n = 1
Input-Data:
fields: ["artikel_refid", "firma_refid", "betrag", "menge", "buchungs_datum"]
@ -186,50 +171,26 @@ def _process_sales(
PipeResult PipeResult
_description_ _description_
""" """
# cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast()
# filter data # filter data
data = pipe.data data = pipe.data
assert data is not None, "processing not existing pipe result" assert data is not None, "processing not existing pipe result"
DATE_FEAT: Final[str] = "buchungs_datum" DATE_FEAT: Final[str] = "buchungs_datum"
SALES_FEAT: Final[str] = "betrag" SALES_FEAT: Final[str] = "betrag"
df_firma = data[(data["betrag"] > 0)]
data[DATE_FEAT] = pd.to_datetime(data[DATE_FEAT], errors="coerce") df_cust = df_firma.copy()
data = data.dropna(subset=["buchungs_datum"])
df_filter = data[(data["betrag"] > 0)]
df_cust = df_filter.copy()
df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index() df_cust = df_cust.sort_values(by=DATE_FEAT).reset_index()
len_ds = len(df_cust)
if len_ds < min_num_data_points: if len(df_cust) < min_num_data_points:
status = STATUS_HANDLER.pipe_states.TOO_FEW_POINTS pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_POINTS)
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
df_cust["jahr"] = df_cust[DATE_FEAT].dt.year df_cust["jahr"] = df_cust[DATE_FEAT].dt.year
df_cust["monat"] = df_cust[DATE_FEAT].dt.month df_cust["monat"] = df_cust[DATE_FEAT].dt.month
monthly_sum_data_only = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index() monthly_sum = df_cust.groupby(["jahr", "monat"])[SALES_FEAT].sum().reset_index()
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
years = range(df_cust["jahr"].min(), current_year + 1)
all_month_year_combinations = pd.DataFrame(
[
(year, month)
for year in years
for month in range(1, 13)
if (year < current_year or (year == current_year and month <= current_month))
],
columns=["jahr", "monat"],
)
monthly_sum = pd.merge(
all_month_year_combinations, monthly_sum_data_only, on=["jahr", "monat"], how="left"
)
monthly_sum[SALES_FEAT] = monthly_sum[SALES_FEAT].fillna(0)
monthly_sum[DATE_FEAT] = ( monthly_sum[DATE_FEAT] = (
monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str) monthly_sum["monat"].astype(str) + "." + monthly_sum["jahr"].astype(str)
) )
@ -238,17 +199,13 @@ def _process_sales(
features = ["jahr", "monat"] features = ["jahr", "monat"]
target = SALES_FEAT target = SALES_FEAT
current_year = datetime.datetime.now().year
last_date = pd.to_datetime(datetime.datetime.now().strftime("%m.%Y"), format="%m.%Y") first_year = cast(int, df_cust["jahr"].min())
future_dates = pd.date_range(
start=last_date + pd.DateOffset(months=1), periods=6, freq="MS"
)
forecast = pd.DataFrame({"datum": future_dates}).set_index("datum")
# Randomized Search # Randomized Search
kfold = KFold(n_splits=5, shuffle=True) kfold = KFold(n_splits=5, shuffle=True)
params: ParamSearchXGBRegressor = { params: ParamSearchXGBRegressor = {
"n_estimators": scipy.stats.poisson(mu=100), "n_estimators": scipy.stats.poisson(mu=1000),
"learning_rate": [0.03, 0.04, 0.05], "learning_rate": [0.03, 0.04, 0.05],
"max_depth": range(2, 9), "max_depth": range(2, 9),
"min_child_weight": range(1, 5), "min_child_weight": range(1, 5),
@ -258,119 +215,79 @@ def _process_sales(
"early_stopping_rounds": [20, 50], "early_stopping_rounds": [20, 50],
} }
best_estimator = None
best_params: BestParametersXGBRegressor | None = None best_params: BestParametersXGBRegressor | None = None
best_score_mae: float | None = float("inf") best_score_mae: float = float("inf")
best_score_r2: float | None = None best_score_r2: float = float("inf")
best_start_year: int | None = None best_start_year: int | None = None
too_few_month_points: bool = True too_few_month_points: bool = True
forecast: pd.DataFrame | None = None
dates = cast(pd.DatetimeIndex, monthly_sum.index) for start_year in range(current_year - 4, first_year - 1, -1):
# baseline: 3 years - 36 months
starting_date = datetime.datetime.now() - relativedelta(months=36)
target_index, _ = next(
((i, True) for i, date in enumerate(dates) if date >= starting_date),
(len(dates) - 1, False),
)
for add_year, date_idx in enumerate(range(target_index, -1, -12)):
first_date = dates[date_idx]
split_date = dates[-6]
train = cast( train = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum.loc[first_date:split_date].copy(), # type: ignore monthly_sum[monthly_sum.index.year >= start_year].iloc[:-5].copy(), # type: ignore
) )
test = cast( test = cast(
pd.DataFrame, pd.DataFrame,
monthly_sum.loc[split_date:].copy(), # type: ignore monthly_sum[monthly_sum.index.year >= start_year].iloc[-5:].copy(), # type: ignore
) )
X_train, X_test = train[features], test[features] X_train, X_test = train[features], test[features]
y_train, y_test = train[target], test[target] y_train, y_test = train[target], test[target]
# test set size fixed at 6 --> first iteration: baseline - 6 entries if len(train) >= (base_num_data_points_months + 10 * (current_year - 4 - start_year)):
# for each new year 10 new data points (i.e., sales strictly positive) needed
if len(train[train[SALES_FEAT] > 0]) >= (base_num_data_points_months + 10 * add_year):
too_few_month_points = False too_few_month_points = False
with joblib.parallel_config(backend="loky"): rand = RandomizedSearchCV(
rand = RandomizedSearchCV( XGBRegressor(),
XGBRegressor(), params,
params, scoring="neg_mean_absolute_error",
scoring="neg_mean_absolute_error", cv=kfold,
cv=kfold, n_jobs=-1,
n_jobs=MAX_NUM_WORKERS, n_iter=100,
n_iter=100, verbose=0,
verbose=0, )
) rand.fit(
rand.fit( X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=0
X_train, )
y_train,
eval_set=[(X_train, y_train), (X_test, y_test)],
verbose=0,
)
y_pred = rand.best_estimator_.predict(X_test) # type: ignore y_pred = rand.best_estimator_.predict(X_test) # type: ignore
if len(np.unique(y_pred)) != 1: if len(np.unique(y_pred)) != 1:
# pp(y_pred)
error = cast(float, mean_absolute_error(y_test, y_pred)) error = cast(float, mean_absolute_error(y_test, y_pred))
if error < best_score_mae: if error < best_score_mae:
best_params = cast(BestParametersXGBRegressor, rand.best_params_) best_params = cast(BestParametersXGBRegressor, rand.best_params_)
best_score_mae = error best_score_mae = error
best_score_r2 = cast(float, r2_score(y_test, y_pred)) best_score_r2 = cast(float, r2_score(y_test, y_pred))
# --- new: use first_date for best_start_year best_start_year = start_year
best_start_year = first_date.year print("executed")
# --- new: store best_estimator forecast = test.copy()
best_estimator = copy.copy(rand.best_estimator_) forecast.loc[:, "vorhersage"] = y_pred
if best_estimator is not None: # pp(best_params)
X_future = pd.DataFrame( # pp(best_score_mae)
{"jahr": future_dates.year, "monat": future_dates.month}, index=future_dates # pp(best_score_r2)
) # pp(best_start_year)
y_future = best_estimator.predict(X_future) # type: ignore if forecast is not None:
forecast["vorhersage"] = y_future forecast = forecast.drop(SALES_FEAT, axis=1).reset_index(drop=True)
forecast["jahr"] = forecast.index.year # type: ignore
forecast["monat"] = forecast.index.month # type: ignore
forecast = forecast.reset_index(drop=True)
best_score_mae = best_score_mae if not math.isinf(best_score_mae) else None # TODO log metrics
if too_few_month_points: if too_few_month_points:
status = STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS pipe.fail(STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS)
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
elif best_params is None: elif best_params is None:
status = STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST pipe.fail(STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST)
pipe.fail(status)
stats = SalesForecastStatistics(status.code, status.description, len_ds)
pipe.stats(stats)
return pipe return pipe
assert "vorhersage" in forecast.columns, ( assert forecast is not None, "forecast is None, but was attempted to be returned"
"forecast does not contain prognosis values, but was attempted to be returned" pipe.success(forecast, STATUS_HANDLER.SUCCESS)
)
status = STATUS_HANDLER.SUCCESS
pipe.success(forecast, status)
stats = SalesForecastStatistics(
status.code,
status.description,
len_ds,
score_mae=best_score_mae,
score_r2=best_score_r2,
best_start_year=best_start_year,
xgb_params=best_params,
)
pipe.stats(stats)
return pipe return pipe
def _postprocess_sales( def _postprocess_sales(
pipe: ForecastPipe, pipe: PipeResult[SalesPrognosisResultsExport],
feature_map: Mapping[str, str], feature_map: Mapping[str, str],
) -> ForecastPipe: ) -> PipeResult[SalesPrognosisResultsExport]:
data = pipe.data data = pipe.data
assert data is not None, "processing not existing pipe result" assert data is not None, "processing not existing pipe result"
# convert features back to original naming # convert features back to original naming
@@ -404,20 +321,19 @@ def _export_on_fail(
return SalesPrognosisResultsExport(response=response, status=status)

-def pipeline_sales_forecast(
+def pipeline_sales(
session: Session,
-company_ids: list[int] | None = None,
+company_id: int | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
-logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
-company_ids=company_ids,
+company_id=company_id,
start_date=start_date,
)
if status != STATUS_HANDLER.SUCCESS:
-logger_pipelines.error(
-"Error during sales forecast data retrieval, Status: %s",
+logger.error(
+"Error during sales prognosis data retrieval, Status: %s",
status,
stack_info=True,
)
@ -429,8 +345,8 @@ def pipeline_sales_forecast(
target_features=FEATURES_SALES_PROGNOSIS, target_features=FEATURES_SALES_PROGNOSIS,
) )
if pipe.status != STATUS_HANDLER.SUCCESS: if pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipelines.error( logger.error(
"Error during sales forecast preprocessing, Status: %s", "Error during sales prognosis preprocessing, Status: %s",
pipe.status, pipe.status,
stack_info=True, stack_info=True,
) )
@ -439,18 +355,11 @@ def pipeline_sales_forecast(
pipe = _process_sales( pipe = _process_sales(
pipe, pipe,
min_num_data_points=SALES_MIN_NUM_DATAPOINTS, min_num_data_points=SALES_MIN_NUM_DATAPOINTS,
base_num_data_points_months=session.cfg.forecast.threshold_month_data_points, base_num_data_points_months=SALES_BASE_NUM_DATAPOINTS_MONTHS,
) )
if pipe.statistics is not None:
res = _write_sales_forecast_stats_wrapped(pipe.statistics)
if res.status != STATUS_HANDLER.SUCCESS:
logger_db.error(
"[DB] Error during write process of sales forecast statistics: %s",
res.status,
)
if pipe.status != STATUS_HANDLER.SUCCESS: if pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipelines.error( logger.error(
"Error during sales forecast main processing, Status: %s", "Error during sales prognosis main processing, Status: %s",
pipe.status, pipe.status,
stack_info=True, stack_info=True,
) )
@ -461,8 +370,8 @@ def pipeline_sales_forecast(
feature_map=DualDict(), feature_map=DualDict(),
) )
if pipe.status != STATUS_HANDLER.SUCCESS: if pipe.status != STATUS_HANDLER.SUCCESS:
logger_pipelines.error( logger.error(
"Error during sales forecast postprocessing, Status: %s", "Error during sales prognosis postprocessing, Status: %s",
pipe.status, pipe.status,
stack_info=True, stack_info=True,
) )
@ -470,8 +379,6 @@ def pipeline_sales_forecast(
assert pipe.results is not None, "needed export response not set in pipeline" assert pipe.results is not None, "needed export response not set in pipeline"
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
return pipe.results return pipe.results
@ -481,23 +388,18 @@ def pipeline_sales_dummy(
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport: ) -> SalesPrognosisResultsExport:
"""prototype dummy function for tests by DelBar""" """prototype dummy function for tests by DelBar"""
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
_, _, _ = session, company_id, start_date _, _, _ = session, company_id, start_date
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl" data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
assert data_pth.exists(), "sales forecast dummy data not existent" assert data_pth.exists(), "sales forecast dummy data not existent"
data = pd.read_pickle(data_pth) data = pd.read_pickle(data_pth)
pipe: ForecastPipe = PipeResult(None, STATUS_HANDLER.SUCCESS) pipe: PipeResult[SalesPrognosisResultsExport] = PipeResult(None, STATUS_HANDLER.SUCCESS)
res = _parse_df_to_results_wrapped(data) res = _parse_df_to_results_wrapped(data)
if res.status != STATUS_HANDLER.SUCCESS: if res.status != STATUS_HANDLER.SUCCESS:
pipe.fail(res.status) pipe.fail(res.status)
return _export_on_fail(res.status) return _export_on_fail(res.status)
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
return SalesPrognosisResultsExport( return SalesPrognosisResultsExport(
response=res.unwrap(), response=res.unwrap(),
status=res.status, status=res.status,


@@ -1,31 +1,236 @@
from __future__ import annotations

-from typing import Final
+from pathlib import Path
+from typing import TYPE_CHECKING, Final

import requests
from dopt_basics.io import combine_route
from pydantic import BaseModel
from requests import Response

+import delta_barth.logging
from delta_barth.errors import (
+STATUS_HANDLER,
UnspecifiedRequestType,
)
+from delta_barth.logging import logger_session as logger
from delta_barth.types import (
ApiCredentials,
+DelBarApiError,
HttpRequestTypes,
)

+if TYPE_CHECKING:
+from delta_barth.types import HttpContentHeaders, Status

-# ** login
-class LoginRequest(BaseModel):
-userName: str
-password: str
-databaseName: str
-mandantName: str

-class LoginResponse(BaseModel):
-token: str
+class Session:
+def __init__(
self,
base_headers: HttpContentHeaders,
logging_folder: str = "logs",
) -> None:
self._data_path: Path | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
def setup(self) -> None:
self.setup_logging()
@property
def data_path(self) -> Path:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def logging_dir(self) -> Path:
if self._logging_dir is not None:
return self._logging_dir
logging_dir = self.data_path / self._logging_folder
if not logging_dir.exists():
logging_dir.mkdir(parents=False)
self._logging_dir = logging_dir
return self._logging_dir
def setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging")
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
return self._creds
def set_data_path(
self,
path: str,
):
self._data_path = validate_path(path)
def set_credentials(
self,
username: str,
password: str,
database: str,
mandant: str,
) -> None:
if self.logged_in:
self.logout()
self._creds = validate_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
@property
def base_url(self) -> str:
assert self._base_url is not None, "accessed base URL not set"
return self._base_url
def set_base_url(
self,
base_url: str,
) -> None:
if self.logged_in:
self.logout()
self._base_url = base_url
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def _add_session_token(
self,
token: str,
) -> None:
assert self.session_token is None, "tried overwriting existing API session token"
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def _remove_session_token(self) -> None:
assert self.session_token is not None, (
"tried to delete non-existing API session token"
)
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False
def login(
self,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(self.base_url, ROUTE)
login_req = LoginRequest(
userName=self.creds.username,
password=self.creds.password,
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def logout(
self,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put(
URL,
headers=self.headers, # type: ignore
)
response = None
status: Status
if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS
self._remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def assert_login(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(token=self.session_token)
status = STATUS_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def validate_path(
str_path: str,
) -> Path:
path = Path(str_path).resolve()
if not path.exists():
raise FileNotFoundError(f"Provided path >{path}< seems not to exist.")
elif not path.is_dir():
raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.")
return path
def validate_credentials( def validate_credentials(
@ -60,3 +265,15 @@ def ping(
raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint") raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint")
return resp return resp
# ** login
class LoginRequest(BaseModel):
userName: str
password: str
databaseName: str
mandantName: str
class LoginResponse(BaseModel):
token: str


@@ -7,20 +7,17 @@ import requests
from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation

-from delta_barth.constants import API_CON_TIMEOUT, MAX_LOGIN_RETRIES
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status

if TYPE_CHECKING:
-from requests import Response
-from delta_barth.session import Session
+from delta_barth.api.common import Session

# ** sales data
# ** import
class SalesPrognosisRequestP(BaseModel):
-FirmaIds: SkipValidation[list[int] | None]
+FirmaId: SkipValidation[int | None]
BuchungsDatum: SkipValidation[Datetime | None]
@@ -55,50 +52,34 @@ class SalesPrognosisResultsExport(ExportResponse):
def get_sales_prognosis_data(
session: Session,
-company_ids: list[int] | None = None,
+company_id: int | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
+resp, status = session.assert_login()
+if status != STATUS_HANDLER.SUCCESS:
+response = SalesPrognosisResponse(daten=tuple())
+return response, status
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
-FirmaIds=company_ids,
+FirmaId=company_id,
BuchungsDatum=start_date,
)
-empty_response = SalesPrognosisResponse(daten=tuple())
-if not session.logged_in:
-_, status = session.login()
-if status != STATUS_HANDLER.SUCCESS:
-return empty_response, status
-resp: Response | None = None
-try:
-for attempt in range(1, (MAX_LOGIN_RETRIES + 1)):
-resp = requests.get(
-URL,
-params=sales_prog_req.model_dump(mode="json", exclude_none=True),
-headers=session.headers, # type: ignore[argumentType]
-timeout=API_CON_TIMEOUT,
-)
-if resp.status_code == 401:
-_, status = session.relogin()
-if status != STATUS_HANDLER.SUCCESS and attempt == MAX_LOGIN_RETRIES:
-return empty_response, status
-continue
-break
-except requests.exceptions.Timeout:
-return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
-except requests.exceptions.RequestException:
-return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
+resp = requests.get(
+URL,
+params=sales_prog_req.model_dump(mode="json", exclude_none=True),
+headers=session.headers, # type: ignore[argumentType]
+)
response: SalesPrognosisResponse
status: Status
-assert resp is not None, "tried to use not defined response"
if resp.status_code == 200:
response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS
else:
-response = empty_response
+response = SalesPrognosisResponse(daten=tuple())
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)


@ -1,43 +0,0 @@
from __future__ import annotations
from pathlib import Path
import dopt_basics.configs
from pydantic import BaseModel
class Config(BaseModel):
forecast: CfgForecast
class CfgForecast(BaseModel):
threshold_month_data_points: int
class LazyCfgLoader:
def __init__(
self,
cfg_path: Path,
) -> None:
cfg_path = cfg_path.resolve()
assert cfg_path.exists(), f"config path {cfg_path} seems not to exist"
assert cfg_path.is_file(), f"config path {cfg_path} seems not to be a file"
self._path = cfg_path
self._cfg: Config | None = None
@property
def path(self) -> Path:
return self._path
def _load(self) -> Config:
cfg = dopt_basics.configs.load_toml(self.path)
return Config(**cfg)
def reload(self) -> None:
self._cfg = self._load()
def get(self) -> Config:
if self._cfg is None:
self._cfg = self._load()
return self._cfg


@@ -1,19 +1,10 @@
-from __future__ import annotations
import enum
from pathlib import Path
from typing import Final

-import psutil

-import delta_barth._env
from delta_barth.types import DualDict, HttpContentHeaders

# ** config
-CFG_FILENAME: Final[str] = "dopt-cfg.toml"
-CFG_HOT_RELOAD: Final[bool] = True
-cpu_count = psutil.cpu_count(logical=False)
-MAX_NUM_WORKERS: Final[int] = (cpu_count - 1) if cpu_count is not None else 3

# ** lib path
lib_path = Path(__file__).parent
@ -22,20 +13,16 @@ LIB_PATH: Final[Path] = lib_path
dummy_data_pth = LIB_PATH / "_dummy_data" dummy_data_pth = LIB_PATH / "_dummy_data"
assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}" assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth DUMMY_DATA_PATH: Final[Path] = dummy_data_pth
# ** runtime and deployment status
RUNTIME_PATH: Final[Path | None] = delta_barth._env.prepare_env(LIB_PATH)
deployment_status: bool = False
if RUNTIME_PATH is not None:
deployment_status = True
DEPLOYMENT_STATUS: Final[bool] = deployment_status
# ** logging
ENABLE_LOGGING: Final[bool] = False
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = True
LOG_FILENAME: Final[str] = "dopt-delbar.log"
# ** databases
DB_ECHO: Final[bool] = False
# ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400
@ -49,10 +36,6 @@ class KnownDelBarApiErrorCodes(enum.Enum):
COMMON = frozenset((400, 401, 409, 500))
# ** API
API_CON_TIMEOUT: Final[float] = 20.0 # secs to response
MAX_LOGIN_RETRIES: Final[int] = 2
# ** API response parsing
# ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
@ -75,6 +58,4 @@ FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
# ** Pipelines
# ** Forecast
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
# !! now in config
# TODO remove later till proven stable
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36
SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36

View File

@ -1,60 +0,0 @@
from pathlib import Path
import sqlalchemy as sql
# ** meta
metadata = sql.MetaData()
def get_engine(
db_path: Path,
echo: bool = False,
) -> sql.Engine:
path = db_path.resolve()
connection_str: str = f"sqlite:///{str(path)}"
engine = sql.create_engine(connection_str, echo=echo)
return engine
# ** table declarations
# ** ---- common
perf_meas = sql.Table(
"performance_measurement",
metadata,
sql.Column("id", sql.Integer, primary_key=True),
sql.Column("pipeline_name", sql.String(length=30)),
sql.Column("execution_duration", sql.Float),
)
# ** ---- forecasts
sf_stats = sql.Table(
"sales_forecast_statistics",
metadata,
sql.Column("id", sql.Integer, primary_key=True),
sql.Column("status_code", sql.Integer),
sql.Column("status_dscr", sql.String(length=200)),
sql.Column("length_dataset", sql.Integer),
sql.Column("score_mae", sql.Float, nullable=True),
sql.Column("score_r2", sql.Float, nullable=True),
sql.Column("best_start_year", sql.Integer, nullable=True),
)
sf_XGB = sql.Table(
"sales_forecast_XGB_parameters",
metadata,
sql.Column("id", sql.Integer, primary_key=True),
sql.Column(
"forecast_id",
sql.Integer,
sql.ForeignKey(
"sales_forecast_statistics.id", onupdate="CASCADE", ondelete="CASCADE"
),
unique=True,
),
sql.Column("n_estimators", sql.Integer),
sql.Column("learning_rate", sql.Float),
sql.Column("max_depth", sql.Integer),
sql.Column("min_child_weight", sql.Integer),
sql.Column("gamma", sql.Float),
sql.Column("subsample", sql.Float),
sql.Column("colsample_bytree", sql.Float),
sql.Column("early_stopping_rounds", sql.Integer),
)
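
The removed module above declares the SQLite schema with SQLAlchemy Core. A short sketch of how the tables would be exercised; the file name is a placeholder and the module is assumed to be importable as delta_barth.databases:

from pathlib import Path

import sqlalchemy as sql

from delta_barth import databases as db  # assumed import path

engine = db.get_engine(Path("dopt-data.db"), echo=False)
db.metadata.create_all(engine)  # creates performance_measurement and the forecast tables

with engine.begin() as con:
    con.execute(
        sql.insert(db.perf_meas).values(
            pipeline_name="sales_forecast", execution_duration=0.42
        )
    )
    rows = con.execute(sql.select(db.perf_meas)).mappings().all()
print(rows)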

View File

@ -1,2 +0,0 @@
[forecast]
threshold_month_data_points = 28

View File

@ -6,7 +6,7 @@ from functools import wraps
from typing import Any, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.logging import logger_status, logger_wrapped_results
from delta_barth.logging import logger_wrapped_results as logger
from delta_barth.types import DataPipeStates, Status
if t.TYPE_CHECKING:
@ -53,19 +53,9 @@ class UApiError(Exception):
## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"),
(
"CONNECTION_TIMEOUT",
1,
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
),
(
"CONNECTION_ERROR",
2,
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
),
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"),
)
@ -161,32 +151,23 @@ class StatusHandler:
state: Status,
) -> None:
if state == self.SUCCESS:
logger_status.info(
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
)
return
code = state.code
descr = state.description
msg = state.message
exc: Exception
if code < DEFAULT_INTERNAL_ERR_CODE:
exc = _construct_exception(UDataProcessingError, descr, msg)
raise _construct_exception(UDataProcessingError, descr, msg)
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
exc = _construct_exception(UInternalError, descr, msg)
raise _construct_exception(UInternalError, descr, msg)
else:
api_err = state.api_server_error
assert api_err is not None, (
"error code inidcated API error, but no error instance found"
)
add_info = api_err.model_dump(exclude_none=True)
exc = _construct_exception(UApiError, descr, msg, add_info)
raise _construct_exception(UApiError, descr, msg, add_info)
logger_status.error(
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
)
raise exc
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
@ -248,24 +229,24 @@ def wrap_result(
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
wrapped_result: ResultWrapper[T]
status: ResultWrapper[T]
try:
res = func(*args, **kwargs)
wrapped_result = ResultWrapper(
status = ResultWrapper(
result=res, exception=None, code_on_error=code_on_error
)
except Exception as err:
wrapped_result = ResultWrapper(
status = ResultWrapper(
result=NotSet(), exception=err, code_on_error=code_on_error
)
logger_wrapped_results.info(
logger.error(
"[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
"An exception in routine %s occurred - msg: %s, stack trace:",
func.__name__,
str(err),
stack_info=True,
)
return wrapped_result
return status
return wrapper
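
For context, wrap_result is a decorator factory that converts exceptions into a ResultWrapper carrying a Status. A small sketch of wrapping a failing routine (error code and function name are made-up examples):

from delta_barth.errors import STATUS_HANDLER, wrap_result  # assumed import path

@wrap_result(code_on_error=150)
def parse_amount(raw: str) -> float:
    # raises ValueError for non-numeric input, which the wrapper catches
    return float(raw)

res = parse_amount("not-a-number")
if res.status != STATUS_HANDLER.SUCCESS:
    print(res.status)    # exception mapped to an internal error status
else:
    print(res.unwrap())  # only safe to unwrap on success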

View File

@ -6,54 +6,53 @@ from pathlib import Path
from time import gmtime
from typing import Final
# ** config
# ** logging
ENABLE_LOGGING: Final[bool] = True
LOGGING_TO_FILE: Final[bool] = True
LOGGING_TO_STDERR: Final[bool] = False
LOG_FILENAME: Final[str] = "dopt-delbar.log"
from delta_barth.constants import (
ENABLE_LOGGING,
LOG_FILENAME,
LOGGING_TO_FILE,
LOGGING_TO_STDERR,
)
# ** config
logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
# if not LOG_FILE_FOLDER.exists():
# LOG_FILE_FOLDER.mkdir(parents=True)
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
# ** handlers
NULL_HANDLER = logging.NullHandler()
# ** formatters
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** loggers and configuration
logger_base = logging.getLogger("delta_barth")
logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_all = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_stderr)
# logger_all.addHandler(logger_all_handler_file)
logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG)
logger_config = logging.getLogger("delta_barth.config")
logger_config.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
logger_wrapped_results.setLevel(logging.DEBUG)
logger_pipelines = logging.getLogger("delta_barth.pipelines")
logger_pipelines = logging.getLogger("delta_barth.logger_pipelines")
logger_pipelines.setLevel(logging.DEBUG)
logger_db = logging.getLogger("delta_barth.databases")
logger_db.setLevel(logging.DEBUG)
def setup_logging(
logging_dir: Path,
) -> None:
# ** formatters
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** handlers
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
null_handler = logging.NullHandler()
if ENABLE_LOGGING and LOGGING_TO_STDERR:
logger_all_handler_stderr = logging.StreamHandler()
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
logger_all_handler_stderr.setFormatter(logger_all_formater)
else: # pragma: no cover
logger_all_handler_stderr = NULL_HANDLER
logger_all_handler_stderr = null_handler
if ENABLE_LOGGING and LOGGING_TO_FILE:
logger_all_handler_file = logging.handlers.RotatingFileHandler(
@ -64,17 +63,9 @@ def setup_logging(
delay=True,
)
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
logger_all_handler_file.setFormatter(logger_all_formater)
else: # pragma: no cover
logger_all_handler_file = NULL_HANDLER
logger_all_handler_file = null_handler
logger_base.addHandler(logger_all_handler_stderr)
logger_base.addHandler(logger_all_handler_file)
logger_all.addHandler(logger_all_handler_stderr)
logger_all.addHandler(logger_all_handler_file)
def disable_logging() -> None:
handlers = tuple(logger_base.handlers)
for handler in handlers:
logger_base.removeHandler(handler)
logger_base.addHandler(NULL_HANDLER)
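
The module keeps all handler wiring inside setup_logging and tears it down again in disable_logging (main branch). A minimal driving sketch; the temporary directory is chosen only for illustration:

import logging
import tempfile
from pathlib import Path

import delta_barth.logging as dlog  # assumed import alias

log_dir = Path(tempfile.mkdtemp())
dlog.setup_logging(log_dir)  # attaches stderr/file handlers to the "delta_barth" logger

logging.getLogger("delta_barth.session").info("session configured")

dlog.disable_logging()  # replaces all handlers with a NullHandler again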

View File

@ -5,22 +5,16 @@ from __future__ import annotations
from typing import Final
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.logging import logger_session as logger
from delta_barth.session import Session
from delta_barth.api.common import Session
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup(
data_path: str,
base_url: str,
) -> None: # pragma: no cover
# at this point: no logging configured
SESSION.set_data_path(data_path)
SESSION.set_base_url(base_url=base_url)
SESSION.setup()
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
def set_data_path(
path: str,
) -> None: # pragma: no cover
SESSION.set_data_path(path)
def set_credentials(
@ -29,33 +23,25 @@ def set_credentials(
database: str,
mandant: str,
) -> None: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
SESSION.set_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
# ** not part of external API, only internal
def get_credentials() -> str: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
creds = SESSION.creds
logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")
return creds.model_dump_json()
# ** legacy: not part of external API
def set_base_url(
base_url: str,
) -> None: # pragma: no cover
SESSION.set_base_url(base_url=base_url)
def get_data_path() -> str: # pragma: no cover
return str(SESSION.data_path)
def get_base_url() -> str: # pragma: no cover
return SESSION.base_url

View File

@ -1,83 +1,18 @@
"""collection of configured data pipelines, intended to be invoked from C#""" """collection of configured data pipelines, intended to be invoked from C#"""
import time
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import Final
import sqlalchemy as sql
from delta_barth import databases as db
from delta_barth.analysis import forecast from delta_barth.analysis import forecast
from delta_barth.constants import DEFAULT_DB_ERR_CODE
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse, PipelineMetrics from delta_barth.types import JsonExportResponse
def _write_performance_metrics(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
if time_end < time_start:
raise ValueError("Ending time smaller than starting time")
execution_duration = (time_end - time_start) / 1e9
metrics = PipelineMetrics(
pipeline_name=pipeline_name,
execution_duration=execution_duration,
)
with SESSION.db_engine.begin() as con:
con.execute(sql.insert(db.perf_meas).values(**metrics))
return metrics
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
return _write_performance_metrics(pipeline_name, time_start, time_end)
def pipeline_sales_forecast(
company_ids: list[int] | None,
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast(
SESSION, company_ids=company_ids, start_date=start_date
)
result = forecast.pipeline_sales(SESSION, company_id=company_id, start_date=start_date)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export
@ -86,38 +21,11 @@ def pipeline_sales_forecast_dummy(
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_dummy(
SESSION,
company_id=company_id,
start_date=start_date,
)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export
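
For orientation, a sketch of how the main-branch entry points above would be driven from the embedding side; paths, URL and company IDs are placeholder values:

import json
from datetime import datetime

from delta_barth import management, pipelines  # assumed import paths

management.setup(data_path="C:/dopt/data", base_url="https://api.example.invalid")
management.set_credentials(
    username="user", password="secret", database="db", mandant="m1"
)

raw = pipelines.pipeline_sales_forecast(
    company_ids=[1024], start_date=datetime(2023, 8, 15)
)
payload = json.loads(raw)  # JsonExportResponse is a plain JSON string
print(payload.keys())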

View File

@ -1,302 +0,0 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Final
import requests
import sqlalchemy as sql
from dopt_basics.io import combine_route
import delta_barth.logging
from delta_barth import databases as db
from delta_barth.api.common import (
LoginRequest,
LoginResponse,
validate_credentials,
)
from delta_barth.config import LazyCfgLoader
from delta_barth.constants import (
API_CON_TIMEOUT,
CFG_FILENAME,
CFG_HOT_RELOAD,
DB_ECHO,
LIB_PATH,
)
from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING:
from delta_barth.config import Config
from delta_barth.types import ApiCredentials, HttpContentHeaders
def validate_path(
str_path: str,
) -> Path:
path = Path(str_path).resolve()
if not path.exists():
raise FileNotFoundError(f"Provided path >{path}< seems not to exist.")
elif not path.is_dir():
raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.")
return path
class Session:
def __init__(
self,
base_headers: HttpContentHeaders,
db_folder: str = "data",
logging_folder: str = "logs",
cfg_folder: str = "config",
) -> None:
self._setup: bool = False
self._data_path: Path | None = None
self._db_path: Path | None = None
self._db_folder = db_folder
self._db_engine: sql.Engine | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._cfg_path: Path | None = None
self._cfg_folder = cfg_folder
self._cfg_loader: LazyCfgLoader | None = None
self._cfg: Config | None = None
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
def setup(self) -> None:
# at this point: no logging configured
assert not self._setup, "tried to setup session twice"
self._setup_logging()
self._setup_config()
self._setup_db_management()
self._setup = True
logger.info("[SESSION] Setup procedure successful")
@property
def data_path(self) -> Path:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def cfg_path(self) -> Path:
if self._cfg_path is not None and self._setup:
return self._cfg_path
root = (self.data_path / self._cfg_folder).resolve()
cfg_path = root / CFG_FILENAME
if not root.exists():
root.mkdir(parents=False)
self._cfg_path = cfg_path
return self._cfg_path
@property
def cfg(self) -> Config:
assert self._cfg is not None, "tried to access not set config from session"
if CFG_HOT_RELOAD:
self.reload_cfg()
return self._cfg
def _setup_config(self) -> None:
if not self.cfg_path.exists():
src_cfg = LIB_PATH / CFG_FILENAME
shutil.copyfile(src_cfg, self.cfg_path)
self._cfg_loader = LazyCfgLoader(self.cfg_path)
self._cfg = self._cfg_loader.get()
logger.info("[SESSION] Successfully read and setup config")
def reload_cfg(self) -> None:
assert self._cfg_loader is not None, "tried reloading with no CFG loader intialised"
self._cfg_loader.reload()
self._cfg = self._cfg_loader.get()
@property
def db_engine(self) -> sql.Engine:
assert self._db_engine is not None, "accessed database engine not set"
return self._db_engine
@property
def db_path(self) -> Path:
if self._db_path is not None and self._setup:
return self._db_path
root = (self.data_path / self._db_folder).resolve()
db_path = root / "dopt-data.db"
if not root.exists():
root.mkdir(parents=False)
self._db_path = db_path
return self._db_path
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
@property
def logging_dir(self) -> Path:
if self._logging_dir is not None and self._setup:
return self._logging_dir
logging_dir = self.data_path / self._logging_folder
if not logging_dir.exists():
logging_dir.mkdir(parents=False)
self._logging_dir = logging_dir
return self._logging_dir
def _setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging")
def disable_logging(self) -> None:
delta_barth.logging.disable_logging()
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
return self._creds
def set_data_path(
self,
path: str,
):
self._data_path = validate_path(path)
self._setup = False
def set_credentials(
self,
username: str,
password: str,
database: str,
mandant: str,
) -> None:
if self.logged_in:
self.logout()
self._creds = validate_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
@property
def base_url(self) -> str:
assert self._base_url is not None, "accessed base URL not set"
return self._base_url
def set_base_url(
self,
base_url: str,
) -> None:
if self.logged_in:
self.logout()
self._base_url = base_url
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def _add_session_token(
self,
token: str,
) -> None:
assert self.session_token is None, "tried overwriting existing API session token"
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def _remove_session_token(self) -> None:
assert self.session_token is not None, (
"tried to delete non-existing API session token"
)
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False
def login(
self,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(self.base_url, ROUTE)
login_req = LoginRequest(
userName=self.creds.username,
password=self.creds.password,
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
empty_response = LoginResponse(token="")
try:
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def logout(
self,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
try:
resp = requests.put(
URL,
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
status: Status
if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS
self._remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return None, status
def relogin(
self,
) -> tuple[LoginResponse, Status]:
if self.session_token is None:
return self.login()
self._remove_session_token()
return self.login()
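
A condensed sketch of the Session lifecycle implemented above; URL, directory and credentials are placeholders:

from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.session import Session  # assumed import path

session = Session(HTTP_BASE_CONTENT_HEADERS)
session.set_data_path("C:/dopt/data")  # must be an existing directory
session.set_base_url("https://api.example.invalid")
session.set_credentials(username="user", password="secret", database="db", mandant="m1")
session.setup()  # wires up logging, config and the SQLite engine

resp, status = session.login()  # stores the DelecoToken header on success
if status.code == 0:
    print("token:", session.session_token)
    session.logout()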

View File

@ -1,7 +1,6 @@
from __future__ import annotations
import enum
import pprint
import typing as t
from collections.abc import Sequence
from dataclasses import dataclass, field
@ -15,7 +14,6 @@ __all__ = ["DualDict"]
# ** Pipeline state management
StatusDescription: t.TypeAlias = tuple[str, int, str]
R = t.TypeVar("R", bound="ExportResponse")
S = t.TypeVar("S", bound="Statistics")
class IError(t.Protocol):
@ -30,10 +28,6 @@ class Status(BaseModel):
message: SkipValidation[str] = ""
api_server_error: SkipValidation[DelBarApiError | None] = None
def __str__(self) -> str:
py_repr = self.model_dump()
return pprint.pformat(py_repr, indent=4, sort_dicts=False)
class ResponseType(BaseModel):
pass
@ -47,19 +41,16 @@ class ExportResponse(BaseModel):
@dataclass(slots=True)
class DataPipeStates:
SUCCESS: Status
CONNECTION_TIMEOUT: Status
CONNECTION_ERROR: Status
TOO_FEW_POINTS: Status
TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status
@dataclass(slots=True)
class PipeResult(t.Generic[R, S]):
class PipeResult(t.Generic[R]):
data: pd.DataFrame | None
status: Status
results: R | None = None
statistics: S | None = None
def success(
self,
@ -86,12 +77,6 @@ class PipeResult(t.Generic[R, S]):
self.status = response.status
self.results = response
def stats(
self,
statistics: S,
) -> None:
self.statistics = statistics
JsonExportResponse = t.NewType("JsonExportResponse", str)
JsonResponse = t.NewType("JsonResponse", str)
@ -136,18 +121,7 @@ HttpContentHeaders = t.TypedDict(
)
# ** statistics
# ** forecasts
class Statistics:
pass
# ** ---- performance
class PipelineMetrics(t.TypedDict):
pipeline_name: str
execution_duration: float
# ** ---- forecasts
@dataclass(slots=True)
class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list)
@ -166,19 +140,7 @@ class ParamSearchXGBRegressor(t.TypedDict):
early_stopping_rounds: Sequence[int]
@dataclass(slots=True, eq=False)
class SalesForecastStatistics(Statistics):
status_code: int
status_dscr: str
length_dataset: int
score_mae: float | None = None
score_r2: float | None = None
best_start_year: int | None = None
xgb_params: BestParametersXGBRegressor | None = None
class BestParametersXGBRegressor(t.TypedDict):
forecast_id: t.NotRequired[int]
n_estimators: int
learning_rate: float
max_depth: int

View File

@ -1,2 +0,0 @@
[forecast]
threshold_month_data_points = 28

View File

@ -1,23 +1,17 @@
import datetime
import importlib
from datetime import datetime as Datetime
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
import sqlalchemy as sql
from pydantic import ValidationError
from delta_barth import databases as db
import delta_barth.analysis.forecast
from delta_barth.analysis import forecast as fc
from delta_barth.api.requests import SalesPrognosisResponse, SalesPrognosisResponseEntry
from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import (
BestParametersXGBRegressor,
DualDict,
PipeResult,
SalesForecastStatistics,
)
from delta_barth.types import DualDict, PipeResult
@pytest.fixture(scope="function")
@ -131,96 +125,6 @@ def test_parse_df_to_results_InvalidData(invalid_results):
_ = fc._parse_df_to_results(invalid_results)
def test_write_sales_forecast_stats_small(session):
eng = session.db_engine
code = 0
descr = "Test case to write stats"
length = 32
stats = SalesForecastStatistics(code, descr, length)
# execute
with patch("delta_barth.analysis.forecast.SESSION", session):
fc._write_sales_forecast_stats(stats)
# read
with eng.begin() as conn:
res = conn.execute(sql.select(db.sf_stats))
inserted = tuple(res.mappings())[0]
data = dict(**inserted)
del data["id"]
result = SalesForecastStatistics(**data)
assert result.status_code == code
assert result.status_dscr == descr
assert result.length_dataset == length
assert result.score_mae is None
assert result.score_r2 is None
assert result.best_start_year is None
assert result.xgb_params is None
def test_write_sales_forecast_stats_large(session):
eng = session.db_engine
code = 0
descr = "Test case to write stats"
length = 32
score_mae = 3.54
score_r2 = 0.56
best_start_year = 2020
xgb_params = BestParametersXGBRegressor(
n_estimators=2,
learning_rate=0.3,
max_depth=2,
min_child_weight=5,
gamma=0.5,
subsample=0.8,
colsample_bytree=5.25,
early_stopping_rounds=5,
)
stats = SalesForecastStatistics(
code,
descr,
length,
score_mae,
score_r2,
best_start_year,
xgb_params,
)
# execute
with patch("delta_barth.analysis.forecast.SESSION", session):
fc._write_sales_forecast_stats(stats)
# read
with eng.begin() as conn:
res_stats = conn.execute(sql.select(db.sf_stats))
res_xgb = conn.execute(sql.select(db.sf_XGB))
# reconstruct best XGB parameters
inserted_xgb = tuple(res_xgb.mappings())[0]
data_xgb = dict(**inserted_xgb)
del data_xgb["id"]
xgb_stats = BestParametersXGBRegressor(**data_xgb)
# reconstruct other statistics
inserted = tuple(res_stats.mappings())[0]
data_inserted = dict(**inserted)
stats_id_fk = data_inserted["id"] # foreign key in XGB parameters
del data_inserted["id"]
stats = SalesForecastStatistics(**data_inserted, xgb_params=xgb_stats)
assert stats.status_code == code
assert stats.status_dscr == descr
assert stats.length_dataset == length
assert stats.score_mae == pytest.approx(score_mae)
assert stats.score_r2 == pytest.approx(score_r2)
assert stats.best_start_year == best_start_year
assert stats.xgb_params is not None
# compare xgb_stats
assert stats.xgb_params["forecast_id"] == stats_id_fk # type: ignore
assert stats.xgb_params["n_estimators"] == 2
assert stats.xgb_params["learning_rate"] == pytest.approx(0.3)
assert stats.xgb_params["max_depth"] == 2
assert stats.xgb_params["min_child_weight"] == 5
assert stats.xgb_params["gamma"] == pytest.approx(0.5)
assert stats.xgb_params["subsample"] == pytest.approx(0.8)
assert stats.xgb_params["colsample_bytree"] == pytest.approx(5.25)
assert stats.xgb_params["early_stopping_rounds"] == 5
def test_preprocess_sales_Success(
exmpl_api_sales_prognosis_resp,
feature_map,
@ -256,7 +160,6 @@ def test_preprocess_sales_FailOnTargetFeature(
assert pipe.results is None
@pytest.mark.forecast
def test_process_sales_Success(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -269,39 +172,8 @@ def test_process_sales_Success(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.SUCCESS
assert pipe.data is not None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.SUCCESS.code
assert pipe.statistics.status_dscr == STATUS_HANDLER.SUCCESS.description
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is not None
assert pipe.statistics.score_r2 is not None
assert pipe.statistics.best_start_year is not None
assert pipe.statistics.xgb_params is not None
@pytest.mark.forecast
def test_process_sales_InvalidDates(sales_data_real_preproc):
false_date = Datetime(2519, 6, 30)
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
data["buchungs_datum"] = data["buchungs_datum"].astype(object)
data.at[0, "buchungs_datum"] = false_date
assert data["buchungs_datum"].dtype.char == "O"
assert len(data) == 20
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
pipe = fc._process_sales(
pipe,
min_num_data_points=36,
base_num_data_points_months=36,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
@pytest.mark.forecast
def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
data = data.iloc[:20, :]
@ -316,19 +188,8 @@ def test_process_sales_FailTooFewPoints(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.code
assert (
pipe.statistics.status_dscr == STATUS_HANDLER.pipe_states.TOO_FEW_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
data = sales_data_real_preproc.copy()
pipe = PipeResult(data, STATUS_HANDLER.SUCCESS)
@ -342,32 +203,10 @@ def test_process_sales_FailTooFewMonthPoints(sales_data_real_preproc):
assert pipe.status == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.TOO_FEW_MONTH_POINTS.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
@pytest.mark.forecast
def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
# prepare fake data
df = sales_data_real_preproc.copy()
f_dates = "buchungs_datum"
end = datetime.datetime.now()
start = df[f_dates].max()
fake_dates = pd.date_range(start, end, freq="MS")
fake_data = [(1234, 1014, 1024, 1000, 10, date) for date in fake_dates]
fake_df = pd.DataFrame(fake_data, columns=df.columns)
enhanced_df = pd.concat((df, fake_df), ignore_index=True)
data = enhanced_df.copy()
data = sales_data_real_preproc.copy()
data["betrag"] = 10000
print(data["betrag"])
data = data.iloc[:20000, :]
@ -377,7 +216,7 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
def __init__(self, *args, **kwargs) -> None:
class Predictor:
def predict(self, *args, **kwargs):
return np.array([1, 1, 1, 1], dtype=np.float64)
return np.array([1, 1, 1, 1])
self.best_estimator_ = Predictor()
@ -391,24 +230,13 @@ def test_process_sales_FailNoReliableForecast(sales_data_real_preproc):
pipe = fc._process_sales(
pipe,
min_num_data_points=1,
base_num_data_points_months=1,
base_num_data_points_months=-100,
)
assert pipe.status != STATUS_HANDLER.SUCCESS
assert pipe.status == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST
assert pipe.data is None
assert pipe.results is None
assert pipe.statistics is not None
assert pipe.statistics.status_code == STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.code
assert (
pipe.statistics.status_dscr
== STATUS_HANDLER.pipe_states.NO_RELIABLE_FORECAST.description
)
assert pipe.statistics.length_dataset is not None
assert pipe.statistics.score_mae is None
assert pipe.statistics.score_r2 is None
assert pipe.statistics.best_start_year is None
assert pipe.statistics.xgb_params is None
def test_postprocess_sales_Success(
@ -452,20 +280,18 @@ def test_export_on_fail():
assert res.status.description == status.description
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_pipeline_sales_forecast_SuccessDbWrite(exmpl_api_sales_prognosis_resp, session):
assert session.cfg.forecast.threshold_month_data_points is not None
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
sess_mock.cfg.forecast.threshold_month_data_points = 1
result = fc.pipeline_sales_forecast(session, company_ids, date) # type: ignore
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_pipeline_sales_prognosis(exmpl_api_sales_prognosis_resp):
def mock_request(*args, **kwargs): # pragma: no cover
return exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
# new=mock_request,
) as mock:
mock.return_value = exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS
result = fc.pipeline_sales(None) # type: ignore
print(result)
assert result.status == STATUS_HANDLER.SUCCESS
assert len(result.response.daten) > 0

tests/api/conftest.py (new file, 32 additions)
View File

@ -0,0 +1,32 @@
from unittest.mock import patch
import pytest
from delta_barth.api import common
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
@pytest.fixture(scope="function")
def session(credentials, api_base_url) -> common.Session:
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
return session
@pytest.fixture
def mock_put():
with patch("requests.put") as mock:
yield mock
@pytest.fixture
def mock_get():
with patch("requests.get") as mock:
yield mock

View File

@ -1,13 +1,72 @@
from pathlib import Path
from unittest.mock import patch
import pytest
from pydantic import ValidationError
from delta_barth.api import common
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
LOG_FILENAME,
)
from delta_barth.errors import (
UnspecifiedRequestType,
)
from delta_barth.types import HttpRequestTypes
def test_validate_path_Success():
str_pth = str(Path.cwd())
path = common.validate_path(str_pth)
assert path.name == Path.cwd().name
def test_validate_path_FailNotExisting():
str_pth = str(Path.cwd() / "test")
with pytest.raises(FileNotFoundError, match=r"seems not to exist"):
_ = common.validate_path(str_pth)
def test_validate_path_FailNoDirectory(tmp_path):
file = tmp_path / "test.txt"
file.write_text("test", encoding="utf-8")
str_pth = str(file)
with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"):
_ = common.validate_path(str_pth)
def test_session_set_DataPath(tmp_path):
str_path = str(tmp_path)
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
assert session._data_path is None
session.set_data_path(str_path)
assert session._data_path is not None
assert isinstance(session.data_path, Path)
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = common.Session(HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
assert target_file.exists()
def test_validate_creds(credentials):
creds = common.validate_credentials(
username=credentials["user"],
@ -51,3 +110,204 @@ def test_ping(api_base_url):
with pytest.raises(UnspecifiedRequestType):
resp = common.ping(api_base_url, HttpRequestTypes.POST)
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
assert session._base_url is not None
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert session.session_token is None
assert not session.logged_in
@pytest.mark.api_con_required
def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url):
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
# prepare login
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
session.login()
assert session._base_url is not None
assert session.logged_in
# reset base URL
session.set_base_url(api_base_url)
assert session._base_url is not None
assert not session.logged_in
assert session.session_token is None
# reset credentials
session.login()
assert session.logged_in
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert not session.logged_in
assert session.session_token is None
@pytest.mark.api_con_required
def test_login_logout_Success(session, credentials):
assert not session.logged_in
resp, status = session.login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert resp is None
assert status.code == 0
assert session.session_token is None
assert "DelecoToken" not in session.headers
session.set_credentials(
username=credentials["user"],
password="WRONG_PASSWORD",
database=credentials["db"],
mandant=credentials["mandant"],
)
resp, status = session.login()
assert resp is not None
assert status.code == DEFAULT_API_ERR_CODE
assert status.api_server_error is not None
assert status.api_server_error.status_code == 409
assert status.api_server_error.message == "Nutzer oder Passwort falsch."
def test_login_logout_FailApiServer(session, mock_put):
code = 401
json = {
"message": "GenericError",
"code": "TestLogin",
"hints": "TestCase",
}
mock_put.return_value.status_code = code
mock_put.return_value.json.return_value = json
resp, status = session.login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
resp, status = session.logout()
assert resp is None
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
@pytest.mark.api_con_required
def test_assert_login_SuccessLoggedOut(session):
assert session.session_token is None
assert session._creds is not None
# test logged out state
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_SuccessStillLoggedIn(session):
assert session.session_token is None
assert session._creds is not None
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginNoValidAuth(session, mock_get):
code = 401
json = {
"message": "AuthentificationError",
"code": "TestAssertLoginAfter",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginWrongToken(session):
# triggers code 401
assert session.session_token is None
assert session._creds is not None
_, status = session.login()
assert status.code == 0
session._session_token = "WRONGTOKEN"
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_FailApiServer(session, mock_get):
code = 500
json = {
"message": "ServerError",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]

View File

@ -1,44 +1,45 @@
from datetime import datetime as Datetime
import pytest
import requests
from delta_barth.api import requests as requests_
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_Success(session):
# do not login: let routine do it
resp, status = session.login()
# test without company ID
date = Datetime(2023, 12, 15)
assert status.code == 0
date = Datetime(2022, 6, 1)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2520, 1, 1)
date = Datetime(2030, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, None, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with company ID
assert status.code == 0
date = Datetime(2023, 8, 15)
date = Datetime(2022, 6, 1)
company_ids = [5661, 1027]
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
assert status.code == 0
assert len(resp.daten) > 0
date = Datetime(2520, 1, 1)
date = Datetime(2030, 1, 1)
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
assert status.code == 0
assert len(resp.daten) == 0
# test with non-existent company ID
assert status.code == 0
date = Datetime(2022, 6, 1)
company_ids = [1000024]
company_id = 1000024
resp, status = requests_.get_sales_prognosis_data(session, company_ids, date)
resp, status = requests_.get_sales_prognosis_data(session, company_id, date)
# TODO check if this behaviour is still considered "successful"
assert status.code == 0
assert len(resp.daten) == 0
# test without date
company_ids = [1024]
company_id = 1024
resp, status = requests_.get_sales_prognosis_data(session, company_ids, None)
resp, status = requests_.get_sales_prognosis_data(session, company_id, None)
assert status.code == 0
assert len(resp.daten) > 0
# test without filters
@ -51,11 +52,12 @@ def test_get_sales_prognosis_data_Success(session):
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_NoAuth(session, mock_get):
def test_get_sales_prognosis_data_FailLogin(session, mock_get):
code = 401
session.login()
code = 500
json = {
"message": "ServerError",
"code": "TestFailAuth",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
@ -72,36 +74,6 @@ def test_get_sales_prognosis_data_NoAuth(session, mock_get):
assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailLogin(session, mock_get, mock_put):
code = 401
json = {
"message": "ServerError",
"code": "TestFailAuth",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
code_put = 500
json_put = {
"message": "ServerError",
"code": "TestUnknownError",
"hints": "TestCase",
}
mock_put.return_value.status_code = code_put
mock_put.return_value.json.return_value = json_put
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code_put
assert status.api_server_error.message == json_put["message"]
assert status.api_server_error.code == json_put["code"]
assert status.api_server_error.hints == json_put["hints"]
@pytest.mark.api_con_required
def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
code = 405
@ -122,21 +94,3 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 1
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 2

View File

@ -3,16 +3,12 @@ from __future__ import annotations
import json
import tomllib
from pathlib import Path
from typing import cast
from typing import Any, cast
from unittest.mock import patch
import pandas as pd
import pytest
import tomli_w
import delta_barth.session
from delta_barth.api.requests import SalesPrognosisResponse
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
@pytest.fixture(scope="session")
@ -34,26 +30,38 @@ def api_base_url(credentials) -> str:
return credentials["base_url"]
@pytest.fixture(scope="session")
def pth_dummy_cfg() -> Path:
pwd = Path.cwd()
assert "barth" in pwd.parent.name.lower(), "not in project root directory"
data_pth = pwd / "./tests/_test_data/dopt-cfg.toml"
assert data_pth.exists(), "file to dummy CFG not found"
return data_pth
@pytest.fixture(scope="function")
def pth_cfg(pth_dummy_cfg, tmp_path) -> Path:
with open(pth_dummy_cfg, "rb") as file:
cfg_data = tomllib.load(file)
target = tmp_path / "dummy_cfg.toml"
target.touch()
with open(target, "wb") as file:
tomli_w.dump(cfg_data, file)
return target
# TODO: maybe include in main package depending if needed in future
# TODO check deletion
# def _cvt_str_float(value: str) -> float:
# import locale
# locale.setlocale(locale.LC_NUMERIC, "de_DE.UTF-8")
# return locale.atof(value)
# def _cvt_str_ts(value: str) -> Any:
# date = value.split("_")[0]
# return pd.to_datetime(date, format="%Y%m%d", errors="coerce")
# @pytest.fixture(scope="session")
# def sales_data_db_export() -> pd.DataFrame:
# pwd = Path.cwd()
# assert "barth" in pwd.parent.name.lower(), "not in project root directory"
# data_pth = pwd / "./tests/_test_data/swm_f_umsatz_fakt.csv"
# assert data_pth.exists(), "file to sales data not found"
# data = pd.read_csv(data_pth, sep="\t")
# data["betrag"] = data["betrag"].apply(_cvt_str_float)
# data["buchungs_datum"] = data["buchungs_datum"].apply(_cvt_str_ts)
# data = data.dropna(
# how="any",
# subset=["firma_refid", "beleg_typ", "buchungs_datum", "betrag"],
# ignore_index=True,
# )
# data["buchungs_datum"] = pd.to_datetime(data["buchungs_datum"])
# return data
@pytest.fixture(scope="session")
@ -93,32 +101,3 @@ def exmpl_api_sales_prognosis_output() -> pd.DataFrame:
assert data_pth.exists(), "file to API sales data not found"
return pd.read_pickle(data_pth)
# ** sessions
@pytest.fixture(scope="function")
def session(credentials, api_base_url, tmp_path) -> delta_barth.session.Session:
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
session.set_data_path(str(tmp_path))
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
session.setup()
return session
@pytest.fixture
def mock_put():
with patch("requests.put") as mock:
yield mock
@pytest.fixture(scope="function")
def mock_get():
with patch("requests.get") as mock:
yield mock
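The deleted session, mock_put and mock_get fixtures wired a configured Session plus patched requests calls into each test. A hypothetical usage example (not part of the suite) showing how the mock_get fixture is consumed:

import requests


def test_example_uses_mock_get(mock_get):
    # mock_get patches requests.get, so the fake reply below is what any
    # code under test would receive
    mock_get.return_value.status_code = 200
    mock_get.return_value.json.return_value = {"daten": []}

    resp = requests.get("https://example.invalid/api")  # hypothetical URL
    assert resp.status_code == 200
    assert resp.json() == {"daten": []}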

View File

@ -1,40 +0,0 @@
import tomllib
import tomli_w
from delta_barth import config
def test_CfgLoader_Init(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
assert loader.path == pth_cfg
assert loader._cfg is None
def test_CfgLoader_Get(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
def test_CfgLoader_Reload(pth_cfg):
loader = config.LazyCfgLoader(pth_cfg)
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 28
# modify config and reload
with open(pth_cfg, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(pth_cfg, "wb") as file:
tomli_w.dump(cfg_data, file)
assert parsed_cfg.forecast.threshold_month_data_points == 28
loader.reload()
parsed_cfg = loader.get()
assert isinstance(parsed_cfg, config.Config)
assert parsed_cfg.forecast.threshold_month_data_points == 30
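The deleted tests spell out the contract of config.LazyCfgLoader: the path is stored on construction, the TOML file is parsed lazily on the first get(), the result is cached, and on-disk changes only become visible after reload(). A minimal self-contained sketch of a loader with that behaviour; unlike the real module it returns the parsed dict instead of a config.Config instance:

import tomllib
from pathlib import Path


class LazyCfgLoader:
    """Sketch of a lazily parsed TOML config loader."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._cfg: dict | None = None  # parsed lazily on first access

    def get(self) -> dict:
        # the real loader wraps the parsed data in config.Config;
        # returning the raw dict keeps this sketch self-contained
        if self._cfg is None:
            with open(self.path, "rb") as file:
                self._cfg = tomllib.load(file)
        return self._cfg

    def reload(self) -> None:
        # drop the cache so the next get() re-reads the file from disk
        self._cfg = None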

View File

@ -1,11 +0,0 @@
import sqlalchemy as sql
from delta_barth import databases as db
def test_get_engine(tmp_path):
db_path = tmp_path / "test_db.db"
engine = db.get_engine(db_path)
assert isinstance(engine, sql.Engine)
assert "sqlite" in str(engine.url)
assert db_path.parent.name in str(engine.url)
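The deleted test only requires that db.get_engine returns a SQLAlchemy Engine whose URL names SQLite and embeds the given file path. A minimal sketch that would satisfy those assertions (an assumption, not the deleted module's verbatim code):

from pathlib import Path

import sqlalchemy as sql


def get_engine(db_path: Path) -> sql.Engine:
    # file-backed SQLite engine; the URL embeds the full path, so both
    # "sqlite" and the parent folder name appear in str(engine.url)
    return sql.create_engine(f"sqlite:///{db_path}")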

View File

@ -1,49 +0,0 @@
import importlib
import sys
from unittest.mock import patch
import pytest
import delta_barth.constants
from delta_barth import _env
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "test123456")
def test_prepare_env_NoRuntimeFolder(tmp_path):
ret = _env.prepare_env(tmp_path)
assert ret is None
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_FailNoInterpreter(tmp_path_factory):
mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
mocked_lib_pth.mkdir(parents=True, exist_ok=True)
with pytest.raises(FileNotFoundError):
_ = _env.prepare_env(mocked_lib_pth)
@patch("delta_barth._env.PY_RUNTIME_FOLDER", "base")
def test_prepare_env_Success(tmp_path_factory):
mocked_lib_pth = tmp_path_factory.mktemp("path") / "to/base/folder/lib/"
mocked_lib_pth.mkdir(parents=True, exist_ok=True)
rt_path = mocked_lib_pth.parents[1]
mocked_interpreter = rt_path / "python.exe"
mocked_interpreter.touch()
assert mocked_interpreter.exists()
ret = _env.prepare_env(mocked_lib_pth)
assert ret == rt_path
# sys attributes
executable = getattr(sys, "executable")
assert executable == str(mocked_interpreter)
base_executable = getattr(sys, "_base_executable")
assert base_executable == str(mocked_interpreter)
class MockPath:
def __init__(self, *args, **kwargs):
self.parent = mocked_lib_pth
with patch("pathlib.Path", MockPath):
(mocked_lib_pth / "_dummy_data").mkdir(exist_ok=True)
importlib.reload(delta_barth.constants)
assert delta_barth.constants.DEPLOYMENT_STATUS
assert delta_barth.constants.RUNTIME_PATH == rt_path
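The deleted _env tests imply the following behaviour for prepare_env: return None when no parent folder matches PY_RUNTIME_FOLDER, raise FileNotFoundError when the runtime folder lacks python.exe, and otherwise point sys.executable and sys._base_executable at the embedded interpreter and return the runtime path. A hedged sketch consistent with those assertions; the deleted implementation may differ in detail:

import sys
from pathlib import Path

PY_RUNTIME_FOLDER = "base"  # patched per test above


def prepare_env(lib_path: Path) -> Path | None:
    # look for the embedded runtime folder among the parents of the lib path
    runtime = next(
        (p for p in lib_path.parents if p.name == PY_RUNTIME_FOLDER), None
    )
    if runtime is None:
        return None  # not running from an embedded runtime
    interpreter = runtime / "python.exe"
    if not interpreter.exists():
        raise FileNotFoundError(f"no interpreter found in {runtime}")
    # point the current process at the embedded interpreter
    sys.executable = str(interpreter)
    sys._base_executable = str(interpreter)
    return runtime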

View File

@ -1,65 +1,22 @@
import importlib
import json
from datetime import datetime as Datetime
from unittest.mock import patch
import pytest
import sqlalchemy as sql
from delta_barth import databases as db
import delta_barth.pipelines
from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER
def test_write_performance_metrics_Success(session):
pipe_name = "test_pipe"
t_start = 20_000_000_000
t_end = 30_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
metrics = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
assert metrics["pipeline_name"] == pipe_name
assert metrics["execution_duration"] == 10
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == pipe_name
assert metrics.execution_duration == 10
def test_write_performance_metrics_FailStartingTime(session):
pipe_name = "test_pipe"
t_start = 30_000_000_000
t_end = 20_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
with pytest.raises(ValueError):
_ = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
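The two tests above pin down the arithmetic in _write_performance_metrics: the timestamps are nanosecond counters, so the 10_000_000_000 ns difference maps to an execution_duration of 10 seconds, and a start time later than the end time raises ValueError. A reduced sketch of that logic, omitting the insert into db.perf_meas that the real function also performs:

def _write_performance_metrics(
    pipeline_name: str, time_start: int, time_end: int
) -> dict:
    # timestamps in nanoseconds: 30_000_000_000 - 20_000_000_000 -> 10 s
    if time_start > time_end:
        raise ValueError("start time lies after end time")
    duration_s = (time_end - time_start) / 1_000_000_000
    return {
        "pipeline_name": pipeline_name,
        "execution_duration": duration_s,
    }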
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monkeypatch):
date = Datetime(2023, 8, 15)
company_ids = [5661, 1027, 1024]
with (
patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as get_mock,
patch("delta_barth.pipelines.SESSION", session),
patch("delta_barth.analysis.forecast.SESSION", session) as sess_mock,
):
get_mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
sess_mock.cfg.forecast.threshold_month_data_points = 1
json_export = pl.pipeline_sales_forecast(company_ids, date)
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
importlib.reload(delta_barth.pipelines)
json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)
@ -70,17 +27,9 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session, monke
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast"
assert metrics.execution_duration > 0
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None)
def test_sales_prognosis_pipeline_dummy():
json_export = pl.pipeline_sales_forecast_dummy(None, None)
assert isinstance(json_export, str)
parsed_resp = json.loads(json_export)
@ -94,10 +43,3 @@ def test_sales_prognosis_pipeline_dummy(session):
assert entry["vorhersage"] == pytest.approx(47261.058594)
assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast_dummy"
assert metrics.execution_duration > 0

View File

@ -1,343 +0,0 @@
import tomllib
from pathlib import Path
from unittest.mock import patch
import pytest
import tomli_w
import delta_barth.config
import delta_barth.session
from delta_barth import logging
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
)
from delta_barth.logging import LOG_FILENAME
def test_validate_path_Success():
str_pth = str(Path.cwd())
path = delta_barth.session.validate_path(str_pth)
assert path.name == Path.cwd().name
def test_validate_path_FailNotExisting():
str_pth = str(Path.cwd() / "test")
with pytest.raises(FileNotFoundError, match=r"seems not to exist"):
_ = delta_barth.session.validate_path(str_pth)
def test_validate_path_FailNoDirectory(tmp_path):
file = tmp_path / "test.txt"
file.write_text("test", encoding="utf-8")
str_pth = str(file)
with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"):
_ = delta_barth.session.validate_path(str_pth)
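The three tests above describe validate_path: it returns the input as a Path when it exists and is a directory, and raises FileNotFoundError with the matched message fragments otherwise. A minimal sketch along those lines (message wording beyond the matched fragments is an assumption):

from pathlib import Path


def validate_path(str_pth: str) -> Path:
    path = Path(str_pth)
    if not path.exists():
        raise FileNotFoundError(f"path {path} seems not to exist")
    if not path.is_dir():
        raise FileNotFoundError(f"path {path} seems not to be a directory")
    return path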
def test_session_set_DataPath(tmp_path):
str_path = str(tmp_path)
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
assert session._data_path is None
session.set_data_path(str_path)
assert session._data_path is not None
assert isinstance(session.data_path, Path)
def test_session_setup_db_management(tmp_path):
str_path = str(tmp_path)
foldername: str = "data_test"
target_db_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, db_folder=foldername)
session.set_data_path(str_path)
db_path = session.db_path
assert db_path.parent.exists()
assert db_path.parent == target_db_dir
assert not db_path.exists()
session.setup()
db_path2 = session.db_path
assert db_path2 == db_path
assert session._db_engine is not None
assert db_path.exists()
def test_session_setup_config(tmp_path):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
cfg_path2 = session.cfg_path
assert cfg_path2 == cfg_path
assert session._cfg is not None
assert cfg_path.exists()
assert session.cfg.forecast.threshold_month_data_points == 28
@patch("delta_barth.session.CFG_HOT_RELOAD", False)
def test_session_reload_config_NoHotReload(tmp_path):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
assert cfg_path.exists()
parsed_cfg = session.cfg
assert isinstance(parsed_cfg, delta_barth.config.Config)
# modify config and reload
with open(cfg_path, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(cfg_path, "wb") as file:
tomli_w.dump(cfg_data, file)
assert session.cfg.forecast.threshold_month_data_points == 28
session.reload_cfg()
reload_cfg = session.cfg
assert isinstance(reload_cfg, delta_barth.config.Config)
assert reload_cfg.forecast.threshold_month_data_points == 30
@patch("delta_barth.session.CFG_HOT_RELOAD", True)
def test_session_reload_config_HotReload(tmp_path):
str_path = str(tmp_path)
foldername: str = "cfg_test"
target_cfg_dir = tmp_path / foldername
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS, cfg_folder=foldername)
session.set_data_path(str_path)
cfg_path = session.cfg_path
assert cfg_path.parent.exists()
assert cfg_path.parent == target_cfg_dir
assert not cfg_path.exists()
session.setup()
assert cfg_path.exists()
parsed_cfg = session.cfg
assert isinstance(parsed_cfg, delta_barth.config.Config)
# modify config and reload
with open(cfg_path, "rb") as file:
cfg_data = tomllib.load(file)
cfg_data["forecast"]["threshold_month_data_points"] = 30
with open(cfg_path, "wb") as file:
tomli_w.dump(cfg_data, file)
assert session.cfg.forecast.threshold_month_data_points == 30
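Taken together, the two reload tests define the role of CFG_HOT_RELOAD: with hot reload off, session.cfg keeps serving the cached configuration until reload_cfg() is called; with hot reload on, every access re-reads the file, so the edited value (30) is visible immediately. A hedged sketch of such an accessor, returning a plain dict instead of the real Config object:

import tomllib
from pathlib import Path

CFG_HOT_RELOAD = False  # patched per test above


class _CfgSessionSketch:
    def __init__(self, cfg_path: Path) -> None:
        self.cfg_path = cfg_path
        self._cfg: dict | None = None

    def reload_cfg(self) -> None:
        with open(self.cfg_path, "rb") as file:
            self._cfg = tomllib.load(file)

    @property
    def cfg(self) -> dict:
        # hot reload: re-read the file on every access; otherwise serve the
        # cached value until reload_cfg() is called explicitly
        if CFG_HOT_RELOAD or self._cfg is None:
            self.reload_cfg()
        return self._cfg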
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
@patch("delta_barth.logging.LOGGING_TO_STDERR", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
log_dir2 = session.logging_dir
assert log_dir2 == log_dir
assert target_file.exists()
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_disable_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
assert target_file.exists()
# provoke entry
msg = "this is a test"
logging.logger_base.critical(msg)
session.disable_logging()
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg in last_line.lower()
# log new entry which should not be added as logging is disabled
msg = "this is a second test"
logging.logger_base.critical(msg)
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg not in last_line.lower()
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
assert session._base_url is not None
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert session.session_token is None
assert not session.logged_in
@pytest.mark.api_con_required
def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url):
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
# prepare login
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
session.login()
assert session._base_url is not None
assert session.logged_in
# reset base URL
session.set_base_url(api_base_url)
assert session._base_url is not None
assert not session.logged_in
assert session.session_token is None
# reset credentials
session.login()
assert session.logged_in
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert not session.logged_in
assert session.session_token is None
@pytest.mark.api_con_required
def test_login_logout_Success(session, credentials):
assert not session.logged_in
resp, status = session.login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert resp is None
assert status.code == 0
assert session.session_token is None
assert "DelecoToken" not in session.headers
session.set_credentials(
username=credentials["user"],
password="WRONG_PASSWORD",
database=credentials["db"],
mandant=credentials["mandant"],
)
resp, status = session.login()
assert resp is not None
assert status.code == DEFAULT_API_ERR_CODE
assert status.api_server_error is not None
assert status.api_server_error.status_code == 409
assert status.api_server_error.message == "Nutzer oder Passwort falsch."
def test_login_logout_FailApiServer(session, mock_put):
code = 401
json = {
"message": "GenericError",
"code": "TestLogin",
"hints": "TestCase",
}
mock_put.return_value.status_code = code
mock_put.return_value.json.return_value = json
resp, status = session.login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
resp, status = session.logout()
assert resp is None
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
@pytest.mark.api_con_required
def test_relogin_SuccessLoggedOut(session):
assert session.session_token is None
assert session._creds is not None
# test logged out state
resp, status = session.relogin()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_relogin_SuccessStillLoggedIn(session):
assert session.session_token is None
assert session._creds is not None
resp, status = session.login()
old_token = session.session_token
assert old_token is not None
resp, status = session.relogin()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
assert session.session_token != old_token
resp, status = session.logout()
assert status.code == 0