"""Session state for the API client: data paths, logging, database and login."""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Final

import requests
import sqlalchemy as sql
from dopt_basics.io import combine_route

import delta_barth.logging
from delta_barth import databases as db
from delta_barth.api.common import (
    LoginRequest,
    LoginResponse,
    validate_credentials,
)
from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status

if TYPE_CHECKING:
    from delta_barth.types import ApiCredentials, HttpContentHeaders


def validate_path(
    str_path: str,
) -> Path:
    """Resolve ``str_path`` and ensure it points to an existing directory.

    Args:
        str_path: Path to check.

    Returns:
        The resolved path.

    Raises:
        FileNotFoundError: If the path does not exist or is not a directory.
    """
    path = Path(str_path).resolve()

    if not path.exists():
        raise FileNotFoundError(f"Provided path >{path}< seems not to exist.")
    elif not path.is_dir():
        raise FileNotFoundError(f"Provided path >{path}< seems not to be a directory.")

    return path


def _api_error_status(resp: requests.Response) -> Status:
    """Translate a non-successful API response into a ``Status``.

    The response body is expected to be a JSON object whose members are
    forwarded to ``DelBarApiError``. Bodies which are empty, not JSON or not
    a JSON object are treated as "no details" instead of raising while
    decoding, so the HTTP status code is still reported.

    Args:
        resp: Response whose status code signalled a failure.

    Returns:
        The status produced by ``STATUS_HANDLER.api_error``.
    """
    try:
        payload = resp.json()
    except ValueError:
        # all JSON decoding errors raised by ``Response.json`` derive from
        # ``ValueError``; e.g. an HTML error page sent by a gateway
        payload = {}
    if not isinstance(payload, dict):
        # valid JSON, but not an object: can not be unpacked as keywords
        payload = {}

    # NOTE(review): with an empty payload this assumes ``DelBarApiError`` can be
    # built from ``status_code`` alone -- confirm against its definition
    err = DelBarApiError(status_code=resp.status_code, **payload)
    return STATUS_HANDLER.api_error(err)


