diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index 895a89f..f3e4851 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -23,7 +23,6 @@ from umbreit import db, types
 # types = importlib.reload(types)
 # db = importlib.reload(db)
 
-
 # %%
 p_cfg = io.search_file_iterative(
     starting_path=Path.cwd(),
@@ -61,7 +60,7 @@ def set_fetch_sizes(conn, cursor, statement, parameters, context, executemany):
 ########### RESULTS ###########
 # temporary
 res_engine = sql.create_engine("sqlite://")
-db.metadata.create_all(res_engine, tables=(db.results,))
+db.metadata.create_all(res_engine, tables=(db.results_local,))
 
 
 # %%
@@ -70,13 +69,13 @@ def delete_results(
     res_engine: sql.Engine,
 ) -> None:
     with res_engine.begin() as conn:
-        res = conn.execute(sql.delete(db.results))
+        res = conn.execute(sql.delete(db.results_local))
         print("Rows deleted: ", res.rowcount)
 
 
 delete_results(res_engine)
 
-stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
+stmt = sql.select(db.results_local.c.bedarf_nr, db.results_local.c.bedarf_sequenz)
 with res_engine.connect() as conn:
     res = conn.execute(stmt)
     print(res.all())
@@ -173,9 +172,9 @@ df_order
 
 # %%
 # df.select(pl.col("BEDP_MENGE_BEDARF").is_null().sum())
-print(f"Query duration: {elapsed:.4f} sec")
-print("Number of entries: ", len(df))
-print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
+# print(f"Query duration: {elapsed:.4f} sec")
+# print("Number of entries: ", len(df))
+# print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
 
 # %%
 # try title_info parsing
@@ -202,10 +201,10 @@ stmt = sql.select(
     db.ext_bedpbed.c.BEDP_TITELNR,
     db.ext_bedpbed.c.BEDP_MAN,
     db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
-    sql.case(
-        (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
-        else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
-    ).label("BEDP_MENGE_BEDARF_VM"),
+    # sql.case(
+    #     (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
+    #     else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
+    # ).label("BEDP_MENGE_BEDARF_VM"),
     db.ext_titel_info.c.MELDENUMMER,
     db.ext_titel_info.c.VERLAGSNR,
     db.ext_titel_info.c.MENGE_VORMERKER,
@@ -227,11 +226,17 @@ print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
 # %%
 df.head()
 
+# %%
+temp = df.with_columns(
+    pl.col.BEDP_MENGE_BEDARF_VM.fill_null(0),
+)
+temp.filter(pl.col.BEDP_MENGE_BEDARF_VM < 0)
+
 # %%
 # // NO LIVE DATA NEEDED
 # SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
-# p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow"
+# p_save = Path.cwd() / "raw_data_from_sql_query_20260115-altered_query.arrow"
+p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
 # df.write_ipc(p_save)
 df = pl.read_ipc(p_save)
 # %%
@@ -373,7 +378,6 @@ df_diff_VM_bedp_tinfo = df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE
 p_save_diff_VM_bedp_tinfo = (
     Path.cwd() / "diff_BEDP-MENGE-BEDARF-VM_TINF-MENGE-VORMERKER_20251211-1.xlsx"
 )
-from polars.datatypes import classes as pl_dtypes
 
 df_diff_VM_bedp_tinfo.to_pandas().to_excel(p_save_diff_VM_bedp_tinfo, index=False)
 # why are there entries where "BEDP_MENGE_BEDARF_VM" > "MENGE_VORMERKER"?
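Note on the hunks above: with the `sql.case()` normalization commented out of the query, the `<= -1` sentinel in `BEDP_MENGE_BEDARF_VM` now reaches the client unmapped (the `fill_null(0)` / `< 0` cell checks for exactly this). If the sentinel should still become NULL, the same mapping can be done in polars after the load. A minimal sketch, assuming the column arrives as a signed integer (i.e., before any unsigned schema override); `normalize_vm_sentinel` is a hypothetical helper name, not part of the repo:

```python
import polars as pl


def normalize_vm_sentinel(df: pl.DataFrame) -> pl.DataFrame:
    # Mirror the removed SQL CASE: values <= -1 are a "no data" sentinel
    # and become null; everything else passes through unchanged.
    return df.with_columns(
        pl.when(pl.col("BEDP_MENGE_BEDARF_VM") <= -1)
        .then(pl.lit(None))
        .otherwise(pl.col("BEDP_MENGE_BEDARF_VM"))
        .alias("BEDP_MENGE_BEDARF_VM")
    )
```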
@@ -396,8 +400,12 @@ print(len(df.filter(pl.col("MELDENUMMER") == 18)))
 # df.filter(pl.col("MELDENUMMER") == 18).filter((pl.col("BEDP_MENGE_BEDARF_VM").is_not_null()) & (pl.col("BEDP_MENGE_BEDARF_VM") > 0))
 # %%
 # VM_CRITERION = "MENGE_VORMERKER"
-VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
-MANDANT_CRITERION = "BEDP_MAN"
+VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
+MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
+ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
+RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
+    db.EXT_DOPT_ERGEBNIS.columns.keys()
+)
 
 
 def get_starting_date(
@@ -435,10 +443,55 @@ def get_raw_data() -> pl.DataFrame:
     )
 
 
-# def get_empty_pipeline_result(
-#     data: pl.DataFrame,
-# ) -> PipelineResult:
-#     return PipelineResult(data)
+def save_tmp_data(df: pl.DataFrame) -> None:
+    # one transaction, so a failed insert does not leave the table emptied
+    with engine.begin() as conn:
+        conn.execute(sql.delete(db.tmp_data))
+        conn.execute(sql.insert(db.tmp_data), df.to_dicts())
+
+
+def get_tmp_data() -> pl.DataFrame:
+    return pl.read_database(
+        sql.select(db.tmp_data),
+        engine,
+        schema_overrides=db.tmp_data_schema_map,
+    )
+
+
+def get_result_data() -> pl.DataFrame:
+    return pl.read_database(
+        sql.select(db.EXT_DOPT_ERGEBNIS),
+        engine,
+        schema_overrides=db.results_schema_map,
+    )
+
+
+def save_result_data(results: pl.DataFrame) -> None:
+    with engine.begin() as conn:
+        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
+
+
+def clear_result_data() -> None:
+    with engine.begin() as conn:
+        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
+
+
+def save_result_data_native(results: pl.DataFrame) -> None:
+    # the target table stores booleans as Y/N VARCHAR(1)
+    results = results.with_columns(
+        [
+            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
+            for c in results.select(cs.boolean()).columns
+        ]
+    )
+    stmt = """
+        INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO")
+        VALUES (:1, :2, :3, :4, :5, :6)
+    """
+    with engine.begin() as conn:
+        raw_conn = conn.connection.connection
+        with raw_conn.cursor() as cursor:
+            # executemany() needs a sequence of bind tuples in statement
+            # column order; it cannot consume a DataFrame directly
+            cursor.executemany(
+                stmt,
+                results.select(
+                    "BEDARF_NR",
+                    "BEDARF_SEQUENZ",
+                    "VORLAGE",
+                    "WF_ID",
+                    "BEST_MENGE",
+                    "FREIGABE_AUTO",
+                ).rows(),
+            )
 
 
 def _apply_several_filters(
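Note on `save_result_data_native` above: the single `executemany()` call materializes one bind array covering every row. For large result frames a chunked feed keeps memory bounded. A sketch only; `iter_chunks` and the 50_000 batch size are illustrative, not part of the module:

```python
from itertools import islice


def iter_chunks(rows: list[tuple], size: int = 50_000):
    # Yield successive slices of the bind tuples so the driver never
    # holds the whole bind array at once.
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


# usage inside save_result_data_native:
# for batch in iter_chunks(results.select(...).rows()):
#     cursor.executemany(stmt, batch)
```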
@@ -469,7 +522,7 @@ class PipelineResult:
     ) -> None:
         self._open = data
         schema = db.results_schema_map.copy()
-        del schema["id"]
+        del schema["ID"]
         self._results = pl.DataFrame(schema=schema)
 
         schema = {}
@@ -511,7 +564,12 @@ class PipelineResult:
         self,
         data: pl.DataFrame,
     ) -> None:
-        self._results = pl.concat([self._results, data])
+        print(self._results)
+        res = pl.concat([self._results, data])
+        # self._results = res.with_columns(
+        #     (pl.arange(0, res.height) + 1).alias("ID").cast(db.results_schema_map["ID"])
+        # )
+        self._results = res
 
     def merge_pipeline(
         self,
@@ -528,29 +586,28 @@ class PipelineResult:
         freigabe_auto: types.Freigabe,
         is_out: bool,
     ) -> None:
-        # TODO move to other position
-        ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
-
-        results = data.rename(db.map_to_result)
+        results = data.rename(db.map_data_to_result)
+        # TODO rework because it is not WF-agnostic
         order_qty_expr: pl.Expr
         if is_out:
             order_qty_expr = (
                 pl.lit(0)
-                .alias("ORDER_QTY_CRIT")
-                .alias("best_menge")
-                .cast(db.results_schema_map["best_menge"])
+                .alias("BEST_MENGE")
+                .cast(db.results_schema_map["BEST_MENGE"])
             )
         else:
-            order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
+            order_qty_expr = (
+                pl.col(ORDER_QTY_CRIT)
+                .alias("BEST_MENGE")
+                .cast(db.results_schema_map["BEST_MENGE"])
+            )
 
         results = results.with_columns(
             [
-                pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
-                pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
+                # pl.lit(0).alias("ID").cast(db.results_schema_map["ID"]),
+                pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]),
+                pl.lit(wf_id).alias("WF_ID").cast(db.results_schema_map["WF_ID"]),
                 order_qty_expr,
                 pl.lit(freigabe_auto.value)
-                .alias("freigabe_auto")
-                .cast(db.results_schema_map["freigabe_auto"]),
+                .alias("FREIGABE_AUTO")
+                .cast(db.results_schema_map["FREIGABE_AUTO"]),
             ]
         )
         results = results.drop(
@@ -564,6 +621,9 @@ class PipelineResult:
                 "MANDFUEHR",
             ]
         )
+        # results = results.select(RESULT_COLUMN_ORDER)
+        print(results)
+        print("####################")
 
         self._subtract_data(data)
         self._add_results(results)
@@ -854,7 +914,7 @@ def wf100_petersen_wdb_sub1(
 
 # %%
 # SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
+p_save = Path.cwd() / "raw_data_from_sql_query_20260116-1.arrow"
 df = pl.read_ipc(p_save)
 
 print(f"Number of entries: {len(df)}")
@@ -911,6 +971,61 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
 
 # %%
 pipe_res.results
+
+# %%
+# // test result writing
+res = pipe_res.results.clone()
+res.height
+
+# %%
+save_result_data(res)
+get_result_data()
+# %%
+clear_result_data()
+# %%
+get_result_data()
+
+# %%
+stmt = sql.text("TRUNCATE TABLE EXT_DOPT_ERGEBNIS")
+
+with engine.begin() as conn:
+    conn.exec_driver_sql("TRUNCATE TABLE UMB.EXT_DOPT_ERGEBNIS")
+
+# %%
+stmt = sql.insert(db.EXT_DOPT_ERGEBNIS)
+print(stmt.compile(engine))
+
+# %%
+engine.dispose()
+
+# %%
+t1 = time.perf_counter()
+save_result_data(res)
+t2 = time.perf_counter()
+print(f"Elapsed: {t2 - t1}")
+# %%
+get_result_data()
+# %%
+clear_result_data()
+get_result_data()
+# %%
+t1 = time.perf_counter()
+save_result_data_native(res)
+t2 = time.perf_counter()
+print(f"Elapsed: {t2 - t1}")
+# %%
+
 # raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
 # %%
 # pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
@@ -946,9 +1061,6 @@ raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3
 # %%
 raw_data.head()
 
-# %%
-filt_out
-
 # %%
 # ----------------------------------------------------------------------------
 #
diff --git a/data_analysis/queries.sql b/data_analysis/queries.sql
index a1af327..3107a65 100644
--- a/data_analysis/queries.sql
+++ b/data_analysis/queries.sql
@@ -259,4 +259,4 @@ ORDER BY view_name;
 -- DESC all_views;
 DESC EXT_DOPT_ERGEBNIS;
 
-SELECT * FROM v$version;d-opt
\ No newline at end of file
+SELECT * FROM v$version;
\ No newline at end of file
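Note on the timing cells above: they repeat the same `perf_counter` pair around each call. A tiny context manager would tidy them; a sketch only, `timed` is a hypothetical helper, not something the repo defines:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str):
    # Print the wall-clock duration of the wrapped block.
    t1 = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label}: {time.perf_counter() - t1:.4f} sec")


# usage:
# with timed("save_result_data"):
#     save_result_data(res)
# with timed("save_result_data_native"):
#     save_result_data_native(res)
```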
diff --git a/src/umbreit/db.py b/src/umbreit/db.py
index 946e96e..dc5299a 100644
--- a/src/umbreit/db.py
+++ b/src/umbreit/db.py
@@ -6,6 +6,30 @@ from umbreit.types import PolarsNullValues, PolarsSchema
 
 metadata = sql.MetaData()
 
+
+class OracleBoolean(sql.types.TypeDecorator):
+    """Map Python bool to the Y/N VARCHAR(1) convention used on the Oracle side."""
+
+    impl = sql.VARCHAR(1)
+    cache_ok = True
+
+    def process_bind_param(
+        self,
+        value: bool | None,
+        dialect,
+    ) -> str | None:
+        if value is None:
+            return None
+        return "Y" if value else "N"
+
+    def process_result_value(
+        self,
+        value: str | None,
+        dialect,
+    ) -> bool | None:
+        if value is None:
+            return None
+        return value == "Y"
+
+
 ext_bedpbed = Table(
     "ext_bedpbed",
     metadata,
@@ -153,41 +177,7 @@ EXT_AUFPAUF_schema_map: PolarsSchema = {
 EXT_AUFPAUF_null_values: PolarsNullValues = {}
 
 
-csv_tables: tuple[tuple[Table, PolarsSchema, PolarsNullValues], ...] = (
-    (ext_bedpbed, ext_bedpbed_schema_map, ext_bedpbed_null_values),
-    (ext_titel_info, ext_titel_info_schema_map, ext_titel_info_null_values),
-    (EXT_AUFPAUF, EXT_AUFPAUF_schema_map, EXT_AUFPAUF_null_values),
-    (EXT_BESPBES_INFO, EXT_BESPBES_INFO_schema_map, EXT_BESPBES_INFO_null_values),
-)
-
-results = Table(
-    "results",
-    metadata,
-    Column("id", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
-    Column("bedarf_nr", sql.Integer, nullable=False),
-    Column("bedarf_sequenz", sql.Integer, nullable=False),
-    Column("vorlage", sql.Boolean, nullable=False),
-    Column("wf_id", sql.Integer, nullable=False),
-    Column("best_menge", sql.Integer, nullable=True),
-    Column("freigabe_auto", sql.Boolean, nullable=False),
-)
-
-results_schema_map: PolarsSchema = {
-    "id": pl.UInt32,
-    "bedarf_nr": pl.UInt32,
-    "bedarf_sequenz": pl.UInt32,
-    "vorlage": pl.Boolean,
-    "wf_id": pl.UInt16,
-    "best_menge": pl.UInt32,
-    "freigabe_auto": pl.Boolean,
-}
-
-
-map_to_result: dict[str, str] = {
-    "BEDARFNR": "bedarf_nr",
-    "BEDP_SEQUENZ": "bedarf_sequenz",
-}
-
+# // queries and temp data
 raw_data_query_schema_map: PolarsSchema = {
     "BEDARFNR": pl.UInt32,
     "BEDP_SEQUENZ": pl.UInt32,
@@ -199,3 +189,75 @@ raw_data_query_schema_map: PolarsSchema = {
     "MENGE_VORMERKER": pl.UInt32,
     "MANDFUEHR": pl.UInt8,
 }
+
+tmp_data = Table(
+    "EXT_TMP_BEDP_TINFO",
+    metadata,
+    Column(
+        "BEDARFNR", sql.Integer, primary_key=True, autoincrement=False, nullable=False
+    ),
+    Column(
+        "BEDP_SEQUENZ", sql.Integer, primary_key=True, autoincrement=False, nullable=False
+    ),
+    Column("BEDP_TITELNR", sql.Integer, nullable=False),
+    Column("BEDP_MAN", sql.Integer, nullable=False),
+    Column("BEDP_MENGE_BEDARF_VM", sql.Integer, nullable=True),
+    Column("MELDENUMMER", sql.Integer, nullable=False),
+    Column("VERLAGSNR", sql.Integer, nullable=False),
+    Column("MENGE_VORMERKER", sql.Integer, nullable=True),
+    Column(
+        "MANDFUEHR", sql.Integer, primary_key=True, autoincrement=False, nullable=False
+    ),
+)
+
+tmp_data_schema_map = raw_data_query_schema_map
+
+csv_tables: tuple[tuple[Table, PolarsSchema, PolarsNullValues], ...] = (
+    (ext_bedpbed, ext_bedpbed_schema_map, ext_bedpbed_null_values),
+    (ext_titel_info, ext_titel_info_schema_map, ext_titel_info_null_values),
+    (EXT_AUFPAUF, EXT_AUFPAUF_schema_map, EXT_AUFPAUF_null_values),
+    (EXT_BESPBES_INFO, EXT_BESPBES_INFO_schema_map, EXT_BESPBES_INFO_null_values),
+)
+
+# ** results
+# ** Umbreit
+EXT_DOPT_ERGEBNIS = Table(
+    "EXT_DOPT_ERGEBNIS",
+    metadata,
+    Column("ID", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
+    Column("BEDARF_NR", sql.Integer, nullable=False),
+    Column("BEDARF_SEQUENZ", sql.Integer, nullable=False),
+    Column("VORLAGE", OracleBoolean, nullable=False),
+    Column("WF_ID", sql.Integer, nullable=False),
+    Column("BEST_MENGE", sql.Integer, nullable=True),
+    Column("FREIGABE_AUTO", OracleBoolean, nullable=False),
+)
+
+
+results_schema_map: PolarsSchema = {
+    "ID": pl.UInt64,
+    "BEDARF_NR": pl.UInt32,
+    "BEDARF_SEQUENZ": pl.UInt32,
+    "VORLAGE": pl.Boolean,
+    "WF_ID": pl.UInt16,
+    "BEST_MENGE": pl.UInt32,
+    "FREIGABE_AUTO": pl.Boolean,
+}
+
+map_data_to_result: dict[str, str] = {
+    "BEDARFNR": "BEDARF_NR",
+    "BEDP_SEQUENZ": "BEDARF_SEQUENZ",
+}
+
+# ** local
+results_local = Table(
+    "results",
+    metadata,
+    Column("id", sql.Integer, nullable=False, primary_key=True, autoincrement=True),
+    Column("bedarf_nr", sql.Integer, nullable=False),
+    Column("bedarf_sequenz", sql.Integer, nullable=False),
+    Column("vorlage", sql.Boolean, nullable=False),
+    Column("wf_id", sql.Integer, nullable=False),
+    Column("best_menge", sql.Integer, nullable=True),
+    Column("freigabe_auto", sql.Boolean, nullable=False),
+)
+
+results_local_schema_map: PolarsSchema = {
+    k.lower(): v for (k, v) in results_schema_map.items()
+}
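Note on `OracleBoolean` above: a quick sanity check is a round trip through an in-memory engine. A minimal sketch, assuming the Y/N mapping is dialect-independent so SQLite is enough to exercise `process_bind_param` and `process_result_value`; the `flags` table is illustrative only:

```python
import sqlalchemy as sql

from umbreit.db import OracleBoolean

meta = sql.MetaData()
flags = sql.Table("flags", meta, sql.Column("flag", OracleBoolean, nullable=True))

engine = sql.create_engine("sqlite://")
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(sql.insert(flags), [{"flag": True}, {"flag": False}, {"flag": None}])
    # stored as "Y" / "N" / NULL, read back as bool / None
    values = conn.execute(sql.select(flags.c.flag)).scalars().all()

assert values == [True, False, None]
```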
diff --git a/src/umbreit/types.py b/src/umbreit/types.py
index 1058920..5113e99 100644
--- a/src/umbreit/types.py
+++ b/src/umbreit/types.py
@@ -30,3 +30,4 @@ class Freigabe(enum.Enum):
     WF_200 = False
     WF_900 = False
     WF_910 = False
+    OPEN = False
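One caveat on this hunk: in a plain `enum.Enum`, members that share a value become aliases of the first one, so `WF_900`, `WF_910`, and the new `OPEN` all resolve to the first `False`-valued member. That is harmless as long as only `.value` is used, as `merge()` does via `freigabe_auto.value`, but identity checks and iteration behave differently than the class suggests. A minimal demonstration of the standard-library behavior:

```python
import enum


class Demo(enum.Enum):
    A = False
    B = False  # alias of A, not a distinct member


assert Demo.B is Demo.A
assert list(Demo) == [Demo.A]  # aliases are skipped during iteration

# @enum.unique would reject the class outright; distinct per-member
# values (or enum.auto()) restore separate identities if needed.
```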