generated from dopt-python/py311
278 lines
8.2 KiB
Python
278 lines
8.2 KiB
Python
# %%
|
|
import datetime
|
|
import importlib
|
|
from pathlib import Path
|
|
|
|
import external_code
|
|
import polars as pl
|
|
import sqlalchemy as sql
|
|
|
|
from wattanalyse import db
|
|
|
|
# %%
# Re-import the project modules so that edits made to them since the last run are picked
# up without restarting the interactive session.
for _module in (db, external_code):
    importlib.reload(_module)
|
|
# %%
# Locate the project layout relative to this file: PROJECT_BASE is the parent of the
# directory containing this script, DATA_PTH is its "data" sub-folder.
PROJECT_BASE = Path(__file__).parents[1]
DATA_PTH = PROJECT_BASE / "data"
# Fail fast with an explicit exception: a bare ``assert`` is stripped under ``python -O``,
# which would let a missing folder surface later as a less obvious I/O error.
if not DATA_PTH.exists():
    raise FileNotFoundError(f"Data directory not found: {DATA_PTH}")
|
|
|
|
# %%
# // load data
# Lazily scan the PSM snapshot stored as an Arrow IPC file in the data folder.
target = DATA_PTH.joinpath("PSM_20260507.arrow")
data_raw = pl.scan_ipc(target)
|
|
|
|
# %%
|
|
# 0. read data (from customer's database)
|
|
# 1. cleanup obtained new data
|
|
# ~~2. load data from internal database~~
|
|
# ~~3. integrate with new data (whole snapshot)~~
|
|
# 2. process on order level
|
|
# 3. save results to internal database
|
|
# 4. post-process results
|
|
# 5. write to external database
|
|
|
|
# // (1) cleanup obtained new data
# load data from internal database
# integrate with new data (whole snapshot)
res = external_code.preprocess_psm(data_raw)
# the preprocessing result carries the cleaned data (`.data`) and what was filtered out
# (`.filtered`); only the cleaned data is passed on to the next step
data = res.data

print(f"Data:\n{data.collect()}\n\n---\n\nFiltered:\n{res.filtered}")

# %%
# // (2) processing order level
df = external_code.process_order_level(data)
|
|
|
|
|
|
# ?? What if "Konfektionär" is NULL?
# In that case the aggregates per "Konfektionär" are not meaningful: all rows with a NULL
# value are aggregated together, which may mix production orders that belong to different
# "Konfektionär". These values are still calculated, but they should not be considered.
|
|
|
|
# %%
# // (3) save results to internal database
# NOTE(review): the helper name suggests existing rows are wiped before writing — confirm
# in external_code before running against a database that holds data worth keeping.
external_code.dump_order_level_to_internal_database_wipe(df)

# %%
# now load data from database
# From here on `df` is the order-level data as read back from the internal database.
df = external_code.load_order_level_from_internal_database()
df
|
|
# %%
# ** aggregate production orders
tmp = df.clone()

# two ways to define the aggregate for date deviations: just use < 0 or use Boolean flag
# defined by the user-specified boundaries
USE_BOUNDARIES = False
date_deviation_days = pl.col("Terminabweichung_Anzahl_Tage")
filter_date_deviation_early: pl.Expr = (
    pl.col("Terminunterschreitung") if USE_BOUNDARIES else date_deviation_days < 0
)
filter_date_deviation_late: pl.Expr = (
    pl.col("Terminüberschreitung") if USE_BOUNDARIES else date_deviation_days > 0
)


def _round_to_int64(value: pl.Expr) -> pl.Expr:
    """Round half away from zero and cast the result to Int64."""
    return value.round(mode="half_away_from_zero").cast(pl.Int64)


# Building blocks of the KPI row. The magnitude is taken *after* averaging (abs of the
# mean), exactly as the aggregates are defined for the supplier level.
mean_days_early = date_deviation_days.filter(filter_date_deviation_early).mean().abs()
mean_days_late = date_deviation_days.filter(filter_date_deviation_late).mean().abs()
mean_date_changes = pl.col("Import-Ist_Anzahl_Aenderungen").mean().abs()
mean_report_gap = pl.col("Tage_zu_letzter_PSM_Historie").list.explode().mean().abs()
# no abs() here: the lead time mean is only rounded and cast
mean_lead_time = pl.col("Durchlaufzeit_Anzahl_Tage").mean()

