initial commit

This commit is contained in:
Florian Förster
2024-05-08 14:46:43 +02:00
commit 9edcd5be4e
93 changed files with 982856 additions and 0 deletions

57
src/lang_main/__init__.py Normal file
View File

@@ -0,0 +1,57 @@
from typing import Final, Any
import inspect
from pathlib import Path
from lang_main.shared import (
save_pickle,
load_pickle,
create_saving_folder,
load_toml_config,
)
from lang_main.analysis.preprocessing import Embedding, PandasIndex
from lang_main.analysis.graphs import TokenGraph
# names re-exported as public API of the package
__all__ = [
    'save_pickle',
    'load_pickle',
    'create_saving_folder',
    'Embedding',
    'PandasIndex',
    'TokenGraph',
]

# ** configuration source
# only the config file shipped inside the package is supported so far
USE_INTERNAL_CONFIG: Final[bool] = True
if not USE_INTERNAL_CONFIG:
    raise NotImplementedError("External config data not implemented yet.")

# ``curr_file_dir`` holds the path of this very file (despite its name),
# the TOML config is expected right next to it
curr_file_dir = Path(inspect.getfile(inspect.currentframe()))  # type: ignore
pkg_dir = curr_file_dir.parent
config_path = pkg_dir / 'config.toml'
loaded_config = load_toml_config(path_to_toml=config_path)
# expose a copy of the loaded mapping as the package-wide configuration
CONFIG: Final[dict[str, Any]] = loaded_config.copy()

# ** paths
SAVE_PATH_FOLDER: Final[Path] = Path(CONFIG['paths']['results'])
PATH_TO_DATASET: Final[Path] = Path(CONFIG['paths']['dataset'])

# ** control: switches for the individual processing stages
DO_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing']
DO_TOKEN_ANALYSIS: Final[bool] = CONFIG['control']['token_analysis']
DO_GRAPH_POSTPROCESSING: Final[bool] = (
    CONFIG['control']['graph_postprocessing']
)

# ** export

# ** preprocessing
FILENAME_COSSIM_FILTER_CANDIDATES: Final[str] = (
    CONFIG['preprocess']['filename_cossim_filter_candidates']
)
DATE_COLS: Final[list[str]] = CONFIG['preprocess']['date_cols']
THRESHOLD_AMOUNT_CHARACTERS: Final[float] = (
    CONFIG['preprocess']['threshold_amount_characters']
)
THRESHOLD_SIMILARITY: Final[float] = (
    CONFIG['preprocess']['threshold_similarity']
)

# ** token analysis

# ** graph postprocessing
THRESHOLD_EDGE_WEIGHT: Final[int] = (
    CONFIG['graph_postprocessing']['threshold_edge_weight']
)

# ** time analysis
THRESHOLD_UNIQUE_TEXTS: Final[int] = (
    CONFIG['time_analysis']['threshold_unique_texts']
)

View File

View File

@@ -0,0 +1,406 @@
import typing
from typing import Any, Self, Literal, overload, Final
import sys
import logging
from collections.abc import Hashable
from pathlib import Path
import copy
import numpy as np
import numpy.typing as npt
from networkx import Graph, DiGraph
import networkx as nx
from pandas import DataFrame
from lang_main.shared import save_pickle, load_pickle
# ** logging setup of this module
# TODO change logging behaviour, add logging to file
# default of the ``logging`` switch used by the graph helper functions below
LOGGING_DEFAULT: Final[bool] = False
# level handed to ``logging.basicConfig``
LOGGING_LEVEL = 'INFO'
# configures the root logger to write to stdout; per the standard library
# this call has no effect if the root logger already has handlers
logging.basicConfig(level=LOGGING_LEVEL, stream=sys.stdout)
# module-wide logger used by all functions and classes in this file
logger = logging.getLogger('ihm_analyse.graphs')
def get_graph_metadata(
    graph: Graph | DiGraph,
    logging: bool = LOGGING_DEFAULT,
) -> dict[str, int]:
    """collect basic properties of a graph: size, range of the edge weights
    and a rough estimate of the memory used by nodes and edges

    Parameters
    ----------
    graph : Graph | DiGraph
        graph to inspect, every edge must carry a ``weight`` attribute
    logging : bool, optional
        whether the gathered properties are written to the module logger,
        by default LOGGING_DEFAULT

    Returns
    -------
    dict[str, int]
        properties with the keys ``num_nodes``, ``num_edges``,
        ``min_edge_weight``, ``max_edge_weight``, ``node_memory``,
        ``edge_memory`` and ``total_memory``; both weight entries are 0
        if the graph has no edges

    Raises
    ------
    KeyError
        if an edge does not have a ``weight`` attribute
    """
    # nodes and edges
    num_nodes = len(graph.nodes)
    num_edges = len(graph.edges)
    # edge weights: the extrema are derived from the data itself instead of
    # fixed start values, which were reported as results whenever no weight
    # was below/above them (e.g. for graphs without any edge)
    weights = [
        typing.cast(int, graph[edge[0]][edge[1]]['weight'])
        for edge in graph.edges
    ]
    min_edge_weight: int = min(weights, default=0)
    max_edge_weight: int = max(weights, default=0)
    # memory: ``sys.getsizeof`` is shallow, so these numbers only cover the
    # node objects and edge tuples themselves, not the attribute data
    edge_mem = sum(sys.getsizeof(edge) for edge in graph.edges)
    node_mem = sum(sys.getsizeof(node) for node in graph.nodes)
    total_mem = edge_mem + node_mem

    graph_info: dict[str, int] = {
        'num_nodes': num_nodes,
        'num_edges': num_edges,
        'min_edge_weight': min_edge_weight,
        'max_edge_weight': max_edge_weight,
        'node_memory': node_mem,
        'edge_memory': edge_mem,
        'total_memory': total_mem,
    }

    if logging:
        logger.info((f"Graph properties: {num_nodes} Nodes, "
                     f"{num_edges} Edges"))
        logger.info(f"Node memory: {node_mem / 1024:.2f} KB")
        logger.info(f"Edge memory: {edge_mem / 1024:.2f} KB")
        logger.info(f"Total memory: {total_mem / 1024:.2f} KB")

    return graph_info
def update_graph(
    graph: Graph | DiGraph,
    parent: Hashable,
    child: Hashable,
    weight_connection: int = 1,
) -> None:
    """add the connection between two nodes to the graph (inplace):
    a missing edge is created with the given weight, the weight of an
    existing edge is increased by it

    Parameters
    ----------
    graph : Graph | DiGraph
        graph which is modified inplace
    parent : Hashable
        start node of the edge
    child : Hashable
        end node of the edge
    weight_connection : int, optional
        weight of a new edge or increment for an existing one, by default 1
    """
    # nodes do not have to be checked/added separately:
    # NetworkX ``add_edge`` creates missing nodes on its own
    if graph.has_edge(parent, child):
        # edge already known: accumulate weight
        graph[parent][child]['weight'] += weight_connection
    else:
        graph.add_edge(parent, child, weight=weight_connection)
# build undirected adjacency matrix
def convert_graph_to_undirected(
    graph: DiGraph,
    logging: bool = LOGGING_DEFAULT,
) -> Graph:
    """build an undirected graph out of a directed one by adding up the
    weights of both directions of each node pair

    Parameters
    ----------
    graph : DiGraph
        directed graph with weighted edges
    logging : bool, optional
        whether the conversion and the properties of the new graph are
        logged, by default LOGGING_DEFAULT

    Returns
    -------
    Graph
        undirected graph, built from the merged adjacency matrix
    """
    # get adjacency matrix
    adj_mat = typing.cast(DataFrame,
                          nx.to_pandas_adjacency(G=graph, dtype=np.uint32))
    arr = typing.cast(npt.NDArray[np.uint32],
                      adj_mat.to_numpy())
    # build undirected array: mirror the part below the main diagonal and
    # add it to the upper triangle
    # the lower part is taken strictly below the diagonal (``k=-1``),
    # otherwise the diagonal (self-loops) would be contained in both parts
    # and therefore counted twice
    arr_upper = np.triu(arr)
    arr_lower = np.tril(arr, k=-1)
    arr_new = arr_upper + arr_lower.T
    # assign new data and create graph
    adj_mat.loc[:] = arr_new  # type: ignore
    graph_undir = typing.cast(Graph,
                              nx.from_pandas_adjacency(df=adj_mat))

    # info about graph
    if logging:
        logger.info("Successfully converted graph to one with undirected edges.")
    _ = get_graph_metadata(graph=graph_undir, logging=logging)

    return graph_undir
