# Package-wide constants: configuration file name, worker limits, library
# paths, runtime/deployment status, error codes, API settings, and the column
# mapping used when parsing API responses.
from __future__ import annotations

import enum
from pathlib import Path
from typing import Final

import psutil

import delta_barth._env
from delta_barth.types import DualDict, HttpContentHeaders

# ** config
CFG_FILENAME: Final[str] = "dopt-cfg.toml"
CFG_HOT_RELOAD: Final[bool] = True
# ``logical=False`` asks psutil for the physical core count; the call may
# return ``None`` when the count cannot be determined.
cpu_count = psutil.cpu_count(logical=False)
# One less than the detected core count, but never below 1: on a single-core
# host ``cpu_count - 1`` would otherwise yield 0 workers. Falls back to 3
# when the core count is unknown.
MAX_NUM_WORKERS: Final[int] = max(cpu_count - 1, 1) if cpu_count is not None else 3

# ** lib path
# directory containing this module
lib_path = Path(__file__).parent
assert lib_path is not None, "lib path not resolved"
LIB_PATH: Final[Path] = lib_path
dummy_data_pth = LIB_PATH / "_dummy_data"
# NOTE: ``assert`` statements are stripped when Python runs with ``-O``, so
# this existence check is only active in non-optimised runs.
assert dummy_data_pth.exists(), f"dummy data path not found: {dummy_data_pth}"
DUMMY_DATA_PATH: Final[Path] = dummy_data_pth

# ** runtime and deployment status
# ``prepare_env`` is defined in ``delta_barth._env``; per the annotation it
# yields either a path or ``None``.
RUNTIME_PATH: Final[Path | None] = delta_barth._env.prepare_env(LIB_PATH)
# deployment status is derived solely from whether a runtime path was returned
deployment_status: bool = False
if RUNTIME_PATH is not None:
    deployment_status = True
DEPLOYMENT_STATUS: Final[bool] = deployment_status

# ** databases
DB_ECHO: Final[bool] = False

# ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400
HTTP_BASE_CONTENT_HEADERS: Final[HttpContentHeaders] = {
    "Content-type": "application/json",
    "Accept": "application/json",
}


class KnownDelBarApiErrorCodes(enum.Enum):
    """Groups of known API error codes; each member's value is a frozenset of ints."""

    COMMON = frozenset((400, 401, 409, 500))


# ** API
API_CON_TIMEOUT: Final[float] = 10.0  # secs to response
MAX_LOGIN_RETRIES: Final[int] = 1

# ** API response parsing
# ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(
    artikelId="artikel_refid",
    firmaId="firma_refid",
    betrag="betrag",
    menge="menge",
    buchungsDatum="buchungs_datum",
)
# subset of the target-feature names from the mapping above
FEATURES_SALES_PROGNOSIS: Final[frozenset[str]] = frozenset(
    (
        "firma_refid",
        "betrag",
        "buchungs_datum",
    )
)

# ** Pipelines
# ** Forecast
SALES_MIN_NUM_DATAPOINTS: Final[int] = 36
# !! now in config
# TODO remove later till proven stable
# SALES_BASE_NUM_DATAPOINTS_MONTHS: Final[int] = 36