From c244285c4df29b71bebfa980bf02dab3d74ea132 Mon Sep 17 00:00:00 2001
From: foefl
Date: Mon, 2 Feb 2026 11:53:37 +0100
Subject: [PATCH] begin refactoring

---
 data_analysis/02-3_oracle_workflow_test.py |   2 +-
 src/umbreit/constants.py                   |  30 +
 src/umbreit/pipeline.py                    | 753 +++++++++++++++++++++
 3 files changed, 784 insertions(+), 1 deletion(-)
 create mode 100644 src/umbreit/constants.py
 create mode 100644 src/umbreit/pipeline.py

diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index 71301e7..eba003a 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -758,7 +758,7 @@ def wf900(
     filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
     filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
     res = _apply_several_filters(
-        pipe_res.open,
+        pipe_result.open,
         (
             filter_meldenummer_null,
             filter_mandant,
diff --git a/src/umbreit/constants.py b/src/umbreit/constants.py
new file mode 100644
index 0000000..5a64a0b
--- /dev/null
+++ b/src/umbreit/constants.py
@@ -0,0 +1,30 @@
+from pathlib import Path
+from typing import Final
+
+from dopt_basics import configs
+from dopt_basics import io as io_
+
+LIB_PATH: Final[Path] = Path(__file__).parent
+
+# // database connections
+p_cfg = io_.search_file_iterative(
+    starting_path=LIB_PATH,
+    glob_pattern="CRED*.toml",
+    stop_folder_name="umbreit-py",
+)
+if p_cfg is None:
+    raise FileNotFoundError("Config was not found")
+
+CFG = configs.load_toml(p_cfg)
+HOST = CFG["server"]["host"]
+PORT = CFG["server"]["port"]
+SERVICE = CFG["server"]["service"]
+USER_NAME = CFG["user"]["name"]
+USER_PASS = CFG["user"]["pass"]
+
+# TODO remove or change
+# ** Oracle client libs
+USE_THICK_MODE: Final[bool] = False
+P_ORACLE_CLIENT_LIBS = Path(r"C:\Databases\Oracle\instantclient_19_29")
+assert P_ORACLE_CLIENT_LIBS.exists(), "Client libs not found"
+assert P_ORACLE_CLIENT_LIBS.is_dir()
diff --git a/src/umbreit/pipeline.py b/src/umbreit/pipeline.py
new file mode 100644
index 0000000..4f51542
--- /dev/null
+++ b/src/umbreit/pipeline.py
@@ -0,0 +1,753 @@
+from __future__ import annotations
+
+import datetime
+import shutil
+import tempfile
+import uuid
+from collections.abc import Sequence
+from pathlib import Path
+from typing import Final, Literal, Protocol, TypeAlias, overload
+
+import dopt_basics.datetime as dt
+import oracledb
+import polars as pl
+import polars.selectors as cs
+import sqlalchemy as sql
+
+from umbreit import constants, db, types
+from umbreit.constants import HOST, PORT, SERVICE, USER_NAME, USER_PASS
+
+oracledb.defaults.arraysize = 1000
+oracledb.defaults.prefetchrows = 1000
+if constants.USE_THICK_MODE:
+    oracledb.init_oracle_client(lib_dir=str(constants.P_ORACLE_CLIENT_LIBS))
+
+
+def create_tmp_dir() -> Path:
+    tmp_pth = Path(tempfile.mkdtemp())
+    assert tmp_pth.exists()
+    return tmp_pth
+
+
+def clear_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+    TMP_DIR.mkdir()
+
+
+def remove_tmp_dir() -> None:
+    shutil.rmtree(TMP_DIR)
+
+
+TMP_DIR = create_tmp_dir()
+
+CONN_STRING: Final[str] = (
+    f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
+)
+engine = sql.create_engine(
+    CONN_STRING,
+    execution_options={"stream_results": True},
+)
+
+VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM"
+MANDANT_CRITERION: Final[str] = "BEDP_MAN"
+ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM"
+RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys())
+ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
+SAVE_TMP_FILES: Final[bool] = True
+TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
+TMPFILE_WF200_SUB1 = "WF-200_Sub1"
+
+
+def save_tmp_file(
+    data: pl.DataFrame,
+    filename: str | None,
+) -> None:
+    if filename is None:
+        filename = str(uuid.uuid4())
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+
+    # number duplicates from the original stem so repeated collisions
+    # yield >name_1<, >name_2<, ... instead of >name_1_2<
+    base_name = pth.stem
+    n: int = 1
+    while pth.exists():
+        pth = (TMP_DIR / f"{base_name}_{n}").with_suffix(".arrow")
+        n += 1
+
+    data.write_ipc(pth)
+
+
+def load_tmp_file(
+    filename: str,
+) -> pl.DataFrame:
+    pth = (TMP_DIR / filename).with_suffix(".arrow")
+    if not pth.exists():
+        raise FileNotFoundError(f"File >{pth.name}< not found")
+
+    return pl.read_ipc(pth)
+
+
+def load_all_tmp_files() -> dict[str, pl.DataFrame]:
+    all_dfs: dict[str, pl.DataFrame] = {}
+    for file in TMP_DIR.glob("*.arrow"):
+        df = pl.read_ipc(file)
+        all_dfs[file.stem] = df
+
+    return all_dfs
+
+
+def get_starting_date(
+    days_from_now: int,
+) -> datetime.date:
+    current_dt = dt.current_time_tz(cut_microseconds=True)
+    td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
+
+    return (current_dt - td).date()
+
+
+def get_raw_data() -> pl.DataFrame:
+    join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
+    stmt = sql.select(
+        db.ext_bedpbed.c.BEDARFNR,
+        db.ext_bedpbed.c.BEDP_SEQUENZ,
+        db.ext_bedpbed.c.BEDP_TITELNR,
+        db.ext_bedpbed.c.BEDP_MAN,
+        sql.case(
+            (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
+            else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
+        ).label("BEDP_MENGE_BEDARF_VM"),
+        db.ext_titel_info.c.MELDENUMMER,
+        db.ext_titel_info.c.VERLAGSNR,
+        db.ext_titel_info.c.MENGE_VORMERKER,
+        db.ext_titel_info.c.MANDFUEHR,
+        db.ext_titel_info.c.EINKAEUFER,
+    ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
+
+    return pl.read_database(
+        stmt,
+        engine,
+        schema_overrides=db.raw_data_query_schema_map,
+    )
+
+
+def save_tmp_data(df: pl.DataFrame) -> None:
+    # clear and refill within a single transaction so a failed insert
+    # cannot leave the table empty
+    with engine.begin() as conn:
+        conn.execute(sql.delete(db.tmp_data))
+        conn.execute(sql.insert(db.tmp_data), df.to_dicts())
+
+
+def get_tmp_data() -> pl.DataFrame:
+    return pl.read_database(
+        sql.select(db.tmp_data),
+        engine,
+        schema_overrides=db.tmp_data_schema_map,
+    )
+
+
+def get_result_data() -> pl.DataFrame:
+    return pl.read_database(
+        sql.select(db.EXT_DOPT_ERGEBNIS),
+        engine,
+        schema_overrides=db.results_schema_map,
+    )
+
+
+def save_result_data(results: pl.DataFrame) -> None:
+    with engine.begin() as conn:
+        conn.execute(sql.insert(db.EXT_DOPT_ERGEBNIS), results.to_dicts())
+
+
+def clear_result_data() -> None:
+    with engine.begin() as conn:
+        conn.execute(sql.delete(db.EXT_DOPT_ERGEBNIS))
+
+
+def save_result_data_native(results: pl.DataFrame) -> None:
+    results = results.with_columns(
+        [
+            pl.when(pl.col(c)).then(pl.lit("Y")).otherwise(pl.lit("N")).alias(c)
+            for c in results.select(cs.boolean()).columns
+        ]
+    )
+    stmt = """
+    INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
+    "BEST_MENGE", "FREIGABE_AUTO")
+    VALUES (:1, :2, :3, :4, :5, :6)
+    """
+    with engine.begin() as conn:
+        raw_conn = conn.connection.dbapi_connection
+        with raw_conn.cursor() as cursor:
+            # positional binds (:1 ... :6): the column order of >results<
+            # must match the column list of the INSERT statement
+            cursor.executemany(stmt, results.rows())
+
+
+def _apply_several_filters(
+    df: pl.DataFrame,
+    filters: Sequence[pl.Expr],
+) -> types.FilterResult:
+    df_current = df
+    removed_rows: list[pl.DataFrame] = []
+
+    for filter in filters:
+ removed = df_current.filter(~filter) + removed_rows.append(removed) + + df_current = df_current.filter(filter) + + df_removed = pl.concat(removed_rows) + + return types.FilterResult(in_=df_current, out_=df_removed) + + +class PipelineResult: + __slots__ = ("_results", "_open", "_subtracted_indices") + _index_cols: tuple[str, ...] = ("BEDARFNR", "BEDP_SEQUENZ") + + def __init__( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + schema = db.results_schema_map.copy() + del schema["ID"] + self._results = pl.DataFrame(schema=schema) + + schema = {} + for col in self._index_cols: + schema[col] = db.raw_data_query_schema_map[col] + self._subtracted_indices = pl.DataFrame(schema=schema) + + def __len__(self) -> int: + return len(self._results) + len(self._open) + + @property + def open(self) -> pl.DataFrame: + return self._open + + @property + def results(self) -> pl.DataFrame: + return self._results + + @property + def subtracted_indices(self) -> pl.DataFrame: + return self._subtracted_indices + + def update_open( + self, + data: pl.DataFrame, + ) -> None: + self._open = data + + def _subtract_data( + self, + data: pl.DataFrame, + ) -> None: + self._open = self._open.join(data, on=self._index_cols, how="anti") + self._subtracted_indices = pl.concat( + (self._subtracted_indices, data[self._index_cols]) + ) + + def _add_results( + self, + data: pl.DataFrame, + ) -> None: + res = pl.concat([self._results, data]) + self._results = res + + def merge_pipeline( + self, + pipeline: PipelineResult, + ) -> None: + self._subtract_data(pipeline.subtracted_indices) + self._add_results(pipeline.results) + + def write_results( + self, + data: pl.DataFrame, + vorlage: bool, + wf_id: types.Workflows, + freigabe_auto: types.Freigabe, + order_qty_expr: pl.Expr, + ) -> None: + results = data.rename(db.map_data_to_result) + results = results.with_columns( + [ + pl.lit(vorlage).alias("VORLAGE").cast(db.results_schema_map["VORLAGE"]), + pl.lit(wf_id.value).alias("WF_ID").cast(db.results_schema_map["WF_ID"]), + order_qty_expr, + pl.lit(freigabe_auto.value) + .alias("FREIGABE_AUTO") + .cast(db.results_schema_map["FREIGABE_AUTO"]), + ] + ) + results = results.drop( + [ + "BEDP_TITELNR", + "BEDP_MAN", + "BEDP_MENGE_BEDARF_VM", + "MELDENUMMER", + "VERLAGSNR", + "MENGE_VORMERKER", + "MANDFUEHR", + "EINKAEUFER", + ] + ) + + self._subtract_data(data) + self._add_results(results) + + +class ExprOrderQty(Protocol): ... + + +class ExprOrderQty_Base(ExprOrderQty, Protocol): + def __call__(self) -> pl.Expr: ... + + +ExprOrderQty_Base_Types: TypeAlias = ( + Literal[types.Workflows.ID_200] + | Literal[types.Workflows.ID_900] + | Literal[types.Workflows.ID_910] +) + + +class ExprOrderQty_WF100(ExprOrderQty, Protocol): + def __call__(self, empty: bool) -> pl.Expr: ... + + +@overload +def get_expr_order_qty( + wf_id: Literal[types.Workflows.ID_100], +) -> ExprOrderQty_WF100: ... + + +@overload +def get_expr_order_qty( + wf_id: ExprOrderQty_Base_Types, +) -> ExprOrderQty_Base: ... 
+
+
+def get_expr_order_qty(
+    wf_id: types.Workflows,
+) -> ExprOrderQty:
+    empty_expr = (
+        pl.lit(0)
+        .alias("BEST_MENGE")
+        .cast(db.results_schema_map["BEST_MENGE"])
+    )
+
+    def _empty() -> pl.Expr:
+        return empty_expr
+
+    func: ExprOrderQty
+    match wf_id:
+        case types.Workflows.ID_100:
+
+            def _func(empty: bool) -> pl.Expr:
+                order_qty_expr: pl.Expr
+                if empty:
+                    order_qty_expr = empty_expr
+                else:
+                    order_qty_expr = pl.col(ORDER_QTY_CRIT).alias("BEST_MENGE")
+                return order_qty_expr
+
+            func = _func
+
+        case types.Workflows.ID_200 | types.Workflows.ID_900 | types.Workflows.ID_910:
+            func = _empty
+        case _:
+            raise NotImplementedError(
+                f"Order expression for WF-ID {wf_id.value} is not implemented"
+            )
+
+    return func
+
+
+def wf900(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    """filter 'Meldenummer' and fill non-feasible entries"""
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
+    filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
+    filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer_null,
+            filter_mandant,
+        ),
+    )
+    pipe_result.write_results(
+        data=res.out_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_900,
+        freigabe_auto=types.Freigabe.WF_900,
+        order_qty_expr=ORDER_QTY_FUNC(),
+    )
+
+    pipe_result.update_open(
+        res.in_.with_columns(
+            pl.col("MENGE_VORMERKER").fill_null(0),
+            pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
+        )
+    )
+
+    return pipe_result
+
+
+def wf910(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
+    filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
+
+    res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
+    pipe_result.write_results(
+        data=res.out_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_910,
+        freigabe_auto=types.Freigabe.WF_910,
+        order_qty_expr=ORDER_QTY_FUNC(),
+    )
+
+    return pipe_result
+
+
+# this is a main routine:
+# receives and gives back result objects
+def wf100_umbreit(
+    pipe_result: PipelineResult,
+    vm_criterion: str,
+) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
+
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col(MANDANT_CRITERION) == 1
+    filter_number_vm = pl.col(vm_criterion) > 0
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+
+    return pipe_result
+
+
+def wf100_petersen(
+    pipe_result: PipelineResult,
+    vm_criterion: str,
+) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
+    # distinguish between WDB and others
+
+    # // WDB branch
+    # order quantity 0, no further action in other WFs
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col(MANDANT_CRITERION) == 90
+    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
+    filter_number_vm = pl.col(vm_criterion) == 0
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=True),
+    )
+    filter_number_vm = pl.col(vm_criterion) > 0
+    res_candidates = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+            filter_number_vm,
+        ),
+    )
+    wdb_sub_pipe = PipelineResult(res_candidates.in_)
+    wdb_sub_pipe = _wf100_petersen_sub1_wdb(wdb_sub_pipe)
+    assert wdb_sub_pipe.open.height == 0, "Sub pipe not fully processed"
+    pipe_result.merge_pipeline(wdb_sub_pipe)
+
+    # // other branch
+    # always show entries with #VM > 1
+    filter_number_vm = pl.col(vm_criterion) > 1
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=True,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+
+    filter_number_vm = pl.col(vm_criterion) > 0
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+
+    return pipe_result
+
+
+def _wf100_petersen_sub1_wdb(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
+    # input: pre-filtered entries (WDB titles and #VM > 0)
+    # more than 1 VM: show these entries
+    filter_number_vm = pl.col(VM_CRITERION) > 1
+    res = _apply_several_filters(
+        pipe_result.open,
+        (filter_number_vm,),
+    )
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=True,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+    # filtered out entries (WDB with #VM == 1) must be analysed for orders in the
+    # past 6 months
+    save_tmp_data(pipe_result.open)
+    RELEVANT_DATE = get_starting_date(180)
+    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
+    filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
+    stmt = (
+        sql.select(
+            db.tmp_data,
+            db.EXT_BESPBES_INFO.c.BESP_MENGE,
+            db.EXT_BESPBES_INFO.c.BESP_STATUS,
+        )
+        .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
+        .where(filter_)
+    )
+    sub1 = stmt.subquery()
+
+    count_col = sql.func.count()
+    stmt = (
+        sql.select(
+            sub1.c.BEDP_TITELNR,
+            count_col.label("count"),
+        )
+        .select_from(sub1)
+        .group_by(sub1.c.BEDP_TITELNR)
+        .having(count_col > 1)
+    )
+    if SAVE_TMP_FILES:
+        stmt = (
+            sql.select(
+                sub1.c.BEDP_TITELNR,
+                count_col.label("count"),
+            )
+            .select_from(sub1)
+            .group_by(sub1.c.BEDP_TITELNR)
+        )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
+    relevant_titles = pl.read_database(
+        stmt,
+        engine,
+    )
+
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_WDB)
+        relevant_titles = relevant_titles.filter(pl.col.COUNT > 1)
+
+    entries_to_show = pipe_result.open.filter(
+        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
+    )
+    pipe_result.write_results(
+        data=entries_to_show,
+        vorlage=True,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+    pipe_result.write_results(
+        data=pipe_result.open,
+        vorlage=False,
+        wf_id=types.Workflows.ID_100,
+        freigabe_auto=types.Freigabe.WF_100,
+        order_qty_expr=ORDER_QTY_FUNC(empty=False),
+    )
+    return pipe_result
+
+
+def wf200_umbreit(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    relevant_mnr: tuple[int, ...] = (17, 18)
+    filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
+    filter_mandant = pl.col(MANDANT_CRITERION) == 1
+
+    res = _apply_several_filters(
+        pipe_result.open,
+        (filter_meldenummer, filter_mandant),
+    )
+    sub_pipe = PipelineResult(res.in_)
+    sub_pipe = _wf200_sub1(sub_pipe)
+    assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
+    pipe_result.merge_pipeline(sub_pipe)
+
+    return pipe_result
+
+
+def wf200_petersen(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
+    RELEVANT_MNR: tuple[int, ...] = (17, 18)
+    # // WDB branch
+    filter_meldenummer = pl.col("MELDENUMMER").is_in(RELEVANT_MNR)
+    filter_mandant = pl.col(MANDANT_CRITERION) == 90
+    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+        ),
+    )
+    # ignore these
+    pipe_result.write_results(
+        data=res.in_,
+        vorlage=False,
+        wf_id=types.Workflows.ID_200,
+        freigabe_auto=types.Freigabe.WF_200,
+        order_qty_expr=ORDER_QTY_FUNC(),
+    )
+    # // other branch
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+        ),
+    )
+    sub_pipe = PipelineResult(res.in_)
+    sub_pipe = _wf200_sub1(sub_pipe)
+    assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
+    pipe_result.merge_pipeline(sub_pipe)
+
+    return pipe_result
+
+
+def _wf200_sub1(
+    pipe_result: PipelineResult,
+) -> PipelineResult:
+    save_tmp_data(pipe_result.open)
+    ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
+    RELEVANT_DATE = get_starting_date(90)
+
+    join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
+    filter_ = sql.and_(
+        db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
+        db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
+        db.EXT_AUFPAUF.c.AUFTRAGS_ART.in_((1, 99)),
+    )
+    stmt = (
+        sql.select(
+            db.tmp_data,
+            db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
+            db.EXT_AUFPAUF.c.AUFTRAGS_ART,
+        )
+        .select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
+        .where(filter_)
+    )
+    sub1 = stmt.subquery()
+
+    unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
+    stmt = (
+        sql.select(
+            sub1.c.BEDP_TITELNR,
+            sql.func.count().label("count"),
+            unique_count_col.label("customer_count"),
+        )
+        .select_from(sub1)
+        .group_by(sub1.c.BEDP_TITELNR)
+        .having(unique_count_col >= 3)
+    )
+    if SAVE_TMP_FILES:
+        stmt = (
+            sql.select(
+                sub1.c.BEDP_TITELNR,
+                sql.func.count().label("count"),
+                unique_count_col.label("customer_count"),
+            )
+            .select_from(sub1)
+            .group_by(sub1.c.BEDP_TITELNR)
+        )
+    # !! this is a sub result which must be used in the result set
+    # !! for testing and feedback by the customer
+    relevant_titles = pl.read_database(
+        stmt,
+        engine,
+    )
+
+    if SAVE_TMP_FILES:
+        save_tmp_file(relevant_titles, TMPFILE_WF200_SUB1)
+        relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
+
+    entries_to_show = pipe_result.open.filter(
+        pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
+    )
+
+    pipe_result.write_results(
+        data=entries_to_show,
+        vorlage=True,
+        wf_id=types.Workflows.ID_200,
+        freigabe_auto=types.Freigabe.WF_200,
+        order_qty_expr=ORDER_QTY_FUNC(),
+    )
+    pipe_result.write_results(
+        data=pipe_result.open,
+        vorlage=False,
+        wf_id=types.Workflows.ID_200,
+        freigabe_auto=types.Freigabe.WF_200,
+        order_qty_expr=ORDER_QTY_FUNC(),
+    )
+
+    return pipe_result
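
--
A short usage sketch; nothing below is part of the patch. `run_pipeline` is a
hypothetical name, and the step order is an assumption derived from the WF ids
and from the fact that wf900/wf910 remove non-feasible entries before the
mandant-specific branches run:

    # hypothetical driver, composing the steps added in pipeline.py
    from umbreit.pipeline import (
        VM_CRITERION,
        PipelineResult,
        clear_result_data,
        get_raw_data,
        remove_tmp_dir,
        save_result_data,
        wf100_petersen,
        wf100_umbreit,
        wf200_petersen,
        wf200_umbreit,
        wf900,
        wf910,
    )

    def run_pipeline() -> PipelineResult:
        # each step moves the rows it handled from >open< into >results<
        pipe = PipelineResult(get_raw_data())
        pipe = wf900(pipe)  # closes entries without Meldenummer or foreign Mandant
        pipe = wf910(pipe)  # closes entries with Meldenummer 26
        pipe = wf100_umbreit(pipe, VM_CRITERION)
        pipe = wf100_petersen(pipe, VM_CRITERION)
        pipe = wf200_umbreit(pipe)
        pipe = wf200_petersen(pipe)
        return pipe

    if __name__ == "__main__":
        pipe = run_pipeline()
        clear_result_data()  # EXT_DOPT_ERGEBNIS is rewritten as a whole
        save_result_data(pipe.results)
        remove_tmp_dir()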