tmp = tmp.select(
    _round_to_int64(mean_days_early).alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"),
    _round_to_int64(mean_days_late).alias("MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"),
    # sample standard deviation over all orders (no early/late filter)
    date_deviation_days.std(ddof=1).alias("STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"),
    _round_to_int64(mean_date_changes).alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
    _round_to_int64(mean_report_gap).alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
    _round_to_int64(mean_lead_time).alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
)
tmp
|
|
# %%
# to DB transform (mock Oracle database)
# Target column order: the two bookkeeping columns first, then the KPI columns in the
# order they currently have.
cols_sorted = ["ID", "AKTUALISIERT_AM", *tmp.columns]

bookkeeping_columns = (
    pl.lit(1).alias("ID"),
    pl.lit(datetime.datetime.now()).alias("AKTUALISIERT_AM"),
)
# Boolean columns are stored as Int8; every other column is passed through unchanged.
booleans_as_int8 = pl.col(pl.Boolean).cast(pl.Int8)
non_boolean_columns = pl.all().exclude(pl.Boolean)

tmp = (
    tmp.with_columns(*bookkeeping_columns)
    .select(booleans_as_int8, non_boolean_columns)
    # the previous select puts the Boolean columns first, so restore the intended order
    .select(cols_sorted)
)
tmp
|
|
|
|
# %%
# Show the DELETE/INSERT statements for the prepared production-order KPI table.
# This cell previously printed `sql_delete` / `sql_insert`, which are not defined anywhere
# in this script (NameError). The statements are now generated with the helper that the
# "whole pipeline" cells use for the same table.
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_PRODUKTIONSAUFTRAEGE", columns=tmp.columns
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
|
|
# %%
# Persist the prepared production-order KPI frame as an Arrow IPC file.
prepared_oracle_pth = DATA_PTH.joinpath(
    "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
)
tmp.write_ipc(prepared_oracle_pth)
|
|
# %%
# ** aggregate supplier
tmp = df.clone()

# Same switch as for the production-order aggregate: either the sign of the deviation or
# the pre-computed Boolean flag columns decide what counts as early/late.
USE_BOUNDARIES = False
date_deviation_days = pl.col("Terminabweichung_Anzahl_Tage")
filter_date_deviation_early: pl.Expr = (
    pl.col("Terminunterschreitung") if USE_BOUNDARIES else date_deviation_days < 0
)
filter_date_deviation_late: pl.Expr = (
    pl.col("Terminüberschreitung") if USE_BOUNDARIES else date_deviation_days > 0
)


def _percentage(flag: pl.Expr) -> pl.Expr:
    """Return the mean of a Boolean expression in percent, rounded to 4 decimals."""
    return (flag.mean() * 100).round(4, mode="half_away_from_zero")


def _round_to_int64(value: pl.Expr) -> pl.Expr:
    """Round half away from zero and cast the result to Int64."""
    return value.round(mode="half_away_from_zero").cast(pl.Int64)


date_changes = pl.col("Import-Ist_Anzahl_Aenderungen")
# neither early nor late
on_time = ~(filter_date_deviation_early | filter_date_deviation_late)
# on time and the change counter is zero
on_time_unchanged = on_time & (date_changes == 0)

mean_days_early = date_deviation_days.filter(filter_date_deviation_early).mean().abs()
mean_days_late = date_deviation_days.filter(filter_date_deviation_late).mean().abs()
mean_date_changes = date_changes.mean().abs()
mean_report_gap = pl.col("Tage_zu_letzter_PSM_Historie").list.explode().mean().abs()
# no abs() here: the lead time mean is only rounded and cast
mean_lead_time = pl.col("Durchlaufzeit_Anzahl_Tage").mean()
mean_quality = pl.col("Prod-Qualitaet_Historie").list.explode().mean()

