From 9c8b4ea48c550ca4a5d9a260657d08c4fe7f8c82 Mon Sep 17 00:00:00 2001 From: foefl Date: Fri, 5 Jun 2026 12:01:53 +0200 Subject: [PATCH] refactor and prepare pipeline --- prototypes/02_integrate_wokflow.py | 168 ++++++++++++ prototypes/02_save_to_db.py | 27 -- prototypes/external_code.py | 411 +++++++++++++++++++++++++++++ src/wattanalyse/db.py | 69 ++++- 4 files changed, 646 insertions(+), 29 deletions(-) create mode 100644 prototypes/02_integrate_wokflow.py delete mode 100644 prototypes/02_save_to_db.py create mode 100644 prototypes/external_code.py diff --git a/prototypes/02_integrate_wokflow.py b/prototypes/02_integrate_wokflow.py new file mode 100644 index 0000000..bc2892c --- /dev/null +++ b/prototypes/02_integrate_wokflow.py @@ -0,0 +1,168 @@ +# %% +import datetime +import importlib +from pathlib import Path + +import external_code +import polars as pl +import sqlalchemy as sql + +from wattanalyse import db + +importlib.reload(db) +importlib.reload(external_code) + +# %% +PROJECT_BASE = Path(__file__).parents[1] +DATA_PTH = PROJECT_BASE / "data" +assert DATA_PTH.exists() + +# %% +# // load data +target = DATA_PTH / "PSM_20260507.arrow" +data_raw = pl.read_ipc(target) + + +# %% +# // preprocessing I +res = external_code.preprocess_psm(data_raw) + +# %% +res.filtered +# %% +data = data_raw.rename(external_code.RENAMING_SCHEME) +REGEX_PATTERN = r"^[\s\-#+/$]+$" +data = data.with_columns( + pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN)) + .then(None) + .otherwise(pl.col(pl.String)) + .name.keep() +) +data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t")) +print(f"Size of dataset before cleansing: {data.height}") +filtered_data = pl.DataFrame(schema=data.schema) +# %% +# data.filter(pl.col.Meldezeitpunkt_Historie.is_null()) +# %% +# any NULL values in critical columns +NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie") +conds = [pl.col(col).is_null() for col in NOT_NULL_COLS] +filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) +data = data.filter(~pl.any_horizontal(*conds)) + +# implausible dates +# dates not allowed to be in the future +current_datetime = datetime.datetime.now() +current_date = current_datetime.date() +NOT_IN_FUTURE_COLS_DATETIME = ("Meldezeitpunkt_Historie",) +NOT_IN_FUTURE_COLS_DATE = ("Wareneingang am", "Prod-Start_Historie") +conds = [ + (pl.col(col) > current_datetime).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATETIME +] + +conds.extend( + [(pl.col(col) > current_date).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATE] +) +filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) +data = data.filter(~pl.any_horizontal(*conds)) + +# too much in the future or the past +NUMBER_YEARS_UPPER_BOUND_DATES = 4 +# dates +future_limit = current_date + datetime.timedelta(days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES)) +past_limit = datetime.date(1990, 1, 1) +cond = (pl.col(pl.Date) > future_limit).fill_null(False) | ( + pl.col(pl.Date) < past_limit +).fill_null(False) +filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) +data = data.filter(~pl.any_horizontal(cond)) +# datetime +future_limit = current_datetime + datetime.timedelta( + days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) +) +past_limit = datetime.datetime(1990, 1, 1) +cond = (pl.col(pl.Datetime) > future_limit).fill_null(False) | ( + pl.col(pl.Datetime) < past_limit +).fill_null(False) +filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) +data = data.filter(~pl.any_horizontal(cond)) + +print(f"Size of dataset after cleansing: {data.height}") +print(f"Filtered data: {filtered_data}") +# %% +test = pl.DataFrame( + { + "t1": [0, 1, 3], + "t2": [1, None, 3], + "t3": [3, 8, None], + } +) +test + + +# %% +columns = ["t1", "t2", "t3"] +conds = [pl.col(col).is_null() for col in columns] +test.filter(pl.any_horizontal(*conds)) + +# %% +most_occurrences = ( + data.group_by(["PA", "PA Pos", "Konfektionär"]) + .agg(pl.len().alias("count")) + .sort("count", descending=True) +) +most_occurrences +# %% +most_occurrences.filter(~pl.col("Konfektionär").str.contains("May Tekstil Camcesme")) +# %% +# data = data.filter( +# ((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)) +# | ((pl.col.PA == 16856) & (pl.col("PA Pos") == 10)) +# ).sort("PSM gemeldet am", descending=False) +data = data.filter((pl.col.PA == 15372) & (pl.col("PA Pos") == 10)).sort( + "PSM gemeldet am", descending=False +) +data.select(pl.col.PA.unique()) +# %% +# // simulate time series +# this is a sequence how data would be provided: first one entry, and then more additional entries +series: list[pl.DataFrame] = [] + +for i in range(data.height): + series.append(data[: (i + 1)]) + +assert len(series) == data.height + +for idx, entry in enumerate(series, start=1): + assert idx == entry.height + +# %% +# 1. cleanup obtained new data +# ~~2. load data from internal database~~ +# ~~3. integrate with with new data (whole snapshot)~~ +# 2. process on order level +# 3. save results to internal database +# 4. post-process results +# 5. write to external database + +# // (1) cleanup obtained new data +# load data from internal database +# integrate with with new data (whole snapshot) + + +# // (2) processing order level +tmp = series[3] +tmp +# %% +df = external_code.process_order_level(tmp) +df + +# %% +# // (3) save results to internal database +external_code.dump_order_level_to_internal_database_wipe(df) +# %% +# now load data from database +df = external_code.load_order_level_from_internal_database() +df + +# %% diff --git a/prototypes/02_save_to_db.py b/prototypes/02_save_to_db.py deleted file mode 100644 index ba8c6ff..0000000 --- a/prototypes/02_save_to_db.py +++ /dev/null @@ -1,27 +0,0 @@ -# %% -import datetime -import json -from typing import Any - -# %% -dt = datetime.datetime.now() -date = dt.date() - -# %% -val = [dt, date] -json.dumps(val) - - -# %% -def _parse_to_json(value: Any) -> str: - if isinstance(value, datetime.date): - return value.isoformat() - elif isinstance(value, datetime.datetime): - return value.isoformat() - else: - raise TypeError - - -# %% -json.dumps(val, default=_parse_to_json) -# %% diff --git a/prototypes/external_code.py b/prototypes/external_code.py new file mode 100644 index 0000000..5c32996 --- /dev/null +++ b/prototypes/external_code.py @@ -0,0 +1,411 @@ +import dataclasses as dc +import datetime +import enum +import json +from typing import Any, Final + +import polars as pl +import sqlalchemy as sql + +from wattanalyse import db + +# 1. cleanup obtained new data +# ~~2. load data from internal database~~ +# ~~3. integrate with with new data (whole snapshot)~~ +# 2. process on order level +# 3. save results to internal database +# 4. post-process results +# 5. write to external database + + +@dc.dataclass(slots=True, eq=False) +class PreProcessResult: + data: pl.DataFrame + filtered: pl.DataFrame + + +class QualityPsm(enum.StrEnum): + FEHLEND = enum.auto() + UNPLAUSIBEL = enum.auto() + PLAUSIBEL = enum.auto() + + +PSM_SCORES: dict[QualityPsm, int] = { + QualityPsm.FEHLEND: 1, + QualityPsm.UNPLAUSIBEL: 0, + QualityPsm.PLAUSIBEL: 2, +} + +RENAMING_SCHEME: dict[str, str] = { + "PA Pos": "PA_Pos", + "PSM gemeldet am": "Meldezeitpunkt_Historie", + "Import Ist": "Import-Ist_Historie", + "1.bestät. Import Konfektionär": "Bestaetigter-Import_Historie", + "Zuschnitt am": "Prod-Start_Historie", + "Teile in Zuschnitt": "Prod-EP10_Historie", + "Teile im Nähband": "Prod-EP20_Historie", + "Fertigware aus Nähband": "Prod-EP30_Historie", + "Teile kontrolliert": "Prod-EP40_Historie", + "Teile verpackt in Karton": "Prod-EP50_Historie", +} + +PRIM_KEYS: Final[list[str]] = ["PA", "PA_Pos"] + +LOWER_BOUND_DATE_DEVIATION: Final[int] = 0 +UPPER_BOUND_DATE_DEVIATION: Final[int] = 0 +NUMBER_YEARS_UPPER_BOUND_DATES: Final[int] = 4 + + +# // (1) preprocess +def preprocess_psm( + data: pl.DataFrame, +) -> PreProcessResult: + data = data.rename(RENAMING_SCHEME) + REGEX_PATTERN = r"^[\s\-#+/$]+$" + data = data.with_columns( + pl.when(pl.col(pl.String).str.contains(REGEX_PATTERN)) + .then(None) + .otherwise(pl.col(pl.String)) + .name.keep() + ) + data = data.with_columns(pl.col("Konfektionär").str.strip_chars(" \n\t")) + filtered_data = pl.DataFrame(schema=data.schema) + + # any NULL values in critical columns + NOT_NULL_COLS = ("PA", "PA_Pos", "Meldezeitpunkt_Historie") + conds = [pl.col(col).is_null() for col in NOT_NULL_COLS] + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) + data = data.filter(~pl.any_horizontal(*conds)) + + # implausible dates + # dates not allowed to be in the future + current_datetime = datetime.datetime.now() + current_date = current_datetime.date() + NOT_IN_FUTURE_COLS_DATETIME = ("Meldezeitpunkt_Historie",) + NOT_IN_FUTURE_COLS_DATE = ("Wareneingang am", "Prod-Start_Historie") + conds = [ + (pl.col(col) > current_datetime).fill_null(False) + for col in NOT_IN_FUTURE_COLS_DATETIME + ] + conds.extend( + [(pl.col(col) > current_date).fill_null(False) for col in NOT_IN_FUTURE_COLS_DATE] + ) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(*conds))]) + data = data.filter(~pl.any_horizontal(*conds)) + + # too much in the future or the past + # dates + future_limit = current_date + datetime.timedelta( + days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) + ) + past_limit = datetime.date(1990, 1, 1) + cond = (pl.col(pl.Date) > future_limit).fill_null(False) | ( + pl.col(pl.Date) < past_limit + ).fill_null(False) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) + data = data.filter(~pl.any_horizontal(cond)) + # datetimes + future_limit = current_datetime + datetime.timedelta( + days=(365 * NUMBER_YEARS_UPPER_BOUND_DATES) + ) + past_limit = datetime.datetime(1990, 1, 1) + cond = (pl.col(pl.Datetime) > future_limit).fill_null(False) | ( + pl.col(pl.Datetime) < past_limit + ).fill_null(False) + filtered_data = pl.concat([filtered_data, data.filter(pl.any_horizontal(cond))]) + data = data.filter(~pl.any_horizontal(cond)) + + return PreProcessResult(data=data, filtered=filtered_data) + + +# // (2) process on order level +def process_order_level(data: pl.DataFrame) -> pl.DataFrame: + # ** renaming + # data = data.rename(RENAMING_SCHEME) # TODO delete, done in pre-processing + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) + + # ** plausibility check of order quantities + PLAUSI_FEATURES: list[str] = [ + "Prod-EP10_Historie", + "Prod-EP20_Historie", + "Prod-EP30_Historie", + "Prod-EP40_Historie", + "Prod-EP50_Historie", + ] + data = data.with_columns( + pl.all_horizontal( + pl.col(PLAUSI_FEATURES).is_null() | (pl.col(PLAUSI_FEATURES) == 0) + ).alias("is_empty") + ) + conditions = [ + pl.col(PLAUSI_FEATURES[i]) >= pl.col(PLAUSI_FEATURES[i + 1]) + for i in range(len(PLAUSI_FEATURES) - 1) + ] + data = data.with_columns( + pl.when(pl.all_horizontal(conditions) | pl.col("is_empty")) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Prod-Qty_is_valid") + ).with_columns( + pl.when(pl.col("is_empty")) + .then(pl.lit(PSM_SCORES[QualityPsm.FEHLEND])) + .when(pl.col("Prod-Qty_is_valid")) + .then(pl.lit(PSM_SCORES[QualityPsm.PLAUSIBEL])) + .otherwise(pl.lit(PSM_SCORES[QualityPsm.UNPLAUSIBEL])) + .alias("Prod-Qualitaet_Historie") + ) + # aggregate hint for "Prod-Qualitaet_Durchschnitt": use "drop_nulls" "last" + # aggregate "Prod-Qualitaet_Historie" and use "mean" + # need additional "alias" on "Prod-Qualitaet_Historie" + + # ** planned or target delivery date + current_date = datetime.datetime.now().date() + print(f"{current_date=}") + data = data.with_columns( + pl.coalesce(["Bestaetigter-Import_Historie", "Import-Ist_Historie"]).alias( + "Liefertermin_Soll" + ) + ) + # aggregate hint for "Liefertermin_Soll": use "drop_nulls" "first" + # first filled field for "Liefertermin Soll" is the relevant target date + # should be first confirmed date, but if this field is not filled we use the first + # filled import by the supplier + + # ** actual delivery date + # logic of Wattana: set date is before current date --> becomes actual value + data = data.with_columns( + pl.when(pl.col("Import-Ist_Historie") < current_date) + .then(pl.col("Import-Ist_Historie")) + .otherwise(None) + .alias("Liefertermin_Ist") + ) + # aggregate hint for "Liefertermin_Ist": use "drop_nulls" "last" + # keep last because that is the latest value set by the supplier + # if all values are NULL then NULL is returned (no actual date available) + + # ** duration since last report in days + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + ( + pl.col("Meldezeitpunkt_Historie") + - pl.col("Meldezeitpunkt_Historie").shift(1).over(PRIM_KEYS) + ) + .dt.total_days() + .alias("Tage_zu_letzter_PSM_Historie") + ) + # aggregate hint for "Tage_zu_letzter_PSM_Durchschnitt" + # aggregate "Tage_zu_letzter_PSM_Historie" and use "mean" (NULL is ignored automatically) + # need additional "alias" on "Tage_zu_letzter_PSM_Historie" + + data = data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False).with_columns( + # Prüfen: Ist das aktuelle Datum ungleich dem vorherigen Datum derselben Position? + ( + pl.col("Import-Ist_Historie") + != pl.col("Import-Ist_Historie").shift(1).over(PRIM_KEYS) + ) + .fill_null(False) # Der allererste Eintrag hat keinen Vorgänger -> Ist keine Änderung + .alias("Import-Ist_geaendert") + ) + # aggregate hint for "Import-Ist_geaendert" + # aggregate "Import-Ist_geaendert" and use "last" + + # aggregate hint for "Import-Ist_letzter_Wert" + # aggregate "Import-Ist_Historie" and use "drop_nulls" "last" + # need additional "alias" on "Import-Ist_Historie" + + # aggregate hint for "Import-Ist_Anzahl_Aenderungen" + # aggregate "Import-Ist_geaendert" and use "sum" + # need additional "alias" on "Import-Ist_geaendert" + + # aggregate hint for "Prod-Start" + # aggregate "Prod-Start_Historie" and use "drop_nulls" "first" + # first entry should be treated as the truth value, changing later does not make sense + # need additional "alias" on "Prod-Start_Historie" + + # whole aggregates see DB schema + data = ( + data.sort(PRIM_KEYS + ["Meldezeitpunkt_Historie"], descending=False) + .group_by(PRIM_KEYS + ["Konfektionär"]) + .agg( + pl.col("Meldezeitpunkt_Historie"), + pl.col("Liefertermin_Soll").drop_nulls().first(), + pl.col("Bestaetigter-Import_Historie"), + pl.col("Liefertermin_Ist").drop_nulls().last(), + pl.col("Import-Ist_Historie"), + pl.col("Import-Ist_Historie") + .drop_nulls() + .last() + .alias("Import-Ist_letzter_Wert"), + pl.col("Import-Ist_geaendert").last(), + pl.col("Import-Ist_geaendert").sum().alias("Import-Ist_Anzahl_Aenderungen"), + pl.col("Tage_zu_letzter_PSM_Historie"), + pl.col("Tage_zu_letzter_PSM_Historie") + .mean() + .alias("Tage_zu_letzter_PSM_Durchschnitt"), + pl.col("Prod-EP10_Historie"), + pl.col("Prod-EP20_Historie"), + pl.col("Prod-EP30_Historie"), + pl.col("Prod-EP40_Historie"), + pl.col("Prod-EP50_Historie"), + pl.col("Prod-Qualitaet_Historie"), + pl.col("Prod-Qualitaet_Historie").mean().alias("Prod-Qualitaet_Durchschnitt"), + pl.col("Prod-Start_Historie"), + pl.col("Prod-Start_Historie").drop_nulls().first().alias("Prod-Start"), + ) + ) + # ** order specific aggregates + data = ( + data.with_columns( + pl.when( + (pl.col("Liefertermin_Ist").is_not_null()) + & (pl.col("Liefertermin_Soll").is_not_null()) + ) + .then((pl.col("Liefertermin_Ist") - pl.col("Liefertermin_Soll")).dt.total_days()) + .otherwise(None) + .alias("Terminabweichung_Anzahl_Tage") + ) + .with_columns( + pl.when(pl.col("Terminabweichung_Anzahl_Tage") < LOWER_BOUND_DATE_DEVIATION) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Terminunterschreitung"), + pl.when(pl.col("Terminabweichung_Anzahl_Tage") > UPPER_BOUND_DATE_DEVIATION) + .then(pl.lit(True)) + .otherwise(pl.lit(False)) + .alias("Terminüberschreitung"), + pl.when( + (pl.col("Liefertermin_Ist").is_not_null()) + & (pl.col("Prod-Start").is_not_null()) + ) + .then((pl.col("Liefertermin_Ist") - pl.col("Prod-Start")).dt.total_days()) + .otherwise(None) + .alias("Durchlaufzeit_Anzahl_Tage"), + ) + .with_columns( + pl.when( + (pl.col("Durchlaufzeit_Anzahl_Tage").is_not_null()) + & (pl.col("Durchlaufzeit_Anzahl_Tage") < 0) + ) + .then(None) + .otherwise(pl.col("Durchlaufzeit_Anzahl_Tage")) + .alias("Durchlaufzeit_Anzahl_Tage") + ) + ) + + return data + + +# // (3) dump order level to internal database +def _json_default( + value: Any, +) -> str: + if isinstance(value, (datetime.date, datetime.datetime)): + return value.isoformat() + raise TypeError + + +def _parse_to_json( + x: pl.Series | None, +) -> str | None: + if x is None: + return None + + return json.dumps(x.to_list(), default=_json_default) + + +def dump_order_level_to_internal_database_staging( + data: pl.DataFrame, +) -> None: + + staging_data = data.with_columns( + pl.col(pl.List) + .map_elements( + _parse_to_json, + return_dtype=pl.String, + ) + .name.keep() + ) + rows_inserted = staging_data.write_database( + "Produktionsauftrag-Einzelsicht_Staging", + connection=db.DB_URI, + engine="adbc", + if_table_exists="replace", + ) + if rows_inserted != staging_data.height: + raise RuntimeError("Number of inserted rows and length of staging data do not match.") + + all_columns = staging_data.columns + update_columns = [col for col in all_columns if col not in PRIM_KEYS] + + sql_column_list_str = ", ".join([f'"{c}"' for c in all_columns]) + sql_pk_list_str = ", ".join([f'"{c}"' for c in PRIM_KEYS]) + sql_update_rules_str = ", ".join([f'"{c}" = EXCLUDED."{c}"' for c in update_columns]) + + upsert_sql = f""" + INSERT INTO "Produktionsauftrag-Einzelsicht" ({sql_column_list_str}) + SELECT {sql_column_list_str} FROM "Produktionsauftrag-Einzelsicht_Staging" WHERE 1=1 + ON CONFLICT({sql_pk_list_str}) DO UPDATE SET + {sql_update_rules_str}; + """ + + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(sql.text(upsert_sql)) + conn.execute( + sql.text('DROP TABLE IF EXISTS "Produktionsauftrag-Einzelsicht_Staging";') + ) + + +def dump_order_level_to_internal_database_wipe( + data: pl.DataFrame, +) -> None: + + staging_data = data.with_columns( + pl.col(pl.List) + .map_elements( + _parse_to_json, + return_dtype=pl.String, + ) + .name.keep() + ) + # empty table + with db.ENGINE_INTERNAL.begin() as conn: + conn.execute(sql.text('DELETE FROM "Produktionsauftrag-Einzelsicht";')) + + rows_inserted = staging_data.write_database( + "Produktionsauftrag-Einzelsicht", + connection=db.DB_URI, + engine="adbc", + if_table_exists="append", + ) + if rows_inserted != staging_data.height: + raise RuntimeError("Number of inserted rows and length of staging data do not match.") + + +# ** load order level data from internal database +def load_order_level_from_internal_database() -> pl.DataFrame: + data = pl.read_database_uri( + 'SELECT * FROM "Produktionsauftrag-Einzelsicht"', + uri=db.DB_URI, + engine="adbc", + schema_overrides=db.intern_prod_order_t_schema, + ) + + list_cols_to_type: dict[str, type[pl.DataType]] = { + "Meldezeitpunkt_Historie": pl.Datetime, + "Bestaetigter-Import_Historie": pl.Date, + "Import-Ist_Historie": pl.Date, + "Tage_zu_letzter_PSM_Historie": pl.Int64, + "Prod-EP10_Historie": pl.UInt64, + "Prod-EP20_Historie": pl.UInt64, + "Prod-EP30_Historie": pl.UInt64, + "Prod-EP40_Historie": pl.UInt64, + "Prod-EP50_Historie": pl.UInt64, + "Prod-Qualitaet_Historie": pl.Int32, + "Prod-Start_Historie": pl.Date, + } + + list_col_parse_conds = { + col: pl.col(col).str.json_decode(pl.List(list_type)) + for col, list_type in list_cols_to_type.items() + } + + return data.with_columns(**list_col_parse_conds) diff --git a/src/wattanalyse/db.py b/src/wattanalyse/db.py index aa27640..ddb0215 100644 --- a/src/wattanalyse/db.py +++ b/src/wattanalyse/db.py @@ -1,3 +1,6 @@ +from typing import Final + +import polars as pl import sqlalchemy as sql from sqlalchemy import Column, Table @@ -7,7 +10,8 @@ assert constants.Config.DB_PATH_INTERNAL.parent.exists(), ( "database parent folder does not exists" ) -ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_INTERNAL}") +DB_URI: Final[str] = f"sqlite:///{constants.Config.DB_PATH_INTERNAL}" +ENGINE_INTERNAL: Final[sql.Engine] = sql.create_engine(DB_URI) MD_INTERNAL = sql.MetaData() @@ -43,4 +47,65 @@ intern_prod_order_t: Table = Table( Column("Durchlaufzeit_Anzahl_Tage", sql.Float, nullable=True), ) -MD_INTERNAL.create_all(ENGINE) +intern_prod_order_t_schema: dict[str, type[pl.DataType]] = { + "PA": pl.UInt64, + "PA_Pos": pl.UInt32, + "Konfektionär": pl.String, + "Meldezeitpunkt_Historie": pl.String, + "Liefertermin_Soll": pl.Date, + "Bestaetigter-Import_Historie": pl.String, + "Liefertermin_Ist": pl.Date, + "Import-Ist_Historie": pl.String, + "Import-Ist_letzter_Wert": pl.Date, + "Import-Ist_geaendert": pl.Boolean, + "Import-Ist_Anzahl_Aenderungen": pl.UInt32, + "Tage_zu_letzter_PSM_Historie": pl.String, + "Tage_zu_letzter_PSM_Durchschnitt": pl.Float64, + "Prod-EP10_Historie": pl.String, + "Prod-EP20_Historie": pl.String, + "Prod-EP30_Historie": pl.String, + "Prod-EP40_Historie": pl.String, + "Prod-EP50_Historie": pl.String, + "Prod-Qualitaet_Historie": pl.String, + "Prod-Qualitaet_Durchschnitt": pl.Float64, + "Prod-Start_Historie": pl.String, + "Prod-Start": pl.Date, + "Terminabweichung_Anzahl_Tage": pl.Int64, + "Terminunterschreitung": pl.Boolean, + "Terminüberschreitung": pl.Boolean, + "Durchlaufzeit_Anzahl_Tage": pl.Int64, +} + + +MD_INTERNAL.create_all(ENGINE_INTERNAL) + +extern_prod_order_t_schema: dict[str, type[pl.DataType]] = { + "VK Auftrag": pl.UInt32, + "Artikelbez.": pl.String, + "Auftragsmenge": pl.UInt32, + "Kunde": pl.String, + "PA": pl.UInt64, + "PA Pos": pl.UInt32, + "PSM gemeldet am": pl.Datetime, + "Konfektionär": pl.String, + "Artikelnr.": pl.String, + "LT Kunde bestätigt": pl.Date, + "Export Ist": pl.Date, + "1.bestät. Import Konfektionär": pl.Date, + "Import Ist": pl.Date, + "Ablief.(Import Ist+Transport)": pl.Date, + "Wareneingang am": pl.Date, + "Wareneingang geprüft": pl.String, + "Täglicher Ausstoss": pl.Int64, + "Zuschnitt am": pl.Date, + "Teile in Zuschnitt": pl.UInt64, + "Teile im Nähband": pl.UInt64, + "Fertigware aus Nähband": pl.UInt64, + "Teile kontrolliert": pl.UInt64, + "Teile verpackt in Karton": pl.UInt64, + "Anzahl Bänder": pl.UInt16, + "Anzahl Näher": pl.UInt16, + "Arbeitsstunden pro Näher": pl.UInt8, + "Anzahl Arbeitstage pro Woche": pl.UInt8, + "Blockauftrag": pl.String, +}