major refactoring

This commit is contained in:
2025-03-07 12:23:59 +01:00
parent a94fd4a936
commit 76f58ecba0
13 changed files with 526 additions and 286 deletions

View File

@@ -1,8 +1,3 @@
from typing import Final
from delta_barth.errors import STATE_HANDLER
from delta_barth.api._session import Session
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.errors import StateHandler
STATE_HANDLER: Final[StateHandler] = StateHandler()
CURRENT_SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
__all__ = ["STATE_HANDLER"]

View File

@@ -8,13 +8,13 @@ import pandas as pd
from sklearn.metrics import mean_squared_error
from xgboost import XGBRegressor
from delta_barth._management import STATE_HANDLER
from delta_barth.analysis import parse
from delta_barth.constants import COL_MAP_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS
from delta_barth.errors import STATE_HANDLER
from delta_barth.types import CustomerDataSalesForecast, DataPipeStates, PipeResult
if TYPE_CHECKING:
from delta_barth.api.common import SalesPrognosisResponse
from delta_barth.api.requests import SalesPrognosisResponse
# TODO check pandera for DataFrame validation

View File

@@ -1,48 +0,0 @@
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from delta_barth.types import HttpContentHeaders
class Session:
def __init__(
self,
base_headers: HttpContentHeaders,
) -> None:
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def add_session_token(
self,
token: str,
) -> None:
if self.session_token is not None:
warnings.warn(
"Setting new API session token despite it was already set. "
"Overwriting existing token."
)
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def remove_session_token(self) -> None:
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False

View File

