14 Commits

Author SHA1 Message Date
b6087a5b21 fix alembic script 2026-06-19 06:28:58 +02:00
27934497d5 remove metadata creation 2026-06-17 17:40:07 +02:00
abb4c67b67 doc version history 2026-06-17 17:34:27 +02:00
fdf44454a0 bump version 2026-06-17 17:31:38 +02:00
5ce911f8cd fix for alembic pipeline 2026-06-17 17:31:32 +02:00
4364d8c0f9 remove unused imports 2026-06-17 17:31:09 +02:00
696357138a adapt logging behaviour 2026-06-17 17:30:52 +02:00
389fe2e159 add database migration tooling with alembic 2026-06-17 16:55:24 +02:00
decfdbffd1 small refactoring with more precise declarations 2026-06-17 16:54:54 +02:00
f24c4f0ff7 robust .env loading 2026-06-17 16:19:44 +02:00
1f75801fc5 add initial recording of individual persons with changed database interaction 2026-06-17 09:46:04 +02:00
4a1d5d4647 change database structure for initial recording of individual persons 2026-06-17 09:45:26 +02:00
44402af0b7 major refactoring 2026-06-17 09:44:06 +02:00
15af78900f fix list 2026-05-29 12:03:10 +02:00
24 changed files with 2199 additions and 1377 deletions

148
alembic.ini Normal file
View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# Or organize into date-based subdirectories (requires recursive_version_locations = true)
# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = sqlite:///data/db/wce_grunderfassung.db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

78
alembic/env.py Normal file
View File

@@ -0,0 +1,78 @@
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from wce_crm import constants, db
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = db.MD_MAIN
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
config.set_main_option("sqlalchemy.url", f"sqlite:///{constants.Config.DB_PATH_MAIN}")
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

