From 5653e3fbcdf3e1a542310e2198ab2426f0b98214 Mon Sep 17 00:00:00 2001 From: foefl Date: Tue, 3 Mar 2026 08:08:39 +0100 Subject: [PATCH] prepare implementation of REQ-1002 --- data_analysis/02-3_oracle_workflow_test.py | 138 +++++++++++++++++---- 1 file changed, 116 insertions(+), 22 deletions(-) diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index f950cbb..51b2f73 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -442,6 +442,7 @@ RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple( ) ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() SAVE_TMP_FILES: typing.Final[bool] = True +TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit" TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB" TMPFILE_WF200_SUB1 = "WF-200_Sub1" @@ -807,13 +808,14 @@ def wf100_umbreit( pipe_result: PipelineResult, vm_criterion: str, ) -> PipelineResult: - ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + # TODO remove + # ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_mandant = pl.col(MANDANT_CRITERION) == 1 filter_number_vm = pl.col(vm_criterion) > 0 - res = _apply_several_filters( + res_candidates = _apply_several_filters( pipe_result.open, ( filter_meldenummer, @@ -821,8 +823,96 @@ def wf100_umbreit( filter_number_vm, ), ) + # sub-pipe neccessary: + # analyse MNr(18) mit #VM > 0 for reservations in the past two months + # similar to subroutine in WF-200 "_wf200_sub1" + sub_pipe = PipelineResult(res_candidates.in_) + sub_pipe = _wf100_sub1_umbreit(sub_pipe) + assert sub_pipe.open.height == 0, "Sub pipe not fully processed" + pipe_result.merge_pipeline(sub_pipe) + + # pipe_result.write_results( + # data=res.in_, + # vorlage=False, + # wf_id=types.Workflows.ID_100, + # freigabe_auto=types.Freigabe.WF_100, + # order_qty_expr=ORDER_QTY_FUNC(empty=False), + # ) + + return pipe_result + + +def _wf100_sub1_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: + # entry titles with MNr(18) and #VM > 0 + # show entries with more than three orders from different + # customers in the past two months + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + RELEVANT_DATE = get_starting_date(60) # see REQ-1002 + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J", + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + pipe_result.write_results( - data=res.in_, + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + pipe_result.write_results( + data=pipe_result.open, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, @@ -877,8 +967,10 @@ def wf100_petersen( pipe_result.merge_pipeline(wdb_sub_pipe) # // other branch - # show always entries with #VM > 1 - filter_number_vm = pl.col(vm_criterion) > 1 + # Verlage: always show because of missing information of ONIX + # data (REQ-1003) + # show always entries with #VM > 0 + filter_number_vm = pl.col(vm_criterion) > 0 res = _apply_several_filters( pipe_result.open, ( @@ -895,22 +987,24 @@ def wf100_petersen( order_qty_expr=ORDER_QTY_FUNC(empty=False), ) - filter_number_vm = pl.col(vm_criterion) > 0 - res = _apply_several_filters( - pipe_result.open, - ( - filter_meldenummer, - filter_mandant, - filter_number_vm, - ), - ) - pipe_result.write_results( - data=res.in_, - vorlage=False, - wf_id=types.Workflows.ID_100, - freigabe_auto=types.Freigabe.WF_100, - order_qty_expr=ORDER_QTY_FUNC(empty=False), - ) + # TODO remove after successful tests + # // excluded based on feedback on 27.02.2026 + # filter_number_vm = pl.col(vm_criterion) > 0 + # res = _apply_several_filters( + # pipe_result.open, + # ( + # filter_meldenummer, + # filter_mandant, + # filter_number_vm, + # ), + # ) + # pipe_result.write_results( + # data=res.in_, + # vorlage=False, + # wf_id=types.Workflows.ID_100, + # freigabe_auto=types.Freigabe.WF_100, + # order_qty_expr=ORDER_QTY_FUNC(empty=False), + # ) return pipe_result @@ -1065,7 +1159,7 @@ def _wf200_sub1( ) -> PipelineResult: save_tmp_data(pipe_result.open) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) - RELEVANT_DATE = get_starting_date(60) + RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000 join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = sql.and_(