refactoring and enhanced user configuration

This commit is contained in:
2026-06-11 09:46:14 +02:00
parent 2a6777becc
commit 7f864bc76b
5 changed files with 94 additions and 83 deletions

View File

@@ -1,6 +1,17 @@
[Datenbank]
NUTZER = "WATTANA"
PASSWORT = "MyWattanaPassword123"
HOST = "localhost"
PORT = 1521
SERVICE_NAME = "FREEPDB1"
Nutzer = "WATTANA"
Passwort = "MyWattanaPassword123"
Host = "localhost"
Port = 1521
Service_Name = "FREEPDB1"
Tabellenname_Produktionsstandmeldung = "EXTERN_PSM" # Datenbanktabelle mit den Produktionsstandmeldungen
Tabellenname_MIS_Auftraege = "EXTERN_MIS" # Datenbanktabelle mit den MIS-Aufträgen
[Datenpipelines_PSM]
Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig = 4 # prüft bei der Vorverarbeitung, ob Datumsangaben über diesen Horizont hinaus vorliegen; diese werden entfernt
Terminabweichung_untere_Schranke = 0 # Anzahl an Tagen
Terminabweichung_obere_Schranke = 0 # Anzahl an Tagen
Nutze_Schranken_Terminabweichung_KPI_Berechnung = true # bei "false" wird 0 als Grenze (oben + unten) angenommen
Score_Qualitaet_Produktionsmengen_fehlend = 1 # Score, wenn durch den Konfektionär die Produktionsmengen gar nicht gepflegt werden
Score_Qualitaet_Produktionsmengen_unplausibel = 0 # Score, wenn durch den Konfektionär die Produktionsmengen nicht plausibel gepflegt werden
Score_Qualitaet_Produktionsmengen_plausibel = 2 # Score, wenn durch den Konfektionär die Produktionsmengen sauber gepflegt werden

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import dataclasses as dc
import datetime
import enum
import json
import warnings
from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast
@@ -12,21 +11,13 @@ import sqlalchemy as sql
from dopt_basics.datastructures import flatten
from wattanalyse import db
from wattanalyse.constants import QualityPsm
from wattanalyse.types import SqlInsertStmts, SqlStatement
if TYPE_CHECKING:
from oracledb import Connection as OracleConnection
from polars._typing import SchemaDict
# 1. cleanup obtained new data
# ~~2. load data from internal database~~
# ~~3. integrate with with new data (whole snapshot)~~
# 2. process on order level
# 3. save results to internal database
# 4. post-process results
# 5. write to external database
SqlStatement: TypeAlias = str
@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
@@ -36,22 +27,10 @@ class PreProcessResult:
DROP_COLUMNS: Final[list[str]] = cast(
list[str],
list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))),
list(flatten(((x.lower(), x.upper(), x.capitalize()) for x in ("id", "index", "idx")))), # type: ignore
)
@dc.dataclass(slots=True, kw_only=True)
class SqlInsertStmts:
delete: str
insert: str
class QualityPsm(enum.StrEnum):
FEHLEND = enum.auto()
UNPLAUSIBEL = enum.auto()
PLAUSIBEL = enum.auto()
PSM_SCORES: dict[QualityPsm, int] = {
QualityPsm.FEHLEND: 1,
QualityPsm.UNPLAUSIBEL: 0,
@@ -81,6 +60,16 @@ NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4
TAB_NAME_PSM: Final[str] = "EXTERN_PSM"
TAB_NAME_MIS: Final[str] = "EXTERN_MIS"
USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
# // (0) load data
def load_PSM_data(
@@ -458,18 +447,6 @@ def load_order_level_from_internal_database() -> pl.DataFrame:
# // (4) post-process results
USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
def aggregate_production_orders(
data: pl.LazyFrame,
) -> pl.LazyFrame:

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import enum
import os
from pathlib import Path
from typing import Final
@@ -10,14 +11,13 @@ from dopt_basics import io as io_
from wattanalyse import types as t
# PROJECT_ROOT = Path(__file__).resolve().parents[2]
LIB_PATH: Final[Path] = Path(__file__).resolve().parent
BASE_PATH = io_.search_folder_path(
LIB_PATH, stop_folder_name=os.getenv("DOPT_STOP_FOLDER_NAME", "python")
)
assert BASE_PATH
assert BASE_PATH, "base path not found"
class Config:
@@ -30,8 +30,17 @@ class Config:
user_cfg = configs.load_toml(Config.PTH_USER_CFG)
USER_CFG: t.UserConfig = t.UserConfig(
Datenbank=t.UserConfig_Datenbank(**user_cfg["Datenbank"])
Datenbank=t.UserConfig_Datenbank(**user_cfg["Datenbank"]),
Datenpipelines_PSM=t.UserConfig_Pipelines_PSM(**user_cfg["Datenpipelines_PSM"]),
)
# ** DB interaction
oracledb.defaults.arraysize = 1000
oracledb.defaults.prefetchrows = 1000
# ** pipelines
class QualityPsm(enum.StrEnum):
FEHLEND = enum.auto()
UNPLAUSIBEL = enum.auto()
PLAUSIBEL = enum.auto()

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import dataclasses as dc
import datetime
import enum
import json
import warnings
from typing import TYPE_CHECKING, Any, Final, cast
@@ -13,8 +12,9 @@ from dopt_basics.datastructures import flatten
from dopt_basics.result_pattern import STATUS_HANDLER, Status, wrap_result
from wattanalyse import db
from wattanalyse.constants import USER_CFG, QualityPsm
from wattanalyse.logging import logger_pipeline as logger
from wattanalyse.types import SqlStatement
from wattanalyse.types import SqlInsertStmts, SqlStatement
if TYPE_CHECKING:
from oracledb import Connection as OracleConnection
@@ -33,22 +33,10 @@ DROP_COLUMNS: Final[list[str]] = cast(
)
@dc.dataclass(slots=True, kw_only=True)
class SqlInsertStmts:
delete: str
insert: str
class QualityPsm(enum.StrEnum):
FEHLEND = enum.auto()
UNPLAUSIBEL = enum.auto()
PLAUSIBEL = enum.auto()
PSM_SCORES: dict[QualityPsm, int] = {
QualityPsm.FEHLEND: 1,
QualityPsm.UNPLAUSIBEL: 0,
QualityPsm.PLAUSIBEL: 2,
QualityPsm.FEHLEND: USER_CFG.Datenpipelines_PSM.Score_Qualitaet_Produktionsmengen_fehlend,
QualityPsm.UNPLAUSIBEL: USER_CFG.Datenpipelines_PSM.Score_Qualitaet_Produktionsmengen_unplausibel,
QualityPsm.PLAUSIBEL: USER_CFG.Datenpipelines_PSM.Score_Qualitaet_Produktionsmengen_plausibel,
}
RENAMING_SCHEME_PSM: dict[str, str] = {
@@ -68,11 +56,29 @@ RENAMING_SCHEME_PSM: dict[str, str] = {
PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]
LOWER_BOUND_DATE_DEVIATION: Final[int] = 0
UPPER_BOUND_DATE_DEVIATION: Final[int] = 0
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4
TAB_NAME_PSM: Final[str] = "EXTERN_PSM"
TAB_NAME_MIS: Final[str] = "EXTERN_MIS"
LOWER_BOUND_DATE_DEVIATION: Final[int] = (
USER_CFG.Datenpipelines_PSM.Terminabweichung_untere_Schranke
)
UPPER_BOUND_DATE_DEVIATION: Final[int] = (
USER_CFG.Datenpipelines_PSM.Terminabweichung_obere_Schranke
)
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = (
USER_CFG.Datenpipelines_PSM.Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig
)
TAB_NAME_PSM: Final[str] = USER_CFG.Datenbank.Tabellenname_Produktionsstandmeldung
TAB_NAME_MIS: Final[str] = USER_CFG.Datenbank.Tabellenname_MIS_Auftraege
USE_BOUNDARIES: Final[bool] = (
USER_CFG.Datenpipelines_PSM.Nutze_Schranken_Terminabweichung_KPI_Berechnung
)
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
# // (10) load data
@@ -457,18 +463,6 @@ def load_order_level_from_internal_database() -> pl.DataFrame:
# // (50) post-process results
USE_BOUNDARIES: Final[bool] = False
filter_date_deviation_early: pl.Expr
filter_date_deviation_late: pl.Expr
if USE_BOUNDARIES:
filter_date_deviation_early = pl.col("Terminunterschreitung")
filter_date_deviation_late = pl.col("Terminüberschreitung")
else:
filter_date_deviation_early = pl.col("Terminabweichung_Anzahl_Tage") < 0
filter_date_deviation_late = pl.col("Terminabweichung_Anzahl_Tage") > 0
@wrap_result(code_on_error=51)
def aggregate_production_orders(
data: pl.LazyFrame,

View File

@@ -8,13 +8,33 @@ SqlStatement: TypeAlias = str
@dc.dataclass(kw_only=True, slots=True)
class UserConfig_Datenbank:
NUTZER: str
PASSWORT: str
HOST: str
PORT: int
SERVICE_NAME: str
Nutzer: str
Passwort: str
Host: str
Port: int
Service_Name: str
Tabellenname_Produktionsstandmeldung: str
Tabellenname_MIS_Auftraege: str
@dc.dataclass(kw_only=True, slots=True)
class UserConfig_Pipelines_PSM:
Vorverarbeitung_Anzahl_Jahre_in_Zukunft_zulaessig: int
Terminabweichung_untere_Schranke: int
Terminabweichung_obere_Schranke: int
Nutze_Schranken_Terminabweichung_KPI_Berechnung: bool
Score_Qualitaet_Produktionsmengen_fehlend: int
Score_Qualitaet_Produktionsmengen_unplausibel: int
Score_Qualitaet_Produktionsmengen_plausibel: int
@dc.dataclass(kw_only=True, slots=True)
class UserConfig:
Datenbank: UserConfig_Datenbank
Datenpipelines_PSM: UserConfig_Pipelines_PSM
@dc.dataclass(slots=True, kw_only=True)
class SqlInsertStmts:
delete: str
insert: str