generated from dopt-python/py311
further prototyping, added first DB interactions
This commit is contained in:
213
src/wce_crm/db.py
Normal file
213
src/wce_crm/db.py
Normal file
@@ -0,0 +1,213 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import polars as pl
|
||||
import sqlalchemy as sql
|
||||
from sqlalchemy import Column, String, Table, TypeDecorator
|
||||
|
||||
from wce_crm import types as t
|
||||
|
||||
|
||||
class SafeDateTime(TypeDecorator):
|
||||
"""Cleans non-standard ISO strings before parsing."""
|
||||
|
||||
impl = String # We treat the underlying data as a String first
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
# 1. Remove the trailing 'ff' (or any trailing letters)
|
||||
# 2. Replace comma with dot (SQLAlchemy prefers . over ,)
|
||||
clean_value = re.sub(r"[a-zA-Z]+$", "", value).replace(",", ".")
|
||||
|
||||
try:
|
||||
return datetime.fromisoformat(clean_value)
|
||||
except ValueError:
|
||||
# Fallback if it's still weird
|
||||
return None
|
||||
|
||||
|
||||
md_kontaktliste = sql.MetaData()
|
||||
md_crm = sql.MetaData()
|
||||
|
||||
ext_kl_unternehmen: sql.Table = Table(
|
||||
"Unternehmen",
|
||||
md_kontaktliste,
|
||||
Column("u_id", sql.Integer, nullable=False, unique=True),
|
||||
Column("u_zeitstempel_eintrag", sql.DateTime, nullable=False),
|
||||
Column("u_rechtsform", sql.Text, nullable=False),
|
||||
Column("u_firmenname", sql.Text, nullable=False),
|
||||
Column("u_strasse", sql.Text, nullable=False),
|
||||
Column("u_hausnummer", sql.Text, nullable=False),
|
||||
Column("u_adresszusatz", sql.Text, nullable=True),
|
||||
Column("u_plz", sql.Text, nullable=False),
|
||||
Column("u_ort", sql.Text, nullable=False),
|
||||
Column("u_postfach", sql.Text, nullable=True),
|
||||
Column("u_website", sql.Text, nullable=True),
|
||||
Column("u_anrede", sql.Text, nullable=False),
|
||||
Column("u_titel", sql.Text, nullable=True),
|
||||
Column("u_vorname", sql.Text, nullable=False),
|
||||
Column("u_nachname", sql.Text, nullable=False),
|
||||
Column("u_funktion", sql.Text, nullable=False),
|
||||
Column("u_mail", sql.Text, nullable=False),
|
||||
Column("u_telefon", sql.Text, nullable=False),
|
||||
Column("u_plz_postfach", sql.Text, nullable=True),
|
||||
Column("u_einwilligung_inhaber", sql.Boolean, nullable=True),
|
||||
Column("u_einwilligung_ansprechpartner", sql.Boolean, nullable=True),
|
||||
Column("u_aktiv", sql.Boolean, nullable=False, default=1),
|
||||
)
|
||||
|
||||
ext_kl_unternehmen_schema: t.PolarsSchema = {
|
||||
"u_id": pl.UInt64,
|
||||
"u_zeitstempel_eintrag": pl.Datetime,
|
||||
"u_rechtsform": pl.String,
|
||||
"u_firmenname": pl.String,
|
||||
"u_strasse": pl.String,
|
||||
"u_hausnummer": pl.String,
|
||||
"u_adresszusatz": pl.String,
|
||||
"u_plz": pl.String,
|
||||
"u_ort": pl.String,
|
||||
"u_postfach": pl.String,
|
||||
"u_website": pl.String,
|
||||
"u_anrede": pl.String,
|
||||
"u_titel": pl.String,
|
||||
"u_vorname": pl.String,
|
||||
"u_nachname": pl.String,
|
||||
"u_funktion": pl.String,
|
||||
"u_mail": pl.String,
|
||||
"u_telefon": pl.String,
|
||||
"u_plz_postfach": pl.String,
|
||||
"u_einwilligung_inhaber": pl.Boolean,
|
||||
"u_einwilligung_ansprechpartner": pl.Boolean,
|
||||
"u_aktiv": pl.Boolean,
|
||||
}
|
||||
|
||||
|
||||
def get_ext_kontaktliste(
|
||||
db_path: Path | None,
|
||||
) -> pl.DataFrame:
|
||||
if db_path is None:
|
||||
ENV_PTH = os.environ.get("DOPT_DB_KONTAKTLISTE", None)
|
||||
if ENV_PTH is None:
|
||||
raise ValueError("No database path provided or found as ENV var.")
|
||||
db_path = Path(ENV_PTH)
|
||||
|
||||
if not db_path.exists():
|
||||
raise FileNotFoundError(f"Database not found under >{db_path}<")
|
||||
|
||||
engine = sql.create_engine(f"sqlite:///{db_path}")
|
||||
stmt = sql.select(ext_kl_unternehmen)
|
||||
return pl.read_database(stmt, engine, schema_overrides=ext_kl_unternehmen_schema)
|
||||
|
||||
|
||||
df_kontaktliste = get_ext_kontaktliste(None)
|
||||
|
||||
ext_crm_master: sql.Table = Table(
|
||||
"Master",
|
||||
md_crm,
|
||||
Column("ma_id", sql.Integer, nullable=False, unique=True),
|
||||
Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
|
||||
Column("ma_unternehmensname", sql.Text, nullable=True),
|
||||
Column("ma_branche", sql.Text, nullable=True),
|
||||
Column("ma_strasse", sql.Text, nullable=True),
|
||||
Column("ma_hausnummer", sql.Text, nullable=True),
|
||||
Column("ma_plz", sql.Text, nullable=True),
|
||||
Column("ma_ort", sql.Text, nullable=True),
|
||||
Column("ma_plz_postfach", sql.Text, nullable=True),
|
||||
Column("ma_postfach", sql.Text, nullable=True),
|
||||
Column("ma_website", sql.Text, nullable=True),
|
||||
Column("ma_mail", sql.Text, nullable=True),
|
||||
Column("ma_telefonnummer", sql.Text, nullable=True),
|
||||
Column("ma_faxnummer", sql.Text, nullable=True),
|
||||
Column("ma_ersteintrag_datum", SafeDateTime, nullable=True),
|
||||
Column("ma_aktualisierung_datum", SafeDateTime, nullable=True),
|
||||
Column("ma_aktualisierung_nutzer", sql.Text, nullable=True),
|
||||
Column("ma_sollprozess", sql.Text, nullable=True),
|
||||
Column("ma_auslaendische_mitarbeiter", sql.Text, nullable=True),
|
||||
Column("ma_quelle_information", sql.Text, nullable=True),
|
||||
Column("ma_bemerkung", sql.Text, nullable=True),
|
||||
Column("ma_kontakt", sql.Boolean, nullable=True),
|
||||
Column("ma_schlagworte", sql.Text, nullable=True),
|
||||
Column("ma_archiviert", sql.Boolean, nullable=True, default=False),
|
||||
)
|
||||
|
||||
|
||||
ext_crm_master_schema: t.PolarsSchema = {
|
||||
"ma_id": pl.UInt64,
|
||||
"wce_id": pl.UInt64,
|
||||
"ma_unternehmensname": pl.String,
|
||||
"ma_branche": pl.String,
|
||||
"ma_strasse": pl.String,
|
||||
"ma_hausnummer": pl.String,
|
||||
"ma_plz": pl.String,
|
||||
"ma_ort": pl.String,
|
||||
"ma_plz_postfach": pl.String,
|
||||
"ma_postfach": pl.String,
|
||||
"ma_website": pl.String,
|
||||
"ma_mail": pl.String,
|
||||
"ma_telefonnummer": pl.String,
|
||||
"ma_faxnummer": pl.String,
|
||||
"ma_ersteintrag_datum": pl.Datetime,
|
||||
"ma_aktualisierung_datum": pl.Datetime,
|
||||
"ma_aktualisierung_nutzer": pl.String,
|
||||
"ma_sollprozess": pl.String,
|
||||
"ma_auslaendische_mitarbeiter": pl.String,
|
||||
"ma_quelle_information": pl.String,
|
||||
"ma_bemerkung": pl.String,
|
||||
"ma_kontakt": pl.Boolean,
|
||||
"ma_schlagworte": pl.String,
|
||||
"ma_archiviert": pl.Boolean,
|
||||
}
|
||||
|
||||
|
||||
def get_ext_crm_master(
|
||||
db_path: Path | None,
|
||||
) -> pl.DataFrame:
|
||||
if db_path is None:
|
||||
ENV_PTH = os.environ.get("DOPT_DB_CRM", None)
|
||||
if ENV_PTH is None:
|
||||
raise ValueError("No database path provided or found as ENV var.")
|
||||
db_path = Path(ENV_PTH)
|
||||
|
||||
if not db_path.exists():
|
||||
raise FileNotFoundError(f"Database not found under >{db_path}<")
|
||||
|
||||
engine = sql.create_engine(f"sqlite:///{db_path}")
|
||||
stmt = sql.select(ext_crm_master)
|
||||
return pl.read_database(stmt, engine, schema_overrides=ext_crm_master_schema)
|
||||
|
||||
|
||||
df_crm_master = get_ext_crm_master(None)
|
||||
|
||||
ext_crm_nutzer: sql.Table = Table(
|
||||
"Nutzer",
|
||||
md_crm,
|
||||
Column("wce_id", sql.Integer, nullable=False, unique=True),
|
||||
Column("wce_name", sql.Text, nullable=True),
|
||||
Column("wce_vorname", sql.Text, nullable=True),
|
||||
Column("wce_kuerzel", sql.Text, nullable=True),
|
||||
Column("wce_passwort", sql.Text, nullable=True),
|
||||
Column("wce_angelegt_am", sql.DateTime, nullable=True),
|
||||
Column("wce_rolle", sql.Text, nullable=True),
|
||||
Column("wce_angelegt_von", sql.Text, nullable=True),
|
||||
Column("wce_aktiv", sql.Boolean, nullable=True),
|
||||
Column("wce_letzter_login", sql.DateTime, nullable=True),
|
||||
)
|
||||
|
||||
ext_crm_nutzer_schema: t.PolarsSchema = {
|
||||
"wce_id": pl.UInt64,
|
||||
"wce_name": pl.String,
|
||||
"wce_vorname": pl.String,
|
||||
"wce_kuerzel": pl.String,
|
||||
"wce_passwort": pl.String,
|
||||
"wce_angelegt_am": pl.Datetime,
|
||||
"wce_rolle": pl.String,
|
||||
"wce_angelegt_von": pl.String,
|
||||
"wce_aktiv": pl.Boolean,
|
||||
"wce_letzter_login": pl.Datetime,
|
||||
}
|
||||
Reference in New Issue
Block a user