major refactoring inlcuding renaming

This commit was merged in pull request #1.
This commit is contained in:
2025-12-16 13:43:48 +01:00
parent 05fa8548b0
commit d155d1afad
16 changed files with 27004 additions and 1771 deletions

View File

@@ -0,0 +1,16 @@
// block.hpp
#pragma once
#include <string>
struct Block {
uint64_t index;
double timestamp;
uint64_t nonce;
std::string data;
std::string prev_hash;
std::string hash;
Block() : index(0), timestamp(0), nonce(0), data(), prev_hash(), hash() {}
Block(uint64_t idx, uint64_t ts, uint64_t n, const std::string& d, const std::string& pre_h, const std::string& h)
: index(idx), timestamp(ts), nonce(n), data(d), prev_hash(pre_h), hash(h) {}
};

View File

@@ -0,0 +1,14 @@
from libcpp.string cimport string
from libc cimport stdint
cdef extern from "block.hpp" namespace "":
cdef cppclass Block:
stdint.uint64_t index
stdint.uint64_t timestamp
stdint.uint64_t nonce
string data
string prev_hash
string hash
Block() except +
Block(stdint.uint64_t, double, stdint.uint64_t, string, string, string) except +

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,630 @@
# distutils: language = c++
"""placeholder module for compilation"""
from __future__ import annotations
import datetime
from pathlib import Path
import dopt_basics.datetime
import sqlalchemy as sql
from dopt_pollublock_blockchain import db
from dopt_pollublock_blockchain.block cimport Block
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector
from libcpp.string cimport string
from libc.stdint cimport uint64_t
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy
from cython.operator import postincrement, dereference
cimport dopt_pollublock_blockchain.openssl_evp as ossl
ctypedef unsigned long ULong
ctypedef unordered_map[uint64_t, Block*] BcHashmap
cdef const size_t NONCE_OFFSET = <size_t>16
cdef timestamp_to_datetime(uint64_t ts):
return datetime.datetime.fromtimestamp(float(ts), dopt_basics.datetime.TIMEZONE_UTC)
cdef uint64_t current_timestamp_integer():
cdef uint64_t ts
dt = dopt_basics.datetime.current_time_tz(cut_microseconds=True)
ts = <uint64_t>int(dt.timestamp())
return ts
cdef int serialize_uint32(unsigned char* out, unsigned int v) except -1 nogil:
out[0] = (v >> 24) & 0xFF
out[1] = (v >> 16) & 0xFF
out[2] = (v >> 8) & 0xFF
out[3] = v & 0xFF
cdef int serialize_uint64(unsigned char* out, unsigned long long v) except -1 nogil:
out[0] = (v >> 56) & 0xFF
out[1] = (v >> 48) & 0xFF
out[2] = (v >> 40) & 0xFF
out[3] = (v >> 32) & 0xFF
out[4] = (v >> 24) & 0xFF
out[5] = (v >> 16) & 0xFF
out[6] = (v >> 8) & 0xFF
out[7] = v & 0xFF
cdef inline bint has_leading_zero_bits(const unsigned char *digest, int num_bits) nogil:
cdef int i, full_bytes = num_bits // 8
cdef int rem_bits = num_bits % 8
for i in range(full_bytes):
if digest[i] != 0:
return False
if rem_bits:
if digest[full_bytes] >> (8 - rem_bits) != 0:
return False
return True
cdef inline bint has_leading_zero_bytes(const unsigned char *digest, int num_bytes) nogil:
cdef int i, full_bytes = num_bytes // 8
cdef int rem_bits = num_bytes % 8
for i in range(num_bytes):
if digest[i] != 0:
return False
return True
cdef class PyBlock:
cdef:
Block *BlockC
bint ptr_owner
def __cinit__(self,
index,
nonce,
data,
previous_hash,
from_ptr=False,
):
self.ptr_owner = False
if not from_ptr:
self.BlockC = new Block(
index,
current_timestamp_integer(),
nonce,
data.encode("UTF-8"),
previous_hash.encode("UTF-8"),
"".encode("UTF-8"),
)
if self.BlockC is NULL:
raise MemoryError()
self.ptr_owner = True
def __init__(self, *args, **kwargs):
pass
def __dealloc__(self):
if self.BlockC is not NULL and self.ptr_owner is True:
del self.BlockC
self.BlockC = NULL
@staticmethod
cdef PyBlock from_ptr(Block *block, bint owner=False):
cdef PyBlock py_block = PyBlock.__new__(
PyBlock,
block.index,
block.nonce,
block.data,
block.prev_hash,
True,
)
py_block.BlockC = block
py_block.ptr_owner = owner
return py_block
def __repr__(self):
return (
f"PyBlock(\n\tIndex:\t\t{self.index}\n\ttimestamp:\t{self.timestamp}\n\tnonce:\t\t{self.nonce}\n\t"
f"Prev Hash:\t{self.prev_hash}\n\tHash:\t\t{self.hash}\n\tData:\t\t{self.data}\n)"
)
def __str__(self):
return self.__repr__()
def serialize_dict(self):
contents = {}
contents["index"] = self.BlockC.index
contents["timestamp"] = self.BlockC.timestamp
contents["nonce"] = self.BlockC.nonce
contents["previous_hash"] = self.prev_hash
contents["hash"] = self.hash
contents["data"] = self.data
return contents
# Python public API
@property
def index(self):
return self.BlockC.index
@property
def timestamp(self):
return timestamp_to_datetime(self.BlockC.timestamp)
@property
def data(self):
return self.BlockC.data.decode("UTF-8")
@property
def prev_hash(self):
return self.BlockC.prev_hash.decode("UTF-8")
@property
def nonce(self):
return self.BlockC.nonce
@property
def hash(self):
return self.BlockC.hash.decode("UTF-8")
def bytes_serialize(self):
cdef:
unsigned char *serialize_res
size_t serialize_size
try:
serialize_res = bytes_serialize_c(self.BlockC, &serialize_size)
return serialize_res[:serialize_size]
finally:
free(serialize_res)
def perform_hash(self):
cdef:
unsigned char *digest
size_t digest_size
try:
digest = perform_hash_c(self.BlockC, &digest_size)
if digest is NULL:
raise MemoryError()
# TODO out: hash assignment in blockchain
self.BlockC.hash = bytes(digest[:digest_size]).hex().encode("UTF-8")
finally:
free(digest)
# TODO rework
return self.hash
cdef unsigned char* bytes_serialize_c(Block *block, size_t *size) nogil:
cdef:
size_t total_len
unsigned char* buf
size_t pos = 0
# index (8), timestamp (8), nonce (8), data, prev_hash
size[0] = (
<size_t>(8 * 3) +
block.data.size() +
block.prev_hash.size()
)
buf = <unsigned char*>malloc(size[0] * sizeof(unsigned char))
if buf is NULL:
return NULL
serialize_uint64(buf + pos, block.index)
pos += 8
serialize_uint64(buf + pos, block.timestamp)
pos += 8
serialize_uint64(buf + pos, block.nonce)
pos += 8
# Copy data
memcpy(
buf + pos,
block.data.c_str(),
block.data.size(),
)
pos += block.data.size()
# Copy prev_hash
memcpy(
buf + pos,
block.prev_hash.c_str(),
block.prev_hash.size(),
)
pos += block.prev_hash.size()
return buf
cdef unsigned char* SHA256_digest(const void *data, size_t data_size, size_t *digest_size) nogil:
cdef ossl.EVP_MD_CTX *ctx = ossl.EVP_MD_CTX_new()
if ctx is NULL:
return NULL
cdef const ossl.EVP_MD *algo = ossl.EVP_sha256()
if algo is NULL:
return NULL
cdef:
unsigned char* digest
size_t dig_buff_len
unsigned int digest_len
dig_buff_len = <size_t>ossl.EVP_MD_size(algo)
digest_size[0] = dig_buff_len
digest = <unsigned char*>malloc(dig_buff_len * sizeof(unsigned char))
ossl.EVP_DigestInit_ex(ctx, algo, NULL)
ossl.EVP_DigestUpdate(ctx, data, data_size)
ossl.EVP_DigestFinal_ex(ctx, digest, &digest_len)
ossl.EVP_MD_CTX_free(ctx)
return digest
cdef unsigned char* perform_hash_c(Block *block, size_t *digest_size) nogil:
cdef:
unsigned char *serialize_res
size_t serialize_size
unsigned char *digest
serialize_res = bytes_serialize_c(block, &serialize_size)
if serialize_res is NULL:
return NULL
digest = SHA256_digest(serialize_res, serialize_size, digest_size)
free(serialize_res)
if digest is NULL:
return NULL
return digest
cdef int mine_block(Block *block, unsigned int difficulty, uint64_t *nonce_solution, unsigned int max_nonce=0xFFFFFFFF) nogil:
cdef:
unsigned char *serial_buf
size_t serialize_size
unsigned char *digest
size_t digest_size
bint nonce_found = False
int nonce
serial_buf = bytes_serialize_c(block, &serialize_size)
with nogil:
for nonce in range(max_nonce):
serialize_uint64(serial_buf + NONCE_OFFSET, <uint64_t>nonce)
digest = SHA256_digest(serial_buf, serialize_size, &digest_size)
if has_leading_zero_bits(digest, difficulty):
nonce_found = True
nonce_solution[0] = nonce
break
free(digest)
free(serial_buf)
if not nonce_found:
return 1
return 0
cdef class Blockchain:
cdef:
unsigned int _difficulty
uint64_t _index
BcHashmap *_chain_map
vector[Block*] *_chain
bint _genesis_done
bint _loaded
readonly object db_path
readonly object _engine
def __cinit__(self, *args, **kwargs):
self._difficulty = 26
self._index = <uint64_t>0
self._genesis_done = False
self._loaded = False
self._chain_map = new unordered_map[uint64_t, Block*]()
self._chain = new vector[Block*]()
if self._chain_map is NULL:
raise MemoryError("Could not allocate hasmap")
if self._chain is NULL:
raise MemoryError("Could not allocate vector")
def __init__(self, db_path):
self.db_path = Path(db_path).resolve()
if not self.db_path.parent.exists():
raise FileNotFoundError(
"The parent directory of the provided database path does not exist"
)
self._engine = sql.create_engine(f"sqlite:///{str(self.db_path)}")
db.metadata_blockchain.create_all(self._engine)
def __dealloc__(self):
# ownership is typically not transferred from the Blockchain extension class
cdef BcHashmap.iterator it = self._chain_map.begin()
if self._chain_map is not NULL:
while it != self._chain_map.end():
del dereference(it).second
postincrement(it)
del self._chain_map
self._chain_map = NULL
if self._chain is not NULL:
del self._chain
self._chain = NULL
cdef Block* get_block_c(self, uint64_t idx) nogil:
if idx > self._index:
return NULL
return self._chain_map[0][idx]
cdef void add_block_from_loading(self, Block *block) nogil:
self._chain[0].push_back(block)
self._chain_map[0][block.index] = block
self._index = block.index
if not self._genesis_done:
self._genesis_done = True
cdef int add_block(self, Block *block) nogil:
cdef:
uint64_t mined_nonce
size_t digest_size
unsigned char *sha256_digest
# mine block
if mine_block(block, self._difficulty, &mined_nonce) != 0:
return 1
block.nonce = mined_nonce
# hash block, add hash to block, add block to blockchain hashmap
sha256_digest = perform_hash_c(block, &digest_size)
with gil:
block.hash = bytes(sha256_digest[:digest_size]).hex().encode("UTF-8")
free(sha256_digest)
self._chain[0].push_back(block)
self._chain_map[0][block.index] = block
if self._genesis_done:
self._index += 1
return 0
cdef string hash_data(self, data):
cdef:
string data_str
unsigned char *data_digest
size_t digest_size
data_str = data.encode("UTF-8")
data_digest = SHA256_digest(data_str.c_str(), data_str.size(), &digest_size)
if data_digest is NULL:
raise RuntimeError("Failed to hash data")
data_str = bytes(data_digest[:digest_size]).hex().encode("UTF-8")
free(data_digest)
return data_str
cdef load_from_batch(self, batch):
cdef Block *block
for entry in batch:
block = new Block(
entry[0],
entry[1],
entry[2],
entry[5].encode("UTF-8"),
entry[3].encode("UTF-8"),
entry[4].encode("UTF-8"),
)
self.add_block_from_loading(block)
# // Python public API
def __len__(self):
return self._index + 1
@property
def difficulty(self):
return self._difficulty
@difficulty.setter
def difficulty(self, value):
if not isinstance(value, int):
raise TypeError("Difficulty must be integer value")
if value <= 0:
raise ValueError("Difficulty must be greater than 0")
self._difficulty = value
@property
def genesis_done(self):
return self._genesis_done
@property
def index(self):
return self._index
def _print_key_value_pair(self):
cdef BcHashmap.iterator it = self._chain_map.begin()
cdef Block *block
while it != self._chain_map.end():
print(dereference(it).first)
block = dereference(it).second
py_block = PyBlock.from_ptr(block)
print(py_block)
postincrement(it)
def print_blocks(self, max_num):
cdef:
Block *block
int max_nummber = max_num
int idx, num = 0
if max_num <= 0:
raise ValueError("Maximum number must be greater than 0")
for idx in range(self._chain[0].size()):
block = self._chain[0][idx]
py_block = PyBlock.from_ptr(block)
print(py_block)
num += 1
if num == max_nummber:
break
def get_block(self, idx):
if idx < 0 or idx > self._index:
raise IndexError("Index value is out of bounds")
cdef Block *block = self.get_block_c(idx)
if block is NULL:
raise IndexError("Provided index not found")
return PyBlock.from_ptr(block, owner=False)
def create_genesis_block(self):
if self._genesis_done:
raise RuntimeError(
("Blockchain already has a genesis block. "
"Either it was created or loaded.")
)
genesis_prev_hash = ("0" * 64).encode("UTF-8")
cdef string data_str = self.hash_data("Genesis Block")
cdef Block *block = new Block(
self._index,
current_timestamp_integer(),
0,
data_str,
genesis_prev_hash,
"".encode("UTF-8"),
)
cdef int res = self.add_block(block)
if res != 0:
raise RuntimeError("Could not mine block. No nonce found")
self._genesis_done = True
def new_block(self, data):
cdef:
Block *prev_block
string prev_hash
uint64_t new_idx
string data_str
unsigned char *data_digest
size_t digest_size
if not self._genesis_done:
raise RuntimeError("Create a genesis block first.")
if not isinstance(data, str):
raise TypeError("Data must be a string")
data_str = self.hash_data(data)
prev_block = self.get_block_c(self._index)
prev_hash = prev_block.hash
new_idx = self._index + 1
cdef Block *block = new Block(
new_idx,
current_timestamp_integer(),
0,
data_str,
prev_hash,
"".encode("UTF-8"),
)
cdef int res = self.add_block(block)
if res != 0:
raise RuntimeError("Could not mine block. No nonce found")
def validate(self):
cdef:
Block *block
Block *prev_block = NULL
int idx = 0
unsigned char *digest
size_t digest_size
for idx in range(self._chain[0].size()):
block = self._chain[0][idx]
py_bytes = bytes.fromhex(block.hash.decode("UTF-8"))
digest = perform_hash_c(block, &digest_size)
py_bytes_rehashed = bytes(digest[:digest_size])
free(digest)
if py_bytes != py_bytes_rehashed:
print(f"Index {idx}: Hashes to not match. Abort.")
return False
if prev_block is not NULL:
if prev_block.hash != block.prev_hash:
print(
(f"Index {idx}: Hash mismatch. Hash of previous block does not "
"match the saved one in the current block. Abort.")
)
return False
prev_block = block
return True
def get_saving_entries(self, max_idx):
entries = []
cdef:
Block *block
int idx = 0
int _max_idx
if max_idx is None:
_max_idx = -1
else:
_max_idx = max_idx
for idx in range(self._chain[0].size()):
if idx <= _max_idx:
continue
block = self._chain[0][idx]
contents = {}
contents["index"] = block.index
contents["timestamp"] = block.timestamp
contents["nonce"] = block.nonce
contents["previous_hash"] = block.prev_hash.decode("UTF-8")
contents["hash"] = block.hash.decode("UTF-8")
contents["data"] = block.data.decode("UTF-8")
entries.append(contents)
return entries
def save(self):
# get max index from db
# only retrieve indices greater than max value
stmt = sql.select(sql.func.max(db.blocks.c.index))
with self._engine.connect() as conn:
result = conn.execute(stmt)
max_value = result.scalar()
entries = self.get_saving_entries(max_value)
if not entries:
return
with self._engine.begin() as con:
con.execute(sql.insert(db.blocks), entries)
def close_db_connections(self):
self._engine.dispose()
def load(self, batch_size):
if self._loaded:
raise RuntimeError("Blockchain was already loaded")
with self._engine.connect() as con:
res = con.execute(sql.select(db.blocks).order_by(db.blocks.c.index.asc()))
for batch in res.partitions(batch_size):
self.load_from_batch(batch)
self._loaded = True

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import sqlalchemy as sql
metadata_blockchain: sql.MetaData = sql.MetaData()
blocks = sql.Table(
"blocks",
metadata_blockchain,
sql.Column("index", sql.BigInteger, primary_key=True),
sql.Column("timestamp", sql.BigInteger, nullable=False),
sql.Column("nonce", sql.BigInteger, nullable=False),
sql.Column("previous_hash", sql.String(64), nullable=False),
sql.Column("hash", sql.String(64), nullable=False),
sql.Column("data", sql.String(64), nullable=False),
)

