base structure for logging and logging management via session, closes #2

Florian Förster 2025-03-28 11:31:27 +01:00
parent 302ccc16db
commit d1d665e60a
9 changed files with 128 additions and 40 deletions

View File

@@ -1,6 +1,6 @@
[project]
name = "delta-barth"
version = "0.5.0"
version = "0.5.1"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.5.0"
current_version = "0.5.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -353,6 +353,7 @@ def pipeline_sales_forecast(
company_id: int | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
company_id=company_id,
@@ -413,6 +414,8 @@ def pipeline_sales_forecast(
assert pipe.results is not None, "needed export response not set in pipeline"
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
return pipe.results
@@ -422,6 +425,9 @@ def pipeline_sales_dummy(
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
"""prototype dummy function for tests by DelBar"""
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
_, _, _ = session, company_id, start_date
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
@@ -434,6 +440,8 @@ def pipeline_sales_dummy(
pipe.fail(res.status)
return _export_on_fail(res.status)
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
return SalesPrognosisResultsExport(
response=res.unwrap(),
status=res.status,
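
The two pipeline hunks above follow the same boundary-logging pattern: one info record when the pipeline starts, one when it completes, and early returns on failure that skip the success line. A minimal, self-contained sketch of that shape; demo_pipeline and its argument are made up, only the logger name and the bracketed tag come from this commit:

import logging

logger_pipelines = logging.getLogger("delta_barth.pipelines")

def demo_pipeline(values: list[int]) -> int | None:
    logger_pipelines.info("[PIPELINES] Starting demo pipeline...")
    if not values:
        # failure path: return early, so the success record is never written
        return None
    result = sum(values)
    logger_pipelines.info("[PIPELINES] Demo pipeline successful")
    return result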

View File

@@ -6,7 +6,7 @@ from functools import wraps
from typing import Any, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.logging import logger_wrapped_results as logger
from delta_barth.logging import logger_status, logger_wrapped_results
from delta_barth.types import DataPipeStates, Status
if t.TYPE_CHECKING:
@@ -151,23 +151,32 @@ class StatusHandler:
state: Status,
) -> None:
if state == self.SUCCESS:
logger_status.info(
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
)
return
code = state.code
descr = state.description
msg = state.message
exc: Exception
if code < DEFAULT_INTERNAL_ERR_CODE:
raise _construct_exception(UDataProcessingError, descr, msg)
exc = _construct_exception(UDataProcessingError, descr, msg)
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
raise _construct_exception(UInternalError, descr, msg)
exc = _construct_exception(UInternalError, descr, msg)
else:
api_err = state.api_server_error
assert api_err is not None, (
"error code inidcated API error, but no error instance found"
)
add_info = api_err.model_dump(exclude_none=True)
raise _construct_exception(UApiError, descr, msg, add_info)
exc = _construct_exception(UApiError, descr, msg, add_info)
logger_status.error(
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
)
raise exc
STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
@@ -229,24 +238,24 @@ def wrap_result(
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
status: ResultWrapper[T]
wrapped_result: ResultWrapper[T]
try:
res = func(*args, **kwargs)
status = ResultWrapper(
wrapped_result = ResultWrapper(
result=res, exception=None, code_on_error=code_on_error
)
except Exception as err:
status = ResultWrapper(
wrapped_result = ResultWrapper(
result=NotSet(), exception=err, code_on_error=code_on_error
)
logger.error(
"An exception in routine %s occurred - msg: %s, stack trace:",
logger_wrapped_results.info(
"[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
func.__name__,
str(err),
stack_info=True,
)
return status
return wrapped_result
return wrapper
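
The hunk above renames the wrapper's local variable from status to wrapped_result and routes exception reporting through the dedicated logger_wrapped_results logger. Since the full wrap_result signature is not visible in this diff, the following is a self-contained analogue of the pattern rather than the project's API; CaughtResult, catch_into_result, and parse_int are made-up names:

import logging
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable

logger = logging.getLogger("delta_barth.wrapped_results")

@dataclass
class CaughtResult:
    result: Any
    exception: Exception | None

def catch_into_result(func: Callable[..., Any]) -> Callable[..., CaughtResult]:
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> CaughtResult:
        try:
            return CaughtResult(result=func(*args, **kwargs), exception=None)
        except Exception as err:
            # mirror the commit: tag the component and keep the stack info
            logger.info(
                "[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
                func.__name__,
                str(err),
                stack_info=True,
            )
            return CaughtResult(result=None, exception=err)
    return wrapper

@catch_into_result
def parse_int(raw: str) -> int:
    return int(raw)

wrapped = parse_int("not a number")   # the ValueError is captured, not raised
assert wrapped.exception is not None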

View File

@@ -17,21 +17,22 @@ from delta_barth.constants import (
logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
# if not LOG_FILE_FOLDER.exists():
# LOG_FILE_FOLDER.mkdir(parents=True)
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
# ** handlers
NULL_HANDLER = logging.NullHandler()
# ** formatters
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** loggers and configuration
logger_all = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_stderr)
# logger_all.addHandler(logger_all_handler_file)
logger_base = logging.getLogger("delta_barth")
logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
logger_wrapped_results.setLevel(logging.DEBUG)
logger_pipelines = logging.getLogger("delta_barth.pipelines")
@@ -43,18 +44,15 @@ logger_db.setLevel(logging.DEBUG)
def setup_logging(
logging_dir: Path,
) -> None:
# ** formatters
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** handlers
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
null_handler = logging.NullHandler()
if ENABLE_LOGGING and LOGGING_TO_STDERR:
logger_all_handler_stderr = logging.StreamHandler()
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
logger_all_handler_stderr.setFormatter(logger_all_formater)
logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover
logger_all_handler_stderr = null_handler
logger_all_handler_stderr = NULL_HANDLER
if ENABLE_LOGGING and LOGGING_TO_FILE:
logger_all_handler_file = logging.handlers.RotatingFileHandler(
@@ -65,9 +63,17 @@ def setup_logging(
delay=True,
)
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
logger_all_handler_file.setFormatter(logger_all_formater)
logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover
logger_all_handler_file = null_handler
logger_all_handler_file = NULL_HANDLER
logger_all.addHandler(logger_all_handler_stderr)
logger_all.addHandler(logger_all_handler_file)
logger_base.addHandler(logger_all_handler_stderr)
logger_base.addHandler(logger_all_handler_file)
def disable_logging() -> None:
handlers = tuple(logger_base.handlers)
for handler in handlers:
logger_base.removeHandler(handler)
logger_base.addHandler(NULL_HANDLER)
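
With this change the package logger carries no handlers until a concrete directory is known: setup_logging(logging_dir) attaches the stderr and rotating-file handlers (or the shared NULL_HANDLER when logging is switched off), and disable_logging() strips them again. A minimal sketch of the intended call sequence, assuming the ENABLE_LOGGING / LOGGING_TO_* flags are enabled and using a temporary directory in place of Session.logging_dir:

import tempfile
from pathlib import Path

import delta_barth.logging as dlog

log_dir = Path(tempfile.mkdtemp())   # stand-in for Session.logging_dir
dlog.setup_logging(log_dir)          # attaches stderr + rotating-file handlers
# warning() is used because no explicit level is set on logger_base in the code
# shown here, so its effective level defaults to WARNING
dlog.logger_base.warning("file and stderr logging now routed to %s", log_dir)
dlog.disable_logging()               # removes the handlers, reinstalls NULL_HANDLER
dlog.logger_base.warning("this record is swallowed by the NullHandler")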

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
from typing import Final
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.logging import logger_session as logger
from delta_barth.session import Session
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
@@ -14,8 +15,10 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup(
data_path: str,
) -> None: # pragma: no cover
# at this point: no logging configured
SESSION.set_data_path(data_path)
SESSION.setup()
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
def set_credentials(
@@ -24,16 +27,20 @@ def set_credentials(
database: str,
mandant: str,
) -> None: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
SESSION.set_credentials(
username=username,
password=password,
database=database,
mandant=mandant,
)
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
def get_credentials() -> str: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
creds = SESSION.creds
logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")
return creds.model_dump_json()
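
These wrappers are the surface the DelBar plugin host calls into; each one now logs and then delegates to the module-level SESSION. A rough usage sketch, assuming this file is delta_barth/management.py (the import in the ext-call pipeline module below points there) and using made-up path and credential values:

from delta_barth import management

management.setup("C:/DelBar/plugin_data")       # made-up path; must already exist
management.set_credentials(
    username="api_user",                        # made-up credentials
    password="secret",
    database="ERP_DB",
    mandant="001",
)
print(management.get_credentials())             # JSON dump of the stored credentials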

View File

@@ -3,6 +3,7 @@
from datetime import datetime as Datetime
from delta_barth.analysis import forecast
from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse
@@ -11,11 +12,14 @@ def pipeline_sales_forecast(
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
result = forecast.pipeline_sales_forecast(
SESSION, company_id=company_id, start_date=start_date
)
export = JsonExportResponse(result.model_dump_json())
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
return export
@@ -23,6 +27,7 @@ def pipeline_sales_forecast_dummy(
company_id: int | None,
start_date: Datetime | None,
) -> JsonExportResponse:
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
result = forecast.pipeline_sales_dummy(
SESSION,
company_id=company_id,
@@ -30,4 +35,6 @@ def pipeline_sales_forecast_dummy(
)
export = JsonExportResponse(result.model_dump_json())
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
return export

View File

@@ -42,6 +42,7 @@ class Session:
db_folder: str = "data",
logging_folder: str = "logs",
) -> None:
self._setup: bool = False
self._data_path: Path | None = None
self._db_path: Path | None = None
self._db_folder = db_folder
@@ -55,8 +56,12 @@
self._logged_in: bool = False
def setup(self) -> None:
self._setup_db_management()
# at this point: no logging configured
assert not self._setup, "tried to setup session twice"
self._setup_logging()
self._setup_db_management()
self._setup = True
logger.info("[SESSION] Setup procedure successful")
@property
def data_path(self) -> Path:
@@ -70,7 +75,7 @@
@property
def db_path(self) -> Path:
if self._db_path is not None:
if self._db_path is not None and self._setup:
return self._db_path
db_root = (self.data_path / self._db_folder).resolve()
@@ -80,9 +85,14 @@
self._db_path = db_path
return self._db_path
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
@property
def logging_dir(self) -> Path:
if self._logging_dir is not None:
if self._logging_dir is not None and self._setup:
return self._logging_dir
logging_dir = self.data_path / self._logging_folder
@@ -91,15 +101,13 @@
self._logging_dir = logging_dir
return self._logging_dir
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
def _setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging")
def disable_logging(self) -> None:
delta_barth.logging.disable_logging()
@property
def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set"
@@ -110,6 +118,7 @@
path: str,
):
self._data_path = validate_path(path)
self._setup = False
def set_credentials(
self,
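
Taken together, the Session changes introduce a setup guard: setup() may only run once per data path, the cached db_path / logging_dir values are only trusted after setup has completed, and set_data_path() clears the flag so a new path forces a fresh setup. A compact, self-contained analogue of that guard; GuardedSession and its concrete paths are illustrative, not the project's class:

from pathlib import Path

class GuardedSession:
    """Illustrative analogue of the setup guard, not the project's Session."""

    def __init__(self) -> None:
        self._setup: bool = False
        self._data_path: Path | None = None
        self._db_path: Path | None = None

    def set_data_path(self, path: str) -> None:
        self._data_path = Path(path).resolve()
        self._setup = False                      # new data path invalidates prior setup

    @property
    def db_path(self) -> Path:
        # trust the cached path only once setup has completed; otherwise recompute
        if self._db_path is not None and self._setup:
            return self._db_path
        assert self._data_path is not None, "data path not set"
        self._db_path = self._data_path / "data" / "example.db"
        return self._db_path

    def setup(self) -> None:
        assert not self._setup, "tried to setup session twice"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)  # stand-in for DB/logging setup
        self._setup = True

session = GuardedSession()
session.set_data_path(".")
session.setup()                                  # a second setup() call would assert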

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import json
import tomllib
from pathlib import Path
from typing import Any, cast
from typing import cast
from unittest.mock import patch
import pandas as pd

View File

@@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
import delta_barth.session
from delta_barth import logging
from delta_barth.constants import (
DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS,
@@ -55,6 +56,8 @@ def test_session_setup_db_management(tmp_path):
assert db_path.parent == target_db_dir
assert not db_path.exists()
session.setup()
db_path2 = session.db_path
assert db_path2 == db_path
assert session._db_engine is not None
assert db_path.exists()
@@ -66,6 +69,30 @@ def test_session_setup_logging(tmp_path):
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
log_dir2 = session.logging_dir
assert log_dir2 == log_dir
assert target_file.exists()
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_disable_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
@@ -78,6 +105,21 @@ def test_session_setup_logging(tmp_path):
assert not target_file.exists()
session.setup() # calls setup code for logging
assert target_file.exists()
# provoke entry
msg = "this is a test"
logging.logger_base.critical(msg)
session.disable_logging()
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg in last_line.lower()
# log new entry which should not be added as logging is disabled
msg = "this is a second test"
logging.logger_base.critical(msg)
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg not in last_line.lower()
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):