@@ -1,29 +1,208 @@
from __future__ import annotations
import re
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final
import requests
from pydantic import BaseModel, PositiveInt, SkipValidation
from pydantic import BaseModel
from requests import Response
from delta_barth._management import CURRENT_SESSION, STATE_HANDLER
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.errors import (
ApiConnectionError,
STATE_HANDLER,
UnspecifiedRequestType,
)
from delta_barth.types import DelBarApiError, HttpRequestTypes
from delta_barth.types import (
ApiCredentials,
DelBarApiError,
HttpRequestTypes,
)
if TYPE_CHECKING:
from delta_barth.types import Status
LOGIN_ERROR_CODES_KNOWN: Final[frozenset[int]] = frozenset((400, 401, 409, 500))
from delta_barth.types import HttpContentHeaders, Status
def _assert_login_status() -> None:
if not CURRENT_SESSION.logged_in:
raise ApiConnectionError("Curent session is not logged in")
class Session:
def __init__(
self,
base_headers: HttpContentHeaders,
) -> None:
self._creds: ApiCredentials | None = None
self._base_url: str | None = None
self._headers = base_headers
self._session_token: str | None = None
self._logged_in: bool = False
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
return self._creds
def set_credentials(
self,
user_name: str,
password: str,
database: str,
mandant: str,
) -> None:
self._creds = validate_credentials(
user_name=user_name,
password=password,
database=database,
mandant=mandant,
)
@property
def base_url(self) -> str:
assert self._base_url is not None, "accessed base URL not set"
return self._base_url
def set_base_url(
self,
base_url: str,
) -> None:
self._base_url = base_url
@property
def headers(self) -> HttpContentHeaders:
return self._headers
@property
def session_token(self) -> str | None:
return self._session_token
@property
def logged_in(self) -> bool:
return self._logged_in
def _add_session_token(
self,
token: str,
) -> None:
assert self.session_token is None, "tried overwriting existing API session token"
self._session_token = token
self._headers.update(DelecoToken=token)
self._logged_in = True
def _remove_session_token(self) -> None:
assert self.session_token is not None, (
"tried to delete non-existing API session token"
)
if "DelecoToken" in self.headers:
del self._headers["DelecoToken"]
self._session_token = None
self._logged_in = False
def login(
self,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(self.base_url, ROUTE)
login_req = LoginRequest(
userName=self.creds.user_name,
password=self.creds.password,
databaseName=self.creds.database,
mandantName=self.creds.mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATE_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token)
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
return response, status
def logout(
self,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put(
URL,
headers=self.headers, # type: ignore
)
response = None
status: Status
if resp.status_code == 200:
status = STATE_HANDLER.SUCCESS
self._remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
return response, status
def assert_login(
self,
) -> tuple[LoginResponse, Status]:
# check if login token is still valid
# re-login if necessary
if self.session_token is None:
return self.login()
# use known endpoint which requires a valid token in its header
# evaluate the response to decide if:
# current token is still valid, token is not valid, other errors occurred
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get(
URL,
params=params,
headers=self.headers, # type: ignore
)
response: LoginResponse
status: Status
if resp.status_code == 200:
# TODO use to check for catching unknown exceptions
# response = LoginResponse(**resp.json())
response = LoginResponse(token=self.session_token)
status = STATE_HANDLER.SUCCESS
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else: # pragma: no cover
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
return response, status
def validate_credentials(
user_name: str,
password: str,
database: str,
mandant: str,
) -> ApiCredentials:
return ApiCredentials(
user_name=user_name,
password=password,
database=database,
mandant=mandant,
)
# def _assert_login_status() -> None:
# if not CURRENT_SESSION.logged_in:
# raise ApiConnectionError("Curent session is not logged in")
# assert login:
# doing request to defined end point which
def _strip_url_components(string: str) -> str:
@@ -68,118 +247,61 @@ class LoginResponse(BaseModel):
token: str
def login(
base_url: str,
user_name: str,
password: str,
database: str,
mandant: str,
) -> tuple[LoginResponse, Status]:
ROUTE: Final[str] = "user/login"
URL: Final = combine_route(base_url, ROUTE)
# def login(
# base_url: str,
# user_name: str,
# password: str,
# database: str,
# mandant: str,
# ) -> tuple[LoginResponse, Status]:
# ROUTE: Final[str] = "user/login"
# URL: Final = combine_route(base_url, ROUTE)
login_req = LoginRequest(
userName=user_name,
password=password,
databaseName=database,
mandantName=mandant,
)
resp = requests.put(
URL,
login_req.model_dump_json(),
headers=CURRENT_SESSION.headers, # type: ignore
)
# login_req = LoginRequest(
# userName=user_name,
# password=password,
# databaseName=database,
# mandantName=mandant,
# )
# resp = requests.put(
# URL,
# login_req.model_dump_json(),
# headers=CURRENT_SESSION.headers, # type: ignore
# )
response: LoginResponse
status: Status
if resp.status_code == 200:
response = LoginResponse(**resp.json())
status = STATE_HANDLER.pipe_states.SUCCESS
CURRENT_SESSION.add_session_token(response.token)
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
# response: LoginResponse
# status: Status
# if resp.status_code == 200:
# response = LoginResponse(**resp.json())
# status = STATE_HANDLER.pipe_states.SUCCESS
# CURRENT_SESSION.add_session_token(response.token)
# else:
# response = LoginResponse(token="")
# err = DelBarApiError(status_code=resp.status_code, **resp.json())
# status = STATE_HANDLER.api_error(err)
return response, status
# return response, status
# ** logout
def logout(
base_url: str,
) -> tuple[None, Status]:
ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(base_url, ROUTE)
# def logout(
# base_url: str,
# ) -> tuple[None, Status]:
# ROUTE: Final[str] = "user/logout"
# URL: Final = combine_route(base_url, ROUTE)
resp = requests.put(
URL,
headers=CURRENT_SESSION.headers, # type: ignore
)
# resp = requests.put(
# URL,
# headers=CURRENT_SESSION.headers, # type: ignore
# )
response = None
status: Status
if resp.status_code == 200:
status = STATE_HANDLER.SUCCESS
CURRENT_SESSION.remove_session_token()
else:
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
# response = None
# status: Status
# if resp.status_code == 200:
# status = STATE_HANDLER.SUCCESS
# CURRENT_SESSION.remove_session_token()
# else:
# err = DelBarApiError(status_code=resp.status_code, **resp.json())
# status = STATE_HANDLER.api_error(err)
return response, status
# ** sales data
class SalesPrognosisRequestP(BaseModel):
FirmaId: SkipValidation[int | None]
BuchungsDatum: SkipValidation[Datetime | None]
class SalesPrognosisResponseEntry(BaseModel):
artikelId: PositiveInt
firmaId: PositiveInt
betrag: float # negative values are filtered out later
menge: float # reasons for negative values unknown
buchungsDatum: Datetime
class SalesPrognosisResponse(BaseModel):
daten: tuple[SalesPrognosisResponseEntry, ...]
# error: DelBarApiError | None = None
def get_sales_prognosis_data(
base_url: str,
company_id: int | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
_assert_login_status()
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
FirmaId=company_id,
BuchungsDatum=start_date,
)
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=CURRENT_SESSION.headers, # type: ignore[argumentType]
)
response: SalesPrognosisResponse
status: Status
if resp.status_code == 200:
response = SalesPrognosisResponse(**resp.json())
status = STATE_HANDLER.SUCCESS
else:
response = SalesPrognosisResponse(daten=tuple())
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
# elif resp.status_code in KnownDelBarApiErrorCodes.COMMON.value: # pragma: no cover
# err = DelBarApiError(status_code=resp.status_code, **resp.json())
# response = SalesPrognosisResponse(daten=tuple(), error=err)
# else: # pragma: no cover
# _raise_for_unknown_error(resp)
return response, status
# return response, status

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from datetime import datetime as Datetime
from typing import TYPE_CHECKING, Final
import requests
from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.api.common import combine_route
from delta_barth.errors import STATE_HANDLER
from delta_barth.types import DelBarApiError, Status
if TYPE_CHECKING:
from delta_barth.api.common import Session
# ** sales data
class SalesPrognosisRequestP(BaseModel):
FirmaId: SkipValidation[int | None]
BuchungsDatum: SkipValidation[Datetime | None]
class SalesPrognosisResponseEntry(BaseModel):
artikelId: PositiveInt
firmaId: PositiveInt
betrag: float # negative values are filtered out later
menge: float # reasons for negative values unknown
buchungsDatum: Datetime
class SalesPrognosisResponse(BaseModel):
daten: tuple[SalesPrognosisResponseEntry, ...]
def get_sales_prognosis_data(
session: Session,
company_id: int | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
session.assert_login()
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
FirmaId=company_id,
BuchungsDatum=start_date,
)
resp = requests.get(
URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
)
response: SalesPrognosisResponse
status: Status
if resp.status_code == 200:
response = SalesPrognosisResponse(**resp.json())
status = STATE_HANDLER.SUCCESS
else:
response = SalesPrognosisResponse(daten=tuple())
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATE_HANDLER.api_error(err)
return response, status

View File

@@ -7,6 +7,7 @@ from delta_barth.types import HttpContentHeaders
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_API_ERR_CODE: Final[int] = 400
HTTP_BASE_CONTENT_HEADERS: Final[HttpContentHeaders] = {
"Content-type": "application/json",
"Accept": "application/json",

View File

@@ -60,7 +60,7 @@ def _construct_exception(
return exception(err_message)
class StateHandler:
class StatusHandler:
def __init__(self) -> None:
self._pipe_states: DataPipeStates | None = None
self._parse_data_pipe_states()
@@ -145,3 +145,6 @@ class StateHandler:
)
add_info = api_err.model_dump(exclude_none=True)
raise _construct_exception(UApiError, descr, msg, add_info)
STATE_HANDLER: Final[StatusHandler] = StatusHandler()

View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass, field
from typing import NotRequired, TypeAlias, TypedDict
import pandas as pd
from pydantic import BaseModel, SkipValidation
from pydantic import BaseModel, ConfigDict, SkipValidation
# ** Pipeline state management
StatusDescription: TypeAlias = tuple[str, int, str]
@@ -32,6 +32,15 @@ class PipeResult:
# ** API
class ApiCredentials(BaseModel):
model_config: ConfigDict = ConfigDict(str_strip_whitespace=True)
user_name: str
password: str
database: str
mandant: str
class DelBarApiError(BaseModel):
status_code: int
message: str = ""