28
alembic/script.py.mako Normal file
View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,37 @@
"""added new column and renamed table for initial recording
Revision ID: 5f2af5179c47
Revises:
Create Date: 2026-06-17 16:06:46.421562
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "5f2af5179c47"
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.rename_table("grunderfassung_unternehmen", "grunderfassung")
op.add_column(
"grunderfassung",
sa.Column("Metadaten_wiedereintrittsdatum", sa.Date, nullable=True, default=None),
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("grunderfassung", "Metadaten_wiedereintrittsdatum")
op.rename_table("grunderfassung", "grunderfassung_unternehmen")
# ### end Alembic commands ###

View File

@@ -1,4 +1,5 @@
DOPT_STOP_FOLDER_NAME=python
DOPT_DB_CRM=data/wce_crm.db
DOPT_DB_MAIN=data/wce_grunderfassung.db
DOPT_PATH_LOGGING=data/logs
DOPT_PATH_LOGGING=data/logs
DOPT_ALEMBIC_BASE=python/alembic

View File

@@ -1,5 +1,10 @@
# Versionshistorie (Changelog)
## 17.06.2026 (Version: v0.1.1dev) (Tag: ExtTest-20260617)
- Grunderfassung für Individualpersonen hinzugefügt
- Löschen von Einträgen möglich (Option/Schaltfläche neben dem Speichern und Zurücksetzen: Tastenkombination ``Strg + L``)
## 29.05.2026 (Version: v0.1.1dev) (Tag: ExtTest-20260529)
- initiale Version für erste Feedback-Schleife

23
pdm.lock generated
View File

@@ -5,7 +5,7 @@
groups = ["default", "dev", "lint", "nb", "tests"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:25812ee6ba42033e1341c1799716828d8582092826c1d0889ea775a0f5c548a2"
content_hash = "sha256:bde2f66706034b9c34c6794bf192b44db6e116a287938d88c90c751a2666bdf7"
[[metadata.targets]]
requires_python = ">=3.11,<3.14"
@@ -152,6 +152,23 @@ files = [
{file = "aiosignal-1.4.0.tar.gz", hash = "sha256:f47eecd9468083c2029cc99945502cb7708b082c232f9aca65da147157b251c7"},
]
[[package]]
name = "alembic"
version = "1.18.4"
requires_python = ">=3.10"
summary = "A database migration tool for SQLAlchemy."
groups = ["default"]
dependencies = [
"Mako",
"SQLAlchemy>=1.4.23",
"tomli; python_version < \"3.11\"",
"typing-extensions>=4.12",
]
files = [
{file = "alembic-1.18.4-py3-none-any.whl", hash = "sha256:a5ed4adcf6d8a4cb575f3d759f071b03cd6e5c7618eb796cb52497be25bfe19a"},
{file = "alembic-1.18.4.tar.gz", hash = "sha256:cb6e1fd84b6174ab8dbb2329f86d631ba9559dd78df550b57804d607672cedbc"},
]
[[package]]
name = "annotated-doc"
version = "0.0.4"
@@ -1845,7 +1862,7 @@ name = "mako"
version = "1.3.11"
requires_python = ">=3.8"
summary = "A super-fast templating language that borrows the best ideas from the existing templating languages."
groups = ["dev"]
groups = ["default", "dev"]
dependencies = [
"MarkupSafe>=0.9.2",
]
@@ -1895,7 +1912,7 @@ name = "markupsafe"
version = "3.0.3"
requires_python = ">=3.9"
summary = "Safely add untrusted strings to HTML/XML markup."
groups = ["dev", "nb"]
groups = ["default", "dev", "nb"]
files = [
{file = "markupsafe-3.0.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:1cc7ea17a6824959616c525620e387f6dd30fec8cb44f649e31712db02123dad"},
{file = "markupsafe-3.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4bd4cd07944443f5a265608cc6aab442e4f74dff8088b0dfc8238647b8f6ae9a"},

19
prototypes/db_alter.py Normal file
View File

@@ -0,0 +1,19 @@
# %%
import sqlalchemy as sql
from wce_crm import db
# %%
stmt = sql.text(
"ALTER TABLE grunderfassung_unternehmen ADD COLUMN Metadaten_wiedereintrittsdatum DATE"
)
with db.ENGINE.begin() as conn:
conn.execute(stmt)
# %%
stmt = sql.text("ALTER TABLE grunderfassung_unternehmen RENAME TO grunderfassung")
with db.ENGINE.begin() as conn:
conn.execute(stmt)
# %%

View File

@@ -30,8 +30,8 @@ engine = sql.create_engine(f"sqlite:///{str(db_path)}")
engine_crm = sql.create_engine(f"sqlite:///{str(crm_path)}")
# %%
stmt = sql.select(
db.grunderfassung_unternehmen.c.erfassung_id,
db.grunderfassung_unternehmen.c.Partnersuche__un_suche,
db.grunderfassung.c.erfassung_id,
db.grunderfassung.c.Partnersuche__un_suche,
)
with engine.connect() as conn:
@@ -44,7 +44,7 @@ for r in res:
# %%
backend.get_company_list()
backend.front_get_company_list()
# %%

View File

@@ -1,11 +1,11 @@
[project]
name = "wce-crm"
version = "0.1.1dev7"
version = "0.1.1dev10"
description = "GUI for CRM of NAFKA project with WCE"
authors = [
{name = "d-opt GmbH, resp. Florian Förster", email = "f.foerster@d-opt.com"},
]
dependencies = ["pyside6>=6.11.0", "sqlalchemy[asyncio]>=2.0.50", "polars>=1.40.1", "dopt-basics>=0.2.6", "pydantic[email]>=2.13.4", "babel>=2.18.0", "python-dotenv>=1.2.2"]
dependencies = ["pyside6>=6.11.0", "sqlalchemy[asyncio]>=2.0.50", "polars>=1.40.1", "dopt-basics>=0.2.6", "pydantic[email]>=2.13.4", "babel>=2.18.0", "python-dotenv>=1.2.2", "alembic>=1.18.4"]
requires-python = "<3.14,>=3.11"
readme = "README.md"
license = {text = "LicenseRef-Proprietary"}
@@ -71,7 +71,7 @@ directory = "reports/coverage"
[tool.bumpversion]
current_version = "0.1.1dev7"
current_version = "0.1.1dev10"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -0,0 +1,5 @@
param(
[string]$Message
)
pdm run alembic revision --autogenerate -m $Message

View File

@@ -8,6 +8,28 @@ $ENV_PATH = 'B:\deployments\WCE-NAFKA\dopt_nafka_wce-crm'
$PY_PATH = Join-Path -Path $ENV_PATH -ChildPath 'python'
$SRC_PATH = (Get-Location).Path
function create_folder {
param (
[string]$base_path,
[string]$folder_name,
[switch]$recreate
)
$target_path = Join-Path -Path $base_path -ChildPath $folder_name
$target_path_exists = Test-Path -Path $target_path
if (-not $target_path_exists){
Write-Output "[PWSH] Folder >$folder_name< not existing. Create..."
New-Item -Path $target_path -ItemType Directory
}
elseif ($target_path_exists -and $recreate){
Write-Output "[PWSH] Folder >$folder_name< exists, but should be recreated..."
Remove-Item -Path $target_path -Recurse -Force
New-Item -Path $target_path -ItemType Directory
}
else {
Write-Output "Folder >$folder_name< already exists."
}
}
Write-Output "Build Pipeline for d-opt WCE/NAFKA project"
@@ -88,6 +110,23 @@ if ($? -eq $false){
}
Write-Output "Copied database files successfully"
# copy alembic files
Write-Output "Copying alembic files..."
$copy_file = Join-Path -Path $SRC_PATH -ChildPath 'alembic'
$dest_file = Join-Path -Path $PY_PATH -ChildPath 'alembic'
create_folder -base_path $PY_PATH -folder_name 'alembic'
Copy-Item -Path $copy_file -Destination $dest_file -Force -Recurse
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while copying the alembic folder"
Exit
}
$copy_file = Join-Path -Path $SRC_PATH -ChildPath 'alembic.ini'
Copy-Item -Path $copy_file -Destination $dest_file -Force
if ($? -eq $false){
Write-Output "[PWSH] Exiting script because there were errors while copying the alembic INI file"
Exit
}
# copy .env file
Write-Output "Copying ENV file..."
$env_file = Join-Path -Path $SRC_PATH -ChildPath 'deployment/.env'

View File

@@ -1,7 +1,8 @@
# List of environment variables
DOPT_DEVELOPMENT: flag which signals that the current environment is in development mode
DOPT_STOP_FOLDER_NAME: stop folder to find base path
DOPT_DB_CRM: path to CRM database, relative to base path
DOPT_DB_MAIN: path to main database, relative to base path
DOPT_PATH_LOGGING: path to logging folder, relative to base path
- DOPT_DEVELOPMENT: flag which signals that the current environment is in development mode
- DOPT_STOP_FOLDER_NAME: stop folder to find base path
- DOPT_DB_CRM: path to CRM database, relative to base path
- DOPT_DB_MAIN: path to main database, relative to base path
- DOPT_PATH_LOGGING: path to logging folder, relative to base path
- DOPT_ALEMBIC_BASE: path to all relevant alembic file

View File

@@ -1,5 +1,6 @@
import os
import sys
from pathlib import Path
import dotenv
@@ -7,4 +8,8 @@ if sys.stdout is None or sys.stderr is None:
sys.stdout = open(os.devnull, "w")
sys.stderr = open(os.devnull, "w")
dotenv.load_dotenv()
deploy_env_pth = Path(sys.executable).parent / ".env"
if deploy_env_pth.exists():
dotenv.load_dotenv(dotenv_path=deploy_env_pth)
else:
dotenv.load_dotenv()

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import dataclasses as dc
import datetime
from typing import Any, TypedDict, cast
from typing import Any, TypeAlias, TypedDict, cast
import polars as pl
import sqlalchemy as sql
@@ -10,6 +10,8 @@ import sqlalchemy as sql
from wce_crm import db
from wce_crm.logging import logger_back as logger
InitRecId: TypeAlias = int
class CompanyInfo(TypedDict):
ma_id: str
@@ -82,7 +84,7 @@ def _transform_for_gui_output(
return q.collect()
def comp_search_choices() -> tuple[tuple[str, int], ...]:
def initrec_comp_search_choices() -> tuple[tuple[str, int], ...]:
# TODO no reload functionality
logger.debug("[Call backend] comp_search_choices")
q = db.DF_CRM_MASTER.lazy()
@@ -97,7 +99,7 @@ def comp_search_choices() -> tuple[tuple[str, int], ...]:
return tuple(zip(df["dedupl"], df["ma_id"]))
def comp_search_get_info(
def initrec_comp_search_get_info(
ma_id: int,
) -> CompanyInfo:
logger.debug("[Call backend] comp_search_get_info")
@@ -109,7 +111,7 @@ def comp_search_get_info(
return cast(CompanyInfo, df.row(0, named=True))
def contact_person_search_choices(
def initrec_comp_contact_person_search_choices(
ma_id: int | None,
use_both_names: bool,
) -> tuple[tuple[str, int], ...]:
@@ -137,7 +139,7 @@ def contact_person_search_choices(
return tuple(zip(df["dedupl"], df["an_id"]))
def contact_person_search_get_info(
def initrec_comp_contact_person_search_get_info(
an_id: int,
) -> ContactPersonInfo:
logger.debug("[Call backend] contact_person_search_get_info")
@@ -149,61 +151,78 @@ def contact_person_search_get_info(
return cast(ContactPersonInfo, df.row(0, named=True))
def insert_initial_recording(
def initrec_insert_initial_recording(
data: dict[str, Any],
) -> None:
) -> InitRecId:
logger.debug("[Call backend] insert_initial_recording")
stmt = db.grunderfassung_unternehmen.insert().values(data)
stmt = db.grunderfassung.insert()
with db.ENGINE.begin() as conn:
conn.execute(stmt)
ret = conn.execute(stmt, data)
if ret.rowcount == 0:
raise IOError("Entry was not inserted correctly")
prim_keys = ret.inserted_primary_key
assert prim_keys
return prim_keys[0]
def update_initial_recording(
id_: int,
def initrec_update_initial_recording(
id_: InitRecId,
data: dict[str, Any],
) -> None:
logger.debug("[Call backend] update_initial_recording")
stmt = (
db.grunderfassung_unternehmen.update()
.where(db.grunderfassung_unternehmen.c.erfassung_id == id_)
.values(data)
)
stmt = db.grunderfassung.update().where(db.grunderfassung.c.erfassung_id == id_)
with db.ENGINE.begin() as conn:
conn.execute(stmt)
conn.execute(stmt, data)
def get_initial_recording(
id_: int,
def initrec_get_initial_recording(
id_: InitRecId,
) -> dict[str, Any]:
logger.debug("[Call backend] get_initial_recording")
stmt = db.grunderfassung_unternehmen.select().where(
db.grunderfassung_unternehmen.c.erfassung_id == id_
)
stmt = db.grunderfassung.select().where(db.grunderfassung.c.erfassung_id == id_)
with db.ENGINE.begin() as conn:
ret = conn.execute(stmt)
row = ret.fetchone()
if row is None:
if ret.rowcount == 0:
raise KeyError(f"Database ID {id_} not found")
row = ret.fetchone()
assert row, "row was not obtained"
return row._asdict()
def initrec_delete_initial_recording(
id_: InitRecId,
) -> None:
logger.debug("[Call backend] delete_initial_recording")
stmt = db.grunderfassung.delete().where(db.grunderfassung.c.erfassung_id == id_)
with db.ENGINE.begin() as conn:
ret = conn.execute(stmt)
if ret.rowcount == 0:
raise KeyError(f"Database ID {id_} not found for deletion")
@dc.dataclass(slots=True)
class FrontpageCompany:
erfassung_id: int
ma_id: int
erfassung_id: InitRecId
# ma_id: int
name: str
Metadaten_aktualisierung: datetime.datetime
is_company: bool
def get_company_list() -> list[FrontpageCompany]:
def front_get_company_list() -> list[FrontpageCompany]:
logger.debug("[Call backend] get_company_list")
stmt = sql.select(
db.grunderfassung_unternehmen.c.erfassung_id,
db.grunderfassung_unternehmen.c.Partnersuche__un_suche,
db.grunderfassung_unternehmen.c.Metadaten_aktualisierung,
).order_by(db.grunderfassung_unternehmen.c.Metadaten_aktualisierung.desc())
db.grunderfassung.c.erfassung_id,
db.grunderfassung.c.Partnersuche__un_suche,
db.grunderfassung.c.Metadaten_aktualisierung,
).order_by(db.grunderfassung.c.Metadaten_aktualisierung.desc())
with db.ENGINE.connect() as conn:
res = conn.execute(stmt)
@@ -216,9 +235,21 @@ def get_company_list() -> list[FrontpageCompany]:
datetime_akt = cast(datetime.datetime, entry[2])
datetime_akt = datetime_akt.astimezone()
comp_info = comp_search_get_info(ma_id)
name = comp_info["ma_unternehmensname"]
name: str = "PLATZHALTER INDIVIDUAL"
is_company: bool = False
if ma_id is not None:
comp_info = initrec_comp_search_get_info(ma_id)
name = comp_info["ma_unternehmensname"]
is_company = True
front_page_companies.append(FrontpageCompany(erfassung_id, ma_id, name, datetime_akt))
# front_page_companies.append(FrontpageCompany(erfassung_id, ma_id, name, datetime_akt))
front_page_companies.append(
FrontpageCompany(
erfassung_id=erfassung_id,
name=name,
Metadaten_aktualisierung=datetime_akt,
is_company=is_company,
)
)
return front_page_companies

View File

@@ -24,3 +24,4 @@ class Config:
)
PATH_LOGGING: Path = BASE_PATH / os.getenv("DOPT_PATH_LOGGING", "data/d-opt.log")
LOG_FILENAME: str = "dopt.log"
ALEMBIC_PATH: Path = BASE_PATH / os.getenv("DOPT_ALEMBIC_BASE", "python/alembic")

View File

@@ -0,0 +1,9 @@
from __future__ import annotations
from typing import Final
CUSTOM_WIDGET_NAMES: Final[frozenset] = frozenset(
[
"grunderfassung_suche",
]
)

297
src/wce_crm/data_models.py Normal file
View File

@@ -0,0 +1,297 @@
from __future__ import annotations
import datetime
import json
from typing import Annotated, Any, Final
from pydantic import (
AwareDatetime,
BaseModel,
ConfigDict,
EmailStr,
Field,
field_validator,
model_validator,
)
ValidAge = Annotated[int, Field(ge=0, le=99)]
def _parse_json(value: Any) -> str:
if isinstance(value, datetime.date):
return value.isoformat()
elif isinstance(value, datetime.datetime):
return value.isoformat()
else:
raise TypeError
COLUMN_SEP: Final[str] = "__"
class FlatBaseModel(BaseModel):
"""
Optimised Pydantic base class, which parses JSON strings and column
separators recursively and correctly
"""
@classmethod
def _recursive_parse_json(
cls,
data: Any,
) -> Any:
"""look for JSON list strings and parse them"""
if isinstance(data, str) and data.startswith("[") and data.endswith("]"):
try:
parsed = json.loads(data)
# Falls die Liste selbst wieder konvertiert werden muss (z.B. Sub-Dicts)
return cls._recursive_parse_json(parsed)
except json.JSONDecodeError:
return data
elif isinstance(data, dict):
return {k: cls._recursive_parse_json(v) for k, v in data.items()}
elif isinstance(data, list):
return [cls._recursive_parse_json(item) for item in data]
return data
@classmethod
def _recursive_unflatten(
cls,
data: Any,
) -> Any:
"""building nested structure using column spearator sequence"""
if isinstance(data, dict):
unflattened_level = {}
for key, value in data.items():
if COLUMN_SEP in key:
parts = key.split(COLUMN_SEP)
aktuell = unflattened_level
for part in parts[:-1]:
if part not in aktuell or not isinstance(aktuell[part], dict):
aktuell[part] = {}
aktuell = aktuell[part]
aktuell[parts[-1]] = value
else:
unflattened_level[key] = value
return {k: cls._recursive_unflatten(v) for k, v in unflattened_level.items()}
elif isinstance(data, list):
return [cls._recursive_unflatten(item) for item in data]
return data
@model_validator(mode="before")
@classmethod
def __unflatten_input(
cls,
data: Any,
) -> Any: # type: ignore
"""entry control: prepare flat DB/GUI data for Pydantic"""
if not isinstance(data, dict):
return data
# setp 1: convert all JSON-Strings to lists
json_parsed_data = cls._recursive_parse_json(data)
# step 2: build nested structure based on defined separator sequence
final_nested_data = cls._recursive_unflatten(json_parsed_data)
return final_nested_data
def to_db(self, *args, **kwargs) -> dict[str, Any]:
"""output for DB: flat, lists as JSON-Strings"""
nested = super().model_dump(*args, **kwargs)
return self.__flatten_dict(nested, serialize_lists=True)
def to_gui(self, *args, **kwargs) -> dict[str, Any]:
"""output for GUI: flat, but lists remain Python lists"""
nested = super().model_dump(*args, **kwargs)
return self.__flatten_dict(nested, serialize_lists=False)
@classmethod
def __flatten_dict(
cls,
nested_dict: dict,
parent_key: str = "",
serialize_lists: bool = True,
) -> dict[str, Any]:
"""recursive function to flatten the structure (for outputs)"""
items = []
for k, v in nested_dict.items():
new_key = f"{parent_key}{COLUMN_SEP}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(cls.__flatten_dict(v, new_key, serialize_lists).items())
elif isinstance(v, list):
processed_list = []
for item in v:
if isinstance(item, dict):
processed_list.append(
cls.__flatten_dict(item, serialize_lists=serialize_lists)
)
else:
processed_list.append(item)
if serialize_lists:
items.append((new_key, json.dumps(processed_list, default=_parse_json)))
else:
items.append((new_key, processed_list))
else:
items.append((new_key, v))
return dict(items)
class Grunderfassung(FlatBaseModel):
# default in SQLAlchemy with lambda and timezone-aware datetime
Metadaten_erstellung: AwareDatetime | None = None
Metadaten_aktualisierung: AwareDatetime | None = None # see above
Metadaten_nutzer: str | None
Metadaten_wiedereintrittsdatum: datetime.date | None = None
Grunderfassung_fallnummer: str
Grunderfassung_notiz: str | None
Partnersuche: Grunderfassung_PartnerSuche | None = None
Projektrelevanz: Grunderfassung_Projektrelevanz
Kontaktperson: Grunderfassung_Kontaktperson
Stammdaten: Grunderfassung_Stammdaten
WeitereInfos: Grunderfassung_WeitereInfos
Schulbildung: list[Grunderfassung_Schulbildung]
HoehereBildung: list[Grunderfassung_HoehereBildung]
Arbeitserfahrung: list[Grunderfassung_Arbeitserfahrung]
Sprachkenntnisse: list[Grunderfassung_Sprachen]
class Grunderfassung_PartnerSuche(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
un_suche: int | None
person_suche: int | None
kanal_aufmerksamkeit: str | None
class Grunderfassung_Projektrelevanz(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
relevanz: str
foerderperiode: str | None = None
class Grunderfassung_Kontaktperson(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
KP_name_partner: str | None
KP_titel: str | None
KP_anrede_anschrift: str | None
KP_name: str | None
KP_vorname: str | None
KP_festnetznummer: str | None
KP_mobilfunknummer: str | None
KP_email: EmailStr | None
KP_funktion_beziehung: str | None
KP_adresse: str | None
class Grunderfassung_Stammdaten(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
titel: str | None
anrede_anschrift: str
name: str
vorname: str | None
geburtsdatum: datetime.date | None
herkunftsland: str
staatsangehoerigkeit: str | None
rueckkehrer: bool | None
aufenthaltsort: str | None
strasse: str | None
hausnummer: str | None
PLZ: str | None
ort: str | None
bundesland: str | None
land: str | None
festnetznummer: str | None
mobilfunknummer: str | None
email: EmailStr | None
familienstand: str | None
anzahl_kinder: Grunderfassung_Stammdaten_AnzahlKinder
@field_validator("rueckkehrer", mode="before")
@classmethod
def str_to_bool(cls, value: Any) -> Any:
if isinstance(value, str):
value = value.strip().lower()
if value == "ja":
return True
elif value == "nein":
return False
raise ValueError("Wert muss 'ja', 'nein', True oder False sein.")
return value
class Grunderfassung_Stammdaten_AnzahlKinder(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
anzahl: int | None
alter: list[ValidAge | None] | None = None
class Grunderfassung_WeitereInfos(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
WI_deutsch_sprache: str | None
WI_aufenthaltstitel: str | None
WI_gueltigkeit_aufenthaltstitel: datetime.date | None
WI_arbeitsstatus: str | None
WI_meldung_institution: str | None
class Grunderfassung_Schulbildung(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
SB_abschluss: str | None
SB_abschlussgrad: str | None
SB_schule: str | None
SB_ort: str | None
SB_land: str | None
SB_abschlussjahr: str | None
SB_bemerkungsfeld: str | None
class Grunderfassung_HoehereBildung(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
HB_anerkennung: str | None
HB_abschlussgrad: str | None
HB_abschlussgrad_dokument: str | None
HB_organisation: str | None
HB_beruf: str | None
HB_land: str | None
HB_ort: str | None
HB_abschlussjahr: str | None
HB_bemerkungsfeld: str | None
class Grunderfassung_Arbeitserfahrung(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
AE_branche: str | None
AE_bezeichnung: str | None
AE_funktion: str | None
AE_unternehmen: str | None
AE_land: str | None
AE_zeitspanne: str | None
AE_beschaeftigungsart: str | None
AE_bemerkungsfeld: str | None
class Grunderfassung_Sprachen(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
SP_sprache: str | None
SP_niveau: str | None
SP_nachweis: str | None
SP_art_nachweis: str | None = None
SP_datum_nachweis: datetime.date | None = None

View File

@@ -13,6 +13,7 @@ from wce_crm import constants
from wce_crm import types as t
# // declarations
class SafeDateTime(TypeDecorator):
"""Cleans non-standard ISO strings before parsing."""
@@ -57,8 +58,8 @@ class UTCDateTime(TypeDecorator):
return value
md_crm = sql.MetaData()
md_main = sql.MetaData()
MD_CRM = sql.MetaData()
MD_MAIN = sql.MetaData()
ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}")
# ---------- OLD "Kontaktliste" ----------
@@ -141,7 +142,7 @@ ENGINE = sql.create_engine(f"sqlite:///{constants.Config.DB_PATH_MAIN}")
ext_crm_master: sql.Table = Table(
"Master",
md_crm,
MD_CRM,
Column("ma_id", sql.Integer, nullable=False, unique=True),
Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
Column("ma_unternehmensname", sql.Text, nullable=True),
@@ -218,7 +219,7 @@ DF_CRM_MASTER = get_ext_crm_master(constants.Config.DB_PATH_CRM)
ext_crm_nutzer: sql.Table = Table(
"Nutzer",
md_crm,
MD_CRM,
Column("wce_id", sql.Integer, nullable=False, unique=True),
Column("wce_name", sql.Text, nullable=True),
Column("wce_vorname", sql.Text, nullable=True),
@@ -246,7 +247,7 @@ ext_crm_nutzer_schema: t.PolarsSchema = {
ext_crm_contact_person: sql.Table = Table(
"Ansprechpartner",
md_crm,
MD_CRM,
Column("an_id", sql.Integer, nullable=False, unique=True),
Column("ma_id", sql.ForeignKey("Master.ma_id")),
Column("wce_id", sql.ForeignKey("Nutzer.wce_id")),
@@ -323,9 +324,9 @@ def get_ext_crm_contact_person(
# df_contact_person = get_ext_crm_contact_person(None)
DF_CONTACT_PERSON = get_ext_crm_contact_person(constants.Config.DB_PATH_CRM)
grunderfassung_unternehmen: sql.Table = Table(
"grunderfassung_unternehmen",
md_main,
grunderfassung: sql.Table = Table(
"grunderfassung",
MD_MAIN,
Column(
"erfassung_id",
sql.Integer,
@@ -346,6 +347,7 @@ grunderfassung_unternehmen: sql.Table = Table(
onupdate=lambda: datetime.datetime.now(datetime.UTC),
),
Column("Metadaten_nutzer", sql.String(20), nullable=True),
Column("Metadaten_wiedereintrittsdatum", sql.Date, nullable=True, default=None),
Column("Arbeitserfahrung", sql.Text, nullable=True),
Column("Grunderfassung_fallnummer", sql.Text, nullable=True),
Column("Grunderfassung_notiz", sql.Text, nullable=True),
@@ -362,9 +364,9 @@ grunderfassung_unternehmen: sql.Table = Table(
), # TODO: check if needed when set by trigger
Column("Kontaktperson__KP_titel", sql.Text, nullable=True),
Column("Kontaktperson__KP_vorname", sql.Text, nullable=True),
Column("Partnersuche__kanal_aufmerksamkeit", sql.Text, nullable=True),
Column("Partnersuche__person_suche", sql.Integer, nullable=True),
Column("Partnersuche__un_suche", sql.Integer, nullable=True),
Column("Partnersuche__kanal_aufmerksamkeit", sql.Text, nullable=True, default=None),
Column("Partnersuche__person_suche", sql.Integer, nullable=True, default=None),
Column("Partnersuche__un_suche", sql.Integer, nullable=True, default=None),
Column("Projektrelevanz__relevanz", sql.Text, nullable=True),
Column("Projektrelevanz__foerderperiode", sql.Text, nullable=True),
Column("Schulbildung", sql.Text, nullable=True),
@@ -396,5 +398,3 @@ grunderfassung_unternehmen: sql.Table = Table(
Column("WeitereInfos__WI_gueltigkeit_aufenthaltstitel", sql.Date, nullable=True),
Column("WeitereInfos__WI_meldung_institution", sql.Text, nullable=True),
)
md_main.create_all(ENGINE)

View File

@@ -0,0 +1,36 @@
from alembic import command
from alembic.config import Config
from wce_crm import constants
from wce_crm.logging import logger_base as logger
def check_and_run_migrations() -> None:
logger.info("[DB migration - Alembic] Start...")
alembic_base_folder = constants.Config.ALEMBIC_PATH
alembic_ini_file = alembic_base_folder / "alembic.ini"
alembic_working_dir = alembic_base_folder / "alembic"
assert alembic_ini_file.exists(), "Alembic INI file not found"
assert alembic_working_dir.exists(), "Alembic working dir not found"
assert alembic_working_dir.is_dir(), "Alembic working dir is not a directory"
logger.info("[DB migration - Alembic] Initialise config...")
alembic_cfg = Config(alembic_ini_file)
alembic_cfg.set_main_option("script_location", str(alembic_working_dir))
logger.info("[DB migration - Alembic] Run...")
try:
command.upgrade(alembic_cfg, "head")
logger.info("[DB migration - Alembic] Database up-to-date.")
except Exception as err:
logger.info(
"[DB migration - Alembic] An error occurred during the migration:\n%s",
err,
stack_info=True,
)
if __name__ == "__main__":
# start migration process before app start
check_and_run_migrations()

1109
src/wce_crm/form_defs.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -25,6 +25,7 @@ LOGGING_CFG: LoggingConfig = LoggingConfig(
setup_logging(LOGGING_CFG)
logger_base = BASE_LOGGER.getChild("wce_crm")
logger_base.setLevel(logging.DEBUG)
# ** GUI
logger_gui = logger_base.getChild("gui")