prepare DB load and save

This commit is contained in:
2026-05-22 12:54:43 +02:00
parent eba9d523b1
commit f66b535ac2
3 changed files with 707 additions and 229 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -6,77 +6,190 @@ import datetime
import enum import enum
import json import json
import re import re
from collections import defaultdict
from collections.abc import Sequence from collections.abc import Sequence
from pprint import pprint from pprint import pprint
from typing import Any from typing import Annotated, Any
import babel import babel
from pydantic import BaseModel, ConfigDict, model_validator from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator
from PySide6.QtCore import QDate, Qt from PySide6.QtCore import QDate, Qt
# %% # %%
# class FlatBaseModel(BaseModel):
# @classmethod
# def _unflatten_dict(cls, flat_dict: dict) -> dict:
# """Hilfsmethode: Macht aus {'a__b': 1} wieder {'a': {'b': 1}}"""
# result = {}
# for key, value in flat_dict.items():
# if "__" in key:
# parts = key.split("__")
# aktuell = result
# for part in parts[:-1]:
# if part not in aktuell or not isinstance(aktuell[part], dict):
# aktuell[part] = {}
# aktuell = aktuell[part]
# aktuell[parts[-1]] = value
# else:
# result[key] = value
# return result
# @model_validator(mode="before")
# @classmethod
# def __unflatten_input(cls, data: Any) -> Any:
# """Eingangskontrolle: Verarbeitet flache DB/GUI-Daten zu Pydantic-Strukturen."""
# if not isinstance(data, dict):
# return data
# # 1. Haupt-Struktur wiederherstellen
# unflattened = cls._unflatten_dict(data)
# print("\n----------------------")
# pprint(unflattened)
# # 2. Schleife über die Felder, um JSON-Listen von Sub-Modellen zu entpacken
# for key, value in unflattened.items():
# print(f"{value=}")
# if isinstance(value, str) and value.startswith("[") and value.endswith("]"):
# try:
# parsed = json.loads(value)
# if isinstance(parsed, list):
# print(f"Key {key}: identified list")
# # Falls die Liste Dictionaries enthält, ent-flachen wir diese einzeln
# unflattened[key] = [
# cls._unflatten_dict(item) if isinstance(item, dict) else item
# for item in parsed
# ]
# else:
# unflattened[key] = parsed
# except json.JSONDecodeError:
# print(f"Key {key}: JSON serialize error")
# pass
# if isinstance(value, list):
# unflattened[key] = [
# cls._unflatten_dict(item) if isinstance(item, dict) else item
# for item in value
# ]
# print("\n\n##################Result:")
# pprint(unflattened)
# return unflattened
# def to_db(self) -> dict[str, Any]:
# """Ausgang für die DB: Flach, Listen sind JSON-Strings."""
# nested = super().model_dump()
# return self.__flatten_dict(nested, serialize_lists=True)
# def to_gui(self) -> dict[str, Any]:
# """Ausgang für die GUI: Flach, aber Listen bleiben Python-Listen für Widgets."""
# nested = super().model_dump()
# return self.__flatten_dict(nested, serialize_lists=False)
# @classmethod
# def __flatten_dict(
# cls, nested_dict: dict, parent_key: str = "", serialize_lists: bool = True
# ) -> dict:
# """Rekursiver Alleskönner zum Abflachen von Strukturen."""
# items = []
# for k, v in nested_dict.items():
# new_key = f"{parent_key}__{k}" if parent_key else k
# if isinstance(v, dict):
# items.extend(cls.__flatten_dict(v, new_key, serialize_lists).items())
# elif isinstance(v, list):
# processed_list = []
# for item in v:
# if isinstance(item, dict):
# # WICHTIG: Das Dict in der Liste wird isoliert abgeflacht (ohne parent_key),
# # da es ein eigenständiges Zeilen-Objekt im DynamicListWidget bleibt!
# processed_list.append(
# cls.__flatten_dict(item, serialize_lists=serialize_lists)
# )
# else:
# processed_list.append(item)
# if serialize_lists:
# items.append((new_key, json.dumps(processed_list)))
# else:
# items.append((new_key, processed_list))
# else:
# items.append((new_key, v))
# return dict(items)
class FlatBaseModel(BaseModel): class FlatBaseModel(BaseModel):
"""
Optimierte Pydantic-Basisklasse, die JSON-Strings und Doppel-Unterstriche
vollständig rekursiv (tiefenunabhängig) auflöst.
"""
@classmethod @classmethod
def _unflatten_dict(cls, flat_dict: dict) -> dict: def _recursive_parse_json(cls, data: Any) -> Any:
"""Hilfsmethode: Macht aus {'a__b': 1} wieder {'a': {'b': 1}}""" """Sucht im gesamten Objekt nach JSON-Listen-Strings und entpackt sie."""
result = {} if isinstance(data, str) and data.startswith("[") and data.endswith("]"):
for key, value in flat_dict.items(): try:
if "__" in key: parsed = json.loads(data)
parts = key.split("__") # Falls die Liste selbst wieder konvertiert werden muss (z.B. Sub-Dicts)
aktuell = result return cls._recursive_parse_json(parsed)
for part in parts[:-1]: except json.JSONDecodeError:
if part not in aktuell or not isinstance(aktuell[part], dict): return data
aktuell[part] = {} elif isinstance(data, dict):
aktuell = aktuell[part] return {k: cls._recursive_parse_json(v) for k, v in data.items()}
aktuell[parts[-1]] = value elif isinstance(data, list):
else: return [cls._recursive_parse_json(item) for item in data]
result[key] = value return data
return result
@classmethod
def _recursive_unflatten(cls, data: Any) -> Any:
"""Baut im gesamten Objekt flache '__'-Schlüssel in geschachtelte Dicts um."""
if isinstance(data, dict):
# 1. Die aktuelle Ebene dieses Dictionaries ent-flachen
unflattened_level = {}
for key, value in data.items():
if "__" in key:
parts = key.split("__")
aktuell = unflattened_level
for part in parts[:-1]:
if part not in aktuell or not isinstance(aktuell[part], dict):
aktuell[part] = {}
aktuell = aktuell[part]
aktuell[parts[-1]] = value
else:
unflattened_level[key] = value
# 2. Jetzt tiefer in die Werte gehen (rekursiv für Unter-Dicts)
return {k: cls._recursive_unflatten(v) for k, v in unflattened_level.items()}
elif isinstance(data, list):
# Auch durch Listen (z.B. deine Schulbildung) wandern und Sub-Dicts ent-flachen
return [cls._recursive_unflatten(item) for item in data]
return data
@model_validator(mode="before") @model_validator(mode="before")
@classmethod @classmethod
def __unflatten_input(cls, data: Any) -> Any: def __unflatten_input(cls, data: Any) -> Any:
"""Eingangskontrolle: Verarbeitet flache DB/GUI-Daten zu Pydantic-Strukturen.""" """Eingangskontrolle: Bereitet flache DB/GUI-Daten sauber für Pydantic vor."""
if not isinstance(data, dict): if not isinstance(data, dict):
return data return data
# 1. Haupt-Struktur wiederherstellen # Schritt 1: Alle JSON-Strings (egal wie tief) in echte Listen umwandeln
unflattened = cls._unflatten_dict(data) # Das verwandelt '[1, 2, 3]' zuverlässig in [1, 2, 3]
pprint(unflattened) json_parsed_data = cls._recursive_parse_json(data)
# 2. Schleife über die Felder, um JSON-Listen von Sub-Modellen zu entpacken # Schritt 2: Alle '__' Schlüssel (egal wie tief) in Unter-Strukturen falten
for key, value in unflattened.items(): final_nested_data = cls._recursive_unflatten(json_parsed_data)
if isinstance(value, str) and value.startswith("[") and value.endswith("]"):
try:
parsed = json.loads(value)
if isinstance(parsed, list):
# Falls die Liste Dictionaries enthält, ent-flachen wir diese einzeln
unflattened[key] = [
cls._unflatten_dict(item) if isinstance(item, dict) else item
for item in parsed
]
else:
unflattened[key] = parsed
except json.JSONDecodeError:
pass
if isinstance(value, (list)):
unflattened[key] = [
cls._unflatten_dict(item) if isinstance(item, dict) else item
for item in value
]
pprint(unflattened) return final_nested_data
return unflattened def to_db(self) -> Dict[str, Any]:
def to_db(self) -> dict[str, Any]:
"""Ausgang für die DB: Flach, Listen sind JSON-Strings.""" """Ausgang für die DB: Flach, Listen sind JSON-Strings."""
nested = super().model_dump() nested = super().model_dump()
return self.__flatten_dict(nested, serialize_lists=True) return self.__flatten_dict(nested, serialize_lists=True)
def to_gui(self) -> dict[str, Any]: def to_gui(self) -> Dict[str, Any]:
"""Ausgang für die GUI: Flach, aber Listen bleiben Python-Listen für Widgets.""" """Ausgang für die GUI: Flach, aber Listen bleiben Python-Listen."""
nested = super().model_dump() nested = super().model_dump()
return self.__flatten_dict(nested, serialize_lists=False) return self.__flatten_dict(nested, serialize_lists=False)
@@ -84,7 +197,7 @@ class FlatBaseModel(BaseModel):
def __flatten_dict( def __flatten_dict(
cls, nested_dict: dict, parent_key: str = "", serialize_lists: bool = True cls, nested_dict: dict, parent_key: str = "", serialize_lists: bool = True
) -> dict: ) -> dict:
"""Rekursiver Alleskönner zum Abflachen von Strukturen.""" """Rekursiver Helfer zum Abflachen von Strukturen."""
items = [] items = []
for k, v in nested_dict.items(): for k, v in nested_dict.items():
new_key = f"{parent_key}__{k}" if parent_key else k new_key = f"{parent_key}__{k}" if parent_key else k
@@ -95,8 +208,6 @@ class FlatBaseModel(BaseModel):
processed_list = [] processed_list = []
for item in v: for item in v:
if isinstance(item, dict): if isinstance(item, dict):
# WICHTIG: Das Dict in der Liste wird isoliert abgeflacht (ohne parent_key),
# da es ein eigenständiges Zeilen-Objekt im DynamicListWidget bleibt!
processed_list.append( processed_list.append(
cls.__flatten_dict(item, serialize_lists=serialize_lists) cls.__flatten_dict(item, serialize_lists=serialize_lists)
) )
@@ -139,6 +250,7 @@ class ProjektModell(FlatBaseModel):
class Grunderfassung_Unternehmen(FlatBaseModel): class Grunderfassung_Unternehmen(FlatBaseModel):
Schulbildung: list[Grunderfassung_Schulbildung] Schulbildung: list[Grunderfassung_Schulbildung]
Stammdaten: Grunderfassung_Stammdaten
class Grunderfassung_Schulbildung(BaseModel): class Grunderfassung_Schulbildung(BaseModel):
@@ -153,30 +265,90 @@ class Grunderfassung_Schulbildung(BaseModel):
SB_bemerkungsfeld: str | None SB_bemerkungsfeld: str | None
# %% ValidAge = Annotated[int, Field(ge=0, le=99)]
list_schulbildung = [
{
"SB_abschluss": None,
"SB_abschlussgrad": None,
"SB_abschlussjahr": None,
"SB_bemerkungsfeld": None,
"SB_land": None,
"SB_ort": None,
"SB_schule": None,
},
{
"SB_abschluss": None,
"SB_abschlussgrad": None,
"SB_abschlussjahr": None,
"SB_bemerkungsfeld": None,
"SB_land": None,
"SB_ort": None,
"SB_schule": None,
},
]
data = {"Schulbildung": list_schulbildung}
Grunderfassung_Unternehmen(**data)
class Grunderfassung_Stammdaten(BaseModel):
# Stammdaten_titel: str | None
# Stammdaten_anrede_anschrift: str
# Stammdaten_name: str
# Stammdaten_vorname: str | None
# Stammdaten_geburtsdatum: datetime.date | None
# Stammdaten_herkunftsland: str
# Stammdaten_staatsangehoerigkeit: str | None
# Stammdaten_rueckkehrer: bool | None
# Stammdaten_aufenthaltsort: str | None
# Stammdaten_strasse: str | None
# Stammdaten_hausnummer: str | None
# Stammdaten_PLZ: str | None
# Stammdaten_ort: str | None
# Stammdaten_bundesland: str | None
# Stammdaten_land: str | None
# Stammdaten_festnetznummer: str | None
# Stammdaten_mobilfunknummer: str | None
# Stammdaten_email: EmailStr | None
# Stammdaten_familienstand: str | None
anzahl_kinder: Grunderfassung_Stammdaten_AnzahlKinder
# @field_validator("Stammdaten_rueckkehrer", mode="before")
# @classmethod
# def str_to_bool(cls, value: Any) -> Any:
# if isinstance(value, str):
# value = value.strip().lower()
# if value == "ja":
# return True
# elif value == "nein":
# return False
# raise ValueError("Wert muss 'ja', 'nein', True oder False sein.")
# return value
class Grunderfassung_Stammdaten_AnzahlKinder(BaseModel):
anzahl: int | None
alter: list[ValidAge | None] | None = None
# %%
data = {
"Schulbildung": [
{
"SB_abschluss": None,
"SB_abschlussgrad": None,
"SB_abschlussjahr": None,
"SB_bemerkungsfeld": None,
"SB_land": None,
"SB_ort": None,
"SB_schule": None,
},
{
"SB_abschluss": None,
"SB_abschlussgrad": None,
"SB_abschlussjahr": None,
"SB_bemerkungsfeld": None,
"SB_land": None,
"SB_ort": None,
"SB_schule": None,
},
],
"Stammdaten__anzahl_kinder__anzahl": 3,
"Stammdaten__anzahl_kinder__alter": [1, 2, 3],
}
parsed = Grunderfassung_Unternehmen(**data)
# %%
db_entry = parsed.to_db()
pprint(db_entry)
# %%
loaded_from_db = Grunderfassung_Unternehmen(**db_entry)
# %%
loaded_from_db
# %%
loaded_from_db.to_gui()
# %% # %%
projekt = ProjektModell(**gui_rohdaten) projekt = ProjektModell(**gui_rohdaten)
@@ -285,6 +457,48 @@ pprint(unflatted)
flatted flatted
# %%
# key merger
def merge_dicts_to_lists(dict_iter):
merged = defaultdict(list)
for d in dict_iter:
for key, value in d.items():
merged[key].append(value)
return dict(merged)
def unmerge_dict_to_list(merged_dict):
if not merged_dict:
return []
# 1. Wir trennen die Keys und die dazugehörigen Wert-Listen
keys = merged_dict.keys()
value_lists = merged_dict.values()
# 2. zip(*value_lists) nimmt das erste Element aus jeder Liste, dann das zweite, etc.
# 3. zip(keys, row) verbindet die Keys wieder mit den jeweiligen Werten einer Zeile
return [dict(zip(keys, row)) for row in zip(*value_lists)]
data = [
{"Stammdaten__anzahl_kinder__alter": None},
{"Stammdaten__anzahl_kinder__alter": 2},
{"Stammdaten__anzahl_kinder__alter": 3},
{"Stammdaten__anzahl_kinder__alter": None},
]
merged = merge_dicts_to_lists(data)
pprint(merged)
merged = {"Stammdaten__anzahl_kinder__alter": [1, 2]}
unmerged = unmerge_dict_to_list(merged)
pprint(unmerged)
# %%
unmerge_dict_to_list({})
# %% # %%
string = """ string = """
Metallerzeugung & -bearbeitung; Elektro, Energie, Chemie; IT & Software; Kunststoff, Papier, Textil; Logistik, Verkehr, Transport; Handwerk, Bau, Grüne Berufe; Gesundheit & Pflege; Tourismus & Gastronomie; Handel; Bildung & Soziales; Entwicklung, Planung, Qualität; Administration, Finanzen, Verwaltung; Marketing, Design, Vertrieb; Einkauf, Lager, Wartung; Sonstige; Keine Schwerpunkte, branchenübergreifende Rekrutierung Metallerzeugung & -bearbeitung; Elektro, Energie, Chemie; IT & Software; Kunststoff, Papier, Textil; Logistik, Verkehr, Transport; Handwerk, Bau, Grüne Berufe; Gesundheit & Pflege; Tourismus & Gastronomie; Handel; Bildung & Soziales; Entwicklung, Planung, Qualität; Administration, Finanzen, Verwaltung; Marketing, Design, Vertrieb; Einkauf, Lager, Wartung; Sonstige; Keine Schwerpunkte, branchenübergreifende Rekrutierung

