diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py index cc4c4a4..99c2032 100644 --- a/data_analysis/02-3_oracle_workflow_test.py +++ b/data_analysis/02-3_oracle_workflow_test.py @@ -193,19 +193,17 @@ t1 = time.perf_counter() # db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, # db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, # ) -join_condition = sql.and_( - db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, -) +join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER stmt = sql.select( db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDP_SEQUENZ, db.ext_bedpbed.c.BEDP_TITELNR, db.ext_bedpbed.c.BEDP_MAN, - db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, - # sql.case( - # (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()), - # else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, - # ).label("BEDP_MENGE_BEDARF_VM"), + # db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, + sql.case( + (db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM <= -1, sql.null()), + else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, + ).label("BEDP_MENGE_BEDARF_VM"), db.ext_titel_info.c.MELDENUMMER, db.ext_titel_info.c.VERLAGSNR, db.ext_titel_info.c.MENGE_VORMERKER, @@ -411,20 +409,16 @@ ORDER_QTY_EXPR_KWARGS: typing.Final[types.OrderQtyExprKwArgs] = types.OrderQtyEx def get_starting_date( - days: int, + days_from_now: int, ) -> datetime.date: current_dt = dt.current_time_tz(cut_microseconds=True) - td = dt.timedelta_from_val(days, dt.TimeUnitsTimedelta.DAYS) + td = dt.timedelta_from_val(days_from_now, dt.TimeUnitsTimedelta.DAYS) return (current_dt - td).date() -# TODO exchange to new query focusing on TINFO table def get_raw_data() -> pl.DataFrame: - join_condition = sql.and_( - db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, - db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR, - ) + join_condition = db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER stmt = sql.select( db.ext_bedpbed.c.BEDARFNR, db.ext_bedpbed.c.BEDP_SEQUENZ, @@ -435,7 +429,9 @@ def get_raw_data() -> pl.DataFrame: else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, ).label("BEDP_MENGE_BEDARF_VM"), db.ext_titel_info.c.MELDENUMMER, + db.ext_titel_info.c.VERLAGSNR, db.ext_titel_info.c.MENGE_VORMERKER, + db.ext_titel_info.c.MANDFUEHR, ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) return pl.read_database( @@ -487,7 +483,8 @@ def save_result_data_native(results: pl.DataFrame) -> None: ] ) stmt = """ - INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", "BEST_MENGE", "FREIGABE_AUTO") + INSERT INTO "EXT_DOPT_ERGEBNIS" ("BEDARF_NR", "BEDARF_SEQUENZ", "VORLAGE", "WF_ID", + "BEST_MENGE", "FREIGABE_AUTO") VALUES (:1, :2, :3, :4, :5, :6) """ with engine.begin() as conn: @@ -718,19 +715,10 @@ def wf900( def wf910( pipe_result: PipelineResult, ) -> PipelineResult: - # TODO check if necessary because of WF-900 ORDER_QTY_FUNC = get_expr_order_qty(types.Workflows.ID_910) - filter_mandant = pl.col(MANDANT_CRITERION).is_in((1, 90)) - filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 - res = _apply_several_filters( - pipe_result.open, - filters=( - filter_mandant, - filter_ignore_MNR26, - ), - ) + res = _apply_several_filters(pipe_result.open, filters=(filter_ignore_MNR26,)) pipe_result.write_results( data=res.out_, vorlage=False, @@ -879,26 +867,43 @@ def _wf100_petersen_sub1_wdb( freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) - # filtered out entries (WDB with #VM == 1) must be analysed for orders in the # past 6 months - start_date = get_starting_date(180) - filter_ = sql.and_( - db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()), - db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date, + save_tmp_data(pipe_result.open) + RELEVANT_DATE = get_starting_date(180) + join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR + filter_ = db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE + stmt = ( + sql.select( + db.tmp_data, + db.EXT_BESPBES_INFO.c.BESP_MENGE, + db.EXT_BESPBES_INFO.c.BESP_STATUS, + ) + .select_from(db.tmp_data.join(db.EXT_BESPBES_INFO, join_condition)) + .where(filter_) ) - stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_) - df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map) - entries_show = ( - df_order.group_by("BESP_TITELNR") - .agg(pl.col("BESP_TITELNR").count().alias("count")) - .filter(pl.col("count") > 1) + sub1 = stmt.subquery() + + count_col = sql.func.count() + stmt = ( + sql.select( + sub1.c.BEDP_TITELNR, + count_col.label("count"), + ) + .select_from(sub1) + .group_by(sub1.c.BEDP_TITELNR) + .having(count_col > 1) + ) + relevant_titles = pl.read_database( + stmt, + engine, + ) + print(relevant_titles) + entries_to_show = pipe_result.open.filter( + pl.col.BEDP_TITELNR.is_in(relevant_titles["BEDP_TITELNR"].unique().implode()) ) - # TODO IS IN check good because of performance? - filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list()) - res = _apply_several_filters(pipe_result.open, (filter_titleno,)) pipe_result.write_results( - data=res.in_, + data=entries_to_show, vorlage=True, wf_id=types.Workflows.ID_100, freigabe_auto=types.Freigabe.WF_100, @@ -911,7 +916,6 @@ def _wf100_petersen_sub1_wdb( freigabe_auto=types.Freigabe.WF_100, order_qty_expr=ORDER_QTY_FUNC(empty=False), ) - return pipe_result @@ -1044,8 +1048,21 @@ print(f"Number of entries: {len(df)}") # %% df.head() - - +# %% +# df.filter(pl.col.BEDP_TITELNR == 4314750) +# # %% +# RELEVANT_DATE = get_starting_date(180) +# join_condition = db.tmp_data.c.BEDP_TITELNR == db.EXT_BESPBES_INFO.c.BESP_TITELNR +# filter_ = sql.and_( +# db.EXT_BESPBES_INFO.c.BES_DATUM >= RELEVANT_DATE, +# db.EXT_BESPBES_INFO.c.BESP_TITELNR == 4314750, +# ) +# stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_) +# relevant_titles = pl.read_database( +# stmt, +# engine, +# ) +# print(relevant_titles) # %% # removed_rows = []