273 lines
8.4 KiB
Python
273 lines
8.4 KiB
Python
from __future__ import annotations
|
|
|
|
import typing as t
|
|
from collections.abc import Callable
|
|
from functools import wraps
|
|
from typing import Any, Final
|
|
|
|
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
|
|
from delta_barth.logging import logger_status, logger_wrapped_results
|
|
from delta_barth.types import DataPipeStates, Status
|
|
|
|
if t.TYPE_CHECKING:
|
|
from delta_barth.types import DelBarApiError, StatusDescription
|
|
|
|
# parameter specification of a decorated callable; lets ``wrap_result`` keep
# the original call signature in its type annotations
P = t.ParamSpec("P")
# placeholder for the result type of a wrapped callable / ``ResultWrapper``
T = t.TypeVar("T")
|
|
|
|
|
|
class UnspecifiedRequestType(Exception):
    """Raised when an operation is requested that is not defined for the given API endpoint."""
|
|
|
|
|
|
class ApiConnectionError(Exception):
    """Raised when an established connection is required, but the current session is not connected."""
|
|
|
|
|
|
class FeaturesMissingError(Exception):
    """Raised when required features are missing."""
|
|
|
|
|
|
# ** Exceptions for result wrappers
|
|
class WAccessResultDespiteError(Exception):
    """Wrapped-results exception.

    Raised when a result is accessed although the underlying procedure
    ended with an error.
    """
|
|
|
|
|
|
# ** Exceptions for unwrap of error value
|
|
class UDataProcessingError(Exception):
    """Unwrap exception for all data related errors (e.g. wrong format, insufficient quality)."""
|
|
|
|
|
|
class UInternalError(Exception):
    """Unwrap exception for all internal errors.

    An error counts as internal if it is not caused by (external) data
    related issues.
    """
|
|
|
|
|
|
class UApiError(Exception):
    """Unwrap exception for all errors that occurred on the server side of Delta Barth's API.

    Default case: should not be raised, since errors of this kind are
    expected to be handled by Delta Barth themselves.
    """