class TokenGraph(DiGraph):
def __init__(
self,
name: str = 'TokenGraph',
enable_logging: bool = True,
incoming_graph_data: Any| None = None,
**attr,
) -> None:
super().__init__(incoming_graph_data, **attr)
# logging of different actions
self.logging = enable_logging
# properties
self._name = name
# directed and undirected graph data
self._directed = self
self._metadata_directed: dict[str, int] = {}
self._undirected: Graph | None = None
self._metadata_undirected: dict[str, int] = {}
def __repr__(self) -> str:
return self.__str__()
def __str__(self) -> str:
return (f"TokenGraph(name: {self.name}, number of nodes: "
f"{len(self.nodes)}, number of edges: "
f"{len(self.edges)})")
# !! only used to verify that saving was done correctly
"""
def __key(self) -> tuple[Hashable, ...]:
return (self.name, tuple(self.nodes), tuple(self.edges))
def __hash__(self) -> int:
return hash(self.__key())
"""
def copy(self) -> Self:
"""returns a (deep) copy of the graph
Returns
-------
Self
deep copy of the graph
"""
return copy.deepcopy(self)
@property
def name(self) -> str:
return self._name
@property
def directed(self) -> Self:
return self._directed
@property
def undirected(self) -> Graph | None:
return self._undirected
@property
def metadata_directed(self) -> dict[str, int]:
return self._metadata_directed
@property
def metadata_undirected(self) -> dict[str, int]:
return self._metadata_undirected
@overload
def to_undirected(
self,
inplace: Literal[True] = ...,
logging: bool | None = ...,
) -> None:
...
@overload
def to_undirected(
self,
inplace: Literal[False],
logging: bool | None = ...,
) -> Graph:
...
@overload
def to_undirected(
self,
inplace: bool = ...,
logging: bool | None = ...
) -> Graph | None:
...
def to_undirected(
self,
inplace=True,
logging: bool | None = None,
) -> Graph | None:
if logging is None:
logging = self.logging
self._undirected = convert_graph_to_undirected(graph=self,
logging=logging)
self._metadata_undirected = get_graph_metadata(graph=self._undirected,
logging=logging)
if not inplace:
return self._undirected
def update_metadata(
self,
logging: bool | None = None,
) -> None:
if logging is None:
logging = self.logging
self._metadata_directed = get_graph_metadata(graph=self,
logging=logging)
if self._undirected is not None:
self._metadata_undirected = get_graph_metadata(graph=self._undirected,
logging=logging)
def filter_by_edge_weight(
self,
threshold: int,
) -> Self:
"""filters all edges which are below the given threshold
Parameters
----------
threshold : int
edges with weights smaller than this value will be removed
Returns
-------
Self
a copy of the graph with filtered edges
"""
# filter edges by weight
original_graph_edges = copy.deepcopy(self.edges)
filtered_graph = self.copy()
for edge in original_graph_edges:
weight = typing.cast(int,
filtered_graph[edge[0]][edge[1]]['weight'])
if weight < threshold:
filtered_graph.remove_edge(edge[0], edge[1])
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
def filter_by_node_degree(
self,
threshold: int,
) -> Self:
"""filters all nodes which have a degree below the given threshold
Parameters
----------
threshold : int
nodes with a degree smaller than this value will be removed
Returns
-------
Self
a copy of the graph with filtered nodes
"""
# filter nodes by degree
original_graph_nodes = copy.deepcopy(self.nodes)
filtered_graph = self.copy()
for node in original_graph_nodes:
degree = filtered_graph.degree[node] # type: ignore
if degree < threshold:
filtered_graph.remove_node(node)
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
def _save_prepare(
self,
path: Path,
filename: str | None = None,
) -> Path:
if filename is not None:
saving_path = path.joinpath(f"{filename}")
else:
saving_path = path.joinpath(f"{self.name}")
return saving_path
def save_graph(
self,
path: Path,
filename: str | None = None,
directed: bool = False,
) -> None:
"""save one of the stored graphs to disk file,
currently only GraphML format is supported
Parameters
----------
path : Path
target path for saving the file
filename : str | None, optional
filename to be given, by default None
directed : bool, optional
indicator whether directed or undirected graph
should be exported, by default False (undirected)
Raises
------
ValueError
undirected graph should be exported but is not available
"""
saving_path = self._save_prepare(path=path, filename=filename)
if directed:
target_graph = self._directed
elif not directed and self._undirected is not None:
target_graph = self._undirected
else:
raise ValueError("No undirected graph available.")
saving_path = saving_path.with_suffix('.graphml')
nx.write_graphml(G=target_graph, path=saving_path)
logger.info(("Successfully saved graph as GraphML file "
f"under {saving_path}."))
def to_pickle(
self,
path: Path,
filename: str | None = None,
) -> None:
"""save whole TokenGraph object as pickle file
Parameters
----------
path : Path
target path for saving the file
filename : str | None, optional
filename to be given, by default None
"""
saving_path = self._save_prepare(path=path, filename=filename)
saving_path = saving_path.with_suffix('.pickle')
save_pickle(obj=self, path=saving_path)
@classmethod
def from_file(
cls,
path: Path,
) -> Self:
# !! no validity checks for pickle files
# !! GraphML files not correct because not all properties
# !! are parsed correctly
# TODO REWORK
match path.suffix:
case '.graphml':
graph = typing.cast(Self, nx.read_graphml(path, node_type=int))
logger.info(f"Successfully loaded graph from GraphML file {path}.")
case '.pkl' | '.pickle':
graph = typing.cast(Self, load_pickle(path))
logger.info(f"Successfully loaded graph from pickle file {path}.")
case _:
raise ValueError("File format not supported.")
return graph
@classmethod
def from_pickle(
cls,
path: str | Path,
) -> Self:
if isinstance(path, str):
path = Path(path)
if path.suffix not in ('.pkl', '.pickle'):
raise ValueError("File format not supported.")
graph = typing.cast(Self, load_pickle(path))
return graph

View File

@@ -0,0 +1,561 @@
from typing import cast, Callable
from collections.abc import Iterable
import sys
import logging
from itertools import combinations
import re
from math import factorial
from pathlib import Path
import numpy as np
from torch import Tensor
from pandas import DataFrame, Series
import pandas as pd
from spacy.lang.de import German as GermanSpacyModel
from spacy.tokens.doc import Doc as SpacyDoc
from sentence_transformers import SentenceTransformer
import sentence_transformers
import sentence_transformers.util
from tqdm import tqdm
from lang_main.types import Embedding, PandasIndex
from lang_main.pipelines.base import BasePipeline
from lang_main.analysis.shared import (
similar_index_connection_graph,
similar_index_groups,
)
#from lang_main.analysis.graphs import update_graph, get_graph_metadata
# ** logging setup of this module
# level handed to ``logging.basicConfig``
LOGGING_LEVEL = 'INFO'
# configures the root logger to write to stdout; per the standard library
# this call has no effect if the root logger already has handlers
logging.basicConfig(level=LOGGING_LEVEL, stream=sys.stdout)
# module-wide logger used by all preprocessing functions in this file
logger = logging.getLogger('ihm_analyse.preprocess')
# ** (1) dataset preparation: loading and simple preprocessing
# following functions used to load a given dataset and perform simple
# duplicate cleansing based on all properties
def load_raw_data(
    path: Path,
    date_cols: Iterable[str] = (
        'VorgangsDatum',
        'ErledigungsDatum',
        'Arbeitsbeginn',
        'ErstellungsDatum',
    ),
) -> tuple[DataFrame]:
    """read the IHM dataset with standard structure from a CSV file which
    is semicolon-separated and cp1252-encoded

    Parameters
    ----------
    path : Path
        path to dataset file, usually CSV file
    date_cols : Iterable[str], optional
        columns which contain dates (day first) and are parsed as such,
        by default (
            'VorgangsDatum',
            'ErledigungsDatum',
            'Arbeitsbeginn',
            'ErstellungsDatum',
        )

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with the raw dataset as DataFrame
    """
    raw_data = pd.read_csv(
        path,
        sep=';',
        encoding='cp1252',
        # pandas expects a list of column names here
        parse_dates=list(date_cols),
        dayfirst=True,
    )
    num_entries = len(raw_data)
    num_features = len(raw_data.columns)
    logger.info("Loaded dataset successfully.")
    logger.info((f"Dataset properties: number of entries: {num_entries}, "
                 f"number of features {num_features}"))

    return (raw_data,)
def remove_duplicates(
    data: DataFrame,
) -> tuple[DataFrame]:
    """drop duplicated entries in two passes: first rows which are identical
    over all features, then rows which share 'VorgangsID' and 'ObjektID'

    Parameters
    ----------
    data : DataFrame
        read data with standard structure

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with the dataset without duplicates,
        index is renumbered
    """
    # pass 1: duplicates over all features
    num_dupl_all = data.duplicated().sum()
    logger.info(f"Number of duplicates over all features: {num_dupl_all}")
    wo_duplicates = data.drop_duplicates(ignore_index=True)

    # pass 2: duplicates regarding the identifying features only
    duplicates_subset: list[str] = ['VorgangsID', 'ObjektID']
    num_dupl_subset = wo_duplicates.duplicated(subset=duplicates_subset).sum()
    logger.info(("Number of duplicates over subset "
                 f">>{duplicates_subset}<<: {num_dupl_subset}"))
    wo_duplicates = wo_duplicates.drop_duplicates(
        subset=duplicates_subset,
        ignore_index=True,
    ).copy()

    logger.info("Removed all duplicates from dataset successfully.")
    logger.info((f"New Dataset properties: number of entries: {len(wo_duplicates)}, "
                 f"number of features {len(wo_duplicates.columns)}"))

    return (wo_duplicates,)
def remove_NA(
    data: DataFrame,
    target_features: Iterable[str] = (
        'VorgangsBeschreibung',
    ),
) -> tuple[DataFrame]:
    """function to drop NA entries based on a subset of features to be analysed

    Parameters
    ----------
    data : DataFrame
        standard IHM dataset, perhaps pre-cleaned
    target_features : Iterable[str], optional
        subset to analyse to define an NA entry, a single column name
        given as string is accepted as well,
        by default ('VorgangsBeschreibung',)

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with the dataset without NA entries for the
        given subset of features, index is renumbered
    """
    # immutable default instead of a shared list object; a plain string must
    # not be split into its characters by ``list``
    if isinstance(target_features, str):
        subset = [target_features]
    else:
        subset = list(target_features)

    wo_NA = data.dropna(axis=0, subset=subset, ignore_index=True).copy()  # type: ignore
    logger.info(f"Removed NA entries for features >>{subset}<< from dataset successfully.")

    return (wo_NA,)
# ** (2) entry-based cleansing
# following functions clean and prepare specific entries, not whole dataset
def clean_string_slim(
    string: str
) -> str:
    """slim cleansing of a single string entry: every tab, newline, carriage
    return, form feed and vertical tab is replaced by one space, afterwards
    surrounding whitespace is stripped; intended to be applied element-wise
    to string features of the dataset

    Parameters
    ----------
    string : str
        dataset entry feature

    Returns
    -------
    str
        cleaned entry
    """
    # each control character becomes exactly one space (no collapsing)
    without_ctrl_chars = re.sub(r'[\t\n\r\f\v]', ' ', string)

    return without_ctrl_chars.strip()
def entry_wise_cleansing(
    data: DataFrame,
    target_feature: str,
    cleansing_func: Callable[[str], str],
) -> tuple[DataFrame]:
    """apply a cleansing function element-wise to one feature of the dataset

    Parameters
    ----------
    data : DataFrame
        dataset containing the target feature, it is not modified
    target_feature : str
        name of the column to be cleaned
    cleansing_func : Callable[[str], str]
        function which is applied to each entry of the target feature

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with a copy of the dataset in which the target
        feature was cleaned
    """
    # work on a copy: the DataFrame of the caller stays untouched, like in
    # the other processing steps which return new objects
    cleaned = data.copy()
    # apply given cleansing function to target feature
    cleaned[target_feature] = cleaned[target_feature].map(cleansing_func)
    # not every callable has a ``__name__`` (e.g. ``functools.partial``)
    func_name = getattr(cleansing_func, '__name__', repr(cleansing_func))
    logger.info((f"Successfully applied entry-wise cleansing procedure >>{func_name}<< "
                 f"for feature >>{target_feature}<<"))

    return (cleaned,)
