generated from dopt-python/py311
refactor and prepare pipeline
This commit is contained in:
168
prototypes/02_integrate_wokflow.py
Normal file
168
prototypes/02_integrate_wokflow.py
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
# %%
|
||||||
|
import datetime
|
||||||
|
import importlib
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import external_code
|
||||||
|
import polars as pl
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
from wattanalyse import db
|
||||||
|
|
||||||
|
# Re-import the project modules so that edits made while the interactive
# session is running are picked up; `db` is reloaded first because
# `external_code` imports it.
importlib.reload(db)
importlib.reload(external_code)

# %%
# Resolve the data folder relative to the parent of this script's folder.
PROJECT_BASE = Path(__file__).parents[1]
DATA_PTH = PROJECT_BASE / "data"
if not DATA_PTH.exists():
    # explicit check instead of `assert`, which is stripped under `python -O`
    raise FileNotFoundError(f"data folder does not exist: {DATA_PTH}")

# %%
# // load data
target = DATA_PTH / "PSM_20260507.arrow"
data_raw = pl.read_ipc(target)

# %%
# // preprocessing I
res = external_code.preprocess_psm(data_raw)

# %%
# display the rows that were rejected by the preprocessing step
res.filtered
|
||||||
|
# %%
|
||||||
|
# The cleansing rules (renaming, placeholder strings -> NULL, NOT NULL check
# and date plausibility checks) are implemented in
# `external_code.preprocess_psm`. Reuse the result computed above instead of
# maintaining a second, identical copy of that logic in this script.
data = res.data
filtered_data = res.filtered
# cleansing only removes rows, so the raw frame still has the original size
print(f"Size of dataset before cleansing: {data_raw.height}")
# %%
# data.filter(pl.col.Meldezeitpunkt_Historie.is_null())
# %%
print(f"Size of dataset after cleansing: {data.height}")
print(f"Filtered data: {filtered_data}")
|
||||||
|
# %%
|
||||||
|
# Toy frame to try out the horizontal NULL check: the second and third row each
# contain one NULL value.
test = pl.DataFrame(
    {
        "t1": [0, 1, 3],
        "t2": [1, None, 3],
        "t3": [3, 8, None],
    }
)
test


# %%
columns = ["t1", "t2", "t3"]
conds = [pl.col(name).is_null() for name in columns]
# keep every row in which at least one of the listed columns is NULL
has_null = pl.any_horizontal(*conds)
test.filter(has_null)
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# Count the reports per order position and supplier. `data` carries the column
# names produced by RENAMING_SCHEME ("PA Pos" -> "PA_Pos",
# "PSM gemeldet am" -> "Meldezeitpunkt_Historie"), so the renamed names have to
# be used from here on; the original names no longer exist in the frame.
most_occurrences = (
    data.group_by(["PA", "PA_Pos", "Konfektionär"])
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
)
most_occurrences
# %%
most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme"))
# %%
# alternative selection with two order positions:
# data = data.filter(
#     ((pl.col.PA == 15372) & (pl.col("PA_Pos") == 10))
#     | ((pl.col.PA == 16856) & (pl.col("PA_Pos") == 10))
# ).sort("Meldezeitpunkt_Historie", descending=False)
# restrict the data to a single order position, oldest report first
data = data.filter((pl.col.PA == 15372) & (pl.col("PA_Pos") == 10)).sort(
    "Meldezeitpunkt_Historie", descending=False
)
data.select(pl.col.PA.unique())
|
||||||
|
# %%
|
||||||
|
# // simulate time series
# Mimic how the data would arrive over time: the first snapshot holds one
# entry and every following snapshot holds one additional entry.
series: list[pl.DataFrame] = [data[:stop] for stop in range(1, data.height + 1)]

# one snapshot per row of the source frame
assert len(series) == data.height

