diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index b40fe7b..1dfa93e 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -755,7 +755,6 @@ def get_expr_order_qty( def wf900( pipe_result: PipelineResult, ) -> PipelineResult: - """filter 'Meldenummer' and fill non-feasible entries""" ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) diff --git a/pyproject.toml b/pyproject.toml index ab367dd..3675ccc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "umbreit" -version = "0.1.0" +version = "0.1.1dev0" description = "Umbreit's Python-based application" authors = [ {name = "Florian Förster", email = "f.foerster@d-opt.com"}, @@ -71,7 +71,7 @@ directory = "reports/coverage" [tool.bumpversion] -current_version = "0.1.0" +current_version = "0.1.1dev0" parse = """(?x) (?P<major>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\. 
diff --git a/src/umbreit/pipeline.py b/src/umbreit/pipeline.py index 4f51542..72c7389 100644 --- a/src/umbreit/pipeline.py +++ b/src/umbreit/pipeline.py @@ -3,10 +3,10 @@ from __future__ import annotations import datetime import shutil import tempfile +import typing import uuid from collections.abc import Sequence from pathlib import Path -from typing import Final, Literal, Protocol, TypeAlias, overload import dopt_basics.datetime as dt import oracledb @@ -40,7 +40,7 @@ def remove_tmp_dir() -> None: TMP_DIR = create_tmp_dir() -CONN_STRING: Final[str] = ( +CONN_STRING: typing.Final[str] = ( f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" ) engine = sql.create_engine( @@ -48,12 +48,15 @@ engine = sql.create_engine( execution_options={"stream_results": True}, ) -VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM" -MANDANT_CRITERION: Final[str] = "BEDP_MAN" -ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM" -RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys()) -ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() -SAVE_TMP_FILES: Final[bool] = True +VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" +MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN" +ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM" +RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple( + db.EXT_DOPT_ERGEBNIS.columns.keys() +) +ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() +SAVE_TMP_FILES: typing.Final[bool] = True +TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit" TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB" TMPFILE_WF200_SUB1 = "WF-200_Sub1" @@ -296,31 +299,31 @@ class PipelineResult: self._add_results(results) -class ExprOrderQty(Protocol): ... +class ExprOrderQty(typing.Protocol): ... -class ExprOrderQty_Base(ExprOrderQty, Protocol): +class ExprOrderQty_Base(ExprOrderQty, typing.Protocol): def __call__(self) -> pl.Expr: ... 
-ExprOrderQty_Base_Types: TypeAlias = ( - Literal[types.Workflows.ID_200] - | Literal[types.Workflows.ID_900] - | Literal[types.Workflows.ID_910] +ExprOrderQty_Base_Types: typing.TypeAlias = ( + typing.Literal[types.Workflows.ID_200] + | typing.Literal[types.Workflows.ID_900] + | typing.Literal[types.Workflows.ID_910] ) -class ExprOrderQty_WF100(ExprOrderQty, Protocol): +class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol): def __call__(self, empty: bool) -> pl.Expr: ... -@overload +@typing.overload def get_expr_order_qty( - wf_id: Literal[types.Workflows.ID_100], + wf_id: typing.Literal[types.Workflows.ID_100], ) -> ExprOrderQty_WF100: ... -@overload +@typing.overload def get_expr_order_qty( wf_id: ExprOrderQty_Base_Types, ) -> ExprOrderQty_Base: ... @@ -366,7 +369,6 @@ def get_expr_order_qty( def wf900( pipe_result: PipelineResult, ) -> PipelineResult: - """filter 'Meldenummer' and fill non-feasible entries""" ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) @@ -419,13 +421,14 @@ def wf100_umbreit( pipe_result: PipelineResult, vm_criterion: str, ) -> PipelineResult: - ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + # TODO remove + # ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_mandant = pl.col(MANDANT_CRITERION) == 1 filter_number_vm = pl.col(vm_criterion) > 0 - res = _apply_several_filters( + res_candidates = _apply_several_filters( pipe_result.open, ( filter_meldenummer, @@ -433,8 +436,88 @@ filter_mandant, filter_number_vm, ), ) + # sub-pipe necessary: + # analyse MNr(18) with #VM > 0 for reservations in the past two months + # similar to subroutine in WF-200 "_wf200_sub1" + sub_pipe = PipelineResult(res_candidates.in_) + sub_pipe = _wf100_sub1_umbreit(sub_pipe) + assert sub_pipe.open.height == 0, "Sub pipe not fully processed" + 
pipe_result.merge_pipeline(sub_pipe) + + return pipe_result + + +def _wf100_sub1_umbreit( + pipe_result: PipelineResult, +) -> PipelineResult: + # entry titles with MNr(18) and #VM > 0 + # show entries with more than three orders from different + # customers in the past two months + save_tmp_data(pipe_result.open) + ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) + RELEVANT_DATE = get_starting_date(60) # see REQ-1002 + + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR + filter_ = sql.and_( + db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J", + ) + stmt = ( + sql.select( + db.tmp_data, + db.EXT_AUFPAUF.c.KUNDE_RECHNUNG, + db.EXT_AUFPAUF.c.AUFP_VORMERKUNG, + ) + .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition)) + .where(filter_) + ) + sub1 = stmt.subquery() + + unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct()) + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(unique_count_col >= 3) + ) + if SAVE_TMP_FILES: + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + sql.func.count().label("count"), + unique_count_col.label("customer_count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + ) + # !! this is a sub result which must be used in the result set + # !! 
for testing and feedback by the customer + relevant_titles = pl.read_database( + stmt, + engine, + ) + + if SAVE_TMP_FILES: + save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT) + relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3) + + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) + ) + pipe_result.write_results( - data=res.in_, + data=entries_to_show, + vorlage=True, + wf_id=types.Workflows.ID_100, + freigabe_auto=types.Freigabe.WF_100, + order_qty_expr=ORDER_QTY_FUNC(empty=False), + ) + pipe_result.write_results( + data=pipe_result.open, vorlage=False, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, @@ -489,24 +572,9 @@ def wf100_petersen( pipe_result.merge_pipeline(wdb_sub_pipe) # // other branch - # show always entries with #VM > 1 - filter_number_vm = pl.col(vm_criterion) > 1 - res = _apply_several_filters( - pipe_result.open, - ( - filter_meldenummer, - filter_mandant, - filter_number_vm, - ), - ) - pipe_result.write_results( - data=res.in_, - vorlage=True, - wf_id=types.Workflows.ID_100, - freigabe_auto=types.Freigabe.WF_100, - order_qty_expr=ORDER_QTY_FUNC(empty=False), - ) - + # Verlage: always show because of missing information of ONIX + # data (REQ-1003) + # show always entries with #VM > 0 filter_number_vm = pl.col(vm_criterion) > 0 res = _apply_several_filters( pipe_result.open, @@ -518,7 +586,7 @@ def wf100_petersen( ) pipe_result.write_results( data=res.in_, - vorlage=False, + vorlage=True, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), @@ -677,7 +745,7 @@ def _wf200_sub1( ) -> PipelineResult: save_tmp_data(pipe_result.open) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) - RELEVANT_DATE = get_starting_date(90) + RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000 join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR filter_ = 
sql.and_(