From 5b2ca4f694f9eb08ef5911be6f9922b5d2dc0dda Mon Sep 17 00:00:00 2001 From: foefl Date: Tue, 26 May 2026 12:17:57 +0200 Subject: [PATCH] basic database interaction --- prototypes/t_qt_2.py | 86 ++++++++--- src/wce_crm/backend/initial_recording.py | 49 ++++++- src/wce_crm/db.py | 177 ++++++++++++++--------- 3 files changed, 214 insertions(+), 98 deletions(-) diff --git a/prototypes/t_qt_2.py b/prototypes/t_qt_2.py index 399946a..1d42d4b 100644 --- a/prototypes/t_qt_2.py +++ b/prototypes/t_qt_2.py @@ -21,6 +21,7 @@ from typing_extensions import override import babel from dopt_basics.logging import BASE_LOGGER, setup_logging from pydantic import ( + AwareDatetime, BaseModel, ConfigDict, EmailStr, @@ -76,7 +77,9 @@ logger.setLevel(logging.DEBUG) logger_search_widget = logger.getChild("search_widget") logger_search_widget.setLevel(logging.DEBUG) logger_get_data = logger.getChild("get_data") -logger_get_data.setLevel(logging.DEBUG) +logger_get_data.setLevel(logging.INFO) +logger_get_data_auto_form = logger.getChild("get_data_auto_form") +logger_get_data_auto_form.setLevel(logging.DEBUG) QSS = """ *[styleClass="stempel"] { @@ -770,8 +773,8 @@ def reset_form( # TODO check if this behaviour is correct in other contexts, deactivated because of # TODO company search widget - if form_field.readonly: - continue + # if form_field.readonly: + # continue if isinstance(widget, QLineEdit): widget.clear() @@ -987,6 +990,7 @@ def set_widget_value( if isinstance(widget, QLineEdit): if isinstance(value, datetime.datetime): + value = value.astimezone() value = value.strftime(DATETIME_FMT) elif isinstance(value, datetime.date): value = value.strftime(DATE_FMT) @@ -1110,8 +1114,8 @@ def validate_form_data( class Grunderfassung_Unternehmen(FlatBaseModel): # default in SQLAlchemy with lambda and timezone-aware datetime) - Metadaten_erstellung: datetime.datetime | None = None - Metadaten_aktualisierung: datetime.datetime | None = None # see above + Metadaten_erstellung: AwareDatetime | None = None + Metadaten_aktualisierung: AwareDatetime | None = None # see above Metadaten_nutzer: str | None Grunderfassung_fallnummer: str Grunderfassung_notiz: str | None @@ -1722,7 +1726,7 @@ class AutoForm(QWidget): if DEBUG: self.test_button = QPushButton("Initialisiere Laden") - self.test_button.clicked.connect(self.on_load_clicked) + self.test_button.clicked.connect(self.load_data) self.test_button.setFixedHeight(50) self.test_button.setSizePolicy( QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed @@ -1731,6 +1735,19 @@ class AutoForm(QWidget): button = QPushButton("GET DATA") button.clicked.connect(self.get_form_data) self.main_layout.addWidget(button) + + id_field_layout = QHBoxLayout() + id_field_layout.setContentsMargins(0, 0, 0, 0) + id_field_layout.setSpacing(5) + id_field_label = QLabel("ID Datenbank:") + self.id_field_input = QLineEdit() + id_field_layout.addWidget(id_field_label) + id_field_layout.addWidget(self.id_field_input) + self.main_layout.addLayout(id_field_layout) + button_db_index = QPushButton("Setze DB Index") + button_db_index.clicked.connect(self._set_db_index) + self.main_layout.addWidget(button_db_index) + self.main_layout.addSpacing(10) self.top_level_form_layout = QFormLayout() @@ -1771,10 +1788,16 @@ class AutoForm(QWidget): self.layout_btn.addWidget(self.reset_btn) self.ignored_keys = ignored_keys + self.current_id: int = -1 # test button # self.main_layout.addSpacing(10) # self.main_layout.addWidget(self.test_button) + def _set_db_index(self) -> None: + index = int(self.id_field_input.text()) + self.current_id = index + logger.debug("Set index to %d", self.current_id) + def _disable_save(self) -> None: self.save_btn.setEnabled(False) self.save_btn.setText(self.save_btn_txt_disabled) @@ -1787,20 +1810,36 @@ class AutoForm(QWidget): QTimer.singleShot(timeout + 1, lambda: self.save_btn.setShortcut("Ctrl+S")) self.save_btn.setText(self.save_btn_txt_enabled) - def on_load_clicked(self) -> None: + def load_data( + self, + lookup_id: int | None = None, + ) -> None: # TODO change logic to database backend logger_get_data.info(">>>> LOAD CLICKED") - loaded_data = load_pydantic_model_dict_db() - logger_get_data.debug("Loaded data dict. Passing to Pydantic...") + if lookup_id is None or lookup_id == 0: + lookup_id = self.current_id + self.reset_form() + + logger_get_data_auto_form.debug("Lookup ID: %d", lookup_id) + + if lookup_id > 0: + logger_get_data_auto_form.debug("Load from DB:") + loaded_data = be_init_rec.get_initial_recording(lookup_id) + else: + logger_get_data_auto_form.debug("Load from pickled object:") + loaded_data = load_pydantic_model_dict_db() + + logger_get_data_auto_form.debug( + "Loaded data dict:\n%s Passing to Pydantic...", pformat(loaded_data) + ) model = Grunderfassung_Unternehmen(**loaded_data) - logger_get_data.debug("Loaded to Pydantic.") - logger_get_data.debug("Convert to GUI structure...") + logger_get_data_auto_form.debug("Loaded to Pydantic.") + logger_get_data_auto_form.debug("Convert to GUI structure...") form_data = model.to_gui() - logger_get_data.debug("Set form data...") - logger_get_data.debug("Form data:\n%s", pformat(form_data)) - # logger_get_data.debug("Widget registry:\n") - # pprint_registry(self.widget_registry) + logger_get_data_auto_form.debug("Set form data...") + # logger_get_data_auto_form.debug("Form data:\n%s", pformat(form_data)) self.set_form_data(form_data) + self.current_id = lookup_id def on_save_clicked(self) -> None: self._disable_save() @@ -1827,8 +1866,7 @@ class AutoForm(QWidget): try: validated_data = Grunderfassung_Unternehmen(**form_data) - # # TODO remove - # validated_data.Metadaten_erstellung = datetime.datetime.now() + # # TODO add user? # logger.debug("%s", pformat(validated_data.model_dump())) except ValidationError as e: # catch errors and show them in GUI @@ -1854,8 +1892,8 @@ class AutoForm(QWidget): QMessageBox.warning(self, "Eingabefehler", "\n".join(fehler_texte)) else: # !! this code is only called if the 'try' block was successful - self.reset_form() - save_pydantic_model_dict_db(validated_data) + + # save_pydantic_model_dict_db(validated_data) # TODO save data to database logger.info( "\n\n>>>>>>>>>>>>> Form data without 'exlude':\n%s", @@ -1869,7 +1907,16 @@ class AutoForm(QWidget): ), pformat(validated_data.to_db(exclude=self.ignored_keys)), ) + db_data = validated_data.to_db(exclude=self.ignored_keys) + if self.current_id < 0: + logger.debug("Insert triggered") + be_init_rec.insert_initial_recording(db_data) + else: + logger.debug("Update triggered") + be_init_rec.update_initial_recording(self.current_id, db_data) + logger_get_data.info("Data saved successfully") + self.reset_form() finally: # always re-enable save, even if error occurred self._enable_save() @@ -1880,6 +1927,7 @@ class AutoForm(QWidget): def reset_form(self) -> None: reset_form(self.widget_registry) + self.current_id = -1 def get_form_data(self) -> dict[str, Any]: form_data = get_form_data(self.widget_registry) diff --git a/src/wce_crm/backend/initial_recording.py b/src/wce_crm/backend/initial_recording.py index d3cd30e..4b8a225 100644 --- a/src/wce_crm/backend/initial_recording.py +++ b/src/wce_crm/backend/initial_recording.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TypedDict, cast +from typing import Any, TypedDict, cast import polars as pl @@ -80,7 +80,7 @@ def _transform_for_gui_output( def comp_search_choices() -> tuple[tuple[str, int], ...]: # TODO no reload functionality - q = db.df_crm_master.lazy() + q = db.DF_CRM_MASTER.lazy() counter = pl.int_range(0, pl.len()).over(pl.col.ma_unternehmensname) q = q.with_columns( dedupl=pl.when(counter == 0) @@ -89,14 +89,13 @@ def comp_search_choices() -> tuple[tuple[str, int], ...]: ) df = q.collect() - # return dict(zip(df["dedupl"], df["ma_id"])) return tuple(zip(df["dedupl"], df["ma_id"])) def comp_search_get_info( ma_id: int, ) -> CompanyInfo: - df = db.df_crm_master.filter(pl.col.ma_id == ma_id) + df = db.DF_CRM_MASTER.filter(pl.col.ma_id == ma_id) if df.height > 1 or df.height == 0: raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}") @@ -109,7 +108,7 @@ def contact_person_search_choices( use_both_names: bool, ) -> tuple[tuple[str, int], ...]: # TODO no reload functionality - q = db.df_contact_person.lazy() + q = db.DF_CONTACT_PERSON.lazy() if ma_id is not None: q = q.filter(pl.col.ma_id == ma_id) @@ -128,16 +127,52 @@ def contact_person_search_choices( ) df = q.collect() - # return dict(zip(df["dedupl"], df["an_id"])) return tuple(zip(df["dedupl"], df["an_id"])) def contact_person_search_get_info( an_id: int, ) -> ContactPersonInfo: - df = db.df_contact_person.filter(pl.col.an_id == an_id) + df = db.DF_CONTACT_PERSON.filter(pl.col.an_id == an_id) if df.height > 1 or df.height == 0: raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}") df = _transform_for_gui_output(df) return cast(ContactPersonInfo, df.row(0, named=True)) + + +def insert_initial_recording( + data: dict[str, Any], +) -> None: + stmt = db.grunderfassung_unternehmen.insert().values(data) + with db.ENGINE.begin() as conn: + conn.execute(stmt) + + +def update_initial_recording( + erfassung_id: int, + data: dict[str, Any], +) -> None: + stmt = ( + db.grunderfassung_unternehmen.update() + .where(db.grunderfassung_unternehmen.c.erfassung_id == erfassung_id) + .values(data) + ) + with db.ENGINE.begin() as conn: + conn.execute(stmt) + + +def get_initial_recording( + erfassung_id: int, +) -> dict[str, Any]: + stmt = db.grunderfassung_unternehmen.select().where( + db.grunderfassung_unternehmen.c.erfassung_id == erfassung_id + ) + with db.ENGINE.begin() as conn: + ret = conn.execute(stmt) + + row = ret.fetchone() + if row is None: + raise KeyError(f"Database ID {erfassung_id} not found") + + return row._asdict() diff --git a/src/wce_crm/db.py b/src/wce_crm/db.py index 3bb6259..14a9895 100644 --- a/src/wce_crm/db.py +++ b/src/wce_crm/db.py @@ -9,6 +9,7 @@ import polars as pl import sqlalchemy as sql from sqlalchemy import Column, String, Table, TypeDecorator +from wce_crm import constants from wce_crm import types as t @@ -32,84 +33,109 @@ class SafeDateTime(TypeDecorator): return None +class UTCDateTime(TypeDecorator): + """Safely coerces naive datetimes from SQLite into timezone-aware UTC.""" + + impl = sql.DateTime + cache_ok = True + + def process_bind_param(self, value, dialect): + """Runs when saving to the database.""" + if value is not None: + # Ensure it's converted to UTC before saving + if value.tzinfo is None: + value = value.replace(tzinfo=datetime.timezone.utc) + else: + value = value.astimezone(datetime.timezone.utc) + return value + + def process_result_value(self, value, dialect): + """Runs when fetching from the database.""" + if value is not None and value.tzinfo is None: + # Explicitly tell Python this data *is* UTC + value = value.replace(tzinfo=datetime.timezone.utc) + return value + + md_crm = sql.MetaData() md_main = sql.MetaData() +ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}") # ---------- OLD "Kontaktliste" ---------- -md_kontaktliste = sql.MetaData() +# md_kontaktliste = sql.MetaData() -ext_kl_unternehmen: sql.Table = Table( - "Unternehmen", - md_kontaktliste, - Column("u_id", sql.Integer, nullable=False, unique=True), - Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False), - Column("u_rechtsform", sql.Text, nullable=False), - Column("u_firmenname", sql.Text, nullable=False), - Column("u_strasse", sql.Text, nullable=False), - Column("u_hausnummer", sql.Text, nullable=False), - Column("u_adresszusatz", sql.Text, nullable=True), - Column("u_plz", sql.Text, nullable=False), - Column("u_ort", sql.Text, nullable=False), - Column("u_postfach", sql.Text, nullable=True), - Column("u_website", sql.Text, nullable=True), - Column("u_anrede", sql.Text, nullable=False), - Column("u_titel", sql.Text, nullable=True), - Column("u_vorname", sql.Text, nullable=False), - Column("u_nachname", sql.Text, nullable=False), - Column("u_funktion", sql.Text, nullable=False), - Column("u_mail", sql.Text, nullable=False), - Column("u_telefon", sql.Text, nullable=False), - Column("u_plz_postfach", sql.Text, nullable=True), - Column("u_einwilligung_inhaber", sql.Boolean, nullable=True), - Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True), - Column("u_aktiv", sql.Boolean, nullable=False, default=1), -) +# ext_kl_unternehmen: sql.Table = Table( +# "Unternehmen", +# md_kontaktliste, +# Column("u_id", sql.Integer, nullable=False, unique=True), +# Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False), +# Column("u_rechtsform", sql.Text, nullable=False), +# Column("u_firmenname", sql.Text, nullable=False), +# Column("u_strasse", sql.Text, nullable=False), +# Column("u_hausnummer", sql.Text, nullable=False), +# Column("u_adresszusatz", sql.Text, nullable=True), +# Column("u_plz", sql.Text, nullable=False), +# Column("u_ort", sql.Text, nullable=False), +# Column("u_postfach", sql.Text, nullable=True), +# Column("u_website", sql.Text, nullable=True), +# Column("u_anrede", sql.Text, nullable=False), +# Column("u_titel", sql.Text, nullable=True), +# Column("u_vorname", sql.Text, nullable=False), +# Column("u_nachname", sql.Text, nullable=False), +# Column("u_funktion", sql.Text, nullable=False), +# Column("u_mail", sql.Text, nullable=False), +# Column("u_telefon", sql.Text, nullable=False), +# Column("u_plz_postfach", sql.Text, nullable=True), +# Column("u_einwilligung_inhaber", sql.Boolean, nullable=True), +# Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True), +# Column("u_aktiv", sql.Boolean, nullable=False, default=1), +# ) -ext_kl_unternehmen_schema: t.PolarsSchema = { - "u_id": pl.UInt64, - "u_zeitstempel_eintrag": pl.Datetime, - "u_rechtsform": pl.String, - "u_firmenname": pl.String, - "u_strasse": pl.String, - "u_hausnummer": pl.String, - "u_adresszusatz": pl.String, - "u_plz": pl.String, - "u_ort": pl.String, - "u_postfach": pl.String, - "u_website": pl.String, - "u_anrede": pl.String, - "u_titel": pl.String, - "u_vorname": pl.String, - "u_nachname": pl.String, - "u_funktion": pl.String, - "u_mail": pl.String, - "u_telefon": pl.String, - "u_plz_postfach": pl.String, - "u_einwilligung_inhaber": pl.Boolean, - "u_einwilligung_ansprechpartner": pl.Boolean, - "u_aktiv": pl.Boolean, -} +# ext_kl_unternehmen_schema: t.PolarsSchema = { +# "u_id": pl.UInt64, +# "u_zeitstempel_eintrag": pl.Datetime, +# "u_rechtsform": pl.String, +# "u_firmenname": pl.String, +# "u_strasse": pl.String, +# "u_hausnummer": pl.String, +# "u_adresszusatz": pl.String, +# "u_plz": pl.String, +# "u_ort": pl.String, +# "u_postfach": pl.String, +# "u_website": pl.String, +# "u_anrede": pl.String, +# "u_titel": pl.String, +# "u_vorname": pl.String, +# "u_nachname": pl.String, +# "u_funktion": pl.String, +# "u_mail": pl.String, +# "u_telefon": pl.String, +# "u_plz_postfach": pl.String, +# "u_einwilligung_inhaber": pl.Boolean, +# "u_einwilligung_ansprechpartner": pl.Boolean, +# "u_aktiv": pl.Boolean, +# } -def get_ext_kontaktliste( - db_path: Path | None, -) -> pl.DataFrame: - if db_path is None: - ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None) - if ENV_PTH is None: - raise ValueError("No database path provided or found as ENV var.") - db_path = Path(ENV_PTH) +# def get_ext_kontaktliste( +# db_path: Path | None, +# ) -> pl.DataFrame: +# if db_path is None: +# ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None) +# if ENV_PTH is None: +# raise ValueError("No database path provided or found as ENV var.") +# db_path = Path(ENV_PTH) - if not db_path.exists(): - raise FileNotFoundError(f"Database not found under >{db_path}<") +# if not db_path.exists(): +# raise FileNotFoundError(f"Database not found under >{db_path}<") - engine = sql.create_engine(f"sqlite:///{db_path}") - stmt = sql.select(ext_kl_unternehmen) - return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema) +# engine = sql.create_engine(f"sqlite:///{db_path}") +# stmt = sql.select(ext_kl_unternehmen) +# return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema) -df_kontaktliste = get_ext_kontaktliste(None) +# df_kontaktliste = get_ext_kontaktliste(None) # ---------------------------------------------------- @@ -188,7 +214,7 @@ def get_ext_crm_master( return pl.read_database(stmt, engine, schema_overrides=ext_crm_master_schema) -df_crm_master = get_ext_crm_master(None) +DF_CRM_MASTER = get_ext_crm_master(constants.Config.DB_PATH_CRM) ext_crm_nutzer: sql.Table = Table( "Nutzer", @@ -287,7 +313,6 @@ def get_ext_crm_contact_person( engine = sql.create_engine(f"sqlite:///{db_path}") stmt = sql.select(ext_crm_contact_person) df = pl.read_database(stmt, engine, schema_overrides=ext_crm_contact_person_schema) - # TODO Database seems to contain entries with invalid characters like \t or \n df = df.with_columns( pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" ") ) @@ -295,22 +320,27 @@ def get_ext_crm_contact_person( return df -df_contact_person = get_ext_crm_contact_person(None) - +# df_contact_person = get_ext_crm_contact_person(None) +DF_CONTACT_PERSON = get_ext_crm_contact_person(constants.Config.DB_PATH_CRM) grunderfassung_unternehmen: sql.Table = Table( "grunderfassung_unternehmen", md_main, - Column("erfassung_id", sql.Integer, nullable=False, unique=True, autoincrement=True), + Column( + "erfassung_id", + sql.Integer, + primary_key=True, + autoincrement=True, + ), Column( "Metadaten_erstellung", - sql.DateTime(timezone=True), + UTCDateTime, nullable=True, default=lambda: datetime.datetime.now(datetime.UTC), ), Column( "Metadaten_aktualisierung", - sql.DateTime(timezone=True), + UTCDateTime, nullable=True, default=lambda: datetime.datetime.now(datetime.UTC), onupdate=lambda: datetime.datetime.now(datetime.UTC), @@ -344,6 +374,7 @@ grunderfassung_unternehmen: sql.Table = Table( Column("Stammdaten__anzahl_kinder__anzahl", sql.Text, nullable=True), Column("Stammdaten__aufenthaltsort", sql.Text, nullable=True), Column("Stammdaten__bundesland", sql.Text, nullable=True), + Column("Stammdaten__land", sql.Text, nullable=True), Column("Stammdaten__email", sql.Text, nullable=True), Column("Stammdaten__familienstand", sql.Text, nullable=True), Column("Stammdaten__festnetznummer", sql.Text, nullable=True), @@ -364,3 +395,5 @@ grunderfassung_unternehmen: sql.Table = Table( Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Date, nullable=True), Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True), ) + +md_main.create_all(ENGINE)