refactoring and adding test cases for test coverage

This commit is contained in:
2025-03-14 11:55:29 +01:00
parent c3b8d104b0
commit 28b3b8d144
7 changed files with 191 additions and 31 deletions

View File

@@ -1,81 +0,0 @@
import pandas as pd
from sklearn.metrics import mean_squared_error
from xgboost import XGBRegressor
# -----------------------------------------------------------------------------------------------------------------------------
# Input:
# DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp
# kunde (muss enthalten sein in df['firmen']['firma_refid'])
# Output:
# Integer umsetzung (Prognose möglich): 0 ja, 1 nein (zu wenig Daten verfügbar), 2 nein (Daten nicht für Prognose geeignet)
# DataFrame test: Jahr, Monat, Vorhersage
# -----------------------------------------------------------------------------------------------------------------------------
# Prognose Umsatz je Firma
def prognose(df, kunde):
daten = {'Auftrag': [], 'Datum': [], 'Umsatz': []}
df_firma = df['f_umsatz_fakt'][
(df['f_umsatz_fakt']['firma_refid'] == kunde)
& (df['f_umsatz_fakt']['beleg_typ'] == 1)
& (df['f_umsatz_fakt']['betrag'] > 0)
]
for auftrag in df_firma['vorgang_refid'].unique():
daten['Auftrag'].append(auftrag)
daten['Datum'].append(
df_firma[df_firma['vorgang_refid'] == auftrag]['buchungs_datum'].iloc[0]
)
daten['Umsatz'].append(df_firma[df_firma['vorgang_refid'] == auftrag]['betrag'].sum())
daten = pd.DataFrame(daten)
daten = daten.sort_values(by='Datum')
daten = daten.reset_index()
# Datenverfügbarkeit prüfen
if len(daten) >= 100:
# Entwicklung der Umsätze: definierte Zeiträume Monat
daten['Jahr'] = daten['Datum'].dt.year
daten['Monat'] = daten['Datum'].dt.month
monthly_sum = daten.groupby(['Jahr', 'Monat'])['Umsatz'].sum().reset_index()
monthly_sum['Datum'] = (
monthly_sum['Monat'].astype(str) + '.' + monthly_sum['Jahr'].astype(str)
)
monthly_sum['Datum'] = pd.to_datetime(monthly_sum['Datum'], format='%m.%Y')
monthly_sum = monthly_sum.set_index('Datum')
train = monthly_sum.iloc[:-5].copy()
test = monthly_sum.iloc[-5:].copy()
features = ['Jahr', 'Monat']
target = 'Umsatz'
X_train, y_train = train[features], train[target]
X_test, y_test = test[features], test[target]
reg = XGBRegressor(
base_score=0.5,
booster='gbtree',
n_estimators=1000,
early_stopping_rounds=50,
objective='reg:squarederror',
max_depth=3,
learning_rate=0.01,
)
reg.fit(
X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100
)
test.loc[:, 'Vorhersage'] = reg.predict(X_test)
test = test.reset_index(drop=True)
# umsetzung, prognose
return 0, test
# zu wenig Daten verfügbar
else:
# umsetzung, prognose
return 1, None

View File

@@ -23,7 +23,6 @@ from delta_barth.constants import (
)
from delta_barth.errors import STATUS_HANDLER, wrap_result
from delta_barth.types import (
CustomerDataSalesForecast,
DualDict,
PipeResult,
)
@@ -328,6 +327,6 @@ def pipeline_sales_dummy(
session: Session,
company_id: int | None = None,
start_date: Datetime | None = None,
) -> SalesPrognosisResultsExport:
) -> SalesPrognosisResultsExport: # pragma: no cover
"""prototype dummy function for tests by DelBar"""
...

View File

@@ -173,7 +173,7 @@ class Session:
elif resp.status_code == 401:
self._remove_session_token()
response, status = self.login()
else: # pragma: no cover
else:
response = LoginResponse(token="")
err = DelBarApiError(status_code=resp.status_code, **resp.json())
status = STATUS_HANDLER.api_error(err)

View File

@@ -54,7 +54,11 @@ def get_sales_prognosis_data(
company_id: int | None = None,
start_date: Datetime | None = None,
) -> tuple[SalesPrognosisResponse, Status]:
session.assert_login()
resp, status = session.assert_login()
if status != STATUS_HANDLER.SUCCESS:
response = SalesPrognosisResponse(daten=tuple())
return response, status
ROUTE: Final[str] = "verkauf/umsatzprognosedaten"
URL: Final = combine_route(session.base_url, ROUTE)