from __future__ import annotations import dataclasses as dc import datetime from typing import Any, TypedDict, cast import polars as pl import sqlalchemy as sql from wce_crm import db from wce_crm.logging import logger_back as logger class CompanyInfo(TypedDict): ma_id: str wce_id: str ma_unternehmensname: str ma_branche: str ma_strasse: str ma_hausnummer: str ma_plz: str ma_ort: str ma_plz_postfach: str ma_postfach: str ma_website: str ma_mail: str ma_telefonnummer: str ma_faxnummer: str ma_ersteintrag_datum: str ma_aktualisierung_datum: str ma_aktualisierung_nutzer: str ma_sollprozess: str ma_auslaendische_mitarbeiter: str ma_quelle_information: str ma_bemerkung: str ma_kontakt: str ma_schlagworte: str ma_archiviert: str class ContactPersonInfo(TypedDict): an_id: str ma_id: str wce_id: str st_id: str an_sachgebiet: str an_anrede: str an_titel: str an_nachname: str an_vorname: str an_position: str an_mail: str an_festnetz: str an_mobil: str an_faxnummer: str an_hauptansprechpartner: str an_anrede_anschrift: str an_bemerkung: str an_aktualisierung_datum: str an_aktualisierung_nutzer: str an_letztes_kontaktdatum: str an_ersteintrag_datum: str an_archiviert: str def _transform_for_gui_output( data: pl.DataFrame, ) -> pl.DataFrame: q = ( data.lazy() .with_columns( pl.col(pl.Datetime).dt.to_string("%d.%m.%Y"), pl.col(pl.Date).dt.to_string("%d.%m.%Y"), pl.when(pl.col(pl.Boolean)) .then(pl.lit("Ja")) .otherwise(pl.lit("Nein")) .name.keep(), ) .with_columns(pl.all().cast(pl.String)) ) return q.collect() def comp_search_choices() -> tuple[tuple[str, int], ...]: # TODO no reload functionality logger.debug("[Call backend] comp_search_choices") q = db.DF_CRM_MASTER.lazy() counter = pl.int_range(0, pl.len()).over(pl.col.ma_unternehmensname) q = q.with_columns( dedupl=pl.when(counter == 0) .then(pl.col.ma_unternehmensname) .otherwise(pl.format("{} ({})", pl.col.ma_unternehmensname, counter)) ) df = q.collect() return tuple(zip(df["dedupl"], df["ma_id"])) def comp_search_get_info( ma_id: int, ) -> CompanyInfo: logger.debug("[Call backend] comp_search_get_info") df = db.DF_CRM_MASTER.filter(pl.col.ma_id == ma_id) if df.height > 1 or df.height == 0: raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}") df = _transform_for_gui_output(df) return cast(CompanyInfo, df.row(0, named=True)) def contact_person_search_choices( ma_id: int | None, use_both_names: bool, ) -> tuple[tuple[str, int], ...]: # TODO no reload functionality logger.debug("[Call backend] contact_person_search_choices") q = db.DF_CONTACT_PERSON.lazy() if ma_id is not None: q = q.filter(pl.col.ma_id == ma_id) dedupl_col = pl.col.an_nachname if use_both_names: q = q.with_columns( name_search=(pl.format("{}, {}", pl.col.an_nachname, pl.col.an_vorname)) ) dedupl_col = pl.col.name_search counter = pl.int_range(0, pl.len()).over(dedupl_col) q = q.with_columns( dedupl=pl.when(counter == 0) .then(dedupl_col) .otherwise(pl.format("{} ({})", dedupl_col, counter)) ) df = q.collect() return tuple(zip(df["dedupl"], df["an_id"])) def contact_person_search_get_info( an_id: int, ) -> ContactPersonInfo: logger.debug("[Call backend] contact_person_search_get_info") df = db.DF_CONTACT_PERSON.filter(pl.col.an_id == an_id) if df.height > 1 or df.height == 0: raise ValueError(f"Größe des zurückgelieferten Datenpakets ungültig: {df.height}") df = _transform_for_gui_output(df) return cast(ContactPersonInfo, df.row(0, named=True)) def insert_initial_recording( data: dict[str, Any], ) -> None: logger.debug("[Call backend] insert_initial_recording") stmt = db.grunderfassung_unternehmen.insert().values(data) with db.ENGINE.begin() as conn: conn.execute(stmt) def update_initial_recording( id_: int, data: dict[str, Any], ) -> None: logger.debug("[Call backend] update_initial_recording") stmt = ( db.grunderfassung_unternehmen.update() .where(db.grunderfassung_unternehmen.c.erfassung_id == id_) .values(data) ) with db.ENGINE.begin() as conn: conn.execute(stmt) def get_initial_recording( id_: int, ) -> dict[str, Any]: logger.debug("[Call backend] get_initial_recording") stmt = db.grunderfassung_unternehmen.select().where( db.grunderfassung_unternehmen.c.erfassung_id == id_ ) with db.ENGINE.begin() as conn: ret = conn.execute(stmt) row = ret.fetchone() if row is None: raise KeyError(f"Database ID {id_} not found") return row._asdict() @dc.dataclass(slots=True) class FrontpageCompany: erfassung_id: int ma_id: int name: str Metadaten_aktualisierung: datetime.datetime def get_company_list() -> list[FrontpageCompany]: logger.debug("[Call backend] get_company_list") stmt = sql.select( db.grunderfassung_unternehmen.c.erfassung_id, db.grunderfassung_unternehmen.c.Partnersuche__un_suche, db.grunderfassung_unternehmen.c.Metadaten_aktualisierung, ).order_by(db.grunderfassung_unternehmen.c.Metadaten_aktualisierung.desc()) with db.ENGINE.connect() as conn: res = conn.execute(stmt) front_page_companies: list[FrontpageCompany] = [] for entry in res: erfassung_id = entry[0] ma_id = entry[1] datetime_akt = cast(datetime.datetime, entry[2]) datetime_akt = datetime_akt.astimezone() comp_info = comp_search_get_info(ma_id) name = comp_info["ma_unternehmensname"] front_page_companies.append(FrontpageCompany(erfassung_id, ma_id, name, datetime_akt)) return front_page_companies