# umbreit-py/data_analysis/02-1_initial_workflow_test.py
# (file-listing metadata: 305 lines, 7.5 KiB, Python)

# %%
# Exploratory notebook-style script: tests an initial workflow that filters
# demand positions (ext_bedpbed) against title info from a SQLite extract.
import importlib
from collections.abc import Sequence
from pathlib import Path
from pprint import pprint
import polars as pl
import sqlalchemy as sql
from umbreit import db
# %%
# Uncomment to hot-reload the project db module during interactive work.
# db = importlib.reload(db)
# %%
# Resolve the SQLite database relative to the working directory; the data
# folder name looks like an extract date (20251105) -- TODO confirm.
db_path = (Path.cwd() / "../data/data.db").resolve()
data_path = db_path.parent / "20251105"
assert db_path.parent.exists()
assert data_path.exists() and data_path.is_dir()
# echo=True logs every emitted SQL statement -- useful while exploring.
engine = sql.create_engine(f"sqlite:///{str(db_path)}", echo=True)
# %%
# Join demand positions (ext_bedpbed) to title info (ext_titel_info) on
# title number AND mandant, so title attributes line up per mandant.
join_condition = sql.and_(
    db.ext_bedpbed.c.BEDP_TITELNR == db.ext_titel_info.c.TI_NUMMER,
    db.ext_bedpbed.c.BEDP_MAN == db.ext_titel_info.c.MANDFUEHR,
)
stmt = sql.select(
    db.ext_bedpbed.c.BEDARFNR,
    db.ext_bedpbed.c.BEDP_SEQUENZ,
    db.ext_bedpbed.c.BEDP_TITELNR,
    db.ext_bedpbed.c.BEDP_MAN,
    db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    db.ext_titel_info.c.MELDENUMMER,
    db.ext_titel_info.c.MENGE_VORMERKER,
).select_from(db.ext_bedpbed.join(db.ext_titel_info, join_condition))
# %%
print(stmt.compile(engine))
# %%
df_raw = pl.read_database(stmt, engine)
# %%
df_raw
# %%
# Meldenummer 18 presumably marks the title state relevant here; exact
# semantics live in the source system -- TODO confirm.
filter_meldenummer = pl.col("MELDENUMMER") == 18
# %%
# df_new = df.filter(pl.col("MENGE_VORMERKER").is_not_null() & pl.col("MENGE_VORMERKER") > 0)
# filter mandant: Umbreit
filter_mandant_umbreit = pl.col("BEDP_MAN") == 1
df_mandant = df_raw.filter(filter_mandant_umbreit)
df_mandant
# %%
# filter #VM
# Choose which column counts as "number of pre-orders" (Vormerker).
# VM_CRITERION = "MENGE_VORMERKER"
VM_CRITERION = "BEDP_MENGE_BEDARF_VM"
# Nulls become 0 so the > 0 comparison below never produces null rows.
df_mandant = df_mandant.with_columns(pl.col(VM_CRITERION).fill_null(0))
filter_vm = pl.col(VM_CRITERION) > 0  # pl.col("MENGE_VORMERKER").is_not_null() &
df_new = df_mandant.filter(filter_vm)
# df_new = df_mandant.filter(pl.col("MENGE_VORMERKER").is_not_null()).filter(pl.col("MENGE_VORMERKER") > 0)
df_new
# %%
def apply_several_filters(
    df: pl.DataFrame,
    filters: Sequence[pl.Expr],
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Apply *filters* one after another, tracking every row that is dropped.

    Parameters
    ----------
    df:
        Input frame.
    filters:
        Boolean polars expressions, applied in the given order.

    Returns
    -------
    tuple[pl.DataFrame, pl.DataFrame]
        ``(kept, removed)``: rows passing all filters, and the concatenation
        of the rows rejected at each step (in filter order).
    """
    df_current = df
    removed_rows: list[pl.DataFrame] = []
    # 'flt', not 'filter': avoid shadowing the builtin of the same name.
    for flt in filters:
        removed_rows.append(df_current.filter(~flt))
        df_current = df_current.filter(flt)
    # An empty `filters` sequence previously crashed in pl.concat([]);
    # fall back to an empty frame with the input's schema (DataFrame.clear()).
    df_removed = pl.concat(removed_rows) if removed_rows else df.clear()
    return df_current, df_removed
def prepare_base_data(df: pl.DataFrame) -> pl.DataFrame:
    """Normalise the raw extract: treat missing pre-order counts as zero."""
    return df.with_columns(pl.col("MENGE_VORMERKER").fill_null(0))
# def workflow_100_start(
# df: pl.DataFrame,
# ) -> tuple[pl.DataFrame, pl.DataFrame]:
# return apply_several_filters(df, (filter,))
def workflow_100_umbreit(
    df: pl.DataFrame,
    vm_criterion: str,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Workflow 100: Umbreit rows (mandant 1) with Meldenummer 18 and a
    positive pre-order count in the *vm_criterion* column.

    Returns ``(kept, removed)`` as produced by ``apply_several_filters``.
    """
    criteria = (
        pl.col("MELDENUMMER") == 18,
        pl.col("BEDP_MAN") == 1,
        pl.col(vm_criterion) > 0,
    )
    return apply_several_filters(df, criteria)
# %%
# Collect the rows each workflow filters out; they feed later workflows.
out_remainder: list[pl.DataFrame] = []
df_start = prepare_base_data(df_raw)
df_start
# %%
df, filt_out = workflow_100_umbreit(df_start, VM_CRITERION)
# filt_out at this point represents all entries which are to be analysed in other workflows
out_remainder.append(filt_out)
pipe_removed = pl.concat(out_remainder)
# %%
df
# %%
pipe_removed
# idea: use pipe_removed for other workflows
# in the end there should not be any open positions left (assuming all cases are implemented)
# %%
# post-processing the results
def results_workflow_100(
    df: pl.DataFrame,
) -> pl.DataFrame:
    """Shape workflow-100 output into the results-table layout.

    Renames columns via ``db.map_to_result``, adds the workflow result
    columns, and drops the source columns that are no longer needed.
    """
    renamed = df.rename(db.map_to_result)
    annotated = renamed.with_columns(
        [
            pl.col("BEDP_MENGE_BEDARF_VM").alias("best_menge"),
            pl.lit(True).alias("vorlage"),
            pl.lit(100).alias("wf_id"),
            pl.lit(False).alias("freigabe_auto"),
        ]
    )
    obsolete = [
        "BEDP_TITELNR",
        "BEDP_MAN",
        "BEDP_MENGE_BEDARF_VM",
        "MELDENUMMER",
        "MENGE_VORMERKER",
    ]
    return annotated.drop(obsolete)
# %%
pipe_post = results_workflow_100(df)
pipe_post
# %%
# Overwrites the results table on every run (exploratory behaviour).
pipe_post.write_database(db.results.fullname, engine, if_table_exists="replace")
# %%
# Read back the written keys to confirm the write.
stmt = sql.select(db.results.c.bedarf_nr, db.results.c.bedarf_sequenz)
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
filt_out
# %%
# NOTE(review): workflow_100_umbreit returns a (kept, removed) tuple, so
# df_umbreit_18 is a tuple here, not a DataFrame -- and it re-filters the
# already-filtered df. Looks like a leftover; confirm intent.
df_umbreit_18 = workflow_100_umbreit(df, VM_CRITERION)
df_umbreit_18
# ----------------------------------------------------------------------------
# %%
# Look up the surviving (BEDARFNR, BEDP_SEQUENZ) pairs directly in the DB.
target_bednr = df_new["BEDARFNR"].to_list()
target_seq = df_new["BEDP_SEQUENZ"].to_list()
# %%
# NOTE(review): two independent IN clauses match the cartesian product of the
# number/sequence lists, not the exact pairs from df_new -- fine for a spot
# check, but not an exact lookup.
stmt = (
    sql.select(
        db.ext_bedpbed.c.BEDARFNR,
        db.ext_bedpbed.c.BEDP_SEQUENZ,
        db.ext_bedpbed.c.BEDP_TITELNR,
        db.ext_bedpbed.c.BEDP_MENGE_BEDARF_VM,
    )
    .where(db.ext_bedpbed.c.BEDARFNR.in_(target_bednr))
    .where(db.ext_bedpbed.c.BEDP_SEQUENZ.in_(target_seq))
)
df_targets = pl.read_database(stmt, engine)
# %%
# df_targets.filter(pl.col("BEDARFNR") == 884174)
df_targets.filter(pl.col("BEDP_MENGE_BEDARF_VM") > 0)
# %%
# interesting order: 883697, 1, titleno: 7945981, 9964027
TITLE_NO = 7945981
# TITLE_NO = 9964027
# Purchase (Bestellung) info for the title under inspection.
stmt = sql.select(db.EXT_BESPBES_INFO).where(db.EXT_BESPBES_INFO.c.BESP_TITELNR == TITLE_NO)
title_buy = pl.read_database(stmt, engine)
# %%
title_buy
# %% when were the orders placed
stmt = sql.select(db.EXT_AUFPAUF).where(db.EXT_AUFPAUF.c.TITELNR == 7945981)
title_order = pl.read_database(stmt, engine)
# %%
title_order
# -------------------------------------------------------------------------------------------
# %%
# title DB complete?
# - includes only titles which are deliverable since 01.06.2025 and who are assigned to
#   buyer "Fröhlich"
stmt = sql.select(db.ext_titel_info)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
titles = pl.read_database(stmt, engine, schema_overrides=db.ext_titel_info_schema_map)
# %%
titles["MANDFUEHR"].unique()
# %%
unique_titles = set(titles["TI_NUMMER"].to_list())
len(unique_titles)
# %%
# requirements?
# - includes only order since 05.11.2025
stmt = sql.select(db.ext_bedpbed)  # .where(db.ext_titel_info.c.TI_NUMMER == 2928800)
reqs = pl.read_database(stmt, engine, schema_overrides=db.ext_bedpbed_schema_map)
# %%
reqs
# %%
reqs["BEDP_MAN"].unique()
# %%
# intersection between all titles and the titles contained in the requirements table
unique_titles_req = set(reqs["BEDP_TITELNR"].to_list())
len(unique_titles_req)
# %%
intersection = unique_titles & unique_titles_req
len(intersection)
# %%
# orders?
# - includes only order since 05.11.2025
stmt = sql.select(db.EXT_AUFPAUF)
orders = pl.read_database(stmt, engine, schema_overrides=db.EXT_AUFPAUF_schema_map)
# %%
orders.estimated_size(unit="mb")
# %%
# Re-executes the last orders SELECT via a raw connection.
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
# Spot-check individual rows with raw SQL.
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as conn:
    res = conn.execute(stmt)
    print(res.all())
# %%
# NOTE(review): `dataframes` is not defined anywhere in this file -- this cell
# raises NameError unless an earlier (removed?) cell created it. Probably a
# leftover from a previous loading step; confirm before relying on it.
df = dataframes[1]
# %%
# Dump column -> dtype mapping for inspection.
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%
len(df)
# %%
# Comparing against "" / "0" suggests BEDP_MENGE_BEDARF_VM is (or was) read
# as text in this frame -- TODO confirm the dtype.
df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0"))
# %%
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(stmt, engine)
# %%
df
# %%
# %%
col_dtype = {}
for col, dtype in zip(df.columns, df.dtypes):
    col_dtype[col] = dtype
print("dtypes of DF...")
pprint(col_dtype)
# %%