baseline for app

This commit is contained in:
Florian Förster 2025-12-17 16:33:03 +01:00
parent 156f967a64
commit 79551c33f8

View File

@ -0,0 +1,366 @@
# %%
import dataclasses as dc
import hashlib
import json
import sys
from pathlib import Path
from typing import cast
import click
import polars as pl
import rich.box
import rich.table
import sqlalchemy as sql
from dopt_basics import io
from rich.console import Console
from dopt_pollublock_blockchain import blockchain, db, types
from dopt_pollublock_blockchain import constants as const
console = Console()
p_base = io.search_folder_path(Path(__file__), "src", return_inclusive=False)
assert p_base is not None
p_bc_db = p_base / "prototypes/blockchain.db"
assert p_bc_db.exists(), "blockchain data path not found"
p_sensor_data = p_base / "prototypes/pollublock_data.db"
assert p_sensor_data.exists(), "sensor data path not found"
engine = sql.create_engine(f"sqlite:///{str(p_sensor_data)}")
# %%
# // common
def print_exception(
console: Console,
error_txt: str,
exc: Exception,
) -> None:
console.print(f":warning: [bold bright_red]{error_txt}")
console.print(f":warning: [bold bright_red]Details:\n{exc}")
def rich_table_from_db_entry(df: pl.DataFrame, title: str) -> rich.table.Table:
column_mapping: dict[str, str] = {
"Datetime": "Zeitstempel",
"Temperature_(Celsius)": "Temperatur (°C)",
"Pressure_(Pa)": "Druck (Pa)",
"Air_Quantity_(Percent)": "Luftmenge (%)",
"Blockchain_Block_Number": "Block-Nr. Blockchain",
}
q = (
df.lazy()
.with_columns(pl.col("Datetime").dt.replace_time_zone("UTC"))
.with_columns(
pl.col("Index").cast(pl.String),
pl.col("Datetime").dt.to_string("iso:strict"),
pl.col("Temperature_(Celsius)").map_elements(
lambda x: f"{x:.2f}", return_dtype=pl.String
),
pl.col("Pressure_(Pa)").map_elements(
lambda x: f"{x}".zfill(5), return_dtype=pl.String
),
pl.col("Air_Quantity_(Percent)").map_elements(
lambda x: f"{x:.14f}", return_dtype=pl.String
),
pl.col("Blockchain_Block_Number").cast(pl.String),
)
.rename(column_mapping)
)
df = q.collect()
table = rich.table.Table(title=title, box=rich.box.ROUNDED)
for col in df.columns:
table.add_column(col)
table.add_row(*df.row(0))
return table
def df_transform_hashing(
df: pl.DataFrame,
) -> pl.DataFrame:
q = (
df.lazy()
.with_columns(pl.col("Datetime").dt.replace_time_zone("UTC"))
.with_columns(
pl.col("Datetime").dt.to_string("iso:strict").alias("timestamp_str"),
pl.col("Temperature_(Celsius)")
.map_elements(lambda x: f"{x:.2f}", return_dtype=pl.String)
.alias("temp"),
pl.col("Pressure_(Pa)")
.map_elements(lambda x: f"{x}".zfill(5), return_dtype=pl.String)
.alias("pressure"),
pl.col("Air_Quantity_(Percent)")
.map_elements(lambda x: f"{x:.14f}", return_dtype=pl.String)
.alias("air"),
)
.with_columns(
(
pl.col("timestamp_str")
+ ","
+ pl.col("temp")
+ ","
+ pl.col("pressure")
+ ","
+ pl.col("air")
).alias("combined")
)
.select(["Index", "combined", "Blockchain_Block_Number"])
)
return q.collect()
# // blockchain
def blockchain_init() -> blockchain.Blockchain:
console.print("Einlesen der Blockchain...")
try:
chain = blockchain.Blockchain(p_bc_db)
chain.difficulty = const.BLOCKCHAIN_DIFFICULTY
chain.load(const.BLOCKCHAIN_LOADING_BATCH_SIZE)
if not chain.genesis_done:
raise RuntimeError("Blockchain enthält keinen Genesis-Block")
console.print("Einlesen der Blockchain erfolgreich")
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print("Validiere Blockchain...")
console.print("Prüfe Hashwerte und korrekte Verkettung...")
try:
success = chain.validate()
if not success:
raise RuntimeError(
"Blockchain konnte nicht validiert werden. "
"Die Daten scheinen nicht integer zu sein."
)
console.print("Validierung der Blockchain erfolgreich")
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
return chain
# // begin of both scenarios
def start_seeding() -> None:
console.print(
"\nWir arbeiten mit zufälligen Werten. Damit die Ergebnisse trotzdem reproduzierbar "
"sind, nutzen wir einen Initialwert für den Zufallsgenerator, den Sie "
"bestimmen können."
)
rng_seed: int = 0
while rng_seed == 0:
try:
user_input = console.input("[italic]Wählen Sie einen Wert zwischen 1 und 100: ")
if user_input.strip(" ") not in const.RNG_SEED_RANGE:
console.print("Es sind nur Werte zwischen '1' und '100' als Eingabe zulässig")
continue
rng_seed = int(user_input)
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print(f"Sie haben '{rng_seed}' gewählt")
const.RNG.seed(rng_seed)
console.print(
f"[green]:heavy_check_mark:[/green] Der Zufallsgenerator wurde erfolgreich mit "
f"Wert '{rng_seed}' initialisiert"
)
click.confirm("Fortfahren?", default=True)
def scenario_1(chain: blockchain.Blockchain) -> None:
console.rule("[bold]Szenario 1 - Datenvalidierung", style="yellow3")
max_idx: int = 0
try:
with engine.connect() as con:
res = con.execute(sql.select(sql.func.max(db.sensor_data.c.Index)))
max_idx = cast(int, res.scalar())
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print(f"Die Datenbank mit den Sensorwerten hat insgesamt {max_idx} Einträge")
click.confirm("Fortfahren?", default=True)
chosen_idx = const.RNG.choice(range(1, max_idx))
try:
stmt = sql.select(db.sensor_data).where(db.sensor_data.c.Index == chosen_idx)
db_entry = pl.read_database(
stmt, engine, schema_overrides=db.sensor_data_query_schema
)
table = rich_table_from_db_entry(
db_entry, title=f"Sensordaten zu Eintrag {chosen_idx}"
)
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print(
f"Zufällig ausgewählt wurde der Eintrag Nr. {chosen_idx} mit folgenden Eigenschaften:"
)
console.print(table)
click.confirm("Fortfahren?", default=True)
try:
df_hash = df_transform_hashing(db_entry)
data_to_hash = df_hash.select("combined").item()
sha256 = hashlib.sha256()
sha256.update(data_to_hash.encode("UTF-8"))
hashed_data_hex = sha256.hexdigest()
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print(
f"Die relevanten Eigenschaften kombinieren wir zu einem gemeinsamen "
f"Informationsblock: [bold]{data_to_hash}[/bold]",
highlight=False,
)
console.print(
f"Wir nutzen SHA256 als Hash-Algorithmus. Dies gibt uns den folgenden Hashwert zurück: "
f"[bold]{hashed_data_hex}[/bold]",
)
click.confirm("Fortfahren?", default=True)
try:
block_number = df_hash.select("Blockchain_Block_Number").item()
py_block = chain.get_block(block_number)
py_block_data = dc.asdict(py_block.as_dataclass())
py_block_data["Timestamp"] = py_block_data["Timestamp"].isoformat()
py_block_json = json.dumps(py_block_data)
py_block_data_hash = py_block_data["Data"]
if py_block_data_hash != hashed_data_hex:
raise RuntimeError("Hash values do not match, but they should. Data corrupted?")
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
console.print(
f"Nun gleichen wir diesen Hashwert mit dem Wert aus der Blockchain ab. Die relevante "
f"Block-Nummer erhalten wir aus der Datenbank (siehe Tabelle oben). Sie "
f"lautet: {block_number}"
)
click.confirm("Fortfahren?", default=True)
console.print(
"Wir fragen den Block mit dieser Nummer aus der Blockchain ab und "
"erhalten dazu folgende Daten:"
)
console.print_json(py_block_json)
click.confirm("Fortfahren?", default=True)
console.print(
f"Der Hashwert der dazugehörigen Daten lautet also demnach: "
f"[bold]{py_block_data_hash}[/bold]"
)
click.confirm("Fortfahren?", default=True)
console.print("Zum Vergleich hier nochmals beide Hashwerte zusammen:")
console.print(f"Ermittelter Hashwert der Einträge:\t[bold]{hashed_data_hex}[/bold]")
console.print(f"Hashwert der Blockchain:\t\t[bold]{py_block_data_hash}[/bold]")
console.print(
"[green3]Wir sehen, dass beide Werte übereinstimmen. Das bedeutet, dass der Datensatz "
"integer ist und seit der Referenzierung in der Blockchain nicht verändert wurde."
)
# %%
# // app start
def app() -> None:
console.rule("[bold]Willkommen beim POLLU-BLOCK-Demonstrator!", style="yellow3")
console.print(
"Diese Anwendung zeigt Ihnen, wie eine Blockchain als "
"Werkzeug zur Integritätsprüfung gespeicherter Daten eingesetzt werden kann. "
"Hierfür wurden die im Rahmen des Projekts gewonnenen Sensordaten in eine Datenbank "
"integriert und mit einer eigenen lokalen Blockchain verbunden. Mithilfe der "
"Blockchain kann geprüft werden, ob die Einträge der Datenbank geändert wurden.\n"
"Die Blockchain verwendet Proof-of-Work (PoW) als Konsensus-Algorithmus."
)
click.confirm("Fortfahren?", default=True)
console.print(
"\nVor dem Start der Anwendung müssen noch ein paar Dinge im Hintergrund "
"erledigt werden..."
)
chain = blockchain_init()
start_seeding()
console.rule("\n[bold]Szenario-Auswahl", style="yellow3")
console.print("Dieser Demonstrator erlaubt die Wahl zwischen zwei Anwendungsszenarien:")
console.print(
" • [bold]Szenario 1: Integritätsprüfung einer bereits verfügbaren Datenreihe[/bold]\n"
" Es wird ein Datenbankeintrag ausgewählt und die Integritätsprüfung\n"
" in einem geführten Prozess veranschaulicht."
)
console.print(
" • [bold]Szenario 2: Generierung einer neuen Datenreihe[/bold]\n"
" Es wird ein neuer zufälliger Datenbankeintrag generiert und in der\n"
" Blockchain hinzugefügt. Dabei wird live ein neuer Block generiert.\n"
" Anschließend wird anhand des Hashwerts der Datenreihe gezeigt, dass die\n"
" Daten erfolgreich in der Blockchain hinterlegt wurden und für eine\n"
" Integritätsprüfung wie in Szenario 1 zur Verfügung stehen."
)
console.print(
"\nBitte wählen Sie das gewünschte Anwendungsszenario, indem Sie '1' oder '2' eintippen..."
)
scenario: int = 0
while scenario == 0:
try:
user_input = console.input("[italic]Wählen Sie das Szenario: ")
if user_input.strip(" ") not in ("1", "2"):
console.print("Es sind nur '1' und '2' als Eingabe zulässig")
continue
scenario = int(user_input)
except Exception:
console.print_exception(max_frames=20)
sys.exit(1)
shutdown: bool = False
while not shutdown:
if scenario == types.ApplicationScenarios.DATA_VALIDATION:
console.print(
"[green]:heavy_check_mark:[/green] Sie haben Szenario 1 "
"'Datenvalidierung' gewählt"
)
scenario_finished: bool = False
while not scenario_finished:
scenario_1(chain)
scenario_finished = not click.confirm(
"Möchten Sie einen weiteren Datensatz analysieren?"
)
elif scenario == types.ApplicationScenarios.DATA_GENERATION:
console.print(
"[green]:heavy_check_mark:[/green] Sie haben Szenario 2 "
"'Datengenerierung' gewählt"
)
raise NotImplementedError("Scenario 2 not implemented yet")
shutdown = not click.confirm("Möchten Sie das Szenario wechseln?")
if not shutdown and scenario == types.ApplicationScenarios.DATA_VALIDATION:
scenario = types.ApplicationScenarios.DATA_GENERATION
elif not shutdown and scenario == types.ApplicationScenarios.DATA_GENERATION:
scenario = types.ApplicationScenarios.DATA_VALIDATION
sys.exit(0)
def main() -> None:
try:
app()
except (KeyboardInterrupt, click.exceptions.Abort):
console.print("\n[italic]Die Anwendung wurde durch den Nutzer beendet.")
if __name__ == "__main__":
main()
# %%