diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py index a3dbe56..03c81b7 100644 --- a/src/delta_barth/analysis/forecast.py +++ b/src/delta_barth/analysis/forecast.py @@ -39,7 +39,7 @@ from delta_barth.types import ( ) if TYPE_CHECKING: - from delta_barth.api.common import Session + from delta_barth.session import Session from delta_barth.types import Status ForecastPipe: TypeAlias = PipeResult[SalesPrognosisResultsExport, SalesForecastStatistics] diff --git a/src/delta_barth/api/common.py b/src/delta_barth/api/common.py index 688a35a..d542f12 100644 --- a/src/delta_barth/api/common.py +++ b/src/delta_barth/api/common.py @@ -1,236 +1,31 @@ from __future__ import annotations -from pathlib import Path -from typing import TYPE_CHECKING, Final +from typing import Final import requests from dopt_basics.io import combine_route from pydantic import BaseModel from requests import Response -import delta_barth.logging from delta_barth.errors import ( - STATUS_HANDLER, UnspecifiedRequestType, ) -from delta_barth.logging import logger_session as logger from delta_barth.types import ( ApiCredentials, - DelBarApiError, HttpRequestTypes, ) -if TYPE_CHECKING: - from delta_barth.types import HttpContentHeaders, Status + +# ** login +class LoginRequest(BaseModel): + userName: str + password: str + databaseName: str + mandantName: str -class Session: - def __init__( - self, - base_headers: HttpContentHeaders, - logging_folder: str = "logs", - ) -> None: - self._data_path: Path | None = None - self._logging_dir: Path | None = None - self._logging_folder = logging_folder - self._creds: ApiCredentials | None = None - self._base_url: str | None = None - self._headers = base_headers - self._session_token: str | None = None - self._logged_in: bool = False - - def setup(self) -> None: - self.setup_logging() - - @property - def data_path(self) -> Path: - assert self._data_path is not None, "accessed data path not set" - return self._data_path - - @property - def logging_dir(self) -> Path: - if self._logging_dir is not None: - return self._logging_dir - - logging_dir = self.data_path / self._logging_folder - if not logging_dir.exists(): - logging_dir.mkdir(parents=False) - self._logging_dir = logging_dir - return self._logging_dir - - def setup_logging(self) -> None: - delta_barth.logging.setup_logging(self.logging_dir) - logger.info("[SESSION] Successfully setup logging") - - @property - def creds(self) -> ApiCredentials: - assert self._creds is not None, "accessed credentials not set" - return self._creds - - def set_data_path( - self, - path: str, - ): - self._data_path = validate_path(path) - - def set_credentials( - self, - username: str, - password: str, - database: str, - mandant: str, - ) -> None: - if self.logged_in: - self.logout() - self._creds = validate_credentials( - username=username, - password=password, - database=database, - mandant=mandant, - ) - - @property - def base_url(self) -> str: - assert self._base_url is not None, "accessed base URL not set" - return self._base_url - - def set_base_url( - self, - base_url: str, - ) -> None: - if self.logged_in: - self.logout() - self._base_url = base_url - - @property - def headers(self) -> HttpContentHeaders: - return self._headers - - @property - def session_token(self) -> str | None: - return self._session_token - - @property - def logged_in(self) -> bool: - return self._logged_in - - def _add_session_token( - self, - token: str, - ) -> None: - assert self.session_token is None, "tried overwriting existing API session token" - self._session_token = token - self._headers.update(DelecoToken=token) - self._logged_in = True - - def _remove_session_token(self) -> None: - assert self.session_token is not None, ( - "tried to delete non-existing API session token" - ) - if "DelecoToken" in self.headers: - del self._headers["DelecoToken"] - self._session_token = None - self._logged_in = False - - def login( - self, - ) -> tuple[LoginResponse, Status]: - ROUTE: Final[str] = "user/login" - URL: Final = combine_route(self.base_url, ROUTE) - - login_req = LoginRequest( - userName=self.creds.username, - password=self.creds.password, - databaseName=self.creds.database, - mandantName=self.creds.mandant, - ) - resp = requests.put( - URL, - login_req.model_dump_json(), - headers=self.headers, # type: ignore - ) - - response: LoginResponse - status: Status - if resp.status_code == 200: - response = LoginResponse(**resp.json()) - status = STATUS_HANDLER.pipe_states.SUCCESS - self._add_session_token(response.token) - else: - response = LoginResponse(token="") - err = DelBarApiError(status_code=resp.status_code, **resp.json()) - status = STATUS_HANDLER.api_error(err) - - return response, status - - def logout( - self, - ) -> tuple[None, Status]: - ROUTE: Final[str] = "user/logout" - URL: Final = combine_route(self.base_url, ROUTE) - - resp = requests.put( - URL, - headers=self.headers, # type: ignore - ) - - response = None - status: Status - if resp.status_code == 200: - status = STATUS_HANDLER.SUCCESS - self._remove_session_token() - else: - err = DelBarApiError(status_code=resp.status_code, **resp.json()) - status = STATUS_HANDLER.api_error(err) - - return response, status - - def assert_login( - self, - ) -> tuple[LoginResponse, Status]: - # check if login token is still valid - # re-login if necessary - if self.session_token is None: - return self.login() - - # use known endpoint which requires a valid token in its header - # evaluate the response to decide if: - # current token is still valid, token is not valid, other errors occurred - ROUTE: Final[str] = "verkauf/umsatzprognosedaten" - URL: Final = combine_route(self.base_url, ROUTE) - params: dict[str, int] = {"FirmaId": 999999} - resp = requests.get( - URL, - params=params, - headers=self.headers, # type: ignore - ) - - response: LoginResponse - status: Status - if resp.status_code == 200: - response = LoginResponse(token=self.session_token) - status = STATUS_HANDLER.SUCCESS - elif resp.status_code == 401: - self._remove_session_token() - response, status = self.login() - else: - response = LoginResponse(token="") - err = DelBarApiError(status_code=resp.status_code, **resp.json()) - status = STATUS_HANDLER.api_error(err) - - return response, status - - -def validate_path( - str_path: str, -) -> Path: - path = Path(str_path).resolve() - if not path.exists(): - raise FileNotFoundError(f"Provided path >{path}< seems not to exist.") - elif not path.is_dir(): - raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.") - - return path +class LoginResponse(BaseModel): + token: str def validate_credentials( @@ -265,15 +60,3 @@ def ping( raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint") return resp - - -# ** login -class LoginRequest(BaseModel): - userName: str - password: str - databaseName: str - mandantName: str - - -class LoginResponse(BaseModel): - token: str diff --git a/src/delta_barth/api/requests.py b/src/delta_barth/api/requests.py index 246bc4f..18fdd4f 100644 --- a/src/delta_barth/api/requests.py +++ b/src/delta_barth/api/requests.py @@ -11,7 +11,7 @@ from delta_barth.errors import STATUS_HANDLER from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status if TYPE_CHECKING: - from delta_barth.api.common import Session + from delta_barth.session import Session # ** sales data diff --git a/src/delta_barth/management.py b/src/delta_barth/management.py index 77a1ffc..359fe12 100644 --- a/src/delta_barth/management.py +++ b/src/delta_barth/management.py @@ -5,8 +5,8 @@ from __future__ import annotations from typing import Final -from delta_barth.api.common import Session from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS +from delta_barth.session import Session SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS) diff --git a/src/delta_barth/session.py b/src/delta_barth/session.py new file mode 100644 index 0000000..941b4c8 --- /dev/null +++ b/src/delta_barth/session.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Final + +import requests +from dopt_basics.io import combine_route + +import delta_barth.logging +from delta_barth.api.common import ( + LoginRequest, + LoginResponse, + validate_credentials, +) +from delta_barth.errors import STATUS_HANDLER +from delta_barth.logging import logger_session as logger +from delta_barth.types import DelBarApiError, Status + +if TYPE_CHECKING: + from delta_barth.types import ApiCredentials, HttpContentHeaders + + +def validate_path( + str_path: str, +) -> Path: + path = Path(str_path).resolve() + if not path.exists(): + raise FileNotFoundError(f"Provided path >{path}< seems not to exist.") + elif not path.is_dir(): + raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.") + + return path + + +class Session: + def __init__( + self, + base_headers: HttpContentHeaders, + logging_folder: str = "logs", + ) -> None: + self._data_path: Path | None = None + self._logging_dir: Path | None = None + self._logging_folder = logging_folder + self._creds: ApiCredentials | None = None + self._base_url: str | None = None + self._headers = base_headers + self._session_token: str | None = None + self._logged_in: bool = False + + def setup(self) -> None: + self.setup_logging() + + @property + def data_path(self) -> Path: + assert self._data_path is not None, "accessed data path not set" + return self._data_path + + @property + def logging_dir(self) -> Path: + if self._logging_dir is not None: + return self._logging_dir + + logging_dir = self.data_path / self._logging_folder + if not logging_dir.exists(): + logging_dir.mkdir(parents=False) + self._logging_dir = logging_dir + return self._logging_dir + + def setup_logging(self) -> None: + delta_barth.logging.setup_logging(self.logging_dir) + logger.info("[SESSION] Successfully setup logging") + + @property + def creds(self) -> ApiCredentials: + assert self._creds is not None, "accessed credentials not set" + return self._creds + + def set_data_path( + self, + path: str, + ): + self._data_path = validate_path(path) + + def set_credentials( + self, + username: str, + password: str, + database: str, + mandant: str, + ) -> None: + if self.logged_in: + self.logout() + self._creds = validate_credentials( + username=username, + password=password, + database=database, + mandant=mandant, + ) + + @property + def base_url(self) -> str: + assert self._base_url is not None, "accessed base URL not set" + return self._base_url + + def set_base_url( + self, + base_url: str, + ) -> None: + if self.logged_in: + self.logout() + self._base_url = base_url + + @property + def headers(self) -> HttpContentHeaders: + return self._headers + + @property + def session_token(self) -> str | None: + return self._session_token + + @property + def logged_in(self) -> bool: + return self._logged_in + + def _add_session_token( + self, + token: str, + ) -> None: + assert self.session_token is None, "tried overwriting existing API session token" + self._session_token = token + self._headers.update(DelecoToken=token) + self._logged_in = True + + def _remove_session_token(self) -> None: + assert self.session_token is not None, ( + "tried to delete non-existing API session token" + ) + if "DelecoToken" in self.headers: + del self._headers["DelecoToken"] + self._session_token = None + self._logged_in = False + + def login( + self, + ) -> tuple[LoginResponse, Status]: + ROUTE: Final[str] = "user/login" + URL: Final = combine_route(self.base_url, ROUTE) + + login_req = LoginRequest( + userName=self.creds.username, + password=self.creds.password, + databaseName=self.creds.database, + mandantName=self.creds.mandant, + ) + resp = requests.put( + URL, + login_req.model_dump_json(), + headers=self.headers, # type: ignore + ) + + response: LoginResponse + status: Status + if resp.status_code == 200: + response = LoginResponse(**resp.json()) + status = STATUS_HANDLER.pipe_states.SUCCESS + self._add_session_token(response.token) + else: + response = LoginResponse(token="") + err = DelBarApiError(status_code=resp.status_code, **resp.json()) + status = STATUS_HANDLER.api_error(err) + + return response, status + + def logout( + self, + ) -> tuple[None, Status]: + ROUTE: Final[str] = "user/logout" + URL: Final = combine_route(self.base_url, ROUTE) + + resp = requests.put( + URL, + headers=self.headers, # type: ignore + ) + + response = None + status: Status + if resp.status_code == 200: + status = STATUS_HANDLER.SUCCESS + self._remove_session_token() + else: + err = DelBarApiError(status_code=resp.status_code, **resp.json()) + status = STATUS_HANDLER.api_error(err) + + return response, status + + def assert_login( + self, + ) -> tuple[LoginResponse, Status]: + # check if login token is still valid + # re-login if necessary + if self.session_token is None: + return self.login() + + # use known endpoint which requires a valid token in its header + # evaluate the response to decide if: + # current token is still valid, token is not valid, other errors occurred + ROUTE: Final[str] = "verkauf/umsatzprognosedaten" + URL: Final = combine_route(self.base_url, ROUTE) + params: dict[str, int] = {"FirmaId": 999999} + resp = requests.get( + URL, + params=params, + headers=self.headers, # type: ignore + ) + + response: LoginResponse + status: Status + if resp.status_code == 200: + response = LoginResponse(token=self.session_token) + status = STATUS_HANDLER.SUCCESS + elif resp.status_code == 401: + self._remove_session_token() + response, status = self.login() + else: + response = LoginResponse(token="") + err = DelBarApiError(status_code=resp.status_code, **resp.json()) + status = STATUS_HANDLER.api_error(err) + + return response, status diff --git a/tests/api/conftest.py b/tests/api/conftest.py deleted file mode 100644 index 494a6b4..0000000 --- a/tests/api/conftest.py +++ /dev/null @@ -1,32 +0,0 @@ -from unittest.mock import patch - -import pytest - -from delta_barth.api import common -from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS - - -@pytest.fixture(scope="function") -def session(credentials, api_base_url) -> common.Session: - session = common.Session(HTTP_BASE_CONTENT_HEADERS) - session.set_base_url(api_base_url) - session.set_credentials( - username=credentials["user"], - password=credentials["pwd"], - database=credentials["db"], - mandant=credentials["mandant"], - ) - - return session - - -@pytest.fixture -def mock_put(): - with patch("requests.put") as mock: - yield mock - - -@pytest.fixture -def mock_get(): - with patch("requests.get") as mock: - yield mock diff --git a/tests/api/test_common.py b/tests/api/test_common.py index f48f3cb..7e166b1 100644 --- a/tests/api/test_common.py +++ b/tests/api/test_common.py @@ -1,72 +1,13 @@ -from pathlib import Path -from unittest.mock import patch - import pytest from pydantic import ValidationError from delta_barth.api import common -from delta_barth.constants import ( - DEFAULT_API_ERR_CODE, - HTTP_BASE_CONTENT_HEADERS, - LOG_FILENAME, -) from delta_barth.errors import ( UnspecifiedRequestType, ) from delta_barth.types import HttpRequestTypes -def test_validate_path_Success(): - str_pth = str(Path.cwd()) - path = common.validate_path(str_pth) - assert path.name == Path.cwd().name - - -def test_validate_path_FailNotExisting(): - str_pth = str(Path.cwd() / "test") - with pytest.raises(FileNotFoundError, match=r"seems not to exist"): - _ = common.validate_path(str_pth) - - -def test_validate_path_FailNoDirectory(tmp_path): - file = tmp_path / "test.txt" - file.write_text("test", encoding="utf-8") - - str_pth = str(file) - with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"): - _ = common.validate_path(str_pth) - - -def test_session_set_DataPath(tmp_path): - str_path = str(tmp_path) - session = common.Session(HTTP_BASE_CONTENT_HEADERS) - - assert session._data_path is None - - session.set_data_path(str_path) - assert session._data_path is not None - assert isinstance(session.data_path, Path) - - -@patch("delta_barth.logging.ENABLE_LOGGING", True) -@patch("delta_barth.logging.LOGGING_TO_FILE", True) -def test_session_setup_logging(tmp_path): - str_path = str(tmp_path) - foldername: str = "logging_test" - target_log_dir = tmp_path / foldername - - session = common.Session(HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername) - session.set_data_path(str_path) - log_dir = session.logging_dir - assert log_dir.exists() - assert log_dir == target_log_dir - # write file - target_file = target_log_dir / LOG_FILENAME - assert not target_file.exists() - session.setup() # calls setup code for logging - assert target_file.exists() - - def test_validate_creds(credentials): creds = common.validate_credentials( username=credentials["user"], @@ -110,204 +51,3 @@ def test_ping(api_base_url): with pytest.raises(UnspecifiedRequestType): resp = common.ping(api_base_url, HttpRequestTypes.POST) - - -def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url): - session = common.Session(HTTP_BASE_CONTENT_HEADERS) - - assert session.session_token is None - assert session._creds is None - assert session._base_url is None - - session.set_base_url(api_base_url) - assert session._base_url is not None - session.set_credentials( - username=credentials["user"], - password=credentials["pwd"], - database=credentials["db"], - mandant=credentials["mandant"], - ) - assert session._creds is not None - - assert session.session_token is None - assert not session.logged_in - - -@pytest.mark.api_con_required -def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url): - session = common.Session(HTTP_BASE_CONTENT_HEADERS) - # prepare login - assert session.session_token is None - assert session._creds is None - assert session._base_url is None - session.set_base_url(api_base_url) - session.set_credentials( - username=credentials["user"], - password=credentials["pwd"], - database=credentials["db"], - mandant=credentials["mandant"], - ) - session.login() - assert session._base_url is not None - assert session.logged_in - # reset base URL - session.set_base_url(api_base_url) - assert session._base_url is not None - assert not session.logged_in - assert session.session_token is None - # reset credentials - session.login() - assert session.logged_in - session.set_credentials( - username=credentials["user"], - password=credentials["pwd"], - database=credentials["db"], - mandant=credentials["mandant"], - ) - assert session._creds is not None - assert not session.logged_in - assert session.session_token is None - - -@pytest.mark.api_con_required -def test_login_logout_Success(session, credentials): - assert not session.logged_in - - resp, status = session.login() - assert resp is not None - assert status.code == 0 - assert session.session_token is not None - resp, status = session.logout() - assert resp is None - assert status.code == 0 - assert session.session_token is None - assert "DelecoToken" not in session.headers - - session.set_credentials( - username=credentials["user"], - password="WRONG_PASSWORD", - database=credentials["db"], - mandant=credentials["mandant"], - ) - resp, status = session.login() - assert resp is not None - assert status.code == DEFAULT_API_ERR_CODE - assert status.api_server_error is not None - assert status.api_server_error.status_code == 409 - assert status.api_server_error.message == "Nutzer oder Passwort falsch." - - -def test_login_logout_FailApiServer(session, mock_put): - code = 401 - json = { - "message": "GenericError", - "code": "TestLogin", - "hints": "TestCase", - } - - mock_put.return_value.status_code = code - mock_put.return_value.json.return_value = json - resp, status = session.login() - assert resp is not None - assert not resp.token - assert status.code == 400 - assert status.api_server_error is not None - assert status.api_server_error.status_code == code - assert status.api_server_error.message == json["message"] - assert status.api_server_error.code == json["code"] - assert status.api_server_error.hints == json["hints"] - resp, status = session.logout() - assert resp is None - assert status.code == 400 - assert status.api_server_error is not None - assert status.api_server_error.status_code == code - assert status.api_server_error.message == json["message"] - assert status.api_server_error.code == json["code"] - assert status.api_server_error.hints == json["hints"] - - -@pytest.mark.api_con_required -def test_assert_login_SuccessLoggedOut(session): - assert session.session_token is None - assert session._creds is not None - # test logged out state - resp, status = session.assert_login() - assert resp is not None - assert status.code == 0 - assert session.session_token is not None - resp, status = session.logout() - assert status.code == 0 - - -@pytest.mark.api_con_required -def test_assert_login_SuccessStillLoggedIn(session): - assert session.session_token is None - assert session._creds is not None - resp, status = session.login() - resp, status = session.assert_login() - assert resp is not None - assert status.code == 0 - assert session.session_token is not None - resp, status = session.logout() - assert status.code == 0 - - -@pytest.mark.api_con_required -def test_assert_login_ReloginNoValidAuth(session, mock_get): - code = 401 - json = { - "message": "AuthentificationError", - "code": "TestAssertLoginAfter", - "hints": "TestCase", - } - mock_get.return_value.status_code = code - mock_get.return_value.json.return_value = json - - resp, status = session.login() - - resp, status = session.assert_login() - assert resp is not None - assert status.code == 0 - assert session.session_token is not None - resp, status = session.logout() - assert status.code == 0 - - -@pytest.mark.api_con_required -def test_assert_login_ReloginWrongToken(session): - # triggers code 401 - assert session.session_token is None - assert session._creds is not None - _, status = session.login() - assert status.code == 0 - session._session_token = "WRONGTOKEN" - resp, status = session.assert_login() - assert resp is not None - assert status.code == 0 - assert session.session_token is not None - resp, status = session.logout() - assert status.code == 0 - - -@pytest.mark.api_con_required -def test_assert_login_FailApiServer(session, mock_get): - code = 500 - json = { - "message": "ServerError", - "code": "TestExternalServerError", - "hints": "TestCase", - } - mock_get.return_value.status_code = code - mock_get.return_value.json.return_value = json - - resp, status = session.login() - - resp, status = session.assert_login() - assert resp is not None - assert not resp.token - assert status.code == 400 - assert status.api_server_error is not None - assert status.api_server_error.status_code == code - assert status.api_server_error.message == json["message"] - assert status.api_server_error.code == json["code"] - assert status.api_server_error.hints == json["hints"] diff --git a/tests/conftest.py b/tests/conftest.py index 7b3358e..f399f4e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,11 +4,14 @@ import json import tomllib from pathlib import Path from typing import Any, cast +from unittest.mock import patch import pandas as pd import pytest +import delta_barth.session from delta_barth.api.requests import SalesPrognosisResponse +from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS @pytest.fixture(scope="session") @@ -30,40 +33,6 @@ def api_base_url(credentials) -> str: return credentials["base_url"] -# TODO: maybe include in main package depending if needed in future -# TODO check deletion -# def _cvt_str_float(value: str) -> float: -# import locale - -# locale.setlocale(locale.LC_NUMERIC, "de_DE.UTF-8") -# return locale.atof(value) - - -# def _cvt_str_ts(value: str) -> Any: -# date = value.split("_")[0] - -# return pd.to_datetime(date, format="%Y%m%d", errors="coerce") - - -# @pytest.fixture(scope="session") -# def sales_data_db_export() -> pd.DataFrame: -# pwd = Path.cwd() -# assert "barth" in pwd.parent.name.lower(), "not in project root directory" -# data_pth = pwd / "./tests/_test_data/swm_f_umsatz_fakt.csv" -# assert data_pth.exists(), "file to sales data not found" -# data = pd.read_csv(data_pth, sep="\t") -# data["betrag"] = data["betrag"].apply(_cvt_str_float) -# data["buchungs_datum"] = data["buchungs_datum"].apply(_cvt_str_ts) -# data = data.dropna( -# how="any", -# subset=["firma_refid", "beleg_typ", "buchungs_datum", "betrag"], -# ignore_index=True, -# ) -# data["buchungs_datum"] = pd.to_datetime(data["buchungs_datum"]) - -# return data - - @pytest.fixture(scope="session") def sales_data_real() -> pd.DataFrame: pwd = Path.cwd() @@ -101,3 +70,30 @@ def exmpl_api_sales_prognosis_output() -> pd.DataFrame: assert data_pth.exists(), "file to API sales data not found" return pd.read_pickle(data_pth) + + +# ** sessions +@pytest.fixture(scope="function") +def session(credentials, api_base_url) -> delta_barth.session.Session: + session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS) + session.set_base_url(api_base_url) + session.set_credentials( + username=credentials["user"], + password=credentials["pwd"], + database=credentials["db"], + mandant=credentials["mandant"], + ) + + return session + + +@pytest.fixture +def mock_put(): + with patch("requests.put") as mock: + yield mock + + +@pytest.fixture +def mock_get(): + with patch("requests.get") as mock: + yield mock diff --git a/tests/test_session.py b/tests/test_session.py new file mode 100644 index 0000000..59506f2 --- /dev/null +++ b/tests/test_session.py @@ -0,0 +1,265 @@ +from pathlib import Path +from unittest.mock import patch + +import pytest + +import delta_barth.session +from delta_barth.constants import ( + DEFAULT_API_ERR_CODE, + HTTP_BASE_CONTENT_HEADERS, + LOG_FILENAME, +) + + +def test_validate_path_Success(): + str_pth = str(Path.cwd()) + path = delta_barth.session.validate_path(str_pth) + assert path.name == Path.cwd().name + + +def test_validate_path_FailNotExisting(): + str_pth = str(Path.cwd() / "test") + with pytest.raises(FileNotFoundError, match=r"seems not to exist"): + _ = delta_barth.session.validate_path(str_pth) + + +def test_validate_path_FailNoDirectory(tmp_path): + file = tmp_path / "test.txt" + file.write_text("test", encoding="utf-8") + + str_pth = str(file) + with pytest.raises(FileNotFoundError, match=r"seems not to be a directory"): + _ = delta_barth.session.validate_path(str_pth) + + +def test_session_set_DataPath(tmp_path): + str_path = str(tmp_path) + session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS) + + assert session._data_path is None + + session.set_data_path(str_path) + assert session._data_path is not None + assert isinstance(session.data_path, Path) + + +@patch("delta_barth.logging.ENABLE_LOGGING", True) +@patch("delta_barth.logging.LOGGING_TO_FILE", True) +def test_session_setup_logging(tmp_path): + str_path = str(tmp_path) + foldername: str = "logging_test" + target_log_dir = tmp_path / foldername + + session = delta_barth.session.Session( + HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername + ) + session.set_data_path(str_path) + log_dir = session.logging_dir + assert log_dir.exists() + assert log_dir == target_log_dir + # write file + target_file = target_log_dir / LOG_FILENAME + assert not target_file.exists() + session.setup() # calls setup code for logging + assert target_file.exists() + + +def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url): + session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS) + + assert session.session_token is None + assert session._creds is None + assert session._base_url is None + + session.set_base_url(api_base_url) + assert session._base_url is not None + session.set_credentials( + username=credentials["user"], + password=credentials["pwd"], + database=credentials["db"], + mandant=credentials["mandant"], + ) + assert session._creds is not None + + assert session.session_token is None + assert not session.logged_in + + +@pytest.mark.api_con_required +def test_session_set_ApiInfo_LoggedIn(credentials, api_base_url): + session = delta_barth.session.Session(HTTP_BASE_CONTENT_HEADERS) + # prepare login + assert session.session_token is None + assert session._creds is None + assert session._base_url is None + session.set_base_url(api_base_url) + session.set_credentials( + username=credentials["user"], + password=credentials["pwd"], + database=credentials["db"], + mandant=credentials["mandant"], + ) + session.login() + assert session._base_url is not None + assert session.logged_in + # reset base URL + session.set_base_url(api_base_url) + assert session._base_url is not None + assert not session.logged_in + assert session.session_token is None + # reset credentials + session.login() + assert session.logged_in + session.set_credentials( + username=credentials["user"], + password=credentials["pwd"], + database=credentials["db"], + mandant=credentials["mandant"], + ) + assert session._creds is not None + assert not session.logged_in + assert session.session_token is None + + +@pytest.mark.api_con_required +def test_login_logout_Success(session, credentials): + assert not session.logged_in + + resp, status = session.login() + assert resp is not None + assert status.code == 0 + assert session.session_token is not None + resp, status = session.logout() + assert resp is None + assert status.code == 0 + assert session.session_token is None + assert "DelecoToken" not in session.headers + + session.set_credentials( + username=credentials["user"], + password="WRONG_PASSWORD", + database=credentials["db"], + mandant=credentials["mandant"], + ) + resp, status = session.login() + assert resp is not None + assert status.code == DEFAULT_API_ERR_CODE + assert status.api_server_error is not None + assert status.api_server_error.status_code == 409 + assert status.api_server_error.message == "Nutzer oder Passwort falsch." + + +def test_login_logout_FailApiServer(session, mock_put): + code = 401 + json = { + "message": "GenericError", + "code": "TestLogin", + "hints": "TestCase", + } + + mock_put.return_value.status_code = code + mock_put.return_value.json.return_value = json + resp, status = session.login() + assert resp is not None + assert not resp.token + assert status.code == 400 + assert status.api_server_error is not None + assert status.api_server_error.status_code == code + assert status.api_server_error.message == json["message"] + assert status.api_server_error.code == json["code"] + assert status.api_server_error.hints == json["hints"] + resp, status = session.logout() + assert resp is None + assert status.code == 400 + assert status.api_server_error is not None + assert status.api_server_error.status_code == code + assert status.api_server_error.message == json["message"] + assert status.api_server_error.code == json["code"] + assert status.api_server_error.hints == json["hints"] + + +@pytest.mark.api_con_required +def test_assert_login_SuccessLoggedOut(session): + assert session.session_token is None + assert session._creds is not None + # test logged out state + resp, status = session.assert_login() + assert resp is not None + assert status.code == 0 + assert session.session_token is not None + resp, status = session.logout() + assert status.code == 0 + + +@pytest.mark.api_con_required +def test_assert_login_SuccessStillLoggedIn(session): + assert session.session_token is None + assert session._creds is not None + resp, status = session.login() + resp, status = session.assert_login() + assert resp is not None + assert status.code == 0 + assert session.session_token is not None + resp, status = session.logout() + assert status.code == 0 + + +@pytest.mark.api_con_required +def test_assert_login_ReloginNoValidAuth(session, mock_get): + code = 401 + json = { + "message": "AuthentificationError", + "code": "TestAssertLoginAfter", + "hints": "TestCase", + } + mock_get.return_value.status_code = code + mock_get.return_value.json.return_value = json + + resp, status = session.login() + + resp, status = session.assert_login() + assert resp is not None + assert status.code == 0 + assert session.session_token is not None + resp, status = session.logout() + assert status.code == 0 + + +@pytest.mark.api_con_required +def test_assert_login_ReloginWrongToken(session): + # triggers code 401 + assert session.session_token is None + assert session._creds is not None + _, status = session.login() + assert status.code == 0 + session._session_token = "WRONGTOKEN" + resp, status = session.assert_login() + assert resp is not None + assert status.code == 0 + assert session.session_token is not None + resp, status = session.logout() + assert status.code == 0 + + +@pytest.mark.api_con_required +def test_assert_login_FailApiServer(session, mock_get): + code = 500 + json = { + "message": "ServerError", + "code": "TestExternalServerError", + "hints": "TestCase", + } + mock_get.return_value.status_code = code + mock_get.return_value.json.return_value = json + + resp, status = session.login() + + resp, status = session.assert_login() + assert resp is not None + assert not resp.token + assert status.code == 400 + assert status.api_server_error is not None + assert status.api_server_error.status_code == code + assert status.api_server_error.message == json["message"] + assert status.api_server_error.code == json["code"] + assert status.api_server_error.hints == json["hints"]