View File

@@ -33,6 +33,7 @@ class SafeDateTime(TypeDecorator):
md_crm = sql.MetaData() md_crm = sql.MetaData()
md_main = sql.MetaData()
# ---------- OLD "Kontaktliste" ---------- # ---------- OLD "Kontaktliste" ----------
@@ -295,3 +296,58 @@ def get_ext_crm_contact_person(
df_contact_person = get_ext_crm_contact_person(None) df_contact_person = get_ext_crm_contact_person(None)
grunderfassung_unternehmen: sql.Table = Table(
"grunderfassung_unternehmen",
md_main,
Column("erfassung_id", sql.Integer, nullable=False, unique=True, autoincrement=True),
Column("Arbeitserfahrung", sql.Text, nullable=True),
Column("Grunderfassung_fallnummer", sql.Text, nullable=True),
Column("Grunderfassung_notiz", sql.Text, nullable=True),
Column("HoehereBildung", sql.Text, nullable=True),
Column("Kontaktperson__KP_adresse", sql.Text, nullable=True),
Column("Kontaktperson__KP_anrede_anschrift", sql.Text, nullable=True),
Column("Kontaktperson__KP_email", sql.Text, nullable=True),
Column("Kontaktperson__KP_festnetznummer", sql.Text, nullable=True),
Column("Kontaktperson__KP_funktion_beziehung", sql.Text, nullable=True),
Column("Kontaktperson__KP_mobilfunknummer", sql.Text, nullable=True),
Column("Kontaktperson__KP_name", sql.Text, nullable=True),
Column("Kontaktperson__KP_name_partner", sql.Text, nullable=True),
Column("Kontaktperson__KP_titel", sql.Text, nullable=True),
Column("Kontaktperson__KP_vorname", sql.Text, nullable=True),
Column("Metadaten_aktualisierung", sql.Text, nullable=True),
Column("Metadaten_erstellung", sql.Text, nullable=True),
Column("Metadaten_nutzer", sql.String(20), nullable=True),
Column("Partnersuche__kanal_aufmerksamkeit", sql.Text, nullable=True),
Column("Partnersuche__person_suche", sql.Text, nullable=True),
Column("Partnersuche__un_suche", sql.Text, nullable=True),
Column("Projektrelevanz__relevanz", sql.Boolean, nullable=True),
Column("Schulbildung", sql.Text, nullable=True),
Column("Sprachkenntnisse", sql.Text, nullable=True),
Column("Stammdaten__PLZ", sql.Text, nullable=True),
Column("Stammdaten__anrede_anschrift", sql.Text, nullable=True),
Column("Stammdaten__anzahl_kinder__alter", sql.Text, nullable=True),
Column("Stammdaten__anzahl_kinder__anzahl", sql.Text, nullable=True),
Column("Stammdaten__aufenthaltsort", sql.Text, nullable=True),
Column("Stammdaten__bundesland", sql.Text, nullable=True),
Column("Stammdaten__email", sql.Text, nullable=True),
Column("Stammdaten__familienstand", sql.Text, nullable=True),
Column("Stammdaten__festnetznummer", sql.Text, nullable=True),
Column("Stammdaten__geburtsdatum", sql.Text, nullable=True),
Column("Stammdaten__hausnummer", sql.Text, nullable=True),
Column("Stammdaten__herkunftsland", sql.Text, nullable=True),
Column("Stammdaten__mobilfunknummer", sql.Text, nullable=True),
Column("Stammdaten__name", sql.Text, nullable=True),
Column("Stammdaten__ort", sql.Text, nullable=True),
Column("Stammdaten__rueckkehrer", sql.Boolean, nullable=True),
Column("Stammdaten__staatsangehoerigkeit", sql.Text, nullable=True),
Column("Stammdaten__strasse", sql.Text, nullable=True),
Column("Stammdaten__titel", sql.Text, nullable=True),
Column("Stammdaten__vorname", sql.Text, nullable=True),
Column("WeitereInfos__WI_arbeitsstatus", sql.Text, nullable=True),
Column("WeitereInfos__WI_aufenthaltstitel", sql.Text, nullable=True),
Column("WeitereInfos__WI_deutsch_sprache", sql.Text, nullable=True),
Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Text, nullable=True),
Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True),
)