option to unwrap errors

This commit is contained in:
Florian Förster 2025-03-07 08:18:56 +01:00
parent 58f0e3dbfa
commit 3a62573ae1
2 changed files with 103 additions and 1 deletions

View File

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Final from typing import TYPE_CHECKING, Any, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.types import DataPipeStates, Status from delta_barth.types import DataPipeStates, Status
@ -21,6 +21,22 @@ class FeaturesMissingError(Exception):
"""exception raised if needed features are missing""" """exception raised if needed features are missing"""
# ** Exceptions for unwrap of error value
class UDataProcessingError(Exception):
"""unwrap exception: all data related errors (e.g. wrong format, non-sufficient quality)"""
class UInternalError(Exception):
"""unwrap exception: all internal errors; internal, if error is not caused by (external)
data related issues"""
class UApiError(Exception):
"""unwrap exception: all errors occurred on the server side of Delta Barth's API
default case: should not be raised, as this kind of errors should be handled by
Delta Barth themselves"""
## ** internal error handling ## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = ( DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"), ("SUCCESS", 0, "Erfolg"),
@ -29,6 +45,21 @@ DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
) )
def _construct_exception(
exception: type[Exception],
description: str,
message: str,
additional: dict[str, Any] | None = None,
) -> Exception:
err_message = f"DESCRIPTION: >{description}<, MESSAGE: >{message}<"
if additional is not None:
add_msg = f", additional info/Server side errors: {additional}"
err_message += add_msg
return exception(err_message)
class StateHandler: class StateHandler:
def __init__(self) -> None: def __init__(self) -> None:
self._pipe_states: DataPipeStates | None = None self._pipe_states: DataPipeStates | None = None
@ -91,3 +122,26 @@ class StateHandler:
message=message, message=message,
api_server_error=error, api_server_error=error,
) )
def unwrap(
self,
state: Status,
) -> None:
if state == self.SUCCESS:
return
code = state.code
descr = state.description
msg = state.message
if code < DEFAULT_INTERNAL_ERR_CODE:
raise _construct_exception(UDataProcessingError, descr, msg)
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
raise _construct_exception(UInternalError, descr, msg)
else:
api_err = state.api_server_error
assert api_err is not None, (
"error code inidcated API error, but no error instance found"
)
add_info = api_err.model_dump(exclude_none=True)
raise _construct_exception(UApiError, descr, msg, add_info)

View File

@ -74,3 +74,51 @@ def test_state_handler_api_error():
assert new_err.message != MESSAGE assert new_err.message != MESSAGE
assert new_err.api_server_error is not None assert new_err.api_server_error is not None
assert new_err.api_server_error == api_err assert new_err.api_server_error == api_err
@pytest.mark.new
def test_state_handler_raising():
state_hdlr = delta_barth._management.StateHandler()
# success: should not raise
err_state = state_hdlr.SUCCESS
assert state_hdlr.unwrap(err_state) is None
# data related errors (predefined)
err_state = state_hdlr.pipe_states.BAD_QUALITY
err_descr = err_state.description
with pytest.raises(errors.UDataProcessingError):
try:
state_hdlr.unwrap(err_state)
except errors.UDataProcessingError as err:
descr = str(err)
assert err_descr in descr
raise err
# internal error, not data-related
description = "test case"
message = "an error occurred"
err_code = 101
err_state = state_hdlr.error(
description=description,
message=message,
code=err_code,
)
with pytest.raises(errors.UInternalError):
try:
state_hdlr.unwrap(err_state)
except errors.UInternalError as err:
descr = str(err)
assert description in descr
raise err
# external API error
api_err = DelBarApiError(status_code=401, message="test case", code="1234")
description = "Kommunikation mit dem API-Server aufgetreten"
msg = "Bitte beachten Sie die"
err_state = state_hdlr.api_error(error=api_err)
with pytest.raises(errors.UApiError):
try:
state_hdlr.unwrap(err_state)
except errors.UApiError as err:
descr = str(err)
assert description in descr
assert msg in descr
raise err