umbreit-py/data_analysis/01-1_transform_db.py

121 lines
2.9 KiB
Python

# %%
import importlib
from pathlib import Path
from pprint import pprint
import polars as pl
import sqlalchemy as sql
from umbreit import db
from umbreit.types import PolarsNullValues, PolarsSchema
# %%
# Re-import the project's db module so that edits made to it are picked up.
db = importlib.reload(db)
# %%
# Both paths are derived from ../data relative to the current working directory.
db_path = Path.cwd().joinpath("..", "data", "data.db").resolve()
data_path = db_path.parent.joinpath("20251105")
assert db_path.parent.exists()
# is_dir() is already False for a path that does not exist.
assert data_path.is_dir()
def transform_csv_to_database(
    db_path: Path, target_folder: Path
) -> tuple[sql.Engine, list[pl.DataFrame]]:
    """Rebuild the SQLite database at ``db_path`` from the CSV files in ``target_folder``.

    An existing database file is deleted first. The tables declared in
    ``db.metadata`` are created, then every ``*.csv`` file directly inside
    ``target_folder`` is read with Polars (``;``-separated) and written to the
    table whose ``fullname`` equals the file's stem.

    Files are processed in sorted path order so that the order of the returned
    DataFrames is reproducible and does not depend on the file system.

    Args:
        db_path: Location of the SQLite file to (re)create.
        target_folder: Directory searched (non-recursively) for ``*.csv`` files.

    Returns:
        The SQLAlchemy engine connected to the new database, and the DataFrames
        that were read, one per CSV file, in sorted file order.

    Raises:
        KeyError: If the stem of a CSV file matches no table in ``db.csv_tables``.
    """
    # Start from a clean file; missing_ok avoids a separate existence check.
    db_path.unlink(missing_ok=True)

    engine = sql.create_engine(f"sqlite:///{db_path}", echo=True)
    db.metadata.create_all(engine)

    # Table name -> (table, Polars schema overrides, null-value markers).
    map_file_db: dict[
        str,
        tuple[sql.Table, PolarsSchema, PolarsNullValues],
    ] = {
        table.fullname: (table, schema, null_values)
        for table, schema, null_values in db.csv_tables
    }

    dataframes: list[pl.DataFrame] = []
    # glob() yields entries in arbitrary order; sort so that callers indexing
    # into the returned list get the same DataFrame on every run.
    for file in sorted(target_folder.glob("*.csv")):
        table_name = file.stem
        try:
            db_table, schema, null_values = map_file_db[table_name]
        except KeyError:
            raise KeyError(
                f"No table named {table_name!r} in db.csv_tables for CSV file: {file}"
            ) from None

        print(f"Reading table: {table_name}")
        df = pl.read_csv(
            file,
            separator=";",
            try_parse_dates=True,
            schema_overrides=schema,
            null_values=null_values,
        )
        dataframes.append(df)
        # NOTE(review): "replace" recreates the table from the DataFrame, so the
        # definition created by create_all() above is presumably discarded for
        # these tables - confirm that this is intended.
        df.write_database(db_table.fullname, connection=engine, if_table_exists="replace")

    print(f"Successfully written database to: >{db_path}<")
    return engine, dataframes
# %%
# Build the database and keep the engine plus the DataFrames that were read.
engine, dataframes = transform_csv_to_database(db_path, data_path)
# %%
# Spot check: EXT_AUFPAUF rows for one AUFTRAGSNUMMER / TITELNR pair.
stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273")
with engine.connect() as connection:
    rows = connection.execute(stmt).all()
    print(rows)
# %%
# Spot check: ext_titel_info rows for one MANDFUEHR / TI_NUMMER pair.
stmt = sql.text("SELECT * FROM ext_titel_info WHERE MANDFUEHR=1 and TI_NUMMER=2928800")
with engine.connect() as connection:
    rows = connection.execute(stmt).all()
    print(rows)
# %%
# Spot check: ext_bedpbed rows for one BEDARFNR / BEDP_SEQUENZ pair.
stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2")
with engine.connect() as connection:
    rows = connection.execute(stmt).all()
    print(rows)
# %%
# Spot check: EXT_BESPBES_INFO rows for one BESP_TITELNR.
stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977")
with engine.connect() as connection:
    rows = connection.execute(stmt).all()
    print(rows)
# %%
# Dump the complete results table.
stmt = sql.text("SELECT * FROM results")
with engine.connect() as connection:
    rows = connection.execute(stmt).all()
    print(rows)
# %%
# Continue with the second DataFrame returned by transform_csv_to_database.
df = dataframes[1]
# %%
# Column name -> dtype of the DataFrame as read from CSV.
col_dtype = dict(zip(df.columns, df.dtypes))
print("dtypes of DF...")
pprint(col_dtype)
# %%
df.height
# %%
# Rows whose BEDP_MENGE_BEDARF_VM is neither the empty string nor "0".
menge_vm = pl.col("BEDP_MENGE_BEDARF_VM")
df.filter((menge_vm != "") & (menge_vm != "0"))
# %%
# Read the same table back from the database.
stmt = sql.text("SELECT * FROM ext_bedpbed")
df = pl.read_database(query=stmt, connection=engine)
# %%
df
# %%
# %%
# Column name -> dtype of the DataFrame as read back from the database.
col_dtype = dict(zip(df.columns, df.dtypes))
print("dtypes of DF...")
pprint(col_dtype)
# %%
# %%