from __future__ import annotations import typing as t from collections.abc import Callable from functools import wraps from typing import Any, Final from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE from delta_barth.logging import logger_status, logger_wrapped_results from delta_barth.types import DataPipeStates, Status if t.TYPE_CHECKING: from delta_barth.types import DelBarApiError, StatusDescription P = t.ParamSpec("P") T = t.TypeVar("T") class UnspecifiedRequestType(Exception): """exception raised if for a given API endpoint a not defined operation is requested""" class ApiConnectionError(Exception): """exception raised if an established connection is needed, but the current session is not connected""" class FeaturesMissingError(Exception): """exception raised if needed features are missing""" # ** Exceptions for result wrappers class WAccessResultDespiteError(Exception): """wrapped results exception: raised if result is accessed, even though there was an error in the underlying procedure""" # ** Exceptions for unwrap of error value class UDataProcessingError(Exception): """unwrap exception: all data related errors (e.g. wrong format, non-sufficient quality)""" class UInternalError(Exception): """unwrap exception: all internal errors; internal, if error is not caused by (external) data related issues""" class UApiError(Exception): """unwrap exception: all errors occurred on the server side of Delta Barth's API default case: should not be raised, as this kind of errors should be handled by Delta Barth themselves""" ## ** internal error handling DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = ( ("SUCCESS", 0, "Erfolg"), ( "CONNECTION_TIMEOUT", 1, "Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?", ), ( "CONNECTION_ERROR", 2, "Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?", ), ("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"), ("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"), ("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"), ) def _construct_exception( exception: type[Exception], description: str, message: str, additional: dict[str, Any] | None = None, ) -> Exception: err_message = f"DESCRIPTION: >{description}<, MESSAGE: >{message}<" if additional is not None: add_msg = f", additional info/Server side errors: {additional}" err_message += add_msg return exception(err_message) class StatusHandler: def __init__(self) -> None: self._pipe_states: DataPipeStates | None = None self._parse_data_pipe_states() @property def pipe_states(self) -> DataPipeStates: assert self._pipe_states is not None, ( "tried to access not parsed data pipeline states" ) return self._pipe_states @property def SUCCESS(self) -> Status: assert self._pipe_states is not None, ( "tried to access not parsed data pipeline states" ) return self.pipe_states.SUCCESS def _parse_data_pipe_states(self) -> None: if self._pipe_states is not None: return parsed_errors: dict[str, Status] = {} for err in DATA_PIPELINE_STATUS_DESCR: parsed_errors[err[0]] = Status(code=err[1], description=err[2]) self._pipe_states = DataPipeStates(**parsed_errors) def exception_to_error( self, exception: Exception, code: int = DEFAULT_INTERNAL_ERR_CODE, ) -> Status: description = exception.__class__.__name__ message = str(exception) return self.error(description, message, code) @staticmethod def error( description: str, message: str = "", code: int = DEFAULT_INTERNAL_ERR_CODE, ) -> Status: lower_bound = DEFAULT_INTERNAL_ERR_CODE upper_bound = DEFAULT_API_ERR_CODE if code < lower_bound or code > upper_bound: raise ValueError( f"Custom error codes must be between default " f"values of {lower_bound}-{upper_bound}" ) return Status( code=code, description=description, message=message, ) @staticmethod def api_error( error: DelBarApiError, ) -> Status: description = "Es ist ein Fehler bei der Kommunikation mit dem API-Server aufgetreten" message = ( "Bitte beachten Sie die zusätzliche Fehlerausgabe des Servers in dieser Antwort" ) return Status( code=DEFAULT_API_ERR_CODE, description=description, message=message, api_server_error=error, ) def raise_for_status( self, state: Status, ) -> None: if state == self.SUCCESS: logger_status.info( "[STATUS] Raise for status - SUCCESS. all good.", stack_info=True ) return code = state.code descr = state.description msg = state.message exc: Exception if code < DEFAULT_INTERNAL_ERR_CODE: exc = _construct_exception(UDataProcessingError, descr, msg) elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE: exc = _construct_exception(UInternalError, descr, msg) else: api_err = state.api_server_error assert api_err is not None, ( "error code inidcated API error, but no error instance found" ) add_info = api_err.model_dump(exclude_none=True) exc = _construct_exception(UApiError, descr, msg, add_info) logger_status.error( "[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True ) raise exc STATUS_HANDLER: Final[StatusHandler] = StatusHandler() # ** result wrapping class NotSet: __slots__ = tuple() def __init__(self) -> None: ... def __str__(self) -> str: return ">Not set<" def __repr__(self) -> str: return f"{self.__class__.__name__}()" class ResultWrapper(t.Generic[T]): def __init__( self, result: T | NotSet, exception: Exception | None, code_on_error: int, ) -> None: assert (isinstance(result, NotSet) and exception is not None) or ( not isinstance(result, NotSet) and exception is None ), "set >NotSet< without exception or result with exception" self._result = result status: Status = STATUS_HANDLER.SUCCESS if exception is not None: status = STATUS_HANDLER.exception_to_error(exception, code=code_on_error) self.status = status @property def result(self) -> T: if isinstance(self._result, NotSet): raise WAccessResultDespiteError( "Can not access result because an error occurred during procedure" ) return self._result def __str__(self) -> str: return f"Status(result: {self._result}, status: {self.status})" def __repr__(self) -> str: return self.__str__() def unwrap(self) -> T: STATUS_HANDLER.raise_for_status(self.status) return self.result def wrap_result( code_on_error: int = DEFAULT_INTERNAL_ERR_CODE, ) -> Callable[[Callable[P, T]], Callable[P, ResultWrapper[T]]]: def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]: @wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]: wrapped_result: ResultWrapper[T] try: res = func(*args, **kwargs) wrapped_result = ResultWrapper( result=res, exception=None, code_on_error=code_on_error ) except Exception as err: wrapped_result = ResultWrapper( result=NotSet(), exception=err, code_on_error=code_on_error ) logger_wrapped_results.info( "[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:", func.__name__, str(err), stack_info=True, ) return wrapped_result return wrapper return wrap_result