From c70bd1cdc673c4c3d96a1e11c80d2e083e685c81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20F=C3=B6rster?= Date: Wed, 19 Feb 2025 12:22:21 +0100 Subject: [PATCH] basic structure and pipeline definition --- src/delta_barth/{workflow.py => _workflow.py} | 0 src/delta_barth/analysis/__init__.py | 0 src/delta_barth/analysis/forecast.py | 117 ++++++++++++++++++ src/delta_barth/constants.py | 1 + src/delta_barth/prognose.py | 65 ---------- src/delta_barth/types.py | 22 ++++ 6 files changed, 140 insertions(+), 65 deletions(-) rename src/delta_barth/{workflow.py => _workflow.py} (100%) create mode 100644 src/delta_barth/analysis/__init__.py create mode 100644 src/delta_barth/analysis/forecast.py create mode 100644 src/delta_barth/constants.py delete mode 100644 src/delta_barth/prognose.py create mode 100644 src/delta_barth/types.py diff --git a/src/delta_barth/workflow.py b/src/delta_barth/_workflow.py similarity index 100% rename from src/delta_barth/workflow.py rename to src/delta_barth/_workflow.py diff --git a/src/delta_barth/analysis/__init__.py b/src/delta_barth/analysis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/delta_barth/analysis/forecast.py b/src/delta_barth/analysis/forecast.py new file mode 100644 index 0000000..a9b54a5 --- /dev/null +++ b/src/delta_barth/analysis/forecast.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import dataclasses as dc +from typing import TYPE_CHECKING + +import pandas as pd +from sklearn.metrics import mean_squared_error +from xgboost import XGBRegressor + +from delta_barth.types import CustomerDataSalesForecast, FcErrorCodes + +if TYPE_CHECKING: + from delta_barth.types import FcResult + +# TODO check pandera for DataFrame validation + +# ------------------------------------------------------------------------------ +# Input: +# DataFrame df mit Columns f_umsatz_fakt, firmen, art, v_warengrp +# kunde (muss enthalten sein in df['firmen']['firma_refid']) + +# Output: +# Integer umsetzung 
# Sales forecast per customer

# TODO: check usage of separate exception and handle it in API function
# TODO set min number of data points as constant, not parameter


def sales_per_customer(
    df: pd.DataFrame,
    kunde: int,
    min_num_data_points: int = 100,
) -> FcResult:
    """Forecast the monthly sales of a single customer.

    The rows of the given customer are reduced to one entry per order
    ("vorgang_refid"), summed up per calendar month and used to train a
    gradient-boosted regressor on the features "year" and "month". The
    last five months are held out and predicted.

    Parameters
    ----------
    df : pd.DataFrame
        Input DF: table "f_umsatz_fakt". The columns "firma_refid",
        "beleg_typ", "betrag", "vorgang_refid" and "buchungs_datum" are
        read; "buchungs_datum" must be a datetime column.
    kunde : int
        customer ID (FK "firma_ref_ID")
    min_num_data_points : int, optional
        minimum number of orders required to obtain a result, by default 100

    Returns
    -------
    FcResult
        Tuple ``(status, forecast)``. On ``FcErrorCodes.SUCCESS`` the second
        element is a DataFrame with the columns "year", "month", "sales" and
        "prediction" for the held-out months. On
        ``FcErrorCodes.DATA_TOO_FEW_POINTS`` (too few orders, or not enough
        distinct months to build a training set) it is ``None``.
    """
    # number of trailing months held out for evaluation and prediction
    num_test_months = 5

    # filter data
    # TODO change away from nested DataFrames: just use "f_umsatz_fakt"
    # TODO with strong type checks
    # boolean indexing returns a new frame, so the caller's data is never
    # modified and no explicit copy is needed
    relevant = (df["firma_refid"] == kunde) & (df["beleg_typ"] == 1) & (df["betrag"] > 0)
    df_firma = df[relevant]

    # one row per order in a single pass: booking date of the order and the
    # sum of all its positions ("first" takes the first non-missing date);
    # sort=False keeps the orders in their order of appearance
    per_order = df_firma.groupby("vorgang_refid", sort=False).agg(
        date=("buchungs_datum", "first"),
        sales=("betrag", "sum"),
    )
    cust_data: CustomerDataSalesForecast = CustomerDataSalesForecast(
        order=per_order.index.to_list(),
        date=per_order["date"].to_list(),
        sales=per_order["sales"].to_list(),
    )
    df_cust = pd.DataFrame(dc.asdict(cust_data))

    # check data availability
    if df_cust.empty or len(df_cust) < min_num_data_points:
        return FcErrorCodes.DATA_TOO_FEW_POINTS, None

    df_cust = df_cust.sort_values(by="date").reset_index(drop=True)

    # development of sales: aggregate per calendar month
    df_cust["year"] = df_cust["date"].dt.year
    df_cust["month"] = df_cust["date"].dt.month

    monthly_sum = df_cust.groupby(["year", "month"])["sales"].sum().reset_index()
    monthly_sum["date"] = pd.to_datetime(
        monthly_sum["month"].astype(str) + "." + monthly_sum["year"].astype(str),
        format="%m.%Y",
    )
    monthly_sum = monthly_sum.set_index("date")

    # many orders can still fall into very few months; without at least one
    # month left for training the model cannot be fitted
    if len(monthly_sum) <= num_test_months:
        return FcErrorCodes.DATA_TOO_FEW_POINTS, None

    train = monthly_sum.iloc[:-num_test_months].copy()
    test = monthly_sum.iloc[-num_test_months:].copy()

    features = ["year", "month"]
    target = "sales"

    X_train, y_train = train[features], train[target]
    X_test, y_test = test[features], test[target]

    reg = XGBRegressor(
        base_score=0.5,
        booster="gbtree",
        n_estimators=1000,
        early_stopping_rounds=50,
        objective="reg:squarederror",
        max_depth=3,
        learning_rate=0.01,
    )
    # NOTE(review): the held-out months are also passed as evaluation set and
    # therefore drive early stopping - confirm this is intended
    reg.fit(
        X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100
    )

    test["prediction"] = reg.predict(X_test)
    test = test.reset_index(drop=True)

    # status code, forecast
    return FcErrorCodes.SUCCESS, test