tmp = (
    tmp.group_by("Konfektionär")
    .agg(
        # shares in percent
        _percentage(on_time_unchanged).alias("QUOTE_ERSTBESTAETIGUNG"),
        _percentage(on_time).alias("PROZENT_LIEFERTREUE"),
        _percentage(filter_date_deviation_early).alias(
            "ANTEIL_PROZENT_LIEFERTERMINUNTERSCHREITUNG"
        ),
        _percentage(filter_date_deviation_late).alias(
            "ANTEIL_PROZENT_LIEFERTERMINUEBERSCHREITUNG"
        ),
        # the same aggregates as for the production orders, but per supplier
        _round_to_int64(mean_days_early).alias(
            "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUNTERSCHREITUNG"
        ),
        _round_to_int64(mean_days_late).alias(
            "MITTLERE_ANZAHL_TAGE_LIEFERTERMINUEBERSCHREITUNG"
        ),
        date_deviation_days.std(ddof=1).alias(
            "STANDARDABWEICHUNG_TAGE_LIEFERTERMINABWEICHUNG"
        ),
        _round_to_int64(mean_date_changes).alias("MITTLERE_ANZAHL_ANPASSUNGEN_LIEFERTERMIN"),
        _round_to_int64(mean_report_gap).alias("MITTLERE_ABSTAENDE_ZWISCHEN_MELDUNGEN"),
        _round_to_int64(mean_lead_time).alias("MITTLERE_DURCHLAUFZEIT_ANZAHL_TAGE"),
        # quality score keeps 4 decimals instead of being cast to an integer
        mean_quality.round(4, mode="half_away_from_zero").alias(
            "MITTLERER_QUALITAETSSCORE_PSM"
        ),
    )
    .sort("Konfektionär")
)
tmp
|
|
|
|
# %%
# Spot check for a single supplier: rows that are neither early nor late and whose change
# counter is zero (the rows counted by QUOTE_ERSTBESTAETIGUNG).
# NOTE: uses filter_date_deviation_early/late as defined by a previously executed cell.
tmp = df.clone()
tmp.filter(
    pl.col("Konfektionär") == "BS Make Ltd",
    ~(filter_date_deviation_early | filter_date_deviation_late),
    pl.col("Import-Ist_Anzahl_Aenderungen") == 0,
)
|
|
# %%
# all rows of one supplier
tmp.filter(pl.col("Konfektionär") == "BS Make Ltd")

# %%
tmp.head()

# %%
tmp.filter(pl.col("Konfektionär") == "Siluet")

# %%
# character length of the supplier names, longest first
supplier_name_length = pl.col("Konfektionär").str.len_chars().alias("len_char")
tmp.select(supplier_name_length).sort("len_char", descending=True)
|
|
|
|
# %%
# // whole pipeline
# ** aggregate production orders
# Same steps as the exploratory cells above, but through the packaged helpers:
# aggregate -> bring into the database layout -> persist as Arrow IPC file.
tmp = df.clone()

aggregated = external_code.aggregate_production_orders(tmp.lazy())
tmp = aggregated.collect()
print(tmp)

prepared = external_code.oracle_prepare_KPI_aggregate(tmp.lazy())
tmp = prepared.collect()
print(tmp)

prepared_oracle_pth = DATA_PTH.joinpath(
    "db/oracle_prepare_KPI_PRODUKTIONSAUFTRAEGE.arrow"
)
tmp.write_ipc(prepared_oracle_pth)

# %%
# statements for the table that receives the prepared frame
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_PRODUKTIONSAUFTRAEGE",
    columns=tmp.columns,
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")
|
|
|
|
# %%
# ** aggregate supplier
tmp = df.clone()
# the grouping column gets an ASCII-only upper-case name for the database table
RENAME_SCHEME = {"Konfektionär": "KONFEKTIONAER"}

aggregated = external_code.aggregate_suppliers(tmp.lazy())
tmp = aggregated.collect()
print(tmp.head())

prepared = external_code.oracle_prepare_KPI_aggregate(
    tmp.lazy(),
    rename_schema=RENAME_SCHEME,
    # sort on the renamed column, ascending
    sort_by="KONFEKTIONAER",
    sort_descending=False,
)
tmp = prepared.collect()
print(tmp.head())

prepared_oracle_pth = DATA_PTH.joinpath("db/oracle_prepare_KPI_KONFEKTIONAERE.arrow")
tmp.write_ipc(prepared_oracle_pth)

# %%
# statements for the table that receives the prepared frame
stmts = external_code.oracle_generate_sql_insert(
    table_name="KPI_KONFEKTIONAERE",
    columns=tmp.columns,
)
print(f"SQL DELETE: {stmts.delete}\nSQL Insert: {stmts.insert}")

# %%
tmp
|
|
# %%
|