from __future__ import annotations import dataclasses as dc from collections.abc import Mapping, Set from typing import TYPE_CHECKING import pandas as pd from sklearn.metrics import mean_squared_error from xgboost import XGBRegressor from delta_barth.analysis import parse from delta_barth.constants import COL_MAP_SALES_PROGNOSIS, FEATURES_SALES_PROGNOSIS from delta_barth.types import CustomerDataSalesForecast, FcErrorCodes if TYPE_CHECKING: from delta_barth.api.common import SalesPrognosisResponse from delta_barth.types import FcResult # TODO check pandera for DataFrame validation def parse_api_resp_to_df( resp: SalesPrognosisResponse, ) -> pd.DataFrame: if resp.error is not None: raise ValueError("Response contains error code. Parsing aborted.") data = resp.model_dump()["daten"] return pd.DataFrame(data) # ------------------------------------------------------------------------------ # Input: # DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp # kunde (muss enthalten sein in df['firmen']['firma_refid']) # Output: # Integer umsetzung (Prognose möglich): 0 ja, 1 nein (zu wenig Daten verfügbar), # 2 nein (Daten nicht für Prognose geeignet) # DataFrame test: Jahr, Monat, Vorhersage # ------------------------------------------------------------------------------- # Prognose Umsatz je Firma # TODO: check usage of separate exception and handle it in API function # TODO set min number of data points as constant, not parameter def preprocess_sales_per_customer( resp: SalesPrognosisResponse, feature_map: Mapping[str, str], target_features: Set[str], ) -> pd.DataFrame: df = parse_api_resp_to_df(resp) df = parse.preprocess_features( df, feature_map=feature_map, target_features=target_features, ) return df def sales_per_customer( data: pd.DataFrame, customer_id: int, min_num_data_points: int = 100, ) -> FcResult: """_summary_ Parameters ---------- df : pd.DataFrame Input DF: table "f_umsatz_fakt" kunde : int customer ID (FK "firma_ref_ID") min_num_data_points : int, optional minimum number of data points to obtain result, by default 100 Returns ------- FcResult _description_ """ cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast() # filter data # TODO change away from nested DataFrames: just use "f_umsatz_fakt" # TODO with strong type checks data = data.copy() df_firma = data[ (data["firma_refid"] == customer_id) & (data["beleg_typ"] == 1) & (data["betrag"] > 0) ] for transaction in df_firma["vorgang_refid"].unique(): cust_data.order.append(transaction) cust_data.date.append( df_firma[df_firma["vorgang_refid"] == transaction]["buchungs_datum"].iloc[0] ) cust_data.sales.append( df_firma[df_firma["vorgang_refid"] == transaction]["betrag"].sum() ) df_cust = pd.DataFrame(dc.asdict(cust_data)) df_cust = df_cust.sort_values(by="date").reset_index() # check data availability if len(df_cust) < min_num_data_points: return FcErrorCodes.DATA_TOO_FEW_POINTS, None else: # Entwicklung der Umsätze: definierte Zeiträume Monat df_cust["year"] = df_cust["date"].dt.year df_cust["month"] = df_cust["date"].dt.month monthly_sum = df_cust.groupby(["year", "month"])["sales"].sum().reset_index() monthly_sum["date"] = ( monthly_sum["month"].astype(str) + "." + monthly_sum["year"].astype(str) ) monthly_sum["date"] = pd.to_datetime(monthly_sum["date"], format="%m.%Y") monthly_sum = monthly_sum.set_index("date") train = monthly_sum.iloc[:-5].copy() test = monthly_sum.iloc[-5:].copy() features = ["year", "month"] target = "sales" X_train, y_train = train[features], train[target] X_test, y_test = test[features], test[target] reg = XGBRegressor( base_score=0.5, booster="gbtree", n_estimators=1000, early_stopping_rounds=50, objective="reg:squarederror", max_depth=3, learning_rate=0.01, ) reg.fit( X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100 ) test.loc[:, "prediction"] = reg.predict(X_test) test = test.reset_index(drop=True) # umsetzung, prognose return FcErrorCodes.SUCCESS, test