# ** in-depth analysis of one feature
# following functions try to gain insights on a given feature of the IHM dataset such
# as number of occurrences or associated Object IDs
def analyse_feature(
    data: DataFrame,
    target_feature: str,
) -> tuple[DataFrame]:
    """gather statistics for each unique entry of one feature

    Parameters
    ----------
    data : DataFrame
        dataset containing the target feature and the column 'ObjektID'
    target_feature : str
        name of the feature to analyse, its entries must support ``len``

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with a DataFrame with the columns 'entry',
        'len' (length of the entry), 'num_occur' (number of rows with
        this entry), 'assoc_obj_ids' (sorted unique ObjektIDs of these rows)
        and 'num_assoc_obj_ids', sorted descending by 'num_occur'
    """
    # feature columns
    feature_entries = data[target_feature]
    logger.info(f"Number of entries for feature >>{target_feature}<<: {len(feature_entries)}")

    cols = ['entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
    # one pass over the data instead of filtering the whole dataset for
    # each unique entry; ``sort=False`` keeps the order of first appearance
    # missing entries (NA) do not form a group and are therefore skipped
    obj_ids_per_entry = data.groupby(target_feature, sort=False)['ObjektID']

    # rows are collected and converted once, growing a DataFrame by
    # concatenation in each iteration copies all previous rows again
    rows: list[list] = []
    for entry, obj_ids in tqdm(obj_ids_per_entry,
                               total=obj_ids_per_entry.ngroups,
                               mininterval=1.):
        assoc_obj_ids = np.sort(obj_ids.unique(), kind='stable')
        rows.append([
            entry,
            len(entry),
            len(obj_ids),
            assoc_obj_ids,
            len(assoc_obj_ids),
        ])

    result_df = pd.DataFrame(data=rows, columns=cols)
    result_df = result_df.sort_values(by='num_occur', ascending=False).copy()

    return (result_df,)
# ** embedding based similarity
# following functions used to identify similar entries to have
# a more robust identification of duplicates negating negative side effects
# of several disturbances like typos, escape characters, etc.
# build mapping of embeddings for given model
def build_embedding_map(
    data: Series,
    model: GermanSpacyModel | SentenceTransformer,
) -> tuple[dict[int, tuple[Embedding, str]], tuple[bool, bool]]:
    """compute the embedding of each text of a series with the given model

    Parameters
    ----------
    data : Series
        texts to embed, the index values become the keys of the mapping
    model : GermanSpacyModel | SentenceTransformer
        model used to obtain the embeddings

    Returns
    -------
    tuple[dict[int, tuple[Embedding, str]], tuple[bool, bool]]
        mapping index -> (embedding, text) and the flags
        (is spaCy model, is SentenceTransformer model)

    Raises
    ------
    NotImplementedError
        model is neither a spaCy nor a SentenceTransformer model
    """
    # detect model family, spaCy takes precedence
    is_spacy = isinstance(model, GermanSpacyModel)
    is_STRF = (not is_spacy) and isinstance(model, SentenceTransformer)
    if not (is_spacy or is_STRF):
        raise NotImplementedError("Model type unknown")

    # dictionary with embeddings
    embeddings: dict[int, tuple[Embedding, str]] = {}

    for (idx, text) in tqdm(data.items(), total=len(data), mininterval=1.):
        # casts only help the type checker, no effect at runtime
        idx = cast(int, idx)
        text = cast(str, text)
        if is_spacy:
            embd = cast(SpacyDoc, cast(GermanSpacyModel, model)(text))
            embeddings[idx] = (embd, text)
            # a missing vector norm indicates that no word vector is known
            if not embd.vector_norm:
                logger.debug('--- Unknown Words ---')
                logger.debug(f'{embd.text=} has no vector')
        else:
            # SentenceTransformer: texts are encoded one by one
            encoded = cast(
                Tensor,
                cast(SentenceTransformer, model).encode(text, show_progress_bar=False),
            )
            embeddings[idx] = (encoded, text)

    return embeddings, (is_spacy, is_STRF)
# build similarity matrix out of embeddings
def build_cosSim_matrix(
    data: Series,
    model: GermanSpacyModel | SentenceTransformer,
) -> tuple[DataFrame, dict[int, tuple[Embedding, str]]]:
    """calculate the similarity score of each pair of texts of a series

    Parameters
    ----------
    data : Series
        texts to compare, the index is used for rows and columns of the matrix
    model : GermanSpacyModel | SentenceTransformer
        model used to obtain the embeddings

    Returns
    -------
    tuple[DataFrame, dict[int, tuple[Embedding, str]]]
        similarity matrix and the embedding map (index -> (embedding, text));
        each pair is written once at (first index, second index) in the
        order of the series, all other cells keep their initial value 0
    """
    # build empty matrix
    df_index = data.index
    cosineSim_idx_matrix = pd.DataFrame(data=0., columns=df_index,
                                        index=df_index, dtype=np.float32)

    logger.info("Start building embedding map...")
    # obtain embeddings based on used model
    embds, (is_spacy, is_STRF) = build_embedding_map(
        data=data,
        model=model,
    )
    logger.info("Embedding map built successfully.")

    # apply index based mapping for efficient handling of large texts
    combs = combinations(df_index, 2)
    # number of pairs "n choose 2" in closed form: the factorial based
    # formula fails for less than two entries (factorial of a negative
    # number) and builds huge intermediate integers for large datasets
    num_entries = len(df_index)
    total_combs = num_entries * (num_entries - 1) // 2

    logger.info("Start calculation of similarity scores...")
    for (idx1, idx2) in tqdm(combs, total=total_combs, mininterval=1.):
        embd1 = embds[idx1][0]
        embd2 = embds[idx2][0]

        # calculate similarity based on model type
        if is_spacy:
            cosSim = cast(SpacyDoc, embd1).similarity(cast(SpacyDoc, embd2))
        else:
            # exactly one flag is set, otherwise ``build_embedding_map``
            # already raised: this is the SentenceTransformer case
            cosSim = sentence_transformers.util.cos_sim(cast(Tensor, embd1),
                                                        cast(Tensor, embd2))
            cosSim = cast(float, cosSim.item())

        cosineSim_idx_matrix.at[idx1, idx2] = cosSim
    logger.info("Similarity scores calculated successfully.")

    return cosineSim_idx_matrix, embds
# obtain index pairs with cosine similarity
# greater than or equal to given threshold value
def filt_thresh_cosSim_matrix(
    cosineSim_idx_matrix: DataFrame,
    embds: dict[int, tuple[Embedding, str]],
    threshold: float,
) -> tuple[Series, dict[int, tuple[Embedding, str]]]:
    """filter similarity matrix by threshold value and return index pairs with
    a similarity score greater than or equal to the provided threshold

    Parameters
    ----------
    cosineSim_idx_matrix : DataFrame
        similarity matrix
    embds : dict[int, tuple[Embedding, str]]
        embedding map, handed through unchanged
    threshold : float
        similarity threshold

    Returns
    -------
    tuple[Series, dict[int, tuple[Embedding, str]]]
        series with multi index (index pairs) and corresponding similarity
        score, and the unchanged embedding map
    """
    # cells below the threshold are masked and vanish when the matrix
    # is stacked into a series
    reaches_threshold = cosineSim_idx_matrix >= threshold
    masked_matrix = cosineSim_idx_matrix.where(reaches_threshold)
    cosineSim_filt = cast(Series, masked_matrix.stack())

    return cosineSim_filt, embds
def list_cosSim_dupl_candidates(
    cosineSim_filt: Series,
    embds: dict[int, tuple[Embedding, str]],
    save_candidates: bool = False,
    saving_path: Path | None = None,
    filename: str = 'CosSim-FilterCandidates',
    pipeline: BasePipeline | None = None,
) -> tuple[list[tuple[PandasIndex, PandasIndex]], dict[int, tuple[Embedding, str]]]:
    """gather the index pairs of all similarity candidates and optionally
    save an overview (indices, texts, score) as Excel file;
    the overview is more suitable for debugging purposes

    Parameters
    ----------
    cosineSim_filt : Series
        series with multi index (index pairs) and similarity score
    embds : dict[int, tuple[Embedding, str]]
        embedding map, the texts are taken from the second tuple entry
    save_candidates : bool, optional
        whether the overview is saved as Excel file, by default False
    saving_path : Path | None, optional
        target folder, necessary if the overview is saved, by default None
    filename : str, optional
        filename without suffix, by default 'CosSim-FilterCandidates'
    pipeline : BasePipeline | None, optional
        if given, name and current step of the pipeline are prepended to
        the filename, by default None

    Returns
    -------
    tuple[list[tuple[PandasIndex, PandasIndex]], dict[int, tuple[Embedding, str]]]
        list containing the index pairs of all entries of the given series,
        and the unchanged embedding map

    Raises
    ------
    ValueError
        overview should be saved but no saving path was provided
    """
    # fail before the (possibly long) gathering starts
    if save_candidates and saving_path is None:
        raise ValueError(("Saving path must be provided if duplicate "
                          "candidates should be saved."))

    logger.info("Start gathering of similarity candidates...")
    columns: list[str] = ['idx1', 'text1', 'idx2', 'text2', 'score']
    index_pairs: list[tuple[PandasIndex, PandasIndex]] = []
    # rows of the overview, only filled if the overview is saved
    overview_rows: list[list] = []

    for ((idx1, idx2), score) in tqdm(cosineSim_filt.items(), total=len(cosineSim_filt)):  # type: ignore
        if save_candidates:
            # get text content from embedding as second tuple entry
            overview_rows.append([
                idx1,
                embds[idx1][1],
                idx2,
                embds[idx2][1],
                score,
            ])
        # save index pairs
        index_pairs.append((idx1, idx2))
    logger.info("Similarity candidates gathered successfully.")

    if save_candidates and saving_path is not None:
        if pipeline is not None:
            target_filename = (f'Pipe-{pipeline.name}_Step_{pipeline.curr_proc_idx}_'
                               + filename + '.xlsx')
        else:
            target_filename = f'{filename}.xlsx'

        # build the table once instead of concatenating row by row,
        # its rows are numbered consecutively
        df_candidates = pd.DataFrame(data=overview_rows, columns=columns)

        logger.info("Saving similarity candidates...")
        target_path = saving_path.joinpath(target_filename)
        df_candidates.to_excel(target_path)
        logger.info(f"Similarity candidates saved successfully to >>{target_path}<<.")

    return index_pairs, embds
# TODO: change implementation fully to SentenceTransformer
# usage of batch processing for embeddings, use candidate idx function
# from time analysis --> moved to ``helpers.py``
"""
def similar_ids_connection_graph(
similar_idx_pairs: list[tuple[PandasIndex, PandasIndex]],
) -> tuple[Graph, dict[str, int]]:
# build index graph to obtain graph of connected (similar) indices
# use this graph to get connected components (indices which belong together)
# retain semantic connection on whole dataset
similar_id_graph = nx.Graph()
for (idx1, idx2) in similar_idx_pairs:
# inplace operation, parent/child do not really exist in undirected graph
update_graph(graph=similar_id_graph, parent=idx1, child=idx2)
graph_info = get_graph_metadata(graph=similar_id_graph, logging=True)
return similar_id_graph, graph_info
def similar_ids_groups(
dupl_id_graph: Graph,
) -> Iterator[list[PandasIndex]]:
# groups of connected indices
ids_groups = cast(Iterator[set[PandasIndex]],
nx.connected_components(G=dupl_id_graph))
for id_group in ids_groups:
yield list(id_group)
"""
def merge_similarity_dupl(
    data: DataFrame,
    similar_idx_pairs: list[tuple[PandasIndex, PandasIndex]],
) -> tuple[DataFrame]:
    """merge entries which were identified as similar: for each group of
    connected indices one entry is kept which receives the summed number of
    occurrences and the united associated object IDs, the others are dropped

    Parameters
    ----------
    data : DataFrame
        dataset with the columns 'num_occur', 'num_assoc_obj_ids', 'len'
        and 'assoc_obj_ids', it is not modified
    similar_idx_pairs : list[tuple[PandasIndex, PandasIndex]]
        index pairs of similar entries

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with the merged dataset
    """
    logger.info("Start merging of similarity candidates...")
    merged_data = data.copy()
    # connected components of the index graph are the groups to merge
    index_graph, _ = similar_index_connection_graph(similar_idx_pairs)

    for group_indices in similar_index_groups(index_graph):
        # ranking inside the group: most occurrences first, then most
        # associated objects, then longest entry
        group_rows = merged_data.loc[group_indices, :].sort_values(
            by=['num_occur', 'num_assoc_obj_ids', 'len'],
            ascending=[False, False, False],
        )
        # best ranked entry is kept and receives the merged information
        keep_idx = cast(PandasIndex, group_rows.index[0])
        group_rows.at[keep_idx, 'num_occur'] = group_rows['num_occur'].sum()
        united_obj_ids = np.unique(
            np.concatenate(group_rows['assoc_obj_ids'].to_numpy())
        )
        group_rows.at[keep_idx, 'assoc_obj_ids'] = united_obj_ids
        group_rows.at[keep_idx, 'num_assoc_obj_ids'] = len(united_obj_ids)
        # all other indices of the group become obsolete
        group_indices.remove(keep_idx)
        kept_row = group_rows.drop(index=group_indices)
        # write merged entry back to main dataset, drop obsolete entries
        merged_data.update(kept_row)
        merged_data = merged_data.drop(index=group_indices)

    logger.info("Similarity candidates merged successfully.")

    return (merged_data.copy(),)
# merge duplicates
def merge_similarity_dupl_old(
    data: DataFrame,
    dupl_idx_pairs: list[tuple[PandasIndex, PandasIndex]],
) -> tuple[DataFrame]:
    """pair-wise merging of similar entries: the properties of the second
    entry of each pair are merged into the first one, afterwards the second
    entry is dropped; pairs with an already dropped entry are skipped

    Parameters
    ----------
    data : DataFrame
        dataset with the columns 'num_occur', 'assoc_obj_ids' and
        'num_assoc_obj_ids', it is not modified
    dupl_idx_pairs : list[tuple[PandasIndex, PandasIndex]]
        index pairs of similar entries

    Returns
    -------
    tuple[DataFrame]
        one-element tuple with the merged dataset
    """
    # work on a copy of the pre-cleaned data
    merged = data.copy()
    remaining_idx = merged.index

    for (i1, i2) in tqdm(dupl_idx_pairs):
        # both entries must still exist
        if not (i1 in remaining_idx and i2 in remaining_idx):
            continue
        # merged properties: summed occurrences and united object ids
        # (``np.append`` without axis already returns a flat array)
        total_occur = merged.at[i1, 'num_occur'] + merged.at[i2, 'num_occur']
        united_ids = np.unique(
            np.append(merged.at[i1, 'assoc_obj_ids'],
                      merged.at[i2, 'assoc_obj_ids'])
        )
        # write properties to first entry
        merged.at[i1, 'num_occur'] = total_occur
        merged.at[i1, 'assoc_obj_ids'] = united_ids
        merged.at[i1, 'num_assoc_obj_ids'] = len(united_ids)
        # drop second entry and refresh the index used for the lookup
        merged = merged.drop(index=i2)
        remaining_idx = merged.index

    return (merged,)
# ** debugging and evaluation
def choose_cosSim_dupl_candidates(
    cosineSim_filt: Series,
    embds: dict[int, tuple[Embedding, str]],
) -> tuple[DataFrame, list[tuple[PandasIndex, PandasIndex]]]:
    """interactive variant of the candidate listing: both texts of each
    candidate pair are printed and the user decides on the console whether
    the pair is a duplicate; more suitable for debugging purposes

    Parameters
    ----------
    cosineSim_filt : Series
        series with multi index (index pairs) and similarity score
    embds : dict[int, tuple[Embedding, str]]
        embedding map, the texts are taken from the second tuple entry

    Returns
    -------
    DataFrame
        contains indices, corresponding texts and similarity score of the
        confirmed pairs
    list[tuple[Index, Index]]
        list containing the index pairs confirmed by the user
    """
    columns = ['idx1', 'text1', 'idx2', 'text2', 'score']
    df_candidates = pd.DataFrame(columns=columns)
    index_pairs: list[tuple[PandasIndex, PandasIndex]] = []

    for ((idx1, idx2), score) in cosineSim_filt.items():  # type: ignore
        # texts are stored as second tuple entry of the embedding map
        text1 = embds[idx1][1]
        text2 = embds[idx2][1]
        # present pair and ask for decision
        print('---------- New Decision ----------')
        print('text1:\n', text1, '\n', flush=True)
        print('text2:\n', text2, '\n', flush=True)
        decision = input('Please enter >>y<< if this is a duplicate, else hit enter:')
        if decision == 'y':
            # confirmed: add pair to overview and result list
            confirmed = pd.DataFrame(
                columns=columns,
                data=[[idx1, text1, idx2, text2, score]],
            )
            df_candidates = pd.concat([df_candidates, confirmed])
            index_pairs.append((idx1, idx2))

    return df_candidates, index_pairs

View File

@@ -0,0 +1,34 @@
from typing import cast
from collections.abc import Iterable, Iterator
import networkx as nx
from networkx import Graph
from lang_main.types import PandasIndex
from lang_main.analysis.graphs import update_graph, get_graph_metadata
def similar_index_connection_graph(
    similar_idx_pairs: Iterable[tuple[PandasIndex, PandasIndex]],
) -> tuple[Graph, dict[str, int]]:
    """build an undirected graph whose edges connect similar indices;
    its connected components are the indices which belong together, so the
    semantic connection is retained on the whole dataset

    Parameters
    ----------
    similar_idx_pairs : Iterable[tuple[PandasIndex, PandasIndex]]
        index pairs of similar entries

    Returns
    -------
    tuple[Graph, dict[str, int]]
        index graph and its metadata (the metadata is logged as well)
    """
    index_graph = nx.Graph()

    for pair in similar_idx_pairs:
        first_idx, second_idx = pair
        # inplace operation, parent/child have no meaning in undirected graph
        update_graph(graph=index_graph, parent=first_idx, child=second_idx)

    metadata = get_graph_metadata(graph=index_graph, logging=True)

    return index_graph, metadata
def similar_index_groups(
    similar_id_graph: Graph,
) -> Iterator[list[PandasIndex]]:
    """lazily provide the groups of connected indices of an index graph

    Parameters
    ----------
    similar_id_graph : Graph
        graph whose edges connect similar indices

    Yields
    ------
    list[PandasIndex]
        indices of one connected component
    """
    components = cast(Iterator[set[PandasIndex]],
                      nx.connected_components(G=similar_id_graph))
    # each component (set) is handed out as list
    yield from map(list, components)

View File

@@ -0,0 +1,226 @@
from typing import cast
import sys
import logging
from collections.abc import Iterable, Iterator
import numpy as np
import numpy.typing as npt
from pandas import DataFrame, Series
from torch import Tensor
from sentence_transformers import SentenceTransformer
import sentence_transformers
import sentence_transformers.util
from tqdm.auto import tqdm # TODO: check deletion
from lang_main.types import PandasIndex, ObjectID
from lang_main.analysis.shared import (
similar_index_connection_graph,
similar_index_groups,
)
# ** Logging
# level handed to ``logging.basicConfig``
LOGGING_LEVEL = 'INFO'
# configures the root logger to write to stdout; per the standard library
# this call has no effect if the root logger already has handlers
logging.basicConfig(level=LOGGING_LEVEL, stream=sys.stdout)
# module-wide logger of the time analysis
logger = logging.getLogger('ihm_analyse.time_analysis')
def non_relevant_obj_ids(
    data: DataFrame,
    thresh_unique_feat_per_id: int,
    *,
    feature_uniqueness: str = 'HObjektText',
    feature_obj_id: str = 'ObjektID',
) -> tuple[ObjectID, ...]:
    """determine the ObjectIDs which have more unique values of a given
    feature than allowed

    Parameters
    ----------
    data : DataFrame
        dataset containing both features, it is not modified
    thresh_unique_feat_per_id : int
        maximum number of unique feature values per ObjectID, IDs with
        more unique values are reported
    feature_uniqueness : str, optional
        feature whose unique values are counted, by default 'HObjektText'
    feature_obj_id : str, optional
        feature containing the ObjectIDs, by default 'ObjektID'

    Returns
    -------
    tuple[ObjectID, ...]
        ObjectIDs exceeding the threshold, in no particular order
    """
    data = data.copy()
    obj_ids = cast(Iterable[ObjectID],  # actually NumPy array
                   data[feature_obj_id].unique())

    def num_unique_feats(obj_id: ObjectID) -> int:
        # number of different feature values of the given ObjectID
        feats_per_obj_id = cast(
            Series,
            data.loc[(data[feature_obj_id] == obj_id), feature_uniqueness]
        )
        return len(feats_per_obj_id.unique())

    ids_to_ignore: set[ObjectID] = {
        obj_id for obj_id in obj_ids
        if num_unique_feats(obj_id) > thresh_unique_feat_per_id
    }

    return tuple(ids_to_ignore)
def remove_non_relevant_obj_ids(
    data: DataFrame,
    thresh_unique_feat_per_id: int,
    *,
    feature_uniqueness: str = 'HObjektText',
    feature_obj_id: str = 'ObjektID',
) -> DataFrame:
    """remove all entries of ObjectIDs which are reported by
    ``non_relevant_obj_ids`` for the same arguments

    Parameters
    ----------
    data : DataFrame
        dataset containing both features, it is not modified
    thresh_unique_feat_per_id : int
        maximum number of unique feature values per ObjectID
    feature_uniqueness : str, optional
        feature whose unique values are counted, by default 'HObjektText'
    feature_obj_id : str, optional
        feature containing the ObjectIDs, by default 'ObjektID'

    Returns
    -------
    DataFrame
        entries of the remaining ObjectIDs
    """
    data = data.copy()
    ignored_ids = non_relevant_obj_ids(
        data=data,
        thresh_unique_feat_per_id=thresh_unique_feat_per_id,
        feature_uniqueness=feature_uniqueness,
        feature_obj_id=feature_obj_id,
    )
    # only retain entries with ObjectIDs not in IDs to ignore
    is_relevant = ~data[feature_obj_id].isin(ignored_ids)

    return data.loc[is_relevant]
def filter_activities_per_obj_id(
    data: DataFrame,
    activity_feature: str = 'VorgangsTypName',
    relevant_activity_types: Iterable[str] = (
        'Reparaturauftrag (Portal)',
    ),
    feature_obj_id: str = 'ObjektID',
    threshold_num_activities: int = 1,
) -> tuple[DataFrame, Series]:
    """keep only entries of relevant activity types whose ObjectID occurs
    more often than the given threshold among these entries

    Parameters
    ----------
    data : DataFrame
        dataset containing both features, it is not modified
    activity_feature : str, optional
        feature containing the activity type, by default 'VorgangsTypName'
    relevant_activity_types : Iterable[str], optional
        activity types to keep, by default ('Reparaturauftrag (Portal)',)
    feature_obj_id : str, optional
        feature containing the ObjectIDs, by default 'ObjektID'
    threshold_num_activities : int, optional
        ObjectIDs with this number of relevant activities or less
        are removed, by default 1

    Returns
    -------
    tuple[DataFrame, Series]
        filtered entries and the number of relevant activities of each
        remaining ObjectID (index: ObjectIDs, sorted descending by count)
    """
    data = data.copy()
    # step 1: restrict to relevant activity types
    is_relevant = data[activity_feature].isin(relevant_activity_types)
    relevant_entries = data.loc[is_relevant].copy()
    # step 2: count relevant activities per ObjectID
    activity_counts = cast(
        Series,
        relevant_entries[feature_obj_id].value_counts(sort=True)
    )
    # step 3: ObjectIDs which do not exceed the threshold,
    # index of the counts contains the ObjectIDs
    too_few = (activity_counts <= threshold_num_activities)
    obj_ids_too_few = activity_counts[too_few].index
    entry_has_too_few = relevant_entries[feature_obj_id].isin(obj_ids_too_few)
    # step 4: drop them from counts and entries

    return relevant_entries.loc[~entry_has_too_few], activity_counts.loc[~too_few]
def generate_model_input(
    data: DataFrame,
    target_feature_name: str = 'nlp_model_input',
    model_input_features: Iterable[str] = (
        'VorgangsTypName',
        'VorgangsArtText',
        'VorgangsBeschreibung',
    ),
) -> DataFrame:
    """Build one text column by joining several feature columns.

    Missing values are replaced by empty strings, all values are converted
    to strings and joined row-wise with ' - ' as separator.

    Parameters
    ----------
    data : DataFrame
        dataset containing the input features, it is not modified
    target_feature_name : str, optional
        name of the column receiving the joined text,
        by default 'nlp_model_input'
    model_input_features : Iterable[str], optional
        columns to join in the given order

    Returns
    -------
    DataFrame
        copy of the dataset with the additional text column
    """
    result = data.copy()
    feature_cols = list(model_input_features)
    text_parts = result[feature_cols].fillna('').astype(str)
    # row-wise concatenation of the selected features
    result[target_feature_name] = text_parts.apply(' - '.join, axis=1)

    return result
# for each obj_id in relevant_obj_ids
## filter data for obj_id
## obtain series of (idx, nlp_model_input)
## make batch of nlp_model_input
## obtain embeddings
## calculate cosine similarity
## filter cosine similarity by threshold
## obtain idx pairs, yield
## use idx pairs to get idx values of series
def get_timeline_candidates_index(
    data: DataFrame,
    num_activities_per_obj_id: Series,
    model: SentenceTransformer,
    cos_sim_threshold: float,
    feature_obj_id: str = 'ObjektID',
    model_input_feature: str = 'nlp_model_input',
) -> Iterator[tuple[ObjectID, list[PandasIndex]]]:
    """Yield groups of similar entries for each ObjectID.

    For every ObjectID the corresponding model input texts are compared via
    ``candidates_by_index``. The resulting index pairs are passed to
    ``similar_index_connection_graph`` and each group obtained from
    ``similar_index_groups`` is yielded together with its ObjectID.

    Parameters
    ----------
    data : DataFrame
        dataset containing ObjectIDs and model input texts
    num_activities_per_obj_id : Series
        Series whose index holds the ObjectIDs to process, they are
        processed in index order
    model : SentenceTransformer
        model used to encode the text entries
    cos_sim_threshold : float
        threshold for cosine similarity to filter candidates
    feature_obj_id : str, optional
        column holding the ObjectIDs, by default 'ObjektID'
    model_input_feature : str, optional
        column holding the model input texts, by default 'nlp_model_input'

    Yields
    ------
    tuple[ObjectID, list[PandasIndex]]
        ObjectID and one group of indices of similar entries
    """
    # per the original note the index is already sorted descending by the
    # number of activities
    for obj_id in cast(Iterable[ObjectID], num_activities_per_obj_id.index):
        rows_of_obj = cast(
            DataFrame,
            data.loc[data[feature_obj_id] == obj_id]
        )
        # materialise all candidate pairs of this ObjectID at once
        similar_pairs = tuple(
            candidates_by_index(
                data_model_input=rows_of_obj[model_input_feature],
                model=model,
                cos_sim_threshold=cos_sim_threshold,
            )
        )
        similar_id_graph, _ = similar_index_connection_graph(
            similar_idx_pairs=similar_pairs,
        )
        for group in similar_index_groups(similar_id_graph):
            yield obj_id, group
# TODO: check application for duplicate removal
def candidates_by_index(
    data_model_input: Series,
    model: SentenceTransformer,
    cos_sim_threshold: float = 0.5,
) -> Iterator[tuple[PandasIndex, PandasIndex]]:
    """function to filter candidate indices based on cosine similarity
    using SentenceTransformer model in batch mode,
    feed data as Series to retain information about indices of entries and
    access them later in the original dataset

    Each unordered pair of distinct entries is evaluated exactly once
    (strict upper triangle of the similarity matrix), so neither self-pairs
    nor mirrored pairs are yielded, regardless of the threshold value.

    Parameters
    ----------
    data_model_input : Series
        containing indices and text entries to process
    model : SentenceTransformer
        necessary SentenceTransformer model to encode text entries
    cos_sim_threshold : float, optional
        threshold for cosine similarity to filter candidates, by default 0.5

    Yields
    ------
    tuple[PandasIndex, PandasIndex]
        pair of index labels of ``data_model_input`` whose entries meet
        the cosine similarity threshold
    """
    batch = cast(list[str],
                 data_model_input.to_list())
    # fewer than two entries can not form a pair, skip the encoding
    if len(batch) < 2:
        return
    # embeddings
    embds = cast(
        Tensor,
        model.encode(
            batch,
            convert_to_numpy=False,
            convert_to_tensor=True,
            show_progress_bar=False,
        )
    )
    # cosine similarity
    # tensors living on an accelerator must be moved to host memory before
    # they can be converted to NumPy, ``cpu()`` is a no-op for CPU tensors
    cos_sim = cast(
        npt.NDArray,
        sentence_transformers.util.cos_sim(embds, embds).cpu().numpy()
    )
    # positions above the main diagonal in row-major order
    rows, cols = np.triu_indices(cos_sim.shape[0], k=1)
    is_candidate = cos_sim[rows, cols] >= cos_sim_threshold
    index = data_model_input.index

    for row, col in zip(rows[is_candidate], cols[is_candidate]):
        # translate matrix positions back to index labels of the Series
        yield index[row], index[col]
"""
next part:
"""

View File

@@ -0,0 +1,171 @@
from typing import cast
import sys
import logging
import re
from itertools import combinations
from collections.abc import Iterator
from dateutil.parser import parse
from spacy.tokens.token import Token as SpacyToken
from spacy.tokens.doc import Doc as SpacyDoc
from spacy.lang.de import German as GermanSpacyModel
from pandas import DataFrame
from tqdm.auto import tqdm
from lang_main.analysis.graphs import (
update_graph,
TokenGraph,
)
# ** Logging
# NOTE: ``basicConfig`` is executed when this module is imported and targets
# the root logger with output to stdout
LOGGING_LEVEL = 'INFO'
logging.basicConfig(level=LOGGING_LEVEL, stream=sys.stdout)
logger = logging.getLogger('ihm_analyse.token_analysis')
# ** POS
# part-of-speech labels (``pos_`` attribute of tokens) which qualify a token or
# descendant for the graph; commented lines are alternative configurations
#POS_OF_INTEREST: frozenset[str] = frozenset(['NOUN', 'PROPN', 'ADJ', 'VERB', 'AUX'])
#POS_OF_INTEREST: frozenset[str] = frozenset(['NOUN', 'ADJ', 'VERB', 'AUX'])
#POS_OF_INTEREST: frozenset[str] = frozenset(['NOUN', 'PROPN'])
POS_OF_INTEREST: frozenset[str] = frozenset(['NOUN', 'PROPN', 'VERB', 'AUX'])
# labels treated as indirect: such tokens are never yielded as descendants and,
# as parents, only connect their descendants with each other
#POS_INDIRECT: frozenset[str] = frozenset(['AUX', 'VERB'])
POS_INDIRECT: frozenset[str] = frozenset(['AUX'])
# ** TAG
# fine-grained labels (``tag_`` attribute of tokens) which qualify a token in
# addition to its POS; currently empty, so no token is admitted by tag
#TAG_OF_INTEREST: frozenset[str] = frozenset(['ADJD'])
TAG_OF_INTEREST: frozenset[str] = frozenset()
# ** obtaining connection in texts
def pre_clean_word(string: str) -> str:
    """Remove every character which is not a German letter.

    Retained are ASCII letters, umlauts and the sharp s in both cases.
    The sharp s has to be part of the character class, otherwise words like
    'Maßnahme' would be corrupted to 'Manahme'.

    Parameters
    ----------
    string : str
        text to clean

    Returns
    -------
    str
        text consisting only of the retained letters
    """
    pattern = r'[^A-Za-zäöüÄÖÜßẞ]+'
    return re.sub(pattern, '', string)
# https://stackoverflow.com/questions/25341945/check-if-string-has-date-any-format
def is_str_date(
    string: str,
    fuzzy: bool = False,
) -> bool:
    """Check whether a string can be interpreted as a date.

    Parameters
    ----------
    string : str
        text to check
    fuzzy : bool, optional
        forwarded to ``dateutil.parser.parse``, by default False

    Returns
    -------
    bool
        True if the date parser accepts the string, False otherwise;
        integer strings with more than 8 characters are never regarded
        as dates
    """
    try:
        int(string)
    except ValueError:
        # not a number, let the date parser decide
        pass
    else:
        # numbers with more than 8 characters are not regarded as dates
        if len(string) > 8:
            return False

    try:
        parse(string, fuzzy=fuzzy)
    except (ValueError, OverflowError):
        # ``OverflowError`` is documented for values exceeding the largest
        # valid C integer, such strings are no dates either
        return False

    return True
def obtain_relevant_descendants(
    token: SpacyToken,
) -> Iterator[SpacyToken]:
    """Yield the descendants of a token which are relevant for the graph.

    Parameters
    ----------
    token : SpacyToken
        parent token whose subtree is inspected

    Yields
    ------
    SpacyToken
        descendants which are not the token itself, not recognised as date,
        not verb-like if the parent is verb-like, not of an indirect POS
        and which have a POS or TAG of interest
    """
    verb_like = ('AUX', 'VERB')
    # property of the parent, does not change while iterating
    token_is_verb_like = token.pos_ in verb_like

    for descendant in token.subtree:
        # subtrees contain the token itself
        # if current element is token skip this element
        if descendant == token:
            continue

        # if descendant is a date skip it
        if is_str_date(string=descendant.text):
            continue

        # lazy arguments: the message is only formatted if debug logging
        # is enabled, this loop runs for every descendant of every token
        logger.debug(
            "Token >>%s<<, POS >>%s<< | descendant >>%s<<, POS >>%s<<",
            token, token.pos_, descendant, descendant.pos_,
        )

        # eliminate cases of cross-references with verbs
        if token_is_verb_like and descendant.pos_ in verb_like:
            continue
        # skip cases in which descendant is indirect POS with others than verbs
        if descendant.pos_ in POS_INDIRECT:
            continue
        # skip cases in which descendant has no relevant POS or TAG
        if not (descendant.pos_ in POS_OF_INTEREST
                or descendant.tag_ in TAG_OF_INTEREST):
            continue

        yield descendant
# TODO look at results and fine-tune function accordingly
def add_doc_info_to_graph(
    graph: TokenGraph,
    doc: SpacyDoc,
    weight: int,
) -> None:
    """Add the token connections of one document to the graph.

    Parameters
    ----------
    graph : TokenGraph
        graph which is updated via ``update_graph``
    doc : SpacyDoc
        processed document whose sentences are evaluated
    weight : int
        weight passed to ``update_graph`` for each connection
    """
    for sentence in doc.sents:
        for token in sentence:
            # only tokens with relevant POS or TAG which are no dates or times
            has_relevant_label = (token.pos_ in POS_OF_INTEREST
                                  or token.tag_ in TAG_OF_INTEREST)
            if not has_relevant_label or is_str_date(string=token.text):
                continue

            descendants = obtain_relevant_descendants(token=token)

            if token.pos_ in POS_INDIRECT:
                # indirect POS: connect all associated words with each other
                # !! parents and children do not really exist in this case,
                # !! but only one connection is made
                connections = (
                    (first.lemma_, second.lemma_)
                    for first, second in combinations(descendants, r=2)
                )
            else:
                # direct POS: connect parent <--> descendant
                connections = (
                    (token.lemma_, descendant.lemma_)
                    for descendant in descendants
                )

            for parent, child in connections:
                update_graph(
                    graph=graph,
                    parent=parent,
                    child=child,
                    weight_connection=weight,
                )
def build_token_graph(
    data: DataFrame,
    model: GermanSpacyModel,
) -> tuple[TokenGraph]:
    """Build a token graph from all text entries of a dataset.

    Parameters
    ----------
    data : DataFrame
        dataset whose rows provide the attributes ``entry`` (text) and
        ``num_occur`` (used as connection weight)
    model : GermanSpacyModel
        language model applied to each text entry

    Returns
    -------
    tuple[TokenGraph]
        1-tuple containing the token graph
    """
    token_graph = TokenGraph()
    records = tqdm(data.itertuples(), total=len(data))

    for record in records:
        # attribute names must match with preprocessed data
        text = cast(str, record.entry)
        num_occurrences = cast(int, record.num_occur)
        add_doc_info_to_graph(
            graph=token_graph,
            doc=model(text),
            weight=num_occurrences,
        )

    # metadata
    token_graph.update_metadata()
    # convert to undirected
    # NOTE(review): the return value is discarded, presumably ``TokenGraph``
    # keeps the undirected version internally - confirm in its implementation
    token_graph.to_undirected()

    return (token_graph,)

34
src/lang_main/config.toml Normal file
View File

@@ -0,0 +1,34 @@
# lang_main: Config file
[paths]
results = './results/test_new2/'
dataset = './01_2_Rohdaten_neu/Export4.csv'
#results = './results/Export7/'
#dataset = './01_03_Rohdaten_202403/Export7_59499_Zeilen.csv'
#results = './results/Export7_trunc/'
#dataset = './01_03_Rohdaten_202403/Export7_trunc.csv'
[control]
preprocessing = false
token_analysis = true
graph_postprocessing = false
#[export_filenames]
#filename_cossim_filter_candidates = 'CosSim-FilterCandidates'
[preprocess]
filename_cossim_filter_candidates = 'CosSim-FilterCandidates'
date_cols = [
"VorgangsDatum",
"ErledigungsDatum",
"Arbeitsbeginn",
"ErstellungsDatum",
]
threshold_amount_characters = 5
threshold_similarity = 0.8
[graph_postprocessing]
threshold_edge_weight = 150
[time_analysis]
threshold_unique_texts = 5

View File

@@ -0,0 +1,20 @@
[ {
"name" : "IHM",
"transformers" : [ {
"id" : "org.cytoscape.CompositeFilter",
"parameters" : {
"type" : "ALL"
},
"transformers" : [ {
"id" : "org.cytoscape.ColumnFilter",
"parameters" : {
"predicate" : "IS",
"criterion" : true,
"caseSensitive" : false,
"type" : "nodes",
"anyMatch" : true,
"columnName" : "IsSingleNode"
}
} ]
} ]
} ]

View File

@@ -0,0 +1,305 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<vizmap id="VizMap-2024_04_17-09_15" documentVersion="3.1">
<visualStyle name="IHM">
<network>
<visualProperty default="false" name="NETWORK_FORCE_HIGH_DETAIL"/>
<visualProperty default="400.0" name="NETWORK_HEIGHT"/>
<visualProperty default="" name="NETWORK_TITLE"/>
<visualProperty default="0.0" name="NETWORK_CENTER_X_LOCATION"/>
<visualProperty default="true" name="NETWORK_NODE_SELECTION"/>
<visualProperty default="false" name="NETWORK_ANNOTATION_SELECTION"/>
<visualProperty default="true" name="NETWORK_EDGE_SELECTION"/>
<visualProperty default="0.0" name="NETWORK_CENTER_Y_LOCATION"/>
<visualProperty default="550.0" name="NETWORK_WIDTH"/>
<visualProperty default="0.0" name="NETWORK_DEPTH"/>
<visualProperty default="1.0" name="NETWORK_SCALE_FACTOR"/>
<visualProperty default="0.0" name="NETWORK_CENTER_Z_LOCATION"/>
<visualProperty default="#F7FCF5" name="NETWORK_BACKGROUND_PAINT"/>
<visualProperty default="false" name="NETWORK_NODE_LABEL_SELECTION"/>
</network>
<node>
<dependency value="true" name="nodeCustomGraphicsSizeSync"/>
<dependency value="false" name="nodeSizeLocked"/>
<visualProperty default="255" name="NODE_TRANSPARENCY"/>
<visualProperty default="ROUND_RECTANGLE" name="COMPOUND_NODE_SHAPE"/>
<visualProperty default="#787878" name="NODE_PAINT"/>
<visualProperty default="ELLIPSE" name="NODE_SHAPE"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_5"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_1"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_5, name=Node Custom Paint 5)" name="NODE_CUSTOMPAINT_5"/>
<visualProperty default="#B6B6B6" name="NODE_LABEL_BACKGROUND_COLOR"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_1"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_3"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_2"/>
<visualProperty default="50.0" name="NODE_HEIGHT">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="30.0" greaterValue="30.0" lesserValue="1.0"/>
<continuousMappingPoint attrValue="23.0" equalValue="70.0" greaterValue="1.0" lesserValue="70.0"/>
</continuousMapping>
</visualProperty>
<visualProperty default="50.0" name="NODE_WIDTH">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="30.0" greaterValue="30.0" lesserValue="1.0"/>
<continuousMappingPoint attrValue="23.0" equalValue="70.0" greaterValue="1.0" lesserValue="70.0"/>
</continuousMapping>
</visualProperty>
<visualProperty default="10.0" name="COMPOUND_NODE_PADDING"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_4"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_8, name=Node Custom Paint 8)" name="NODE_CUSTOMPAINT_8"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_6"/>
<visualProperty default="true" name="NODE_NESTED_NETWORK_IMAGE_VISIBLE"/>
<visualProperty default="#FFFF00" name="NODE_SELECTED_PAINT"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_9, name=Node Custom Paint 9)" name="NODE_CUSTOMPAINT_9"/>
<visualProperty default="false" name="NODE_SELECTED"/>
<visualProperty default="" name="NODE_LABEL">
<passthroughMapping attributeName="name" attributeType="string"/>
</visualProperty>
<visualProperty default="255" name="NODE_BORDER_TRANSPARENCY"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_7, name=Node Custom Paint 7)" name="NODE_CUSTOMPAINT_7"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_9"/>
<visualProperty default="0.0" name="NODE_Z_LOCATION"/>
<visualProperty default="0.0" name="NODE_LABEL_ROTATION"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_1"/>
<visualProperty default="255" name="NODE_LABEL_BACKGROUND_TRANSPARENCY"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_6"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_6"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_4"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_4"/>
<visualProperty default="" name="NODE_TOOLTIP"/>
<visualProperty default="#FEB24C" name="NODE_FILL_COLOR"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_4, name=Node Custom Paint 4)" name="NODE_CUSTOMPAINT_4"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_5"/>
<visualProperty default="Verdana Fett,plain,1" name="NODE_LABEL_FONT_FACE"/>
<visualProperty default="NONE" name="NODE_LABEL_BACKGROUND_SHAPE"/>
<visualProperty default="SOLID" name="NODE_BORDER_STROKE"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_9"/>
<visualProperty default="18.0" name="NODE_SIZE"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_7"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_7"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_LABEL_POSITION"/>
<visualProperty default="true" name="NODE_VISIBLE"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_3, name=Node Custom Paint 3)" name="NODE_CUSTOMPAINT_3"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_3"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_7"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_2"/>
<visualProperty default="255" name="NODE_LABEL_TRANSPARENCY"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_6, name=Node Custom Paint 6)" name="NODE_CUSTOMPAINT_6"/>
<visualProperty default="#FFFFFF" name="NODE_BORDER_PAINT"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_1, name=Node Custom Paint 1)" name="NODE_CUSTOMPAINT_1"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_9"/>
<visualProperty default="0.0" name="NODE_DEPTH"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_5"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_2, name=Node Custom Paint 2)" name="NODE_CUSTOMPAINT_2"/>
<visualProperty default="7.0" name="NODE_BORDER_WIDTH"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_3"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_2"/>
<visualProperty default="0.0" name="NODE_Y_LOCATION"/>
<visualProperty default="#252525" name="NODE_LABEL_COLOR"/>
<visualProperty default="0.0" name="NODE_X_LOCATION"/>
<visualProperty default="200.0" name="NODE_LABEL_WIDTH"/>
<visualProperty default="16" name="NODE_LABEL_FONT_SIZE">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="20" greaterValue="20" lesserValue="1"/>
<continuousMappingPoint attrValue="23.0" equalValue="48" greaterValue="1" lesserValue="48"/>
</continuousMapping>
</visualProperty>
</node>
<edge>
<dependency value="true" name="arrowColorMatchesEdge"/>
<visualProperty default="#000000" name="EDGE_TARGET_ARROW_UNSELECTED_PAINT"/>
<visualProperty default="C,C,c,0.00,0.00" name="EDGE_LABEL_POSITION"/>
<visualProperty default="true" name="EDGE_CURVED"/>
<visualProperty default="SOLID" name="EDGE_LINE_TYPE"/>
<visualProperty default="6.0" name="EDGE_TARGET_ARROW_SIZE"/>
<visualProperty default="false" name="EDGE_SELECTED"/>
<visualProperty default="3.0" name="EDGE_WIDTH">
<continuousMapping attributeName="weight" attributeType="float">
<continuousMappingPoint attrValue="120.0" equalValue="3.0" greaterValue="3.0" lesserValue="1.0"/>
<continuousMappingPoint attrValue="3799.9999153465033" equalValue="15.0" greaterValue="15.0" lesserValue="15.0"/>
<continuousMappingPoint attrValue="92788.0" equalValue="20.0" greaterValue="1.0" lesserValue="20.0"/>
</continuousMapping>
</visualProperty>
<visualProperty default="" name="EDGE_TOOLTIP"/>
<visualProperty default="#000000" name="EDGE_SOURCE_ARROW_UNSELECTED_PAINT"/>
<visualProperty default="0.0" name="EDGE_LABEL_ROTATION"/>
<visualProperty default="#2171B5" name="EDGE_UNSELECTED_PAINT"/>
<visualProperty default="255" name="EDGE_TRANSPARENCY">
<continuousMapping attributeName="weight" attributeType="float">
<continuousMappingPoint attrValue="120.0" equalValue="120" greaterValue="120" lesserValue="1"/>
<continuousMappingPoint attrValue="3700.0000471025705" equalValue="210" greaterValue="210" lesserValue="210"/>
<continuousMappingPoint attrValue="92788.0" equalValue="255" greaterValue="1" lesserValue="255"/>
</continuousMapping>
</visualProperty>
<visualProperty default="6.0" name="EDGE_SOURCE_ARROW_SIZE"/>
<visualProperty default="#B6B6B6" name="EDGE_LABEL_BACKGROUND_COLOR"/>
<visualProperty default="0.0" name="EDGE_Z_ORDER"/>
<visualProperty default="#000000" name="EDGE_LABEL_COLOR"/>
<visualProperty default="#FF0000" name="EDGE_STROKE_SELECTED_PAINT"/>
<visualProperty default="255" name="EDGE_LABEL_TRANSPARENCY"/>
<visualProperty default="true" name="EDGE_VISIBLE"/>
<visualProperty default="false" name="EDGE_LABEL_AUTOROTATE"/>
<visualProperty default="SansSerif.plain,plain,10" name="EDGE_LABEL_FONT_FACE"/>
<visualProperty default="AUTO_BEND" name="EDGE_STACKING"/>
<visualProperty default="10" name="EDGE_LABEL_FONT_SIZE"/>
<visualProperty default="NONE" name="EDGE_SOURCE_ARROW_SHAPE"/>
<visualProperty default="#FFFF00" name="EDGE_TARGET_ARROW_SELECTED_PAINT"/>
<visualProperty default="" name="EDGE_LABEL"/>
<visualProperty default="0.728545744495502,-0.684997151948455,0.6456513365424503" name="EDGE_BEND"/>
<visualProperty default="255" name="EDGE_LABEL_BACKGROUND_TRANSPARENCY"/>
<visualProperty default="0.5" name="EDGE_STACKING_DENSITY"/>
<visualProperty default="200.0" name="EDGE_LABEL_WIDTH"/>
<visualProperty default="NONE" name="EDGE_TARGET_ARROW_SHAPE"/>
<visualProperty default="#FFFFFF" name="EDGE_STROKE_UNSELECTED_PAINT"/>
<visualProperty default="NONE" name="EDGE_LABEL_BACKGROUND_SHAPE"/>
<visualProperty default="#FFFF00" name="EDGE_SOURCE_ARROW_SELECTED_PAINT"/>
</edge>
</visualStyle>
<visualStyle name="IHM_base">
<network>
<visualProperty default="false" name="NETWORK_FORCE_HIGH_DETAIL"/>
<visualProperty default="400.0" name="NETWORK_HEIGHT"/>
<visualProperty default="" name="NETWORK_TITLE"/>
<visualProperty default="0.0" name="NETWORK_CENTER_X_LOCATION"/>
<visualProperty default="true" name="NETWORK_NODE_SELECTION"/>
<visualProperty default="false" name="NETWORK_ANNOTATION_SELECTION"/>
<visualProperty default="true" name="NETWORK_EDGE_SELECTION"/>
<visualProperty default="0.0" name="NETWORK_CENTER_Y_LOCATION"/>
<visualProperty default="550.0" name="NETWORK_WIDTH"/>
<visualProperty default="0.0" name="NETWORK_DEPTH"/>
<visualProperty default="1.0" name="NETWORK_SCALE_FACTOR"/>
<visualProperty default="0.0" name="NETWORK_CENTER_Z_LOCATION"/>
<visualProperty default="#F7FCF5" name="NETWORK_BACKGROUND_PAINT"/>
<visualProperty default="false" name="NETWORK_NODE_LABEL_SELECTION"/>
</network>
<node>
<dependency value="true" name="nodeCustomGraphicsSizeSync"/>
<dependency value="false" name="nodeSizeLocked"/>
<visualProperty default="255" name="NODE_TRANSPARENCY"/>
<visualProperty default="ROUND_RECTANGLE" name="COMPOUND_NODE_SHAPE"/>
<visualProperty default="#787878" name="NODE_PAINT"/>
<visualProperty default="ELLIPSE" name="NODE_SHAPE"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_5"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_1"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_5, name=Node Custom Paint 5)" name="NODE_CUSTOMPAINT_5"/>
<visualProperty default="#B6B6B6" name="NODE_LABEL_BACKGROUND_COLOR"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_1"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_3"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_2"/>
<visualProperty default="50.0" name="NODE_HEIGHT">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="30.0" greaterValue="30.0" lesserValue="1.0"/>
<continuousMappingPoint attrValue="23.0" equalValue="70.0" greaterValue="1.0" lesserValue="70.0"/>
</continuousMapping>
</visualProperty>
<visualProperty default="50.0" name="NODE_WIDTH">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="30.0" greaterValue="30.0" lesserValue="1.0"/>
<continuousMappingPoint attrValue="23.0" equalValue="70.0" greaterValue="1.0" lesserValue="70.0"/>
</continuousMapping>
</visualProperty>
<visualProperty default="10.0" name="COMPOUND_NODE_PADDING"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_4"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_8, name=Node Custom Paint 8)" name="NODE_CUSTOMPAINT_8"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_6"/>
<visualProperty default="true" name="NODE_NESTED_NETWORK_IMAGE_VISIBLE"/>
<visualProperty default="#FFFF00" name="NODE_SELECTED_PAINT"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_9, name=Node Custom Paint 9)" name="NODE_CUSTOMPAINT_9"/>
<visualProperty default="false" name="NODE_SELECTED"/>
<visualProperty default="" name="NODE_LABEL">
<passthroughMapping attributeName="name" attributeType="string"/>
</visualProperty>
<visualProperty default="255" name="NODE_BORDER_TRANSPARENCY"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_7, name=Node Custom Paint 7)" name="NODE_CUSTOMPAINT_7"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_9"/>
<visualProperty default="0.0" name="NODE_Z_LOCATION"/>
<visualProperty default="0.0" name="NODE_LABEL_ROTATION"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_1"/>
<visualProperty default="255" name="NODE_LABEL_BACKGROUND_TRANSPARENCY"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_6"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_6"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_4"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_4"/>
<visualProperty default="" name="NODE_TOOLTIP"/>
<visualProperty default="#FEB24C" name="NODE_FILL_COLOR"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_4, name=Node Custom Paint 4)" name="NODE_CUSTOMPAINT_4"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_5"/>
<visualProperty default="Verdana Fett,plain,1" name="NODE_LABEL_FONT_FACE"/>
<visualProperty default="NONE" name="NODE_LABEL_BACKGROUND_SHAPE"/>
<visualProperty default="SOLID" name="NODE_BORDER_STROKE"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_9"/>
<visualProperty default="18.0" name="NODE_SIZE"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_7"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_7"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_LABEL_POSITION"/>
<visualProperty default="true" name="NODE_VISIBLE"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_3, name=Node Custom Paint 3)" name="NODE_CUSTOMPAINT_3"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_3"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_7"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_2"/>
<visualProperty default="255" name="NODE_LABEL_TRANSPARENCY"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_6, name=Node Custom Paint 6)" name="NODE_CUSTOMPAINT_6"/>
<visualProperty default="#FFFFFF" name="NODE_BORDER_PAINT"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_1, name=Node Custom Paint 1)" name="NODE_CUSTOMPAINT_1"/>
<visualProperty default="C,C,c,0.00,0.00" name="NODE_CUSTOMGRAPHICS_POSITION_9"/>
<visualProperty default="0.0" name="NODE_DEPTH"/>
<visualProperty default="org.cytoscape.cg.model.NullCustomGraphics,0,[ Remove Graphics ]," name="NODE_CUSTOMGRAPHICS_5"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_8"/>
<visualProperty default="DefaultVisualizableVisualProperty(id=NODE_CUSTOMPAINT_2, name=Node Custom Paint 2)" name="NODE_CUSTOMPAINT_2"/>
<visualProperty default="7.0" name="NODE_BORDER_WIDTH"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_3"/>
<visualProperty default="0.0" name="NODE_CUSTOMGRAPHICS_SIZE_2"/>
<visualProperty default="0.0" name="NODE_Y_LOCATION"/>
<visualProperty default="#252525" name="NODE_LABEL_COLOR"/>
<visualProperty default="0.0" name="NODE_X_LOCATION"/>
<visualProperty default="200.0" name="NODE_LABEL_WIDTH"/>
<visualProperty default="16" name="NODE_LABEL_FONT_SIZE">
<continuousMapping attributeName="Degree" attributeType="float">
<continuousMappingPoint attrValue="0.0" equalValue="20" greaterValue="20" lesserValue="1"/>
<continuousMappingPoint attrValue="23.0" equalValue="48" greaterValue="1" lesserValue="48"/>
</continuousMapping>
</visualProperty>
</node>
<edge>
<dependency value="true" name="arrowColorMatchesEdge"/>
<visualProperty default="#000000" name="EDGE_TARGET_ARROW_UNSELECTED_PAINT"/>
<visualProperty default="C,C,c,0.00,0.00" name="EDGE_LABEL_POSITION"/>
<visualProperty default="true" name="EDGE_CURVED"/>
<visualProperty default="SOLID" name="EDGE_LINE_TYPE"/>
<visualProperty default="6.0" name="EDGE_TARGET_ARROW_SIZE"/>
<visualProperty default="false" name="EDGE_SELECTED"/>
<visualProperty default="5.0" name="EDGE_WIDTH"/>
<visualProperty default="" name="EDGE_TOOLTIP"/>
<visualProperty default="#000000" name="EDGE_SOURCE_ARROW_UNSELECTED_PAINT"/>
<visualProperty default="0.0" name="EDGE_LABEL_ROTATION"/>
<visualProperty default="#2171B5" name="EDGE_UNSELECTED_PAINT"/>
<visualProperty default="255" name="EDGE_TRANSPARENCY"/>
<visualProperty default="6.0" name="EDGE_SOURCE_ARROW_SIZE"/>
<visualProperty default="#B6B6B6" name="EDGE_LABEL_BACKGROUND_COLOR"/>
<visualProperty default="0.0" name="EDGE_Z_ORDER"/>
<visualProperty default="#000000" name="EDGE_LABEL_COLOR"/>
<visualProperty default="#FF0000" name="EDGE_STROKE_SELECTED_PAINT"/>
<visualProperty default="255" name="EDGE_LABEL_TRANSPARENCY"/>
<visualProperty default="true" name="EDGE_VISIBLE"/>
<visualProperty default="false" name="EDGE_LABEL_AUTOROTATE"/>
<visualProperty default="SansSerif.plain,plain,10" name="EDGE_LABEL_FONT_FACE"/>
<visualProperty default="AUTO_BEND" name="EDGE_STACKING"/>
<visualProperty default="10" name="EDGE_LABEL_FONT_SIZE"/>
<visualProperty default="NONE" name="EDGE_SOURCE_ARROW_SHAPE"/>
<visualProperty default="#FFFF00" name="EDGE_TARGET_ARROW_SELECTED_PAINT"/>
<visualProperty default="" name="EDGE_LABEL"/>
<visualProperty default="0.728545744495502,-0.684997151948455,0.6456513365424503" name="EDGE_BEND"/>
<visualProperty default="255" name="EDGE_LABEL_BACKGROUND_TRANSPARENCY"/>
<visualProperty default="0.5" name="EDGE_STACKING_DENSITY"/>
<visualProperty default="200.0" name="EDGE_LABEL_WIDTH"/>
<visualProperty default="NONE" name="EDGE_TARGET_ARROW_SHAPE"/>
<visualProperty default="#FFFFFF" name="EDGE_STROKE_UNSELECTED_PAINT"/>
<visualProperty default="NONE" name="EDGE_LABEL_BACKGROUND_SHAPE"/>
<visualProperty default="#FFFF00" name="EDGE_SOURCE_ARROW_SELECTED_PAINT"/>
</edge>
</visualStyle>
</vizmap>

