# %% import importlib from pathlib import Path from pprint import pprint import polars as pl import sqlalchemy as sql from umbreit import db from umbreit.types import PolarsNullValues, PolarsSchema # %% db = importlib.reload(db) # %% db_path = (Path.cwd() / "../data/data.db").resolve() data_path = db_path.parent / "20251105" assert db_path.parent.exists() assert data_path.exists() and data_path.is_dir() def transform_csv_to_database( db_path: Path, target_folder: Path ) -> tuple[sql.Engine, list[pl.DataFrame]]: if db_path.exists(): db_path.unlink() engine = sql.create_engine(f"sqlite:///{str(db_path)}", echo=True) db.metadata.create_all(engine) map_file_db: dict[ str, tuple[sql.Table, PolarsSchema, PolarsNullValues], ] = {} # table name to db for table, schema, null_values in db.csv_tables: map_file_db[table.fullname] = table, schema, null_values csv_files = target_folder.glob("*.csv") dataframes: list[pl.DataFrame] = [] for file in csv_files: table_name = file.stem db_table, schema, null_values = map_file_db[table_name] print(f"Reading table: {table_name}") df = pl.read_csv( file, separator=";", try_parse_dates=True, schema_overrides=schema, null_values=null_values, ) dataframes.append(df) df.write_database(db_table.fullname, connection=engine, if_table_exists="replace") print(f"Successfully written database to: >{db_path}<") return engine, dataframes # %% engine, dataframes = transform_csv_to_database(db_path, data_path) # %% stmt = sql.text("SELECT * FROM EXT_AUFPAUF WHERE AUFTRAGSNUMMER=37847548 and TITELNR=6315273") with engine.connect() as conn: res = conn.execute(stmt) print(res.all()) # %% stmt = sql.text("SELECT * FROM ext_titel_info WHERE MANDFUEHR=1 and TI_NUMMER=2928800") with engine.connect() as conn: res = conn.execute(stmt) print(res.all()) # %% stmt = sql.text("SELECT * FROM ext_bedpbed WHERE BEDARFNR=859131 and BEDP_SEQUENZ=2") with engine.connect() as conn: res = conn.execute(stmt) print(res.all()) # %% stmt = sql.text("SELECT * FROM EXT_BESPBES_INFO WHERE BESP_TITELNR=6312977") with engine.connect() as conn: res = conn.execute(stmt) print(res.all()) # %% stmt = sql.text("SELECT * FROM results") with engine.connect() as conn: res = conn.execute(stmt) print(res.all()) # %% df = dataframes[1] # %% col_dtype = {} for col, dtype in zip(df.columns, df.dtypes): col_dtype[col] = dtype print("dtypes of DF...") pprint(col_dtype) # %% len(df) # %% df.filter((pl.col("BEDP_MENGE_BEDARF_VM") != "") & (pl.col("BEDP_MENGE_BEDARF_VM") != "0")) # %% stmt = sql.text("SELECT * FROM ext_bedpbed") df = pl.read_database(stmt, engine) # %% df # %% # %% col_dtype = {} for col, dtype in zip(df.columns, df.dtypes): col_dtype[col] = dtype print("dtypes of DF...") pprint(col_dtype) # %%