add login-logout functionality

This commit is contained in:
Florian Förster 2025-02-26 13:28:54 +01:00
parent 7affec30ae
commit ffbe63d1a5
5 changed files with 209 additions and 3 deletions

View File

@ -1,12 +1,50 @@
from __future__ import annotations
import re
from typing import Final
from typing import Final, Never
import requests
from pydantic import BaseModel
from requests import Response
from delta_barth.errors import UnspecifiedRequestType
from delta_barth.constants import HTTP_CONTENT_HEADERS, KnownApiErrorCodes
from delta_barth.errors import UnknownApiErrorCode, UnspecifiedRequestType
from delta_barth.types import HttpRequestTypes
LOGIN_ERROR_CODES_KNOWN: Final[frozenset[int]] = frozenset((400, 401, 409, 500))
class DelBarApiError(BaseModel):
status_code: int
message: str
code: str | None
hints: str | None
class LoginRequest(BaseModel):
userName: str
password: str
databaseName: str
mandantName: str
class LoginResponse(BaseModel):
token: str
error: DelBarApiError | None = None
class LogoutResponse(BaseModel):
error: DelBarApiError | None = None
def _raise_for_unknown_error(
resp: Response,
) -> Never:
raise UnknownApiErrorCode(
f"Unknown response code for request. Status Code: {resp.status_code}, "
f"Content: {resp.text}"
)
def _strip_url_components(string: str) -> str:
return re.sub(r"^[ /]+|[ /]+$", "", string)
@ -36,3 +74,66 @@ def ping(
raise UnspecifiedRequestType(f"Request type {method} not defined for endpoint")
return resp
def login(
base_url: str,
user_name: str,
password: str,
database: str,
mandant: str,
) -> LoginResponse:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(base_url, ROUTE)
login_req = LoginRequest(
userName=user_name,
password=password,
databaseName=database,
mandantName=mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=HTTP_CONTENT_HEADERS.as_dict(), # type: ignore
)
response: LoginResponse
if resp.status_code == 200:
# success
response = LoginResponse(**resp.json())
HTTP_CONTENT_HEADERS.add_session_token(response.token)
elif resp.status_code in KnownApiErrorCodes.LOGIN.value:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
response = LoginResponse(token="", error=err)
else: # pragma: no cover
_raise_for_unknown_error(resp)
return response
def logout(
base_url: str,
) -> LogoutResponse:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(base_url, ROUTE)
resp = requests.put(
URL,
headers=HTTP_CONTENT_HEADERS.as_dict(), # type: ignore
)
response: LogoutResponse
if resp.status_code == 200:
# success
response = LogoutResponse()
HTTP_CONTENT_HEADERS.remove_session_token()
elif resp.status_code in KnownApiErrorCodes.LOGOUT.value:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
response = LogoutResponse(error=err)
else: # pragma: no cover
_raise_for_unknown_error(resp)
return response

View File

@ -1 +1,18 @@
import enum
from typing import Final
from delta_barth.types import CurrentContentHeaders, HttpContentHeaders
HTTP_BASE_CONTENT_HEADERS: Final[HttpContentHeaders] = {
"Content-type": "application/json",
"Accept": "application/json",
}
HTTP_CONTENT_HEADERS: Final[CurrentContentHeaders] = CurrentContentHeaders(
HTTP_BASE_CONTENT_HEADERS
)
class KnownApiErrorCodes(enum.Enum):
LOGIN = frozenset((400, 401, 409, 500))
LOGOUT = frozenset((400, 401, 409, 500))

View File

@ -1,2 +1,6 @@
class UnspecifiedRequestType(Exception):
"""exception raised if for a given API endpoint a not defined operation is requested"""
class UnknownApiErrorCode(Exception):
"""exception raised if for a given request a unknown error response code is transmitted"""

View File

@ -1,6 +1,7 @@
import enum
import warnings
from dataclasses import dataclass, field
from typing import TypeAlias
from typing import NotRequired, TypeAlias, TypedDict
import pandas as pd
@ -13,6 +14,59 @@ class HttpRequestTypes(enum.StrEnum):
DELETE = enum.auto()
HttpContentHeaders = TypedDict(
"HttpContentHeaders",
{
"Content-type": str,
"Accept": str,
"DelecoToken": NotRequired[str],
},
)
class CurrentContentHeaders:
def __init__(
self,
base_headers: HttpContentHeaders,
) -> None:
self._headers = base_headers
self._session_token: str | None = None
def __getitem__(self, key: str) -> str:
return self.headers[key]
def __contains__(self, key: str) -> bool:
return key in self.headers
def as_dict(self) -> HttpContentHeaders:
return self.headers
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
def add_session_token(
self,
token: str,
) -> None:
if self.session_token is not None:
warnings.warn(
"Setting new API session token despite it was already set. "
"Overwriting existing token."
)
self._session_token = token
self._headers.update(DelecoToken=token)
def remove_session_token(self) -> None:
if "DelecoToken" in self:
del self._headers["DelecoToken"]
self._session_token = None
# ** forecasts
@dataclass(slots=True)
class CustomerDataSalesForecast:

View File

@ -1,6 +1,7 @@
import pytest
from delta_barth.api import common
from delta_barth.constants import HTTP_CONTENT_HEADERS
from delta_barth.errors import UnspecifiedRequestType
from delta_barth.types import HttpRequestTypes
@ -54,3 +55,32 @@ def test_ping(api_base_url):
with pytest.raises(UnspecifiedRequestType):
resp = common.ping(api_base_url, HttpRequestTypes.POST)
@pytest.mark.api_con_required
def test_login_logout(credentials, api_base_url):
assert HTTP_CONTENT_HEADERS.session_token is None
resp = common.login(
base_url=api_base_url,
user_name=credentials["user"],
password=credentials["pwd"],
database=credentials["db"],
mandant=credentials["mandant"],
)
assert resp.error is None
assert HTTP_CONTENT_HEADERS.session_token is not None
resp = common.logout(
base_url=api_base_url,
)
assert HTTP_CONTENT_HEADERS.session_token is None
assert "DelecoToken" not in HTTP_CONTENT_HEADERS
resp = common.login(
base_url=api_base_url,
user_name=credentials["user"],
password="WRONG_PASSWORD",
database=credentials["db"],
mandant=credentials["mandant"],
)
assert resp.error is not None
assert resp.error.status_code == 409
assert resp.error.message == "Nutzer oder Passwort falsch."