diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index f3e4851..c29e68c 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -12,6 +12,7 @@ from pprint import pprint import dopt_basics.datetime as dt import oracledb import polars as pl +import polars.selectors as cs import sqlalchemy as sql from dopt_basics import configs, io from sqlalchemy import event @@ -406,6 +407,7 @@ ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple( db.EXT_DOPT_ERGEBNIS.columns.keys() ) +ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() def get_starting_date( @@ -491,7 +493,7 @@ def save_result_data_native(results: pl.DataFrame) -> None: with engine.begin() as conn: raw_conn = conn.connection.connection with raw_conn.cursor() as cursor: - cursor.executemany(stmt, tmp.to_pandas(use_pyarrow_extension_array=True)) + cursor.executemany(stmt, results.to_pandas(use_pyarrow_extension_array=True)) def _apply_several_filters( @@ -582,28 +584,15 @@ class PipelineResult: self, data: pl.DataFrame, vorlage: bool, - wf_id: int, + wf_id: types.Workflows, freigabe_auto: types.Freigabe, - is_out: bool, + order_qty_expr: pl.Expr, ) -> None: results = data.rename(db.map_data_to_result) - # TODO rework because it is not WF-agnostic - order_qty_expr: pl.Expr - if is_out: - order_qty_expr = ( - pl.lit(0) - .alias("ORDER_QTY_CRIT") - .alias("BEST_MENGE") - .cast(db.results_schema_map["BEST_MENGE"]) - ) - else: - order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("best_menge") - results = results.with_columns( [ - # pl.lit(0).alias("ID").cast(db.results_schema_map["ID"]), pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]), - pl.lit(wf_id).alias("WF_ID").cast(db.results_schema_map["WF_ID"]), + pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]), order_qty_expr, pl.lit(freigabe_auto.value) .alias("FREIGABE_AUTO") @@ -621,14 +610,83 @@ class PipelineResult: "MANDFUEHR", ] ) + # TODO remove # results = results.select(RESULT_COLUMN_ORDER) - print(results) - print("####################") + # print(results) + # print("####################") + # print(self._results) self._subtract_data(data) self._add_results(results) +class ExprOrderQty(typing.Protocol): ... + + +class ExprOrderQty_Base(ExprOrderQty, typing.Protocol): + def __call__(self) -> pl.Expr: ... + + +ExprOrderQty_Base_Types: typing.TypeAlias = ( + typing.Literal[types.Workflows.ID_200] + | typing.Literal[types.Workflows.ID_900] + | typing.Literal[types.Workflows.ID_910] +) + + +class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol): + def __call__(self, empty: bool) -> pl.Expr: ... + + +@typing.overload +def get_expr_order_qty( + wf_id: typing.Literal[types.Workflows.ID_100], +) -> ExprOrderQty_WF100: ... + + +@typing.overload +def get_expr_order_qty( + wf_id: ExprOrderQty_Base_Types, +) -> ExprOrderQty_Base: ... 
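+
+# Illustrative usage (sketch; these names are not used anywhere in this
+# module) -- the overloads let a static type checker pick the matching
+# callable for each workflow:
+#
+#     qty_wf100 = get_expr_order_qty(types.Workflows.ID_100)
+#     expr = qty_wf100(empty=False)  # the WF-100 builder requires `empty`
+#
+#     qty_wf900 = get_expr_order_qty(types.Workflows.ID_900)
+#     expr = qty_wf900()             # the other builders take no arguments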
+
+
+def get_expr_order_qty(
+    wf_id: types.Workflows,
+) -> ExprOrderQty:
+    """Return the BEST_MENGE expression builder for the given workflow."""
+    # zero literal written to BEST_MENGE whenever no order quantity applies
+    empty_expr = (
+        pl.lit(0)
+        .alias("BEST_MENGE")
+        .cast(db.results_schema_map["BEST_MENGE"])
+    )
+
+    def _empty() -> pl.Expr:
+        return empty_expr
+
+    func: ExprOrderQty
+    match wf_id:
+        case types.Workflows.ID_100:
+
+            def _func(empty: bool) -> pl.Expr:
+                # WF-100 either writes zeros or carries over the demand quantity
+                order_qty_expr: pl.Expr
+                if empty:
+                    order_qty_expr = empty_expr
+                else:
+                    order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
+                return order_qty_expr
+
+            func = _func
+
+        case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
+            func = _empty
+        case _:
+            raise NotImplementedError(
+                f"Order expression for WF-ID {wf_id.value} is not implemented"
+            )
+
+    return func
+
+
 # post-processing the results
 # TODO: order quantity not always necessary
 # TODO: change relevant criterion for order quantity
@@ -682,11 +740,11 @@ class PipelineResult:
     # return pipe_result


-def workflow_900(
+def wf900(
     pipe_result: PipelineResult,
 ) -> PipelineResult:
     """Filter on 'Meldenummer' and fill non-feasible entries."""
-
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
     filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
     filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
     res = _apply_several_filters(
@@ -696,25 +754,29 @@
             filter_mandant,
         ),
     )
-
     pipe_result.write_results(
         data=res.out_,
         vorlage=False,
-        wf_id=900,
+        wf_id=types.Workflows.ID_900,
         freigabe_auto=types.Freigabe.WF_900,
-        is_out=True,
+        order_qty_expr=ORDER_QTY_FUNC(),
     )
-    pipe_result.update_open(res.in_.with_columns(pl.col("MENGE_VORMERKER").fill_null(0)))
-    pipe_result.update_open(res.in_.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0)))
+    pipe_result.update_open(
+        res.in_.with_columns(
+            pl.col("MENGE_VORMERKER").fill_null(0),
+            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
+        )
+    )

     return pipe_result


-def workflow_910(
+def wf910(
     pipe_result: PipelineResult,
 ) -> PipelineResult:
     # TODO check if necessary because of WF-900
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
     filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
     filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
@@ -726,13 +788,12 @@ def workflow_910(
             filter_ignore_MNR26,
         ),
     )
-
     # write results for entries which were filtered out
     pipe_result.write_results(
         data=res.out_,
         vorlage=False,
-        wf_id=910,
+        wf_id=types.Workflows.ID_910,
         freigabe_auto=types.Freigabe.WF_910,
-        is_out=True,
+        order_qty_expr=ORDER_QTY_FUNC(),
     )

     return pipe_result
@@ -740,10 +801,12 @@

 # this is a main routine:
 # it receives and returns result objects
-def workflow_100_umbreit(
+def wf100_umbreit(
     pipe_result: PipelineResult,
     vm_criterion: str,
 ) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
+
     filter_meldenummer = pl.col("MELDENUMMER") == 18
     filter_mandant = pl.col(MANDANT_CRITERION) == 1
     filter_number_vm = pl.col(vm_criterion) > 0
@@ -758,19 +821,20 @@
     )
     pipe_result.write_results(
         data=res.in_,
-        vorlage=True,
-        wf_id=100,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
     )

     return pipe_result


-def workflow_100_petersen(
+def wf100_petersen(
     pipe_result: PipelineResult,
     vm_criterion: str,
 ) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)

     # difference between WDB titles and the others
     # // WDB branch
@@ -788,19 +852,17 @@
             filter_number_vm,
         ),
     )
+
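+    # Note: ORDER_QTY_FUNC(empty=True) below writes a zero literal into
+    # BEST_MENGE, i.e. these WDB hits are recorded without an order quantity
+    # (see get_expr_order_qty above).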
     pipe_result.write_results(
         data=res.in_,
         vorlage=False,
-        wf_id=100,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=True),
     )
     # TODO add check for orders or quantity to transform
     # TODO show them
-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col(MANDANT_CRITERION) == 90
-    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
     filter_number_vm = pl.col(vm_criterion) > 0
     res_candidates = _apply_several_filters(
         pipe_result.open,
@@ -812,14 +874,12 @@
         ),
     )
     wdb_sub_pipe = PipelineResult(res_candidates.in_)
-    wdb_sub_pipe = wf100_petersen_wdb_sub1(wdb_sub_pipe)
-    assert len(wdb_sub_pipe.open) == 0
+    wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
+    assert wdb_sub_pipe.open.height == 0
     pipe_result.merge_pipeline(wdb_sub_pipe)

     # // other branch
     # always show entries with #VM > 1
-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col(MANDANT_CRITERION) == 90
     filter_number_vm = pl.col(vm_criterion) > 1
     res = _apply_several_filters(
         pipe_result.open,
@@ -832,13 +892,11 @@
     pipe_result.write_results(
         data=res.in_,
         vorlage=True,
-        wf_id=100,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
     )

-    filter_meldenummer = pl.col("MELDENUMMER") == 18
-    filter_mandant = pl.col(MANDANT_CRITERION) == 90
     filter_number_vm = pl.col(vm_criterion) > 0
     res = _apply_several_filters(
         pipe_result.open,
@@ -850,18 +908,19 @@
     )
     pipe_result.write_results(
         data=res.in_,
-        vorlage=True,
-        wf_id=100,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
     )

     return pipe_result


-def wf100_petersen_wdb_sub1(
+def _wf100_petersen_sub1_wdb(
     pipe_result: PipelineResult,
 ) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
     # input: pre-filtered entries (WDB titles and #VM > 0)
     # more than 1 VM
     # !! show these entries
@@ -873,9 +932,9 @@
     pipe_result.write_results(
         data=res.in_,
         vorlage=True,
-        wf_id=100,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
     )

     # filtered out entries (WDB with #VM == 1) must be analysed for orders in the
@@ -892,21 +951,143 @@
         .agg(pl.col("BESP_TITELNR").count().alias("count"))
         .filter(pl.col("count") > 1)
     )
+    # TODO: check whether the IS_IN lookup is acceptable performance-wise
     filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list())
     res = _apply_several_filters(pipe_result.open, (filter_titleno,))
     pipe_result.write_results(
         data=res.in_,
         vorlage=True,
-        wf_id=100,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
     )
     pipe_result.write_results(
         data=pipe_result.open,
         vorlage=False,
-        wf_id=100,
+        wf_id=types.Workflows.ID_100,
         freigabe_auto=types.Freigabe.WF_100,
-        is_out=False,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+
+    return pipe_result
+
+
+def wf200_umbreit(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    relevant_mnr: tuple[int, ...]
= (17, 18) + filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) + filter_mandant = pl.col("BEDP_MAN") == 1 + + res = _apply_several_filters( + pipe_result.open, + (filter_meldenummer, filter_mandant), + ) + sub_pipe = PipelineResult(res.in_) + sub_pipe = _wf200_sub1(sub_pipe) + assert sub_pipe.open.height == 0 + pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def wf200_petersen( + pipe_result: PipelineResult, +) -> PipelineResult: + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) + RELEVANT_MNR: tuple[int, ...] = (17, 18) + # // WDB branch + filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR) + filter_mandant = pl.col(MANDANT_CRITERION) == 90 + filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070)) + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + filter_WDB, + ), + ) + # ignore these + pipe_result.write_results( + data=res.in_, + vorlage=False, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), + ) + # // other branch + res = _apply_several_filters( + pipe_result.open, + ( + filter_meldenummer, + filter_mandant, + ), + ) + sub_pipe = PipelineResult(res.in_) + sub_pipe = _wf200_sub1(sub_pipe) + assert sub_pipe.open.height == 0 + pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def _wf200_sub1( + pipe_result: PipelineResult, +) -> PipelineResult: + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) + RELEVANT_DATE = get_starting_date(90) + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), + db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFTRAGS_ART, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + relevant_titles = pl.read_database( + stmt, + engine, + ) + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + + pipe_result.write_results( + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), + ) + pipe_result.write_results( + data=pipe_result.open, + vorlage=False, + wf_id=types.Workflows.ID_200, + freigabe_auto=types.Freigabe.WF_200, + order_qty_expr=ORDER_QTY_FUNC(), ) return pipe_result @@ -963,7 +1144,7 @@ raw_data = df.clone() # pipe_res = get_empty_pipeline_result(raw_data) pipe_res = PipelineResult(raw_data) pipe_res.results -pipe_res = workflow_900(pipe_res) +pipe_res = wf900(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") @@ -977,60 +1158,11 @@ pipe_res.results res = pipe_res.results.clone() res.height - -# %% -# %% - - -# %% -save_result_data(res) -get_result_data() -# %% -clear_result_data() -# %% -get_result_data() - -# %% -stmt = sql.text("TRUNCATE TABLE 
EXT_DOPT_ERGEBNIS") - -with engine.begin() as conn: - conn.exec_driver_sql("TRUNCATE TABLE UMB.EXT_DOPT_ERGEBNIS") - -# %% -stmt = sql.insert(db.EXT_DOPT_ERGEBNIS) -print(stmt.compile(engine)) - -# %% -engine.dispose() - -# %% - - -# %% -t1 = time.perf_counter() -save_result_data(res) -t2 = time.perf_counter() -print(f"Elapsed: {t2 - t1}") -# %% -get_result_data() -# %% -clear_result_data() -get_result_data() -# %% -t1 = time.perf_counter() -raw_input(res) -t2 = time.perf_counter() -print(f"Elapsed: {t2 - t1}") -# %% - - -# %% - # raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1) # %% # pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER")) # %% -pipe_res = workflow_910(pipe_res) +pipe_res = wf910(pipe_res) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") @@ -1038,30 +1170,33 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% # pipe_res.results.select(pl.col("vorlage").value_counts()) # %% -pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION) +pipe_res = wf100_umbreit(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -pipe_res = workflow_100_petersen(pipe_res, VM_CRITERION) +pipe_res = wf100_petersen(pipe_res, VM_CRITERION) print(f"Length of base data: {len(raw_data):>18}") print(f"Number of entries pipe data: {len(pipe_res):>10}") print(f"Number of entries result data: {len(pipe_res.results):>8}") print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -pipe_res.open.filter(pl.col.MELDENUMMER == 18).filter(pl.col.BEDP_MENGE_BEDARF_VM > 0) - +pipe_res = wf200_umbreit(pipe_res) +print(f"Length of base data: {len(raw_data):>18}") +print(f"Number of entries pipe data: {len(pipe_res):>10}") +print(f"Number of entries result data: {len(pipe_res.results):>8}") +print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -pipe_res.results.select(pl.col("vorlage").value_counts()) +pipe_res = wf200_petersen(pipe_res) +print(f"Length of base data: {len(raw_data):>18}") +print(f"Number of entries pipe data: {len(pipe_res):>10}") +print(f"Number of entries result data: {len(pipe_res.results):>8}") +print(f"Number of entries open data: {len(pipe_res.open):>10}") # %% -pipe_res.results.filter(pl.col("vorlage") == True) +pipe_res.open.filter(pl.col.MELDENUMMER.is_in((17, 18))) # %% -raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3) -# %% -raw_data.head() - - +pipe_res.results.select(pl.col("VORLAGE").value_counts()) # %% # ---------------------------------------------------------------------------- # # Workflow 200 (Umbreit only) @@ -1072,26 +1207,14 @@ wf_200_start_data # %% -def workflow_200_umbreit( - pipe_result: PipelineResult, -) -> PipelineResult: - relevant_mnr: tuple[int, ...] = (17, 18) - filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) - filter_mandant = pl.col("BEDP_MAN") == 1 - # not relevant, because already done in WF-100 - # filter_number_vm = pl.col(vm_criterion) == 0 - - res = _apply_several_filters( - pipe_result.open, - (filter_meldenummer, filter_mandant), - ) +# %% relevant_mnr: tuple[int, ...] 
= (17, 18) filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr) filter_mandant = pl.col("BEDP_MAN") == 1 res = _apply_several_filters( - pipe_res.open, + wf_200_start_data, (filter_meldenummer, filter_mandant), ) # %% @@ -1104,52 +1227,136 @@ res = _apply_several_filters( # -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR # -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND # -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND -# -- -res.in_ +# +# this is a separate sub-pipeline like in Petersen WF-100 +# these entries are either to be shown or not +sub_pipe_umbreit = PipelineResult(res.in_) +# %% +sub_pipe_umbreit.open + + +# %% + + +# %% +save_tmp_data(sub_pipe_umbreit.open) # %% -# // demo query with IN statement -data = res.in_.clone() -title_sub_choice = data["BEDP_TITELNR"][:300].to_list() rel_date = get_starting_date(90) rel_date +# %% +# old way using in statements +# filter_ = sql.and_( +# db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice), +# db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, +# db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), +# ) +# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR +# filter_ = sql.and_( +# db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, +# db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), +# db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), +# ) + +# stmt = ( +# sql.select( +# db.tmp_data, +# db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, +# db.EXT_AUFPAUF.c.AUFTRAGS_ART, +# ) +# .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) +# .where(filter_) +# ) + +# print(stmt.compile(engine)) +# new_schema = db.EXT_AUFPAUF_schema_map.copy() +# new_schema.update(db.tmp_data_schema_map) +# new_schema +# %% +# demo = pl.read_database( +# stmt, +# engine, +# schema_overrides=db.EXT_AUFPAUF_schema_map, +# ) +# # %% +# demo +# # %% +# demo.select(pl.col.AUFTRAGS_ART).unique() +# %% +get_tmp_data() # %% +# demo_2 = demo.clone() +# # demo_2.head() +# print(f"Number of titles before filtering: {len(demo_2)}") +# demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99))) +# demo_2 = ( +# demo_2.group_by("BEDP_TITELNR", maintain_order=True) +# .agg( +# pl.len().alias("count"), +# pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"), +# ) +# .filter(pl.col.customer_count >= 3) +# ) +# # these remaining titles are relevant and should be shown +# # the others should be disposed +# print(f"Number of titles which are relevant: {len(demo_2)}") +# print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}") +# demo_2 +# %% +# make a subquery for the pre-filtered entries +# // query to obtain relevant title numbers +join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = sql.and_( - db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice), db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date, db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)), + db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)), +) +stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFTRAGS_ART, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) +) +sub1 = stmt.subquery() + +unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) +stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) ) -stmt = sql.select(db.EXT_AUFPAUF).where(filter_) print(stmt.compile(engine)) - # %% -demo 
= pl.read_database( +demo_agg = pl.read_database( stmt, engine, - schema_overrides=db.EXT_AUFPAUF_schema_map, ) # %% -demo.head() +demo_agg # %% -demo_2 = demo.clone() -demo_2.head() -print(f"Number of titles before filtering: {len(demo_2)}") -demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99))) -demo_2 = ( - demo_2.group_by("TITELNR", maintain_order=True) - .agg( - pl.len().alias("count"), - pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"), - ) - .filter(pl.col.customer_count >= 3) -) +sub_pipe_umbreit.open +# sub_pipe_umbreit.open.select("BEDP_TITELNR").n_unique() -# these remaining titles are relevant and should be shown -# the others should be disposed -print(f"Number of titles which are relevant: {len(demo_2)}") -print(f"Number of titles which are to be disposed: {len(demo) - len(demo_2)}") -demo_2 +# %% +# now obtain these entries from the open data +demo_agg["BEDP_TITELNR"].unique().implode() +entries_to_show = sub_pipe_umbreit.open.filter( + pl.col.BEDP_TITELNR.is_in(demo_agg["BEDP_TITELNR"].unique().implode()) +) +entries_to_show + + +# %% +sub_pipe_umbreit.open # %% df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION) df diff --git a/src/umbreit/db.py b/src/umbreit/db.py index dc5299a..05a4964 100644 --- a/src/umbreit/db.py +++ b/src/umbreit/db.py @@ -141,7 +141,7 @@ EXT_BESPBES_INFO_null_values: PolarsNullValues = {} EXT_AUFPAUF = Table( "EXT_AUFPAUF", metadata, - Column("TITELNR", sql.Integer, nullable=False), + Column("TITELNR", sql.Integer, primary_key=True, autoincrement=False, nullable=False), Column("AUFTRAGSNUMMER", sql.Integer, nullable=False), Column("AUFTRAGS_DATUM", sql.DateTime, nullable=False), Column("AUFTRAGS_ART", sql.Integer, nullable=False), diff --git a/src/umbreit/types.py b/src/umbreit/types.py index 5113e99..1f7a457 100644 --- a/src/umbreit/types.py +++ b/src/umbreit/types.py @@ -25,6 +25,18 @@ class FilterResult: # return len(self.results) + len(self.open) +class Workflows(enum.Enum): + ID_100 = 100 + ID_200 = 200 + ID_900 = 900 + ID_910 = 910 + + +@dataclass(slots=True, repr=False, eq=False) +class OrderQtyExprKwArgs: + empty: bool = False + + class Freigabe(enum.Enum): WF_100 = False WF_200 = False
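
# For reference (hand-written sketch): the nested statement built in
# `_wf200_sub1` compiles to roughly the following Oracle SQL; the physical
# table names and the bind-parameter name are illustrative, the real ones
# come from the models in src/umbreit/db.py:
#
#   SELECT sub.BEDP_TITELNR,
#          COUNT(*)                           AS "count",
#          COUNT(DISTINCT sub.KUNDE_RECHNUNG) AS customer_count
#   FROM (
#       SELECT t.*, a.KUNDE_RECHNUNG, a.AUFTRAGS_ART
#       FROM tmp_data t
#       JOIN EXT_AUFPAUF a ON t.BEDP_TITELNR = a.TITELNR
#       WHERE a.AUFTRAGS_DATUM >= :relevant_date
#         AND a.KUNDE_RECHNUNG NOT IN (608991, 260202)
#         AND a.AUFTRAGS_ART IN (1, 99)
#   ) sub
#   GROUP BY sub.BEDP_TITELNR
#   HAVING COUNT(DISTINCT sub.KUNDE_RECHNUNG) >= 3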