# the n-th snapshot (1-based) must contain exactly n rows
assert all(
    position == snapshot.height for position, snapshot in enumerate(series, start=1)
)
|
||||||
|
|
||||||
|
# %%
|
||||||
|
# 1. cleanup obtained new data
|
||||||
|
# ~~2. load data from internal database~~
|
||||||
|
# ~~3. integrate with new data (whole snapshot)~~
|
||||||
|
# 2. process on order level
|
||||||
|
# 3. save results to internal database
|
||||||
|
# 4. post-process results
|
||||||
|
# 5. write to external database
|
||||||
|
|
||||||
|
# // (1) cleanup obtained new data
|
||||||
|
# load data from internal database
|
||||||
|
# integrate with new data (whole snapshot)
|
||||||
|
|
||||||
|
|
||||||
|
# // (2) processing order level
|
||||||
|
# take the fourth simulated snapshot (index 3) as the input of this run
tmp = series[3]
tmp
# %%
# derive the order level features for the selected snapshot
df = external_code.process_order_level(tmp)
df

# %%
# // (3) save results to internal database
# the "wipe" variant deletes the existing table content before inserting
external_code.dump_order_level_to_internal_database_wipe(df)
# %%
# now load data from database
# `df` is rebound to the frame that was read back from the database
df = external_code.load_order_level_from_internal_database()
df
|
||||||
|
|
||||||
|
# %%
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
# %%
|
|
||||||
import datetime
|
|
||||||
import json
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
# %%
|
|
||||||
dt = datetime.datetime.now()
date = dt.date()

# %%
val = [dt, date]
# The stock JSON encoder cannot handle date/datetime objects. Catch the
# expected failure so that running the file top to bottom does not stop here.
try:
    json.dumps(val)
except TypeError as err:
    print(f"plain json.dumps failed as expected: {err}")


# %%
def _parse_to_json(value: Any) -> str:
    """Serialise date-like objects for ``json.dumps(..., default=...)``.

    Args:
        value: Object the JSON encoder could not serialise on its own.

    Returns:
        The ISO 8601 representation of ``value``.

    Raises:
        TypeError: If ``value`` is neither a ``date`` nor a ``datetime``.
    """
    # ``datetime.datetime`` is a subclass of ``datetime.date``, so this single
    # check covers both types; a separate ``datetime`` branch placed after it
    # could never be reached.
    if isinstance(value, datetime.date):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# %%
json.dumps(val, default=_parse_to_json)
|
|
||||||
# %%
|
|
||||||
411
prototypes/external_code.py
Normal file
411
prototypes/external_code.py
Normal file
@@ -0,0 +1,411 @@
|
|||||||
|
import dataclasses as dc
|
||||||
|
import datetime
|
||||||
|
import enum
|
||||||
|
import json
|
||||||
|
from typing import Any, Final
|
||||||
|
|
||||||
|
import polars as pl
|
||||||
|
import sqlalchemy as sql
|
||||||
|
|
||||||
|
from wattanalyse import db
|
||||||
|
|
||||||
|
# 1. cleanup obtained new data
|
||||||
|
# ~~2. load data from internal database~~
|
||||||
|
# ~~3. integrate with new data (whole snapshot)~~
|
||||||
|
# 2. process on order level
|
||||||
|
# 3. save results to internal database
|
||||||
|
# 4. post-process results
|
||||||
|
# 5. write to external database
|
||||||
|
|
||||||
|
|
||||||
|
@dc.dataclass(slots=True, eq=False)
class PreProcessResult:
    """Container for the two frames returned by ``preprocess_psm``."""

    # rows that remain after all cleansing rules were applied
    data: pl.DataFrame
    # rows that were removed by one of the cleansing rules
    filtered: pl.DataFrame