View File

View File

@@ -0,0 +1,144 @@
from typing import Any
#from types import FunctionType
import sys
import logging
from collections.abc import Callable
from pathlib import Path
from lang_main.shared import save_pickle, load_pickle
# ** Logging
# NOTE: ``basicConfig`` is executed when this module is imported and targets
# the root logger with output to stdout
LOGGING_LEVEL = 'INFO'
logging.basicConfig(level=LOGGING_LEVEL, stream=sys.stdout)
# module logger used by the pipeline classes to report start and end of runs
logger = logging.getLogger('ihm_analyse.pipelines')
# ** pipelines to perform given actions on dataset in a customisable manner
class NoPerformableActionError(Exception):
    """Raised if a pipeline is started without any registered action"""
class BasePipeline():
def __init__(
self,
name: str,
working_dir: Path,
) -> None:
# init base class
super().__init__()
# name of pipeline
self.name = name
# working directory for pipeline == output path
self.working_dir = working_dir
# container for actions to perform during pass
self.actions: list[Callable] = []
self.action_names: list[str] = []
self.actions_kwargs: list[dict[str, Any]] = []
self.is_save_result: list[bool] = []
# progress tracking, start at 1
self.curr_proc_idx: int = 1
# intermediate result
self._intermediate_result: Any | None = None
def __repr__(self) -> str:
return (f"{self.__class__.__name__}(name: {self.name}, "
f"working dir: {self.working_dir}, contents: {self.action_names})")
@property
def intermediate_result(self) -> Any:
return self._intermediate_result
def add(
self,
action: Callable,
action_kwargs: dict[str, Any] = {},
save_result: bool = False,
) -> None:
# check explicitly for function type
#if isinstance(action, FunctionType):
if isinstance(action, Callable):
self.actions.append(action)
self.action_names.append(action.__name__)
self.actions_kwargs.append(action_kwargs.copy())
self.is_save_result.append(save_result)
else:
raise TypeError(("Action must be custom function, "
f"but is of type >>{type(action)}<<."))
# TODO: add multiple entries by utilising simple add method
"""
def add_multi(
self,
action: FunctionType | Sequence[FunctionType],
action_kwargs: dict[str, Any] | Sequence[dict[str, Any]],
) -> None:
if isinstance(action, Sequence):
if len(action_kwargs) != len(action):
raise ValueError(("Sequences for actions and corresponding keyword "
"arguments must have the same length."))
self.actions.extend(action)
self.actions_kwargs.extend(action_kwargs)
elif isinstance(action, FunctionType):
self.actions.append(action)
self.actions_kwargs.append(action_kwargs)
else:
raise TypeError(("Action must be function or sequence of functions, "
f"but is of type >>{type(action)}<<."))
"""
def save_curr_result(
self,
filename: str,
) -> None:
target_filename = f'Pipe-{self.name}_Step-{self.curr_proc_idx}_' + filename + '.pickle'
target_path = self.working_dir.joinpath(target_filename)
# saving file locally
save_pickle(obj=self._intermediate_result, path=target_path)
def load_intermediate_result(
self,
saving_path: str,
filename: str,
) -> tuple[Any, ...]:
target_path = saving_path + filename + '.pickle'
# loading DataFrame or Series from pickle
data = load_pickle(target_path)
return data
def prep_run(self) -> None:
logger.info(f"Starting processing pipeline >>{self.name}<<...")
# progress tracking
self.curr_proc_idx = 1
# check if performable actions available
if len(self.actions) == 0:
raise NoPerformableActionError(("The pipeline does not contain any "
"performable actions."))
def run(
self,
starting_values: tuple[Any, ...],
) -> tuple[Any, ...]:
# prepare start
self.prep_run()
for idx, (action, action_kwargs) in enumerate(zip(self.actions, self.actions_kwargs)):
if idx == 0:
ret = action(*starting_values, **action_kwargs)
else:
ret = action(*ret, **action_kwargs)
# save intermediate result
self._intermediate_result = ret
# check if result should be saved locally
if self.is_save_result[idx]:
self.save_curr_result(filename=self.action_names[idx])
# processing tracking
self.curr_proc_idx += 1
logger.info(f"Processing pipeline >>{self.name}<< successfully ended.")
return ret

