add option for multiple company IDs, related to #26

This commit was merged in pull request #29.
This commit is contained in:
2025-04-30 14:46:23 +02:00
parent 248b811786
commit 690431472c
7 changed files with 24 additions and 26 deletions

View File

@@ -403,13 +403,13 @@ def _export_on_fail(
def pipeline_sales_forecast(
session: Session,
company_id: int | None = None,
company_ids: list[int] | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
logger_pipelines.info("[PIPELINES] Starting main sales forecast pipeline...")
response, status = get_sales_prognosis_data(
session,
company_id=company_id,
company_ids=company_ids,
start_date=start_date,
)
if status != STATUS_HANDLER.SUCCESS:

View File

@@ -20,7 +20,7 @@ if TYPE_CHECKING:
# ** sales data
# ** import
class SalesPrognosisRequestP(BaseModel):
FirmaId: SkipValidation[int | None]
FirmaIds: SkipValidation[list[int] | None]
BuchungsDatum: SkipValidation[Datetime | None]
@@ -55,20 +55,14 @@ class SalesPrognosisResultsExport(ExportResponse):
def get_sales_prognosis_data(
session: Session,
company_id: int | None = None,
company_ids: list[int] | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
# TODO check elimination of assertion for login, #25
# _, status = session.assert_login()
# if status != STATUS_HANDLER.SUCCESS:
# response = SalesPrognosisResponse(daten=tuple())
# return response, status
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)
sales_prog_req = SalesPrognosisRequestP(
FirmaId=company_id,
FirmaIds=company_ids,
BuchungsDatum=start_date,
)
empty_response = SalesPrognosisResponse(daten=tuple())

View File

@@ -50,7 +50,7 @@ class KnownDelBarApiErrorCodes(enum.Enum):
# ** API
API_CON_TIMEOUT: Final[float] = 10.0 # secs to response
API_CON_TIMEOUT: Final[float] = 20.0 # secs to response
MAX_LOGIN_RETRIES: Final[int] = 1
# ** API response parsing

View File

@@ -44,14 +44,14 @@ def _write_performance_metrics_wrapped(
def pipeline_sales_forecast(
company_id: int | None,
company_ids: list[int] | None,
start_date: Datetime | None,
) -> JsonExportResponse:
PIPELINE_NAME: Final[str] = "sales_forecast"
logger.info("[EXT-CALL PIPELINES] Starting main sales forecast pipeline...")
t_start = time.perf_counter_ns()
result = forecast.pipeline_sales_forecast(
SESSION, company_id=company_id, start_date=start_date
SESSION, company_ids=company_ids, start_date=start_date
)
export = JsonExportResponse(result.model_dump_json())
t_end = time.perf_counter_ns()