|
||||||
|
|
||||||
|
|
||||||
|
class QualityPsm(enum.StrEnum):
|
||||||
|
FEHLEND = enum.auto()
|
||||||
|
UNPLAUSIBEL = enum.auto()
|
||||||
|
PLAUSIBEL = enum.auto()
|
||||||
|
|
||||||
|
|
||||||
|
PSM_SCORES: dict[QualityPsm, int] = {
|
||||||
|
QualityPsm.FEHLEND: 1,
|
||||||
|
QualityPsm.UNPLAUSIBEL: 0,
|
||||||
|
QualityPsm.PLAUSIBEL: 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Mapping from the column names of the incoming PSM data to the column names
# used by the processing functions in this module.
RENAMING_SCHEME: dict[str, str] = {
    "PA Pos": "PA_Pos",
    "PSM gemeldet am": "Meldezeitpunkt_Historie",
    "Import Ist": "Import-Ist_Historie",
    "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie",
    "Zuschnitt am": "Prod-Start_Historie",
    "Teile in Zuschnitt": "Prod-EP10_Historie",
    "Teile im Nähband": "Prod-EP20_Historie",
    "Fertigware aus Nähband": "Prod-EP30_Historie",
    "Teile kontrolliert": "Prod-EP40_Historie",
    "Teile verpackt in Karton": "Prod-EP50_Historie",
}

# key columns used for sorting, window functions, grouping and upserts
PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"]

# thresholds (in days) applied to "Terminabweichung_Anzahl_Tage"
LOWER_BOUND_DATE_DEVIATION: Final[int] = 0
UPPER_BOUND_DATE_DEVIATION: Final[int] = 0
# dates further ahead than this many blocks of 365 days are rejected
NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4
|
||||||
|
|
||||||
|
|
||||||
|
# // (1) preprocess
|
||||||
|
def preprocess_psm(
    data: pl.DataFrame,
) -> PreProcessResult:
    """Rename, normalise and cleanse a raw PSM frame.

    Rows that violate a rule are removed from the data set and collected in a
    second frame. The rules are applied one after the other:

    1. NULL in "PA", "PA_Pos" or "Meldezeitpunkt_Historie"
    2. "Meldezeitpunkt_Historie", "Wareneingang am" or "Prod-Start_Historie"
       lies in the future
    3. any ``Date`` column is more than ``365 * NUMBER_YEARS_UPPER_BOUND_DATES``
       days ahead of today or before 1990-01-01
    4. the same bounds for any ``Datetime`` column

    Args:
        data: PSM data with the original (not yet renamed) column names.

    Returns:
        The remaining rows (``data``) and all rejected rows (``filtered``).
    """

    def _split(
        frame: pl.DataFrame,
        rejected: pl.DataFrame,
        *violations: pl.Expr,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        # a row is rejected as soon as one of the violation flags is true
        hit = pl.any_horizontal(*violations)
        return frame.filter(~hit), pl.concat([rejected, frame.filter(hit)])

    placeholder_pattern = r"^[\s\-#+/$]+$"
    text_cols = pl.col(pl.String)
    # string cells that only consist of whitespace or filler characters become NULL
    cleaned = data.rename(RENAMING_SCHEME).with_columns(
        pl.when(text_cols.str.contains(placeholder_pattern))
        .then(None)
        .otherwise(text_cols)
        .name.keep()
    )
    cleaned = cleaned.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t"))
    rejected = pl.DataFrame(schema=cleaned.schema)

    # any NULL values in critical columns
    required_cols = ("PA", "PA_Pos", "Meldezeitpunkt_Historie")
    missing = [pl.col(name).is_null() for name in required_cols]
    cleaned, rejected = _split(cleaned, rejected, *missing)

    # implausible dates
    # dates not allowed to be in the future
    now = datetime.datetime.now()
    today = now.date()
    datetime_cols_not_in_future = ("Meldezeitpunkt_Historie",)
    date_cols_not_in_future = ("Wareneingang am", "Prod-Start_Historie")
    # NULL comparisons count as "no violation"
    in_future = [
        (pl.col(name) > now).fill_null(False) for name in datetime_cols_not_in_future
    ]
    in_future += [
        (pl.col(name) > today).fill_null(False) for name in date_cols_not_in_future
    ]
    cleaned, rejected = _split(cleaned, rejected, *in_future)

    # too much in the future or the past
    horizon = datetime.timedelta(days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES))

    # dates
    all_dates = pl.col(pl.Date)
    latest_date = today + horizon
    earliest_date = datetime.date(1990, 1, 1)
    date_out_of_range = (all_dates > latest_date).fill_null(False) | (
        all_dates < earliest_date
    ).fill_null(False)
    cleaned, rejected = _split(cleaned, rejected, date_out_of_range)

    # datetimes
    all_datetimes = pl.col(pl.Datetime)
    latest_datetime = now + horizon
    earliest_datetime = datetime.datetime(1990, 1, 1)
    datetime_out_of_range = (all_datetimes > latest_datetime).fill_null(False) | (
        all_datetimes < earliest_datetime
    ).fill_null(False)
    cleaned, rejected = _split(cleaned, rejected, datetime_out_of_range)

    return PreProcessResult(data=cleaned, filtered=rejected)
|
||||||
|
|
||||||
|
|
||||||
|
# // (2) process on order level
|
||||||
|
def process_order_level(data: pl.DataFrame) -> pl.DataFrame:
    """Derive report level features and aggregate them per order position.

    The function works on the column names produced by ``RENAMING_SCHEME``.
    Every group of ``PRIM_KEYS`` plus "Konfektionär" becomes one output row
    that holds the report history as list columns together with the derived
    summary values.

    Args:
        data: PSM reports with renamed columns.

    Returns:
        One row per ("PA", "PA_Pos", "Konfektionär") group.
    """
    # ** renaming
    # data = data.rename(RENAMING_SCHEME)  # TODO delete, done in pre-processing
    chrono_keys = PRIM_KEYS + ["Meldezeitpunkt_Historie"]
    data = data.sort(chrono_keys, descending=False)

    # ** plausibility check of order quantities
    qty_cols: list[str] = [
        "Prod-EP10_Historie",
        "Prod-EP20_Historie",
        "Prod-EP30_Historie",
        "Prod-EP40_Historie",
        "Prod-EP50_Historie",
    ]
    quantities = pl.col(qty_cols)
    # a report is "empty" when every quantity is NULL or zero
    data = data.with_columns(
        pl.all_horizontal(quantities.is_null() | (quantities == 0)).alias("is_empty")
    )
    # each quantity has to be at least as large as the one of the following column
    non_increasing = [
        pl.col(earlier) >= pl.col(later)
        for earlier, later in zip(qty_cols, qty_cols[1:])
    ]
    qty_is_valid = (
        pl.when(pl.all_horizontal(non_increasing) | pl.col("is_empty"))
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias("Prod-Qty_is_valid")
    )
    quality_score = (
        pl.when(pl.col("is_empty"))
        .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND]))
        .when(pl.col("Prod-Qty_is_valid"))
        .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL]))
        .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL]))
        .alias("Prod-Qualitaet_Historie")
    )
    # two steps: the score refers to the validity flag created in the first one
    data = data.with_columns(qty_is_valid).with_columns(quality_score)
    # aggregate hint for "Prod-Qualitaet_Durchschnitt":
    # aggregate "Prod-Qualitaet_Historie" with "mean" under an additional alias

    # ** planned or target delivery date
    current_date = datetime.datetime.now().date()
    print(f"{current_date=}")
    planned_delivery = pl.coalesce(
        ["Bestaetigter-Import_Historie", "Import-Ist_Historie"]
    ).alias("Liefertermin_Soll")
    data = data.with_columns(planned_delivery)
    # aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first"
    # the first filled value is the relevant target date: it should be the
    # first confirmed date, but if that field is not filled the first import
    # date filled by the supplier is used

    # ** actual delivery date
    # logic of Wattana: set date is before current date --> becomes actual value
    import_actual = pl.col("Import-Ist_Historie")
    data = data.with_columns(
        pl.when(import_actual < current_date)
        .then(import_actual)
        .otherwise(None)
        .alias("Liefertermin_Ist")
    )
    # aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last"
    # keep last because that is the latest value set by the supplier
    # if all values are NULL then NULL is returned (no actual date available)

    # ** duration since last report in days
    report_time = pl.col("Meldezeitpunkt_Historie")
    days_since_previous = (
        (report_time - report_time.shift(1).over(PRIM_KEYS))
        .dt.total_days()
        .alias("Tage_zu_letzter_PSM_Historie")
    )
    data = data.sort(chrono_keys, descending=False).with_columns(days_since_previous)
    # aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt":
    # aggregate "Tage_zu_letzter_PSM_Historie" with "mean" (NULL is ignored
    # automatically) under an additional alias

    # check: does the current date differ from the previous date of the same
    # position? The very first entry has no predecessor -> counts as no change
    import_changed = (
        (import_actual != import_actual.shift(1).over(PRIM_KEYS))
        .fill_null(False)
        .alias("Import-Ist_geaendert")
    )
    data = data.sort(chrono_keys, descending=False).with_columns(import_changed)

    # whole aggregates see DB schema
    aggregates = [
        pl.col("Meldezeitpunkt_Historie"),
        pl.col("Liefertermin_Soll").drop_nulls().first(),
        pl.col("Bestaetigter-Import_Historie"),
        pl.col("Liefertermin_Ist").drop_nulls().last(),
        pl.col("Import-Ist_Historie"),
        # latest non-NULL import date
        pl.col("Import-Ist_Historie")
        .drop_nulls()
        .last()
        .alias("Import-Ist_letzter_Wert"),
        pl.col("Import-Ist_geaendert").last(),
        # number of changes of the import date
        pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"),
        pl.col("Tage_zu_letzter_PSM_Historie"),
        pl.col("Tage_zu_letzter_PSM_Historie")
        .mean()
        .alias("Tage_zu_letzter_PSM_Durchschnitt"),
        pl.col("Prod-EP10_Historie"),
        pl.col("Prod-EP20_Historie"),
        pl.col("Prod-EP30_Historie"),
        pl.col("Prod-EP40_Historie"),
        pl.col("Prod-EP50_Historie"),
        pl.col("Prod-Qualitaet_Historie"),
        pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"),
        pl.col("Prod-Start_Historie"),
        # first entry should be treated as the truth value, changing it later
        # does not make sense
        pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"),
    ]
    data = (
        data.sort(chrono_keys, descending=False)
        .group_by(PRIM_KEYS + ["Konfektionär"])
        .agg(aggregates)
    )

    # ** order specific aggregates
    actual = pl.col("Liefertermin_Ist")
    planned = pl.col("Liefertermin_Soll")
    prod_start = pl.col("Prod-Start")
    deviation = pl.col("Terminabweichung_Anzahl_Tage")
    lead_time = pl.col("Durchlaufzeit_Anzahl_Tage")

    # deviation between actual and planned delivery date in days
    data = data.with_columns(
        pl.when(actual.is_not_null() & planned.is_not_null())
        .then((actual - planned).dt.total_days())
        .otherwise(None)
        .alias("Terminabweichung_Anzahl_Tage")
    )
    data = data.with_columns(
        pl.when(deviation < LOWER_BOUND_DATE_DEVIATION)
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias("Terminunterschreitung"),
        pl.when(deviation > UPPER_BOUND_DATE_DEVIATION)
        .then(pl.lit(True))
        .otherwise(pl.lit(False))
        .alias("Terminüberschreitung"),
        pl.when(actual.is_not_null() & prod_start.is_not_null())
        .then((actual - prod_start).dt.total_days())
        .otherwise(None)
        .alias("Durchlaufzeit_Anzahl_Tage"),
    )
    # negative lead times are replaced by NULL
    data = data.with_columns(
        pl.when(lead_time.is_not_null() & (lead_time < 0))
        .then(None)
        .otherwise(lead_time)
        .alias("Durchlaufzeit_Anzahl_Tage")
    )

    return data
