base structure for logging and logging management via session, closes #2
This commit is contained in:
parent
302ccc16db
commit
b6e494a3dc
@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "delta-barth"
|
name = "delta-barth"
|
||||||
version = "0.5.0"
|
version = "0.5.1"
|
||||||
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
|
||||||
@ -73,7 +73,7 @@ directory = "reports/coverage"
|
|||||||
|
|
||||||
|
|
||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.5.0"
|
current_version = "0.5.1"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
|
|||||||
@ -353,6 +353,7 @@ def pipeline_sales_forecast(
|
|||||||
company_id: int | None = None,
|
company_id: int | None = None,
|
||||||
start_date: Datetime | None = None,
|
start_date: Datetime | None = None,
|
||||||
) -> SalesPrognosisResultsExport:
|
) -> SalesPrognosisResultsExport:
|
||||||
|
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
|
||||||
response, status = get_sales_prognosis_data(
|
response, status = get_sales_prognosis_data(
|
||||||
session,
|
session,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
@ -413,6 +414,8 @@ def pipeline_sales_forecast(
|
|||||||
|
|
||||||
assert pipe.results is not None, "needed export response not set in pipeline"
|
assert pipe.results is not None, "needed export response not set in pipeline"
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
|
||||||
|
|
||||||
return pipe.results
|
return pipe.results
|
||||||
|
|
||||||
|
|
||||||
@ -422,6 +425,9 @@ def pipeline_sales_dummy(
|
|||||||
start_date: Datetime | None = None,
|
start_date: Datetime | None = None,
|
||||||
) -> SalesPrognosisResultsExport:
|
) -> SalesPrognosisResultsExport:
|
||||||
"""prototype dummy function for tests by DelBar"""
|
"""prototype dummy function for tests by DelBar"""
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
|
||||||
|
|
||||||
_, _, _ = session, company_id, start_date
|
_, _, _ = session, company_id, start_date
|
||||||
|
|
||||||
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
|
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
|
||||||
@ -434,6 +440,8 @@ def pipeline_sales_dummy(
|
|||||||
pipe.fail(res.status)
|
pipe.fail(res.status)
|
||||||
return _export_on_fail(res.status)
|
return _export_on_fail(res.status)
|
||||||
|
|
||||||
|
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
|
||||||
|
|
||||||
return SalesPrognosisResultsExport(
|
return SalesPrognosisResultsExport(
|
||||||
response=res.unwrap(),
|
response=res.unwrap(),
|
||||||
status=res.status,
|
status=res.status,
|
||||||
|
|||||||
@ -6,7 +6,7 @@ from functools import wraps
|
|||||||
from typing import Any, Final
|
from typing import Any, Final
|
||||||
|
|
||||||
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
|
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
|
||||||
from delta_barth.logging import logger_wrapped_results as logger
|
from delta_barth.logging import logger_status, logger_wrapped_results
|
||||||
from delta_barth.types import DataPipeStates, Status
|
from delta_barth.types import DataPipeStates, Status
|
||||||
|
|
||||||
if t.TYPE_CHECKING:
|
if t.TYPE_CHECKING:
|
||||||
@ -151,23 +151,32 @@ class StatusHandler:
|
|||||||
state: Status,
|
state: Status,
|
||||||
) -> None:
|
) -> None:
|
||||||
if state == self.SUCCESS:
|
if state == self.SUCCESS:
|
||||||
|
logger_status.info(
|
||||||
|
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
code = state.code
|
code = state.code
|
||||||
descr = state.description
|
descr = state.description
|
||||||
msg = state.message
|
msg = state.message
|
||||||
|
|
||||||
|
exc: Exception
|
||||||
if code < DEFAULT_INTERNAL_ERR_CODE:
|
if code < DEFAULT_INTERNAL_ERR_CODE:
|
||||||
raise _construct_exception(UDataProcessingError, descr, msg)
|
exc = _construct_exception(UDataProcessingError, descr, msg)
|
||||||
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
|
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
|
||||||
raise _construct_exception(UInternalError, descr, msg)
|
exc = _construct_exception(UInternalError, descr, msg)
|
||||||
else:
|
else:
|
||||||
api_err = state.api_server_error
|
api_err = state.api_server_error
|
||||||
assert api_err is not None, (
|
assert api_err is not None, (
|
||||||
"error code inidcated API error, but no error instance found"
|
"error code inidcated API error, but no error instance found"
|
||||||
)
|
)
|
||||||
add_info = api_err.model_dump(exclude_none=True)
|
add_info = api_err.model_dump(exclude_none=True)
|
||||||
raise _construct_exception(UApiError, descr, msg, add_info)
|
exc = _construct_exception(UApiError, descr, msg, add_info)
|
||||||
|
|
||||||
|
logger_status.error(
|
||||||
|
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
|
||||||
|
)
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
|
||||||
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
|
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
|
||||||
@ -229,24 +238,24 @@ def wrap_result(
|
|||||||
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
|
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
|
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
|
||||||
status: ResultWrapper[T]
|
wrapped_result: ResultWrapper[T]
|
||||||
try:
|
try:
|
||||||
res = func(*args, **kwargs)
|
res = func(*args, **kwargs)
|
||||||
status = ResultWrapper(
|
wrapped_result = ResultWrapper(
|
||||||
result=res, exception=None, code_on_error=code_on_error
|
result=res, exception=None, code_on_error=code_on_error
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
status = ResultWrapper(
|
wrapped_result = ResultWrapper(
|
||||||
result=NotSet(), exception=err, code_on_error=code_on_error
|
result=NotSet(), exception=err, code_on_error=code_on_error
|
||||||
)
|
)
|
||||||
logger.error(
|
logger_wrapped_results.info(
|
||||||
"An exception in routine %s occurred - msg: %s, stack trace:",
|
"[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
|
||||||
func.__name__,
|
func.__name__,
|
||||||
str(err),
|
str(err),
|
||||||
stack_info=True,
|
stack_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
return status
|
return wrapped_result
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|||||||
@ -17,21 +17,22 @@ from delta_barth.constants import (
|
|||||||
logging.Formatter.converter = gmtime
|
logging.Formatter.converter = gmtime
|
||||||
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
|
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
|
||||||
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
|
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
|
||||||
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
|
|
||||||
# if not LOG_FILE_FOLDER.exists():
|
|
||||||
# LOG_FILE_FOLDER.mkdir(parents=True)
|
|
||||||
|
|
||||||
|
|
||||||
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
|
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
|
||||||
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
|
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
|
||||||
|
# ** handlers
|
||||||
|
NULL_HANDLER = logging.NullHandler()
|
||||||
|
# ** formatters
|
||||||
|
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
|
||||||
|
|
||||||
# ** loggers and configuration
|
# ** loggers and configuration
|
||||||
logger_all = logging.getLogger("delta_barth")
|
|
||||||
# logger_all.addHandler(logger_all_handler_stderr)
|
logger_base = logging.getLogger("delta_barth")
|
||||||
# logger_all.addHandler(logger_all_handler_file)
|
logger_status = logging.getLogger("delta_barth.status")
|
||||||
|
logger_status.setLevel(logging.DEBUG)
|
||||||
logger_session = logging.getLogger("delta_barth.session")
|
logger_session = logging.getLogger("delta_barth.session")
|
||||||
logger_session.setLevel(logging.DEBUG)
|
logger_session.setLevel(logging.DEBUG)
|
||||||
|
logger_management = logging.getLogger("delta_barth.management")
|
||||||
|
logger_management.setLevel(logging.DEBUG)
|
||||||
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
|
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
|
||||||
logger_wrapped_results.setLevel(logging.DEBUG)
|
logger_wrapped_results.setLevel(logging.DEBUG)
|
||||||
logger_pipelines = logging.getLogger("delta_barth.pipelines")
|
logger_pipelines = logging.getLogger("delta_barth.pipelines")
|
||||||
@ -43,18 +44,15 @@ logger_db.setLevel(logging.DEBUG)
|
|||||||
def setup_logging(
|
def setup_logging(
|
||||||
logging_dir: Path,
|
logging_dir: Path,
|
||||||
) -> None:
|
) -> None:
|
||||||
# ** formatters
|
|
||||||
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
|
|
||||||
|
|
||||||
# ** handlers
|
# ** handlers
|
||||||
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
|
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
|
||||||
null_handler = logging.NullHandler()
|
|
||||||
if ENABLE_LOGGING and LOGGING_TO_STDERR:
|
if ENABLE_LOGGING and LOGGING_TO_STDERR:
|
||||||
logger_all_handler_stderr = logging.StreamHandler()
|
logger_all_handler_stderr = logging.StreamHandler()
|
||||||
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
|
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
|
||||||
logger_all_handler_stderr.setFormatter(logger_all_formater)
|
logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
logger_all_handler_stderr = null_handler
|
logger_all_handler_stderr = NULL_HANDLER
|
||||||
|
|
||||||
if ENABLE_LOGGING and LOGGING_TO_FILE:
|
if ENABLE_LOGGING and LOGGING_TO_FILE:
|
||||||
logger_all_handler_file = logging.handlers.RotatingFileHandler(
|
logger_all_handler_file = logging.handlers.RotatingFileHandler(
|
||||||
@ -65,9 +63,17 @@ def setup_logging(
|
|||||||
delay=True,
|
delay=True,
|
||||||
)
|
)
|
||||||
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
|
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
|
||||||
logger_all_handler_file.setFormatter(logger_all_formater)
|
logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
logger_all_handler_file = null_handler
|
logger_all_handler_file = NULL_HANDLER
|
||||||
|
|
||||||
logger_all.addHandler(logger_all_handler_stderr)
|
logger_base.addHandler(logger_all_handler_stderr)
|
||||||
logger_all.addHandler(logger_all_handler_file)
|
logger_base.addHandler(logger_all_handler_file)
|
||||||
|
|
||||||
|
|
||||||
|
def disable_logging() -> None:
|
||||||
|
handlers = tuple(logger_base.handlers)
|
||||||
|
for handler in handlers:
|
||||||
|
logger_base.removeHandler(handler)
|
||||||
|
|
||||||
|
logger_base.addHandler(NULL_HANDLER)
|
||||||
|
|||||||
@ -6,6 +6,7 @@ from __future__ import annotations
|
|||||||
from typing import Final
|
from typing import Final
|
||||||
|
|
||||||
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
|
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
|
||||||
|
from delta_barth.logging import logger_session as logger
|
||||||
from delta_barth.session import Session
|
from delta_barth.session import Session
|
||||||
|
|
||||||
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
||||||
@ -14,8 +15,10 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
|
|||||||
def setup(
|
def setup(
|
||||||
data_path: str,
|
data_path: str,
|
||||||
) -> None: # pragma: no cover
|
) -> None: # pragma: no cover
|
||||||
|
# at this point: no logging configured
|
||||||
SESSION.set_data_path(data_path)
|
SESSION.set_data_path(data_path)
|
||||||
SESSION.setup()
|
SESSION.setup()
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
|
||||||
|
|
||||||
|
|
||||||
def set_credentials(
|
def set_credentials(
|
||||||
@ -24,16 +27,20 @@ def set_credentials(
|
|||||||
database: str,
|
database: str,
|
||||||
mandant: str,
|
mandant: str,
|
||||||
) -> None: # pragma: no cover
|
) -> None: # pragma: no cover
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
|
||||||
SESSION.set_credentials(
|
SESSION.set_credentials(
|
||||||
username=username,
|
username=username,
|
||||||
password=password,
|
password=password,
|
||||||
database=database,
|
database=database,
|
||||||
mandant=mandant,
|
mandant=mandant,
|
||||||
)
|
)
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
|
||||||
|
|
||||||
|
|
||||||
def get_credentials() -> str: # pragma: no cover
|
def get_credentials() -> str: # pragma: no cover
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
|
||||||
creds = SESSION.creds
|
creds = SESSION.creds
|
||||||
|
logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")
|
||||||
return creds.model_dump_json()
|
return creds.model_dump_json()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,7 @@
|
|||||||
from datetime import datetime as Datetime
|
from datetime import datetime as Datetime
|
||||||
|
|
||||||
from delta_barth.analysis import forecast
|
from delta_barth.analysis import forecast
|
||||||
|
from delta_barth.logging import logger_pipelines as logger
|
||||||
from delta_barth.management import SESSION
|
from delta_barth.management import SESSION
|
||||||
from delta_barth.types import JsonExportResponse
|
from delta_barth.types import JsonExportResponse
|
||||||
|
|
||||||
@ -11,11 +12,14 @@ def pipeline_sales_forecast(
|
|||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
|
||||||
result = forecast.pipeline_sales_forecast(
|
result = forecast.pipeline_sales_forecast(
|
||||||
SESSION, company_id=company_id, start_date=start_date
|
SESSION, company_id=company_id, start_date=start_date
|
||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|
||||||
|
|
||||||
@ -23,6 +27,7 @@ def pipeline_sales_forecast_dummy(
|
|||||||
company_id: int | None,
|
company_id: int | None,
|
||||||
start_date: Datetime | None,
|
start_date: Datetime | None,
|
||||||
) -> JsonExportResponse:
|
) -> JsonExportResponse:
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
|
||||||
result = forecast.pipeline_sales_dummy(
|
result = forecast.pipeline_sales_dummy(
|
||||||
SESSION,
|
SESSION,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
@ -30,4 +35,6 @@ def pipeline_sales_forecast_dummy(
|
|||||||
)
|
)
|
||||||
export = JsonExportResponse(result.model_dump_json())
|
export = JsonExportResponse(result.model_dump_json())
|
||||||
|
|
||||||
|
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
|
||||||
|
|
||||||
return export
|
return export
|
||||||
|
|||||||
@ -42,6 +42,7 @@ class Session:
|
|||||||
db_folder: str = "data",
|
db_folder: str = "data",
|
||||||
logging_folder: str = "logs",
|
logging_folder: str = "logs",
|
||||||
) -> None:
|
) -> None:
|
||||||
|
self._setup: bool = False
|
||||||
self._data_path: Path | None = None
|
self._data_path: Path | None = None
|
||||||
self._db_path: Path | None = None
|
self._db_path: Path | None = None
|
||||||
self._db_folder = db_folder
|
self._db_folder = db_folder
|
||||||
@ -55,8 +56,12 @@ class Session:
|
|||||||
self._logged_in: bool = False
|
self._logged_in: bool = False
|
||||||
|
|
||||||
def setup(self) -> None:
|
def setup(self) -> None:
|
||||||
self._setup_db_management()
|
# at this point: no logging configured
|
||||||
|
assert not self._setup, "tried to setup session twice"
|
||||||
self._setup_logging()
|
self._setup_logging()
|
||||||
|
self._setup_db_management()
|
||||||
|
self._setup = True
|
||||||
|
logger.info("[SESSION] Setup procedure successful")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def data_path(self) -> Path:
|
def data_path(self) -> Path:
|
||||||
@ -70,7 +75,7 @@ class Session:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def db_path(self) -> Path:
|
def db_path(self) -> Path:
|
||||||
if self._db_path is not None:
|
if self._db_path is not None and self._setup:
|
||||||
return self._db_path
|
return self._db_path
|
||||||
|
|
||||||
db_root = (self.data_path / self._db_folder).resolve()
|
db_root = (self.data_path / self._db_folder).resolve()
|
||||||
@ -80,9 +85,14 @@ class Session:
|
|||||||
self._db_path = db_path
|
self._db_path = db_path
|
||||||
return self._db_path
|
return self._db_path
|
||||||
|
|
||||||
|
def _setup_db_management(self) -> None:
|
||||||
|
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
|
||||||
|
db.metadata.create_all(self._db_engine)
|
||||||
|
logger.info("[SESSION] Successfully setup DB management")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def logging_dir(self) -> Path:
|
def logging_dir(self) -> Path:
|
||||||
if self._logging_dir is not None:
|
if self._logging_dir is not None and self._setup:
|
||||||
return self._logging_dir
|
return self._logging_dir
|
||||||
|
|
||||||
logging_dir = self.data_path / self._logging_folder
|
logging_dir = self.data_path / self._logging_folder
|
||||||
@ -91,15 +101,13 @@ class Session:
|
|||||||
self._logging_dir = logging_dir
|
self._logging_dir = logging_dir
|
||||||
return self._logging_dir
|
return self._logging_dir
|
||||||
|
|
||||||
def _setup_db_management(self) -> None:
|
|
||||||
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
|
|
||||||
db.metadata.create_all(self._db_engine)
|
|
||||||
logger.info("[SESSION] Successfully setup DB management")
|
|
||||||
|
|
||||||
def _setup_logging(self) -> None:
|
def _setup_logging(self) -> None:
|
||||||
delta_barth.logging.setup_logging(self.logging_dir)
|
delta_barth.logging.setup_logging(self.logging_dir)
|
||||||
logger.info("[SESSION] Successfully setup logging")
|
logger.info("[SESSION] Successfully setup logging")
|
||||||
|
|
||||||
|
def disable_logging(self) -> None:
|
||||||
|
delta_barth.logging.disable_logging()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def creds(self) -> ApiCredentials:
|
def creds(self) -> ApiCredentials:
|
||||||
assert self._creds is not None, "accessed credentials not set"
|
assert self._creds is not None, "accessed credentials not set"
|
||||||
@ -110,6 +118,7 @@ class Session:
|
|||||||
path: str,
|
path: str,
|
||||||
):
|
):
|
||||||
self._data_path = validate_path(path)
|
self._data_path = validate_path(path)
|
||||||
|
self._setup = False
|
||||||
|
|
||||||
def set_credentials(
|
def set_credentials(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@ -3,7 +3,7 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import tomllib
|
import tomllib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, cast
|
from typing import cast
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|||||||
@ -4,6 +4,7 @@ from unittest.mock import patch
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import delta_barth.session
|
import delta_barth.session
|
||||||
|
from delta_barth import logging
|
||||||
from delta_barth.constants import (
|
from delta_barth.constants import (
|
||||||
DEFAULT_API_ERR_CODE,
|
DEFAULT_API_ERR_CODE,
|
||||||
HTTP_BASE_CONTENT_HEADERS,
|
HTTP_BASE_CONTENT_HEADERS,
|
||||||
@ -55,6 +56,8 @@ def test_session_setup_db_management(tmp_path):
|
|||||||
assert db_path.parent == target_db_dir
|
assert db_path.parent == target_db_dir
|
||||||
assert not db_path.exists()
|
assert not db_path.exists()
|
||||||
session.setup()
|
session.setup()
|
||||||
|
db_path2 = session.db_path
|
||||||
|
assert db_path2 == db_path
|
||||||
assert session._db_engine is not None
|
assert session._db_engine is not None
|
||||||
assert db_path.exists()
|
assert db_path.exists()
|
||||||
|
|
||||||
@ -66,6 +69,30 @@ def test_session_setup_logging(tmp_path):
|
|||||||
foldername: str = "logging_test"
|
foldername: str = "logging_test"
|
||||||
target_log_dir = tmp_path / foldername
|
target_log_dir = tmp_path / foldername
|
||||||
|
|
||||||
|
session = delta_barth.session.Session(
|
||||||
|
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
||||||
|
)
|
||||||
|
session.set_data_path(str_path)
|
||||||
|
log_dir = session.logging_dir
|
||||||
|
|
||||||
|
assert log_dir.exists()
|
||||||
|
assert log_dir == target_log_dir
|
||||||
|
# write file
|
||||||
|
target_file = target_log_dir / LOG_FILENAME
|
||||||
|
assert not target_file.exists()
|
||||||
|
session.setup() # calls setup code for logging
|
||||||
|
log_dir2 = session.logging_dir
|
||||||
|
assert log_dir2 == log_dir
|
||||||
|
assert target_file.exists()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("delta_barth.logging.ENABLE_LOGGING", True)
|
||||||
|
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
|
||||||
|
def test_session_disable_logging(tmp_path):
|
||||||
|
str_path = str(tmp_path)
|
||||||
|
foldername: str = "logging_test"
|
||||||
|
target_log_dir = tmp_path / foldername
|
||||||
|
|
||||||
session = delta_barth.session.Session(
|
session = delta_barth.session.Session(
|
||||||
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
|
||||||
)
|
)
|
||||||
@ -78,6 +105,21 @@ def test_session_setup_logging(tmp_path):
|
|||||||
assert not target_file.exists()
|
assert not target_file.exists()
|
||||||
session.setup() # calls setup code for logging
|
session.setup() # calls setup code for logging
|
||||||
assert target_file.exists()
|
assert target_file.exists()
|
||||||
|
# provoke entry
|
||||||
|
msg = "this is a test"
|
||||||
|
logging.logger_base.critical(msg)
|
||||||
|
session.disable_logging()
|
||||||
|
with open(target_file, "r") as file:
|
||||||
|
content = file.readlines()
|
||||||
|
last_line = content[-1]
|
||||||
|
assert msg in last_line.lower()
|
||||||
|
# log new entry which should not be added as logging is disabled
|
||||||
|
msg = "this is a second test"
|
||||||
|
logging.logger_base.critical(msg)
|
||||||
|
with open(target_file, "r") as file:
|
||||||
|
content = file.readlines()
|
||||||
|
last_line = content[-1]
|
||||||
|
assert msg not in last_line.lower()
|
||||||
|
|
||||||
|
|
||||||
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
|
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user