implemented first reference workflows

This commit is contained in:
Florian Förster 2025-12-03 16:09:09 +01:00
parent 82ad0691c0
commit 4f234670b8
4 changed files with 389 additions and 73 deletions

View File

@ -191,7 +191,9 @@ stmt = sql.select(
else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM, else_=db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
).label("BEDP_MENGE_BEDARF_VM"), ).label("BEDP_MENGE_BEDARF_VM"),
db.ext_titel_info.c.MELDENUMMER, db.ext_titel_info.c.MELDENUMMER,
db.ext_titel_info.c.VERLAGSNR,
db.ext_titel_info.c.MENGE_VORMERKER, db.ext_titel_info.c.MENGE_VORMERKER,
db.ext_titel_info.c.MANDFUEHR,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True)) ).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition, isouter=True))
print(stmt.compile(engine)) print(stmt.compile(engine))
@ -206,18 +208,31 @@ elapsed = t2 - t1
print(f"Query duration: {elapsed:.4f} sec") print(f"Query duration: {elapsed:.4f} sec")
print("Number of entries: ", len(df)) print("Number of entries: ", len(df))
print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB") print(f"Estimated size in memory: {df.estimated_size(unit='mb')} MB")
# %%
df.head()
# %% # %%
# // NO LIVE DATA NEEDED # // NO LIVE DATA NEEDED
# SAVING/LOADING # SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20251203-2.arrow" p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
# df.write_ipc(p_save) # df.write_ipc(p_save)
df = pl.read_ipc(p_save) df = pl.read_ipc(p_save)
# %% # %%
print(len(df)) print(len(df))
df.head() df.head()
# %%
# ** CHECK: differences MANDANT in BEDP and in TINFO
# 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?) # 4591588: in title database with different MANDANT (are MANDANTFUEHR and BEDP_MAN feasible for matching?)
df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).select(pl.col("BEDP_MAN").unique())
# %%
df.group_by("BEDP_MAN").agg(pl.col("MANDFUEHR").unique())
# %%
df.filter(pl.col("MANDFUEHR").is_null()).filter(pl.col("BEDP_MAN") == 1)
# %%
# df.filter(pl.col("BEDP_MAN") != pl.col("MANDFUEHR")).filter(pl.col("BEDP_MAN") == 5)
df.filter(pl.col("BEDP_MAN") == 60).filter(pl.col("MANDFUEHR").is_null())
# %% # %%
# ** CHECK: different MANDANTEN # ** CHECK: different MANDANTEN
# check for valid entries for unknown MANDANTEN # check for valid entries for unknown MANDANTEN
@ -242,7 +257,11 @@ df.head()
# %% # %%
# ** PREFILTER # ** PREFILTER
# always needed, entries filtered out are to be disposed # always needed, entries filtered out are to be disposed
df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26) filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
df.filter(filter_meldenummer_null).filter(filter_mandant)
# df = df.filter(pl.col("BEDP_MAN").is_in((1, 90))).filter(pl.col("MELDENUMMER") != 26)
# %% # %%
len(df) len(df)
# %% # %%
@ -283,8 +302,8 @@ df.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null()) not_in_title_table = df.filter(pl.col("MELDENUMMER").is_null())
EXPORT_FEAT = "BEDP_TITELNR" EXPORT_FEAT = "BEDP_TITELNR"
to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()} to_save = {EXPORT_FEAT: not_in_title_table.select(EXPORT_FEAT).to_series().to_list()}
p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-1.json" p_save_not_in_title_table = Path.cwd() / "not_in_title_table_20251203-2.json"
to_save print(to_save)
# with open(p_save_not_in_title_table, "w") as file: # with open(p_save_not_in_title_table, "w") as file:
# json.dump(to_save, file, indent=4) # json.dump(to_save, file, indent=4)
# %% # %%
@ -295,6 +314,7 @@ print(len(df.filter(pl.col("MELDENUMMER") == 18)))
VM_CRITERION = "BEDP_MENGE_BEDARF_VM" VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
# TODO exchange to new query focusing on TINFO table
def get_raw_data() -> pl.DataFrame: def get_raw_data() -> pl.DataFrame:
join_condition = sql.and_( join_condition = sql.and_(
db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER, db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
@ -320,7 +340,7 @@ def get_raw_data() -> pl.DataFrame:
) )
def get_empyt_pipeline_result( def get_empty_pipeline_result(
data: pl.DataFrame, data: pl.DataFrame,
) -> types.PipelineResult: ) -> types.PipelineResult:
schema = db.results_schema_map.copy() schema = db.results_schema_map.copy()
@ -329,27 +349,6 @@ def get_empyt_pipeline_result(
return types.PipelineResult(results=results, open=data) return types.PipelineResult(results=results, open=data)
def prepare_base_data(
df: pl.DataFrame,
) -> pl.DataFrame:
"""pre-routine to handle non-feasible entries
Parameters
----------
df : pl.DataFrame
raw data collected from database query
Returns
-------
pl.DataFrame
pre-processed data
"""
df = df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
df = df.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
return df
def _apply_several_filters( def _apply_several_filters(
df: pl.DataFrame, df: pl.DataFrame,
filters: Sequence[pl.Expr], filters: Sequence[pl.Expr],
@ -409,13 +408,45 @@ def _write_results(
"BEDP_MAN", "BEDP_MAN",
"BEDP_MENGE_BEDARF_VM", "BEDP_MENGE_BEDARF_VM",
"MELDENUMMER", "MELDENUMMER",
"VERLAGSNR",
"MENGE_VORMERKER", "MENGE_VORMERKER",
"MANDFUEHR",
] ]
) )
return pl.concat([results_table, data]) return pl.concat([results_table, data])
def workflow_900(
pipe_result: types.PipelineResult,
) -> types.PipelineResult:
"""pre-routine to handle non-feasible entries"""
filter_meldenummer_null = pl.col("MELDENUMMER").is_not_null()
filter_mandant = pl.col("MANDFUEHR").is_in((1, 90))
res = _apply_several_filters(
pipe_res.open,
(
filter_meldenummer_null,
filter_mandant,
),
)
pipe_result.results = _write_results(
pipe_result.results,
data=res.out_,
vorlage=False,
wf_id=900,
freigabe_auto=types.Freigabe.WF_900,
is_out=True,
)
pipe_result.open = res.in_.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
pipe_result.open = res.in_.with_columns(pl.col("BEDP_MENGE_BEDARF_VM").fill_null(0))
return pipe_result
# main routine # main routine
# results for filtered out entries written # results for filtered out entries written
def workflow_910( def workflow_910(
@ -476,73 +507,177 @@ def workflow_100_umbreit(
return pipe_result return pipe_result
# Petersen not present in data def workflow_100_petersen(
pipe_result: types.PipelineResult,
vm_criterion: str,
) -> types.PipelineResult:
# difference WDB and others
# WDB branch
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col("BEDP_MAN") == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
pipe_result.results = _write_results(
results_table=pipe_result.results,
data=res.in_,
vorlage=True,
wf_id=100,
freigabe_auto=types.Freigabe.WF_100,
is_out=False,
)
pipe_result.open = res.out_
# order quantity 0, no further action in other WFs
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col("BEDP_MAN") == 90
filter_WDB = pl.col("VERLAGSNR").is_in((76008, 76070))
filter_number_vm = pl.col(vm_criterion) == 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_WDB,
filter_number_vm,
),
)
pipe_result.results = _write_results(
results_table=pipe_result.results,
data=res.in_,
vorlage=False,
wf_id=100,
freigabe_auto=types.Freigabe.WF_100,
is_out=False,
)
pipe_result.open = res.out_
# other branch
filter_meldenummer = pl.col("MELDENUMMER") == 18
filter_mandant = pl.col("BEDP_MAN") == 90
filter_number_vm = pl.col(vm_criterion) > 0
res = _apply_several_filters(
pipe_result.open,
(
filter_meldenummer,
filter_mandant,
filter_number_vm,
),
)
pipe_result.results = _write_results(
results_table=pipe_result.results,
data=res.in_,
vorlage=True,
wf_id=100,
freigabe_auto=types.Freigabe.WF_100,
is_out=False,
)
pipe_result.open = res.out_
return pipe_result
# %% # %%
# SAVING/LOADING # SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20251203-1.arrow" p_save = Path.cwd() / "raw_data_from_sql_query_20251203-3.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save) df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}") print(f"Number of entries: {len(df)}")
# %% # %%
df.head() df.head()
# %% # %%
removed_rows = [] # removed_rows = []
raw_data = df.clone() # raw_data = df.clone()
print(f"Length raw data: {len(raw_data)}") # print(f"Length raw data: {len(raw_data)}")
filter_mandant = pl.col("BEDP_MAN").is_in((1, 90)) # filter_mandant = pl.col("BEDP_MAN").is_in((1, 90))
filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26 # filter_ignore_MNR26 = pl.col("MELDENUMMER") != 26
filtered = raw_data.filter(filter_mandant) # filtered = raw_data.filter(filter_mandant)
filtered_n = raw_data.filter(~filter_mandant) # filtered_n = raw_data.filter(~filter_mandant)
num_filter = len(filtered) # num_filter = len(filtered)
num_filter_n = len(filtered_n) # num_filter_n = len(filtered_n)
removed_rows.append(filtered_n) # removed_rows.append(filtered_n)
print(f"Length filtered: {num_filter}") # print(f"Length filtered: {num_filter}")
print(f"Length filtered out: {num_filter_n}") # print(f"Length filtered out: {num_filter_n}")
print(f"Length all: {num_filter + num_filter_n}") # print(f"Length all: {num_filter + num_filter_n}")
raw_data = filtered # raw_data = filtered
out = pl.concat(removed_rows) # out = pl.concat(removed_rows)
print(f"Length out: {len(out)}") # print(f"Length out: {len(out)}")
# %% # # %%
print("---------------------------------------") # print("---------------------------------------")
filtered = raw_data.filter(filter_ignore_MNR26) # filtered = raw_data.filter(filter_ignore_MNR26)
filtered_n = raw_data.filter(~filter_ignore_MNR26) # filtered_n = raw_data.filter(~filter_ignore_MNR26)
num_filter = len(filtered) # num_filter = len(filtered)
num_filter_n = len(filtered_n) # num_filter_n = len(filtered_n)
len(filtered_n) # len(filtered_n)
# %% # # %%
removed_rows.append(filtered_n) # removed_rows.append(filtered_n)
print(f"Length filtered: {num_filter}") # print(f"Length filtered: {num_filter}")
print(f"Length filtered out: {num_filter_n}") # print(f"Length filtered out: {num_filter_n}")
print(f"Length all: {num_filter + num_filter_n}") # print(f"Length all: {num_filter + num_filter_n}")
out = pl.concat(removed_rows) # out = pl.concat(removed_rows)
print(f"Length out: {len(out)}") # print(f"Length out: {len(out)}")
# %% # %%
raw_data = df.clone() raw_data = df.clone()
pipe_res = get_empyt_pipeline_result(raw_data) pipe_res = get_empty_pipeline_result(raw_data)
pipe_res.results pipe_res.results
pipe_res = workflow_900(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# print(f"Base data and pipe result in line: {}")
# %% # %%
pipe_res = workflow_910(pipe_res) pipe_res = workflow_910(pipe_res)
pipe_res print(f"Length of base data: {len(raw_data):>18}")
# df_start = prepare_base_data(df_raw) print(f"Number of entries pipe data: {len(pipe_res):>10}")
# df_start print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res = workflow_100_petersen(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res.results.filter(pl.col("vorlage") == True)
# %%
raw_data.filter(pl.col("BEDARFNR") == 922160).filter(pl.col("BEDP_SEQUENZ") == 3)
# %%
raw_data.head()
# %%
results_init = get_empyt_pipeline_result()
results, filt_out = workflow_100_umbreit(results_init, df_start, VM_CRITERION)
# df is where results are known
# filt_out contains entries for other workflows
# filt_out at this point represents all entries which are to be analysed in other workflows
# %%
results
# %% # %%
filt_out filt_out

View File

@ -0,0 +1,175 @@
{
"BEDP_TITELNR": [
5641810,
7055141,
9388245,
5690882,
8420618,
5625063,
4894841,
8047302,
7133112,
5355081,
6871073,
9435273,
4136531,
4424591,
7687300,
2682366,
4364686,
2430598,
2037163,
2789480,
2770591,
2770577,
2770583,
4121829,
2787037,
6003708,
4407203,
8776286,
5402902,
5838480,
5989581,
4522891,
3980696,
950637,
4965472,
4228186,
4210552,
5002965,
5545604,
5880206,
2241251,
6370663,
7683723,
7010822,
5161076,
4147313,
5793208,
7907745,
4261009,
2717881,
6067021,
4365985,
8040512,
8890058,
1780135,
7262230,
4410469,
9000191,
6444167,
4948035,
252810,
4976957,
6135037,
9037465,
5989608,
5729058,
4395070,
5625122,
4267436,
7888648,
6110254,
9787272,
4336175,
5497657,
2793591,
6893056,
3030639,
5700267,
7010792,
5491873,
258070,
3853173,
6046715,
6125576,
5132452,
1504007,
4262953,
7935360,
922162,
1049053,
9720614,
5591810,
2544914,
2107970,
7965895,
7966115,
7966119,
7580940,
6132326,
3370678,
6261428,
6261430,
8254294,
8254295,
6132322,
8139591,
8139588,
8139587,
8139586,
8139585,
8254301,
3369002,
4836770,
4836769,
4838001,
4838000,
6178366,
6178370,
6178371,
4837536,
4837537,
6132318,
6132323,
3408132,
5227665,
5227661,
5227666,
5590678,
4119572,
4836779,
3370676,
9436407,
4012212,
4427503,
4577066,
9418557,
2008168,
7580941,
6086598,
6132319,
8139590,
8630511,
717868,
6633287,
5335386,
4836777,
4154513,
2770540,
5730873,
6160255,
6939447,
5545606,
6178367,
2010002,
5494490,
9206119,
7146063,
5227663,
3369003,
3030637,
7414855,
7945698,
5514005,
2537012,
4263003,
3408130,
6924305,
7966118,
139058,
4250548,
2770562
]
}

View File

@ -195,5 +195,7 @@ raw_data_query_schema_map: PolarsSchema = {
"BEDP_MAN": pl.UInt8, "BEDP_MAN": pl.UInt8,
"BEDP_MENGE_BEDARF_VM": pl.UInt32, "BEDP_MENGE_BEDARF_VM": pl.UInt32,
"MELDENUMMER": pl.UInt8, "MELDENUMMER": pl.UInt8,
"VERLAGSNR": pl.UInt32,
"MENGE_VORMERKER": pl.UInt32, "MENGE_VORMERKER": pl.UInt32,
"MANDFUEHR": pl.UInt8,
} }

View File

@ -21,8 +21,12 @@ class PipelineResult:
results: pl.DataFrame results: pl.DataFrame
open: pl.DataFrame open: pl.DataFrame
def __len__(self) -> int:
return len(self.results) + len(self.open)
class Freigabe(enum.Enum): class Freigabe(enum.Enum):
WF_100 = False WF_100 = False
WF_200 = False WF_200 = False
WF_900 = False
WF_910 = False WF_910 = False