|
||||||
|
|
||||||
|
|
||||||
|
# // (3) dump order level to internal database
|
||||||
|
def _json_default(
    value: Any,
) -> str:
    """Fallback serialiser for ``json.dumps`` that handles dates and datetimes.

    Args:
        value: Object the JSON encoder could not serialise on its own.

    Returns:
        The ISO 8601 representation of ``value``.

    Raises:
        TypeError: If ``value`` is neither a ``date`` nor a ``datetime``; the
            message names the offending type.
    """
    # ``datetime.datetime`` is a subclass of ``datetime.date``, so a single
    # check covers both types
    if isinstance(value, datetime.date):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_to_json(
    x: pl.Series | None,
) -> str | None:
    """Encode the values of one list cell as a JSON array string.

    Args:
        x: The cell content as a series, or ``None`` for a NULL cell.

    Returns:
        The JSON text of ``x.to_list()``, or ``None`` when ``x`` is ``None``.
        Dates and datetimes are written via ``_json_default``.
    """
    return None if x is None else json.dumps(x.to_list(), default=_json_default)
|
||||||
|
|
||||||
|
|
||||||
|
def dump_order_level_to_internal_database_staging(
    data: pl.DataFrame,
) -> None:
    """Upsert order level results into the internal database via a staging table.

    The frame is written to "Produktionsauftrag-Einzelsicht_Staging" (replacing
    an existing table of that name), merged into
    "Produktionsauftrag-Einzelsicht" with ``INSERT ... ON CONFLICT DO UPDATE``
    on ``PRIM_KEYS`` and the staging table is dropped afterwards.

    Args:
        data: Order level data; list columns are converted to JSON text.

    Raises:
        RuntimeError: If the number of rows written to the staging table
            differs from the number of rows in ``data``.
    """

    def _quoted(names: list[str]) -> str:
        # comma separated list of double-quoted identifiers
        return ", ".join(f'"{name}"' for name in names)

    # every list cell is replaced by its JSON text, column names are kept
    json_ready = data.with_columns(
        pl.col(pl.List)
        .map_elements(_parse_to_json, return_dtype=pl.String)
        .name.keep()
    )
    written = json_ready.write_database(
        "Produktionsauftrag-Einzelsicht_Staging",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="replace",
    )
    if written != json_ready.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")

    column_sql = _quoted(json_ready.columns)
    key_sql = _quoted(PRIM_KEYS)
    # all non-key columns are overwritten with the staged values on conflict
    assignment_sql = ", ".join(
        f'"{name}" = EXCLUDED."{name}"'
        for name in json_ready.columns
        if name not in PRIM_KEYS
    )

    # NOTE: SQLite's UPSERT documentation asks for a WHERE clause on the SELECT
    # of an INSERT ... SELECT ... ON CONFLICT statement to avoid a parsing
    # ambiguity, hence the "WHERE 1=1".
    upsert_sql = f"""
    INSERT INTO "Produktionsauftrag-Einzelsicht" ({column_sql})
    SELECT {column_sql} FROM "Produktionsauftrag-Einzelsicht_Staging" WHERE 1=1
    ON CONFLICT({key_sql}) DO UPDATE SET
    {assignment_sql};
    """

    # merge and clean-up run in one transaction
    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(sql.text(upsert_sql))
        conn.execute(
            sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";')
        )
