generated from dopt-python/py311
database interaction with saving and loading
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
# %%
|
||||
import datetime
|
||||
import enum
|
||||
import importlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -10,12 +11,14 @@ import sqlalchemy as sql
|
||||
|
||||
from wattanalyse import db
|
||||
|
||||
importlib.reload(db)
|
||||
|
||||
# %%
|
||||
PROJECT_BASE = Path(__file__).parents[1]
|
||||
DATA = PROJECT_BASE / "data"
|
||||
assert DATA.exists()
|
||||
DATA_PTH = PROJECT_BASE / "data"
|
||||
assert DATA_PTH.exists()
|
||||
# %%
|
||||
data_t1 = DATA / "PSM/20260507"
|
||||
data_t1 = DATA_PTH / "PSM/20260507"
|
||||
assert data_t1.exists()
|
||||
# %%
|
||||
data_t1_jobs = data_t1 / "MIS-Auträge_22.csv"
|
||||
@@ -24,7 +27,7 @@ data_t1_PSM = data_t1 / "Produktionsstandsmeldungen.csv"
|
||||
assert data_t1_PSM.exists()
|
||||
# %%
|
||||
# // MIS-Aufträge
|
||||
pl.read_csv(data_t1_jobs, encoding="windows-1252", separator=";")
|
||||
# pl.read_csv(data_t1_jobs, encoding="windows-1252", separator=";")
|
||||
|
||||
|
||||
# %%
|
||||
@@ -35,46 +38,26 @@ class QualityPsm(enum.StrEnum):
|
||||
PLAUSIBEL = enum.auto()
|
||||
|
||||
|
||||
# %%
|
||||
schema_PSM: dict[str, type[pl.DataType]] = {
|
||||
"VK Auftrag": pl.UInt32,
|
||||
"Artikelbez.": pl.String,
|
||||
"Auftragsmenge": pl.UInt32,
|
||||
"Kunde": pl.String,
|
||||
"PA": pl.UInt64,
|
||||
"PA Pos": pl.UInt32,
|
||||
"PSM gemeldet am": pl.Datetime,
|
||||
"Konfektionär": pl.String,
|
||||
"Artikelnr.": pl.String,
|
||||
"LT Kunde bestätigt": pl.Date,
|
||||
"Export Ist": pl.Date,
|
||||
"1.bestät. Import Konfektionär": pl.Date,
|
||||
"Import Ist": pl.Date,
|
||||
"Ablief.(Import Ist+Transport)": pl.Date,
|
||||
"Wareneingang am": pl.Date,
|
||||
"Wareneingang geprüft": pl.String,
|
||||
"Täglicher Ausstoss": pl.Int64,
|
||||
"Zuschnitt am": pl.Date,
|
||||
"Teile in Zuschnitt": pl.UInt64,
|
||||
"Teile im Nähband": pl.UInt64,
|
||||
"Fertigware aus Nähband": pl.UInt64,
|
||||
"Teile kontrolliert": pl.UInt64,
|
||||
"Teile verpackt in Karton": pl.UInt64,
|
||||
"Anzahl Bänder": pl.UInt16,
|
||||
"Anzahl Näher": pl.UInt16,
|
||||
"Arbeitsstunden pro Näher": pl.UInt8,
|
||||
"Anzahl Arbeitstage pro Woche": pl.UInt8,
|
||||
"Blockauftrag": pl.String,
|
||||
PSM_SCORES: dict[QualityPsm, int] = {
|
||||
QualityPsm.FEHLEND: 1,
|
||||
QualityPsm.UNPLAUSIBEL: 0,
|
||||
QualityPsm.PLAUSIBEL: 2,
|
||||
}
|
||||
|
||||
# psm = pl.read_csv(data_t1_PSM, encoding="windows-1252", separator=";")
|
||||
# %%
|
||||
psm = pl.read_csv(
|
||||
data_t1_PSM,
|
||||
encoding="windows-1252",
|
||||
separator=";",
|
||||
schema_overrides=schema_PSM,
|
||||
schema_overrides=db.extern_prod_order_t_schema,
|
||||
null_values=["01.01.1111 00:00:00"],
|
||||
)
|
||||
|
||||
# %%
|
||||
# // save data as raw
|
||||
target = DATA_PTH / "PSM_20260507.arrow"
|
||||
psm.write_ipc(target)
|
||||
|
||||
# %%
|
||||
psm.filter(pl.col("Konfektionär").str.contains("MEMTEKS"))
|
||||
# %%
|
||||
@@ -201,11 +184,6 @@ df_marked = tmp_1.with_columns(
|
||||
.alias("Produktionsstückzahlen_valide")
|
||||
)
|
||||
|
||||
PSM_SCORES: dict[QualityPsm, int] = {
|
||||
QualityPsm.FEHLEND: 1,
|
||||
QualityPsm.UNPLAUSIBEL: 0,
|
||||
QualityPsm.PLAUSIBEL: 2,
|
||||
}
|
||||
|
||||
df_score = df_marked.with_columns(
|
||||
pl.when(pl.col("is_empty"))
|
||||
@@ -241,10 +219,10 @@ renaming_scheme: dict[str, str] = {
|
||||
"Teile verpackt in Karton": "Prod-EP50_Historie",
|
||||
}
|
||||
|
||||
KEYS = ["PA", "PA_Pos"]
|
||||
PRIM_KEYS = ["PA", "PA_Pos"]
|
||||
|
||||
tmp = tmp.rename(renaming_scheme)
|
||||
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||
tmp = tmp.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||
|
||||
|
||||
plausi_features_all = [
|
||||
@@ -325,10 +303,10 @@ tmp = tmp.with_columns(
|
||||
# need additional "alias" on "Prod-Start_Historie"
|
||||
|
||||
# duration since last report in days
|
||||
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||
tmp = tmp.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||
(
|
||||
pl.col("Meldezeitpunkt_Historie")
|
||||
- pl.col("Meldezeitpunkt_Historie").shift(1).over(KEYS)
|
||||
- pl.col("Meldezeitpunkt_Historie").shift(1).over(PRIM_KEYS)
|
||||
)
|
||||
.dt.total_days()
|
||||
.alias("Tage_zu_letzter_PSM_Historie")
|
||||
@@ -341,9 +319,9 @@ tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_column
|
||||
# aggregate "Import-Ist_Historie" and use "drop_nulls" "last"
|
||||
# need additional "alias" on "Import-Ist_Historie"
|
||||
|
||||
tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||
tmp = tmp.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns(
|
||||
# Prüfen: Ist das aktuelle Datum ungleich dem vorherigen Datum derselben Position?
|
||||
(pl.col("Import-Ist_Historie") != pl.col("Import-Ist_Historie").shift(1).over(KEYS))
|
||||
(pl.col("Import-Ist_Historie") != pl.col("Import-Ist_Historie").shift(1).over(PRIM_KEYS))
|
||||
.fill_null(False) # Der allererste Eintrag hat keinen Vorgänger -> Ist keine Änderung
|
||||
.alias("Import-Ist_geaendert")
|
||||
)
|
||||
@@ -357,8 +335,8 @@ tmp = tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_column
|
||||
|
||||
# whole aggregates see DB schema
|
||||
tmp = (
|
||||
tmp.sort(KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||
.group_by(KEYS + ["Konfektionär"])
|
||||
tmp.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False)
|
||||
.group_by(PRIM_KEYS + ["Konfektionär"])
|
||||
.agg(
|
||||
pl.col("Meldezeitpunkt_Historie"),
|
||||
pl.col("Liefertermin_Soll").drop_nulls().first(),
|
||||
@@ -390,27 +368,41 @@ tmp
|
||||
LOWER_BOUND_DATE_DEVIATION = 0
|
||||
UPPER_BOUND_DATE_DEVIATION = 0
|
||||
|
||||
tmp = tmp.with_columns(
|
||||
pl.when(
|
||||
(pl.col("Liefertermin_Ist").is_not_null())
|
||||
& (pl.col("Liefertermin_Soll").is_not_null())
|
||||
tmp = (
|
||||
tmp.with_columns(
|
||||
pl.when(
|
||||
(pl.col("Liefertermin_Ist").is_not_null())
|
||||
& (pl.col("Liefertermin_Soll").is_not_null())
|
||||
)
|
||||
.then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days())
|
||||
.otherwise(None)
|
||||
.alias("Terminabweichung_Anzahl_Tage")
|
||||
)
|
||||
.with_columns(
|
||||
pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION)
|
||||
.then(pl.lit(True))
|
||||
.otherwise(pl.lit(False))
|
||||
.alias("Terminunterschreitung"),
|
||||
pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION)
|
||||
.then(pl.lit(True))
|
||||
.otherwise(pl.lit(False))
|
||||
.alias("Terminüberschreitung"),
|
||||
pl.when(
|
||||
(pl.col("Liefertermin_Ist").is_not_null()) & (pl.col("Prod-Start").is_not_null())
|
||||
)
|
||||
.then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days())
|
||||
.otherwise(None)
|
||||
.alias("Durchlaufzeit_Anzahl_Tage"),
|
||||
)
|
||||
.with_columns(
|
||||
pl.when(
|
||||
(pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null())
|
||||
& (pl.col("Durchlaufzeit_Anzahl_Tage") < 0)
|
||||
)
|
||||
.then(None)
|
||||
.otherwise(pl.col("Durchlaufzeit_Anzahl_Tage"))
|
||||
.alias("Durchlaufzeit_Anzahl_Tage")
|
||||
)
|
||||
.then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days())
|
||||
.otherwise(None)
|
||||
.alias("Terminabweichung_Anzahl_Tage")
|
||||
).with_columns(
|
||||
pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION)
|
||||
.then(pl.lit(True))
|
||||
.otherwise(pl.lit(False))
|
||||
.alias("Terminunterschreitung"),
|
||||
pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION)
|
||||
.then(pl.lit(True))
|
||||
.otherwise(pl.lit(False))
|
||||
.alias("Terminüberschreitung"),
|
||||
pl.when((pl.col("Liefertermin_Ist").is_not_null()) & (pl.col("Prod-Start").is_not_null()))
|
||||
.then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days())
|
||||
.otherwise(None)
|
||||
.alias("Durchlaufzeit_Anzahl_Tage"),
|
||||
)
|
||||
tmp
|
||||
|
||||
@@ -419,28 +411,105 @@ tmp
|
||||
# // dump to database
|
||||
|
||||
|
||||
def _parse_to_json(value: Any) -> str:
|
||||
def _json_default(
|
||||
value: Any,
|
||||
) -> str:
|
||||
if isinstance(value, (datetime.date, datetime.datetime)):
|
||||
return value.isoformat()
|
||||
else:
|
||||
raise TypeError
|
||||
|
||||
|
||||
parsed_lists = tmp.with_columns(
|
||||
def _parse_to_json(
|
||||
x: pl.Series | None,
|
||||
) -> str | None:
|
||||
if x is None:
|
||||
return None
|
||||
|
||||
return json.dumps(x.to_list(), default=_json_default)
|
||||
|
||||
|
||||
staging_data = tmp.with_columns(
|
||||
pl.col(pl.List)
|
||||
.map_elements(
|
||||
lambda x: json.dumps(x.to_list(), default=_parse_to_json) if x is not None else None,
|
||||
_parse_to_json,
|
||||
return_dtype=pl.String,
|
||||
)
|
||||
.name.keep()
|
||||
)
|
||||
parsed_lists
|
||||
staging_data
|
||||
|
||||
|
||||
# %%
|
||||
parsed_lists["Import-Ist_Historie"].item(0)
|
||||
rows_inserted = staging_data.write_database(
|
||||
"Produktionsauftrag-Einzelsicht_Staging",
|
||||
connection=db.DB_URI,
|
||||
engine="adbc",
|
||||
if_table_exists="replace",
|
||||
)
|
||||
assert rows_inserted == staging_data.height
|
||||
|
||||
# %%
|
||||
# TODO make UPSERT with staging
|
||||
all_columns = staging_data.columns
|
||||
update_columns = [col for col in all_columns if col not in PRIM_KEYS]
|
||||
|
||||
sql_column_list_str = ", ".join([f'"{c}"' for c in all_columns])
|
||||
sql_pk_list_str = ", ".join([f'"{c}"' for c in PRIM_KEYS])
|
||||
sql_update_rules_str = ", ".join([f'"{c}" = EXCLUDED."{c}"' for c in update_columns])
|
||||
|
||||
upsert_sql = f"""
|
||||
INSERT INTO "Produktionsauftrag-Einzelsicht" ({sql_column_list_str})
|
||||
SELECT {sql_column_list_str} FROM "Produktionsauftrag-Einzelsicht_Staging" WHERE 1=1
|
||||
ON CONFLICT({sql_pk_list_str}) DO UPDATE SET
|
||||
{sql_update_rules_str};
|
||||
"""
|
||||
|
||||
# %%
|
||||
with db.ENGINE_INTERNAL.begin() as conn:
|
||||
res = conn.execute(sql.text(upsert_sql))
|
||||
conn.execute(sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";'))
|
||||
|
||||
# %%
|
||||
# ** test if loaded correctly
|
||||
stmt = sql.select(db.intern_prod_order_t)
|
||||
|
||||
with db.ENGINE_INTERNAL.connect() as conn:
|
||||
ret = conn.execute(stmt)
|
||||
|
||||
ret.fetchall()
|
||||
|
||||
# %%
|
||||
# // database loading
|
||||
|
||||
df = pl.read_database_uri(
|
||||
'SELECT * FROM "Produktionsauftrag-Einzelsicht"',
|
||||
uri=db.DB_URI,
|
||||
engine="adbc",
|
||||
schema_overrides=db.intern_prod_order_t_schema,
|
||||
)
|
||||
|
||||
list_cols_to_type: dict[str, type[pl.DataType]] = {
|
||||
"Meldezeitpunkt_Historie": pl.Datetime,
|
||||
"Bestaetigter-Import_Historie": pl.Date,
|
||||
"Import-Ist_Historie": pl.Date,
|
||||
"Tage_zu_letzter_PSM_Historie": pl.Int64,
|
||||
"Prod-EP10_Historie": pl.UInt64,
|
||||
"Prod-EP20_Historie": pl.UInt64,
|
||||
"Prod-EP30_Historie": pl.UInt64,
|
||||
"Prod-EP40_Historie": pl.UInt64,
|
||||
"Prod-EP50_Historie": pl.UInt64,
|
||||
"Prod-Qualitaet_Historie": pl.Int32,
|
||||
"Prod-Start_Historie": pl.Date,
|
||||
}
|
||||
|
||||
list_col_parse_conds = {
|
||||
col: pl.col(col).str.json_decode(pl.List(list_type))
|
||||
for col, list_type in list_cols_to_type.items()
|
||||
}
|
||||
|
||||
df.with_columns(**list_col_parse_conds)
|
||||
|
||||
|
||||
########################################################
|
||||
# %%
|
||||
@@ -450,6 +519,9 @@ tmp_1 = tmp_1.with_columns(
|
||||
)
|
||||
tmp_1
|
||||
|
||||
# %%
|
||||
|
||||
|
||||
# %%
|
||||
tmp_1 = tmp.with_columns(
|
||||
# Aktuelles Datum minus verschobenes Datum (isoliert je Auftrag)
|
||||
|
||||
Reference in New Issue
Block a user