finish WF-100 and begin WF-200

This commit is contained in:
Florian Förster 2026-01-14 12:22:54 +01:00
parent f98a2e2829
commit ab4eb1cbac
3 changed files with 238 additions and 90 deletions

View File

@ -229,22 +229,55 @@ df.head()
# %%
# // NO LIVE DATA NEEDED
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
# df.write_ipc(p_save)
df = pl.read_ipc(p_save)
# %%
print(len(df))
df.head()
# %%
df.head()
# %%
# ** CHECK: duplicates
temp = df.fill_null(0)
mask = temp.select(pl.exclude(("BEDARFNR", "BEDP_SEQUENZ"))).is_duplicated()
temp.filter(mask)
# %%
df.filter(pl.col.BEDP_TITELNR.is_duplicated()).sort("BEDP_TITELNR", descending=False)
# %%
# ** CHECK: positions without titlenumber
df.filter(pl.col.VERLAGSNR.is_null())["BEDP_MAN"].unique()
# %%
# ** CHECK: unique title number?
df.with_columns(titlenumber_count=pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR")).select(
["BEDP_TITELNR", "titlenumber_count"]
).unique().filter(pl.col("titlenumber_count") > 1)
# Title numbers that occur in more than one row, together with the distinct
# BEDP_MAN and MANDFUEHR values seen for each of them.
# group_by already yields exactly one row per BEDP_TITELNR, so the aggregated
# frame needs no further de-duplication (which would also have to compare the
# list-typed columns row by row).
df.group_by("BEDP_TITELNR").agg(
    pl.col("BEDP_TITELNR").len().alias("count"),
    pl.col.BEDP_MAN.unique().alias("unique_bedp_man"),
    pl.col.MANDFUEHR.unique().alias("unique_man_fuehr"),
).filter(pl.col("count") > 1)
# %%
df.filter(pl.col.BEDP_TITELNR == 8679893)
# %%
df.with_columns(
pl.col("BEDP_TITELNR").count().over("BEDP_TITELNR").alias("titlenumber_count")
).select(["BEDP_TITELNR", "titlenumber_count"]).unique().filter(
pl.col("titlenumber_count") > 1
)
# %%
# ** CHECK: distribution of MELDENUMMER
df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
pl.len()
# Row count per MELDENUMMER (largest first), its share of all rows in `df`
# and the running total of these shares.
sum_entries = len(df)
temp = (
    df.group_by("MELDENUMMER")
    .agg(pl.col("MELDENUMMER").len().alias("count"))
    .sort("count", descending=True)
    .with_columns((pl.col.count / sum_entries).alias("proportion"))
    .with_columns(pl.col.proportion.cum_sum().alias("cum"))
)
temp
# df.filter(pl.col("MELDENUMMER").is_not_null() & pl.col("MELDENUMMER").is_in((17, 18))).select(
# pl.len()
# )
# p_save = Path.cwd() / "meldenummer_anteile_20260114-1.xlsx"
# temp.write_excel(p_save)
# %%
# ** CHECK: differences MANDANT in BEDP and in TINFO
# 4591588: in title database with a different MANDANT (are MANDFUEHR and BEDP_MAN feasible for matching?)
@ -458,41 +491,12 @@ class PipelineResult:
(self._subtracted_indices, data[self._index_cols])
)
# TODO remove
# def _subtract_from_open(
# self,
# data: pl.DataFrame,
# ) -> None:
# self._open = self._open.join(data, on=self._index_cols, how="anti")
# self._subtracted_indices = pl.concat(
# (self._subtracted_indices, data[self._index_cols])
# )
# def _subtract_from_indices(
# self,
# indices: pl.DataFrame,
# ) -> None:
# self._open = self._open.join(indices, on=self._index_cols, how="anti")
# self._subtracted_indices = pl.concat(
# (self._subtracted_indices, indices[self._index_cols])
# )
def _add_results(
self,
data: pl.DataFrame,
) -> None:
self._results = pl.concat([self._results, data])
# TODO remove
# def add_pipeline_results(self, pipeline: PipelineResult) -> None:
# self._add_results(pipeline.results)
# def subtract_pipeline(
# self,
# pipeline: PipelineResult,
# ) -> None:
# self._subtract_data(pipeline.subtracted_indices)
def merge_pipeline(
self,
pipeline: PipelineResult,
@ -508,6 +512,7 @@ class PipelineResult:
freigabe_auto: types.Freigabe,
is_out: bool,
) -> None:
# TODO move to other position
ORDER_QTY_CRIT: typing.Final[str] = "BEDP_MENGE_BEDARF_VM"
results = data.rename(db.map_to_result)
@ -781,24 +786,19 @@ def wf100_petersen_wdb_sub1(
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the
# past 6 months
title_nos = res.out_["BEDP_TITELNR"].to_list()
# !! query used because of slow pre-filtering queries
# TODO check for more native pre-filtering within the database when
# TODO performance problems are solved
start_date = get_starting_date(180)
filter_ = sql.and_(
db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(title_nos),
db.EXT_BESPBES_INFO.c.BESP_TITELNR.in_(res.out_["BEDP_TITELNR"].to_list()),
db.EXT_BESPBES_INFO.c.BES_DATUM >= start_date,
)
stmt = sql.select(db.EXT_BESPBES_INFO).where(filter_)
df_order = pl.read_database(stmt, engine, schema_overrides=db.EXT_BESPBES_INFO_schema_map)
df_show = (
entries_show = (
df_order.group_by("BESP_TITELNR")
.agg(pl.col("BESP_TITELNR").count().alias("count"))
.filter(pl.col("count") > 1)
)
entries_to_show = df_show["BESP_TITELNR"].to_list()
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_show["BESP_TITELNR"].to_list())
res = _apply_several_filters(pipe_result.open, (filter_titleno,))
pipe_result.write_results(
data=res.in_,
@ -820,7 +820,7 @@ def wf100_petersen_wdb_sub1(
# %%
# SAVING/LOADING
p_save = Path.cwd() / "raw_data_from_sql_query_20260109-1.arrow"
p_save = Path.cwd() / "raw_data_from_sql_query_20260114-1.arrow"
df = pl.read_ipc(p_save)
print(f"Number of entries: {len(df)}")
@ -879,7 +879,7 @@ print(f"Number of entries open data: {len(pipe_res.open):>10}")
pipe_res.results
# raw_data.filter(pl.col("BEDARFNR") == 166982).filter(pl.col("BEDP_SEQUENZ") == 1)
# %%
pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# pipe_res.open.filter(pl.col("BEDP_MENGE_BEDARF_VM") > pl.col("MENGE_VORMERKER"))
# %%
pipe_res = workflow_910(pipe_res)
print(f"Length of base data: {len(raw_data):>18}")
@ -887,7 +887,7 @@ print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.results.select(pl.col("vorlage").value_counts())
# pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
pipe_res = workflow_100_umbreit(pipe_res, VM_CRITERION)
print(f"Length of base data: {len(raw_data):>18}")
@ -900,6 +900,9 @@ print(f"Length of base data: {len(raw_data):>18}")
print(f"Number of entries pipe data: {len(pipe_res):>10}")
print(f"Number of entries result data: {len(pipe_res.results):>8}")
print(f"Number of entries open data: {len(pipe_res.open):>10}")
# %%
pipe_res.open.filter(pl.col.MELDENUMMER == 18).filter(pl.col.BEDP_MENGE_BEDARF_VM > 0)
# %%
pipe_res.results.select(pl.col("vorlage").value_counts())
# %%
@ -918,28 +921,89 @@ filt_out
# Workflow 200 (Umbreit only)
# ---------------------------------------------------------------------------- #
# %%
wf_200_start_data = filt_out.clone()
wf_200_start_data = pipe_res.open.clone()
wf_200_start_data
# %%
def _init_workflow_200_umbreit(
results: pl.DataFrame,
data: pl.DataFrame,
vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
def workflow_200_umbreit(
pipe_result: PipelineResult,
) -> PipelineResult:
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
filter_number_vm = pl.col(vm_criterion) == 0
# not relevant, because already done in WF-100
# filter_number_vm = pl.col(vm_criterion) == 0
relevant, filt = _apply_several_filters(
data, (filter_meldenummer, filter_mandant, filter_number_vm)
res = _apply_several_filters(
pipe_result.open,
(filter_meldenummer, filter_mandant),
)
return relevant, filt
relevant_mnr: tuple[int, ...] = (17, 18)
filter_meldenummer = pl.col("MELDENUMMER").is_in(relevant_mnr)
filter_mandant = pl.col("BEDP_MAN") == 1
res = _apply_several_filters(
pipe_res.open,
(filter_meldenummer, filter_mandant),
)
# %%
# These entries must be checked for relevant orders.
# To execute efficient queries, a temp table must be created in the database;
# other approaches are just hacks.
# SOLUTION:
# - save these entries to a temp table 'temp'
# - look up the order history of the past 3 months
# -- JOIN ON temp.BEDP_TITELNR = EXT_AUFPAUF.TITELNR
# -- WHERE EXT_AUFPAUF.AUFTRAGS_DATUM > (CURRENT_DATE - 3 months) AND
# -- EXT_AUFPAUF.KUNDE_RECHNUNG NOT IN (608991, 260202) AND
# -- <further conditions still to be defined>
res.in_
# %%
# // demo query with IN statement
data = res.in_.clone()
title_sub_choice = data["BEDP_TITELNR"][:300].to_list()
rel_date = get_starting_date(90)
rel_date
# %%
filter_ = sql.and_(
db.EXT_AUFPAUF.c.TITELNR.in_(title_sub_choice),
db.EXT_AUFPAUF.c.AUFTRAGS_DATUM >= rel_date,
db.EXT_AUFPAUF.c.KUNDE_RECHNUNG.not_in((608991, 260202)),
)
stmt = sql.select(db.EXT_AUFPAUF).where(filter_)
print(stmt.compile(engine))
# %%
demo = pl.read_database(
stmt,
engine,
schema_overrides=db.EXT_AUFPAUF_schema_map,
)
# %%
demo.head()
# %%
# Demo evaluation of the order rows fetched into `demo`: keep only titles with
# AUFTRAGS_ART 1 or 99 that have at least three distinct KUNDE_RECHNUNG values.
demo_2 = demo.clone()
# `demo` can hold several rows per TITELNR (hence the group_by below), so
# titles are counted via the distinct TITELNR values, not via the row count
n_titles_before = demo_2["TITELNR"].n_unique()
print(f"Number of titles before filtering: {n_titles_before}")
demo_2 = demo_2.filter(pl.col.AUFTRAGS_ART.is_in((1, 99)))
demo_2 = (
    demo_2.group_by("TITELNR", maintain_order=True)
    .agg(
        pl.len().alias("count"),
        pl.col.KUNDE_RECHNUNG.n_unique().alias("customer_count"),
    )
    .filter(pl.col.customer_count >= 3)
)
# these remaining titles are relevant and should be shown
# the others should be disposed
# NOTE(review): titles of the queried selection without any row in `demo` are
# not part of these numbers -- confirm whether they belong to the disposed set
print(f"Number of titles which are relevant: {len(demo_2)}")
print(f"Number of titles which are to be disposed: {n_titles_before - len(demo_2)}")
demo_2
# %%
df, filt_out = _init_workflow_200_umbreit(results, wf_200_start_data, VM_CRITERION)
df
@ -1155,7 +1219,8 @@ res_vm_crit.out_
# filtered out entries (WDB with #VM == 1) must be analysed for orders in the past 6 months
title_nos = res_vm_crit.out_["BEDP_TITELNR"].to_list()
len(title_nos)
# %%
title_nos
# %%
# define starting date for 6 month interval
@ -1175,6 +1240,7 @@ df_show = (
.agg(pl.col("BESP_TITELNR").count().alias("count"))
.filter(pl.col("count") > 1)
)
df_show
# %%
# !! show these entries
# !! do not show others
@ -1183,7 +1249,7 @@ print(f"Number of entries relevant: {len(entries_to_show)}")
# %%
res_vm_crit.out_
# %%
filter_titleno = pl.col("BEDP_TITELNR").is_in(entries_to_show)
filter_titleno = pl.col("BEDP_TITELNR").is_in(df_show["BESP_TITELNR"].implode())
res_wdb = _apply_several_filters(res_vm_crit.out_, (filter_titleno,))

View File

@ -39,40 +39,102 @@ set timing on
-- -- WHERE bedp.BEDP_MAN IN (1, 90) AND t_info.MELDENUMMER != 26;
-- PROMPT ######################################
PROMPT #################################################
SELECT * FROM (
SELECT
bedp.BEDARFNR,
bedp.BEDP_SEQUENZ,
bedp.BEDP_TITELNR,
bedp.BEDP_MAN,
bedp.BEDP_MENGE_BEDARF_VM,
t_info.MELDENUMMER,
t_info.VERLAGSNR,
t_info.MENGE_VORMERKER,
t_info.MANDFUEHR
FROM EXT_BEDPBED bedp
LEFT JOIN EXT_TITEL_INFO t_info
ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
) view1
WHERE view1.VERLAGSNR IN (76008, 76070)
FETCH FIRST 100 ROWS ONLY;
SELECT * FROM EXT_BESPBES_INFO besp
WHERE besp.BESP_TITELNR = 7590554 AND
besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD');
-- SELECT COUNT(*) FROM (
-- SELECT /*+ NO_USE_HASH(bedp t_info) */
-- view1.BEDP_TITELNR,
-- SELECT * FROM (
-- SELECT
-- bedp.BEDARFNR,
-- bedp.BEDP_SEQUENZ,
-- bedp.BEDP_TITELNR,
-- bedp.BEDP_MAN,
-- bedp.BEDP_MENGE_BEDARF_VM,
-- t_info.MELDENUMMER,
-- t_info.VERLAGSNR,
-- t_info.MENGE_VORMERKER,
-- t_info.MANDFUEHR
-- FROM
-- (SELECT DISTINCT bedp.BEDP_TITELNR FROM EXT_BEDPBED bedp) view1
-- FROM EXT_BEDPBED bedp
-- LEFT JOIN EXT_TITEL_INFO t_info
-- ON view1.BEDP_TITELNR = t_info.TI_NUMMER
-- ) sub1 WHERE sub1.MANDFUEHR IN (1,90) AND sub1.MELDENUMMER in (17, 18);
-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
-- ) view1
-- WHERE view1.VERLAGSNR IN (76008, 76070)
-- FETCH FIRST 100 ROWS ONLY;
-- ### default query for raw data ###
-- SELECT
-- bedp.BEDARFNR,
-- bedp.BEDP_SEQUENZ,
-- bedp.BEDP_TITELNR,
-- bedp.BEDP_MAN,
-- bedp.BEDP_MENGE_BEDARF_VM,
-- t_info.MELDENUMMER,
-- t_info.VERLAGSNR,
-- t_info.MENGE_VORMERKER,
-- t_info.MANDFUEHR
-- FROM EXT_BEDPBED bedp
-- LEFT JOIN EXT_TITEL_INFO_NEU t_info
-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER;
SELECT * FROM EXT_BESPBES_INFO besp
WHERE besp.BESP_TITELNR IN (
6351156,
5102709,
8379474,
2461414,
5244007,
236729,
7776227,
6820705,
2032829,
5087975,
7306207,
8030748,
6722678,
9514688,
1059243,
7831209,
1778312,
7738343,
3920800,
3506292,
1985266,
7168195,
6235581,
7981302,
1136555,
9875369
) AND
besp.BES_DATUM > TO_DATE('2025-06-01', 'YYYY-MM-DD')
ORDER BY besp.BES_DATUM ASC;
-- SELECT
-- bedp.BEDARFNR,
-- bedp.BEDP_SEQUENZ,
-- bedp.BEDP_TITELNR,
-- bedp.BEDP_MAN,
-- bedp.BEDP_MENGE_BEDARF_VM,
-- t_info.MELDENUMMER,
-- t_info.VERLAGSNR,
-- t_info.MENGE_VORMERKER,
-- t_info.MANDFUEHR
-- FROM EXT_BEDPBED bedp
-- LEFT JOIN EXT_TITEL_INFO_NEU t_info
-- ON bedp.BEDP_TITELNR = t_info.TI_NUMMER;
-- Counts the rows of EXT_BEDPBED joined with EXT_TITEL_INFO_NEU (on the title
-- number) that have BEDP_MAN 1 or 90, MELDENUMMER 17 or 18 and
-- BEDP_MENGE_BEDARF_VM = 0.
-- NOTE: the WHERE condition on t_info.MELDENUMMER discards rows without a
-- matching title info entry, so the LEFT JOIN effectively acts as an inner
-- join in this query.
SELECT COUNT(*) FROM (SELECT
bedp.BEDARFNR,
bedp.BEDP_SEQUENZ,
bedp.BEDP_TITELNR,
bedp.BEDP_MAN,
bedp.BEDP_MENGE_BEDARF_VM,
t_info.MELDENUMMER,
t_info.VERLAGSNR,
t_info.MENGE_VORMERKER,
t_info.MANDFUEHR
FROM EXT_BEDPBED bedp
LEFT JOIN EXT_TITEL_INFO_NEU t_info
ON bedp.BEDP_TITELNR = t_info.TI_NUMMER
WHERE bedp.BEDP_MAN IN (1,90) AND t_info.MELDENUMMER in (17, 18) AND
bedp.BEDP_MENGE_BEDARF_VM = 0);
-- SELECT * FROM (
@ -175,4 +237,24 @@ besp.BES_DATUM > TO_DATE('2023-06-01', 'YYYY-MM-DD');
-- SELECT *
-- FROM EXT_AUFPAUF auf
-- WHERE auf.AUFTRAGS_DATUM > TO_DATE('2025-11-18', 'YYYY-MM-DD');
-- WHERE auf.AUFTRAGS_DATUM > TO_DATE('2025-11-18', 'YYYY-MM-DD');
-- system queries
-- SELECT owner, table_name
-- FROM all_tables
-- ORDER BY owner, table_name;
-- SELECT table_name
-- FROM user_tables
-- ORDER BY table_name;
-- SELECT view_name
-- FROM user_views
-- ORDER BY view_name;
-- SELECT owner, view_name
-- FROM all_views
-- WHERE owner = 'UMB'
-- ORDER BY view_name;
-- DESC all_views;
DESC EXT_AUFPAUF;

View File

@ -51,7 +51,7 @@ ext_bedpbed_null_values: PolarsNullValues = {
}
ext_titel_info = Table(
"ext_titel_info",
"ext_titel_info_neu",
metadata,
Column("TI_NUMMER", sql.Integer, primary_key=True, autoincrement=False, nullable=False),
Column("MANDFUEHR", sql.Integer, primary_key=True, autoincrement=False, nullable=False),