diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index 90c3280..e22dc1f 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -1,4 +1,7 @@
 # %%
+from __future__ import annotations
+
+import datetime
 import json
 import time
 import typing
@@ -7,9 +10,11 @@ from pathlib import Path
 from pprint import pprint
 
 import dopt_basics.datetime as dt
+import oracledb
 import polars as pl
 import sqlalchemy as sql
 from dopt_basics import configs, io
+from sqlalchemy import event
 
 from umbreit import db, types
 
@@ -34,18 +39,24 @@ USER_NAME = CFG["user"]["name"]
 USER_PASS = CFG["user"]["pass"]
 # %%
 # !! init thick mode
-# p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
-# assert p_oracle_client.exists()
-# assert p_oracle_client.is_dir()
-# oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
-# %%
-types.Freigabe.WF_100.value
-
+p_oracle_client = Path(r"C:\Databases\Oracle\instantclient_19_29")
+assert p_oracle_client.exists()
+assert p_oracle_client.is_dir()
+oracledb.init_oracle_client(lib_dir=str(p_oracle_client))
 # %%
 conn_string = (
     f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
 )
-engine = sql.create_engine(conn_string)
+# engine = sql.create_engine(conn_string)
+engine = sql.create_engine(conn_string, execution_options={"stream_results": True})
+
+
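+# !! arraysize batches rows per fetch round trip and still applies when set
+# !! after execute; prefetchrows is consumed at execute time, so setting it
+# !! in "after_cursor_execute" is likely too late ("before_cursor_execute"
+# !! may be the safer hook); 1000 is a starting guess, not a benchmarked value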
"MENGE_VORMERKER" VM_CRITERION = "BEDP_MENGE_BEDARF_VM" +MANDANT_CRITERION = "BEDP_MAN" + + +def get_starting_date( + days: int, +) -> datetime.date: + current_dt = dt.current_time_tz(cut_microseconds=True) + td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS) + + return (current_dt - td).date() # TODO exchange to new query focusing on TINFO table @@ -364,13 +386,10 @@ def get_raw_data() -> pl.DataFrame: ) -def get_empty_pipeline_result( - data: pl.DataFrame, -) -> types.PipelineResult: - schema = db.results_schema_map.copy() - del schema["id"] - results = pl.DataFrame(schema=schema) - return types.PipelineResult(results=results, open=data) +# def get_empty_pipeline_result( +# data: pl.DataFrame, +# ) -> PipelineResult: +# return PipelineResult(data) def _apply_several_filters( @@ -391,63 +410,204 @@ def _apply_several_filters( return types.FilterResult(in_=df_current, out_=df_removed) +class PipelineResult: + __slots__ = ("_results", "_open", "_subtracted_indices") + _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ") + + def __init__( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + schema = db.results_schema_map.copy() + del schema["id"] + self._results = pl.DataFrame(schema=schema) + + schema = {} + for col in self._index_cols: + schema[col] = db.raw_data_query_schema_map[col] + self._subtracted_indices = pl.DataFrame(schema=schema) + + def __len__(self) -> int: + return len(self._results) + len(self._open) + + @property + def open(self) -> pl.DataFrame: + return self._open + + @property + def results(self) -> pl.DataFrame: + return self._results + + @property + def subtracted_indices(self) -> pl.DataFrame: + return self._subtracted_indices + + def update_open( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + + def _subtract_data( + self, + data: pl.DataFrame, + ) -> None: + self._open = self._open.join(data, on=self._index_cols, how="anti") + self._subtracted_indices = pl.concat( + (self._subtracted_indices, data[self._index_cols]) + ) + + # TODO remove + # def _subtract_from_open( + # self, + # data: pl.DataFrame, + # ) -> None: + # self._open = self._open.join(data, on=self._index_cols, how="anti") + # self._subtracted_indices = pl.concat( + # (self._subtracted_indices, data[self._index_cols]) + # ) + + # def _subtract_from_indices( + # self, + # indices: pl.DataFrame, + # ) -> None: + # self._open = self._open.join(indices, on=self._index_cols, how="anti") + # self._subtracted_indices = pl.concat( + # (self._subtracted_indices, indices[self._index_cols]) + # ) + + def _add_results( + self, + data: pl.DataFrame, + ) -> None: + self._results = pl.concat([self._results, data]) + + # TODO remove + # def add_pipeline_results(self, pipeline: PipelineResult) -> None: + # self._add_results(pipeline.results) + + # def subtract_pipeline( + # self, + # pipeline: PipelineResult, + # ) -> None: + # self._subtract_data(pipeline.subtracted_indices) + + def merge_pipeline( + self, + pipeline: PipelineResult, + ) -> None: + self._subtract_data(pipeline.subtracted_indices) + self._add_results(pipeline.results) + + def write_results( + self, + data: pl.DataFrame, + vorlage: bool, + wf_id: int, + freigabe_auto: types.Freigabe, + is_out: bool, + ) -> None: + ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" + + results = data.rename(db.map_to_result) + order_qty_expr: pl.Expr + if is_out: + order_qty_expr = ( + pl.lit(0) + .alias("ORDER_QTY_CRIT") + .alias("best_menge") + .cast(db.results_schema_map["best_menge"]) + ) + else: + order_qty_expr = 
+    __slots__ = ("_results", "_open", "_subtracted_indices")
+    _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ")
+
+    def __init__(
+        self,
+        data: pl.DataFrame,
+    ) -> None:
+        self._open = data
+        schema = db.results_schema_map.copy()
+        del schema["id"]
+        self._results = pl.DataFrame(schema=schema)
+
+        schema = {}
+        for col in self._index_cols:
+            schema[col] = db.raw_data_query_schema_map[col]
+        self._subtracted_indices = pl.DataFrame(schema=schema)
+
+    def __len__(self) -> int:
+        return len(self._results) + len(self._open)
+
+    @property
+    def open(self) -> pl.DataFrame:
+        return self._open
+
+    @property
+    def results(self) -> pl.DataFrame:
+        return self._results
+
+    @property
+    def subtracted_indices(self) -> pl.DataFrame:
+        return self._subtracted_indices
+
+    def update_open(
+        self,
+        data: pl.DataFrame,
+    ) -> None:
+        self._open = data
+
+    def _subtract_data(
+        self,
+        data: pl.DataFrame,
+    ) -> None:
+        self._open = self._open.join(data, on=self._index_cols, how="anti")
+        self._subtracted_indices = pl.concat(
+            (self._subtracted_indices, data[self._index_cols])
+        )
+
+    # TODO remove
+    # def _subtract_from_open(
+    #     self,
+    #     data: pl.DataFrame,
+    # ) -> None:
+    #     self._open = self._open.join(data, on=self._index_cols, how="anti")
+    #     self._subtracted_indices = pl.concat(
+    #         (self._subtracted_indices, data[self._index_cols])
+    #     )
+
+    # def _subtract_from_indices(
+    #     self,
+    #     indices: pl.DataFrame,
+    # ) -> None:
+    #     self._open = self._open.join(indices, on=self._index_cols, how="anti")
+    #     self._subtracted_indices = pl.concat(
+    #         (self._subtracted_indices, indices[self._index_cols])
+    #     )
+
+    def _add_results(
+        self,
+        data: pl.DataFrame,
+    ) -> None:
+        self._results = pl.concat([self._results, data])
+
+    # TODO remove
+    # def add_pipeline_results(self, pipeline: PipelineResult) -> None:
+    #     self._add_results(pipeline.results)
+
+    # def subtract_pipeline(
+    #     self,
+    #     pipeline: PipelineResult,
+    # ) -> None:
+    #     self._subtract_data(pipeline.subtracted_indices)
+
+    def merge_pipeline(
+        self,
+        pipeline: PipelineResult,
+    ) -> None:
+        self._subtract_data(pipeline.subtracted_indices)
+        self._add_results(pipeline.results)
+
+    def write_results(
+        self,
+        data: pl.DataFrame,
+        vorlage: bool,
+        wf_id: int,
+        freigabe_auto: types.Freigabe,
+        is_out: bool,
+    ) -> None:
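+        # is_out marks rows that leave the pipeline without an order:
+        # best_menge is forced to 0 instead of carrying over the VM-based
+        # demand quantity (BEDP_MENGE_BEDARF_VM)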
+        ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
+
+        results = data.rename(db.map_to_result)
+        order_qty_expr: pl.Expr
+        if is_out:
+            order_qty_expr = (
+                pl.lit(0)
+                .alias("best_menge")
+                .cast(db.results_schema_map["best_menge"])
+            )
+        else:
+            order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
+
+        results = results.with_columns(
+            [
+                pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
+                pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
+                order_qty_expr,
+                pl.lit(freigabe_auto.value)
+                .alias("freigabe_auto")
+                .cast(db.results_schema_map["freigabe_auto"]),
+            ]
+        )
+        results = results.drop(
+            [
+                "BEDP_TITELNR",
+                "BEDP_MAN",
+                "BEDP_MENGE_BEDARF_VM",
+                "MELDENUMMER",
+                "VERLAGSNR",
+                "MENGE_VORMERKER",
+                "MANDFUEHR",
+            ]
+        )
+
+        self._subtract_data(data)
+        self._add_results(results)
+
+
 # post-processing the results
 # TODO: order quantity not always necessary
 # TODO: change relevant criterion for order quantity
-def _write_results(
-    results_table: pl.DataFrame,
-    data: pl.DataFrame,
-    vorlage: bool,
-    wf_id: int,
-    freigabe_auto: types.Freigabe,
-    is_out: bool,
-) -> pl.DataFrame:
-    ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
+# def _write_results(
+#     pipe_result: PipelineResult,
+#     data: pl.DataFrame,
+#     vorlage: bool,
+#     wf_id: int,
+#     freigabe_auto: types.Freigabe,
+#     is_out: bool,
+# ) -> PipelineResult:
+#     ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
 
-    data = data.rename(db.map_to_result)
-    order_qty_expr: pl.Expr
-    if is_out:
-        order_qty_expr = (
-            pl.lit(0)
-            .alias("ORDER_QTY_CRIT")
-            .alias("best_menge")
-            .cast(db.results_schema_map["best_menge"])
-        )
-    else:
-        order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
+#     results = data.rename(db.map_to_result)
+#     order_qty_expr: pl.Expr
+#     if is_out:
+#         order_qty_expr = (
+#             pl.lit(0)
+#             .alias("ORDER_QTY_CRIT")
+#             .alias("best_menge")
+#             .cast(db.results_schema_map["best_menge"])
+#         )
+#     else:
+#         order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge")
 
-    data = data.with_columns(
-        [
-            pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
-            pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
-            order_qty_expr,
-            pl.lit(freigabe_auto.value)
-            .alias("freigabe_auto")
-            .cast(db.results_schema_map["freigabe_auto"]),
-        ]
-    )
-    data = data.drop(
-        [
-            "BEDP_TITELNR",
-            "BEDP_MAN",
-            "BEDP_MENGE_BEDARF_VM",
-            "MELDENUMMER",
-            "VERLAGSNR",
-            "MENGE_VORMERKER",
-            "MANDFUEHR",
-        ]
-    )
+#     results = results.with_columns(
+#         [
+#             pl.lit(vorlage).alias("vorlage").cast(db.results_schema_map["vorlage"]),
+#             pl.lit(wf_id).alias("wf_id").cast(db.results_schema_map["wf_id"]),
+#             order_qty_expr,
+#             pl.lit(freigabe_auto.value)
+#             .alias("freigabe_auto")
+#             .cast(db.results_schema_map["freigabe_auto"]),
+#         ]
+#     )
+#     results = results.drop(
+#         [
+#             "BEDP_TITELNR",
+#             "BEDP_MAN",
+#             "BEDP_MENGE_BEDARF_VM",
+#             "MELDENUMMER",
+#             "VERLAGSNR",
+#             "MENGE_VORMERKER",
+#             "MANDFUEHR",
+#         ]
+#     )
 
-    return pl.concat([results_table, data])
+#     pipe_result.subtract_from_open(data)
+#     pipe_result.add_results(results)
+
+#     return pipe_result
 
 
 def workflow_900(
-    pipe_result: types.PipelineResult,
-) -> types.PipelineResult:
-    """pre-routine to handle non-feasible entries"""
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    """filter 'Meldenummer' and fill non-feasible entries"""
     filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
-    filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
+    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
 
     res = _apply_several_filters(
         pipe_result.open,
         (
@@ -456,8 +616,7 @@
         ),
     )
 
-    pipe_result.results = _write_results(
-        pipe_result.results,
+    pipe_result.write_results(
         data=res.out_,
         vorlage=False,
         wf_id=900,
@@ -465,18 +624,18 @@ def workflow_900(
         is_out=True,
     )
 
-    pipe_result.open = res.in_.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
-    pipe_result.open = res.in_.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
+    pipe_result.update_open(
+        res.in_.with_columns(
+            pl.col("MENGE_VORMERKER").fill_null(0),
+            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
+        )
+    )
 
     return pipe_result
 
 
-# main routine
-# results for filtered out entries written
 def workflow_910(
-    pipe_result: types.PipelineResult,
-) -> types.PipelineResult:
-    filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    # TODO check if necessary because of WF-900
+    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
+
     filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
 
     res = _apply_several_filters(
@@ -487,15 +646,13 @@
         ),
     )
     # write results for entries which were filtered out
-    pipe_result.results = _write_results(
-        pipe_result.results,
+    pipe_result.write_results(
         data=res.out_,
         vorlage=False,
         wf_id=910,
         freigabe_auto=types.Freigabe.WF_910,
         is_out=True,
     )
-    pipe_result.open = res.in_
 
     return pipe_result
 
 
@@ -503,11 +660,11 @@
 # this is a main routine:
 # receives and gives back result objects
 def workflow_100_umbreit(
-    pipe_result: types.PipelineResult,
+    pipe_result: PipelineResult,
     vm_criterion: str,
-) -> types.PipelineResult:
+) -> PipelineResult:
     filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("MANDFUEHR") == 1
+    filter_mandant = pl.col(MANDANT_CRITERION) == 1
     filter_number_vm = pl.col(vm_criterion) > 0
 
     res = _apply_several_filters(
@@ -518,53 +675,27 @@
             filter_number_vm,
         ),
     )
-    pipe_result.results = _write_results(
-        results_table=pipe_result.results,
+    pipe_result.write_results(
         data=res.in_,
         vorlage=True,
         wf_id=100,
         freigabe_auto=types.Freigabe.WF_100,
         is_out=False,
     )
-    pipe_result.open = res.out_
 
     return pipe_result
 
 
 def workflow_100_petersen(
-    pipe_result: types.PipelineResult,
+    pipe_result: PipelineResult,
     vm_criterion: str,
-) -> types.PipelineResult:
+) -> PipelineResult:
     # difference WDB and others
     # // WDB branch
-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("MANDFUEHR") == 90
-    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
-    filter_number_vm = pl.col(vm_criterion) > 0
-
-    res = _apply_several_filters(
-        pipe_result.open,
-        (
-            filter_meldenummer,
-            filter_mandant,
-            filter_WDB,
-            filter_number_vm,
-        ),
-    )
-    pipe_result.results = _write_results(
-        results_table=pipe_result.results,
-        data=res.in_,
-        vorlage=True,
-        wf_id=100,
-        freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
-    )
-    pipe_result.open = res.out_
-
     # order quantity 0, no further action in other WFs
     filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("MANDFUEHR") == 90
+    filter_mandant = pl.col(MANDANT_CRITERION) == 90
     filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
     filter_number_vm = pl.col(vm_criterion) == 0
 
@@ -577,19 +708,37 @@ def workflow_100_petersen(
             filter_number_vm,
         ),
     )
-    pipe_result.results = _write_results(
-        results_table=pipe_result.results,
+    pipe_result.write_results(
         data=res.in_,
         vorlage=False,
         wf_id=100,
         freigabe_auto=types.Freigabe.WF_100,
         is_out=False,
     )
-    pipe_result.open = res.out_
+
+    # TODO add check for orders or quantity to transform
+    # TODO show them
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col(MANDANT_CRITERION) == 90
+    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
+    filter_number_vm = pl.col(vm_criterion) > 0
+    res_candidates = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+            filter_number_vm,
+        ),
+    )
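+    # the WDB candidates run through a nested pipeline; the sub-routine must
+    # classify every candidate (open set empty, checked by the assert)
+    # before its results are merged back into the main pipeline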
+    wdb_sub_pipe = PipelineResult(res_candidates.in_)
+    wdb_sub_pipe = wf100_petersen_wdb_sub1(wdb_sub_pipe)
+    assert len(wdb_sub_pipe.open) == 0
+    pipe_result.merge_pipeline(wdb_sub_pipe)
 
     # // other branch
     filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col("MANDFUEHR") == 90
+    filter_mandant = pl.col(MANDANT_CRITERION) == 90
     filter_number_vm = pl.col(vm_criterion) > 0
 
     res = _apply_several_filters(
@@ -600,27 +749,85 @@
             filter_mandant,
            filter_number_vm,
         ),
     )
-    pipe_result.results = _write_results(
-        results_table=pipe_result.results,
+    pipe_result.write_results(
         data=res.in_,
         vorlage=True,
         wf_id=100,
         freigabe_auto=types.Freigabe.WF_100,
         is_out=False,
     )
-    pipe_result.open = res.out_
+
+    return pipe_result
+
+
+def wf100_petersen_wdb_sub1(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    # input: pre-filtered entries (WDB titles and #VM > 0)
+    # more than 1 VM
+    # !! show these entries
+    filter_number_vm = pl.col(VM_CRITERION) > 1
+    res = _apply_several_filters(
+        pipe_result.open,
+        (filter_number_vm,),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+
+    # filtered out entries (WDB with #VM == 1) must be analysed for orders in the
+    # past 6 months
+    title_nos = res.out_["BEDP_TITELNR"].to_list()
+    # !! direct query used here because pre-filtering in the database is slow
+    # TODO check for more native pre-filtering within the database when
+    # TODO performance problems are solved
+    start_date = get_starting_date(180)
+    filter_ = sql.and_(
+        db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
+        db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
+    )
+    stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
+    df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
+    df_show = (
+        df_order.group_by("BESP_TITELNR")
+        .agg(pl.col("BESP_TITELNR").count().alias("count"))
+        .filter(pl.col("count") > 1)
+    )
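+    # titles ordered more than once within the window are flagged for
+    # display (vorlage=True); the remaining WDB entries are written with
+    # vorlage=False below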
+    entries_to_show = df_show["BESP_TITELNR"].to_list()
+    filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
+    res = _apply_several_filters(pipe_result.open, (filter_titleno,))
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.write_results(
+        data=pipe_result.open,
+        vorlage=False,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
 
     return pipe_result
 
 
 # %%
 # SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20251211-1.arrow"
+p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow"
 df = pl.read_ipc(p_save)
 print(f"Number of entries: {len(df)}")
 # %%
 df.head()
+
+
 # %%
 # removed_rows = []
@@ -659,7 +866,8 @@ df.head()
 # %%
 raw_data = df.clone()
-pipe_res = get_empty_pipeline_result(raw_data)
+# pipe_res = get_empty_pipeline_result(raw_data)
+pipe_res = PipelineResult(raw_data)
 pipe_res.results
 pipe_res = workflow_900(pipe_res)
 print(f"Length of base data: {len(raw_data):>18}")
@@ -916,3 +1124,124 @@ for col, dtype in zip(df.columns, df.dtypes):
 print("dtypes of DF...")
 pprint(col_dtype)
 # %%
+# ** Petersen WDB
+filter_meldenummer = pl.col("MELDENUMMER") == 18
+filter_mandant = pl.col(MANDANT_CRITERION) == 90
+filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
+filter_number_vm = pl.col(VM_CRITERION) > 0
+
+res = _apply_several_filters(
+    df,
+    (
+        filter_meldenummer,
+        filter_mandant,
+        filter_WDB,
+        filter_number_vm,
+    ),
+)
+# %%
+res.in_
+# %%
+# !! show these entries
+filter_number_vm = pl.col(VM_CRITERION) > 1
+
+res_vm_crit = _apply_several_filters(
+    res.in_,
+    (filter_number_vm,),
+)
+# %%
+res_vm_crit.out_
+# %%
+# filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
+title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
+len(title_nos)
+
+
+# %%
+# define starting date for 6-month interval
+# returns UTC time
+start_date = get_starting_date(180)
+filter_ = sql.and_(
+    db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
+    db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
+)
+stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
+df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
+df_order
+# %%
+# filter titles which have more than one order in the interval
+df_show = (
+    df_order.group_by("BESP_TITELNR")
+    .agg(pl.col("BESP_TITELNR").count().alias("count"))
+    .filter(pl.col("count") > 1)
+)
+# %%
+# !! show these entries
+# !! do not show others
+entries_to_show = df_show["BESP_TITELNR"].to_list()
+print(f"Number of entries relevant: {len(entries_to_show)}")
+# %%
+res_vm_crit.out_
+# %%
+filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
+
+res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))
+
+# %%
+res_wdb.in_
+# %%
+res_wdb.out_
+
+
+# %%
+
+
+# %%
+# %%
+# %%
+schema = {}
+for col in ("BEDARFNR", "BEDP_SEQUENZ"):
+    schema[col] = db.raw_data_query_schema_map[col]
+base = pl.DataFrame(schema=schema)
+# %%
+data = {"BEDARFNR": list(range(10)), "BEDP_SEQUENZ": list(range(10))}
+orig_data = pl.DataFrame(data, schema=schema)
+data = orig_data[:5].clone()
+# %%
+pl.concat([base, data])
+# %%
+orig_data.join(data, on=["BEDARFNR", "BEDP_SEQUENZ"], how="anti")
+# %%
+orig_data[("BEDARFNR", "BEDP_SEQUENZ")]
+# %%
+raw_data = df.clone()
+pipe_res = PipelineResult(raw_data)
+pipe_res.open
+# %%
+pipe_res.results
+# %%
+sub_data = pipe_res.open[:20].clone()
+sub_data
+# %%
+pipe_res.write_results(
+    sub_data,
+    vorlage=True,
+    wf_id=30,
+    freigabe_auto=types.Freigabe.WF_100,
+    is_out=True,
+)
+# %%
+pipe_res.open
+# %%
+pipe_res.results
+# %%
+raw_data = df.clone()
+pipe_res_main = PipelineResult(raw_data)
+pipe_res_main.open
+# %%
+pipe_res_main.merge_pipeline(pipe_res)
+# %%
+pipe_res_main.open
+# %%
+pipe_res.results
+# %%
diff --git a/data_analysis/queries.sql b/data_analysis/queries.sql
index 04d17b8..2a41515 100644
--- a/data_analysis/queries.sql
+++ b/data_analysis/queries.sql
@@ -19,8 +19,27 @@ set timing on
 -- AND bedp.BEDP_MAN = t_info.MANDFUEHR;
 -- PROMPT ####################################
-PROMPT >>>>>>>>> All allowed
-SELECT COUNT(*) FROM (
+-- PROMPT >>>>>>>>> All allowed
+-- SELECT COUNT(*) FROM (
+--     SELECT
+--         bedp.BEDARFNR,
+--         bedp.BEDP_SEQUENZ,
+--         bedp.BEDP_TITELNR,
+--         bedp.BEDP_MAN,
+--         bedp.BEDP_MENGE_BEDARF_VM,
+--         t_info.MELDENUMMER,
+--         t_info.VERLAGSNR
+--         t_info.MENGE_VORMERKER
+--         t_info.MANDFUEHR
+--     FROM EXT_BEDPBED bedp
+--     LEFT JOIN EXT_TITEL_INFO t_info
+--         ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
+-- );
+
+-- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
+-- PROMPT ######################################
+PROMPT #################################################
+SELECT * FROM (
     SELECT
         bedp.BEDARFNR,
         bedp.BEDP_SEQUENZ,
@@ -28,17 +47,21 @@ SELECT COUNT(*) FROM (
         bedp.BEDP_TITELNR,
         bedp.BEDP_MAN,
         bedp.BEDP_MENGE_BEDARF_VM,
         t_info.MELDENUMMER,
-        t_info.VERLAGSNR
-        t_info.MENGE_VORMERKER
+        t_info.VERLAGSNR,
+        t_info.MENGE_VORMERKER,
         t_info.MANDFUEHR
     FROM EXT_BEDPBED bedp
    LEFT JOIN EXT_TITEL_INFO t_info
         ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
-);
-
--- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
--- PROMPT ######################################
-PROMPT #################################################
+) view1
+WHERE view1.VERLAGSNR IN (76008, 76070)
+FETCH FIRST 100 ROWS ONLY;
+
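+-- spot check: order history of one example title inside a fixed date window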
+SELECT * FROM EXT_BESPBES_INFO besp
+WHERE besp.BESP_TITELNR = 7590554 AND
+besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD');
+
+
 -- SELECT COUNT(*) FROM (
 -- SELECT /*+ NO_USE_HASH(bedp t_info) */
 --     view1.BEDP_TITELNR,
diff --git a/src/umbreit/types.py b/src/umbreit/types.py
index 490480e..1058920 100644
--- a/src/umbreit/types.py
+++ b/src/umbreit/types.py
@@ -16,13 +16,13 @@ class FilterResult:
     out_: pl.DataFrame
 
 
-@dataclass(slots=True, kw_only=True, eq=False)
-class PipelineResult:
-    results: pl.DataFrame
-    open: pl.DataFrame
+# @dataclass(slots=True, kw_only=True, eq=False)
+# class PipelineResult:
+#     results: pl.DataFrame
+#     open: pl.DataFrame
 
-    def __len__(self) -> int:
-        return len(self.results) + len(self.open)
+#     def __len__(self) -> int:
+#         return len(self.results) + len(self.open)
 
 
 class Freigabe(enum.Enum):