df_firma = df['f_umsatz_fakt'][(df['f_umsatz_fakt']['firma_refid'] == kunde) & (df['f_umsatz_fakt']['beleg_typ'] == 1) & (df['f_umsatz_fakt']['betrag'] > 0)] - for auftrag in df_firma['vorgang_refid'].unique(): - daten['Auftrag'].append(auftrag) - daten['Datum'].append(df_firma[df_firma['vorgang_refid'] == auftrag]['buchungs_datum'].iloc[0]) - daten['Umsatz'].append(df_firma[df_firma['vorgang_refid'] == auftrag]['betrag'].sum()) - - daten = pd.DataFrame(daten) - daten = daten.sort_values(by='Datum') - daten = daten.reset_index() - - # Datenverfügbarkeit prüfen - if len(daten) >= 100: - # Entwicklung der Umsätze: definierte Zeiträume Monat - daten['Jahr'] = daten['Datum'].dt.year - daten['Monat'] = daten['Datum'].dt.month - - monthly_sum = daten.groupby(['Jahr', 'Monat'])['Umsatz'].sum().reset_index() - monthly_sum['Datum'] = monthly_sum['Monat'].astype(str) + '.' + monthly_sum['Jahr'].astype(str) - monthly_sum['Datum'] = pd.to_datetime(monthly_sum['Datum'], format='%m.%Y') - monthly_sum = monthly_sum.set_index('Datum') - - train = monthly_sum.iloc[:-5].copy() - test = monthly_sum.iloc[-5:].copy() - - features = ['Jahr', 'Monat'] - target = 'Umsatz' - - X_train, y_train = train[features], train[target] - X_test, y_test = test[features], test[target] - - reg = XGBRegressor(base_score=0.5, booster='gbtree', n_estimators=1000, early_stopping_rounds=50, objective='reg:squarederror', max_depth=3, learning_rate=0.01) - reg.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_test, y_test)], verbose=100) - - test.loc[:, 'Vorhersage'] = reg.predict(X_test) - test = test.reset_index(drop=True) - - # umsetzung, prognose - return 0, test - - # zu wenig Daten verfügbar - else: - # umsetzung, prognose - return 1, None diff --git a/src/delta_barth/types.py b/src/delta_barth/types.py new file mode 100644 index 0000000..81955d0 --- /dev/null +++ b/src/delta_barth/types.py @@ -0,0 +1,22 @@ +import enum +from dataclasses import dataclass, field +from typing import TypeAlias + 
+import pandas as pd + + +# ** forecasts +@dataclass(slots=True) +class CustomerDataSalesForecast: + order: list[int] = field(default_factory=list) + date: list[pd.Timestamp] = field(default_factory=list) + sales: list[float] = field(default_factory=list) + + +class FcErrorCodes(enum.IntEnum): + SUCCESS = 0 + DATA_TOO_FEW_POINTS = 1 + DATA_BAD_QUALITY = 2 + + +FcResult: TypeAlias = tuple[FcErrorCodes, pd.DataFrame | None]