adapt TEMP JOIN strategy for WF-100 sub-pipe WDB

This commit is contained in:
Florian Förster 2026-01-29 14:59:39 +01:00
parent 80aea577d8
commit 24550ce310

View File

@ -193,19 +193,17 @@ t1 = time.perf_counter()
# db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, # db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
# db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, # db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
# ) # )
join_condition = sql.and_( join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
)
stmt = sql.select( stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ, db.ext_bedpbed.c.BEDP_SEQUENZ,
db.ext_bedpbed.c.BEDP_TITELNR, db.ext_bedpbed.c.BEDP_TITELNR,
db.ext_bedpbed.c.BEDP_MAN, db.ext_bedpbed.c.BEDP_MAN,
db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, # db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
# sql.case( sql.case(
# (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()), (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()),
# else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
# ).label("BEDP_MENGE_BEDARF_VM"), ).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER, db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR, db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER, db.ext_titel_info.c.MENGE_VORMERKER,
@ -411,20 +409,16 @@ ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyEx
def get_starting_date( def get_starting_date(
days: int, days_from_now: int,
) -> datetime.date: ) -> datetime.date:
current_dt = dt.current_time_tz(cut_microseconds=True) current_dt = dt.current_time_tz(cut_microseconds=True)
td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS) td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS)
return (current_dt - td).date() return (current_dt - td).date()
# TODO exchange to new query focusing on TINFO table
def get_raw_data() -> pl.DataFrame: def get_raw_data() -> pl.DataFrame:
join_condition = sql.and_( join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER
db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select( stmt = sql.select(
db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDARFNR,
db.ext_bedpbed.c.BEDP_SEQUENZ, db.ext_bedpbed.c.BEDP_SEQUENZ,
@ -435,7 +429,9 @@ def get_raw_data() -> pl.DataFrame:
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"), ).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER, db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER, db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
return pl.read_database( return pl.read_database(
@ -487,7 +483,8 @@ def save_result_data_native(results: pl.DataFrame) -> None:
] ]
) )
stmt = """ stmt = """
INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO") INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID",
"BEST_MENGE", "FREIGABE_AUTO")
VALUES (:1, :2, :3, :4, :5, :6) VALUES (:1, :2, :3, :4, :5, :6)
""" """
with engine.begin() as conn: with engine.begin() as conn:
@ -718,19 +715,10 @@ def wf900(
def wf910( def wf910(
pipe_result: PipelineResult, pipe_result: PipelineResult,
) -> PipelineResult: ) -> PipelineResult:
# TODO check if necessary because of WF-900
ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910) ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910)
filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90))
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
res = _apply_several_filters( res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,))
pipe_result.open,
filters=(
filter_mandant,
filter_ignore_MNR26,
),
)
pipe_result.write_results( pipe_result.write_results(
data=res.out_, data=res.out_,
vorlage=False, vorlage=False,
@ -879,26 +867,43 @@ def _wf100_petersen_sub1_wdb(
freigabe_auto=types.Freigabe.WF_100, freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False), order_qty_expr=ORDER_QTY_FUNC(empty=False),
) )
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the # filtered out entries (WDB with #VM == 1) must be analysed for orders in the
# past 6 months # past 6 months
start_date = get_starting_date(180) save_tmp_data(pipe_result.open)
filter_ = sql.and_( RELEVANT_DATE = get_starting_date(180)
db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()), join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date, filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE
stmt = (
sql.select(
db.tmp_data,
db.EXT_BESPBES_INFO.c.BESP_MENGE,
db.EXT_BESPBES_INFO.c.BESP_STATUS,
) )
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_) .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition))
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map) .where(filter_)
entries_show = ( )
df_order.group_by("BESP_TITELNR") sub1 = stmt.subquery()
.agg(pl.col("BESP_TITELNR").count().alias("count"))
.filter(pl.col("count") > 1) count_col = sql.func.count()
stmt = (
sql.select(
sub1.c.BEDP_TITELNR,
count_col.label("count"),
)
.select_from(sub1)
.group_by(sub1.c.BEDP_TITELNR)
.having(count_col > 1)
)
relevant_titles = pl.read_database(
stmt,
engine,
)
print(relevant_titles)
entries_to_show = pipe_result.open.filter(
pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode())
) )
# TODO IS IN check good because of performance?
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list())
res = _apply_several_filters(pipe_result.open, (filter_titleno,))
pipe_result.write_results( pipe_result.write_results(
data=res.in_, data=entries_to_show,
vorlage=True, vorlage=True,
wf_id=types.Workflows.ID_100, wf_id=types.Workflows.ID_100,
freigabe_auto=types.Freigabe.WF_100, freigabe_auto=types.Freigabe.WF_100,
@ -911,7 +916,6 @@ def _wf100_petersen_sub1_wdb(
freigabe_auto=types.Freigabe.WF_100, freigabe_auto=types.Freigabe.WF_100,
order_qty_expr=ORDER_QTY_FUNC(empty=False), order_qty_expr=ORDER_QTY_FUNC(empty=False),
) )
return pipe_result return pipe_result
@ -1044,8 +1048,21 @@ print(f"Number of entries: {len(df)}")
# %% # %%
df.head() df.head()
# %%
# df.filter(pl.col.BEDP_TITELNR == 4314750)
# # %%
# RELEVANT_DATE = get_starting_date(180)
# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR
# filter_ = sql.and_(
# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE,
# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750,
# )
# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
# relevant_titles = pl.read_database(
# stmt,
# engine,
# )
# print(relevant_titles)
# %% # %%
# removed_rows = [] # removed_rows = []