|
|
|
|
|
|
## ** internal error handling
|
|
# Pre-defined states of the data pipeline. Every entry is a tuple of
# (state name, status code, description). ``StatusHandler`` converts each
# entry into a ``Status`` (code + description) stored under its state name.
# The descriptions are runtime strings in German; the English gloss in the
# comments is for maintainers only.
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
    # "success"
    ("SUCCESS", 0, "Erfolg"),
    # "establishing the connection to the API server took too long"
    (
        "CONNECTION_TIMEOUT",
        1,
        "Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
    ),
    # "no connection to the API server is possible"
    (
        "CONNECTION_ERROR",
        2,
        "Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
    ),
    # "data set does not contain enough data points"
    ("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
    # "not enough data points after aggregation per month"
    ("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
    # "forecast quality of the model is insufficient"
    ("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
)
|
|
|
|
|
|
def _construct_exception(
    exception: type[Exception],
    description: str,
    message: str,
    additional: dict[str, Any] | None = None,
) -> Exception:
    """Instantiate ``exception`` with a uniformly formatted error message.

    Args:
        exception: exception class to instantiate.
        description: short description placed in the ``DESCRIPTION`` part.
        message: detailed text placed in the ``MESSAGE`` part.
        additional: optional extra information; appended to the message
            whenever it is not ``None`` (an empty dict is appended as well).

    Returns:
        A new, not yet raised instance of ``exception``.
    """
    parts = [f"DESCRIPTION: >{description}<, MESSAGE: >{message}<"]
    if additional is not None:
        parts.append(f", additional info/Server side errors: {additional}")

    return exception("".join(parts))
|
|
|
|
|
|
class StatusHandler:
    """Factory and checker for ``Status`` objects of the data pipeline.

    On construction the entries of ``DATA_PIPELINE_STATUS_DESCR`` are parsed
    into a ``DataPipeStates`` instance. The remaining methods build error
    states and turn a non-successful state into a raised exception.
    """

    def __init__(self) -> None:
        self._pipe_states: DataPipeStates | None = None
        self._parse_data_pipe_states()

    @property
    def pipe_states(self) -> DataPipeStates:
        """Parsed pre-defined pipeline states."""
        assert self._pipe_states is not None, (
            "tried to access not parsed data pipeline states"
        )
        return self._pipe_states

    @property
    def SUCCESS(self) -> Status:
        """Status object representing a successful run."""
        # ``pipe_states`` already asserts that the states have been parsed
        return self.pipe_states.SUCCESS

    def _parse_data_pipe_states(self) -> None:
        """Parse ``DATA_PIPELINE_STATUS_DESCR`` once; later calls are no-ops."""
        if self._pipe_states is not None:
            return

        # each entry is (state name, status code, description)
        parsed_states: dict[str, Status] = {
            name: Status(code=code, description=description)
            for name, code, description in DATA_PIPELINE_STATUS_DESCR
        }
        self._pipe_states = DataPipeStates(**parsed_states)

    def exception_to_error(
        self,
        exception: Exception,
        code: int = DEFAULT_INTERNAL_ERR_CODE,
    ) -> Status:
        """Convert an exception into an error ``Status``.

        The exception's class name becomes the description and its string
        representation the message.

        Args:
            exception: exception to convert.
            code: error code of the resulting status; validated by ``error``.

        Returns:
            The error status.

        Raises:
            ValueError: if ``code`` is outside the range accepted by ``error``.
        """
        description = exception.__class__.__name__
        message = str(exception)
        return self.error(description, message, code)

    @staticmethod
    def error(
        description: str,
        message: str = "",
        code: int = DEFAULT_INTERNAL_ERR_CODE,
    ) -> Status:
        """Create an error ``Status`` with a custom error code.

        Args:
            description: short description of the error.
            message: detailed error message.
            code: custom error code; has to lie within
                ``DEFAULT_INTERNAL_ERR_CODE`` and ``DEFAULT_API_ERR_CODE``
                (both bounds inclusive).

        Returns:
            The error status.

        Raises:
            ValueError: if ``code`` is outside the allowed range.
        """
        lower_bound = DEFAULT_INTERNAL_ERR_CODE
        upper_bound = DEFAULT_API_ERR_CODE
        # NOTE(review): the upper bound is inclusive, yet ``raise_for_status``
        # treats ``DEFAULT_API_ERR_CODE`` as an API error that needs an
        # ``api_server_error`` - confirm whether the upper bound should be
        # exclusive here
        if not lower_bound <= code <= upper_bound:
            raise ValueError(
                f"Custom error codes must be between default "
                f"values of {lower_bound}-{upper_bound}"
            )

        return Status(
            code=code,
            description=description,
            message=message,
        )

    @staticmethod
    def api_error(
        error: DelBarApiError,
    ) -> Status:
        """Create the ``Status`` for an error reported by the API server.

        Args:
            error: error object of the API server; attached to the status as
                ``api_server_error``.

        Returns:
            Status with code ``DEFAULT_API_ERR_CODE`` and fixed (German)
            description and message.
        """
        description = "Es ist ein Fehler bei der Kommunikation mit dem API-Server aufgetreten"
        message = (
            "Bitte beachten Sie die zusätzliche Fehlerausgabe des Servers in dieser Antwort"
        )
        return Status(
            code=DEFAULT_API_ERR_CODE,
            description=description,
            message=message,
            api_server_error=error,
        )

    def raise_for_status(
        self,
        state: Status,
    ) -> None:
        """Raise the exception matching ``state`` unless it equals ``SUCCESS``.

        Args:
            state: status to check.

        Raises:
            UDataProcessingError: if the code is below
                ``DEFAULT_INTERNAL_ERR_CODE``.
            UInternalError: if the code lies in
                ``[DEFAULT_INTERNAL_ERR_CODE, DEFAULT_API_ERR_CODE)``.
            UApiError: for any other code; the dumped server error is added
                to the exception message.
        """
        if state == self.SUCCESS:
            logger_status.info(
                "[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
            )
            return

        code = state.code
        descr = state.description
        msg = state.message

        exc: Exception
        if code < DEFAULT_INTERNAL_ERR_CODE:
            exc = _construct_exception(UDataProcessingError, descr, msg)
        elif code < DEFAULT_API_ERR_CODE:
            # lower bound is implied by the failed first branch
            exc = _construct_exception(UInternalError, descr, msg)
        else:
            api_err = state.api_server_error
            assert api_err is not None, (
                "error code indicated API error, but no error instance found"
            )
            add_info = api_err.model_dump(exclude_none=True)
            exc = _construct_exception(UApiError, descr, msg, add_info)

        logger_status.error(
            "[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
        )
        raise exc
|
|
|
|
|
|
# module-wide handler instance, created at import time; ``ResultWrapper``
# uses it to build and check status objects
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
|
|
|
|
|
|
# ** result wrapping
|
|
class NotSet:
    """Placeholder type for a result value that has not been set."""

    __slots__ = ()

    def __init__(self) -> None:
        pass

    def __str__(self) -> str:
        return ">Not set<"

    def __repr__(self) -> str:
        return type(self).__name__ + "()"
|
|
|
|
|
|
class ResultWrapper(t.Generic[T]):
    """Holds either the result of a procedure or the status of its failure.

    Exactly one of ``result`` and ``exception`` has to be provided: a real
    result without exception, or a ``NotSet`` placeholder together with the
    exception that occurred. The attribute ``status`` is the success status or
    the error status derived from the exception.
    """

    def __init__(
        self,
        result: T | NotSet,
        exception: Exception | None,
        code_on_error: int,
    ) -> None:
        has_result = not isinstance(result, NotSet)
        has_exception = exception is not None
        # either a result or an exception - never both, never neither
        assert has_result != has_exception, (
            "set >NotSet< without exception or result with exception"
        )

        self._result = result
        self.status: Status = STATUS_HANDLER.SUCCESS
        if has_exception:
            self.status = STATUS_HANDLER.exception_to_error(exception, code=code_on_error)

    @property
    def result(self) -> T:
        """Wrapped result.

        Raises:
            WAccessResultDespiteError: if no result was set because the
                procedure failed.
        """
        value = self._result
        if not isinstance(value, NotSet):
            return value

        raise WAccessResultDespiteError(
            "Can not access result because an error occurred during procedure"
        )

    def __str__(self) -> str:
        return f"Status(result: {self._result}, status: {self.status})"

    def __repr__(self) -> str:
        return str(self)

    def unwrap(self) -> T:
        """Return the result; raise via ``STATUS_HANDLER.raise_for_status`` if the status is not successful."""
        STATUS_HANDLER.raise_for_status(self.status)
        return self.result
|
|
|
|
|
|
def wrap_result(
    code_on_error: int = DEFAULT_INTERNAL_ERR_CODE,
) -> Callable[[Callable[P, T]], Callable[P, ResultWrapper[T]]]:
    """Decorator factory: make a callable return a ``ResultWrapper``.

    The decorated callable no longer propagates exceptions derived from
    ``Exception``. They are caught, logged and stored as error status in the
    returned ``ResultWrapper``.

    Args:
        code_on_error: error code used for the status if the callable fails.

    Returns:
        Decorator that keeps the parameters of the decorated callable and
        changes its return type to ``ResultWrapper``.
    """

    def decorator(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
            try:
                # wrapper construction stays inside the ``try`` so that its
                # own failures are captured as error results as well
                return ResultWrapper(
                    result=func(*args, **kwargs),
                    exception=None,
                    code_on_error=code_on_error,
                )
            except Exception as err:
                failed: ResultWrapper[T] = ResultWrapper(
                    result=NotSet(), exception=err, code_on_error=code_on_error
                )
                logger_wrapped_results.info(
                    "[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
                    func.__name__,
                    str(err),
                    stack_info=True,
                )
                return failed

        return wrapper

    return decorator
|