delta-barth-py/tests/test_errors.py

125 lines
3.9 KiB
Python

from __future__ import annotations
from dataclasses import asdict
from typing import cast
import pytest
import delta_barth._management
from delta_barth import errors
from delta_barth.constants import DEFAULT_API_ERR_CODE
from delta_barth.types import DelBarApiError, Status
def test_state_handler_parsing():
predef_errs = errors.DATA_PIPELINE_STATUS_DESCR
state_hdlr = delta_barth._management.StateHandler()
assert state_hdlr.pipe_states is not None
parsed_pipe_states = state_hdlr.pipe_states
assert parsed_pipe_states.SUCCESS == state_hdlr.SUCCESS
parsed_pipe_states = asdict(parsed_pipe_states)
for err in predef_errs:
dopt_err = cast(Status, parsed_pipe_states[err[0]])
assert isinstance(dopt_err, Status)
assert dopt_err.code == err[1]
assert dopt_err.description == err[2]
assert dopt_err.message == ""
state_hdlr._parse_data_pipe_states()
def test_state_handler_internal():
DESCRIPTION = "test case"
MESSAGE = "an error occurred"
ERR_CODE = 101
state_hdlr = delta_barth._management.StateHandler()
new_err = state_hdlr.error(
description=DESCRIPTION,
message=MESSAGE,
code=ERR_CODE,
)
assert new_err.code == ERR_CODE
assert new_err.description == DESCRIPTION
assert new_err.message == MESSAGE
# failure cases
err_code = 50 # default lower bound: 100
with pytest.raises(ValueError):
new_err = state_hdlr.error(
description=DESCRIPTION,
message=MESSAGE,
code=err_code,
)
err_code = 500 # default upper bound: 400
with pytest.raises(ValueError):
new_err = state_hdlr.error(
description=DESCRIPTION,
message=MESSAGE,
code=err_code,
)
def test_state_handler_api_error():
MESSAGE = "an error occurred"
api_err = DelBarApiError(status_code=401, message="test case")
assert api_err.status_code == 401
assert api_err.message == "test case"
state_hdlr = delta_barth._management.StateHandler()
new_err = state_hdlr.api_error(error=api_err)
assert new_err.code == DEFAULT_API_ERR_CODE
assert "API-Server" in new_err.description
assert new_err.message != MESSAGE
assert new_err.api_server_error is not None
assert new_err.api_server_error == api_err
@pytest.mark.new
def test_state_handler_raising():
state_hdlr = delta_barth._management.StateHandler()
# success: should not raise
err_state = state_hdlr.SUCCESS
assert state_hdlr.unwrap(err_state) is None
# data related errors (predefined)
err_state = state_hdlr.pipe_states.BAD_QUALITY
err_descr = err_state.description
with pytest.raises(errors.UDataProcessingError):
try:
state_hdlr.unwrap(err_state)
except errors.UDataProcessingError as err:
descr = str(err)
assert err_descr in descr
raise err
# internal error, not data-related
description = "test case"
message = "an error occurred"
err_code = 101
err_state = state_hdlr.error(
description=description,
message=message,
code=err_code,
)
with pytest.raises(errors.UInternalError):
try:
state_hdlr.unwrap(err_state)
except errors.UInternalError as err:
descr = str(err)
assert description in descr
raise err
# external API error
api_err = DelBarApiError(status_code=401, message="test case", code="1234")
description = "Kommunikation mit dem API-Server aufgetreten"
msg = "Bitte beachten Sie die"
err_state = state_hdlr.api_error(error=api_err)
with pytest.raises(errors.UApiError):
try:
state_hdlr.unwrap(err_state)
except errors.UApiError as err:
descr = str(err)
assert description in descr
assert msg in descr
raise err