5 Commits

Author SHA1 Message Date
a0d189ac9f add logging of pipeline metrics in database 2025-04-04 13:37:05 +02:00
6a418118d2 prepare metrics writing process 2025-04-03 16:05:46 +02:00
5d78fc9e02 added handling for API connectivity errors 2025-04-03 12:51:14 +02:00
b93b070682 adapt C# JSON type 2025-04-03 11:22:00 +02:00
30641103ec rework session management: interface to C# 2025-04-03 09:26:56 +02:00
14 changed files with 257 additions and 45 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "delta-barth" name = "delta-barth"
version = "0.5.1" version = "0.5.5"
description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system" description = "workflows and pipelines for the Python-based Plugin of Delta Barth's ERP system"
authors = [ authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"}, {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -73,7 +73,7 @@ directory = "reports/coverage"
[tool.bumpversion] [tool.bumpversion]
current_version = "0.5.1" current_version = "0.5.5"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -42,7 +42,11 @@ def delta_barth_api_error() -> str:
def status_err() -> str: def status_err() -> str:
status = Status(code=102, description="internal error occurred", message="caused by test") status = Status(
code=102,
description="internal error occurred: 'Limit-Überschreitung'",
message="caused by test",
)
return status.model_dump_json() return status.model_dump_json()

View File

@@ -26,6 +26,7 @@ from delta_barth.api.requests import (
) )
from delta_barth.constants import ( from delta_barth.constants import (
COL_MAP_SALES_PROGNOSIS, COL_MAP_SALES_PROGNOSIS,
DEFAULT_DB_ERR_CODE,
DUMMY_DATA_PATH, DUMMY_DATA_PATH,
FEATURES_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS,
SALES_BASE_NUM_DATAPOINTS_MONTHS, SALES_BASE_NUM_DATAPOINTS_MONTHS,
@@ -110,7 +111,7 @@ def _parse_df_to_results_wrapped(
return _parse_df_to_results(data) return _parse_df_to_results(data)
@wrap_result() @wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_sales_forecast_stats_wrapped( def _write_sales_forecast_stats_wrapped(
stats: SalesForecastStatistics, stats: SalesForecastStatistics,
) -> None: ) -> None:

View File

@@ -7,6 +7,7 @@ import requests
from dopt_basics.io import combine_route from dopt_basics.io import combine_route
from pydantic import BaseModel, PositiveInt, SkipValidation from pydantic import BaseModel, PositiveInt, SkipValidation
from delta_barth.constants import API_CON_TIMEOUT
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status
@@ -55,7 +56,7 @@ def get_sales_prognosis_data(
company_id: int | None = None, company_id: int | None = None,
start_date: Datetime | None = None, start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]: ) -> tuple[SalesPrognosisResponse, Status]:
resp, status = session.assert_login() _, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS: if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple()) response = SalesPrognosisResponse(daten=tuple())
return response, status return response, status
@@ -67,11 +68,18 @@ def get_sales_prognosis_data(
FirmaId=company_id, FirmaId=company_id,
BuchungsDatum=start_date, BuchungsDatum=start_date,
) )
empty_response = SalesPrognosisResponse(daten=tuple())
try:
resp = requests.get( resp = requests.get(
URL, URL,
params=sales_prog_req.model_dump(mode="json", exclude_none=True), params=sales_prog_req.model_dump(mode="json", exclude_none=True),
headers=session.headers, # type: ignore[argumentType] headers=session.headers, # type: ignore[argumentType]
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException:
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: SalesPrognosisResponse response: SalesPrognosisResponse
status: Status status: Status
@@ -79,7 +87,7 @@ def get_sales_prognosis_data(
response = SalesPrognosisResponse(**resp.json()) response = SalesPrognosisResponse(**resp.json())
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
else: else:
response = SalesPrognosisResponse(daten=tuple()) response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

View File

@@ -25,6 +25,7 @@ DB_ECHO: Final[bool] = True
# ** error handling # ** error handling
DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100 DEFAULT_INTERNAL_ERR_CODE: Final[int] = 100
DEFAULT_DB_ERR_CODE: Final[int] = 150
DEFAULT_API_ERR_CODE: Final[int] = 400 DEFAULT_API_ERR_CODE: Final[int] = 400
@@ -38,6 +39,8 @@ class KnownDelBarApiErrorCodes(enum.Enum):
COMMON = frozenset((400, 401, 409, 500)) COMMON = frozenset((400, 401, 409, 500))
# ** API
API_CON_TIMEOUT: Final[float] = 1.0 # secs to response
# ** API response parsing # ** API response parsing
# ** column mapping [API-Response --> Target-Features] # ** column mapping [API-Response --> Target-Features]
COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict( COL_MAP_SALES_PROGNOSIS: Final[DualDict[str, str]] = DualDict(

View File

@@ -22,8 +22,8 @@ perf_meas = sql.Table(
"performance_measurement", "performance_measurement",
metadata, metadata,
sql.Column("id", sql.Integer, primary_key=True), sql.Column("id", sql.Integer, primary_key=True),
sql.Column("execution_duration", sql.Float),
sql.Column("pipeline_name", sql.String(length=30)), sql.Column("pipeline_name", sql.String(length=30)),
sql.Column("execution_duration", sql.Float),
) )
# ** ---- forecasts # ** ---- forecasts
sf_stats = sql.Table( sf_stats = sql.Table(

View File

@@ -53,9 +53,19 @@ class UApiError(Exception):
## ** internal error handling ## ** internal error handling
DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = ( DATA_PIPELINE_STATUS_DESCR: Final[tuple[StatusDescription, ...]] = (
("SUCCESS", 0, "Erfolg"), ("SUCCESS", 0, "Erfolg"),
("TOO_FEW_POINTS", 1, "Datensatz besitzt nicht genügend Datenpunkte"), (
("TOO_FEW_MONTH_POINTS", 2, "nach Aggregation pro Monat nicht genügend Datenpunkte"), "CONNECTION_TIMEOUT",
("NO_RELIABLE_FORECAST", 3, "Prognosequalität des Modells unzureichend"), 1,
"Der Verbindungsaufbau zum API-Server dauerte zu lange. Ist der Server erreichbar?",
),
(
"CONNECTION_ERROR",
2,
"Es ist keine Verbindung zum API-Server möglich. Ist der Server erreichbar?",
),
("TOO_FEW_POINTS", 3, "Datensatz besitzt nicht genügend Datenpunkte"),
("TOO_FEW_MONTH_POINTS", 4, "nach Aggregation pro Monat nicht genügend Datenpunkte"),
("NO_RELIABLE_FORECAST", 5, "Prognosequalität des Modells unzureichend"),
) )

View File

@@ -14,9 +14,11 @@ SESSION: Final[Session] = Session(HTTP_BASE_CONTENT_HEADERS)
def setup( def setup(
data_path: str, data_path: str,
base_url: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
# at this point: no logging configured # at this point: no logging configured
SESSION.set_data_path(data_path) SESSION.set_data_path(data_path)
SESSION.set_base_url(base_url=base_url)
SESSION.setup() SESSION.setup()
logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session") logger.info("[EXT-CALL MANAGEMENT] Successfully set up current session")
@@ -37,6 +39,7 @@ def set_credentials(
logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session") logger.info("[EXT-CALL MANAGEMENT] Successfully set credentials for current session")
# ** not part of external API, only internal
def get_credentials() -> str: # pragma: no cover def get_credentials() -> str: # pragma: no cover
logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...") logger.info("[EXT-CALL MANAGEMENT] Getting credentials for current session...")
creds = SESSION.creds creds = SESSION.creds
@@ -44,12 +47,15 @@ def get_credentials() -> str: # pragma: no cover
return creds.model_dump_json() return creds.model_dump_json()
# ** legacy: not part of external API
def set_base_url( def set_base_url(
base_url: str, base_url: str,
) -> None: # pragma: no cover ) -> None: # pragma: no cover
SESSION.set_base_url(base_url=base_url) SESSION.set_base_url(base_url=base_url)
def get_data_path() -> str: # pragma: no cover
return str(SESSION.data_path)
def get_base_url() -> str: # pragma: no cover def get_base_url() -> str: # pragma: no cover
return SESSION.base_url return SESSION.base_url

View File

@@ -1,24 +1,83 @@
"""collection of configured data pipelines, intended to be invoked from C#""" """collection of configured data pipelines, intended to be invoked from C#"""
import time
from datetime import datetime as Datetime from datetime import datetime as Datetime
from typing import Final
import sqlalchemy as sql
from delta_barth import databases as db
from delta_barth.analysis import forecast from delta_barth.analysis import forecast
from delta_barth.constants import DEFAULT_DB_ERR_CODE
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.logging import logger_pipelines as logger from delta_barth.logging import logger_pipelines as logger
from delta_barth.management import SESSION from delta_barth.management import SESSION
from delta_barth.types import JsonExportResponse from delta_barth.types import JsonExportResponse, PipelineMetrics
def _write_performance_metrics(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
if time_end < time_start:
raise ValueError("Ending time smaller than starting time")
execution_duration = (time_end - time_start) / 1e9
metrics = PipelineMetrics(
pipeline_name=pipeline_name,
execution_duration=execution_duration,
)
with SESSION.db_engine.begin() as con:
con.execute(sql.insert(db.perf_meas).values(**metrics))
return metrics
@wrap_result(code_on_error=DEFAULT_DB_ERR_CODE)
def _write_performance_metrics_wrapped(
pipeline_name: str,
time_start: int,
time_end: int,
) -> PipelineMetrics:
return _write_performance_metrics(pipeline_name, time_start, time_end)
def pipeline_sales_forecast( def pipeline_sales_forecast(
company_id: int | None, company_id: int | None,
start_date: Datetime | None, start_date: Datetime | None,
) -> JsonExportResponse: ) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...") logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast( result = forecast.pipeline_sales_forecast(
SESSION, company_id=company_id, start_date=start_date SESSION, company_id=company_id, start_date=start_date
) )
export = JsonExportResponse(result.model_dump_json()) export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful") logger.info("[EXT-CALL PIPELINES] Main sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export return export
@@ -27,14 +86,38 @@ def pipeline_sales_forecast_dummy(
company_id: int | None, company_id: int | None,
start_date: Datetime | None, start_date: Datetime | None,
) -> JsonExportResponse: ) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast_dummy"
logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...") logger.info("[EXT-CALL PIPELINES] Starting dummy sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_dummy( result = forecast.pipeline_sales_dummy(
SESSION, SESSION,
company_id=company_id, company_id=company_id,
start_date=start_date, start_date=start_date,
) )
export = JsonExportResponse(result.model_dump_json()) export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()
logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful") logger.info("[EXT-CALL PIPELINES] Dummy sales forecast pipeline successful")
logger.info("[EXT-CALL PIPELINES] Writing performance metrics...")
res = _write_performance_metrics_wrapped(
pipeline_name=PIPELINE_NAME,
time_start=t_start,
time_end=t_end,
)
if res.status != STATUS_HANDLER.SUCCESS:
logger.error(
(
"[DB-WRITE][METRICS] Pipeline: >%s< - Error on writing "
"pipeline metrics to database: %s"
),
PIPELINE_NAME,
res.status,
)
else:
metrics = res.unwrap()
logger.info(
"[METRICS] Pipeline: >%s< - Execution time: %.6f",
PIPELINE_NAME,
metrics["execution_duration"],
)
return export return export

View File

@@ -14,7 +14,7 @@ from delta_barth.api.common import (
LoginResponse, LoginResponse,
validate_credentials, validate_credentials,
) )
from delta_barth.constants import DB_ECHO from delta_barth.constants import API_CON_TIMEOUT, DB_ECHO
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
from delta_barth.logging import logger_session as logger from delta_barth.logging import logger_session as logger
from delta_barth.types import DelBarApiError, Status from delta_barth.types import DelBarApiError, Status
@@ -191,11 +191,18 @@ class Session:
databaseName=self.creds.database, databaseName=self.creds.database,
mandantName=self.creds.mandant, mandantName=self.creds.mandant,
) )
empty_response = LoginResponse(token="")
try:
resp = requests.put( resp = requests.put(
URL, URL,
login_req.model_dump_json(), login_req.model_dump_json(),
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -204,7 +211,7 @@ class Session:
status = STATUS_HANDLER.pipe_states.SUCCESS status = STATUS_HANDLER.pipe_states.SUCCESS
self._add_session_token(response.token) self._add_session_token(response.token)
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
@@ -216,12 +223,17 @@ class Session:
ROUTE: Final[str] = "user/logout" ROUTE: Final[str] = "user/logout"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
try:
resp = requests.put( resp = requests.put(
URL, URL,
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return None, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response = None
status: Status status: Status
if resp.status_code == 200: if resp.status_code == 200:
status = STATUS_HANDLER.SUCCESS status = STATUS_HANDLER.SUCCESS
@@ -230,7 +242,7 @@ class Session:
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)
return response, status return None, status
def assert_login( def assert_login(
self, self,
@@ -246,11 +258,18 @@ class Session:
ROUTE: Final[str] = "verkauf/umsatzprognosedaten" ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(self.base_url, ROUTE) URL: Final = combine_route(self.base_url, ROUTE)
params: dict[str, int] = {"FirmaId": 999999} params: dict[str, int] = {"FirmaId": 999999}
empty_response = LoginResponse(token="")
try:
resp = requests.get( resp = requests.get(
URL, URL,
params=params, params=params,
headers=self.headers, # type: ignore headers=self.headers, # type: ignore
timeout=API_CON_TIMEOUT,
) )
except requests.exceptions.Timeout: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT
except requests.exceptions.RequestException: # pragma: no cover
return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR
response: LoginResponse response: LoginResponse
status: Status status: Status
@@ -261,7 +280,7 @@ class Session:
self._remove_session_token() self._remove_session_token()
response, status = self.login() response, status = self.login()
else: else:
response = LoginResponse(token="") response = empty_response
err = DelBarApiError(status_code=resp.status_code, **resp.json()) err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err) status = STATUS_HANDLER.api_error(err)

View File

@@ -47,6 +47,8 @@ class ExportResponse(BaseModel):
@dataclass(slots=True) @dataclass(slots=True)
class DataPipeStates: class DataPipeStates:
SUCCESS: Status SUCCESS: Status
CONNECTION_TIMEOUT: Status
CONNECTION_ERROR: Status
TOO_FEW_POINTS: Status TOO_FEW_POINTS: Status
TOO_FEW_MONTH_POINTS: Status TOO_FEW_MONTH_POINTS: Status
NO_RELIABLE_FORECAST: Status NO_RELIABLE_FORECAST: Status
@@ -139,7 +141,13 @@ class Statistics:
pass pass
# ** forecasts # ** ---- performance
class PipelineMetrics(t.TypedDict):
pipeline_name: str
execution_duration: float
# ** ---- forecasts
@dataclass(slots=True) @dataclass(slots=True)
class CustomerDataSalesForecast: class CustomerDataSalesForecast:
order: list[int] = field(default_factory=list) order: list[int] = field(default_factory=list)

View File

@@ -1,8 +1,10 @@
from datetime import datetime as Datetime from datetime import datetime as Datetime
import pytest import pytest
import requests
from delta_barth.api import requests as requests_ from delta_barth.api import requests as requests_
from delta_barth.api.common import LoginResponse
@pytest.mark.api_con_required @pytest.mark.api_con_required
@@ -94,3 +96,31 @@ def test_get_sales_prognosis_data_FailApiServer(session, mock_get):
assert status.api_server_error.message == json["message"] assert status.api_server_error.message == json["message"]
assert status.api_server_error.code == json["code"] assert status.api_server_error.code == json["code"]
assert status.api_server_error.hints == json["hints"] assert status.api_server_error.hints == json["hints"]
def test_get_sales_prognosis_data_FailGetTimeout(session, mock_get):
mock_get.side_effect = requests.exceptions.Timeout("Test timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 1
def test_get_sales_prognosis_data_FailGetRequestException(session, mock_get):
mock_get.side_effect = requests.exceptions.RequestException("Test not timeout")
def assert_login():
return LoginResponse(token=""), requests_.STATUS_HANDLER.SUCCESS
session.assert_login = assert_login
resp, status = requests_.get_sales_prognosis_data(session, None, None)
assert resp is not None
assert len(resp.daten) == 0
assert status.code == 2

View File

@@ -95,7 +95,7 @@ def mock_put():
yield mock yield mock
@pytest.fixture @pytest.fixture(scope="function")
def mock_get(): def mock_get():
with patch("requests.get") as mock: with patch("requests.get") as mock:
yield mock yield mock

View File

@@ -3,19 +3,43 @@ import json
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
import sqlalchemy as sql
import delta_barth.pipelines import delta_barth.pipelines
from delta_barth import databases as db
from delta_barth import pipelines as pl from delta_barth import pipelines as pl
from delta_barth.errors import STATUS_HANDLER from delta_barth.errors import STATUS_HANDLER
def test_write_performance_metrics(session):
pipe_name = "test_pipe"
t_start = 20_000_000_000
t_end = 30_000_000_000
with patch("delta_barth.pipelines.SESSION", session):
metrics = pl._write_performance_metrics(
pipeline_name=pipe_name,
time_start=t_start,
time_end=t_end,
)
assert metrics["pipeline_name"] == pipe_name
assert metrics["execution_duration"] == 10
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == pipe_name
assert metrics.execution_duration == 10
@patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1) @patch("delta_barth.analysis.forecast.SALES_BASE_NUM_DATAPOINTS_MONTHS", 1)
def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp): def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp, session):
with patch( with patch(
"delta_barth.analysis.forecast.get_sales_prognosis_data", "delta_barth.analysis.forecast.get_sales_prognosis_data",
) as mock: ) as mock:
mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS) mock.return_value = (exmpl_api_sales_prognosis_resp, STATUS_HANDLER.SUCCESS)
importlib.reload(delta_barth.pipelines) with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast(None, None) json_export = pl.pipeline_sales_forecast(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
@@ -27,8 +51,17 @@ def test_sales_prognosis_pipeline(exmpl_api_sales_prognosis_resp):
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
def test_sales_prognosis_pipeline_dummy(): metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast"
assert metrics.execution_duration > 0
@pytest.mark.new
def test_sales_prognosis_pipeline_dummy(session):
with patch("delta_barth.pipelines.SESSION", session):
json_export = pl.pipeline_sales_forecast_dummy(None, None) json_export = pl.pipeline_sales_forecast_dummy(None, None)
assert isinstance(json_export, str) assert isinstance(json_export, str)
@@ -43,3 +76,10 @@ def test_sales_prognosis_pipeline_dummy():
assert entry["vorhersage"] == pytest.approx(47261.058594) assert entry["vorhersage"] == pytest.approx(47261.058594)
assert "code" in parsed_resp["status"] assert "code" in parsed_resp["status"]
assert parsed_resp["status"]["code"] == 0 assert parsed_resp["status"]["code"] == 0
with session.db_engine.begin() as con:
ret = con.execute(sql.select(db.perf_meas))
metrics = ret.all()[-1]
assert metrics.pipeline_name == "sales_forecast_dummy"
assert metrics.execution_duration > 0