prepare export logic to Umbreit IT

This commit is contained in:
Florian Förster 2026-03-11 15:36:50 +01:00
parent 97cd74d230
commit 6ff7629b39
3 changed files with 112 additions and 45 deletions

View File

@@ -755,7 +755,6 @@ def get_expr_order_qty( def get_expr_order_qty(
def wf900( def wf900(
pipe_result: PipelineResult, pipe_result: PipelineResult,
) -> PipelineResult: ) -> PipelineResult:
"""filter 'Meldenummer' and fill non-feasible entries"""
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "umbreit" name = "umbreit"
version = "0.1.0" version = "0.1.1dev0"
description = "Umbreit's Python-based application" description = "Umbreit's Python-based application"
authors = [ authors = [
{name = "Florian Förster", email = "f.foerster@d-opt.com"}, {name = "Florian Förster", email = "f.foerster@d-opt.com"},
@@ -71,7 +71,7 @@ directory = "reports/coverage" directory = "reports/coverage"
[tool.bumpversion] [tool.bumpversion]
current_version = "0.1.0" current_version = "0.1.1dev0"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -3,10 +3,10 @@ from __future__ import annotations from __future__ import annotations
import datetime import datetime
import shutil import shutil
import tempfile import tempfile
import typing
import uuid import uuid
from collections.abc import Sequence from collections.abc import Sequence
from pathlib import Path from pathlib import Path
from typing import Final, Literal, Protocol, TypeAlias, overload
import dopt_basics.datetime as dt import dopt_basics.datetime as dt
import oracledb import oracledb
@@ -40,7 +40,7 @@ def remove_tmp_dir() -> None: def remove_tmp_dir() -> None:
TMP_DIR = create_tmp_dir() TMP_DIR = create_tmp_dir()
CONN_STRING: Final[str] = ( CONN_STRING: typing.Final[str] = (
f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}" f"oracle+oracledb://{USER_NAME}:{USER_PASS}@{HOST}:{PORT}?service_name={SERVICE}"
) )
engine = sql.create_engine( engine = sql.create_engine(
@@ -48,12 +48,15 @@ engine = sql.create_engine( engine = sql.create_engine(
execution_options={"stream_results": True}, execution_options={"stream_results": True},
) )
VM_CRITERION: Final[str] = "BEDP_MENGE_BEDARF_VM" VM_CRITERION: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
MANDANT_CRITERION: Final[str] = "BEDP_MAN" MANDANT_CRITERION: typing.Final[str] = "BEDP_MAN"
ORDER_QTY_CRIT: Final[str] = "BEDP_MENGE_BEDARF_VM" ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
RESULT_COLUMN_ORDER: Final[tuple[str, ...]] = tuple(db.EXT_DOPT_ERGEBNIS.columns.keys()) RESULT_COLUMN_ORDER: typing.Final[tuple[str, ...]] = tuple(
ORDER_QTY_EXPR_KWARGS: Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs() db.EXT_DOPT_ERGEBNIS.columns.keys()
SAVE_TMP_FILES: Final[bool] = True )
ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyExprKwArgs()
SAVE_TMP_FILES: typing.Final[bool] = True
TMPFILE_WF100_SUB1_UMBREIT = "WF-100_Sub1-Umbreit"
TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB" TMPFILE_WF100_SUB1_WDB = "WF-100_Sub1-WDB"
TMPFILE_WF200_SUB1 = "WF-200_Sub1" TMPFILE_WF200_SUB1 = "WF-200_Sub1"
@@ -296,31 +299,31 @@ class PipelineResult: class PipelineResult:
self._add_results(results) self._add_results(results)
class ExprOrderQty(Protocol): ... class ExprOrderQty(typing.Protocol): ...
class ExprOrderQty_Base(ExprOrderQty, Protocol): class ExprOrderQty_Base(ExprOrderQty, typing.Protocol):
def __call__(self) -> pl.Expr: ... def __call__(self) -> pl.Expr: ...
ExprOrderQty_Base_Types: TypeAlias = ( ExprOrderQty_Base_Types: typing.TypeAlias = (
Literal[types.Workflows.ID_200] typing.Literal[types.Workflows.ID_200]
| Literal[types.Workflows.ID_900] | typing.Literal[types.Workflows.ID_900]
| Literal[types.Workflows.ID_910] | typing.Literal[types.Workflows.ID_910]
) )
class ExprOrderQty_WF100(ExprOrderQty, Protocol): class ExprOrderQty_WF100(ExprOrderQty, typing.Protocol):
def __call__(self, empty: bool) -> pl.Expr: ... def __call__(self, empty: bool) -> pl.Expr: ...
@overload @typing.overload
def get_expr_order_qty( def get_expr_order_qty(
wf_id: Literal[types.Workflows.ID_100], wf_id: typing.Literal[types.Workflows.ID_100],
) -> ExprOrderQty_WF100: ... ) -> ExprOrderQty_WF100: ...
@overload @typing.overload
def get_expr_order_qty( def get_expr_order_qty(
wf_id: ExprOrderQty_Base_Types, wf_id: ExprOrderQty_Base_Types,
) -> ExprOrderQty_Base: ... ) -> ExprOrderQty_Base: ...
@@ -366,7 +369,6 @@ def get_expr_order_qty( def get_expr_order_qty(
def wf900( def wf900(
pipe_result: PipelineResult, pipe_result: PipelineResult,
) -> PipelineResult: ) -> PipelineResult:
"""filter 'Meldenummer' and fill non-feasible entries"""
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_900)
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null() filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
@@ -419,13 +421,14 @@ def wf100_umbreit( def wf100_umbreit(
pipe_result: PipelineResult, pipe_result: PipelineResult,
vm_criterion: str, vm_criterion: str,
) -> PipelineResult: ) -> PipelineResult:
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100) # TODO remove
# ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
filter_meldenummer = pl.col("MELDENUMMER") == 18 filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col(MANDANT_CRITERION) == 1 filter_mandant = pl.col(MANDANT_CRITERION) == 1
filter_number_vm = pl.col(vm_criterion) > 0 filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters( res_candidates = _apply_several_filters(
pipe_result.open, pipe_result.open,
( (
filter_meldenummer, filter_meldenummer,
@@ -433,8 +436,88 @@ def wf100_umbreit( def wf100_umbreit(
filter_number_vm, filter_number_vm,
), ),
) )
# sub-pipe necessary:
# analyse MNr(18) with #VM > 0 for reservations in the past two months
# similar to subroutine in WF-200 "_wf200_sub1"
sub_pipe = PipelineResult(res_candidates.in_)
sub_pipe = _wf100_sub1_umbreit(sub_pipe)
assert sub_pipe.open.height == 0, "Sub pipe not fully processed"
pipe_result.merge_pipeline(sub_pipe)
return pipe_result
def _wf100_sub1_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
# entry titles with MNr(18) and #VM > 0
# show entries with at least three orders from different
# customers in the past two months
save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_100)
RELEVANT_DATE = get_starting_date(60) # see REQ-1002
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_(
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= RELEVANT_DATE,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG == "J",
)
stmt = (
sql.select(
db.tmp_data,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG,
db.EXT_AUFPAUF.c.AUFP_VORMERKUNG,
)
.select_from(db.tmp_data.join(db.EXT_AUFPAUF, join_condition))
.where(filter_)
)
sub1 = stmt.subquery()
unique_count_col = sql.func.count(sub1.c.KUNDE_RECHNUNG.distinct())
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(unique_count_col >= 3)
)
if SAVE_TMP_FILES:
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
sql.func.count().label("count"),
unique_count_col.label("customer_count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
)
# !! this is a sub result which must be used in the result set
# !! for testing and feedback by the customer
relevant_titles = pl.read_database(
stmt,
engine,
)
if SAVE_TMP_FILES:
save_tmp_file(relevant_titles, TMPFILE_WF100_SUB1_UMBREIT)
relevant_titles = relevant_titles.filter(pl.col.CUSTOMER_COUNT >= 3)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
)
pipe_result.write_results( pipe_result.write_results(
data=res.in_, data=entries_to_show,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
pipe_result.write_results(
data=pipe_result.open,
vorlage=False, vorlage=False,
wf_id=types.Workflows.ID_100, wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100, freigabe_auto=types.Freigabe.WF_100,
@@ -489,24 +572,9 @@ def wf100_petersen( def wf100_petersen(
pipe_result.merge_pipeline(wdb_sub_pipe) pipe_result.merge_pipeline(wdb_sub_pipe)
# // other branch # // other branch
# show always entries with #VM > 1 # Verlage: always show because of missing information of ONIX
filter_number_vm = pl.col(vm_criterion) > 1 # data (REQ-1003)
res = _apply_several_filters( # show always entries with #VM > 0
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.write_results(
data=res.in_,
vorlage=True,
wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False),
)
filter_number_vm = pl.col(vm_criterion) > 0 filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters( res = _apply_several_filters(
pipe_result.open, pipe_result.open,
@@ -518,7 +586,7 @@ def wf100_petersen( def wf100_petersen(
) )
pipe_result.write_results( pipe_result.write_results(
data=res.in_, data=res.in_,
vorlage=False, vorlage=True,
wf_id=types.Workflows.ID_100, wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100, freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False), order_qty_expr=ORDER_QTY_FUNC(empty=False),
@@ -677,7 +745,7 @@ def _wf200_sub1( def _wf200_sub1(
) -> PipelineResult: ) -> PipelineResult:
save_tmp_data(pipe_result.open) save_tmp_data(pipe_result.open)
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_200)
RELEVANT_DATE = get_starting_date(90) RELEVANT_DATE = get_starting_date(60) # see changes REQ-1000
join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_AUFPAUF.c.TITELNR
filter_ = sql.and_( filter_ = sql.and_(