View File

@@ -0,0 +1,66 @@
from sentence_transformers import SentenceTransformer
import spacy
from lang_main import (
SAVE_PATH_FOLDER,
DATE_COLS,
FILENAME_COSSIM_FILTER_CANDIDATES,
THRESHOLD_SIMILARITY,
)
from lang_main.pipelines.base import BasePipeline
from lang_main.analysis.preprocessing import (
load_raw_data,
remove_duplicates,
remove_NA,
clean_string_slim,
entry_wise_cleansing,
analyse_feature,
build_cosSim_matrix,
filt_thresh_cosSim_matrix,
list_cosSim_dupl_candidates,
merge_similarity_dupl,
)
from lang_main.analysis.tokens import build_token_graph
"""
# ** config parameters
SAVE_PATH_FOLDER: Final[Path] = Path(CONFIG['paths']['results'])
DATE_COLS: Final[list[str]] = CONFIG['preprocess']['date_cols']
FILENAME_COSSIM_FILTER_CANDIDATES: Final[str] =\
CONFIG['export_filenames']['filename_cossim_filter_candidates']
THRESHOLD_SIMILARITY: Final[float] = CONFIG['preprocess']['threshold_similarity']
"""
# ** pipeline configuration
# ** models
# language models shared by the pipelines defined below
model_spacy = spacy.load('de_dep_news_trf')
model_stfr = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

