83 lines
2.5 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.types import DataPipeStates, Status
if TYPE_CHECKING:
from delta_barth.types import DelBarApiError, ErrorDescription
class UnspecifiedRequestType(Exception):
    """Raised when an operation that is not defined for an API endpoint is requested."""
class UnknownApiErrorCode(Exception):
    """Raised when the response to a request carries an unknown error code."""
class ApiConnectionError(Exception):
    """Raised when an established connection is required but the current session is not connected."""
class FeaturesMissingError(Exception):
    """Raised when required features are missing."""
## ** internal error handling
# Table of the known data pipeline states. Each entry is a tuple of
# (state name, status code, description). ``ErrorHandler._parse_data_pipe_states``
# converts every entry into a ``Status`` and passes it to ``DataPipeStates``
# as a keyword argument named after the state, so the first element of each
# entry must be a valid keyword for that constructor.
# The descriptions are runtime strings (German) and are emitted as-is.
DATA_PIPELINE_STATUS_DESCR: Final[tuple[ErrorDescription, ...]] = (
    ("SUCCESS", 0, "Erfolg"),  # "success"
    ("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"),  # "data set does not have enough data points"
    ("BAD_QUALITY", 2, "Prognosequalität des Modells unzureichend"),  # "forecast quality of the model is insufficient"
)
class ErrorHandler:
    """Factory for ``Status`` objects describing pipeline states and errors.

    On construction, the entries of ``DATA_PIPELINE_STATUS_DESCR`` are
    converted once into a ``DataPipeStates`` object, which is then available
    through the ``pipe_states`` property.
    """

    def __init__(self) -> None:
        # ``None`` marks "not parsed yet"; the call below fills it in.
        self._pipe_states: DataPipeStates | None = None
        self._parse_data_pipe_states()

    @property
    def pipe_states(self) -> DataPipeStates:
        """Return the parsed data pipeline states.

        Raises:
            AssertionError: If the states have not been parsed (the check is
                an ``assert`` and is therefore skipped when Python runs
                with ``-O``).
        """
        states = self._pipe_states
        assert states is not None, (
            "tried to access not parsed data pipeline errors"
        )
        return states

    def _parse_data_pipe_states(self) -> None:
        """Build the ``DataPipeStates`` object from ``DATA_PIPELINE_STATUS_DESCR``.

        Does nothing when the states were already parsed.
        """
        if self._pipe_states is None:
            # One ``Status`` per table entry, keyed by the state name.
            states_by_name: dict[str, Status] = {
                name: Status(status_code=code, description=text)
                for name, code, text in DATA_PIPELINE_STATUS_DESCR
            }
            self._pipe_states = DataPipeStates(**states_by_name)

    def error(
        self,
        description: str,
        message: str = "",
        err_code: int = DEFAULT_INTERNAL_ERR_CODE,
    ) -> Status:
        """Create a ``Status`` for an error.

        Args:
            description: Text stored as the status description.
            message: Text stored as the status message; empty by default.
            err_code: Status code to report; defaults to
                ``DEFAULT_INTERNAL_ERR_CODE``.

        Returns:
            A ``Status`` built from the given values.
        """
        return Status(status_code=err_code, description=description, message=message)

    def api_error(
        self,
        error: DelBarApiError,
    ) -> Status:
        """Create a ``Status`` that wraps an error reported by the API server.

        Args:
            error: The server-side error; attached unchanged as
                ``api_server_error``.

        Returns:
            A ``Status`` with code ``DEFAULT_API_ERR_CODE``, a fixed
            description and message, and the given server error.
        """
        summary = "Es ist ein Fehler bei der Kommunikation mit dem API-Server aufgetreten"
        hint = (
            "Bitte beachten Sie die zusätzliche Fehlerausgabe des Servers in dieser Antwort"
        )
        return Status(
            status_code=DEFAULT_API_ERR_CODE,
            description=summary,
            message=hint,
            api_server_error=error,
        )