View File

@@ -0,0 +1,14 @@
cdef extern from "openssl/evp.h":
ctypedef struct EVP_MD_CTX:
pass
ctypedef struct EVP_MD:
pass
EVP_MD_CTX *EVP_MD_CTX_new() nogil
void EVP_MD_CTX_free(EVP_MD_CTX *) nogil
const EVP_MD *EVP_sha256() nogil
int EVP_DigestInit_ex(EVP_MD_CTX *ctx, const EVP_MD *type, void *impl) nogil
int EVP_DigestUpdate(EVP_MD_CTX *ctx, const void *d, size_t cnt) nogil
int EVP_DigestFinal_ex(EVP_MD_CTX *ctx, unsigned char *md, unsigned int *s) nogil
int EVP_MD_size(const EVP_MD *md) nogil
int EVP_MD_CTX_reset(EVP_MD_CTX *ctx) nogil

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,228 @@
"""placeholder module for compilation"""
from __future__ import annotations
import datetime
import hashlib
import struct
import time
import cython
import dopt_basics.datetime
# from cython.cimports.cpython.ref import Py_INCREF, PyObject
PyStringC = cython.struct(
ptr=cython.p_char,
length=cython.Py_ssize_t,
)
@cython.cfunc
def PyStringC_to_unicode(
string: PyStringC,
) -> str:
return string.ptr[: string.length].decode("UTF-8", "strict")
@cython.cfunc
def PyStringC_to_bytes(
string: PyStringC,
) -> bytes:
return cython.cast(bytes, string.ptr[: string.length])
@cython.cfunc
def timestamp_to_datetime(
ts: cython.double,
) -> datetime.datetime:
return datetime.datetime.fromtimestamp(ts, dopt_basics.datetime.TIMEZONE_UTC)
@cython.cclass
class StringHolder:
c = cython.declare(PyStringC, visibility="readonly")
_py_data = cython.declare(object, visibility="private")
def __cinit__(self, data: str):
"""accepting Python str
Parameters
----------
data : str
_description_
"""
if not isinstance(data, str):
raise TypeError("Data not 'str'")
self._py_data = data.encode("UTF-8")
self.c = PyStringC(self._py_data, len(self._py_data))
@property
def py_string(self):
return PyStringC_to_unicode(self.c)
@property
def py_bytes(self):
return PyStringC_to_bytes(self.c)
# BlockC = cython.struct(
# index=cython.ulonglong,
# timestamp=cython.double,
# data=PyStringC,
# previous_hash=PyStringC,
# nonce=cython.ulonglong,
# )
@cython.cfunc
def float_to_bytes(num: cython.double):
return struct.pack(">d", num)
@cython.cclass
class Block:
_index = cython.declare(cython.ulong, visibility="private")
_timestamp = cython.declare(cython.double, visibility="private")
_data = cython.declare(StringHolder, visibility="private")
_prev_hash = cython.declare(StringHolder, visibility="private")
_nonce = cython.declare(cython.ulong, visibility="private")
_hash = cython.declare(StringHolder, visibility="private")
def __init__(
self,
index: int,
data: str,
previous_hash: str,
nonce: int,
):
self._index = index
# self._timestamp = dopt_basics.datetime.current_time_tz().timestamp()
self._timestamp = datetime.datetime(2025, 12, 1, 12, 0, 0).timestamp()
self._data = StringHolder.__new__(StringHolder, data)
self._prev_hash = StringHolder.__new__(StringHolder, previous_hash)
self._nonce = nonce
self._hash = StringHolder.__new__(StringHolder, "")
@cython.ccall
def _perform_hash(self):
parts = bytearray()
parts.extend(self._index.to_bytes(8, "big"))
parts.extend(float_to_bytes(self._timestamp))
parts.extend(self._data.py_bytes)
parts.extend(self._prev_hash.py_bytes)
parts.extend(self._nonce.to_bytes(8, "big"))
return hashlib.sha256(parts)
def compute_hash_bytes(self):
return self._perform_hash().digest()
def compute_hash(self):
return self._perform_hash().hexdigest()
# Python public API
@property
def index(self):
return self._index
@property
def timestamp(self):
return timestamp_to_datetime(self._timestamp)
@property
def data(self):
return self._data.py_string
@property
def prev_hash(self):
return self._prev_hash.py_string
@property
def nonce(self):
return self._nonce
@nonce.setter
def nonce(self, value: int):
self._nonce = value
@property
def hash(self):
return self._hash.py_string
@hash.setter
def hash(self, value: str):
if not isinstance(value, str):
raise TypeError("No string")
self._hash = StringHolder.__new__(StringHolder, value)
@cython.cclass
class Blockchain:
_difficulty = cython.declare(cython.int, visibility="private")
_index = cython.declare(cython.Py_ssize_t, visibility="private")
chain = cython.declare(list[Block], visibility="public")
def __cinit__(self):
self._index = 0
def __init__(
self,
difficulty: int = 1,
) -> None:
self.chain: list[Block] = []
# self.difficulty = difficulty.to_bytes(8, "big")
self._difficulty = difficulty
@property
def index(self):
return self._index
@property
def difficulty(self):
return self._difficulty
@cython.ccall
def create_genesis_block(self):
genesis = Block(
index=0,
data="Genesis Block",
previous_hash="0",
nonce=0,
)
genesis.hash = genesis.compute_hash()
self.chain.append(genesis)
@cython.ccall
def proof_of_work(
self,
block: Block,
) -> str:
prefix = "0" * self._difficulty
while True:
block_hash = block.compute_hash()
if block_hash.startswith(prefix):
return block_hash
block._nonce += 1
@cython.ccall
def add_block(self, data: str) -> Block:
prev_hash = self.chain[self._index].hash
new_block = Block(
index=(self._index + 1),
data=data,
previous_hash=prev_hash,
nonce=0,
)
# start = time.perf_counter()
new_block.hash = self.proof_of_work(new_block)
# elapsed = time.perf_counter() - start
self.chain.append(new_block)
self._index += 1
print(f"Mined block {new_block.index} with nonce={new_block.nonce}")
# print(f"Mined block {new_block.index} in {elapsed:.3f}s with nonce={new_block.nonce}")
return new_block

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,139 @@
"""placeholder module for compilation"""
from __future__ import annotations
import datetime
import hashlib
import struct
import time
import dopt_basics.datetime
def float_to_bytes(num: float):
return struct.pack(">d", num)
class Block:
__slots__ = (
"_index",
"_timestamp",
"_data",
"_prev_hash",
"_nonce",
"_hash",
)
def __init__(
self,
index: int,
data: str,
previous_hash: str,
nonce: int,
):
self._index = index
self._timestamp = datetime.datetime(2025, 12, 1, 12, 0, 0).timestamp()
# self._timestamp = dopt_basics.datetime.current_time_tz().timestamp()
self._data = data
self._prev_hash = previous_hash
self._nonce = nonce
self._hash = ""
def _perform_hash(self):
parts = bytearray()
parts.extend(self._index.to_bytes(8, "big"))
parts.extend(float_to_bytes(self._timestamp))
parts.extend(self._data.encode("UTF-8"))
parts.extend(self._prev_hash.encode("UTF-8"))
parts.extend(self._nonce.to_bytes(8, "big"))
return hashlib.sha256(parts)
def compute_hash_bytes(self):
return self._perform_hash().digest()
def compute_hash(self):
return self._perform_hash().hexdigest()
# Python public API
@property
def index(self) -> int:
return self._index
@property
def timestamp(self) -> datetime.datetime:
return datetime.datetime.fromtimestamp(self._timestamp)
@property
def data(self) -> str:
return self._data
@property
def prev_hash(self) -> str:
return self._prev_hash
@property
def nonce(self):
return self._nonce
@nonce.setter
def nonce(self, value: int):
self._nonce = value
@property
def hash(self) -> str:
return self._hash
@hash.setter
def hash(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError("No string")
self._hash = value
class Blockchain:
def __init__(
self,
difficulty: int = 1,
) -> None:
self.chain: list[Block] = []
# self.difficulty = difficulty.to_bytes(8, "big")
self.difficulty = difficulty
def create_genesis_block(self):
genesis = Block(
index=0,
data="Genesis Block",
previous_hash="0",
nonce=0,
)
genesis.hash = genesis.compute_hash()
self.chain.append(genesis)
def proof_of_work(
self,
block: Block,
) -> str:
prefix = "0" * self.difficulty
while True:
block_hash = block.compute_hash()
if block_hash.startswith(prefix):
return block_hash
block.nonce += 1
def add_block(self, data: str) -> Block:
new_block = Block(
index=len(self.chain),
data=data,
previous_hash=self.chain[-1].hash,
nonce=0,
)
start = time.perf_counter()
new_block.hash = self.proof_of_work(new_block)
elapsed = time.perf_counter() - start
self.chain.append(new_block)
print(f"Mined block {new_block.index} in {elapsed:.3f}s with nonce={new_block.nonce}")
return new_block

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,139 @@
"""placeholder module for compilation"""
from __future__ import annotations
import datetime
import hashlib
import struct
import time
import dopt_basics.datetime
def float_to_bytes(num: float):
return struct.pack(">d", num)
class Block:
__slots__ = (
"_index",
"_timestamp",
"_data",
"_prev_hash",
"_nonce",
"_hash",
)
def __init__(
self,
index: int,
data: str,
previous_hash: str,
nonce: int,
):
self._index = index
self._timestamp = datetime.datetime(2025, 12, 1, 12, 0, 0).timestamp()
# self._timestamp = dopt_basics.datetime.current_time_tz().timestamp()
self._data = data
self._prev_hash = previous_hash
self._nonce = nonce
self._hash = ""
def _perform_hash(self):
parts = bytearray()
parts.extend(self._index.to_bytes(8, "big"))
parts.extend(float_to_bytes(self._timestamp))
parts.extend(self._data.encode("UTF-8"))
parts.extend(self._prev_hash.encode("UTF-8"))
parts.extend(self._nonce.to_bytes(8, "big"))
return hashlib.sha256(parts)
def compute_hash_bytes(self):
return self._perform_hash().digest()
def compute_hash(self):
return self._perform_hash().hexdigest()
# Python public API
@property
def index(self) -> int:
return self._index
@property
def timestamp(self) -> datetime.datetime:
return datetime.datetime.fromtimestamp(self._timestamp)
@property
def data(self) -> str:
return self._data
@property
def prev_hash(self) -> str:
return self._prev_hash
@property
def nonce(self):
return self._nonce
@nonce.setter
def nonce(self, value: int):
self._nonce = value
@property
def hash(self) -> str:
return self._hash
@hash.setter
def hash(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError("No string")
self._hash = value
class Blockchain:
def __init__(
self,
difficulty: int = 1,
) -> None:
self.chain: list[Block] = []
# self.difficulty = difficulty.to_bytes(8, "big")
self.difficulty = difficulty
def create_genesis_block(self):
genesis = Block(
index=0,
data="Genesis Block",
previous_hash="0",
nonce=0,
)
genesis.hash = genesis.compute_hash()
self.chain.append(genesis)
def proof_of_work(
self,
block: Block,
) -> str:
prefix = "0" * self.difficulty
while True:
block_hash = block.compute_hash()
if block_hash.startswith(prefix):
return block_hash
block.nonce += 1
def add_block(self, data: str) -> Block:
new_block = Block(
index=len(self.chain),
data=data,
previous_hash=self.chain[-1].hash,
nonce=0,
)
start = time.perf_counter()
new_block.hash = self.proof_of_work(new_block)
elapsed = time.perf_counter() - start
self.chain.append(new_block)
print(f"Mined block {new_block.index} in {elapsed:.3f}s with nonce={new_block.nonce}")
return new_block