writing results and temp tables

Florian Förster 2026-01-16 14:13:55 +01:00
parent 4ea9d35a91
commit 3b2e0e5773
4 changed files with 247 additions and 72 deletions

View File

@@ -23,7 +23,6 @@ from umbreit import db, types
# types = importlib.reload(types)
# db = importlib.reload(db)
# %%
p_cfg = io.search_file_iterative(
    starting_path=Path.cwd(),
@@ -61,7 +60,7 @@ def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
########### RESULTS ###########
# temporary
res_engine = sql.create_engine("sqlite://")
-db.metadata.create_all(res_engine, tables=(db.results,))
db.metadata.create_all(res_engine, tables=(db.results_local,))
# %%
@@ -70,13 +69,13 @@ def delete_results(
    res_engine: sql.Engine,
) -> None:
    with res_engine.begin() as conn:
-        res = conn.execute(sql.delete(db.results))
        res = conn.execute(sql.delete(db.results_local))
        print("Rows deleted: ", res.rowcount)


delete_results(res_engine)

-stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
stmt = sql.select(db.results_local.c.bedarf_nr, db.results_local.c.bedarf_sequenz)
with res_engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
@@ -173,9 +172,9 @@ df_order
# %%
# df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
-print(f"Query duration: {elapsed:.4f} sec")
-print("Number of entries: ", len(df))
-print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# print(f"Query duration: {elapsed:.4f} sec")
# print("Number of entries: ", len(df))
# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
# try title_info parsing
@@ -202,10 +201,10 @@ stmt = sql.select(
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
-    sql.case(
-        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
-        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
-    ).label("BEDP_MENGE_BEDARF_VM"),
    # sql.case(
    #     (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
    #     else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    # ).label("BEDP_MENGE_BEDARF_VM"),
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.VERLAGSNR,
    db.ext_titel_info.c.MENGE_VORMERKER,
@@ -227,11 +226,17 @@ print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
df.head()
# %%
temp = df.with_columns(
    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
)
temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
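The cell above checks that no negative values survive the fill_null. As a minimal sketch (not part of this commit), the NULL-mapping that the commented-out SQL CASE used to do on the database side could be replicated in polars roughly like this, assuming -1 is the sentinel in BEDP_MENGE_BEDARF_VM:

import polars as pl

demo = pl.DataFrame({"BEDP_MENGE_BEDARF_VM": [3, None, -1, 0]})
# map the -1 sentinel to null, keep all other values (mirrors the removed CASE)
demo = demo.with_columns(
    pl.when(pl.col("BEDP_MENGE_BEDARF_VM") <= -1)
    .then(None)
    .otherwise(pl.col("BEDP_MENGE_BEDARF_VM"))
    .alias("BEDP_MENGE_BEDARF_VM")
)
# after fill_null(0) the filter for negative values should return no rows
print(
    demo.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0)).filter(
        pl.col("BEDP_MENGE_BEDARF_VM") < 0
    )
)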
# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
-# p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow"
# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
@@ -373,7 +378,6 @@ df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE
p_save_diff_VM_bedp_tinfo = (
    Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
)
-from polars.datatypes import classes as pl_dtypes
df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)

# why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
@@ -396,8 +400,12 @@ print(len(df.filter(pl.col("MELDENUMMER") == 18)))
# df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
# %%
# VM_CRITERION = "MENGE_VORMERKER"
-VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
-MANDANT_CRITERION = "BEDP_MAN"
VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
    db.EXT_DOPT_ERGEBNIS.columns.keys()
)
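RESULT_COLUMN_ORDER mirrors the column order of db.EXT_DOPT_ERGEBNIS. A rough sketch of how it might be used to keep positional binds aligned before a bulk insert (illustrative only; the database-generated ID column is assumed to be skipped):

def ordered_result_rows(results: pl.DataFrame) -> list[tuple]:
    # reorder to the table's column order, dropping the generated ID column,
    # so each row tuple lines up with :1 ... :6 in an INSERT statement
    cols = [c for c in RESULT_COLUMN_ORDER if c != "ID"]
    return results.select(cols).rows()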
def get_starting_date(
@@ -435,10 +443,55 @@ def get_raw_data() -> pl.DataFrame:
    )


-# def get_empty_pipeline_result(
-#     data: pl.DataFrame,
-# ) -> PipelineResult:
-#     return PipelineResult(data)
def save_tmp_data(df: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.tmp_data))

    with engine.begin() as conn:
        conn.execute(sql.insert(db.tmp_data), df.to_dicts())


def get_tmp_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.tmp_data),
        engine,
        schema_overrides=db.tmp_data_schema_map,
    )


def get_result_data() -> pl.DataFrame:
    return pl.read_database(
        sql.select(db.EXT_DOPT_ERGEBNIS),
        engine,
        schema_overrides=db.results_schema_map,
    )


def save_result_data(results: pl.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())


def clear_result_data() -> None:
    with engine.begin() as conn:
        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
def save_result_data_native(results: pl.DataFrame) -> None:
    results = results.with_columns(
        [
            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
            for c in results.select(cs.boolean()).columns
        ]
    )
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:1, :2, :3, :4, :5, :6)
    """
    with engine.begin() as conn:
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            # positional binds expect one sequence per row in column order;
            # use `results` (not an undefined `tmp`) and polars' .rows()
            cursor.executemany(
                stmt,
                results.select(
                    "BEDARF_NR",
                    "BEDARF_SEQUENZ",
                    "VORLAGE",
                    "WF_ID",
                    "BEST_MENGE",
                    "FREIGABE_AUTO",
                ).rows(),
            )
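A possible alternative sketch (not part of this commit) using named binds and to_dicts(); it assumes the boolean columns have already been converted to "Y"/"N" as in save_result_data_native above:

def save_result_data_named(results: pl.DataFrame) -> None:
    stmt = """
    INSERT INTO "EXT_DOPT_ERGEBNIS"
        ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO")
    VALUES (:BEDARF_NR, :BEDARF_SEQUENZ, :VORLAGE, :WF_ID, :BEST_MENGE, :FREIGABE_AUTO)
    """
    rows = results.select(
        "BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO"
    ).to_dicts()
    with engine.begin() as conn:
        raw_conn = conn.connection.connection
        with raw_conn.cursor() as cursor:
            # executemany accepts a list of dicts when the statement uses named binds
            cursor.executemany(stmt, rows)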
def _apply_several_filters(
@@ -469,7 +522,7 @@ class PipelineResult:
    ) -> None:
        self._open = data

        schema = db.results_schema_map.copy()
-        del schema["id"]
        del schema["ID"]
        self._results = pl.DataFrame(schema=schema)

        schema = {}
@@ -511,7 +564,12 @@ class PipelineResult:
        self,
        data: pl.DataFrame,
    ) -> None:
-        self._results = pl.concat([self._results, data])
        print(self._results)
        res = pl.concat([self._results, data])
        # self._results = res.with_columns(
        #     (pl.arange(0, res.height) + 1).alias("ID").cast(db.results_schema_map["ID"])
        # )
        self._results = res
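The commented-out lines would number the concatenated frame locally; a minimal standalone sketch of that idea (illustrative only, not part of the commit):

import polars as pl

existing = pl.DataFrame({"BEDARF_NR": [1, 2]}, schema={"BEDARF_NR": pl.UInt32})
new = pl.DataFrame({"BEDARF_NR": [3]}, schema={"BEDARF_NR": pl.UInt32})
res = pl.concat([existing, new])
# assign a 1-based running ID over the combined frame
res = res.with_columns((pl.arange(0, res.height) + 1).cast(pl.UInt64).alias("ID"))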
    def merge_pipeline(
        self,
@@ -528,29 +586,28 @@ class PipelineResult:
        freigabe_auto: types.Freigabe,
        is_out: bool,
    ) -> None:
-        # TODO move to other position
-        ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
-        results = data.rename(db.map_to_result)
        results = data.rename(db.map_data_to_result)
        # TODO rework because it is not WF-agnostic

        order_qty_expr: pl.Expr
        if is_out:
            order_qty_expr = (
                pl.lit(0)
                .alias("ORDER_QTY_CRIT")
-                .alias("best_menge")
-                .cast(db.results_schema_map["best_menge"])
                .alias("BEST_MENGE")
                .cast(db.results_schema_map["BEST_MENGE"])
            )
        else:
            order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")

        results = results.with_columns(
            [
-                pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
-                pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
                # pl.lit(0).alias("ID").cast(db.results_schema_map["ID"]),
                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
                pl.lit(wf_id).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                order_qty_expr,
                pl.lit(freigabe_auto.value)
-                .alias("freigabe_auto")
-                .cast(db.results_schema_map["freigabe_auto"]),
                .alias("FREIGABE_AUTO")
                .cast(db.results_schema_map["FREIGABE_AUTO"]),
            ]
        )

        results = results.drop(
@@ -564,6 +621,9 @@ class PipelineResult:
                "MANDFUEHR",
            ]
        )
        # results = results.select(RESULT_COLUMN_ORDER)
        print(results)
        print("####################")
        self._subtract_data(data)
        self._add_results(results)
@@ -854,7 +914,7 @@ def wf100_petersen_wdb_sub1(
# %%
# SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}")
@@ -911,6 +971,61 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# %%
# // test result writing
res = pipe_res.results.clone()
res.height
# %%
# %%
# %%
save_result_data(res)
get_result_data()
# %%
clear_result_data()
# %%
get_result_data()
# %%
stmt = sql.text("TRUNCATE TABLE EXT_DOPT_ERGEBNIS")
with engine.begin() as conn:
    conn.exec_driver_sql("TRUNCATE TABLE UMB.EXT_DOPT_ERGEBNIS")
# %%
stmt = sql.insert(db.EXT_DOPT_ERGEBNIS)
print(stmt.compile(engine))
# %%
engine.dispose()
# %%
# %%
t1 = time.perf_counter()
save_result_data(res)
t2 = time.perf_counter()
print(f"Elapsed: {t2 - t1}")
# %%
get_result_data()
# %%
clear_result_data()
get_result_data()
# %%
t1 = time.perf_counter()
raw_input(res)
t2 = time.perf_counter()
print(f"Elapsed: {t2 - t1}")
# %%
# %%
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
@@ -946,9 +1061,6 @@ raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3
# %%
raw_data.head()
-# %%
-filt_out
# %%
# ---------------------------------------------------------------------------- #

View File

@@ -259,4 +259,4 @@ ORDER BY view_name;
-- DESC all_views;
DESC EXT_DOPT_ERGEBNIS;
-SELECT * FROM v$version;d-opt
SELECT * FROM v$version;

View File

@@ -6,6 +6,30 @@ from umbreit.types import PolarsNullValues, PolarsSchema
metadata = sql.MetaData()


class OracleBoolean(sql.types.TypeDecorator):
    impl = sql.VARCHAR(1)
    cache_ok = True

    def process_bind_param(
        self,
        value: bool | None,
        dialect,
    ) -> str | None:
        if value is None:
            return None
        return "Y" if value else "N"

    def process_result_value(
        self,
        value: sql.VARCHAR | None,
        dialect,
    ) -> bool | None:
        if value is None:
            return None
        return value == "Y"
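A minimal round-trip sketch for OracleBoolean, using an in-memory SQLite engine purely for illustration (the real target is the Oracle table defined further below):

_demo_metadata = sql.MetaData()
_demo = Table(
    "demo_flags",
    _demo_metadata,
    Column("id", sql.Integer, primary_key=True),
    Column("flag", OracleBoolean, nullable=True),
)
_demo_engine = sql.create_engine("sqlite://")
_demo_metadata.create_all(_demo_engine)
with _demo_engine.begin() as conn:
    conn.execute(sql.insert(_demo), [{"id": 1, "flag": True}, {"id": 2, "flag": None}])
    # stored as "Y"/NULL on the database side, returned as True/None here
    print(conn.execute(sql.select(_demo.c.flag)).scalars().all())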
ext_bedpbed = Table(
    "ext_bedpbed",
    metadata,
@@ -153,41 +177,7 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
EXT_AUFPAUF_null_values: PolarsNullValues = {}

-csv_tables: tuple[tuple[Table, PolarsSchema, PolarsNullValues], ...] = (
-    (ext_bedpbed, ext_bedpbed_schema_map, ext_bedpbed_null_values),
-    (ext_titel_info, ext_titel_info_schema_map, ext_titel_info_null_values),
-    (EXT_AUFPAUF, EXT_AUFPAUF_schema_map, EXT_AUFPAUF_null_values),
-    (EXT_BESPBES_INFO, EXT_BESPBES_INFO_schema_map, EXT_BESPBES_INFO_null_values),
-)
-
-results = Table(
-    "results",
-    metadata,
-    Column("id", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
-    Column("bedarf_nr", sql.Integer, nullable=False),
-    Column("bedarf_sequenz", sql.Integer, nullable=False),
-    Column("vorlage", sql.Boolean, nullable=False),
-    Column("wf_id", sql.Integer, nullable=False),
-    Column("best_menge", sql.Integer, nullable=True),
-    Column("freigabe_auto", sql.Boolean, nullable=False),
-)
-
-results_schema_map: PolarsSchema = {
-    "id": pl.UInt32,
-    "bedarf_nr": pl.UInt32,
-    "bedarf_sequenz": pl.UInt32,
-    "vorlage": pl.Boolean,
-    "wf_id": pl.UInt16,
-    "best_menge": pl.UInt32,
-    "freigabe_auto": pl.Boolean,
-}
-
-map_to_result: dict[str, str] = {
-    "BEDARFNR": "bedarf_nr",
-    "BEDP_SEQUENZ": "bedarf_sequenz",
-}
# // queries and temp data
raw_data_query_schema_map: PolarsSchema = {
    "BEDARFNR": pl.UInt32,
    "BEDP_SEQUENZ": pl.UInt32,
@@ -199,3 +189,75 @@ raw_data_query_schema_map: PolarsSchema = {
    "MENGE_VORMERKER": pl.UInt32,
    "MANDFUEHR": pl.UInt8,
}
tmp_data = Table(
    "EXT_TMP_BEDP_TINFO",
    metadata,
    Column("BEDARFNR", sql.Integer, primary_key=True, autoincrement=False, nullable=False),
    Column(
        "BEDP_SEQUENZ", sql.Integer, primary_key=True, autoincrement=False, nullable=False
    ),
    Column("BEDP_TITELNR", sql.Integer, nullable=False),
    Column("BEDP_MAN", sql.Integer, nullable=False),
    Column("BEDP_MENGE_BEDARF_VM", sql.Integer, nullable=True),
    Column("MELDENUMMER", sql.Integer, nullable=False),
    Column("VERLAGSNR", sql.Integer, nullable=False),
    Column("MENGE_VORMERKER", sql.Integer, nullable=True),
    Column("MANDFUEHR", sql.Integer, primary_key=True, autoincrement=False, nullable=False),
)

tmp_data_schema_map = raw_data_query_schema_map

csv_tables: tuple[tuple[Table, PolarsSchema, PolarsNullValues], ...] = (
    (ext_bedpbed, ext_bedpbed_schema_map, ext_bedpbed_null_values),
    (ext_titel_info, ext_titel_info_schema_map, ext_titel_info_null_values),
    (EXT_AUFPAUF, EXT_AUFPAUF_schema_map, EXT_AUFPAUF_null_values),
    (EXT_BESPBES_INFO, EXT_BESPBES_INFO_schema_map, EXT_BESPBES_INFO_null_values),
)
# ** results
# ** Umbreit
EXT_DOPT_ERGEBNIS = Table(
    "EXT_DOPT_ERGEBNIS",
    metadata,
    Column("ID", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
    Column("BEDARF_NR", sql.Integer, nullable=False),
    Column("BEDARF_SEQUENZ", sql.Integer, nullable=False),
    Column("VORLAGE", OracleBoolean, nullable=False),
    Column("WF_ID", sql.Integer, nullable=False),
    Column("BEST_MENGE", sql.Integer, nullable=True),
    Column("FREIGABE_AUTO", OracleBoolean, nullable=False),
)

results_schema_map: PolarsSchema = {
    "ID": pl.UInt64,
    "BEDARF_NR": pl.UInt32,
    "BEDARF_SEQUENZ": pl.UInt32,
    "VORLAGE": pl.Boolean,
    "WF_ID": pl.UInt16,
    "BEST_MENGE": pl.UInt32,
    "FREIGABE_AUTO": pl.Boolean,
}
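This schema map is used both for reading EXT_DOPT_ERGEBNIS and for building the empty local results frame; a small sketch of the latter (mirroring PipelineResult.__init__ in the pipeline script, where the database-generated ID column is dropped):

_schema = results_schema_map.copy()
del _schema["ID"]  # ID is generated by the database, not carried locally
_empty_results = pl.DataFrame(schema=_schema)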
map_data_to_result: dict[str, str] = {
    "BEDARFNR": "BEDARF_NR",
    "BEDP_SEQUENZ": "BEDARF_SEQUENZ",
}

# ** local
results_local = Table(
    "results",
    metadata,
    Column("id", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
    Column("bedarf_nr", sql.Integer, nullable=False),
    Column("bedarf_sequenz", sql.Integer, nullable=False),
    Column("vorlage", sql.Boolean, nullable=False),
    Column("wf_id", sql.Integer, nullable=False),
    Column("best_menge", sql.Integer, nullable=True),
    Column("freigabe_auto", sql.Boolean, nullable=False),
)

results_local_schema_map: PolarsSchema = {
    k.lower(): v for (k, v) in results_schema_map.items()
}

View File

@@ -30,3 +30,4 @@ class Freigabe(enum.Enum):
    WF_200 = False
    WF_900 = False
    WF_910 = False
    OPEN = False