prepare better DB interaction with reload

This commit is contained in:
Florian Förster 2026-05-06 15:35:49 +02:00
parent 3dbc9ecfcb
commit c3462d3d3a
4 changed files with 365 additions and 76 deletions

View File

@ -72,6 +72,17 @@ df.row(0, named=True)
db.df_crm_master.estimated_size("mb")
# %%
# // CRM Ansprechpartner
df = db.df_contact_person.filter(pl.col.ma_id == 410)
df = df.with_columns(pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" "))
df
# %%
tuple(zip(df["ma_id"], df["an_id"]))
# %%
db.df_crm_master
# %%
# // CRM Nutzer
stmt = sql.select(db.ext_crm_nutzer).limit(20)

View File

@ -7,8 +7,8 @@ import time
import uuid
from collections.abc import Sequence
from PySide6.QtCore import QDate, Qt, QTimer, Signal # Signal ist wichtig!
from PySide6.QtGui import QAction
from PySide6.QtCore import QDate, QModelIndex, QStringListModel, Qt, QTimer, Signal
from PySide6.QtGui import QAction, QStandardItem, QStandardItemModel
from PySide6.QtWidgets import (
QApplication,
QComboBox,
@ -152,7 +152,9 @@ ADDRESSES = [
]
class AddressForm_Search(QWidget):
class CompanyForm_Search(QWidget):
company_selected = Signal(int)
def __init__(self):
super().__init__()
@ -161,24 +163,18 @@ class AddressForm_Search(QWidget):
form_layout = QFormLayout()
form_layout.setSpacing(10)
# title
title = QLabel("--- Suche Unternehmen ---")
title.setStyleSheet("font-size: 14px; font-style: italic;") # font-weight: bold;
main_layout.addWidget(title)
self.search_input = QLineEdit(placeholderText="Tippen zum Suchen...")
form_layout.addRow("Suche:", self.search_input)
# search_data = [addr.name for addr in ADDRESSES]
# self.SEARCH_MAP = {addr.name: addr for addr in ADDRESSES}
# TODO Qt supports the addition of custom data like addItem + Qt.UserRole
# TODO or setProperty
self.SEARCH_MAP = be_init_rec.comp_search_choice_mapping()
self.search_data = tuple(self.SEARCH_MAP.keys())
self.completer = QCompleter(self.search_data)
self.completer = QCompleter()
self.completer.setCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
self.completer.setFilterMode(Qt.MatchFlag.MatchContains)
self.search_input.setCompleter(self.completer)
self.completer.activated.connect(self.search_result_selected)
self.completer.activated[QModelIndex].connect(self.search_result_selected) # type: ignore
# --- 1. Einfaches Feld: Firmenname ---
self.company_input = QLineEdit(placeholderText="Name des Partners")
@ -190,8 +186,6 @@ class AddressForm_Search(QWidget):
self.street_input = QLineEdit(placeholderText="Straße")
self.number_input = QLineEdit(placeholderText="Nr.")
# Optik-Trick: Hausnummern-Feld begrenzen, damit es nicht so breit wie die Straße wird
self.number_input.setMaximumWidth(80)
# Mit "stretch" definieren wir das Breitenverhältnis (Straße nimmt restlichen Platz)
@ -207,9 +201,7 @@ class AddressForm_Search(QWidget):
self.zip_input = QLineEdit(placeholderText="PLZ")
self.city_input = QLineEdit(placeholderText="Ort")
self.zip_input.setMaximumWidth(100) # PLZ ist immer relativ kurz
city_layout.addWidget(self.zip_input, stretch=1)
city_layout.addWidget(self.city_input, stretch=3)
@ -240,66 +232,166 @@ class AddressForm_Search(QWidget):
}
""")
def fill_out(self, comp_info: be_init_rec.CompanyInfo):
# addr_ = address.export()
self.update_search_data()
# for field, value in zip(self.autofilled_fields, addr_.values()):
# field.setText(value)
def fill_out(self, comp_info: be_init_rec.CompanyInfo):
self.company_input.setText(comp_info["ma_unternehmensname"])
self.street_input.setText(comp_info["ma_strasse"])
self.number_input.setText(comp_info["ma_hausnummer"])
self.zip_input.setText(comp_info["ma_plz"])
self.city_input.setText(comp_info["ma_ort"])
def search_result_selected(self, name):
def clear_fields(self) -> None:
self.search_input.clear()
for field in self.autofilled_fields:
field.clear()
def update_search_data(self) -> None:
self.clear_fields()
search_items = QStandardItemModel()
search_choices = be_init_rec.comp_search_choices()
for item, db_index in search_choices:
qitem = QStandardItem(item)
qitem.setData(db_index, Qt.ItemDataRole.UserRole)
search_items.appendRow(qitem)
self.completer.setModel(search_items)
def search_result_selected(self, index: QModelIndex):
ma_id = index.data(Qt.ItemDataRole.UserRole)
comp_info = be_init_rec.comp_search_get_info(
ma_id=self.SEARCH_MAP[name],
ma_id=ma_id,
)
self.fill_out(comp_info)
self.company_selected.emit(ma_id)
class ContactPersonForm_Search(QWidget):
def __init__(self):
super().__init__()
main_layout = QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
form_layout = QFormLayout()
form_layout.setSpacing(10)
title = QLabel("--- Suche Nutzer ---")
title.setStyleSheet("font-size: 14px; font-style: italic;") # font-weight: bold;
main_layout.addWidget(title)
self.search_input = QComboBox(placeholderText="Tippen zum Suchen...")
self.search_input.setEditable(True)
self.search_input.setInsertPolicy(QComboBox.InsertPolicy.NoInsert)
line_edit = self.search_input.lineEdit()
assert line_edit
line_edit.setPlaceholderText("Suchen...")
form_layout.addRow("Suche Ansprechpartner:", self.search_input)
# self.completer = QCompleter()
self.completer = self.search_input.completer()
assert self.completer
self.completer.setCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
self.completer.setFilterMode(Qt.MatchFlag.MatchContains)
self.completer.setCompletionMode(QCompleter.CompletionMode.PopupCompletion)
self.search_input.activated.connect(self.search_result_selected)
self.gui_titel = QLineEdit()
self.gui_anrede = QLineEdit()
hor_layout = QHBoxLayout()
hor_layout.addWidget(self.gui_anrede, stretch=1)
hor_layout.addWidget(self.gui_titel, stretch=1)
form_layout.addRow("Anrede / Titel:", hor_layout)
# form_layout.addRow("Titel:", self.gui_titel)
# form_layout.addRow("Anrede", self.gui_anrede)
self.gui_nachname = QLineEdit()
# form_layout.addRow("Nachname", self.gui_nachname)
self.gui_vorname = QLineEdit()
# form_layout.addRow("Vorname", self.gui_vorname)
hor_layout = QHBoxLayout()
hor_layout.addWidget(self.gui_nachname, stretch=1)
hor_layout.addWidget(self.gui_vorname, stretch=1)
form_layout.addRow("Nachname / Vorname:", hor_layout)
phone_layout = QHBoxLayout()
phone_layout.setContentsMargins(0, 0, 0, 0)
phone_layout.setSpacing(10)
self.gui_landline_number = QLineEdit()
self.gui_mobile_number = QLineEdit()
# Mit "stretch" definieren wir das Breitenverhältnis
phone_layout.addWidget(self.gui_landline_number, stretch=1)
phone_layout.addWidget(self.gui_mobile_number, stretch=1)
form_layout.addRow("Telefon Festnetz / Mobil:", phone_layout)
self.gui_email = QLineEdit()
form_layout.addRow("E-Mail:", self.gui_email)
self.gui_funktion = QLineEdit()
form_layout.addRow("Funktion", self.gui_funktion)
main_layout.addLayout(form_layout)
self.autofilled_fields = (
self.gui_titel,
self.gui_anrede,
self.gui_nachname,
self.gui_vorname,
self.gui_landline_number,
self.gui_mobile_number,
self.gui_email,
self.gui_funktion,
)
for field in self.autofilled_fields:
field.setReadOnly(True)
field.setStyleSheet("""
QLineEdit {
background-color: #f1f5f9; /* Helles System-Grau */
color: #333D4B; /* Etwas blassere Schrift */
border: 1px dashed #cbd5e1; /* Ein gestrichelter Rand wirkt oft wie ein "Stempel" */
border-radius: 4px;
padding: 5px;
}
/* Wenn das Feld fokussiert wird, keinen blauen Rand anzeigen */
QLineEdit:focus {
border: 1px dashed #cbd5e1;
}
""")
self.update_search_data(None)
def fill_out(self, info: be_init_rec.ContactPersonInfo):
self.gui_titel.setText(info["an_titel"])
self.gui_anrede.setText(info["an_anrede"])
self.gui_nachname.setText(info["an_nachname"])
self.gui_vorname.setText(info["an_vorname"])
self.gui_landline_number.setText(info["an_festnetz"])
self.gui_mobile_number.setText(info["an_mobil"])
self.gui_email.setText(info["an_mail"])
self.gui_funktion.setText(info["an_position"])
def search_result_selected(
self,
index: int,
):
an_id = self.search_input.itemData(index)
comp_info = be_init_rec.contact_person_search_get_info(
an_id=an_id,
)
self.fill_out(comp_info)
def clear_fields(self) -> None:
self.search_input.clear()
for field in self.autofilled_fields:
field.clear()
class DropdownSearch(QWidget):
def __init__(self):
super().__init__()
layout = QVBoxLayout(self)
# 1. Das normale Eingabefeld
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("Tippe zum Suchen (z.B. 'Pro')...")
self.search_input.setMinimumWidth(300)
# 2. Deine Datenbank / Liste an Suchbegriffen
search_data = [
"Projekt Alpha",
"Projekt Beta",
"Personalakte Müller",
"Protokoll April 2026",
"Abrechnung",
]
# 3. Den Completer erstellen und mit Daten füttern
self.completer = QCompleter(search_data)
# --- WICHTIGE EINSTELLUNGEN ---
# Ignoriert Groß-/Kleinschreibung (sehr wichtig für eine gute Suche!)
self.completer.setCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
# 'MatchContains' sorgt dafür, dass "pha" auch "Projekt Alpha" findet.
# Standard ist 'MatchStartsWith' (findet nur Worte am Anfang).
self.completer.setFilterMode(Qt.MatchFlag.MatchContains)
# 4. Den Completer an das Eingabefeld binden
self.search_input.setCompleter(self.completer)
layout.addWidget(self.search_input)
# Optional: Aktion auslösen, wenn ein Element im Dropdown angeklickt wird
self.completer.activated.connect(self.on_item_selected)
def on_item_selected(self, text):
print(f"Nutzer hat '{text}' aus dem Dropdown ausgewählt!")
# Hier könntest du z.B. deine Detail-Seite für dieses Projekt öffnen
def update_search_data(
self,
company_id: int | None,
) -> None:
self.clear_fields()
search_choices = be_init_rec.contact_person_search_choices(company_id, True)
for item, db_index in search_choices:
self.search_input.addItem(item, db_index)
class FormFieldType(enum.StrEnum):
@ -511,6 +603,7 @@ FORM_FIELD_GROUPS = [
FormFieldType.DROPDOWN,
required=True,
options=["ja", "nein"],
fill_value="nein",
),
],
),
@ -1153,6 +1246,45 @@ class SearchFormPage(QWidget):
container_layout.addLayout(inf_block_2)
container_layout.addSpacing(10)
# --- SUCHE MIT NAMEN ---
self.company_search = CompanyForm_Search()
container_layout.addWidget(self.company_search)
self.contact_person_search = ContactPersonForm_Search()
container_layout.addWidget(self.contact_person_search)
hor_layout = QHBoxLayout()
label = QLabel("Wie sind Sie auf uns aufmerksam geworden?")
combo = QComboBox()
combo.addItems(
[
"--- Bitte wählen ---",
"Agentur für Arbeit",
"Ausländerbehörde",
"Jobcenter",
"Freunde/Familie",
"Anerkennungsstelle",
"Beratungsstelle",
"Internet",
"Arbeitgeber",
"Bildungsdienstleister",
"Welcome-Mappe",
"Newsletter WFE",
"Newsletter RM",
"Sonstiges",
]
)
combo.setPlaceholderText("Bitte auswählen")
combo.model().item(0).setEnabled(False)
hor_layout.addWidget(label)
hor_layout.addWidget(combo, stretch=1)
container_layout.addLayout(hor_layout)
container_layout.addWidget(
QLabel('Platzhalter "Wie sind Sie auf uns aufmerksam geworden?"')
)
self.company_search.company_selected.connect(self.update_contact_persons)
container_layout.addSpacing(10)
# --- Test Formularlayout ---
container_layout.addSpacing(30)
title = QLabel("--- Automatische Form ---")
@ -1163,15 +1295,6 @@ class SearchFormPage(QWidget):
container_layout.addSpacing(30)
# --- SUCHE MIT NAMEN ---
# addr = Address("Test UG", "Teststraße", 202, "09111", "Chemnitz")
# addr_widget = AddressForm()
# addr_widget.fill_out(addr)
addr_widget = AddressForm_Search()
container_layout.addWidget(addr_widget)
container_layout.addSpacing(30)
# container_layout.addWidget(DropdownSearch())
container_layout.addSpacing(30)
@ -1213,6 +1336,12 @@ class SearchFormPage(QWidget):
]
self.perform_search() # Initiale Ansicht laden
def update_contact_persons(
self,
company_id: int,
) -> None:
self.contact_person_search.update_search_data(company_id)
def perform_search(self):
# 1. Eingaben auslesen
query = self.search_input.text().lower()

View File

@ -34,6 +34,31 @@ class CompanyInfo(TypedDict):
ma_archiviert: str
class ContactPersonInfo(TypedDict):
an_id: str
ma_id: str
wce_id: str
st_id: str
an_sachgebiet: str
an_anrede: str
an_titel: str
an_nachname: str
an_vorname: str
an_position: str
an_mail: str
an_festnetz: str
an_mobil: str
an_faxnummer: str
an_hauptansprechpartner: str
an_anrede_anschrift: str
an_bemerkung: str
an_aktualisierung_datum: str
an_aktualisierung_nutzer: str
an_letztes_kontaktdatum: str
an_ersteintrag_datum: str
an_archiviert: str
def _transform_for_gui_output(
data: pl.DataFrame,
) -> pl.DataFrame:
@ -53,18 +78,19 @@ def _transform_for_gui_output(
return q.collect()
def comp_search_choice_mapping() -> dict[str, int]:
def comp_search_choices() -> tuple[tuple[str, int], ...]:
# TODO no reload functionality
q = db.df_crm_master.lazy()
counter = pl.int_range(0, pl.len()).over(pl.col.ma_unternehmensname)
q = q.with_columns(
ma_unternehmensname_dedupl=pl.when(counter == 0)
dedupl=pl.when(counter == 0)
.then(pl.col.ma_unternehmensname)
.otherwise(pl.format("{} ({})", pl.col.ma_unternehmensname, counter))
)
df = q.collect()
return dict(zip(df["ma_unternehmensname_dedupl"], df["ma_id"]))
# return dict(zip(df["dedupl"], df["ma_id"]))
return tuple(zip(df["dedupl"], df["ma_id"]))
def comp_search_get_info(
@ -76,3 +102,42 @@ def comp_search_get_info(
df = _transform_for_gui_output(df)
return cast(CompanyInfo, df.row(0, named=True))
def contact_person_search_choices(
ma_id: int | None,
use_both_names: bool,
) -> tuple[tuple[str, int], ...]:
# TODO no reload functionality
q = db.df_contact_person.lazy()
if ma_id is not None:
q = q.filter(pl.col.ma_id == ma_id)
dedupl_col = pl.col.an_nachname
if use_both_names:
q = q.with_columns(
name_search=(pl.format("{}, {}", pl.col.an_nachname, pl.col.an_vorname))
)
dedupl_col = pl.col.name_search
counter = pl.int_range(0, pl.len()).over(dedupl_col)
q = q.with_columns(
dedupl=pl.when(counter == 0)
.then(dedupl_col)
.otherwise(pl.format("{} ({})", dedupl_col, counter))
)
df = q.collect()
# return dict(zip(df["dedupl"], df["an_id"]))
return tuple(zip(df["dedupl"], df["an_id"]))
def contact_person_search_get_info(
an_id: int,
) -> ContactPersonInfo:
df = db.df_contact_person.filter(pl.col.an_id == an_id)
if df.height > 1 or df.height == 0:
raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}")
df = _transform_for_gui_output(df)
return cast(ContactPersonInfo, df.row(0, named=True))

View File

@ -32,9 +32,12 @@ class SafeDateTime(TypeDecorator):
return None
md_kontaktliste = sql.MetaData()
md_crm = sql.MetaData()
# ---------- OLD "Kontaktliste" ----------
md_kontaktliste = sql.MetaData()
ext_kl_unternehmen: sql.Table = Table(
"Unternehmen",
md_kontaktliste,
@ -107,6 +110,8 @@ def get_ext_kontaktliste(
df_kontaktliste = get_ext_kontaktliste(None)
# ----------------------------------------------------
ext_crm_master: sql.Table = Table(
"Master",
md_crm,
@ -211,3 +216,82 @@ ext_crm_nutzer_schema: t.PolarsSchema = {
"wce_aktiv": pl.Boolean,
"wce_letzter_login": pl.Datetime,
}
ext_crm_contact_person: sql.Table = Table(
"Ansprechpartner",
md_crm,
Column("an_id", sql.Integer, nullable=False, unique=True),
Column("ma_id", sql.ForeignKey("Master.ma_id")),
Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
Column("st_id", sql.Integer, nullable=False),
Column("an_sachgebiet", sql.Text, nullable=True),
Column("an_anrede", sql.Text, nullable=True),
Column("an_titel", sql.Text, nullable=True),
Column("an_nachname", sql.Text, nullable=True),
Column("an_vorname", sql.Text, nullable=True),
Column("an_position", sql.Text, nullable=True),
Column("an_mail", sql.Text, nullable=True),
Column("an_festnetz", sql.Text, nullable=True),
Column("an_mobil", sql.Text, nullable=True),
Column("an_faxnummer", sql.Text, nullable=True),
Column("an_hauptansprechpartner", sql.Text, nullable=True),
Column("an_anrede_anschrift", sql.Text, nullable=True),
Column("an_bemerkung", sql.Text, nullable=True),
Column("an_aktualisierung_datum", SafeDateTime, nullable=True),
Column("an_aktualisierung_nutzer", sql.Text, nullable=True),
Column("an_letztes_kontaktdatum", SafeDateTime, nullable=True),
Column("an_ersteintrag_datum", SafeDateTime, nullable=True),
Column("an_archiviert", sql.Boolean, nullable=True, default=0),
)
ext_crm_contact_person_schema: t.PolarsSchema = {
"an_id": pl.UInt64,
"ma_id": pl.UInt64,
"wce_id": pl.UInt64,
"st_id": pl.UInt64,
"an_sachgebiet": pl.String,
"an_anrede": pl.String,
"an_titel": pl.String,
"an_nachname": pl.String,
"an_vorname": pl.String,
"an_position": pl.String,
"an_mail": pl.String,
"an_festnetz": pl.String,
"an_mobil": pl.String,
"an_faxnummer": pl.String,
"an_hauptansprechpartner": pl.String,
"an_anrede_anschrift": pl.String,
"an_bemerkung": pl.String,
"an_aktualisierung_datum": pl.Datetime,
"an_aktualisierung_nutzer": pl.String,
"an_letztes_kontaktdatum": pl.Datetime,
"an_ersteintrag_datum": pl.Datetime,
"an_archiviert": pl.Boolean,
}
def get_ext_crm_contact_person(
db_path: Path | None,
) -> pl.DataFrame:
if db_path is None:
ENV_PTH = os.environ.get("DOPT_DB_CRM", None)
if ENV_PTH is None:
raise ValueError("No database path provided or found as ENV var.")
db_path = Path(ENV_PTH)
if not db_path.exists():
raise FileNotFoundError(f"Database not found under >{db_path}<")
engine = sql.create_engine(f"sqlite:///{db_path}")
stmt = sql.select(ext_crm_contact_person)
df = pl.read_database(stmt, engine, schema_overrides=ext_crm_contact_person_schema)
# TODO Database seems to contain entries with invalid characters like \t or \n
df = df.with_columns(
pl.col(pl.String).str.replace_all(r"[\r\t\n]", " ").str.strip_chars(" ")
)
return df
df_contact_person = get_ext_crm_contact_person(None)