# %% import dataclasses as dc import hashlib import json import random import sys from pathlib import Path from typing import cast import click import dopt_basics.datetime as dopt_dt import polars as pl import rich.box import rich.table import sqlalchemy as sql from dopt_basics import io from rich.console import Console from dopt_pollublock_blockchain import blockchain, db, types from dopt_pollublock_blockchain import constants as const user_seed: int | None = const.RNG_DEFAULT_SEED console = Console() p_base = io.search_folder_path(Path(__file__), "src", return_inclusive=False) assert p_base is not None p_bc_db = p_base / "prototypes/blockchain.db" assert p_bc_db.exists(), "blockchain data path not found" p_sensor_data = p_base / "prototypes/pollublock_data.db" assert p_sensor_data.exists(), "sensor data path not found" engine = sql.create_engine(f"sqlite:///{str(p_sensor_data)}") # // common helper functions def print_exception( console: Console, error_txt: str, exc: Exception, ) -> None: console.print(f":warning: [bold bright_red]{error_txt}") console.print(f":warning: [bold bright_red]Details:\n{exc}") def rich_table_from_db_entry(df: pl.DataFrame, title: str) -> rich.table.Table: column_mapping: dict[str, str] = { "Datetime": "Zeitstempel", "Temperature_(Celsius)": "Temperatur (°C)", "Pressure_(Pa)": "Druck (Pa)", "Air_Quantity_(Percent)": "Luftmenge (%)", "Blockchain_Block_Number": "Block-Nr. Blockchain", } q = ( df.lazy() .with_columns(pl.col("Datetime").dt.replace_time_zone("UTC")) .with_columns( pl.col("Index").cast(pl.String), pl.col("Datetime").dt.to_string("iso:strict"), pl.col("Temperature_(Celsius)").map_elements( lambda x: f"{x:.2f}", return_dtype=pl.String ), pl.col("Pressure_(Pa)").map_elements( lambda x: f"{x}".zfill(5), return_dtype=pl.String ), pl.col("Air_Quantity_(Percent)").map_elements( lambda x: f"{x:.14f}", return_dtype=pl.String ), pl.col("Blockchain_Block_Number").cast(pl.String), ) .rename(column_mapping) ) df = q.collect() table = rich.table.Table(title=title, box=rich.box.ROUNDED) for col in df.columns: table.add_column(col) table.add_row(*df.row(0)) return table def df_transform_hashing( df: pl.DataFrame, ) -> pl.DataFrame: q = ( df.lazy() .with_columns(pl.col("Datetime").dt.replace_time_zone("UTC")) .with_columns( pl.col("Datetime").dt.to_string("iso:strict").alias("timestamp_str"), pl.col("Temperature_(Celsius)") .map_elements(lambda x: f"{x:.2f}", return_dtype=pl.String) .alias("temp"), pl.col("Pressure_(Pa)") .map_elements(lambda x: f"{x}".zfill(5), return_dtype=pl.String) .alias("pressure"), pl.col("Air_Quantity_(Percent)") .map_elements(lambda x: f"{x:.14f}", return_dtype=pl.String) .alias("air"), ) .with_columns( ( pl.col("timestamp_str") + "," + pl.col("temp") + "," + pl.col("pressure") + "," + pl.col("air") ).alias("combined") ) .select(["Index", "combined", "Blockchain_Block_Number"]) ) return q.collect() def sample_entry( rng: random.Random, ) -> types.RandomSampleEntry: temp = round(rng.gauss(const.TEMPERATURE_MEAN, const.TEMPERATURE_STD), 2) pressure = rng.choice(range(const.PRESSURE_MIN, const.PRESSURE_MAX + 1)) air_qty = round(rng.gauss(const.AIRQTY_MEAN, const.AIRQTY_STD), 6) sampled_entry: types.RandomSampleEntry = { "Datetime": dopt_dt.current_time_tz(cut_microseconds=True), "Temperature_(Celsius)": temp, "Pressure_(Pa)": pressure, "Air_Quantity_(Percent)": air_qty, "Blockchain_Block_Number": None, } return sampled_entry # // blockchain def blockchain_init() -> blockchain.Blockchain: console.print("Einlesen der Blockchain...") try: chain = blockchain.Blockchain(p_bc_db) chain.difficulty = const.BLOCKCHAIN_DIFFICULTY chain.load(const.BLOCKCHAIN_LOADING_BATCH_SIZE) if not chain.genesis_done: raise RuntimeError("Blockchain enthält keinen Genesis-Block") console.print("Einlesen der Blockchain erfolgreich") except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"Die Blockchain enthält [bold]{len(chain)} Blöcke [/bold]. Darin eingeschlossen " f"ist der initiale Start-Block, auch 'Genesis-Block' genannt." ) console.print("Validiere Blockchain...") console.print("Prüfe Hashwerte und korrekte Verkettung...") try: success = chain.validate() if not success: raise RuntimeError( "Blockchain konnte nicht validiert werden. " "Die Daten scheinen nicht integer zu sein." ) console.print("Validierung der Blockchain erfolgreich") except Exception: console.print_exception(max_frames=20) sys.exit(1) return chain def start_seeding() -> bool: global user_seed console.print( "\nWir arbeiten mit zufälligen Werten. Damit die Ergebnisse trotzdem reproduzierbar " "sind, nutzen wir einen Initialwert für den Zufallsgenerator, den Sie " "bestimmen können." ) rng_seed: int = 0 while rng_seed == 0: try: user_input = console.input("[italic]Wählen Sie einen Wert zwischen 1 und 100: ") if user_input.strip(" ") not in const.RNG_SEED_RANGE: console.print( "Es sind nur Werte zwischen [bold]'1'[/bold] und [bold]'100'" "[/bold] als Eingabe zulässig" ) continue rng_seed = int(user_input) except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print(f"Sie haben '{rng_seed}' gewählt") user_seed = rng_seed const.RNG.seed(user_seed) console.print( f"[green]:heavy_check_mark:[/green] Der Zufallsgenerator wurde erfolgreich mit " f"Wert '{user_seed}' initialisiert" ) if not click.confirm("Fortfahren?", default=True): return True return False def reseed() -> bool: console.print("Zunächst initialisieren wir den Zufallsgenerator erneut...") const.RNG.seed(user_seed) console.print( f"[green]:heavy_check_mark:[/green] Der Zufallsgenerator wurde erfolgreich mit " f"Wert '{user_seed}' initialisiert" ) if not click.confirm("Fortfahren?", default=True): return True return False def scenario_1(chain: blockchain.Blockchain) -> bool: console.rule("[bold]Szenario 1 - Datenvalidierung", style="yellow3") max_idx: int = 0 try: with engine.connect() as con: res = con.execute(sql.select(sql.func.max(db.sensor_data.c.Index))) max_idx = cast(int, res.scalar()) except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print(f"Die Datenbank mit den Sensorwerten hat insgesamt {max_idx} Einträge") if not click.confirm("Fortfahren?", default=True): return True chosen_idx = const.RNG.choice(range(1, max_idx)) try: stmt = sql.select(db.sensor_data).where(db.sensor_data.c.Index == chosen_idx) db_entry = pl.read_database( stmt, engine, schema_overrides=db.sensor_data_query_schema ) table = rich_table_from_db_entry( db_entry, title=f"Sensordaten zu Eintrag {chosen_idx}" ) except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"Zufällig ausgewählt wurde der Eintrag Nr. {chosen_idx} mit folgenden Eigenschaften:" ) console.print(table) if not click.confirm("Fortfahren?", default=True): return True try: df_hash = df_transform_hashing(db_entry) data_to_hash = df_hash.select("combined").item() sha256 = hashlib.sha256() sha256.update(data_to_hash.encode("UTF-8")) hashed_data_hex = sha256.hexdigest() except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"Die relevanten Eigenschaften kombinieren wir zu einem gemeinsamen " f"Informationsblock: [bold]{data_to_hash}[/bold]", highlight=False, ) console.print( f"Wir nutzen SHA256 als Hash-Algorithmus. Dies gibt uns den folgenden Hashwert zurück: " f"[bold]{hashed_data_hex}[/bold]", ) if not click.confirm("Fortfahren?", default=True): return True try: block_number = df_hash.select("Blockchain_Block_Number").item() py_block = chain.get_block(block_number) py_block_data = dc.asdict(py_block.as_dataclass()) py_block_data["Timestamp"] = py_block_data["Timestamp"].isoformat() py_block_json = json.dumps(py_block_data) py_block_data_hash = py_block_data["Data"] if py_block_data_hash != hashed_data_hex: raise RuntimeError("Hash values do not match, but they should. Data corrupted?") except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"Nun gleichen wir diesen Hashwert mit dem Wert aus der Blockchain ab. Die relevante " f"Block-Nummer erhalten wir aus der Datenbank (siehe Tabelle oben). Sie " f"lautet: {block_number}" ) if not click.confirm("Fortfahren?", default=True): return True console.print( "Wir fragen den Block mit dieser Nummer aus der Blockchain ab und " "erhalten dazu folgende Daten:" ) console.print_json(py_block_json) if not click.confirm("Fortfahren?", default=True): return True console.print( f"Der Hashwert der dazugehörigen Daten lautet also demnach: " f"[bold]{py_block_data_hash}[/bold]" ) if not click.confirm("Fortfahren?", default=True): return True console.print("Zum Vergleich hier nochmals beide Hashwerte zusammen:") console.print(f"Ermittelter Hashwert der Einträge:\t[bold]{hashed_data_hex}[/bold]") console.print(f"Hashwert der Blockchain:\t\t[bold]{py_block_data_hash}[/bold]") console.print( "[green3]Wir sehen, dass beide Werte übereinstimmen. Das bedeutet, dass der Datensatz " "integer ist und seit der Referenzierung in der Blockchain nicht verändert wurde." ) return False def scenario_2(chain: blockchain.Blockchain) -> bool: console.rule("[bold]Szenario 2 - Datengenerierung", style="yellow3") max_idx: int = 0 try: with engine.connect() as con: res = con.execute(sql.select(sql.func.max(db.sensor_data.c.Index))) max_idx = cast(int, res.scalar()) except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print(f"Die Datenbank mit den Sensorwerten hat insgesamt {max_idx} Einträge") if not click.confirm("Fortfahren?", default=True): return True console.print( "\nNun werden wir einen Datenbankeintrag zufällig generieren und abspeichern" ) try: sampled_entry = sample_entry(const.RNG) stmt_insert = sql.insert(db.sensor_data).values(sampled_entry) stmt_sel = sql.select(db.sensor_data).order_by(db.sensor_data.c.Index.desc()).limit(1) with engine.begin() as con: res = con.execute(stmt_insert, sampled_entry) if res.rowcount <= 0: raise RuntimeError("Database query not successful") db_entry = pl.read_database( stmt_sel, engine, schema_overrides=db.sensor_data_query_schema ) table = rich_table_from_db_entry(db_entry, title="Zufällig generierte Sensordaten") except Exception: console.print_exception(max_frames=20) sys.exit(1) max_idx: int = 0 try: with engine.connect() as con: res = con.execute(sql.select(sql.func.max(db.sensor_data.c.Index))) max_idx = cast(int, res.scalar()) except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print(f"Die Datenbank mit den Sensorwerten hat nun insgesamt {max_idx} Einträge") console.print(f"Eingefügt wurde der Eintrag Nr. {max_idx} mit folgenden Eigenschaften:") console.print(table) if not click.confirm("Fortfahren?", default=True): return True try: df_hash = df_transform_hashing(db_entry) data_to_hash = df_hash.select("combined").item() sha256 = hashlib.sha256() sha256.update(data_to_hash.encode("UTF-8")) hashed_data_hex = sha256.hexdigest() except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"Die relevanten Eigenschaften kombinieren wir zu einem gemeinsamen " f"Informationsblock: [bold]{data_to_hash}[/bold]", highlight=False, ) console.print( f"Wir nutzen SHA256 als Hash-Algorithmus. Dies gibt uns den folgenden Hashwert zurück: " f"[bold]{hashed_data_hex}[/bold]", ) if not click.confirm("Fortfahren?", default=True): return True console.print("\nDiesen erzeugten Hash werden wir nun in der Blockchain speichern") console.print( "\nHierzu muss ein neuer Block 'geschürft' werden. International " "wird deshalb auch von 'Mining' gesprochen." ) try: with console.status( "Mining: Schürfen eines neuen Blocks mit unseren Daten...", spinner="dots2" ): block_number = chain.new_block(data_to_hash) chain.save() except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( f"[green]:heavy_check_mark:[/green] Unsere Daten wurden erfolgreich im Block " f"mit der Nummer '{block_number}' gespeichert" ) if not click.confirm("Fortfahren?", default=True): return True console.print( "Diese Information müssen wir noch in der Datenbank ablegen, damit sie " "für einen späteren Abgleich zur Verfügung steht." ) try: stmt = ( sql.update(db.sensor_data) .where(db.sensor_data.c.Index == sql.bindparam("idx")) .values(Blockchain_Block_Number=sql.bindparam("bc_num")) ) with engine.begin() as con: res = con.execute(stmt, {"idx": max_idx, "bc_num": block_number}) if res.rowcount <= 0: raise RuntimeError("Database query not successful") stmt_sel = sql.select(db.sensor_data).where(db.sensor_data.c.Index == max_idx) db_entry = pl.read_database( stmt_sel, engine, schema_overrides=db.sensor_data_query_schema ) table = rich_table_from_db_entry(db_entry, title=f"Sensordaten zu Eintrag {max_idx}") except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print("Unsere Datenbank enthält zu diesem Eintrag nun folgende Informationen:") console.print(table) console.print("Die Nummer des relevanten Blocks ist nun ebenfalls gespeichert.") if not click.confirm("Fortfahren?", default=True): return True try: py_block = chain.get_block(block_number) py_block_data = dc.asdict(py_block.as_dataclass()) py_block_data["Timestamp"] = py_block_data["Timestamp"].isoformat() py_block_json = json.dumps(py_block_data) py_block_data_hash = py_block_data["Data"] if py_block_data_hash != hashed_data_hex: raise RuntimeError("Hash values do not match, but they should. Data corrupted?") except Exception: console.print_exception(max_frames=20) sys.exit(1) console.print( "Nun ermitteln wir den Hashwert, der in der Blockchain gespeichert wurde. " "Die relevante Block-Nummer haben wir bereits erhalten. Sie " f"lautet: {block_number}" ) console.print( "Wir fragen den Block mit dieser Nummer aus der Blockchain ab und " "erhalten dazu folgende Daten:" ) console.print_json(py_block_json) if not click.confirm("Fortfahren?", default=True): return True console.print( f"Der Hashwert der dazugehörigen Daten lautet also demnach: " f"[bold]{py_block_data_hash}[/bold]" ) if not click.confirm("Fortfahren?", default=True): return True console.print( "Nun gleichen wir den Hashwert unserer Daten mit dem Hashwert aus der Blockchain ab." ) console.print(f"Ermittelter Hashwert der Einträge:\t[bold]{hashed_data_hex}[/bold]") console.print(f"Hashwert der Blockchain:\t\t[bold]{py_block_data_hash}[/bold]") console.print( "[green3]Wir sehen, dass beide Werte übereinstimmen. Das bedeutet, dass der Datensatz " "erfolgreich abgespeichert wurde und für eine zukünftige Prüfung zur Verfügung steht." ) return False # %% # // app start def app() -> bool: console.rule("[bold]Willkommen beim POLLU-BLOCK-Demonstrator!", style="yellow3") console.print( "Diese Anwendung zeigt Ihnen, wie eine Blockchain als " "Werkzeug zur Integritätsprüfung gespeicherter Daten eingesetzt werden kann. " "Hierfür wurden die im Rahmen des Projekts gewonnenen Sensordaten in eine Datenbank " "integriert und mit einer eigenen lokalen Blockchain verbunden. Mithilfe der " "Blockchain kann geprüft werden, ob die Einträge der Datenbank geändert wurden.\n" "Die Blockchain verwendet Proof-of-Work (PoW) als Konsensus-Algorithmus." ) if not click.confirm("Fortfahren?", default=True): return True console.print( "\nVor dem Start der Anwendung müssen noch ein paar Dinge im Hintergrund " "erledigt werden..." ) chain = blockchain_init() aborted = start_seeding() if aborted: return True console.rule("\n[bold]Szenario-Auswahl", style="yellow3") console.print("Dieser Demonstrator erlaubt die Wahl zwischen zwei Anwendungsszenarien:") console.print( " • [bold]Szenario 1: Integritätsprüfung einer bereits verfügbaren Datenreihe[/bold]\n" " Es wird ein Datenbankeintrag ausgewählt und die Integritätsprüfung\n" " in einem geführten Prozess veranschaulicht." ) console.print( " • [bold]Szenario 2: Generierung einer neuen Datenreihe[/bold]\n" " Es wird ein neuer zufälliger Datenbankeintrag generiert und in der\n" " Blockchain hinzugefügt. Dabei wird live ein neuer Block generiert.\n" " Anschließend wird anhand des Hashwerts der Datenreihe gezeigt, dass die\n" " Daten erfolgreich in der Blockchain hinterlegt wurden und für eine\n" " Integritätsprüfung wie in Szenario 1 zur Verfügung stehen." ) console.print( "\nBitte wählen Sie das gewünschte Anwendungsszenario, indem Sie [bold]'1'[/bold] " "oder [bold]'2'[/bold] eintippen..." ) scenario: int = 0 while scenario == 0: try: user_input = console.input("[italic]Wählen Sie das Szenario: ") if user_input.strip(" ") not in ("1", "2"): console.print("Es sind nur '1' und '2' als Eingabe zulässig") continue scenario = int(user_input) except Exception: console.print_exception(max_frames=20) sys.exit(1) shutdown: bool = False init_state: bool = True while not shutdown: if scenario == types.ApplicationScenarios.DATA_VALIDATION: console.print( "[green]:heavy_check_mark:[/green] Sie haben Szenario 1 " "'Datenvalidierung' gewählt" ) if not init_state: if click.confirm( "Soll der Zufallsgenerator erneut initialisiert werden?", default=False ): reseed() init_state = False scenario_finished: bool = False while not scenario_finished: scenario_1(chain) scenario_finished = not click.confirm( "Möchten Sie einen weiteren Datensatz analysieren?" ) elif scenario == types.ApplicationScenarios.DATA_GENERATION: console.print( "[green]:heavy_check_mark:[/green] Sie haben Szenario 2 " "'Datengenerierung' gewählt" ) if not init_state: if click.confirm( "Soll der Zufallsgenerator erneut initialisiert werden?", default=False ): reseed() init_state = False scenario_finished: bool = False while not scenario_finished: scenario_2(chain) scenario_finished = not click.confirm( "Möchten Sie einen weiteren Datensatz generieren?" ) shutdown = not click.confirm("Möchten Sie das Szenario wechseln?") if not shutdown and scenario == types.ApplicationScenarios.DATA_VALIDATION: scenario = types.ApplicationScenarios.DATA_GENERATION elif not shutdown and scenario == types.ApplicationScenarios.DATA_GENERATION: scenario = types.ApplicationScenarios.DATA_VALIDATION return False def main() -> None: aborted: bool = False try: aborted = app() except (KeyboardInterrupt, click.exceptions.Abort): console.print("\n[italic]Die Anwendung wurde durch den Nutzer beendet.") sys.exit(1) if aborted: console.print("\n[italic]Die Anwendung wurde durch den Nutzer beendet.") sys.exit(1) console.print("\n[italic]Die Anwendung wurde beendet.") sys.exit(0) if __name__ == "__main__": main() # %%