refactoring session management

This commit is contained in:
Florian Förster 2025-03-27 08:47:50 +01:00
parent df09732c56
commit 1f497fe193
9 changed files with 537 additions and 556 deletions

View File

@ -39,7 +39,7 @@ from delta_barth.types import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from delta_barth.api.common import Session from delta_barth.session import Session
from delta_barth.types import Status from delta_barth.types import Status
ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics] ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics]

View File

@ -1,236 +1,31 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from typing import Final
from typing import TYPE_CHECKING, Final
import requests import requests
from dopt_basics.io import combine_route from dopt_basics.io import combine_route
from pydantic import BaseModel from pydantic import BaseModel
from requests import Response from requests import Response
import delta_barth.logging
from delta_barth.errors import ( from delta_barth.errors import (
STATUS_HANDLER,
UnspecifiedRequestType, UnspecifiedRequestType,
) )
from delta_barth.logging import logger_session as logger
from delta_barth.types import ( from delta_barth.types import (
ApiCredentials, ApiCredentials,
DelBarApiError,
HttpRequestTypes, HttpRequestTypes,
) )
if TYPE_CHECKING:
from delta_barth.types import HttpContentHeaders, Status # ** login
class LoginRequest(BaseModel):
userName: str
password: str
databaseName: str
mandantName: str
class Session: class LoginResponse(BaseModel):
def __init__( token: str
self,
base_headers: HttpContentHeaders,
logging_folder: str = "logs",
) -> None:
self._data_path: Path | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
def setup(self) -> None:
self.setup_logging()
@property
def data_path(self) -> Path:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def logging_dir(self) -> Path:
if self._logging_dir is not None:
return self._logging_dir
logging_dir = self.data_path / self._logging_folder
if not logging_dir.exists():
logging_dir.mkdir(parents=False)
self._logging_dir = logging_dir
return self._logging_dir
def setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging")
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
return self._creds
def set_data_path(
self,
path: str,
):
self._data_path = validate_path(path)
def set_credentials(
self,
username: str,
password: str,
database: str,
mandant: str,
) -> None:
if self.logged_in:
self.logout()
self._creds = validate_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
@property
def base_url(self) -> str:
assert self._base_url is not None, "accessed base URL not set"
return self._base_url
def set_base_url(
self,
base_url: str,
) -> None:
if self.logged_in:
self.logout()
self._base_url = base_url
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def _add_session_token(
self,
token: str,
) -> None:
assert self.session_token is None, "tried overwriting existing API session token"
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def _remove_session_token(self) -> None:
assert self.session_token is not None, (
"tried to delete non-existing API session token"
)
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False
def login(
self,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(self.base_url, ROUTE)
login_req = LoginRequest(
userName=self.creds.username,
password=self.creds.password,
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def logout(
self,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put(
URL,
headers=self.headers, # type: ignore
)
response = None
status: Status
if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS
self._remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def assert_login(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(token=self.session_token)
status = STATUS_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def validate_path(
str_path: str,
) -> Path:
path = Path(str_path).resolve()
if not path.exists():
raise FileNotFoundError(f"Provided path >{path}< seems not to exist.")
elif not path.is_dir():
raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.")
return path
def validate_credentials( def validate_credentials(
@ -265,15 +60,3 @@ def ping(
raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint") raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint")
return resp return resp
# ** login
class LoginRequest(BaseModel):
userName: str
password: str
databaseName: str
mandantName: str
class LoginResponse(BaseModel):
token: str

View File

@ -11,7 +11,7 @@ from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
if TYPE_CHECKING: if TYPE_CHECKING:
from delta_barth.api.common import Session from delta_barth.session import Session
# ** sales data # ** sales data

View File

@ -5,8 +5,8 @@ from __future__ import annotations
from typing import Final from typing import Final
from delta_barth.api.common import Session
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.session import Session
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS) SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)

229
src/delta_barth/session.py Normal file
View File

@ -0,0 +1,229 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Final
import requests
from dopt_basics.io import combine_route
import delta_barth.logging
from delta_barth.api.common import (
LoginRequest,
LoginResponse,
validate_credentials,
)
from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING:
from delta_barth.types import ApiCredentials, HttpContentHeaders
def validate_path(
str_path: str,
) -> Path:
path = Path(str_path).resolve()
if not path.exists():
raise FileNotFoundError(f"Provided path >{path}< seems not to exist.")
elif not path.is_dir():
raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.")
return path
class Session:
def __init__(
self,
base_headers: HttpContentHeaders,
logging_folder: str = "logs",
) -> None:
self._data_path: Path | None = None
self._logging_dir: Path | None = None
self._logging_folder = logging_folder
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
def setup(self) -> None:
self.setup_logging()
@property
def data_path(self) -> Path:
assert self._data_path is not None, "accessed data path not set"
return self._data_path
@property
def logging_dir(self) -> Path:
if self._logging_dir is not None:
return self._logging_dir
logging_dir = self.data_path / self._logging_folder
if not logging_dir.exists():
logging_dir.mkdir(parents=False)
self._logging_dir = logging_dir
return self._logging_dir
def setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging")
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
return self._creds
def set_data_path(
self,
path: str,
):
self._data_path = validate_path(path)
def set_credentials(
self,
username: str,
password: str,
database: str,
mandant: str,
) -> None:
if self.logged_in:
self.logout()
self._creds = validate_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
@property
def base_url(self) -> str:
assert self._base_url is not None, "accessed base URL not set"
return self._base_url
def set_base_url(
self,
base_url: str,
) -> None:
if self.logged_in:
self.logout()
self._base_url = base_url
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def _add_session_token(
self,
token: str,
) -> None:
assert self.session_token is None, "tried overwriting existing API session token"
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def _remove_session_token(self) -> None:
assert self.session_token is not None, (
"tried to delete non-existing API session token"
)
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False
def login(
self,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(self.base_url, ROUTE)
login_req = LoginRequest(
userName=self.creds.username,
password=self.creds.password,
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def logout(
self,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put(
URL,
headers=self.headers, # type: ignore
)
response = None
status: Status
if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS
self._remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status
def assert_login(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(token=self.session_token)
status = STATUS_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)
return response, status

View File

@ -1,32 +0,0 @@
from unittest.mock import patch
import pytest
from delta_barth.api import common
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
@pytest.fixture(scope="function")
def session(credentials, api_base_url) -> common.Session:
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
return session
@pytest.fixture
def mock_put():
with patch("requests.put") as mock:
yield mock
@pytest.fixture
def mock_get():
with patch("requests.get") as mock:
yield mock

View File

@ -1,72 +1,13 @@
from pathlib import Path
from unittest.mock import patch
import pytest import pytest
from pydantic import ValidationError from pydantic import ValidationError
from delta_barth.api import common from delta_barth.api import common
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
LOG_FILENAME,
)
from delta_barth.errors import ( from delta_barth.errors import (
UnspecifiedRequestType, UnspecifiedRequestType,
) )
from delta_barth.types import HttpRequestTypes from delta_barth.types import HttpRequestTypes
def test_validate_path_Success():
str_pth = str(Path.cwd())
path = common.validate_path(str_pth)
assert path.name == Path.cwd().name
def test_validate_path_FailNotExisting():
str_pth = str(Path.cwd() / "test")
with pytest.raises(FileNotFoundError, match=r"seems not to exist"):
_ = common.validate_path(str_pth)
def test_validate_path_FailNoDirectory(tmp_path):
file = tmp_path / "test.txt"
file.write_text("test", encoding="utf-8")
str_pth = str(file)
with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"):
_ = common.validate_path(str_pth)
def test_session_set_DataPath(tmp_path):
str_path = str(tmp_path)
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
assert session._data_path is None
session.set_data_path(str_path)
assert session._data_path is not None
assert isinstance(session.data_path, Path)
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = common.Session(HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
assert target_file.exists()
def test_validate_creds(credentials): def test_validate_creds(credentials):
creds = common.validate_credentials( creds = common.validate_credentials(
username=credentials["user"], username=credentials["user"],
@ -110,204 +51,3 @@ def test_ping(api_base_url):
with pytest.raises(UnspecifiedRequestType): with pytest.raises(UnspecifiedRequestType):
resp = common.ping(api_base_url, HttpRequestTypes.POST) resp = common.ping(api_base_url, HttpRequestTypes.POST)
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
assert session._base_url is not None
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert session.session_token is None
assert not session.logged_in
@pytest.mark.api_con_required
def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url):
session = common.Session(HTTP_BASE_CONTENT_HEADERS)
# prepare login
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
session.login()
assert session._base_url is not None
assert session.logged_in
# reset base URL
session.set_base_url(api_base_url)
assert session._base_url is not None
assert not session.logged_in
assert session.session_token is None
# reset credentials
session.login()
assert session.logged_in
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert not session.logged_in
assert session.session_token is None
@pytest.mark.api_con_required
def test_login_logout_Success(session, credentials):
assert not session.logged_in
resp, status = session.login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert resp is None
assert status.code == 0
assert session.session_token is None
assert "DelecoToken" not in session.headers
session.set_credentials(
username=credentials["user"],
password="WRONG_PASSWORD",
database=credentials["db"],
mandant=credentials["mandant"],
)
resp, status = session.login()
assert resp is not None
assert status.code == DEFAULT_API_ERR_CODE
assert status.api_server_error is not None
assert status.api_server_error.status_code == 409
assert status.api_server_error.message == "Nutzer oder Passwort falsch."
def test_login_logout_FailApiServer(session, mock_put):
code = 401
json = {
"message": "GenericError",
"code": "TestLogin",
"hints": "TestCase",
}
mock_put.return_value.status_code = code
mock_put.return_value.json.return_value = json
resp, status = session.login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
resp, status = session.logout()
assert resp is None
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
@pytest.mark.api_con_required
def test_assert_login_SuccessLoggedOut(session):
assert session.session_token is None
assert session._creds is not None
# test logged out state
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_SuccessStillLoggedIn(session):
assert session.session_token is None
assert session._creds is not None
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginNoValidAuth(session, mock_get):
code = 401
json = {
"message": "AuthentificationError",
"code": "TestAssertLoginAfter",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginWrongToken(session):
# triggers code 401
assert session.session_token is None
assert session._creds is not None
_, status = session.login()
assert status.code == 0
session._session_token = "WRONGTOKEN"
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_FailApiServer(session, mock_get):
code = 500
json = {
"message": "ServerError",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]

View File

@ -4,11 +4,14 @@ import json
import tomllib import tomllib
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import Any, cast
from unittest.mock import patch
import pandas as pd import pandas as pd
import pytest import pytest
import delta_barth.session
from delta_barth.api.requests import SalesPrognosisResponse from delta_barth.api.requests import SalesPrognosisResponse
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@ -30,40 +33,6 @@ def api_base_url(credentials) -> str:
return credentials["base_url"] return credentials["base_url"]
# TODO: maybe include in main package depending if needed in future
# TODO check deletion
# def _cvt_str_float(value: str) -> float:
# import locale
# locale.setlocale(locale.LC_NUMERIC, "de_DE.UTF-8")
# return locale.atof(value)
# def _cvt_str_ts(value: str) -> Any:
# date = value.split("_")[0]
# return pd.to_datetime(date, format="%Y%m%d", errors="coerce")
# @pytest.fixture(scope="session")
# def sales_data_db_export() -> pd.DataFrame:
# pwd = Path.cwd()
# assert "barth" in pwd.parent.name.lower(), "not in project root directory"
# data_pth = pwd / "./tests/_test_data/swm_f_umsatz_fakt.csv"
# assert data_pth.exists(), "file to sales data not found"
# data = pd.read_csv(data_pth, sep="\t")
# data["betrag"] = data["betrag"].apply(_cvt_str_float)
# data["buchungs_datum"] = data["buchungs_datum"].apply(_cvt_str_ts)
# data = data.dropna(
# how="any",
# subset=["firma_refid", "beleg_typ", "buchungs_datum", "betrag"],
# ignore_index=True,
# )
# data["buchungs_datum"] = pd.to_datetime(data["buchungs_datum"])
# return data
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def sales_data_real() -> pd.DataFrame: def sales_data_real() -> pd.DataFrame:
pwd = Path.cwd() pwd = Path.cwd()
@ -101,3 +70,30 @@ def exmpl_api_sales_prognosis_output() -> pd.DataFrame:
assert data_pth.exists(), "file to API sales data not found" assert data_pth.exists(), "file to API sales data not found"
return pd.read_pickle(data_pth) return pd.read_pickle(data_pth)
# ** sessions
@pytest.fixture(scope="function")
def session(credentials, api_base_url) -> delta_barth.session.Session:
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
return session
@pytest.fixture
def mock_put():
with patch("requests.put") as mock:
yield mock
@pytest.fixture
def mock_get():
with patch("requests.get") as mock:
yield mock

265
tests/test_session.py Normal file
View File

@ -0,0 +1,265 @@
from pathlib import Path
from unittest.mock import patch
import pytest
import delta_barth.session
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
LOG_FILENAME,
)
def test_validate_path_Success():
str_pth = str(Path.cwd())
path = delta_barth.session.validate_path(str_pth)
assert path.name == Path.cwd().name
def test_validate_path_FailNotExisting():
str_pth = str(Path.cwd() / "test")
with pytest.raises(FileNotFoundError, match=r"seems not to exist"):
_ = delta_barth.session.validate_path(str_pth)
def test_validate_path_FailNoDirectory(tmp_path):
file = tmp_path / "test.txt"
file.write_text("test", encoding="utf-8")
str_pth = str(file)
with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"):
_ = delta_barth.session.validate_path(str_pth)
def test_session_set_DataPath(tmp_path):
str_path = str(tmp_path)
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
assert session._data_path is None
session.set_data_path(str_path)
assert session._data_path is not None
assert isinstance(session.data_path, Path)
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_setup_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
assert target_file.exists()
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
assert session._base_url is not None
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert session.session_token is None
assert not session.logged_in
@pytest.mark.api_con_required
def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url):
session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS)
# prepare login
assert session.session_token is None
assert session._creds is None
assert session._base_url is None
session.set_base_url(api_base_url)
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
session.login()
assert session._base_url is not None
assert session.logged_in
# reset base URL
session.set_base_url(api_base_url)
assert session._base_url is not None
assert not session.logged_in
assert session.session_token is None
# reset credentials
session.login()
assert session.logged_in
session.set_credentials(
username=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert session._creds is not None
assert not session.logged_in
assert session.session_token is None
@pytest.mark.api_con_required
def test_login_logout_Success(session, credentials):
assert not session.logged_in
resp, status = session.login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert resp is None
assert status.code == 0
assert session.session_token is None
assert "DelecoToken" not in session.headers
session.set_credentials(
username=credentials["user"],
password="WRONG_PASSWORD",
database=credentials["db"],
mandant=credentials["mandant"],
)
resp, status = session.login()
assert resp is not None
assert status.code == DEFAULT_API_ERR_CODE
assert status.api_server_error is not None
assert status.api_server_error.status_code == 409
assert status.api_server_error.message == "Nutzer oder Passwort falsch."
def test_login_logout_FailApiServer(session, mock_put):
code = 401
json = {
"message": "GenericError",
"code": "TestLogin",
"hints": "TestCase",
}
mock_put.return_value.status_code = code
mock_put.return_value.json.return_value = json
resp, status = session.login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
resp, status = session.logout()
assert resp is None
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]
@pytest.mark.api_con_required
def test_assert_login_SuccessLoggedOut(session):
assert session.session_token is None
assert session._creds is not None
# test logged out state
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_SuccessStillLoggedIn(session):
assert session.session_token is None
assert session._creds is not None
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginNoValidAuth(session, mock_get):
code = 401
json = {
"message": "AuthentificationError",
"code": "TestAssertLoginAfter",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_ReloginWrongToken(session):
# triggers code 401
assert session.session_token is None
assert session._creds is not None
_, status = session.login()
assert status.code == 0
session._session_token = "WRONGTOKEN"
resp, status = session.assert_login()
assert resp is not None
assert status.code == 0
assert session.session_token is not None
resp, status = session.logout()
assert status.code == 0
@pytest.mark.api_con_required
def test_assert_login_FailApiServer(session, mock_get):
code = 500
json = {
"message": "ServerError",
"code": "TestExternalServerError",
"hints": "TestCase",
}
mock_get.return_value.status_code = code
mock_get.return_value.json.return_value = json
resp, status = session.login()
resp, status = session.assert_login()
assert resp is not None
assert not resp.token
assert status.code == 400
assert status.api_server_error is not None
assert status.api_server_error.status_code == code
assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"]