from __future__ import annotations from datetime import datetime as Datetime from typing import TYPE_CHECKING, Final import requests from dopt_basics.io import combine_route from pydantic import BaseModel, PositiveInt, SkipValidation from delta_barth.constants import API_CON_TIMEOUT from delta_barth.errors import STATUS_HANDLER from delta_barth.types import DelBarApiError, ExportResponse, ResponseType, Status if TYPE_CHECKING: from delta_barth.session import Session # ** sales data # ** import class SalesPrognosisRequestP(BaseModel): FirmaId: SkipValidation[int | None] BuchungsDatum: SkipValidation[Datetime | None] class SalesPrognosisResponseEntry(ResponseType): artikelId: PositiveInt warengruppeId: PositiveInt firmaId: PositiveInt betrag: float # negative values are filtered out later menge: float # reasons for negative values unknown buchungsDatum: Datetime class SalesPrognosisResponse(ResponseType): daten: tuple[SalesPrognosisResponseEntry, ...] # ** export class SalesPrognosisResult(BaseModel): jahr: PositiveInt monat: PositiveInt vorhersage: float class SalesPrognosisResults(BaseModel): daten: tuple[SalesPrognosisResult, ...] class SalesPrognosisResultsExport(ExportResponse): response: SalesPrognosisResults status: Status def get_sales_prognosis_data( session: Session, company_id: int | None = None, start_date: Datetime | None = None, ) -> tuple[SalesPrognosisResponse, Status]: _, status = session.assert_login() if status != STATUS_HANDLER.SUCCESS: response = SalesPrognosisResponse(daten=tuple()) return response, status ROUTE: Final[str] = "verkauf/umsatzprognosedaten" URL: Final = combine_route(session.base_url, ROUTE) sales_prog_req = SalesPrognosisRequestP( FirmaId=company_id, BuchungsDatum=start_date, ) empty_response = SalesPrognosisResponse(daten=tuple()) try: resp = requests.get( URL, params=sales_prog_req.model_dump(mode="json", exclude_none=True), headers=session.headers, # type: ignore[argumentType] timeout=API_CON_TIMEOUT, ) except requests.exceptions.Timeout: return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_TIMEOUT except requests.exceptions.RequestException: return empty_response, STATUS_HANDLER.pipe_states.CONNECTION_ERROR response: SalesPrognosisResponse status: Status if resp.status_code == 200: response = SalesPrognosisResponse(**resp.json()) status = STATUS_HANDLER.SUCCESS else: response = empty_response err = DelBarApiError(status_code=resp.status_code, **resp.json()) status = STATUS_HANDLER.api_error(err) return response, status