# ** target feature preparation
# result: DataFrame containing the target feature together with its
# number of occurrences and the associated ObjectIDs
pipe_target_feat = BasePipeline(name='TargetFeature', working_dir=SAVE_PATH_FOLDER)
pipe_target_feat.add(action=load_raw_data, action_kwargs={'date_cols': DATE_COLS})
pipe_target_feat.add(action=remove_duplicates)
pipe_target_feat.add(action=remove_NA, save_result=True)
pipe_target_feat.add(
    action=entry_wise_cleansing,
    action_kwargs={
        'target_feature': 'VorgangsBeschreibung',
        'cleansing_func': clean_string_slim,
    },
)
pipe_target_feat.add(
    action=analyse_feature,
    action_kwargs={'target_feature': 'VorgangsBeschreibung'},
    save_result=True,
)

# ** embedding pipe
# similarity between entries is used to catch duplicates
# which contain typos or have similar content
pipe_embds = BasePipeline(name='Embedding1', working_dir=SAVE_PATH_FOLDER)
pipe_embds.add(
    action=build_cosSim_matrix,
    action_kwargs={'model': model_stfr},
    save_result=True,
)
pipe_embds.add(
    action=filt_thresh_cosSim_matrix,
    action_kwargs={'threshold': THRESHOLD_SIMILARITY},
    save_result=True,
)
pipe_embds.add(
    action=list_cosSim_dupl_candidates,
    action_kwargs={
        'save_candidates': True,
        'saving_path': SAVE_PATH_FOLDER,
        'filename': FILENAME_COSSIM_FILTER_CANDIDATES,
        'pipeline': pipe_embds,
    },
    save_result=True,
)

