generated from dopt-python/py311
401 lines
15 KiB
Python
401 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import datetime
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import polars as pl
|
|
import sqlalchemy as sql
|
|
from sqlalchemy import Column, String, Table, TypeDecorator
|
|
|
|
from wce_crm import constants
|
|
from wce_crm import types as t
|
|
|
|
|
|
class SafeDateTime(TypeDecorator):
    """Parse loosely formatted ISO date strings into ``datetime`` objects.

    The column is read as a plain string (``impl = String``) and converted in
    :meth:`process_result_value`. Values that cannot be parsed yield ``None``
    instead of raising.
    """

    impl = String  # We treat the underlying data as a String first
    # The type carries no per-instance state, so it may take part in
    # SQLAlchemy's statement cache; without this flag SQLAlchemy emits a
    # warning and skips caching for every statement that uses the type.
    cache_ok = True

    # Trailing run of ASCII letters (e.g. a stray 'ff' suffix).
    _TRAILING_LETTERS = re.compile(r"[a-zA-Z]+$")

    def process_result_value(self, value, dialect):
        """Convert a raw database value into a ``datetime`` or ``None``.

        Args:
            value: Raw column value as delivered by the driver.
            dialect: Active SQLAlchemy dialect (unused).

        Returns:
            The parsed ``datetime.datetime``; ``None`` for NULL values and
            for anything that cannot be interpreted as an ISO timestamp.
        """
        if value is None:
            return None
        if isinstance(value, datetime.datetime):
            # Already converted by the driver; nothing to clean.
            return value
        if not isinstance(value, str):
            # Non-text storage cannot be cleaned with the regex below; treat
            # it like any other unparseable value instead of raising.
            return None

        # 1. Drop surrounding whitespace so the end-anchored regex can match.
        # 2. Remove the trailing 'ff' (or any trailing letters).
        # 3. Replace comma with dot as the fractional-second separator.
        clean_value = self._TRAILING_LETTERS.sub("", value.strip()).replace(",", ".")

        try:
            return datetime.datetime.fromisoformat(clean_value)
        except ValueError:
            # Fallback if it's still weird
            return None
