basic database interaction

This commit is contained in:
2026-05-26 12:17:57 +02:00
parent 76a9bd4329
commit 5b2ca4f694
3 changed files with 214 additions and 98 deletions

View File

@@ -21,6 +21,7 @@ from typing_extensions import override
import babel
from dopt_basics.logging import BASE_LOGGER, setup_logging
from pydantic import (
AwareDatetime,
BaseModel,
ConfigDict,
EmailStr,
@@ -76,7 +77,9 @@ logger.setLevel(logging.DEBUG)
logger_search_widget = logger.getChild("search_widget")
logger_search_widget.setLevel(logging.DEBUG)
logger_get_data = logger.getChild("get_data")
logger_get_data.setLevel(logging.DEBUG)
logger_get_data.setLevel(logging.INFO)
logger_get_data_auto_form = logger.getChild("get_data_auto_form")
logger_get_data_auto_form.setLevel(logging.DEBUG)
QSS = """
*[styleClass="stempel"] {
@@ -770,8 +773,8 @@ def reset_form(
# TODO check if this behaviour is correct in other contexts, deactivated because of
# TODO company search widget
if form_field.readonly:
continue
# if form_field.readonly:
# continue
if isinstance(widget, QLineEdit):
widget.clear()
@@ -987,6 +990,7 @@ def set_widget_value(
if isinstance(widget, QLineEdit):
if isinstance(value, datetime.datetime):
value = value.astimezone()
value = value.strftime(DATETIME_FMT)
elif isinstance(value, datetime.date):
value = value.strftime(DATE_FMT)
@@ -1110,8 +1114,8 @@ def validate_form_data(
class Grunderfassung_Unternehmen(FlatBaseModel):
# default in SQLAlchemy with lambda and timezone-aware datetime)
Metadaten_erstellung: datetime.datetime | None = None
Metadaten_aktualisierung: datetime.datetime | None = None # see above
Metadaten_erstellung: AwareDatetime | None = None
Metadaten_aktualisierung: AwareDatetime | None = None # see above
Metadaten_nutzer: str | None
Grunderfassung_fallnummer: str
Grunderfassung_notiz: str | None
@@ -1722,7 +1726,7 @@ class AutoForm(QWidget):
if DEBUG:
self.test_button = QPushButton("Initialisiere Laden")
self.test_button.clicked.connect(self.on_load_clicked)
self.test_button.clicked.connect(self.load_data)
self.test_button.setFixedHeight(50)
self.test_button.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed
@@ -1731,6 +1735,19 @@ class AutoForm(QWidget):
button = QPushButton("GET DATA")
button.clicked.connect(self.get_form_data)
self.main_layout.addWidget(button)
id_field_layout = QHBoxLayout()
id_field_layout.setContentsMargins(0, 0, 0, 0)
id_field_layout.setSpacing(5)
id_field_label = QLabel("ID Datenbank:")
self.id_field_input = QLineEdit()
id_field_layout.addWidget(id_field_label)
id_field_layout.addWidget(self.id_field_input)
self.main_layout.addLayout(id_field_layout)
button_db_index = QPushButton("Setze DB Index")
button_db_index.clicked.connect(self._set_db_index)
self.main_layout.addWidget(button_db_index)
self.main_layout.addSpacing(10)
self.top_level_form_layout = QFormLayout()
@@ -1771,10 +1788,16 @@ class AutoForm(QWidget):
self.layout_btn.addWidget(self.reset_btn)
self.ignored_keys = ignored_keys
self.current_id: int = -1
# test button
# self.main_layout.addSpacing(10)
# self.main_layout.addWidget(self.test_button)
def _set_db_index(self) -> None:
index = int(self.id_field_input.text())
self.current_id = index
logger.debug("Set index to %d", self.current_id)
def _disable_save(self) -> None:
self.save_btn.setEnabled(False)
self.save_btn.setText(self.save_btn_txt_disabled)
@@ -1787,20 +1810,36 @@ class AutoForm(QWidget):
QTimer.singleShot(timeout + 1, lambda: self.save_btn.setShortcut("Ctrl+S"))
self.save_btn.setText(self.save_btn_txt_enabled)
def on_load_clicked(self) -> None:
def load_data(
self,
lookup_id: int | None = None,
) -> None:
# TODO change logic to database backend
logger_get_data.info(">>>> LOAD CLICKED")
if lookup_id is None or lookup_id == 0:
lookup_id = self.current_id
self.reset_form()
logger_get_data_auto_form.debug("Lookup ID: %d", lookup_id)
if lookup_id > 0:
logger_get_data_auto_form.debug("Load from DB:")
loaded_data = be_init_rec.get_initial_recording(lookup_id)
else:
logger_get_data_auto_form.debug("Load from pickled object:")
loaded_data = load_pydantic_model_dict_db()
logger_get_data.debug("Loaded data dict. Passing to Pydantic...")
logger_get_data_auto_form.debug(
"Loaded data dict:\n%s Passing to Pydantic...", pformat(loaded_data)
)
model = Grunderfassung_Unternehmen(**loaded_data)
logger_get_data.debug("Loaded to Pydantic.")
logger_get_data.debug("Convert to GUI structure...")
logger_get_data_auto_form.debug("Loaded to Pydantic.")
logger_get_data_auto_form.debug("Convert to GUI structure...")
form_data = model.to_gui()
logger_get_data.debug("Set form data...")
logger_get_data.debug("Form data:\n%s", pformat(form_data))
# logger_get_data.debug("Widget registry:\n")
# pprint_registry(self.widget_registry)
logger_get_data_auto_form.debug("Set form data...")
# logger_get_data_auto_form.debug("Form data:\n%s", pformat(form_data))
self.set_form_data(form_data)
self.current_id = lookup_id
def on_save_clicked(self) -> None:
self._disable_save()
@@ -1827,8 +1866,7 @@ class AutoForm(QWidget):
try:
validated_data = Grunderfassung_Unternehmen(**form_data)
# # TODO remove
# validated_data.Metadaten_erstellung = datetime.datetime.now()
# # TODO add user?
# logger.debug("%s", pformat(validated_data.model_dump()))
except ValidationError as e:
# catch errors and show them in GUI
@@ -1854,8 +1892,8 @@ class AutoForm(QWidget):
QMessageBox.warning(self, "Eingabefehler", "\n".join(fehler_texte))
else:
# !! this code is only called if the 'try' block was successful
self.reset_form()
save_pydantic_model_dict_db(validated_data)
# save_pydantic_model_dict_db(validated_data)
# TODO save data to database
logger.info(
"\n\n>>>>>>>>>>>>> Form data without 'exlude':\n%s",
@@ -1869,7 +1907,16 @@ class AutoForm(QWidget):
),
pformat(validated_data.to_db(exclude=self.ignored_keys)),
)
db_data = validated_data.to_db(exclude=self.ignored_keys)
if self.current_id < 0:
logger.debug("Insert triggered")
be_init_rec.insert_initial_recording(db_data)
else:
logger.debug("Update triggered")
be_init_rec.update_initial_recording(self.current_id, db_data)
logger_get_data.info("Data saved successfully")
self.reset_form()
finally:
# always re-enable save, even if error occurred
self._enable_save()
@@ -1880,6 +1927,7 @@ class AutoForm(QWidget):
def reset_form(self) -> None:
reset_form(self.widget_registry)
self.current_id = -1
def get_form_data(self) -> dict[str, Any]:
form_data = get_form_data(self.widget_registry)

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TypedDict, cast
from typing import Any, TypedDict, cast
import polars as pl
@@ -80,7 +80,7 @@ def _transform_for_gui_output(
def comp_search_choices() -> tuple[tuple[str, int], ...]:
# TODO no reload functionality
q = db.df_crm_master.lazy()
q = db.DF_CRM_MASTER.lazy()
counter = pl.int_range(0, pl.len()).over(pl.col.ma_unternehmensname)
q = q.with_columns(
dedupl=pl.when(counter == 0)
@@ -89,14 +89,13 @@ def comp_search_choices() -> tuple[tuple[str, int], ...]:
)
df = q.collect()
# return dict(zip(df["dedupl"], df["ma_id"]))
return tuple(zip(df["dedupl"], df["ma_id"]))
def comp_search_get_info(
ma_id: int,
) -> CompanyInfo:
df = db.df_crm_master.filter(pl.col.ma_id == ma_id)
df = db.DF_CRM_MASTER.filter(pl.col.ma_id == ma_id)
if df.height > 1 or df.height == 0:
raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}")
@@ -109,7 +108,7 @@ def contact_person_search_choices(
use_both_names: bool,
) -> tuple[tuple[str, int], ...]:
# TODO no reload functionality
q = db.df_contact_person.lazy()
q = db.DF_CONTACT_PERSON.lazy()
if ma_id is not None:
q = q.filter(pl.col.ma_id == ma_id)
@@ -128,16 +127,52 @@ def contact_person_search_choices(
)
df = q.collect()
# return dict(zip(df["dedupl"], df["an_id"]))
return tuple(zip(df["dedupl"], df["an_id"]))
def contact_person_search_get_info(
an_id: int,
) -> ContactPersonInfo:
df = db.df_contact_person.filter(pl.col.an_id == an_id)
df = db.DF_CONTACT_PERSON.filter(pl.col.an_id == an_id)
if df.height > 1 or df.height == 0:
raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}")
df = _transform_for_gui_output(df)
return cast(ContactPersonInfo, df.row(0, named=True))
def insert_initial_recording(
data: dict[str, Any],
) -> None:
stmt = db.grunderfassung_unternehmen.insert().values(data)
with db.ENGINE.begin() as conn:
conn.execute(stmt)
def update_initial_recording(
erfassung_id: int,
data: dict[str, Any],
) -> None:
stmt = (
db.grunderfassung_unternehmen.update()
.where(db.grunderfassung_unternehmen.c.erfassung_id == erfassung_id)
.values(data)
)
with db.ENGINE.begin() as conn:
conn.execute(stmt)
def get_initial_recording(
erfassung_id: int,
) -> dict[str, Any]:
stmt = db.grunderfassung_unternehmen.select().where(
db.grunderfassung_unternehmen.c.erfassung_id == erfassung_id
)
with db.ENGINE.begin() as conn:
ret = conn.execute(stmt)
row = ret.fetchone()
if row is None:
raise KeyError(f"Database ID {erfassung_id} not found")
return row._asdict()

View File

@@ -9,6 +9,7 @@ import polars as pl
import sqlalchemy as sql
from sqlalchemy import Column, String, Table, TypeDecorator
from wce_crm import constants
from wce_crm import types as t
@@ -32,84 +33,109 @@ class SafeDateTime(TypeDecorator):
return None
class UTCDateTime(TypeDecorator):
"""Safely coerces naive datetimes from SQLite into timezone-aware UTC."""
impl = sql.DateTime
cache_ok = True
def process_bind_param(self, value, dialect):
"""Runs when saving to the database."""
if value is not None:
# Ensure it's converted to UTC before saving
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
else:
value = value.astimezone(datetime.timezone.utc)
return value
def process_result_value(self, value, dialect):
"""Runs when fetching from the database."""
if value is not None and value.tzinfo is None:
# Explicitly tell Python this data *is* UTC
value = value.replace(tzinfo=datetime.timezone.utc)
return value
md_crm = sql.MetaData()
md_main = sql.MetaData()
ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}")
# ---------- OLD "Kontaktliste" ----------
md_kontaktliste = sql.MetaData()
# md_kontaktliste = sql.MetaData()
ext_kl_unternehmen: sql.Table = Table(
"Unternehmen",
md_kontaktliste,
Column("u_id", sql.Integer, nullable=False, unique=True),
Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False),
Column("u_rechtsform", sql.Text, nullable=False),
Column("u_firmenname", sql.Text, nullable=False),
Column("u_strasse", sql.Text, nullable=False),
Column("u_hausnummer", sql.Text, nullable=False),
Column("u_adresszusatz", sql.Text, nullable=True),
Column("u_plz", sql.Text, nullable=False),
Column("u_ort", sql.Text, nullable=False),
Column("u_postfach", sql.Text, nullable=True),
Column("u_website", sql.Text, nullable=True),
Column("u_anrede", sql.Text, nullable=False),
Column("u_titel", sql.Text, nullable=True),
Column("u_vorname", sql.Text, nullable=False),
Column("u_nachname", sql.Text, nullable=False),
Column("u_funktion", sql.Text, nullable=False),
Column("u_mail", sql.Text, nullable=False),
Column("u_telefon", sql.Text, nullable=False),
Column("u_plz_postfach", sql.Text, nullable=True),
Column("u_einwilligung_inhaber", sql.Boolean, nullable=True),
Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True),
Column("u_aktiv", sql.Boolean, nullable=False, default=1),
)
# ext_kl_unternehmen: sql.Table = Table(
# "Unternehmen",
# md_kontaktliste,
# Column("u_id", sql.Integer, nullable=False, unique=True),
# Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False),
# Column("u_rechtsform", sql.Text, nullable=False),
# Column("u_firmenname", sql.Text, nullable=False),
# Column("u_strasse", sql.Text, nullable=False),
# Column("u_hausnummer", sql.Text, nullable=False),
# Column("u_adresszusatz", sql.Text, nullable=True),
# Column("u_plz", sql.Text, nullable=False),
# Column("u_ort", sql.Text, nullable=False),
# Column("u_postfach", sql.Text, nullable=True),
# Column("u_website", sql.Text, nullable=True),
# Column("u_anrede", sql.Text, nullable=False),
# Column("u_titel", sql.Text, nullable=True),
# Column("u_vorname", sql.Text, nullable=False),
# Column("u_nachname", sql.Text, nullable=False),
# Column("u_funktion", sql.Text, nullable=False),
# Column("u_mail", sql.Text, nullable=False),
# Column("u_telefon", sql.Text, nullable=False),
# Column("u_plz_postfach", sql.Text, nullable=True),
# Column("u_einwilligung_inhaber", sql.Boolean, nullable=True),
# Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True),
# Column("u_aktiv", sql.Boolean, nullable=False, default=1),
# )
ext_kl_unternehmen_schema: t.PolarsSchema = {
"u_id": pl.UInt64,
"u_zeitstempel_eintrag": pl.Datetime,
"u_rechtsform": pl.String,
"u_firmenname": pl.String,
"u_strasse": pl.String,
"u_hausnummer": pl.String,
"u_adresszusatz": pl.String,
"u_plz": pl.String,
"u_ort": pl.String,
"u_postfach": pl.String,
"u_website": pl.String,
"u_anrede": pl.String,
"u_titel": pl.String,
"u_vorname": pl.String,
"u_nachname": pl.String,
"u_funktion": pl.String,
"u_mail": pl.String,
"u_telefon": pl.String,
"u_plz_postfach": pl.String,
"u_einwilligung_inhaber": pl.Boolean,
"u_einwilligung_ansprechpartner": pl.Boolean,
"u_aktiv": pl.Boolean,
}
# ext_kl_unternehmen_schema: t.PolarsSchema = {
# "u_id": pl.UInt64,
# "u_zeitstempel_eintrag": pl.Datetime,
# "u_rechtsform": pl.String,
# "u_firmenname": pl.String,
# "u_strasse": pl.String,
# "u_hausnummer": pl.String,
# "u_adresszusatz": pl.String,
# "u_plz": pl.String,
# "u_ort": pl.String,
# "u_postfach": pl.String,
# "u_website": pl.String,
# "u_anrede": pl.String,
# "u_titel": pl.String,
# "u_vorname": pl.String,
# "u_nachname": pl.String,
# "u_funktion": pl.String,
# "u_mail": pl.String,
# "u_telefon": pl.String,
# "u_plz_postfach": pl.String,
# "u_einwilligung_inhaber": pl.Boolean,
# "u_einwilligung_ansprechpartner": pl.Boolean,
# "u_aktiv": pl.Boolean,
# }
def get_ext_kontaktliste(
db_path: Path | None,
) -> pl.DataFrame:
if db_path is None:
ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None)
if ENV_PTH is None:
raise ValueError("No database path provided or found as ENV var.")
db_path = Path(ENV_PTH)
# def get_ext_kontaktliste(
# db_path: Path | None,
# ) -> pl.DataFrame:
# if db_path is None:
# ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None)
# if ENV_PTH is None:
# raise ValueError("No database path provided or found as ENV var.")
# db_path = Path(ENV_PTH)
if not db_path.exists():
raise FileNotFoundError(f"Database not found under >{db_path}<")
# if not db_path.exists():
# raise FileNotFoundError(f"Database not found under >{db_path}<")
engine = sql.create_engine(f"sqlite:///{db_path}")
stmt = sql.select(ext_kl_unternehmen)
return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema)
# engine = sql.create_engine(f"sqlite:///{db_path}")
# stmt = sql.select(ext_kl_unternehmen)
# return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema)
df_kontaktliste = get_ext_kontaktliste(None)
# df_kontaktliste = get_ext_kontaktliste(None)
# ----------------------------------------------------
@@ -188,7 +214,7 @@ def get_ext_crm_master(
return pl.read_database(stmt, engine, schema_overrides=ext_crm_master_schema)
df_crm_master = get_ext_crm_master(None)
DF_CRM_MASTER = get_ext_crm_master(constants.Config.DB_PATH_CRM)
ext_crm_nutzer: sql.Table = Table(
"Nutzer",
@@ -287,7 +313,6 @@ def get_ext_crm_contact_person(
engine = sql.create_engine(f"sqlite:///{db_path}")
stmt = sql.select(ext_crm_contact_person)
df = pl.read_database(stmt, engine, schema_overrides=ext_crm_contact_person_schema)
# TODO Database seems to contain entries with invalid characters like \t or \n
df = df.with_columns(
pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" ")
)
@@ -295,22 +320,27 @@ def get_ext_crm_contact_person(
return df
df_contact_person = get_ext_crm_contact_person(None)
# df_contact_person = get_ext_crm_contact_person(None)
DF_CONTACT_PERSON = get_ext_crm_contact_person(constants.Config.DB_PATH_CRM)
grunderfassung_unternehmen: sql.Table = Table(
"grunderfassung_unternehmen",
md_main,
Column("erfassung_id", sql.Integer, nullable=False, unique=True, autoincrement=True),
Column(
"erfassung_id",
sql.Integer,
primary_key=True,
autoincrement=True,
),
Column(
"Metadaten_erstellung",
sql.DateTime(timezone=True),
UTCDateTime,
nullable=True,
default=lambda: datetime.datetime.now(datetime.UTC),
),
Column(
"Metadaten_aktualisierung",
sql.DateTime(timezone=True),
UTCDateTime,
nullable=True,
default=lambda: datetime.datetime.now(datetime.UTC),
onupdate=lambda: datetime.datetime.now(datetime.UTC),
@@ -344,6 +374,7 @@ grunderfassung_unternehmen: sql.Table = Table(
Column("Stammdaten__anzahl_kinder__anzahl", sql.Text, nullable=True),
Column("Stammdaten__aufenthaltsort", sql.Text, nullable=True),
Column("Stammdaten__bundesland", sql.Text, nullable=True),
Column("Stammdaten__land", sql.Text, nullable=True),
Column("Stammdaten__email", sql.Text, nullable=True),
Column("Stammdaten__familienstand", sql.Text, nullable=True),
Column("Stammdaten__festnetznummer", sql.Text, nullable=True),
@@ -364,3 +395,5 @@ grunderfassung_unternehmen: sql.Table = Table(
Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Date, nullable=True),
Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True),
)
md_main.create_all(ENGINE)