# %% from __future__ import annotations import dataclasses as dc import datetime import enum import json import re from collections import defaultdict from collections.abc import Sequence from pprint import pprint from typing import Annotated, Any import babel import polars as pl import sqlalchemy as sql from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator from PySide6.QtCore import QDate, Qt from wce_crm import constants, db from wce_crm.backend import backend # %% int("test") # %% db_path = constants.Config.DB_PATH_MAIN crm_path = constants.Config.DB_PATH_CRM engine = sql.create_engine(f"sqlite:///{str(db_path)}") engine_crm = sql.create_engine(f"sqlite:///{str(crm_path)}") # %% stmt = sql.select( db.grunderfassung.c.erfassung_id, db.grunderfassung.c.Partnersuche__un_suche, ) with engine.connect() as conn: res = conn.execute(stmt) for r in res: print(r) # %% # %% backend.front_get_company_list() # %% db.DF_CRM_MASTER.filter(pl.col.ma_id == 381) # %% # class FlatBaseModel(BaseModel): # @classmethod # def _unflatten_dict(cls, flat_dict: dict) -> dict: # """Hilfsmethode: Macht aus {'a__b': 1} wieder {'a': {'b': 1}}""" # result = {} # for key, value in flat_dict.items(): # if "__" in key: # parts = key.split("__") # aktuell = result # for part in parts[:-1]: # if part not in aktuell or not isinstance(aktuell[part], dict): # aktuell[part] = {} # aktuell = aktuell[part] # aktuell[parts[-1]] = value # else: # result[key] = value # return result # @model_validator(mode="before") # @classmethod # def __unflatten_input(cls, data: Any) -> Any: # """Eingangskontrolle: Verarbeitet flache DB/GUI-Daten zu Pydantic-Strukturen.""" # if not isinstance(data, dict): # return data # # 1. Haupt-Struktur wiederherstellen # unflattened = cls._unflatten_dict(data) # print("\n----------------------") # pprint(unflattened) # # 2. Schleife über die Felder, um JSON-Listen von Sub-Modellen zu entpacken # for key, value in unflattened.items(): # print(f"{value=}") # if isinstance(value, str) and value.startswith("[") and value.endswith("]"): # try: # parsed = json.loads(value) # if isinstance(parsed, list): # print(f"Key {key}: identified list") # # Falls die Liste Dictionaries enthält, ent-flachen wir diese einzeln # unflattened[key] = [ # cls._unflatten_dict(item) if isinstance(item, dict) else item # for item in parsed # ] # else: # unflattened[key] = parsed # except json.JSONDecodeError: # print(f"Key {key}: JSON serialize error") # pass # if isinstance(value, list): # unflattened[key] = [ # cls._unflatten_dict(item) if isinstance(item, dict) else item # for item in value # ] # print("\n\n##################Result:") # pprint(unflattened) # return unflattened # def to_db(self) -> dict[str, Any]: # """Ausgang für die DB: Flach, Listen sind JSON-Strings.""" # nested = super().model_dump() # return self.__flatten_dict(nested, serialize_lists=True) # def to_gui(self) -> dict[str, Any]: # """Ausgang für die GUI: Flach, aber Listen bleiben Python-Listen für Widgets.""" # nested = super().model_dump() # return self.__flatten_dict(nested, serialize_lists=False) # @classmethod # def __flatten_dict( # cls, nested_dict: dict, parent_key: str = "", serialize_lists: bool = True # ) -> dict: # """Rekursiver Alleskönner zum Abflachen von Strukturen.""" # items = [] # for k, v in nested_dict.items(): # new_key = f"{parent_key}__{k}" if parent_key else k # if isinstance(v, dict): # items.extend(cls.__flatten_dict(v, new_key, serialize_lists).items()) # elif isinstance(v, list): # processed_list = [] # for item in v: # if isinstance(item, dict): # # WICHTIG: Das Dict in der Liste wird isoliert abgeflacht (ohne parent_key), # # da es ein eigenständiges Zeilen-Objekt im DynamicListWidget bleibt! # processed_list.append( # cls.__flatten_dict(item, serialize_lists=serialize_lists) # ) # else: # processed_list.append(item) # if serialize_lists: # items.append((new_key, json.dumps(processed_list))) # else: # items.append((new_key, processed_list)) # else: # items.append((new_key, v)) # return dict(items) class FlatBaseModel(BaseModel): """ Optimierte Pydantic-Basisklasse, die JSON-Strings und Doppel-Unterstriche vollständig rekursiv (tiefenunabhängig) auflöst. """ @classmethod def _recursive_parse_json(cls, data: Any) -> Any: """Sucht im gesamten Objekt nach JSON-Listen-Strings und entpackt sie.""" if isinstance(data, str) and data.startswith("[") and data.endswith("]"): try: parsed = json.loads(data) # Falls die Liste selbst wieder konvertiert werden muss (z.B. Sub-Dicts) return cls._recursive_parse_json(parsed) except json.JSONDecodeError: return data elif isinstance(data, dict): return {k: cls._recursive_parse_json(v) for k, v in data.items()} elif isinstance(data, list): return [cls._recursive_parse_json(item) for item in data] return data @classmethod def _recursive_unflatten(cls, data: Any) -> Any: """Baut im gesamten Objekt flache '__'-Schlüssel in geschachtelte Dicts um.""" if isinstance(data, dict): # 1. Die aktuelle Ebene dieses Dictionaries ent-flachen unflattened_level = {} for key, value in data.items(): if "__" in key: parts = key.split("__") aktuell = unflattened_level for part in parts[:-1]: if part not in aktuell or not isinstance(aktuell[part], dict): aktuell[part] = {} aktuell = aktuell[part] aktuell[parts[-1]] = value else: unflattened_level[key] = value # 2. Jetzt tiefer in die Werte gehen (rekursiv für Unter-Dicts) return {k: cls._recursive_unflatten(v) for k, v in unflattened_level.items()} elif isinstance(data, list): # Auch durch Listen (z.B. deine Schulbildung) wandern und Sub-Dicts ent-flachen return [cls._recursive_unflatten(item) for item in data] return data @model_validator(mode="before") @classmethod def __unflatten_input(cls, data: Any) -> Any: """Eingangskontrolle: Bereitet flache DB/GUI-Daten sauber für Pydantic vor.""" if not isinstance(data, dict): return data # Schritt 1: Alle JSON-Strings (egal wie tief) in echte Listen umwandeln # Das verwandelt '[1, 2, 3]' zuverlässig in [1, 2, 3] json_parsed_data = cls._recursive_parse_json(data) # Schritt 2: Alle '__' Schlüssel (egal wie tief) in Unter-Strukturen falten final_nested_data = cls._recursive_unflatten(json_parsed_data) return final_nested_data def to_db(self) -> Dict[str, Any]: """Ausgang für die DB: Flach, Listen sind JSON-Strings.""" nested = super().model_dump() return self.__flatten_dict(nested, serialize_lists=True) def to_gui(self) -> Dict[str, Any]: """Ausgang für die GUI: Flach, aber Listen bleiben Python-Listen.""" nested = super().model_dump() return self.__flatten_dict(nested, serialize_lists=False) @classmethod def __flatten_dict( cls, nested_dict: dict, parent_key: str = "", serialize_lists: bool = True ) -> dict: """Rekursiver Helfer zum Abflachen von Strukturen.""" items = [] for k, v in nested_dict.items(): new_key = f"{parent_key}__{k}" if parent_key else k if isinstance(v, dict): items.extend(cls.__flatten_dict(v, new_key, serialize_lists).items()) elif isinstance(v, list): processed_list = [] for item in v: if isinstance(item, dict): processed_list.append( cls.__flatten_dict(item, serialize_lists=serialize_lists) ) else: processed_list.append(item) if serialize_lists: items.append((new_key, json.dumps(processed_list))) else: items.append((new_key, processed_list)) else: items.append((new_key, v)) return dict(items) # %% gui_rohdaten = { "projekt_name": "Mars Rover", "meilensteine": [ {"titel": "Triebwerk Test", "finanzen__betrag": 5000.0, "finanzen__waehrung": "EUR"}, {"titel": "Software Beta", "finanzen__betrag": 1200.0, "finanzen__waehrung": "EUR"}, ], } class BudgetDetails(BaseModel): betrag: float waehrung: str = "EUR" class Meilenstein(BaseModel): titel: str finanzen: BudgetDetails # <--- Verschachtelung innerhalb der Liste! class ProjektModell(FlatBaseModel): projekt_name: str meilensteine: list[Meilenstein] # <--- Die Liste von Modellen class Grunderfassung_Unternehmen(FlatBaseModel): Schulbildung: list[Grunderfassung_Schulbildung] Stammdaten: Grunderfassung_Stammdaten class Grunderfassung_Schulbildung(BaseModel): model_config = ConfigDict(str_strip_whitespace=True) SB_abschluss: str | None SB_abschlussgrad: str | None SB_schule: str | None SB_ort: str | None SB_land: str | None SB_abschlussjahr: str | None SB_bemerkungsfeld: str | None ValidAge = Annotated[int, Field(ge=0, le=99)] class Grunderfassung_Stammdaten(BaseModel): # Stammdaten_titel: str | None # Stammdaten_anrede_anschrift: str # Stammdaten_name: str # Stammdaten_vorname: str | None # Stammdaten_geburtsdatum: datetime.date | None # Stammdaten_herkunftsland: str # Stammdaten_staatsangehoerigkeit: str | None # Stammdaten_rueckkehrer: bool | None # Stammdaten_aufenthaltsort: str | None # Stammdaten_strasse: str | None # Stammdaten_hausnummer: str | None # Stammdaten_PLZ: str | None # Stammdaten_ort: str | None # Stammdaten_bundesland: str | None # Stammdaten_land: str | None # Stammdaten_festnetznummer: str | None # Stammdaten_mobilfunknummer: str | None # Stammdaten_email: EmailStr | None # Stammdaten_familienstand: str | None anzahl_kinder: Grunderfassung_Stammdaten_AnzahlKinder # @field_validator("Stammdaten_rueckkehrer", mode="before") # @classmethod # def str_to_bool(cls, value: Any) -> Any: # if isinstance(value, str): # value = value.strip().lower() # if value == "ja": # return True # elif value == "nein": # return False # raise ValueError("Wert muss 'ja', 'nein', True oder False sein.") # return value class Grunderfassung_Stammdaten_AnzahlKinder(BaseModel): anzahl: int | None alter: list[ValidAge | None] | None = None # %% data = { "Schulbildung": [ { "SB_abschluss": None, "SB_abschlussgrad": None, "SB_abschlussjahr": None, "SB_bemerkungsfeld": None, "SB_land": None, "SB_ort": None, "SB_schule": None, }, { "SB_abschluss": None, "SB_abschlussgrad": None, "SB_abschlussjahr": None, "SB_bemerkungsfeld": None, "SB_land": None, "SB_ort": None, "SB_schule": None, }, ], "Stammdaten__anzahl_kinder__anzahl": 3, "Stammdaten__anzahl_kinder__alter": [1, 2, 3], } parsed = Grunderfassung_Unternehmen(**data) # %% db_entry = parsed.to_db() pprint(db_entry) # %% loaded_from_db = Grunderfassung_Unternehmen(**db_entry) # %% loaded_from_db # %% loaded_from_db.to_gui() # %% projekt = ProjektModell(**gui_rohdaten) # %% projekt.to_gui() # %% target_dict = { "t1": "test", "t2": "test2", "t3": ["t3-1", "t3-2", "t3-3"], "t4": { "t4-1": "t4-1", "t4-2": "t4-2", }, "t5": [ {"sub1": "sub1-1"}, {"sub2": "sub2-1"}, {"sub3": "sub3-1"}, ], } target_dict = { "Stammdaten": { "Stammdaten_PLZ": "4523", "Stammdaten_anrede_anschrift": "Sehr geehrter Herr", "Stammdaten_anzahl_kinder": {"alter": [3, 7, 10, 14], "anzahl": 4}, "Stammdaten_aufenthaltsort": "Ausland EU/EWR", "Stammdaten_bundesland": None, "Stammdaten_email": "max.mustermann@test.at", "Stammdaten_familienstand": "ledig", "Stammdaten_festnetznummer": "123456789", "Stammdaten_geburtsdatum": datetime.date(1990, 4, 11), "Stammdaten_hausnummer": "12", "Stammdaten_herkunftsland": "AT", "Stammdaten_land": None, "Stammdaten_mobilfunknummer": "123456789", "Stammdaten_name": "Mustermann", "Stammdaten_ort": "Ort in Österreich", "Stammdaten_rueckkehrer": False, "Stammdaten_staatsangehoerigkeit": "AT", "Stammdaten_strasse": "Teststraße", "Stammdaten_titel": "Test", "Stammdaten_vorname": "Max", }, "Schulbildung": [ {"Schulbildung-[1]__SB_abschluss": None, "Schulbildung-[1]__SB_abschlussgrad": None}, {"Schulbildung-[2]__SB_abschluss": None, "Schulbildung-[2]__SB_abschlussgrad": None}, ], } def flatten_for_db( nested_dict: dict, parent_key: str = "", sep: str = ".", ) -> dict[str, Any]: items = [] for k, v in nested_dict.items(): new_key = f"{parent_key}{sep}{k}" if parent_key else k if isinstance(v, dict): items.extend(flatten_for_db(v, new_key, sep=sep).items()) elif isinstance(v, list): items.append((new_key, json.dumps(v))) else: items.append((new_key, v)) return dict(items) def unflatten_from_db( flat_dict: dict, sep: str = ".", ) -> dict[str, Any]: result = {} for key, value in flat_dict.items(): if isinstance(value, str) and (value.startswith("[") and value.endswith("]")): try: value = json.loads(value) except json.JSONDecodeError: pass parts = key.split(sep) aktuell = result for part in parts[:-1]: if part not in aktuell: aktuell[part] = {} aktuell = aktuell[part] aktuell[parts[-1]] = value return result flatted = flatten_for_db(target_dict, sep="__") pprint(flatted) unflatted = unflatten_from_db(flatted, sep="__") print("\n\n") pprint(unflatted) # %% flatted # %% # key merger def merge_dicts_to_lists(dict_iter): merged = defaultdict(list) for d in dict_iter: for key, value in d.items(): merged[key].append(value) return dict(merged) def unmerge_dict_to_list(merged_dict): if not merged_dict: return [] # 1. Wir trennen die Keys und die dazugehörigen Wert-Listen keys = merged_dict.keys() value_lists = merged_dict.values() # 2. zip(*value_lists) nimmt das erste Element aus jeder Liste, dann das zweite, etc. # 3. zip(keys, row) verbindet die Keys wieder mit den jeweiligen Werten einer Zeile return [dict(zip(keys, row)) for row in zip(*value_lists)] data = [ {"Stammdaten__anzahl_kinder__alter": None}, {"Stammdaten__anzahl_kinder__alter": 2}, {"Stammdaten__anzahl_kinder__alter": 3}, {"Stammdaten__anzahl_kinder__alter": None}, ] merged = merge_dicts_to_lists(data) pprint(merged) merged = {"Stammdaten__anzahl_kinder__alter": [1, 2]} unmerged = unmerge_dict_to_list(merged) pprint(unmerged) # %% unmerge_dict_to_list({}) # %% string = """ Metallerzeugung & -bearbeitung; Elektro, Energie, Chemie; IT & Software; Kunststoff, Papier, Textil; Logistik, Verkehr, Transport; Handwerk, Bau, Grüne Berufe; Gesundheit & Pflege; Tourismus & Gastronomie; Handel; Bildung & Soziales; Entwicklung, Planung, Qualität; Administration, Finanzen, Verwaltung; Marketing, Design, Vertrieb; Einkauf, Lager, Wartung; Sonstige; Keine Schwerpunkte, branchenübergreifende Rekrutierung """ parts = string.strip().split(";") [part.strip() for part in parts] # %% DYNAMIC_LIST_KEY_PATTERN = re.compile(r"-\[(\d+)\]") dynamic_content = { "Stammdaten_anzahl_kinder-[0]": {"Stammdaten_anzahl_kinder": "5"}, "Stammdaten_anzahl_kinder-[1]": {"Stammdaten_alter_kinder": "23213"}, "Stammdaten_anzahl_kinder-[2]": {"Stammdaten_alter_kinder": "123123"}, "Stammdaten_anzahl_kinder-[3]": {"Stammdaten_alter_kinder": "123213"}, "Stammdaten_anzahl_kinder-[4]": {"Stammdaten_alter_kinder": "123123"}, "Stammdaten_anzahl_kinder-[5]": {"Stammdaten_alter_kinder": "123123"}, } def find_dynamic_content(content: dict[str, Any]) -> dict[str, Any] | None: found = None for key in dynamic_content.keys(): if DYNAMIC_LIST_KEY_PATTERN.search(key): # found an match: this is dynamic content dictionary print("found") found = dynamic_content break return found # %% new_content = { "Stammdaten": { "Stammdaten_PLZ": "", "Stammdaten_anrede_anschrift": "asdasdas", "Stammdaten_anzahl_kinder": [ { "Stammdaten_anzahl_kinder-[0]": {"Stammdaten_anzahl_kinder": "5"}, "Stammdaten_anzahl_kinder-[1]": {"Stammdaten_alter_kinder": "23213"}, "Stammdaten_anzahl_kinder-[2]": {"Stammdaten_alter_kinder": "123123"}, "Stammdaten_anzahl_kinder-[3]": {"Stammdaten_alter_kinder": "123213"}, "Stammdaten_anzahl_kinder-[4]": {"Stammdaten_alter_kinder": "123123"}, "Stammdaten_anzahl_kinder-[5]": {"Stammdaten_alter_kinder": "123123"}, } ], } } new_content = { "Stammdaten": { "Stammdaten_anzahl_kinder-[0]": {"Stammdaten_anzahl_kinder": "5"}, "Stammdaten_anzahl_kinder-[1]": {"Stammdaten_alter_kinder": None}, "Stammdaten_anzahl_kinder-[2]": {"Stammdaten_alter_kinder": None}, "Stammdaten_anzahl_kinder-[3]": {"Stammdaten_alter_kinder": None}, "Stammdaten_anzahl_kinder-[4]": {"Stammdaten_alter_kinder": None}, "Stammdaten_anzahl_kinder-[5]": {"Stammdaten_alter_kinder": None}, } } # object Stammdaten_Anzahl_Kinder: Stammdaten_anzahl_kinder: int, Stammdaten_alter_kinder: list[int] def get_leafs(data): if isinstance(data, dict): for value in data.values(): yield from get_leafs(value) elif isinstance(data, (list, tuple, set)): for item in data: yield from get_leafs(item) else: yield data def get_leaf_dicts(data): if isinstance(data, dict): has_inner_dicts = False for value in data.values(): for inner_dict in get_leaf_dicts(value): has_inner_dicts = True yield inner_dict if not has_inner_dicts: yield data elif isinstance(data, (list, tuple, set)): for item in data: yield from get_leaf_dicts(item) # %% for x in get_leafs(new_content): print(x) # %% export_dict = {} children_values: list[str] | None = None for idx, data_dict in enumerate(get_leaf_dicts(new_content)): if idx == 0: export_dict.update(data_dict) else: for key in data_dict: if key not in export_dict: children_values = export_dict.setdefault(key, []) assert children_values is not None children_values.append(data_dict[key]) export_dict # %% class Stammdaten_AnzahlKinder(BaseModel): Stammdaten_anzahl_kinder: int | None Stammdaten_alter_kinder: list[int | None] # %% Stammdaten_AnzahlKinder(**export_dict) # %% find_dynamic_content(dynamic_content) # %% @dc.dataclass(slots=True) class CountryList: iso_to_country: dict[str, str] for_dropdown: Sequence[tuple[str, str]] def get_country_list_german() -> CountryList: locale = babel.Locale("de", "DE") countries: list[tuple[str, str]] = [] iso_to_country: dict[str, str] = {} for iso_code, country_name in locale.territories.items(): if len(iso_code) == 2 and not iso_code.isdigit(): countries.append((country_name, iso_code)) iso_to_country[iso_code] = country_name countries.sort(key=lambda x: x[0]) return CountryList( iso_to_country=iso_to_country, for_dropdown=tuple(countries), ) # %% DYNAMIC_LIST_KEY_PATTERN = r"-\[(\d+)\]" key = "Schulbildung-[12].7b8da0f7-7a0e-4f71-878a-85616099e849" matches = re.search(DYNAMIC_LIST_KEY_PATTERN, key) # %% matches # %% matches.group(1) # %% class COUNTRY(enum.IntEnum): DE = 1 FR = 2 CM = 3 class COUNTRY2(enum.Enum): DE = 1 FR = 2 CM = 3 def give_value(t): print(f"Wert ist: {t}") give_value(COUNTRY.DE) give_value(COUNTRY2.DE) # %% COUNTRY(10) # %% COUNTRY.DE # %% t_str = "asd.yxcxc.dfgjj.aasdsdsdsd.sdsdsdsd" splitted = t_str.split(".") part, rest = splitted[0], splitted[1:] part # %% ".".join([part] + rest) # %% class FormFieldType(enum.StrEnum): TEXT = enum.auto() LONGTEXT = enum.auto() DATE = enum.auto() DATETIME = enum.auto() @dc.dataclass(slots=True) class FormField: key: str label: str type: FormFieldType required: bool def __post_init__(self) -> None: self.label = self.label.strip() if not self.label.endswith(":"): self.label += ":" if self.required: self.label += "*" # %% FormField("name", "Projektbeschreibung", FormFieldType.LONGTEXT, required=True) # %% FormField("name", "Projektbeschreibung:", FormFieldType.LONGTEXT, required=True) # %% FormField("name", "Projektbeschreibung", FormFieldType.LONGTEXT, required=False) # %% FormField("name", "Projektbeschreibung:", FormFieldType.LONGTEXT, required=False) # %% addr.export() # %% set_date = QDate.fromString("26.07.2026", "dd.MM.yyyy") # %% Qt.Tet