# ** Merge duplicates
pipe_merge = BasePipeline(name='Merge_Duplicates', working_dir=SAVE_PATH_FOLDER)
pipe_merge.add(action=merge_similarity_dupl, save_result=True)

# ** token analysis
pipe_token_analysis = BasePipeline(name='Token_Analysis', working_dir=SAVE_PATH_FOLDER)
pipe_token_analysis.add(
    action=build_token_graph,
    action_kwargs={'model': model_spacy},
    save_result=True,
)

111
src/lang_main/shared.py Normal file
View File

@@ -0,0 +1,111 @@
from typing import Any
import sys
import os
import shutil
import logging
import pickle
import tomllib
from pathlib import Path
# ** Logging
# minimum severity of records handled by the root logger
LOGGING_LEVEL = 'INFO'
# root logger writes to stdout
logging.basicConfig(stream=sys.stdout, level=LOGGING_LEVEL)
# logger used by the helper functions of this module
logger = logging.getLogger('ihm_analyse.helpers')
# ** Lib
def create_saving_folder(
saving_path_folder: str | Path,
overwrite_existing: bool = False,
) -> None:
# check for existence of given path
if not os.path.exists(saving_path_folder):
os.makedirs(saving_path_folder)
else:
if overwrite_existing:
# overwrite if desired (deletes whole path and re-creates it)
shutil.rmtree(saving_path_folder)
os.makedirs(saving_path_folder)
else:
logger.info((f"Path >>{saving_path_folder}<< already exists and remained "
"unchanged. If you want to overwrite this path, use parameter "
">>overwrite_existing<<."))
def load_toml_config(
    path_to_toml: str | Path,
) -> dict[str, Any]:
    """Parse a TOML file into a dictionary.

    Parameters
    ----------
    path_to_toml : str | Path
        location of the TOML file

    Returns
    -------
    dict[str, Any]
        parsed content of the file
    """
    # tomllib expects a binary file handle
    with open(path_to_toml, "rb") as toml_file:
        config = tomllib.load(toml_file)
    logger.info("Loaded TOML config file successfully.")

    return config