|
||||||
|
|
||||||
|
|
||||||
|
def dump_order_level_to_internal_database_wipe(
    data: pl.DataFrame,
) -> None:
    """Replace the content of "Produktionsauftrag-Einzelsicht" with ``data``.

    All existing rows are deleted first, then the frame is appended to the
    table.

    Args:
        data: Order level data; list columns are converted to JSON text.

    Raises:
        RuntimeError: If the number of inserted rows differs from the number of
            rows in ``data``.
    """
    # every list cell is replaced by its JSON text, column names are kept
    json_ready = data.with_columns(
        pl.col(pl.List)
        .map_elements(_parse_to_json, return_dtype=pl.String)
        .name.keep()
    )

    # empty table
    # NOTE(review): the DELETE transaction is finished before the insert below
    # starts on a separate connection, so a failing insert leaves the table
    # empty - confirm that this is acceptable.
    wipe_stmt = sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";')
    with db.ENGINE_INTERNAL.begin() as conn:
        conn.execute(wipe_stmt)

    written = json_ready.write_database(
        "Produktionsauftrag-Einzelsicht",
        connection=db.DB_URI,
        engine="adbc",
        if_table_exists="append",
    )
    if written != json_ready.height:
        raise RuntimeError("Number of inserted rows and length of staging data do not match.")
