from __future__ import annotations import datetime import os import re from pathlib import Path import polars as pl import sqlalchemy as sql from sqlalchemy import Column, String, Table, TypeDecorator from wce_crm import constants from wce_crm import types as t # // declarations class SafeDateTime(TypeDecorator): """Cleans non-standard ISO strings before parsing.""" impl = String # We treat the underlying data as a String first def process_result_value(self, value, dialect): if value is None: return None # 1. Remove the trailing 'ff' (or any trailing letters) # 2. Replace comma with dot (SQLAlchemy prefers . over ,) clean_value = re.sub(r"[a-zA-Z]+$", "", value).replace(",", ".") try: return datetime.datetime.fromisoformat(clean_value) except ValueError: # Fallback if it's still weird return None class UTCDateTime(TypeDecorator): """Safely coerces naive datetimes from SQLite into timezone-aware UTC.""" impl = sql.DateTime cache_ok = True def process_bind_param(self, value, dialect): """Runs when saving to the database.""" if value is not None: # Ensure it's converted to UTC before saving if value.tzinfo is None: value = value.replace(tzinfo=datetime.timezone.utc) else: value = value.astimezone(datetime.timezone.utc) return value def process_result_value(self, value, dialect): """Runs when fetching from the database.""" if value is not None and value.tzinfo is None: # Explicitly tell Python this data *is* UTC value = value.replace(tzinfo=datetime.timezone.utc) return value MD_CRM = sql.MetaData() MD_MAIN = sql.MetaData() ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}") # ---------- OLD "Kontaktliste" ---------- # md_kontaktliste = sql.MetaData() # ext_kl_unternehmen: sql.Table = Table( # "Unternehmen", # md_kontaktliste, # Column("u_id", sql.Integer, nullable=False, unique=True), # Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False), # Column("u_rechtsform", sql.Text, nullable=False), # Column("u_firmenname", sql.Text, nullable=False), # Column("u_strasse", sql.Text, nullable=False), # Column("u_hausnummer", sql.Text, nullable=False), # Column("u_adresszusatz", sql.Text, nullable=True), # Column("u_plz", sql.Text, nullable=False), # Column("u_ort", sql.Text, nullable=False), # Column("u_postfach", sql.Text, nullable=True), # Column("u_website", sql.Text, nullable=True), # Column("u_anrede", sql.Text, nullable=False), # Column("u_titel", sql.Text, nullable=True), # Column("u_vorname", sql.Text, nullable=False), # Column("u_nachname", sql.Text, nullable=False), # Column("u_funktion", sql.Text, nullable=False), # Column("u_mail", sql.Text, nullable=False), # Column("u_telefon", sql.Text, nullable=False), # Column("u_plz_postfach", sql.Text, nullable=True), # Column("u_einwilligung_inhaber", sql.Boolean, nullable=True), # Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True), # Column("u_aktiv", sql.Boolean, nullable=False, default=1), # ) # ext_kl_unternehmen_schema: t.PolarsSchema = { # "u_id": pl.UInt64, # "u_zeitstempel_eintrag": pl.Datetime, # "u_rechtsform": pl.String, # "u_firmenname": pl.String, # "u_strasse": pl.String, # "u_hausnummer": pl.String, # "u_adresszusatz": pl.String, # "u_plz": pl.String, # "u_ort": pl.String, # "u_postfach": pl.String, # "u_website": pl.String, # "u_anrede": pl.String, # "u_titel": pl.String, # "u_vorname": pl.String, # "u_nachname": pl.String, # "u_funktion": pl.String, # "u_mail": pl.String, # "u_telefon": pl.String, # "u_plz_postfach": pl.String, # "u_einwilligung_inhaber": pl.Boolean, # "u_einwilligung_ansprechpartner": pl.Boolean, # "u_aktiv": pl.Boolean, # } # def get_ext_kontaktliste( # db_path: Path | None, # ) -> pl.DataFrame: # if db_path is None: # ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None) # if ENV_PTH is None: # raise ValueError("No database path provided or found as ENV var.") # db_path = Path(ENV_PTH) # if not db_path.exists(): # raise FileNotFoundError(f"Database not found under >{db_path}<") # engine = sql.create_engine(f"sqlite:///{db_path}") # stmt = sql.select(ext_kl_unternehmen) # return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema) # df_kontaktliste = get_ext_kontaktliste(None) # ---------------------------------------------------- ext_crm_master: sql.Table = Table( "Master", MD_CRM, Column("ma_id", sql.Integer, nullable=False, unique=True), Column("wce_id", sql.ForeignKey("Nutzer.wce_id")), Column("ma_unternehmensname", sql.Text, nullable=True), Column("ma_branche", sql.Text, nullable=True), Column("ma_strasse", sql.Text, nullable=True), Column("ma_hausnummer", sql.Text, nullable=True), Column("ma_plz", sql.Text, nullable=True), Column("ma_ort", sql.Text, nullable=True), Column("ma_plz_postfach", sql.Text, nullable=True), Column("ma_postfach", sql.Text, nullable=True), Column("ma_website", sql.Text, nullable=True), Column("ma_mail", sql.Text, nullable=True), Column("ma_telefonnummer", sql.Text, nullable=True), Column("ma_faxnummer", sql.Text, nullable=True), Column("ma_ersteintrag_datum", SafeDateTime, nullable=True), Column("ma_aktualisierung_datum", SafeDateTime, nullable=True), Column("ma_aktualisierung_nutzer", sql.Text, nullable=True), Column("ma_sollprozess", sql.Text, nullable=True), Column("ma_auslaendische_mitarbeiter", sql.Text, nullable=True), Column("ma_quelle_information", sql.Text, nullable=True), Column("ma_bemerkung", sql.Text, nullable=True), Column("ma_kontakt", sql.Boolean, nullable=True), Column("ma_schlagworte", sql.Text, nullable=True), Column("ma_archiviert", sql.Boolean, nullable=True, default=False), ) ext_crm_master_schema: t.PolarsSchema = { "ma_id": pl.UInt64, "wce_id": pl.UInt64, "ma_unternehmensname": pl.String, "ma_branche": pl.String, "ma_strasse": pl.String, "ma_hausnummer": pl.String, "ma_plz": pl.String, "ma_ort": pl.String, "ma_plz_postfach": pl.String, "ma_postfach": pl.String, "ma_website": pl.String, "ma_mail": pl.String, "ma_telefonnummer": pl.String, "ma_faxnummer": pl.String, "ma_ersteintrag_datum": pl.Datetime, "ma_aktualisierung_datum": pl.Datetime, "ma_aktualisierung_nutzer": pl.String, "ma_sollprozess": pl.String, "ma_auslaendische_mitarbeiter": pl.String, "ma_quelle_information": pl.String, "ma_bemerkung": pl.String, "ma_kontakt": pl.Boolean, "ma_schlagworte": pl.String, "ma_archiviert": pl.Boolean, } def get_ext_crm_master( db_path: Path | None, ) -> pl.DataFrame: if db_path is None: ENV_PTH = os.environ.get("DOPT_DB_CRM", None) if ENV_PTH is None: raise ValueError("No database path provided or found as ENV var.") db_path = Path(ENV_PTH) if not db_path.exists(): raise FileNotFoundError(f"Database not found under >{db_path}<") engine = sql.create_engine(f"sqlite:///{db_path}") stmt = sql.select(ext_crm_master) return pl.read_database(stmt, engine, schema_overrides=ext_crm_master_schema) DF_CRM_MASTER = get_ext_crm_master(constants.Config.DB_PATH_CRM) ext_crm_nutzer: sql.Table = Table( "Nutzer", MD_CRM, Column("wce_id", sql.Integer, nullable=False, unique=True), Column("wce_name", sql.Text, nullable=True), Column("wce_vorname", sql.Text, nullable=True), Column("wce_kuerzel", sql.Text, nullable=True), Column("wce_passwort", sql.Text, nullable=True), Column("wce_angelegt_am", sql.DateTime, nullable=True), Column("wce_rolle", sql.Text, nullable=True), Column("wce_angelegt_von", sql.Text, nullable=True), Column("wce_aktiv", sql.Boolean, nullable=True), Column("wce_letzter_login", sql.DateTime, nullable=True), ) ext_crm_nutzer_schema: t.PolarsSchema = { "wce_id": pl.UInt64, "wce_name": pl.String, "wce_vorname": pl.String, "wce_kuerzel": pl.String, "wce_passwort": pl.String, "wce_angelegt_am": pl.Datetime, "wce_rolle": pl.String, "wce_angelegt_von": pl.String, "wce_aktiv": pl.Boolean, "wce_letzter_login": pl.Datetime, } ext_crm_contact_person: sql.Table = Table( "Ansprechpartner", MD_CRM, Column("an_id", sql.Integer, nullable=False, unique=True), Column("ma_id", sql.ForeignKey("Master.ma_id")), Column("wce_id", sql.ForeignKey("Nutzer.wce_id")), Column("st_id", sql.Integer, nullable=False), Column("an_sachgebiet", sql.Text, nullable=True), Column("an_anrede", sql.Text, nullable=True), Column("an_titel", sql.Text, nullable=True), Column("an_nachname", sql.Text, nullable=True), Column("an_vorname", sql.Text, nullable=True), Column("an_position", sql.Text, nullable=True), Column("an_mail", sql.Text, nullable=True), Column("an_festnetz", sql.Text, nullable=True), Column("an_mobil", sql.Text, nullable=True), Column("an_faxnummer", sql.Text, nullable=True), Column("an_hauptansprechpartner", sql.Text, nullable=True), Column("an_anrede_anschrift", sql.Text, nullable=True), Column("an_bemerkung", sql.Text, nullable=True), Column("an_aktualisierung_datum", SafeDateTime, nullable=True), Column("an_aktualisierung_nutzer", sql.Text, nullable=True), Column("an_letztes_kontaktdatum", SafeDateTime, nullable=True), Column("an_ersteintrag_datum", SafeDateTime, nullable=True), Column("an_archiviert", sql.Boolean, nullable=True, default=0), ) ext_crm_contact_person_schema: t.PolarsSchema = { "an_id": pl.UInt64, "ma_id": pl.UInt64, "wce_id": pl.UInt64, "st_id": pl.UInt64, "an_sachgebiet": pl.String, "an_anrede": pl.String, "an_titel": pl.String, "an_nachname": pl.String, "an_vorname": pl.String, "an_position": pl.String, "an_mail": pl.String, "an_festnetz": pl.String, "an_mobil": pl.String, "an_faxnummer": pl.String, "an_hauptansprechpartner": pl.String, "an_anrede_anschrift": pl.String, "an_bemerkung": pl.String, "an_aktualisierung_datum": pl.Datetime, "an_aktualisierung_nutzer": pl.String, "an_letztes_kontaktdatum": pl.Datetime, "an_ersteintrag_datum": pl.Datetime, "an_archiviert": pl.Boolean, } def get_ext_crm_contact_person( db_path: Path | None, ) -> pl.DataFrame: if db_path is None: ENV_PTH = os.environ.get("DOPT_DB_CRM", None) if ENV_PTH is None: raise ValueError("No database path provided or found as ENV var.") db_path = Path(ENV_PTH) if not db_path.exists(): raise FileNotFoundError(f"Database not found under >{db_path}<") engine = sql.create_engine(f"sqlite:///{db_path}") stmt = sql.select(ext_crm_contact_person) df = pl.read_database(stmt, engine, schema_overrides=ext_crm_contact_person_schema) df = df.with_columns( pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" ") ) return df # df_contact_person = get_ext_crm_contact_person(None) DF_CONTACT_PERSON = get_ext_crm_contact_person(constants.Config.DB_PATH_CRM) grunderfassung: sql.Table = Table( "grunderfassung", MD_MAIN, Column( "erfassung_id", sql.Integer, primary_key=True, autoincrement=True, ), Column( "Metadaten_erstellung", UTCDateTime, nullable=True, default=lambda: datetime.datetime.now(datetime.UTC), ), Column( "Metadaten_aktualisierung", UTCDateTime, nullable=True, default=lambda: datetime.datetime.now(datetime.UTC), onupdate=lambda: datetime.datetime.now(datetime.UTC), ), Column("Metadaten_nutzer", sql.String(20), nullable=True), Column("Metadaten_wiedereintrittsdatum", sql.Date, nullable=True, default=None), Column("Arbeitserfahrung", sql.Text, nullable=True), Column("Grunderfassung_fallnummer", sql.Text, nullable=True), Column("Grunderfassung_notiz", sql.Text, nullable=True), Column("HoehereBildung", sql.Text, nullable=True), Column("Kontaktperson__KP_adresse", sql.Text, nullable=True), Column("Kontaktperson__KP_anrede_anschrift", sql.Text, nullable=True), Column("Kontaktperson__KP_email", sql.Text, nullable=True), Column("Kontaktperson__KP_festnetznummer", sql.Text, nullable=True), Column("Kontaktperson__KP_funktion_beziehung", sql.Text, nullable=True), Column("Kontaktperson__KP_mobilfunknummer", sql.Text, nullable=True), Column("Kontaktperson__KP_name", sql.Text, nullable=True), Column( "Kontaktperson__KP_name_partner", sql.Text, nullable=True ), # TODO: check if needed when set by trigger Column("Kontaktperson__KP_titel", sql.Text, nullable=True), Column("Kontaktperson__KP_vorname", sql.Text, nullable=True), Column("Partnersuche__kanal_aufmerksamkeit", sql.Text, nullable=True, default=None), Column("Partnersuche__person_suche", sql.Integer, nullable=True, default=None), Column("Partnersuche__un_suche", sql.Integer, nullable=True, default=None), Column("Projektrelevanz__relevanz", sql.Text, nullable=True), Column("Projektrelevanz__foerderperiode", sql.Text, nullable=True), Column("Schulbildung", sql.Text, nullable=True), Column("Sprachkenntnisse", sql.Text, nullable=True), Column("Stammdaten__PLZ", sql.Text, nullable=True), Column("Stammdaten__anrede_anschrift", sql.Text, nullable=True), Column("Stammdaten__anzahl_kinder__alter", sql.Text, nullable=True), Column("Stammdaten__anzahl_kinder__anzahl", sql.Integer, nullable=True), Column("Stammdaten__aufenthaltsort", sql.Text, nullable=True), Column("Stammdaten__bundesland", sql.Text, nullable=True), Column("Stammdaten__land", sql.Text, nullable=True), Column("Stammdaten__email", sql.Text, nullable=True), Column("Stammdaten__familienstand", sql.Text, nullable=True), Column("Stammdaten__festnetznummer", sql.Text, nullable=True), Column("Stammdaten__geburtsdatum", sql.Date, nullable=True), Column("Stammdaten__hausnummer", sql.Text, nullable=True), Column("Stammdaten__herkunftsland", sql.Text, nullable=True), Column("Stammdaten__mobilfunknummer", sql.Text, nullable=True), Column("Stammdaten__name", sql.Text, nullable=True), Column("Stammdaten__ort", sql.Text, nullable=True), Column("Stammdaten__rueckkehrer", sql.Boolean, nullable=True), Column("Stammdaten__staatsangehoerigkeit", sql.Text, nullable=True), Column("Stammdaten__strasse", sql.Text, nullable=True), Column("Stammdaten__titel", sql.Text, nullable=True), Column("Stammdaten__vorname", sql.Text, nullable=True), Column("WeitereInfos__WI_arbeitsstatus", sql.Text, nullable=True), Column("WeitereInfos__WI_aufenthaltstitel", sql.Text, nullable=True), Column("WeitereInfos__WI_deutsch_sprache", sql.Text, nullable=True), Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Date, nullable=True), Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True), ) # MD_MAIN.create_all(ENGINE)