|
|
|
|
|
|
class UTCDateTime(TypeDecorator):
    """DateTime column type that always hands out timezone-aware UTC values."""

    impl = sql.DateTime
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Normalise an outgoing datetime to UTC before it is stored."""
        if value is None:
            return value

        utc = datetime.timezone.utc
        # Naive values are labelled as UTC unchanged; aware values are
        # converted from their own zone.
        return value.replace(tzinfo=utc) if value.tzinfo is None else value.astimezone(utc)

    def process_result_value(self, value, dialect):
        """Attach UTC to naive datetimes fetched from the database."""
        if value is None or value.tzinfo is not None:
            return value
        # Stored values are treated as UTC, so only the label is added.
        return value.replace(tzinfo=datetime.timezone.utc)
|
|
|
|
|
|
# Two separate table registries: ``md_crm`` collects the external CRM tables
# declared in this module, ``md_main`` the tables of the main database.
md_crm = sql.MetaData()
md_main = sql.MetaData()
# Engine bound to the main SQLite database configured in ``constants.Config``.
ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}")
|
|
|
|
# ---------- OLD "Kontaktliste" ----------
|
|
|
|
# md_kontaktliste = sql.MetaData()
|
|
|
|
# ext_kl_unternehmen: sql.Table = Table(
|
|
# "Unternehmen",
|
|
# md_kontaktliste,
|
|
# Column("u_id", sql.Integer, nullable=False, unique=True),
|
|
# Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False),
|
|
# Column("u_rechtsform", sql.Text, nullable=False),
|
|
# Column("u_firmenname", sql.Text, nullable=False),
|
|
# Column("u_strasse", sql.Text, nullable=False),
|
|
# Column("u_hausnummer", sql.Text, nullable=False),
|
|
# Column("u_adresszusatz", sql.Text, nullable=True),
|
|
# Column("u_plz", sql.Text, nullable=False),
|
|
# Column("u_ort", sql.Text, nullable=False),
|
|
# Column("u_postfach", sql.Text, nullable=True),
|
|
# Column("u_website", sql.Text, nullable=True),
|
|
# Column("u_anrede", sql.Text, nullable=False),
|
|
# Column("u_titel", sql.Text, nullable=True),
|
|
# Column("u_vorname", sql.Text, nullable=False),
|
|
# Column("u_nachname", sql.Text, nullable=False),
|
|
# Column("u_funktion", sql.Text, nullable=False),
|
|
# Column("u_mail", sql.Text, nullable=False),
|
|
# Column("u_telefon", sql.Text, nullable=False),
|
|
# Column("u_plz_postfach", sql.Text, nullable=True),
|
|
# Column("u_einwilligung_inhaber", sql.Boolean, nullable=True),
|
|
# Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True),
|
|
# Column("u_aktiv", sql.Boolean, nullable=False, default=1),
|
|
# )
|
|
|
|
# ext_kl_unternehmen_schema: t.PolarsSchema = {
|
|
# "u_id": pl.UInt64,
|
|
# "u_zeitstempel_eintrag": pl.Datetime,
|
|
# "u_rechtsform": pl.String,
|
|
# "u_firmenname": pl.String,
|
|
# "u_strasse": pl.String,
|
|
# "u_hausnummer": pl.String,
|
|
# "u_adresszusatz": pl.String,
|
|
# "u_plz": pl.String,
|
|
# "u_ort": pl.String,
|
|
# "u_postfach": pl.String,
|
|
# "u_website": pl.String,
|
|
# "u_anrede": pl.String,
|
|
# "u_titel": pl.String,
|
|
# "u_vorname": pl.String,
|
|
# "u_nachname": pl.String,
|
|
# "u_funktion": pl.String,
|
|
# "u_mail": pl.String,
|
|
# "u_telefon": pl.String,
|
|
# "u_plz_postfach": pl.String,
|
|
# "u_einwilligung_inhaber": pl.Boolean,
|
|
# "u_einwilligung_ansprechpartner": pl.Boolean,
|
|
# "u_aktiv": pl.Boolean,
|
|
# }
|
|
|
|
|
|
# def get_ext_kontaktliste(
|
|
# db_path: Path | None,
|
|
# ) -> pl.DataFrame:
|
|
# if db_path is None:
|
|
# ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None)
|
|
# if ENV_PTH is None:
|
|
# raise ValueError("No database path provided or found as ENV var.")
|
|
# db_path = Path(ENV_PTH)
|
|
|
|
# if not db_path.exists():
|
|
# raise FileNotFoundError(f"Database not found under >{db_path}<")
|
|
|
|
# engine = sql.create_engine(f"sqlite:///{db_path}")
|
|
# stmt = sql.select(ext_kl_unternehmen)
|
|
# return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema)
|
|
|
|
|
|
# df_kontaktliste = get_ext_kontaktliste(None)
|
|
|
|
# ----------------------------------------------------
|
|
|
|
# Table "Master" of the external CRM database, registered on ``md_crm``.
ext_crm_master: sql.Table = Table(
    "Master",
    md_crm,
    # --- keys ---
    Column("ma_id", sql.Integer, nullable=False, unique=True),
    Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
    # --- company and address ---
    Column("ma_unternehmensname", sql.Text, nullable=True),
    Column("ma_branche", sql.Text, nullable=True),
    Column("ma_strasse", sql.Text, nullable=True),
    Column("ma_hausnummer", sql.Text, nullable=True),
    Column("ma_plz", sql.Text, nullable=True),
    Column("ma_ort", sql.Text, nullable=True),
    Column("ma_plz_postfach", sql.Text, nullable=True),
    Column("ma_postfach", sql.Text, nullable=True),
    # --- contact channels ---
    Column("ma_website", sql.Text, nullable=True),
    Column("ma_mail", sql.Text, nullable=True),
    Column("ma_telefonnummer", sql.Text, nullable=True),
    Column("ma_faxnummer", sql.Text, nullable=True),
    # --- bookkeeping; dates go through SafeDateTime ---
    Column("ma_ersteintrag_datum", SafeDateTime, nullable=True),
    Column("ma_aktualisierung_datum", SafeDateTime, nullable=True),
    Column("ma_aktualisierung_nutzer", sql.Text, nullable=True),
    # --- remaining attributes ---
    Column("ma_sollprozess", sql.Text, nullable=True),
    Column("ma_auslaendische_mitarbeiter", sql.Text, nullable=True),
    Column("ma_quelle_information", sql.Text, nullable=True),
    Column("ma_bemerkung", sql.Text, nullable=True),
    Column("ma_kontakt", sql.Boolean, nullable=True),
    Column("ma_schlagworte", sql.Text, nullable=True),
    Column("ma_archiviert", sql.Boolean, nullable=True, default=False),
)
|
|
|
|
|
|
# Polars dtypes for the columns of ``ext_crm_master``; passed as
# ``schema_overrides`` when the table is read into a DataFrame.
ext_crm_master_schema: t.PolarsSchema = {
    # keys
    "ma_id": pl.UInt64,
    "wce_id": pl.UInt64,
    # company and address
    "ma_unternehmensname": pl.String,
    "ma_branche": pl.String,
    "ma_strasse": pl.String,
    "ma_hausnummer": pl.String,
    "ma_plz": pl.String,
    "ma_ort": pl.String,
    "ma_plz_postfach": pl.String,
    "ma_postfach": pl.String,
    # contact channels
    "ma_website": pl.String,
    "ma_mail": pl.String,
    "ma_telefonnummer": pl.String,
    "ma_faxnummer": pl.String,
    # bookkeeping
    "ma_ersteintrag_datum": pl.Datetime,
    "ma_aktualisierung_datum": pl.Datetime,
    "ma_aktualisierung_nutzer": pl.String,
    # remaining attributes
    "ma_sollprozess": pl.String,
    "ma_auslaendische_mitarbeiter": pl.String,
    "ma_quelle_information": pl.String,
    "ma_bemerkung": pl.String,
    "ma_kontakt": pl.Boolean,
    "ma_schlagworte": pl.String,
    "ma_archiviert": pl.Boolean,
}
|
|
|
|
|
|
def get_ext_crm_master(
    db_path: Path | None,
) -> pl.DataFrame:
    """Read the complete ``Master`` table of the CRM database into Polars.

    Args:
        db_path: Location of the CRM SQLite file. When ``None``, the path is
            taken from the ``DOPT_DB_CRM`` environment variable. Plain
            strings are accepted as well and converted to ``Path``.

    Returns:
        All rows selected from ``ext_crm_master``, with column dtypes
        overridden by ``ext_crm_master_schema``.

    Raises:
        ValueError: No path was passed and ``DOPT_DB_CRM`` is unset or empty.
        FileNotFoundError: The resolved path does not exist.
    """
    if db_path is None:
        env_path = os.environ.get("DOPT_DB_CRM")
        # An empty variable would silently become Path("."), so treat it
        # exactly like a missing one.
        if not env_path:
            raise ValueError("No database path provided or found as ENV var.")
        db_path = Path(env_path)
    else:
        # Configuration values may arrive as plain strings.
        db_path = Path(db_path)

    if not db_path.exists():
        raise FileNotFoundError(f"Database not found under >{db_path}<")

    engine = sql.create_engine(f"sqlite:///{db_path}")
    try:
        stmt = sql.select(ext_crm_master)
        return pl.read_database(stmt, engine, schema_overrides=ext_crm_master_schema)
    finally:
        # The engine only lives for this call; release its connection pool.
        engine.dispose()
|
|
|
|
|
|
# Module-level statement: the CRM "Master" table is read when this module is
# imported, using the CRM database path from ``constants.Config``.
DF_CRM_MASTER = get_ext_crm_master(constants.Config.DB_PATH_CRM)
|
|
|
|
# Table "Nutzer" of the external CRM database, registered on ``md_crm``.
# Its ``wce_id`` column is the target of the foreign keys declared on the
# other CRM tables in this module.
ext_crm_nutzer: sql.Table = Table(
    "Nutzer",
    md_crm,
    # --- key ---
    Column("wce_id", sql.Integer, nullable=False, unique=True),
    # --- user attributes ---
    Column("wce_name", sql.Text, nullable=True),
    Column("wce_vorname", sql.Text, nullable=True),
    Column("wce_kuerzel", sql.Text, nullable=True),
    Column("wce_passwort", sql.Text, nullable=True),
    # NOTE(review): the date columns here use plain sql.DateTime, whereas the
    # other CRM tables use SafeDateTime -- confirm the stored format.
    Column("wce_angelegt_am", sql.DateTime, nullable=True),
    Column("wce_rolle", sql.Text, nullable=True),
    Column("wce_angelegt_von", sql.Text, nullable=True),
    Column("wce_aktiv", sql.Boolean, nullable=True),
    Column("wce_letzter_login", sql.DateTime, nullable=True),
)
|
|
|
|
# Polars dtypes for the columns of ``ext_crm_nutzer``, in table order.
ext_crm_nutzer_schema: t.PolarsSchema = {
    # key
    "wce_id": pl.UInt64,
    # user attributes
    "wce_name": pl.String,
    "wce_vorname": pl.String,
    "wce_kuerzel": pl.String,
    "wce_passwort": pl.String,
    "wce_angelegt_am": pl.Datetime,
    "wce_rolle": pl.String,
    "wce_angelegt_von": pl.String,
    "wce_aktiv": pl.Boolean,
    "wce_letzter_login": pl.Datetime,
}
|
|
|
|
# Table "Ansprechpartner" of the external CRM database, registered on
# ``md_crm``. Each row references a "Master" row and a "Nutzer" row.
ext_crm_contact_person: sql.Table = Table(
    "Ansprechpartner",
    md_crm,
    # --- keys ---
    Column("an_id", sql.Integer, nullable=False, unique=True),
    Column("ma_id", sql.ForeignKey("Master.ma_id")),
    Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
    Column("st_id", sql.Integer, nullable=False),
    # --- person ---
    Column("an_sachgebiet", sql.Text, nullable=True),
    Column("an_anrede", sql.Text, nullable=True),
    Column("an_titel", sql.Text, nullable=True),
    Column("an_nachname", sql.Text, nullable=True),
    Column("an_vorname", sql.Text, nullable=True),
    Column("an_position", sql.Text, nullable=True),
    # --- contact channels ---
    Column("an_mail", sql.Text, nullable=True),
    Column("an_festnetz", sql.Text, nullable=True),
    Column("an_mobil", sql.Text, nullable=True),
    Column("an_faxnummer", sql.Text, nullable=True),
    # --- remaining attributes ---
    Column("an_hauptansprechpartner", sql.Text, nullable=True),
    Column("an_anrede_anschrift", sql.Text, nullable=True),
    Column("an_bemerkung", sql.Text, nullable=True),
    # --- bookkeeping; dates go through SafeDateTime ---
    Column("an_aktualisierung_datum", SafeDateTime, nullable=True),
    Column("an_aktualisierung_nutzer", sql.Text, nullable=True),
    Column("an_letztes_kontaktdatum", SafeDateTime, nullable=True),
    Column("an_ersteintrag_datum", SafeDateTime, nullable=True),
    # Boolean default spelled as ``False`` (not ``0``) to match the
    # equivalent ``ma_archiviert`` column of the "Master" table.
    Column("an_archiviert", sql.Boolean, nullable=True, default=False),
)
|
|
|
|
|
|
# Polars dtypes for the columns of ``ext_crm_contact_person``; passed as
# ``schema_overrides`` when the table is read into a DataFrame.
ext_crm_contact_person_schema: t.PolarsSchema = {
    # keys
    "an_id": pl.UInt64,
    "ma_id": pl.UInt64,
    "wce_id": pl.UInt64,
    "st_id": pl.UInt64,
    # person
    "an_sachgebiet": pl.String,
    "an_anrede": pl.String,
    "an_titel": pl.String,
    "an_nachname": pl.String,
    "an_vorname": pl.String,
    "an_position": pl.String,
    # contact channels
    "an_mail": pl.String,
    "an_festnetz": pl.String,
    "an_mobil": pl.String,
    "an_faxnummer": pl.String,
    # remaining attributes
    "an_hauptansprechpartner": pl.String,
    "an_anrede_anschrift": pl.String,
    "an_bemerkung": pl.String,
    # bookkeeping
    "an_aktualisierung_datum": pl.Datetime,
    "an_aktualisierung_nutzer": pl.String,
    "an_letztes_kontaktdatum": pl.Datetime,
    "an_ersteintrag_datum": pl.Datetime,
    "an_archiviert": pl.Boolean,
}
|
|
|
|
|
|
def get_ext_crm_contact_person(
    db_path: Path | None,
) -> pl.DataFrame:
    """Read the ``Ansprechpartner`` table of the CRM database into Polars.

    After loading, every string column is cleaned: carriage returns, tabs
    and newlines become single spaces, and leading/trailing spaces are
    removed.

    Args:
        db_path: Location of the CRM SQLite file. When ``None``, the path is
            taken from the ``DOPT_DB_CRM`` environment variable. Plain
            strings are accepted as well and converted to ``Path``.

    Returns:
        All rows selected from ``ext_crm_contact_person``, with column
        dtypes overridden by ``ext_crm_contact_person_schema`` and string
        columns cleaned as described above.

    Raises:
        ValueError: No path was passed and ``DOPT_DB_CRM`` is unset or empty.
        FileNotFoundError: The resolved path does not exist.
    """
    if db_path is None:
        env_path = os.environ.get("DOPT_DB_CRM")
        # An empty variable would silently become Path("."), so treat it
        # exactly like a missing one.
        if not env_path:
            raise ValueError("No database path provided or found as ENV var.")
        db_path = Path(env_path)
    else:
        # Configuration values may arrive as plain strings.
        db_path = Path(db_path)

    if not db_path.exists():
        raise FileNotFoundError(f"Database not found under >{db_path}<")

    engine = sql.create_engine(f"sqlite:///{db_path}")
    try:
        stmt = sql.select(ext_crm_contact_person)
        df = pl.read_database(stmt, engine, schema_overrides=ext_crm_contact_person_schema)
    finally:
        # The engine only lives for this call; release its connection pool.
        engine.dispose()

    # Flatten control whitespace inside free-text fields and trim the edges.
    df = df.with_columns(
        pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" ")
    )

    return df
|
|
|
|
|
|
# df_contact_person = get_ext_crm_contact_person(None)
|
|
# Module-level statement: the CRM "Ansprechpartner" table is read when this
# module is imported, using the CRM database path from ``constants.Config``.
DF_CONTACT_PERSON = get_ext_crm_contact_person(constants.Config.DB_PATH_CRM)
|
|
|
|
# Table "grunderfassung_unternehmen", registered on ``md_main``.
# Column names containing a double underscore follow a "<group>__<field>"
# pattern; the grouping comments below mirror those prefixes.
grunderfassung_unternehmen: sql.Table = Table(
    "grunderfassung_unternehmen",
    md_main,
    # --- primary key ---
    Column("erfassung_id", sql.Integer, primary_key=True, autoincrement=True),
    # --- metadata: timestamps are generated in UTC on insert / update ---
    Column(
        "Metadaten_erstellung",
        UTCDateTime,
        nullable=True,
        default=lambda: datetime.datetime.now(datetime.UTC),
    ),
    Column(
        "Metadaten_aktualisierung",
        UTCDateTime,
        nullable=True,
        default=lambda: datetime.datetime.now(datetime.UTC),
        onupdate=lambda: datetime.datetime.now(datetime.UTC),
    ),
    Column("Metadaten_nutzer", sql.String(20), nullable=True),
    # --- ungrouped fields ---
    Column("Arbeitserfahrung", sql.Text, nullable=True),
    Column("Grunderfassung_fallnummer", sql.Text, nullable=True),
    Column("Grunderfassung_notiz", sql.Text, nullable=True),
    Column("HoehereBildung", sql.Text, nullable=True),
    # --- Kontaktperson ---
    Column("Kontaktperson__KP_adresse", sql.Text, nullable=True),
    Column("Kontaktperson__KP_anrede_anschrift", sql.Text, nullable=True),
    Column("Kontaktperson__KP_email", sql.Text, nullable=True),
    Column("Kontaktperson__KP_festnetznummer", sql.Text, nullable=True),
    Column("Kontaktperson__KP_funktion_beziehung", sql.Text, nullable=True),
    Column("Kontaktperson__KP_mobilfunknummer", sql.Text, nullable=True),
    Column("Kontaktperson__KP_name", sql.Text, nullable=True),
    # TODO: check if needed when set by trigger
    Column("Kontaktperson__KP_name_partner", sql.Text, nullable=True),
    Column("Kontaktperson__KP_titel", sql.Text, nullable=True),
    Column("Kontaktperson__KP_vorname", sql.Text, nullable=True),
    # --- Partnersuche ---
    Column("Partnersuche__kanal_aufmerksamkeit", sql.Text, nullable=True),
    Column("Partnersuche__person_suche", sql.Integer, nullable=True),
    Column("Partnersuche__un_suche", sql.Integer, nullable=True),
    # --- Projektrelevanz ---
    Column("Projektrelevanz__relevanz", sql.Text, nullable=True),
    Column("Projektrelevanz__foerderperiode", sql.Text, nullable=True),
    # --- ungrouped fields ---
    Column("Schulbildung", sql.Text, nullable=True),
    Column("Sprachkenntnisse", sql.Text, nullable=True),
    # --- Stammdaten ---
    Column("Stammdaten__PLZ", sql.Text, nullable=True),
    Column("Stammdaten__anrede_anschrift", sql.Text, nullable=True),
    Column("Stammdaten__anzahl_kinder__alter", sql.Text, nullable=True),
    Column("Stammdaten__anzahl_kinder__anzahl", sql.Integer, nullable=True),
    Column("Stammdaten__aufenthaltsort", sql.Text, nullable=True),
    Column("Stammdaten__bundesland", sql.Text, nullable=True),
    Column("Stammdaten__land", sql.Text, nullable=True),
    Column("Stammdaten__email", sql.Text, nullable=True),
    Column("Stammdaten__familienstand", sql.Text, nullable=True),
    Column("Stammdaten__festnetznummer", sql.Text, nullable=True),
    Column("Stammdaten__geburtsdatum", sql.Date, nullable=True),
    Column("Stammdaten__hausnummer", sql.Text, nullable=True),
    Column("Stammdaten__herkunftsland", sql.Text, nullable=True),
    Column("Stammdaten__mobilfunknummer", sql.Text, nullable=True),
    Column("Stammdaten__name", sql.Text, nullable=True),
    Column("Stammdaten__ort", sql.Text, nullable=True),
    Column("Stammdaten__rueckkehrer", sql.Boolean, nullable=True),
    Column("Stammdaten__staatsangehoerigkeit", sql.Text, nullable=True),
    Column("Stammdaten__strasse", sql.Text, nullable=True),
    Column("Stammdaten__titel", sql.Text, nullable=True),
    Column("Stammdaten__vorname", sql.Text, nullable=True),
    # --- WeitereInfos ---
    Column("WeitereInfos__WI_arbeitsstatus", sql.Text, nullable=True),
    Column("WeitereInfos__WI_aufenthaltstitel", sql.Text, nullable=True),
    Column("WeitereInfos__WI_deutsch_sprache", sql.Text, nullable=True),
    Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Date, nullable=True),
    Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True),
)
|
|
|
|
# Module-level statement: issues CREATE TABLE for every table registered on
# ``md_main`` against ENGINE when this module is imported.
md_main.create_all(ENGINE)
|