# saving and loading using pickle
# careful: pickling from unknown sources can be dangerous
def save_pickle(
    obj: Any,
    path: str | Path,
) -> None:
    """Serialise an object to disk with the highest available pickle protocol.

    Parameters
    ----------
    obj : Any
        object to serialise
    path : str | Path
        target file, an existing file is overwritten
    """
    with open(path, 'wb') as pickle_file:
        pickle.dump(obj, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)
    logger.info(f"Saved file successfully under {path}")
def load_pickle(
    path: str | Path,
) -> Any:
    """Restore an object from a pickle file.

    Unpickling can execute arbitrary code, so only files from trusted
    sources should be loaded.

    Parameters
    ----------
    path : str | Path
        pickle file to read

    Returns
    -------
    Any
        the restored object
    """
    with open(path, 'rb') as pickle_file:
        restored = pickle.load(pickle_file)
    logger.info("Loaded file successfully.")

    return restored
# TODO: remove, too specialised for common application
"""
def filter_candidates_idx(
data_model_input: Series,
model: SentenceTransformer,
cos_sim_threshold: float,
) -> Iterator[tuple[PandasIndex, PandasIndex]]:
common function to filter candidate indices based on cosine similarity
using SentenceTransformer model in batch mode,
feed of data as Series to retain information about indices of entries
Parameters
----------
data_model_input : Series
containing indices and text entries to process
model : SentenceTransformer
necessary SentenceTransformer model to encode text entries
cos_sim_threshold : float
threshold for cosine similarity to filter candidates
Yields
------
Iterator[tuple[PandasIndex, PandasIndex]]
index pairs which meet the cosine similarity threshold
# embeddings
batch = typing.cast(list[str],
data_model_input.to_list())
embds = typing.cast(Tensor,
model.encode(
batch,
convert_to_numpy=False,
convert_to_tensor=True,
show_progress_bar=False,
))
# cosine similarity
cos_sim = typing.cast(
npt.NDArray,
sentence_transformers.util.cos_sim(embds, embds).numpy()
)
np.fill_diagonal(cos_sim, 0.)
cos_sim = np.triu(cos_sim)
cos_sim_idx = np.argwhere(cos_sim >= cos_sim_threshold)
for idx_array in cos_sim_idx:
idx_pair = typing.cast(
tuple[np.int64, np.int64],
tuple(data_model_input.index[idx] for idx in idx_array)
)
yield idx_pair
"""

9
src/lang_main/types.py Normal file
View File

@@ -0,0 +1,9 @@
from typing import TypeAlias
import numpy as np
from spacy.tokens.doc import Doc as SpacyDoc
from torch import Tensor
# integer index value, either a built-in int or a NumPy 64-bit integer
# (presumably a label taken from a pandas index -- confirm against callers)
PandasIndex: TypeAlias = int | np.int64
# identifier of an object, represented as plain integer
ObjectID: TypeAlias = int
# embedding representation: either a spaCy ``Doc`` or a torch ``Tensor``
Embedding: TypeAlias = SpacyDoc | Tensor