class Session:
    """Hold the state needed to talk to the API and to store data locally.

    A session keeps track of the data path with its logging and database
    sub-folders, the database engine, the API credentials, the base URL and
    the session token obtained by logging in.
    """

    def __init__(
        self,
        base_headers: HttpContentHeaders,
        db_folder: str = "data",
        logging_folder: str = "logs",
    ) -> None:
        """Create a session which is neither set up nor logged in.

        Args:
            base_headers: HTTP headers sent with every request.
            db_folder: Name of the database folder below the data path.
            logging_folder: Name of the logging folder below the data path.
        """
        self._setup: bool = False
        self._data_path: Path | None = None
        self._db_path: Path | None = None
        self._db_folder = db_folder
        self._db_engine: sql.Engine | None = None
        self._logging_dir: Path | None = None
        self._logging_folder = logging_folder
        self._creds: ApiCredentials | None = None
        self._base_url: str | None = None
        # NOTE(review): the caller's mapping is stored by reference and later
        # mutated when a session token is added or removed -- confirm that
        # sharing it with the caller is intended
        self._headers = base_headers
        self._session_token: str | None = None
        self._logged_in: bool = False

    def setup(self) -> None:
        """Configure logging and the database, then mark the session as set up.

        The data path must have been set before, because the logging and
        database folders are derived from it.
        """
        # at this point: no logging configured
        assert not self._setup, "tried to setup session twice"
        self._setup_logging()
        self._setup_db_management()
        self._setup = True
        logger.info("[SESSION] Setup procedure successful")

    @property
    def data_path(self) -> Path:
        """Validated data path; asserts that it has been set."""
        assert self._data_path is not None, "accessed data path not set"
        return self._data_path

    @property
    def db_engine(self) -> sql.Engine:
        """Database engine; asserts that it has been created."""
        assert self._db_engine is not None, "accessed database engine not set"
        return self._db_engine

    @property
    def db_path(self) -> Path:
        """Path of the database file, creating its folder if necessary.

        The cached value is only returned once the session is set up; until
        then the path is derived from the current data path on every access.
        """
        if self._db_path is not None and self._setup:
            return self._db_path

        db_root = (self.data_path / self._db_folder).resolve()
        db_path = db_root / "dopt-data.db"
        # create in one step instead of "check, then create": no race with
        # concurrent creation and an early error if the path is not a directory
        db_root.mkdir(exist_ok=True)
        self._db_path = db_path
        return self._db_path

    def _setup_db_management(self) -> None:
        """Create the database engine and all tables known to the metadata."""
        self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
        db.metadata.create_all(self._db_engine)
        logger.info("[SESSION] Successfully setup DB management")

    @property
    def logging_dir(self) -> Path:
        """Logging folder below the data path, created if necessary.

        The cached value is only returned once the session is set up; until
        then the folder is derived from the current data path on every access.
        """
        if self._logging_dir is not None and self._setup:
            return self._logging_dir

        logging_dir = self.data_path / self._logging_folder
        # see ``db_path`` for the reasoning behind ``exist_ok``
        logging_dir.mkdir(exist_ok=True)
        self._logging_dir = logging_dir
        return self._logging_dir

    def _setup_logging(self) -> None:
        """Configure logging to write into the logging folder."""
        delta_barth.logging.setup_logging(self.logging_dir)
        logger.info("[SESSION] Successfully setup logging")

    def disable_logging(self) -> None:
        """Disable the package logging."""
        delta_barth.logging.disable_logging()

    @property
    def creds(self) -> ApiCredentials:
        """API credentials; asserts that they have been set."""
        assert self._creds is not None, "accessed credentials not set"
        return self._creds

    def set_data_path(
        self,
        path: str,
    ) -> None:
        """Validate and store the data path.

        The session is marked as not set up afterwards, so the cached database
        and logging paths are derived anew from the changed data path.

        Args:
            path: Existing directory used as root for logs and database.

        Raises:
            FileNotFoundError: If ``path`` does not exist or is no directory.
        """
        self._data_path = validate_path(path)
        self._setup = False

    def set_credentials(
        self,
        username: str,
        password: str,
        database: str,
        mandant: str,
    ) -> None:
        """Validate and store API credentials, logging out first if needed.

        The status returned by the logout is not evaluated.
        """
        if self.logged_in:
            self.logout()
        self._creds = validate_credentials(
            username=username,
            password=password,
            database=database,
            mandant=mandant,
        )

    @property
    def base_url(self) -> str:
        """Base URL of the API; asserts that it has been set."""
        assert self._base_url is not None, "accessed base URL not set"
        return self._base_url

    def set_base_url(
        self,
        base_url: str,
    ) -> None:
        """Store the API base URL, logging out from the previous one first.

        The status returned by the logout is not evaluated.
        """
        if self.logged_in:
            self.logout()
        self._base_url = base_url

    @property
    def headers(self) -> HttpContentHeaders:
        """HTTP headers sent with requests, including the token if logged in."""
        return self._headers

    @property
    def session_token(self) -> str | None:
        """Current API session token or ``None`` if there is none."""
        return self._session_token

    @property
    def logged_in(self) -> bool:
        """Whether a session token is currently stored."""
        return self._logged_in

    def _add_session_token(
        self,
        token: str,
    ) -> None:
        """Store ``token`` and add it to the headers as ``DelecoToken``."""
        assert self.session_token is None, "tried overwriting existing API session token"
        self._session_token = token
        self._headers.update(DelecoToken=token)
        self._logged_in = True

    def _remove_session_token(self) -> None:
        """Forget the stored token and remove it from the headers."""
        assert self.session_token is not None, (
            "tried to delete non-existing API session token"
        )
        if "DelecoToken" in self.headers:
            del self._headers["DelecoToken"]
        self._session_token = None
        self._logged_in = False

    def login(
        self,
    ) -> tuple[LoginResponse, Status]:
        """Log in with the stored credentials and keep the returned token.

        Returns:
            The login response and a status. On any failure the response
            carries an empty token and the status describes the problem.
        """
        ROUTE: Final[str] = "user/login"
        URL: Final = combine_route(self.base_url, ROUTE)

        login_req = LoginRequest(
            userName=self.creds.username,
            password=self.creds.password,
            databaseName=self.creds.database,
            mandantName=self.creds.mandant,
        )
        empty_response = LoginResponse(token="")
        try:
            resp = requests.put(
                URL,
                login_req.model_dump_json(),
                headers=self.headers,  # type: ignore
                timeout=API_CON_TIMEOUT,
            )
        except requests.exceptions.Timeout:  # pragma: no cover
            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
        except requests.exceptions.RequestException:  # pragma: no cover
            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR

        response: LoginResponse
        status: Status
        if resp.status_code == 200:
            response = LoginResponse(**resp.json())
            # NOTE(review): the other methods read ``STATUS_HANDLER.SUCCESS``
            # -- confirm both accessors yield the same status
            status = STATUS_HANDLER.pipe_states.SUCCESS
            self._add_session_token(response.token)
        else:
            response = empty_response
            status = _api_error_status(resp)

        return response, status

    def logout(
        self,
    ) -> tuple[None, Status]:
        """Log out from the API and drop the stored token on success.

        Returns:
            ``None`` and a status describing the outcome.
        """
        ROUTE: Final[str] = "user/logout"
        URL: Final = combine_route(self.base_url, ROUTE)

        try:
            resp = requests.put(
                URL,
                headers=self.headers,  # type: ignore
                timeout=API_CON_TIMEOUT,
            )
        except requests.exceptions.Timeout:  # pragma: no cover
            return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
        except requests.exceptions.RequestException:  # pragma: no cover
            return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR

        status: Status
        if resp.status_code == 200:
            status = STATUS_HANDLER.SUCCESS
            self._remove_session_token()
        else:
            status = _api_error_status(resp)

        return None, status

    def assert_login(
        self,
    ) -> tuple[LoginResponse, Status]:
        """Make sure a valid session token is available, logging in if needed.

        Returns:
            A login response carrying the valid token together with a status.
            On failure the token is empty, unless the failure stems from the
            re-login, whose result is passed through unchanged.
        """
        # check if login token is still valid
        # re-login if necessary
        if self.session_token is None:
            return self.login()

        # use known endpoint which requires a valid token in its header
        # evaluate the response to decide if:
        # current token is still valid, token is not valid, other errors occurred
        ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
        URL: Final = combine_route(self.base_url, ROUTE)
        params: dict[str, int] = {"FirmaId": 999999}
        empty_response = LoginResponse(token="")
        try:
            resp = requests.get(
                URL,
                params=params,
                headers=self.headers,  # type: ignore
                timeout=API_CON_TIMEOUT,
            )
        except requests.exceptions.Timeout:  # pragma: no cover
            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
        except requests.exceptions.RequestException:  # pragma: no cover
            return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR

        response: LoginResponse
        status: Status
        if resp.status_code == 200:
            response = LoginResponse(token=self.session_token)
            status = STATUS_HANDLER.SUCCESS
        elif resp.status_code == 401:
            # token rejected: drop it and obtain a new one
            self._remove_session_token()
            response, status = self.login()
        else:
            response = empty_response
            status = _api_error_status(resp)

        return response, status