6 Commits

16 changed files with 381 additions and 81 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "delta-barth" name = "delta-barth"
version = "0.5.0" version = "0.5.5"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system" description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [ authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"}, {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
[tool.bumpversion] [tool.bumpversion]
current_version = "0.5.0" current_version = "0.5.5"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
def status_err() -> str: def status_err() -> str:
status = Status(code=102, description="internal error occurred", message="caused by test") status = Status(
code=102,
description="internal error occurred: 'Limit-Überschreitung'",
message="caused by test",
)
return status.model_dump_json() return status.model_dump_json()

View File

@@ -26,6 +26,7 @@ from delta_barth.api.requests import (
) )
from delta_barth.constants import ( from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS, COL_MAP_SALES_PROGNOSIS,
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH, DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS,
SALES_BASE_NUM_DATAPOINTS_MONTHS, SALES_BASE_NUM_DATAPOINTS_MONTHS,
@@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
return _parse_df_to_results(data) return _parse_df_to_results(data)
@wrap_result() @wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped( def _write_sales_forecast_stats_wrapped(
stats: SalesForecastStatistics, stats: SalesForecastStatistics,
) -> None: ) -> None:
@@ -353,6 +354,7 @@ def pipeline_sales_forecast(
company_id: int | None = None, company_id: int | None = None,
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport: ) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data( response, status = get_sales_prognosis_data(
session, session,
company_id=company_id, company_id=company_id,
@@ -413,6 +415,8 @@ def pipeline_sales_forecast(
assert pipe.results is not None, "needed export response not set in pipeline" assert pipe.results is not None, "needed export response not set in pipeline"
logger_pipelines.info("[PIPELINES] Main sales forecast pipeline successful")
return pipe.results return pipe.results
@@ -422,6 +426,9 @@ def pipeline_sales_dummy(
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport: ) -> SalesPrognosisResultsExport:
"""prototype dummy function for tests by DelBar""" """prototype dummy function for tests by DelBar"""
logger_pipelines.info("[PIPELINES] Starting dummy sales forecast pipeline...")
_, _, _ = session, company_id, start_date _, _, _ = session, company_id, start_date
data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl" data_pth = DUMMY_DATA_PATH / "exmp_sales_prognosis_output.pkl"
@@ -434,6 +441,8 @@ def pipeline_sales_dummy(
pipe.fail(res.status) pipe.fail(res.status)
return _export_on_fail(res.status) return _export_on_fail(res.status)
logger_pipelines.info("[PIPELINES] Dummy sales forecast pipeline successful")
return SalesPrognosisResultsExport( return SalesPrognosisResultsExport(
response=res.unwrap(), response=res.unwrap(),
status=res.status, status=res.status,

View File

@@ -7,6 +7,7 @@ import requests
from dopt_basics.io import combine_route from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.constants import API_CON_TIMEOUT
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
@@ -55,7 +56,7 @@ def get_sales_prognosis_data(
company_id: int | None = None, company_id: int | None = None,
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]: ) -> tuple[SalesPrognosisResponse, Status]:
resp, status = session.assert_login() _, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS: if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple()) response = SalesPrognosisResponse(daten=tuple())
return response, status return response, status
@@ -67,11 +68,18 @@ def get_sales_prognosis_data(
FirmaId=company_id, FirmaId=company_id,
BuchungsDatum=start_date, BuchungsDatum=start_date,
) )
resp = requests.get( empty_response = SalesPrognosisResponse(daten=tuple())
URL, try:
params=sales_prog_req.model_dump(mode="json", exclude_none=True), resp = requests.get(
headers=session.headers, # type: ignore[argumentType] URL,
) params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: SalesPrognosisResponse response: SalesPrognosisResponse
status: Status status: Status
@@ -79,7 +87,7 @@ def get_sales_prognosis_data(
response = SalesPrognosisResponse(**resp.json()) response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
else: else:
response = SalesPrognosisResponse(daten=tuple()) response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

View File

@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
# ** error handling # ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400 DEFAULT_API_ERR_CODE: Final[int] = 400
@@ -38,6 +39,8 @@ class KnownDelBarApiErrorCodes(enum.Enum):
COMMON = frozenset((400, 401, 409, 500)) COMMON = frozenset((400, 401, 409, 500))
# ** API
API_CON_TIMEOUT: Final[float] = 1.0 # secs to response
# ** API response parsing # ** API response parsing
# ** column mapping [API-Response --> Target-Features] # ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict( COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(

View File

@@ -22,8 +22,8 @@ perf_meas = sql.Table(
"performance_measurement", "performance_measurement",
metadata, metadata,
sql.Column("id", sql.Integer, primary_key=True), sql.Column("id", sql.Integer, primary_key=True),
sql.Column("execution_duration", sql.Float),
sql.Column("pipeline_name", sql.String(length=30)), sql.Column("pipeline_name", sql.String(length=30)),
sql.Column("execution_duration", sql.Float),
) )
# ** ---- forecasts # ** ---- forecasts
sf_stats = sql.Table( sf_stats = sql.Table(

View File

@@ -6,7 +6,7 @@ from functools import wraps
from typing import Any, Final from typing import Any, Final
from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE from delta_barth.constants import DEFAULT_API_ERR_CODE, DEFAULT_INTERNAL_ERR_CODE
from delta_barth.logging import logger_wrapped_results as logger from delta_barth.logging import logger_status, logger_wrapped_results
from delta_barth.types import DataPipeStates, Status from delta_barth.types import DataPipeStates, Status
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
@@ -53,9 +53,19 @@ class UApiError(Exception):
## ** internal error handling ## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = ( DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"), ("SUCCESS", 0, "Erfolg"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"), (
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"), "CONNECTION_TIMEOUT",
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"), 1,
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
),
(
"CONNECTION_ERROR",
2,
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
),
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
) )
@@ -151,23 +161,32 @@ class StatusHandler:
state: Status, state: Status,
) -> None: ) -> None:
if state == self.SUCCESS: if state == self.SUCCESS:
logger_status.info(
"[STATUS] Raise for status - SUCCESS. all good.", stack_info=True
)
return return
code = state.code code = state.code
descr = state.description descr = state.description
msg = state.message msg = state.message
exc: Exception
if code < DEFAULT_INTERNAL_ERR_CODE: if code < DEFAULT_INTERNAL_ERR_CODE:
raise _construct_exception(UDataProcessingError, descr, msg) exc = _construct_exception(UDataProcessingError, descr, msg)
elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE: elif DEFAULT_INTERNAL_ERR_CODE <= code < DEFAULT_API_ERR_CODE:
raise _construct_exception(UInternalError, descr, msg) exc = _construct_exception(UInternalError, descr, msg)
else: else:
api_err = state.api_server_error api_err = state.api_server_error
assert api_err is not None, ( assert api_err is not None, (
"error code inidcated API error, but no error instance found" "error code inidcated API error, but no error instance found"
) )
add_info = api_err.model_dump(exclude_none=True) add_info = api_err.model_dump(exclude_none=True)
raise _construct_exception(UApiError, descr, msg, add_info) exc = _construct_exception(UApiError, descr, msg, add_info)
logger_status.error(
"[STATUS] Raise for status - Error occurred: %s", exc, stack_info=True
)
raise exc
STATUS_HANDLER: Final[StatusHandler] = StatusHandler() STATUS_HANDLER: Final[StatusHandler] = StatusHandler()
@@ -229,24 +248,24 @@ def wrap_result(
def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]: def wrap_result(func: Callable[P, T]) -> Callable[P, ResultWrapper[T]]:
@wraps(func) @wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]: def wrapper(*args: P.args, **kwargs: P.kwargs) -> ResultWrapper[T]:
status: ResultWrapper[T] wrapped_result: ResultWrapper[T]
try: try:
res = func(*args, **kwargs) res = func(*args, **kwargs)
status = ResultWrapper( wrapped_result = ResultWrapper(
result=res, exception=None, code_on_error=code_on_error result=res, exception=None, code_on_error=code_on_error
) )
except Exception as err: except Exception as err:
status = ResultWrapper( wrapped_result = ResultWrapper(
result=NotSet(), exception=err, code_on_error=code_on_error result=NotSet(), exception=err, code_on_error=code_on_error
) )
logger.error( logger_wrapped_results.info(
"An exception in routine %s occurred - msg: %s, stack trace:", "[RESULT-WRAPPER] An exception in routine %s occurred - msg: %s, stack trace:",
func.__name__, func.__name__,
str(err), str(err),
stack_info=True, stack_info=True,
) )
return status return wrapped_result
return wrapper return wrapper

View File

@@ -17,21 +17,22 @@ from delta_barth.constants import (
logging.Formatter.converter = gmtime logging.Formatter.converter = gmtime
LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s" LOG_FMT: Final[str] = "%(asctime)s | lang_main:%(module)s:%(levelname)s | %(message)s"
LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000" LOG_DATE_FMT: Final[str] = "%Y-%m-%d %H:%M:%S +0000"
# LOG_FILE_FOLDER: Final[Path] = LIB_PATH / "logs" # !! configured in SESSION
# if not LOG_FILE_FOLDER.exists():
# LOG_FILE_FOLDER.mkdir(parents=True)
LOGGING_LEVEL_STDERR: Final[int] = logging.INFO LOGGING_LEVEL_STDERR: Final[int] = logging.INFO
LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG LOGGING_LEVEL_FILE: Final[int] = logging.DEBUG
# ** handlers
NULL_HANDLER = logging.NullHandler()
# ** formatters
LOGGER_ALL_FORMATER = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** loggers and configuration # ** loggers and configuration
logger_all = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_stderr) logger_base = logging.getLogger("delta_barth")
# logger_all.addHandler(logger_all_handler_file) logger_status = logging.getLogger("delta_barth.status")
logger_status.setLevel(logging.DEBUG)
logger_session = logging.getLogger("delta_barth.session") logger_session = logging.getLogger("delta_barth.session")
logger_session.setLevel(logging.DEBUG) logger_session.setLevel(logging.DEBUG)
logger_management = logging.getLogger("delta_barth.management")
logger_management.setLevel(logging.DEBUG)
logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results") logger_wrapped_results = logging.getLogger("delta_barth.wrapped_results")
logger_wrapped_results.setLevel(logging.DEBUG) logger_wrapped_results.setLevel(logging.DEBUG)
logger_pipelines = logging.getLogger("delta_barth.pipelines") logger_pipelines = logging.getLogger("delta_barth.pipelines")
@@ -43,18 +44,15 @@ logger_db.setLevel(logging.DEBUG)
def setup_logging( def setup_logging(
logging_dir: Path, logging_dir: Path,
) -> None: ) -> None:
# ** formatters
logger_all_formater = logging.Formatter(fmt=LOG_FMT, datefmt=LOG_DATE_FMT)
# ** handlers # ** handlers
LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME LOG_FILE_PATH: Final[Path] = logging_dir / LOG_FILENAME
null_handler = logging.NullHandler()
if ENABLE_LOGGING and LOGGING_TO_STDERR: if ENABLE_LOGGING and LOGGING_TO_STDERR:
logger_all_handler_stderr = logging.StreamHandler() logger_all_handler_stderr = logging.StreamHandler()
logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR) logger_all_handler_stderr.setLevel(LOGGING_LEVEL_STDERR)
logger_all_handler_stderr.setFormatter(logger_all_formater) logger_all_handler_stderr.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover else: # pragma: no cover
logger_all_handler_stderr = null_handler logger_all_handler_stderr = NULL_HANDLER
if ENABLE_LOGGING and LOGGING_TO_FILE: if ENABLE_LOGGING and LOGGING_TO_FILE:
logger_all_handler_file = logging.handlers.RotatingFileHandler( logger_all_handler_file = logging.handlers.RotatingFileHandler(
@@ -65,9 +63,17 @@ def setup_logging(
delay=True, delay=True,
) )
logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE) logger_all_handler_file.setLevel(LOGGING_LEVEL_FILE)
logger_all_handler_file.setFormatter(logger_all_formater) logger_all_handler_file.setFormatter(LOGGER_ALL_FORMATER)
else: # pragma: no cover else: # pragma: no cover
logger_all_handler_file = null_handler logger_all_handler_file = NULL_HANDLER
logger_all.addHandler(logger_all_handler_stderr) logger_base.addHandler(logger_all_handler_stderr)
logger_all.addHandler(logger_all_handler_file) logger_base.addHandler(logger_all_handler_file)
def disable_logging() -> None:
handlers = tuple(logger_base.handlers)
for handler in handlers:
logger_base.removeHandler(handler)
logger_base.addHandler(NULL_HANDLER)

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
from typing import Final from typing import Final
from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS from delta_barth.constants import HTTP_BASE_CONTENT_HEADERS
from delta_barth.logging import logger_session as logger
from delta_barth.session import Session from delta_barth.session import Session
SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS) SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
@@ -13,9 +14,13 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup( def setup(
data_path: str, data_path: str,
base_url: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
# at this point: no logging configured
SESSION.set_data_path(data_path) SESSION.set_data_path(data_path)
SESSION.set_base_url(base_url=base_url)
SESSION.setup() SESSION.setup()
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
def set_credentials( def set_credentials(
@@ -24,25 +29,33 @@ def set_credentials(
database: str, database: str,
mandant: str, mandant: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Setting credentials for current session...")
SESSION.set_credentials( SESSION.set_credentials(
username=username, username=username,
password=password, password=password,
database=database, database=database,
mandant=mandant, mandant=mandant,
) )
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
# ** not part of external API, only internal
def get_credentials() -> str: # pragma: no cover def get_credentials() -> str: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
creds = SESSION.creds creds = SESSION.creds
logger.info("[EXT-CALL MANAGEMENT] Successfully got credentials for current session")
return creds.model_dump_json() return creds.model_dump_json()
# ** legacy: not part of external API
def set_base_url( def set_base_url(
base_url: str, base_url: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
SESSION.set_base_url(base_url=base_url) SESSION.set_base_url(base_url=base_url)
def get_data_path() -> str: # pragma: no cover
return str(SESSION.data_path)
def get_base_url() -> str: # pragma: no cover def get_base_url() -> str: # pragma: no cover
return SESSION.base_url return SESSION.base_url

View File

@@ -1,20 +1,83 @@
"""collection of configured data pipelines, intended to be invoked from C#""" """collection of configured data pipelines, intended to be invoked from C#"""
import time
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import Final
import sqlalchemy as sql
from delta_barth import databases as db
from delta_barth.analysis import forecast from delta_barth.analysis import forecast
from delta_barth.constants import DEFAULT_DB_ERR_CODE
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse from delta_barth.types import JsonExportResponse, PipelineMetrics
def _write_performance_metrics(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
if time_end < time_start:
raise ValueError("Ending time smaller than starting time")
execution_duration = (time_end - time_start) / 1e9
metrics = PipelineMetrics(
pipeline_name=pipeline_name,
execution_duration=execution_duration,
)
with SESSION.db_engine.begin() as con:
con.execute(sql.insert(db.perf_meas).values(**metrics))
return metrics
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
return _write_performance_metrics(pipeline_name, time_start, time_end)
def pipeline_sales_forecast( def pipeline_sales_forecast(
company_id: int | None, company_id: int | None,
start_date: Datetime | None, start_date: Datetime | None,
) -> JsonExportResponse: ) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast( result = forecast.pipeline_sales_forecast(
SESSION, company_id=company_id, start_date=start_date SESSION, company_id=company_id, start_date=start_date
) )
export = JsonExportResponse(result.model_dump_json()) export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export return export
@@ -23,11 +86,38 @@ def pipeline_sales_forecast_dummy(
company_id: int | None, company_id: int | None,
start_date: Datetime | None, start_date: Datetime | None,
) -> JsonExportResponse: ) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_dummy( result = forecast.pipeline_sales_dummy(
SESSION, SESSION,
company_id=company_id, company_id=company_id,
start_date=start_date, start_date=start_date,
) )
export = JsonExportResponse(result.model_dump_json()) export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export return export

View File

@@ -14,7 +14,7 @@ from delta_barth.api.common import (
LoginResponse, LoginResponse,
validate_credentials, validate_credentials,
) )
from delta_barth.constants import DB_ECHO from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status from delta_barth.types import DelBarApiError, Status
@@ -42,6 +42,7 @@ class Session:
db_folder: str = "data", db_folder: str = "data",
logging_folder: str = "logs", logging_folder: str = "logs",
) -> None: ) -> None:
self._setup: bool = False
self._data_path: Path | None = None self._data_path: Path | None = None
self._db_path: Path | None = None self._db_path: Path | None = None
self._db_folder = db_folder self._db_folder = db_folder
@@ -55,8 +56,12 @@ class Session:
self._logged_in: bool = False self._logged_in: bool = False
def setup(self) -> None: def setup(self) -> None:
self._setup_db_management() # at this point: no logging configured
assert not self._setup, "tried to setup session twice"
self._setup_logging() self._setup_logging()
self._setup_db_management()
self._setup = True
logger.info("[SESSION] Setup procedure successful")
@property @property
def data_path(self) -> Path: def data_path(self) -> Path:
@@ -70,7 +75,7 @@ class Session:
@property @property
def db_path(self) -> Path: def db_path(self) -> Path:
if self._db_path is not None: if self._db_path is not None and self._setup:
return self._db_path return self._db_path
db_root = (self.data_path / self._db_folder).resolve() db_root = (self.data_path / self._db_folder).resolve()
@@ -80,9 +85,14 @@ class Session:
self._db_path = db_path self._db_path = db_path
return self._db_path return self._db_path
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
@property @property
def logging_dir(self) -> Path: def logging_dir(self) -> Path:
if self._logging_dir is not None: if self._logging_dir is not None and self._setup:
return self._logging_dir return self._logging_dir
logging_dir = self.data_path / self._logging_folder logging_dir = self.data_path / self._logging_folder
@@ -91,15 +101,13 @@ class Session:
self._logging_dir = logging_dir self._logging_dir = logging_dir
return self._logging_dir return self._logging_dir
def _setup_db_management(self) -> None:
self._db_engine = db.get_engine(self.db_path, echo=DB_ECHO)
db.metadata.create_all(self._db_engine)
logger.info("[SESSION] Successfully setup DB management")
def _setup_logging(self) -> None: def _setup_logging(self) -> None:
delta_barth.logging.setup_logging(self.logging_dir) delta_barth.logging.setup_logging(self.logging_dir)
logger.info("[SESSION] Successfully setup logging") logger.info("[SESSION] Successfully setup logging")
def disable_logging(self) -> None:
delta_barth.logging.disable_logging()
@property @property
def creds(self) -> ApiCredentials: def creds(self) -> ApiCredentials:
assert self._creds is not None, "accessed credentials not set" assert self._creds is not None, "accessed credentials not set"
@@ -110,6 +118,7 @@ class Session:
path: str, path: str,
): ):
self._data_path = validate_path(path) self._data_path = validate_path(path)
self._setup = False
def set_credentials( def set_credentials(
self, self,
@@ -182,11 +191,18 @@ class Session:
databaseName=self.creds.database, databaseName=self.creds.database,
mandantName=self.creds.mandant, mandantName=self.creds.mandant,
) )
resp = requests.put( empty_response = LoginResponse(token="")
URL, try:
login_req.model_dump_json(), resp = requests.put(
headers=self.headers, # type: ignore URL,
) login_req.model_dump_json(),
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -195,7 +211,7 @@ class Session:
status = STATUS_HANDLER.pipe_states.SUCCESS status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token) self._add_session_token(response.token)
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
@@ -207,12 +223,17 @@ class Session:
ROUTE: Final[str] = "user/logout" ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
resp = requests.put( try:
URL, resp = requests.put(
headers=self.headers, # type: ignore URL,
) headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response = None
status: Status status: Status
if resp.status_code == 200: if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
@@ -221,7 +242,7 @@ class Session:
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
return response, status return None, status
def assert_login( def assert_login(
self, self,
@@ -237,11 +258,18 @@ class Session:
ROUTE: Final[str] = "verkauf/umsatzprognosedaten" ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999} params: dict[str, int] = {"FirmaId": 999999}
resp = requests.get( empty_response = LoginResponse(token="")
URL, try:
params=params, resp = requests.get(
headers=self.headers, # type: ignore URL,
) params=params,
headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
)
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -252,7 +280,7 @@ class Session:
self._remove_session_token() self._remove_session_token()
response, status = self.login() response, status = self.login()
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

View File

@@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
@dataclass(slots=True) @dataclass(slots=True)
class DataPipeStates: class DataPipeStates:
SUCCESS: Status SUCCESS: Status
CONNECTION_TIMEOUT: Status
CONNECTION_ERROR: Status
TOO_FEW_POINTS: Status TOO_FEW_POINTS: Status
TOO_FEW_MONTH_POINTS: Status TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status NO_RELIABLE_FORECAST: Status
@@ -139,7 +141,13 @@ class Statistics:
pass pass
# ** forecasts # ** ---- performance
class PipelineMetrics(t.TypedDict):
pipeline_name: str
execution_duration: float
# ** ---- forecasts
@dataclass(slots=True) @dataclass(slots=True)
class CustomerDataSalesForecast: class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list) order: list[int] = field(default_factory=list)

View File

@@ -1,8 +1,10 @@
from datetime import datetime as Datetime from datetime import datetime as Datetime
import pytest import pytest
import requests
from delta_barth.api import requests as requests_ from delta_barth.api import requests as requests_
from delta_barth.api.common import LoginResponse
@pytest.mark.api_con_required @pytest.mark.api_con_required
@@ -94,3 +96,31 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
assert status.api_server_error.message == json["message"] assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"] assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"] assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 1
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 2

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import json import json
import tomllib import tomllib
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import cast
from unittest.mock import patch from unittest.mock import patch
import pandas as pd import pandas as pd
@@ -95,7 +95,7 @@ def mock_put():
yield mock yield mock
@pytest.fixture @pytest.fixture(scope="function")
def mock_get(): def mock_get():
with patch("requests.get") as mock: with patch("requests.get") as mock:
yield mock yield mock

View File

@@ -3,20 +3,44 @@ import json
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
import sqlalchemy as sql
import delta_barth.pipelines import delta_barth.pipelines
from delta_barth import databases as db
from delta_barth import pipelines as pl from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
def test_write_performance_metrics(session):
pipe_name = "test_pipe"
t_start = 20_000_000_000
t_end = 30_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
metrics = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
assert metrics["pipeline_name"] == pipe_name
assert metrics["execution_duration"] == 10
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == pipe_name
assert metrics.execution_duration == 10
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
with patch( with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data", "delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock: ) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS) mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
importlib.reload(delta_barth.pipelines) with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast(None, None) json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
parsed_resp = json.loads(json_export) parsed_resp = json.loads(json_export)
@@ -27,9 +51,18 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
def test_sales_prognosis_pipeline_dummy(): metrics = ret.all()[-1]
json_export = pl.pipeline_sales_forecast_dummy(None, None) assert metrics.pipeline_name == "sales_forecast"
assert metrics.execution_duration > 0
@pytest.mark.new
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
parsed_resp = json.loads(json_export) parsed_resp = json.loads(json_export)
@@ -43,3 +76,10 @@ def test_sales_prognosis_pipeline_dummy():
assert entry["vorhersage"] == pytest.approx(47261.058594) assert entry["vorhersage"] == pytest.approx(47261.058594)
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast_dummy"
assert metrics.execution_duration > 0

View File

@@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest import pytest
import delta_barth.session import delta_barth.session
from delta_barth import logging
from delta_barth.constants import ( from delta_barth.constants import (
DEFAULT_API_ERR_CODE, DEFAULT_API_ERR_CODE,
HTTP_BASE_CONTENT_HEADERS, HTTP_BASE_CONTENT_HEADERS,
@@ -55,6 +56,8 @@ def test_session_setup_db_management(tmp_path):
assert db_path.parent == target_db_dir assert db_path.parent == target_db_dir
assert not db_path.exists() assert not db_path.exists()
session.setup() session.setup()
db_path2 = session.db_path
assert db_path2 == db_path
assert session._db_engine is not None assert session._db_engine is not None
assert db_path.exists() assert db_path.exists()
@@ -66,6 +69,30 @@ def test_session_setup_logging(tmp_path):
foldername: str = "logging_test" foldername: str = "logging_test"
target_log_dir = tmp_path / foldername target_log_dir = tmp_path / foldername
session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
)
session.set_data_path(str_path)
log_dir = session.logging_dir
assert log_dir.exists()
assert log_dir == target_log_dir
# write file
target_file = target_log_dir / LOG_FILENAME
assert not target_file.exists()
session.setup() # calls setup code for logging
log_dir2 = session.logging_dir
assert log_dir2 == log_dir
assert target_file.exists()
@patch("delta_barth.logging.ENABLE_LOGGING", True)
@patch("delta_barth.logging.LOGGING_TO_FILE", True)
def test_session_disable_logging(tmp_path):
str_path = str(tmp_path)
foldername: str = "logging_test"
target_log_dir = tmp_path / foldername
session = delta_barth.session.Session( session = delta_barth.session.Session(
HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername HTTP_BASE_CONTENT_HEADERS, logging_folder=foldername
) )
@@ -78,6 +105,21 @@ def test_session_setup_logging(tmp_path):
assert not target_file.exists() assert not target_file.exists()
session.setup() # calls setup code for logging session.setup() # calls setup code for logging
assert target_file.exists() assert target_file.exists()
# provoke entry
msg = "this is a test"
logging.logger_base.critical(msg)
session.disable_logging()
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg in last_line.lower()
# log new entry which should not be added as logging is disabled
msg = "this is a second test"
logging.logger_base.critical(msg)
with open(target_file, "r") as file:
content = file.readlines()
last_line = content[-1]
assert msg not in last_line.lower()
def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url): def test_session_set_ApiInfo_LoggedOut(credentials, api_base_url):