|
||||||
|
|
||||||
|
|
||||||
|
# ** load order level data from internal database
|
||||||
|
def load_order_level_from_internal_database() -> pl.DataFrame:
    """Read "Produktionsauftrag-Einzelsicht" and restore its list columns.

    The table is read with the dtypes given in ``db.intern_prod_order_t_schema``.
    The history columns listed below are then decoded from JSON text into list
    columns of the given inner dtype.

    Returns:
        The table content with decoded history columns.
    """
    # inner dtype of each JSON encoded history column
    history_dtypes: dict[str, type[pl.DataType]] = {
        "Meldezeitpunkt_Historie": pl.Datetime,
        "Bestaetigter-Import_Historie": pl.Date,
        "Import-Ist_Historie": pl.Date,
        "Tage_zu_letzter_PSM_Historie": pl.Int64,
        "Prod-EP10_Historie": pl.UInt64,
        "Prod-EP20_Historie": pl.UInt64,
        "Prod-EP30_Historie": pl.UInt64,
        "Prod-EP40_Historie": pl.UInt64,
        "Prod-EP50_Historie": pl.UInt64,
        "Prod-Qualitaet_Historie": pl.Int32,
        "Prod-Start_Historie": pl.Date,
    }

    stored = pl.read_database_uri(
        'SELECT * FROM "Produktionsauftrag-Einzelsicht"',
        uri=db.DB_URI,
        engine="adbc",
        schema_overrides=db.intern_prod_order_t_schema,
    )

    # each decoded column replaces the JSON text column of the same name
    decoded = [
        pl.col(name).str.json_decode(pl.List(inner)).alias(name)
        for name, inner in history_dtypes.items()
    ]
    return stored.with_columns(decoded)
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
from typing import Final
|
||||||
|
|
||||||
|
import polars as pl
|
||||||
import sqlalchemy as sql
|
import sqlalchemy as sql
|
||||||
from sqlalchemy import Column, Table
|
from sqlalchemy import Column, Table
|
||||||
|
|
||||||
@@ -7,7 +10,8 @@ assert constants.Config.DB_PATH_INTERNAL.parent.exists(), (
|
|||||||
"database parent folder does not exists"
|
"database parent folder does not exists"
|
||||||
)
|
)
|
||||||
|
|
||||||
ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_INTERNAL}")
|
DB_URI: Final[str] = f"sqlite:///{constants.Config.DB_PATH_INTERNAL}"
|
||||||
|
ENGINE_INTERNAL: Final[sql.Engine] = sql.create_engine(DB_URI)
|
||||||
|
|
||||||
MD_INTERNAL = sql.MetaData()
|
MD_INTERNAL = sql.MetaData()
|
||||||
|
|
||||||
@@ -43,4 +47,65 @@ intern_prod_order_t: Table = Table(
|
|||||||
Column("Durchlaufzeit_Anzahl_Tage", sql.Float, nullable=True),
|
Column("Durchlaufzeit_Anzahl_Tage", sql.Float, nullable=True),
|
||||||
)
|
)
|
||||||
|
|
||||||
MD_INTERNAL.create_all(ENGINE)
|
# Polars dtypes of the internal order table. The "*_Historie" columns are
# declared as strings here.
intern_prod_order_t_schema: dict[str, type[pl.DataType]] = {
    "PA": pl.UInt64,
    "PA_Pos": pl.UInt32,
    "Konfektionär": pl.String,
    "Meldezeitpunkt_Historie": pl.String,
    "Liefertermin_Soll": pl.Date,
    "Bestaetigter-Import_Historie": pl.String,
    "Liefertermin_Ist": pl.Date,
    "Import-Ist_Historie": pl.String,
    "Import-Ist_letzter_Wert": pl.Date,
    "Import-Ist_geaendert": pl.Boolean,
    "Import-Ist_Anzahl_Aenderungen": pl.UInt32,
    "Tage_zu_letzter_PSM_Historie": pl.String,
    "Tage_zu_letzter_PSM_Durchschnitt": pl.Float64,
    "Prod-EP10_Historie": pl.String,
    "Prod-EP20_Historie": pl.String,
    "Prod-EP30_Historie": pl.String,
    "Prod-EP40_Historie": pl.String,
    "Prod-EP50_Historie": pl.String,
    "Prod-Qualitaet_Historie": pl.String,
    "Prod-Qualitaet_Durchschnitt": pl.Float64,
    "Prod-Start_Historie": pl.String,
    "Prod-Start": pl.Date,
    "Terminabweichung_Anzahl_Tage": pl.Int64,
    "Terminunterschreitung": pl.Boolean,
    "Terminüberschreitung": pl.Boolean,
    # NOTE(review): the table definition declares this column as sql.Float
    # while it is read as Int64 here - confirm which type is intended
    "Durchlaufzeit_Anzahl_Tage": pl.Int64,
}
|
||||||
|
|
||||||
|
|
||||||
|
MD_INTERNAL.create_all(ENGINE_INTERNAL)
|
||||||
|
|
||||||
|
# Polars dtypes keyed by the original (not renamed) column names of the
# external production order data.
extern_prod_order_t_schema: dict[str, type[pl.DataType]] = {
    "VK Auftrag": pl.UInt32,
    "Artikelbez.": pl.String,
    "Auftragsmenge": pl.UInt32,
    "Kunde": pl.String,
    "PA": pl.UInt64,
    "PA Pos": pl.UInt32,
    "PSM gemeldet am": pl.Datetime,
    "Konfektionär": pl.String,
    "Artikelnr.": pl.String,
    "LT Kunde bestätigt": pl.Date,
    "Export Ist": pl.Date,
    "1.bestät. Import Konfektionär": pl.Date,
    "Import Ist": pl.Date,
    "Ablief.(Import Ist+Transport)": pl.Date,
    "Wareneingang am": pl.Date,
    "Wareneingang geprüft": pl.String,
    "Täglicher Ausstoss": pl.Int64,
    "Zuschnitt am": pl.Date,
    "Teile in Zuschnitt": pl.UInt64,
    "Teile im Nähband": pl.UInt64,
    "Fertigware aus Nähband": pl.UInt64,
    "Teile kontrolliert": pl.UInt64,
    "Teile verpackt in Karton": pl.UInt64,
    "Anzahl Bänder": pl.UInt16,
    "Anzahl Näher": pl.UInt16,
    "Arbeitsstunden pro Näher": pl.UInt8,
    "Anzahl Arbeitstage pro Woche": pl.UInt8,
    "Blockauftrag": pl.String,
}
|
||||||
|
|||||||
Reference in New Issue
Block a user