From 4f234670b8dc6bda47b658ba185ec7b79b15e97b Mon Sep 17 00:00:00 2001
From: foefl
Date: Wed, 3 Dec 2025 16:09:09 +0100
Subject: [PATCH] implemented first reference workflows

---
 data_analysis/02-3_oracle_workflow_test.py    | 281 +++++++++++++-----
 .../not_in_title_table_20251203-2.json        | 175 +++++++++++
 src/umbreit/db.py                             |   2 +
 src/umbreit/types.py                          |   4 +
 4 files changed, 389 insertions(+), 73 deletions(-)
 create mode 100644 data_analysis/not_in_title_table_20251203-2.json

diff --git a/data_analysis/02-3_oracle_workflow_test.py b/data_analysis/02-3_oracle_workflow_test.py
index cb1b48f..5689060 100644
--- a/data_analysis/02-3_oracle_workflow_test.py
+++ b/data_analysis/02-3_oracle_workflow_test.py
@@ -191,7 +191,9 @@ stmt = sql.select(
         else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
     ).label("BEDP_MENGE_BEDARF_VM"),
     db.ext_titel_info.c.MELDENUMMER,
+    db.ext_titel_info.c.VERLAGSNR,
     db.ext_titel_info.c.MENGE_VORMERKER,
+    db.ext_titel_info.c.MANDFUEHR,
 ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))

 print(stmt.compile(engine))
@@ -206,18 +208,31 @@ elapsed = t2 - t1
 print(f"Query duration: {elapsed:.4f} sec")
 print("Number of entries: ", len(df))
 print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
-
+# %%
+df.head()
 # %%
 # // NO LIVE DATA NEEDED
 # SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow"
+p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
 # df.write_ipc(p_save)
 df = pl.read_ipc(p_save)
 # %%
 print(len(df))
 df.head()
+# %%
+# ** CHECK: differences between MANDANT in BEDP and in TINFO
+# e.g. 4591588 is in the title database with a different MANDANT
+# (are MANDFUEHR and BEDP_MAN feasible for matching?)
+df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())
+
+# %%
+df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())
+# %%
+df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)
+
+# %%
+# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
+df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())
 # %%
 # ** CHECK: different MANDANTEN
 # check for valid entries for unknown MANDANTEN
@@ -242,7 +257,11 @@ df.head()
 # %%
 # ** PREFILTER
 # always needed, entries filtered out are to be disposed
-df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
+filter_meldenummer_not_null = pl.col("MELDENUMMER").is_not_null()
+filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
+# preview only; the binding prefilter is applied in workflow_900 below
+df.filter(filter_meldenummer_not_null).filter(filter_mandant)
+
+# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
 # %%
 len(df)
 # %%
@@ -283,8 +302,8 @@ df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
 not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
 EXPORT_FEAT = "BEDP_TITELNR"
 to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
-p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-1.json"
-to_save
+p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-2.json"
+print(to_save)
 # with open(p_save_not_in_title_table, "w") as file:
 #     json.dump(to_save, file, indent=4)
 # %%
@@ -295,6 +314,7 @@ print(len(df.filter(pl.col("MELDENUMMER") == 18)))

 VM_CRITERION = "BEDP_MENGE_BEDARF_VM"

+# TODO: switch to the new query focusing on the TINFO table
 def get_raw_data() -> pl.DataFrame:
     join_condition = sql.and_(
         db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
@@ -320,7 +340,7 @@ def get_raw_data() -> pl.DataFrame:
     )


-def get_empyt_pipeline_result(
+def get_empty_pipeline_result(
     data: pl.DataFrame,
 ) -> types.PipelineResult:
     schema = db.results_schema_map.copy()
@@ -329,27 +349,6 @@ def get_empyt_pipeline_result(
     return types.PipelineResult(results=results, open=data)


-def prepare_base_data(
-    df: pl.DataFrame,
-) -> pl.DataFrame:
-    """pre-routine to handle non-feasible entries
-
-    Parameters
-    ----------
-    df : pl.DataFrame
-        raw data collected from database query
-
-    Returns
-    -------
-    pl.DataFrame
-        pre-processed data
-    """
-    df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
-    df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
-
-    return df
-
-
 def _apply_several_filters(
     df: pl.DataFrame,
     filters: Sequence[pl.Expr],
@@ -409,13 +408,45 @@ def _write_results(
             "BEDP_MAN",
             "BEDP_MENGE_BEDARF_VM",
             "MELDENUMMER",
+            "VERLAGSNR",
             "MENGE_VORMERKER",
+            "MANDFUEHR",
         ]
     )

     return pl.concat([results_table, data])


+def workflow_900(
+    pipe_result: types.PipelineResult,
+) -> types.PipelineResult:
+    """pre-routine: route non-feasible entries (no title info, foreign
+    MANDANT) out of the pipeline and fill missing quantities"""
+
+    filter_meldenummer_not_null = pl.col("MELDENUMMER").is_not_null()
+    filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer_not_null,
+            filter_mandant,
+        ),
+    )
+
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.out_,
+        vorlage=False,
+        wf_id=900,
+        freigabe_auto=types.Freigabe.WF_900,
+        is_out=True,
+    )
+
+    # fill missing quantities in a single pass so that neither fill_null
+    # overwrites the other
+    pipe_result.open = res.in_.with_columns(
+        pl.col("MENGE_VORMERKER").fill_null(0),
+        pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0),
+    )
+
+    return pipe_result
+
+
 # main routine
 # results for filtered-out entries are written
 def workflow_910(
@@ -476,73 +507,177 @@ def workflow_100_umbreit(
     return pipe_result


-# Petersen not present in data
+def workflow_100_petersen(
+    pipe_result: types.PipelineResult,
+    vm_criterion: str,
+) -> types.PipelineResult:
+    # Petersen entries: WDB titles and all others are handled in
+    # separate branches
+    filter_meldenummer = pl.col("MELDENUMMER") == 18
+    filter_mandant = pl.col("BEDP_MAN") == 90
+    filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
+
+    # WDB branch, VM quantity > 0
+    filter_number_vm = pl.col(vm_criterion) > 0
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.open = res.out_
+
+    # WDB branch, order quantity 0: no further action in other WFs
+    filter_number_vm = pl.col(vm_criterion) == 0
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_WDB,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.in_,
+        vorlage=False,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.open = res.out_
+
+    # non-WDB branch, VM quantity > 0
+    filter_number_vm = pl.col(vm_criterion) > 0
+    res = _apply_several_filters(
+        pipe_result.open,
+        (
+            filter_meldenummer,
+            filter_mandant,
+            filter_number_vm,
+        ),
+    )
+    pipe_result.results = _write_results(
+        results_table=pipe_result.results,
+        data=res.in_,
+        vorlage=True,
+        wf_id=100,
+        freigabe_auto=types.Freigabe.WF_100,
+        is_out=False,
+    )
+    pipe_result.open = res.out_
+
+    return pipe_result
+

 # %%
 # SAVING/LOADING
-p_save = Path.cwd() / "raw_data_from_sql_query_20251203-1.arrow"
-# df.write_ipc(p_save)
+p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
 df = pl.read_ipc(p_save)
 print(f"Number of entries: {len(df)}")
 # %%
 df.head()
-
 # %%
# removed_rows = []

# raw_data = df.clone()
# print(f"Length raw data: {len(raw_data)}")
# filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
# filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26

# filtered = raw_data.filter(filter_mandant)
# filtered_n = raw_data.filter(~filter_mandant)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# raw_data = filtered
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")

# # %%
# print("---------------------------------------")
# filtered = raw_data.filter(filter_ignore_MNR26)
# filtered_n = raw_data.filter(~filter_ignore_MNR26)
# num_filter = len(filtered)
# num_filter_n = len(filtered_n)
# len(filtered_n)
# # %%
# removed_rows.append(filtered_n)
# print(f"Length filtered: {num_filter}")
# print(f"Length filtered out: {num_filter_n}")
# print(f"Length all: {num_filter + num_filter_n}")
# out = pl.concat(removed_rows)
# print(f"Length out: {len(out)}")

 # %%
 raw_data = df.clone()
pipe_res = get_empty_pipeline_result(raw_data)
 pipe_res.results
+pipe_res = workflow_900(pipe_res)
+print(f"Length of base data: {len(raw_data):>18}")
+print(f"Number of entries pipe data: {len(pipe_res):>10}")
+print(f"Number of entries result data: {len(pipe_res.results):>8}")
+print(f"Number of entries open data: {len(pipe_res.open):>10}")
+# %%
+pipe_res.results
+# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
+# %%
+pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
+# print(f"Base data and pipe result in line: {}")
 # %%
 pipe_res = workflow_910(pipe_res)
-pipe_res
-# df_start = prepare_base_data(df_raw)
-# df_start
+print(f"Length of base data: {len(raw_data):>18}")
+print(f"Number of entries pipe data: {len(pipe_res):>10}")
+print(f"Number of entries result data: 
{len(pipe_res.results):>8}") +print(f"Number of entries open data: {len(pipe_res.open):>10}") +# %% +pipe_res.results.select(pl.col("vorlage").value_counts()) +# %% +pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION) +print(f"Length of base data: {len(raw_data):>18}") +print(f"Number of entries pipe data: {len(pipe_res):>10}") +print(f"Number of entries result data: {len(pipe_res.results):>8}") +print(f"Number of entries open data: {len(pipe_res.open):>10}") +# %% +pipe_res = workflow_100_petersen(pipe_res, VM_CRITERION) +print(f"Length of base data: {len(raw_data):>18}") +print(f"Number of entries pipe data: {len(pipe_res):>10}") +print(f"Number of entries result data: {len(pipe_res.results):>8}") +print(f"Number of entries open data: {len(pipe_res.open):>10}") +# %% +pipe_res.results.select(pl.col("vorlage").value_counts()) +# %% +pipe_res.results.filter(pl.col("vorlage") == True) +# %% +raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3) +# %% +raw_data.head() -# %% -results_init = get_empyt_pipeline_result() -results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION) -# df is where results are known -# filt_out contains entries for other workflows -# filt_out at this point represents all entries which are to be analysed in other workflows -# %% -results # %% filt_out diff --git a/data_analysis/not_in_title_table_20251203-2.json b/data_analysis/not_in_title_table_20251203-2.json new file mode 100644 index 0000000..93156e0 --- /dev/null +++ b/data_analysis/not_in_title_table_20251203-2.json @@ -0,0 +1,175 @@ +{ + "BEDP_TITELNR": [ + 5641810, + 7055141, + 9388245, + 5690882, + 8420618, + 5625063, + 4894841, + 8047302, + 7133112, + 5355081, + 6871073, + 9435273, + 4136531, + 4424591, + 7687300, + 2682366, + 4364686, + 2430598, + 2037163, + 2789480, + 2770591, + 2770577, + 2770583, + 4121829, + 2787037, + 6003708, + 4407203, + 8776286, + 5402902, + 5838480, + 5989581, + 4522891, + 3980696, + 950637, + 4965472, + 4228186, + 4210552, + 5002965, + 5545604, + 5880206, + 2241251, + 6370663, + 7683723, + 7010822, + 5161076, + 4147313, + 5793208, + 7907745, + 4261009, + 2717881, + 6067021, + 4365985, + 8040512, + 8890058, + 1780135, + 7262230, + 4410469, + 9000191, + 6444167, + 4948035, + 252810, + 4976957, + 6135037, + 9037465, + 5989608, + 5729058, + 4395070, + 5625122, + 4267436, + 7888648, + 6110254, + 9787272, + 4336175, + 5497657, + 2793591, + 6893056, + 3030639, + 5700267, + 7010792, + 5491873, + 258070, + 3853173, + 6046715, + 6125576, + 5132452, + 1504007, + 4262953, + 7935360, + 922162, + 1049053, + 9720614, + 5591810, + 2544914, + 2107970, + 7965895, + 7966115, + 7966119, + 7580940, + 6132326, + 3370678, + 6261428, + 6261430, + 8254294, + 8254295, + 6132322, + 8139591, + 8139588, + 8139587, + 8139586, + 8139585, + 8254301, + 3369002, + 4836770, + 4836769, + 4838001, + 4838000, + 6178366, + 6178370, + 6178371, + 4837536, + 4837537, + 6132318, + 6132323, + 3408132, + 5227665, + 5227661, + 5227666, + 5590678, + 4119572, + 4836779, + 3370676, + 9436407, + 4012212, + 4427503, + 4577066, + 9418557, + 2008168, + 7580941, + 6086598, + 6132319, + 8139590, + 8630511, + 717868, + 6633287, + 5335386, + 4836777, + 4154513, + 2770540, + 5730873, + 6160255, + 6939447, + 5545606, + 6178367, + 2010002, + 5494490, + 9206119, + 7146063, + 5227663, + 3369003, + 3030637, + 7414855, + 7945698, + 5514005, + 2537012, + 4263003, + 3408130, + 6924305, + 7966118, + 139058, + 4250548, + 2770562 + ] +} \ No newline at end of file diff --git a/src/umbreit/db.py 
b/src/umbreit/db.py
index a8fac2f..946e96e 100644
--- a/src/umbreit/db.py
+++ b/src/umbreit/db.py
@@ -195,5 +195,7 @@ raw_data_query_schema_map: PolarsSchema = {
     "BEDP_MAN": pl.UInt8,
     "BEDP_MENGE_BEDARF_VM": pl.UInt32,
     "MELDENUMMER": pl.UInt8,
+    "VERLAGSNR": pl.UInt32,
     "MENGE_VORMERKER": pl.UInt32,
+    "MANDFUEHR": pl.UInt8,
 }
diff --git a/src/umbreit/types.py b/src/umbreit/types.py
index 21cf817..490480e 100644
--- a/src/umbreit/types.py
+++ b/src/umbreit/types.py
@@ -21,8 +21,12 @@ class PipelineResult:
     results: pl.DataFrame
     open: pl.DataFrame

+    def __len__(self) -> int:
+        # total row count; stays equal to the raw-data length as long as
+        # every workflow only moves rows between open and results
+        return len(self.results) + len(self.open)
+

 class Freigabe(enum.Enum):
+    # NOTE: in a plain enum.Enum, members with identical values are aliases
+    # of the first member (WF_200 is WF_100, etc.); use distinct values or
+    # enum.auto() if the workflow members must stay distinguishable
     WF_100 = False
     WF_200 = False
+    WF_900 = False
     WF_910 = False
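
A note on the Freigabe enum this patch extends: with a plain enum.Enum, members that share a value become aliases of the first member, so "WF_900 = False" does not actually create a distinct member. A minimal sketch of the pitfall and one way out; FreigabeDistinct is a hypothetical illustration, not part of the patch:

import enum


class Freigabe(enum.Enum):
    # every member shares the value False, so they collapse into one member
    WF_100 = False
    WF_200 = False
    WF_900 = False
    WF_910 = False


print(Freigabe.WF_900 is Freigabe.WF_100)  # True -- WF_900 is only an alias
print(Freigabe.WF_900.name)                # "WF_100"
print(list(Freigabe))                      # [<Freigabe.WF_100: False>]


class FreigabeDistinct(enum.Enum):
    # enum.auto() keeps the workflow members distinguishable
    WF_100 = enum.auto()
    WF_200 = enum.auto()
    WF_900 = enum.auto()
    WF_910 = enum.auto()


print(FreigabeDistinct.WF_900.name)  # "WF_900"

If the boolean is meant as data (released automatically or not), it could live in a separate mapping keyed by the enum member instead of being the member's value.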
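
The run cells in the script eyeball an invariant after each workflow call: with the new PipelineResult.__len__, len(pipe_res) should stay equal to len(raw_data), because each workflow only moves rows from open into results. A self-contained sketch of that conservation property under stated assumptions: split_by_filters and the toy frame are hypothetical stand-ins for _apply_several_filters (whose body the patch does not show) and the real schema; the .in_/.out_ naming follows the call sites above.

import functools
from collections.abc import Sequence
from dataclasses import dataclass

import polars as pl


@dataclass
class PipelineResult:
    # mirrors umbreit.types.PipelineResult, including the __len__ added here
    results: pl.DataFrame
    open: pl.DataFrame

    def __len__(self) -> int:
        return len(self.results) + len(self.open)


def split_by_filters(df: pl.DataFrame, filters: Sequence[pl.Expr]):
    # split df into rows matching all filters and the rest
    # (assumed behaviour of _apply_several_filters)
    combined = functools.reduce(lambda a, b: a & b, filters)
    return df.filter(combined), df.filter(~combined)


raw = pl.DataFrame(
    {  # toy stand-in for the Oracle query result
        "MELDENUMMER": [18, None, 26],
        "MANDFUEHR": [1, 90, 5],
    }
)
pipe = PipelineResult(results=raw.clear(), open=raw)

# workflow_900-style prefilter: keep feasible rows open, write the rest out
in_, out_ = split_by_filters(
    pipe.open,
    (pl.col("MELDENUMMER").is_not_null(), pl.col("MANDFUEHR").is_in((1, 90))),
)
pipe = PipelineResult(results=pl.concat([pipe.results, out_]), open=in_)

# the invariant the notebook print blocks are checking by eye
assert len(pipe) == len(raw), "rows were lost or duplicated by the split"
print(len(pipe.results), len(pipe.open))  # 2 1

An assertion like the last one could replace the repeated four-line print blocks after each workflow call in the script.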