new pipeline management, proto graph display timeline

This commit is contained in:
Florian Förster
2024-06-19 16:58:26 +02:00
parent c2714b8060
commit fb4437a3a2
21 changed files with 2838 additions and 11383 deletions

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import copy
import sys
import typing
@@ -169,6 +171,90 @@ def convert_graph_to_cytoscape(
return cyto_data, weight_metadata
def filter_graph_by_edge_weight(
graph: TokenGraph,
bound_lower: int | None,
bound_upper: int | None,
) -> TokenGraph:
"""filters all edges which are within the provided bounds
Parameters
----------
bound_lower : int | None
lower bound for edge weights, edges with weight equal to this value are retained
bound_upper : int | None
upper bound for edge weights, edges with weight equal to this value are retained
Returns
-------
TokenGraph
a copy of the graph with filtered edges
"""
original_graph_edges = copy.deepcopy(graph.edges)
filtered_graph = graph.copy()
if not any([bound_lower, bound_upper]):
logger.warning('No bounds provided, returning original graph.')
return filtered_graph
for edge in original_graph_edges:
weight = typing.cast(int, filtered_graph[edge[0]][edge[1]]['weight'])
if bound_lower is not None and weight < bound_lower:
filtered_graph.remove_edge(edge[0], edge[1])
if bound_upper is not None and weight > bound_upper:
filtered_graph.remove_edge(edge[0], edge[1])
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
def filter_graph_by_node_degree(
graph: TokenGraph,
bound_lower: int | None,
bound_upper: int | None,
) -> TokenGraph:
"""filters all nodes which are within the provided bounds by their degree
Parameters
----------
bound_lower : int | None
lower bound for node degree, nodes with degree equal to this value are retained
bound_upper : int | None
upper bound for node degree, nodes with degree equal to this value are retained
Returns
-------
TokenGraph
a copy of the graph with filtered nodes
"""
# filter nodes by degree
original_graph_nodes = copy.deepcopy(graph.nodes)
filtered_graph = graph.copy()
if not any([bound_lower, bound_upper]):
logger.warning('No bounds provided, returning original graph.')
return filtered_graph
for node in original_graph_nodes:
degree = filtered_graph.degree[node] # type: ignore
if bound_lower is not None and degree < bound_lower:
filtered_graph.remove_node(node)
if bound_upper is not None and degree > bound_upper:
filtered_graph.remove_node(node)
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
# ** ---------------------------------------
class TokenGraph(DiGraph):
def __init__(
self,
@@ -286,87 +372,6 @@ class TokenGraph(DiGraph):
graph=self._undirected, logging=logging
)
def filter_by_edge_weight(
self,
bound_lower: int | None,
bound_upper: int | None,
) -> Self:
"""filters all edges which are within the provided bounds
Parameters
----------
bound_lower : int | None
lower bound for edge weights, edges with weight equal to this value are retained
bound_upper : int | None
upper bound for edge weights, edges with weight equal to this value are retained
Returns
-------
Self
a copy of the graph with filtered edges
"""
original_graph_edges = copy.deepcopy(self.edges)
filtered_graph = self.copy()
if not any([bound_lower, bound_upper]):
logger.warning('No bounds provided, returning original graph.')
return filtered_graph
for edge in original_graph_edges:
weight = typing.cast(int, filtered_graph[edge[0]][edge[1]]['weight'])
if bound_lower is not None and weight < bound_lower:
filtered_graph.remove_edge(edge[0], edge[1])
if bound_upper is not None and weight > bound_upper:
filtered_graph.remove_edge(edge[0], edge[1])
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
def filter_by_node_degree(
self,
bound_lower: int | None,
bound_upper: int | None,
) -> Self:
"""filters all nodes which are within the provided bounds by their degree
Parameters
----------
bound_lower : int | None
lower bound for node degree, nodes with degree equal to this value are retained
bound_upper : int | None
upper bound for node degree, nodes with degree equal to this value are retained
Returns
-------
Self
a copy of the graph with filtered nodes
"""
# filter nodes by degree
original_graph_nodes = copy.deepcopy(self.nodes)
filtered_graph = self.copy()
if not any([bound_lower, bound_upper]):
logger.warning('No bounds provided, returning original graph.')
return filtered_graph
for node in original_graph_nodes:
degree = filtered_graph.degree[node] # type: ignore
if bound_lower is not None and degree < bound_lower:
filtered_graph.remove_node(node)
if bound_upper is not None and degree > bound_upper:
filtered_graph.remove_node(node)
if filtered_graph._undirected is not None:
filtered_graph.to_undirected(inplace=True, logging=False)
filtered_graph.update_metadata(logging=False)
return filtered_graph
def _save_prepare(
self,
path: Path,
@@ -379,14 +384,13 @@ class TokenGraph(DiGraph):
return saving_path
def save_graph(
def to_GraphML(
self,
path: Path,
filename: str | None = None,
directed: bool = False,
) -> None:
"""save one of the stored graphs to disk file,
currently only GraphML format is supported
"""save one of the stored graphs to GraphML format on disk,
Parameters
----------

View File

@@ -22,7 +22,7 @@ from lang_main.analysis.shared import (
similar_index_groups,
)
from lang_main.loggers import logger_preprocess as logger
from lang_main.pipelines.base import BasePipeline
from lang_main.pipelines.base import Pipeline
from lang_main.types import Embedding, PandasIndex
# ** RE patterns
@@ -119,10 +119,9 @@ def remove_duplicates(
).copy()
logger.info('Removed all duplicates from dataset successfully.')
logger.info(
(
f'New Dataset properties: number of entries: {len(wo_duplicates)}, '
f'number of features {len(wo_duplicates.columns)}'
)
'New Dataset properties: number of entries: %d, number of features %d',
len(wo_duplicates),
len(wo_duplicates.columns),
)
return (wo_duplicates,)
@@ -176,6 +175,7 @@ def clean_string_slim(string: str) -> str:
string = pattern_special_chars.sub(' ', string)
string = pattern_repeated_chars.sub(r'\1', string)
# string = pattern_dates.sub('', string)
# dates are used for context, should not be removed at this stage
string = pattern_whitespace.sub(' ', string)
# remove whitespaces at the beginning and the end
string = string.strip()
@@ -241,11 +241,84 @@ def analyse_feature(
return (result_df,)
# ** pre-filter
def numeric_pre_filter_feature(
data: DataFrame,
feature: str,
bound_lower: int | None,
bound_upper: int | None,
) -> tuple[DataFrame]:
if not any([bound_lower, bound_upper]):
raise ValueError('No bounds for filtering provided')
data = data.copy()
if bound_lower is None:
bound_lower = cast(int, data[feature].min())
if bound_upper is None:
bound_upper = cast(int, data[feature].max())
filter_lower = data[feature] >= bound_lower
filter_upper = data[feature] <= bound_upper
filter = filter_lower & filter_upper
data = data.loc[filter]
return (data,)
# ** embedding based similarity
# following functions used to identify similar entries to have
# a more robust identification of duplicates negating negative side effects
# of several disturbances like typos, escape characters, etc.
# build mapping of embeddings for given model
def merge_similarity_dupl(
data: DataFrame,
model: SentenceTransformer,
cos_sim_threshold: float,
) -> tuple[DataFrame]:
logger.info('Start merging of similarity candidates...')
# data
merged_data = data.copy()
model_input = merged_data['entry']
candidates_idx = candidates_by_index(
data_model_input=model_input,
model=model,
cos_sim_threshold=cos_sim_threshold,
)
# graph of similar ids
similar_id_graph, _ = similar_index_connection_graph(candidates_idx)
for similar_id_group in similar_index_groups(similar_id_graph):
similar_id_group = list(similar_id_group)
similar_data = merged_data.loc[similar_id_group, :]
# keep first entry with max number occurrences, then number of
# associated objects, then length of entry
similar_data = similar_data.sort_values(
by=['num_occur', 'num_assoc_obj_ids', 'len'],
ascending=[False, False, False],
)
# merge information to first entry
data_idx = cast(PandasIndex, similar_data.index[0])
similar_data.at[data_idx, 'num_occur'] = similar_data['num_occur'].sum()
assoc_obj_ids = similar_data['assoc_obj_ids'].to_numpy()
assoc_obj_ids = np.concatenate(assoc_obj_ids)
assoc_obj_ids = np.unique(assoc_obj_ids)
similar_data.at[data_idx, 'assoc_obj_ids'] = assoc_obj_ids
similar_data.at[data_idx, 'num_assoc_obj_ids'] = len(assoc_obj_ids)
# remaining indices, should be removed
similar_id_group.remove(data_idx)
merged_similar_data = similar_data.drop(index=similar_id_group)
# update entry in main dataset, drop remaining entries
merged_data.update(merged_similar_data)
merged_data = merged_data.drop(index=similar_id_group)
logger.info('Similarity candidates merged successfully.')
return (merged_data,)
#####################################################################
def build_embedding_map(
data: Series,
model: GermanSpacyModel | SentenceTransformer,
@@ -373,7 +446,7 @@ def list_cosSim_dupl_candidates(
save_candidates: bool = False,
saving_path: Path | None = None,
filename: str = 'CosSim-FilterCandidates',
pipeline: BasePipeline | None = None,
pipeline: Pipeline | None = None,
) -> tuple[list[tuple[PandasIndex, PandasIndex]], dict[int, tuple[Embedding, str]]]:
"""providing an overview of candidates with a similarity score greater than
given threshold; more suitable for debugging purposes
@@ -465,53 +538,6 @@ def similar_ids_groups(
"""
def merge_similarity_dupl(
data: DataFrame,
model: SentenceTransformer,
cos_sim_threshold: float,
) -> tuple[DataFrame]:
logger.info('Start merging of similarity candidates...')
# data
merged_data = data.copy()
model_input = merged_data['entry']
candidates_idx = candidates_by_index(
data_model_input=model_input,
model=model,
cos_sim_threshold=cos_sim_threshold,
)
# graph of similar ids
similar_id_graph, _ = similar_index_connection_graph(candidates_idx)
for similar_id_group in similar_index_groups(similar_id_graph):
similar_id_group = list(similar_id_group)
similar_data = merged_data.loc[similar_id_group, :]
# keep first entry with max number occurrences, then number of
# associated objects, then length of entry
similar_data = similar_data.sort_values(
by=['num_occur', 'num_assoc_obj_ids', 'len'],
ascending=[False, False, False],
)
# merge information to first entry
data_idx = cast(PandasIndex, similar_data.index[0])
similar_data.at[data_idx, 'num_occur'] = similar_data['num_occur'].sum()
assoc_obj_ids = similar_data['assoc_obj_ids'].to_numpy()
assoc_obj_ids = np.concatenate(assoc_obj_ids)
assoc_obj_ids = np.unique(assoc_obj_ids)
similar_data.at[data_idx, 'assoc_obj_ids'] = assoc_obj_ids
similar_data.at[data_idx, 'num_assoc_obj_ids'] = len(assoc_obj_ids)
# remaining indices, should be removed
similar_id_group.remove(data_idx)
merged_similar_data = similar_data.drop(index=similar_id_group)
# update entry in main dataset, drop remaining entries
merged_data.update(merged_similar_data)
merged_data = merged_data.drop(index=similar_id_group)
logger.info('Similarity candidates merged successfully.')
return (merged_data.copy(),)
# merge duplicates
def merge_similarity_dupl_old(
data: DataFrame,

View File

@@ -24,13 +24,13 @@ PATH_TO_DATASET: Final[Path] = path_dataset_conf.resolve()
# if not PATH_TO_DATASET.exists():
# raise FileNotFoundError(f'Dataset path >>{PATH_TO_DATASET}<< does not exist.')
# ** control
DO_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing']
# DO_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing']
SKIP_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing_skip']
DO_TOKEN_ANALYSIS: Final[bool] = CONFIG['control']['token_analysis']
# DO_TOKEN_ANALYSIS: Final[bool] = CONFIG['control']['token_analysis']
SKIP_TOKEN_ANALYSIS: Final[bool] = CONFIG['control']['token_analysis_skip']
DO_GRAPH_POSTPROCESSING: Final[bool] = CONFIG['control']['graph_postprocessing']
# DO_GRAPH_POSTPROCESSING: Final[bool] = CONFIG['control']['graph_postprocessing']
SKIP_GRAPH_POSTPROCESSING: Final[bool] = CONFIG['control']['graph_postprocessing_skip']
DO_TIME_ANALYSIS: Final[bool] = CONFIG['control']['time_analysis']
# DO_TIME_ANALYSIS: Final[bool] = CONFIG['control']['time_analysis']
SKIP_TIME_ANALYSIS: Final[bool] = CONFIG['control']['time_analysis_skip']
# ** models
@@ -66,11 +66,11 @@ UNIQUE_CRITERION_FEATURE: Final[str] = CONFIG['time_analysis']['uniqueness'][
]
FEATURE_NAME_OBJ_ID: Final[str] = CONFIG['time_analysis']['uniqueness']['feature_name_obj_id']
# ** time_analysis.model_input
MODEL_INPUT_FEATURES: Final[tuple[str]] = tuple(
MODEL_INPUT_FEATURES: Final[tuple[str, ...]] = tuple(
CONFIG['time_analysis']['model_input']['input_features']
)
ACTIVITY_FEATURE: Final[str] = CONFIG['time_analysis']['model_input']['activity_feature']
ACTIVITY_TYPES: Final[tuple[str]] = tuple(
ACTIVITY_TYPES: Final[tuple[str, ...]] = tuple(
CONFIG['time_analysis']['model_input']['activity_types']
)
THRESHOLD_NUM_ACTIVITIES: Final[int] = CONFIG['time_analysis']['model_input'][

View File

@@ -1,4 +1,3 @@
import os
import pickle
import shutil
import tomllib
@@ -22,7 +21,7 @@ def create_saving_folder(
if overwrite_existing:
# overwrite if desired (deletes whole path and re-creates it)
shutil.rmtree(saving_path_folder)
os.makedirs(saving_path_folder)
saving_path_folder.mkdir(parents=True)
else:
logger.info(
(
@@ -62,56 +61,14 @@ def load_pickle(
return obj
# TODO: remove, too specialised for common application
"""
def filter_candidates_idx(
data_model_input: Series,
model: SentenceTransformer,
cos_sim_threshold: float,
) -> Iterator[tuple[PandasIndex, PandasIndex]]:
common function to filter candidate indices based on cosine similarity
using SentenceTransformer model in batch mode,
feed of data as Series to retain information about indices of entries
Parameters
----------
data_model_input : Series
containing indices and text entries to process
model : SentenceTransformer
necessary SentenceTransformer model to encode text entries
cos_sim_threshold : float
threshold for cosine similarity to filter candidates
Yields
------
Iterator[tuple[PandasIndex, PandasIndex]]
index pairs which meet the cosine similarity threshold
# embeddings
batch = typing.cast(list[str],
data_model_input.to_list())
embds = typing.cast(Tensor,
model.encode(
batch,
convert_to_numpy=False,
convert_to_tensor=True,
show_progress_bar=False,
))
# cosine similarity
cos_sim = typing.cast(
npt.NDArray,
sentence_transformers.util.cos_sim(embds, embds).numpy()
)
np.fill_diagonal(cos_sim, 0.)
cos_sim = np.triu(cos_sim)
cos_sim_idx = np.argwhere(cos_sim >= cos_sim_threshold)
for idx_array in cos_sim_idx:
idx_pair = typing.cast(
tuple[np.int64, np.int64],
tuple(data_model_input.index[idx] for idx in idx_array)
def get_entry_point(
saving_path: Path,
filename: str,
) -> Path:
entry_point_path = (saving_path / filename).with_suffix('.pkl')
if not entry_point_path.exists():
raise FileNotFoundError(
f'Could not find provided entry data under path: >>{entry_point_path}<<'
)
yield idx_pair
"""
return entry_point_path

View File

@@ -9,14 +9,12 @@ dataset = './01_2_Rohdaten_neu/Export4.csv'
#results = './results/Export7_trunc/'
#dataset = './01_03_Rohdaten_202403/Export7_trunc.csv'
# only debugging features, production-ready pipelines should always
# be fully executed
[control]
preprocessing = true
preprocessing_skip = false
token_analysis = false
token_analysis_skip = false
graph_postprocessing = false
graph_postprocessing_skip = false
time_analysis = false
time_analysis_skip = false
#[export_filenames]
@@ -42,9 +40,12 @@ criterion_feature = 'HObjektText'
feature_name_obj_id = 'ObjektID'
[time_analysis.model_input]
# input_features = [
# 'VorgangsTypName',
# 'VorgangsArtText',
# 'VorgangsBeschreibung',
# ]
input_features = [
'VorgangsTypName',
'VorgangsArtText',
'VorgangsBeschreibung',
]
activity_feature = 'VorgangsTypName'

View File

@@ -1,9 +1,14 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from pathlib import Path
from typing import Any
from typing import Any, Never, cast
from typing_extensions import override
from lang_main.loggers import logger_pipelines as logger
from lang_main.io import load_pickle, save_pickle
from lang_main.loggers import logger_pipelines as logger
from lang_main.types import ResultHandling
# ** pipelines to perform given actions on dataset in a customisable manner
@@ -12,7 +17,18 @@ class NoPerformableActionError(Exception):
"""Error describing that no action is available in the current pipeline"""
class BasePipeline:
class WrongActionTypeError(Exception):
"""Error raised if added action type is not supported by corresponding pipeline"""
class OutputInPipelineContainerError(Exception):
"""Error raised if an output was detected by one of the performed
actions in a PipelineContainer. Each action in a PipelineContainer is itself a
procedure which does not have any parameters or return values and should therefore not
return any values."""
class BasePipeline(ABC):
def __init__(
self,
name: str,
@@ -25,18 +41,12 @@ class BasePipeline:
self.name = name
# working directory for pipeline == output path
self.working_dir = working_dir
# if not self.working_dir.exists():
# self.working_dir.mkdir(parents=True)
# container for actions to perform during pass
self.actions: list[Callable] = []
self.action_names: list[str] = []
self.actions_kwargs: list[dict[str, Any]] = []
self.is_save_result: list[bool] = []
# progress tracking, start at 1
self.curr_proc_idx: int = 1
# intermediate result
self._intermediate_result: Any | None = None
def __repr__(self) -> str:
return (
@@ -44,15 +54,132 @@ class BasePipeline:
f'working dir: {self.working_dir}, contents: {self.action_names})'
)
@property
def intermediate_result(self) -> Any:
return self._intermediate_result
def panic_wrong_action_type(
self,
action: Any,
compatible_type: str,
) -> Never:
raise WrongActionTypeError(
(
f'Action must be of type {compatible_type}, '
f'but is of type >>{type(action)}<<.'
)
)
def prep_run(self) -> None:
logger.info('Starting pipeline >>%s<<...', self.name)
# progress tracking
self.curr_proc_idx = 1
# check if performable actions available
if len(self.actions) == 0:
raise NoPerformableActionError(
'The pipeline does not contain any performable actions.'
)
def post_run(self) -> None:
logger.info(
'Processing pipeline >>%s<< successfully ended after %d steps.',
self.name,
(self.curr_proc_idx - 1),
)
@abstractmethod
def add(self) -> None: ...
@abstractmethod
def logic(self) -> None: ...
def run(self, *args, **kwargs) -> Any:
self.prep_run()
ret = self.logic(*args, **kwargs)
self.post_run()
return ret
class PipelineContainer(BasePipeline):
def __init__(
self,
name: str,
working_dir: Path,
) -> None:
super().__init__(name=name, working_dir=working_dir)
self.action_skip: list[bool] = []
@override
def add(
self,
action: Callable,
skip: bool = False,
) -> None:
if isinstance(action, Callable):
self.actions.append(action)
self.action_names.append(action.__name__)
self.action_skip.append(skip)
else:
self.panic_wrong_action_type(action=action, compatible_type=Callable.__name__)
@override
def logic(self) -> None:
for idx, (action, action_name) in enumerate(zip(self.actions, self.action_names)):
# loading
if self.action_skip[idx]:
logger.info('[No Calculation] Skipping >>%s<<...', action_name)
self.curr_proc_idx += 1
continue
# calculation
ret = action()
if ret is not None:
raise OutputInPipelineContainerError(
(
f'Output in PipelineContainers not allowed. Action {action_name} '
f'returned values in Container {self.name}.'
)
)
# processing tracking
self.curr_proc_idx += 1
class Pipeline(BasePipeline):
def __init__(
self,
name: str,
working_dir: Path,
) -> None:
# init base class
super().__init__(name=name, working_dir=working_dir)
# name of pipeline
self.name = name
# working directory for pipeline == output path
self.working_dir = working_dir
# if not self.working_dir.exists():
# self.working_dir.mkdir(parents=True)
# container for actions to perform during pass
self.actions_kwargs: list[dict[str, Any]] = []
self.save_results: ResultHandling = []
self.load_results: ResultHandling = []
# intermediate result
self._intermediate_result: tuple[Any, ...] | None = None
def __repr__(self) -> str:
return (
f'{self.__class__.__name__}(name: {self.name}, '
f'working dir: {self.working_dir}, contents: {self.action_names})'
)
# @property
# def intermediate_result(self) -> tuple[Any, ...] | None:
# return self._intermediate_result
@override
def add(
self,
action: Callable,
action_kwargs: dict[str, Any] = {},
save_result: bool = False,
load_result: bool = False,
filename: str | None = None,
) -> None:
# check explicitly for function type
# if isinstance(action, FunctionType):
@@ -60,11 +187,10 @@ class BasePipeline:
self.actions.append(action)
self.action_names.append(action.__name__)
self.actions_kwargs.append(action_kwargs.copy())
self.is_save_result.append(save_result)
self.save_results.append((save_result, filename))
self.load_results.append((load_result, filename))
else:
raise TypeError(
f'Action must be custom function, but is of type >>{type(action)}<<.'
)
self.panic_wrong_action_type(action=action, compatible_type=Callable.__name__)
# TODO: add multiple entries by utilising simple add method
"""
@@ -88,57 +214,84 @@ class BasePipeline:
f"but is of type >>{type(action)}<<."))
"""
def save_curr_result(
def get_result_path(
self,
filename: str,
action_idx: int,
filename: str | None,
) -> tuple[Path, str]:
action_name = self.action_names[action_idx]
if filename is None:
target_filename = f'Pipe-{self.name}_Step-{self.curr_proc_idx}_{action_name}'
else:
target_filename = filename
target_path = self.working_dir.joinpath(target_filename).with_suffix('.pkl')
return target_path, action_name
def load_step(
self,
action_idx: int,
filename: str | None,
) -> tuple[Any, ...]:
target_path, action_name = self.get_result_path(action_idx, filename)
if not target_path.exists():
raise FileNotFoundError(
(
f'No intermediate results for action >>{action_name}<< '
f'under >>{target_path}<< found'
)
)
# results should be tuple, but that is not guaranteed
result_loaded = cast(tuple[Any, ...], load_pickle(target_path))
if not isinstance(result_loaded, tuple):
raise TypeError(f'Loaded results must be tuple, not {type(result_loaded)}')
return result_loaded
def save_step(
self,
action_idx: int,
filename: str | None,
) -> None:
target_filename = f'Pipe-{self.name}_Step-{self.curr_proc_idx}_' + filename
target_path = self.working_dir.joinpath(target_filename)
target_path = target_path.with_suffix('.pkl')
# target_filename = f'Pipe-{self.name}_Step-{self.curr_proc_idx}_' + filename
# target_path = self.working_dir.joinpath(target_filename)
# target_path = target_path.with_suffix('.pkl')
target_path, _ = self.get_result_path(action_idx, filename)
# saving file locally
save_pickle(obj=self._intermediate_result, path=target_path)
def load_intermediate_result(
self,
saving_path: str,
filename: str,
) -> tuple[Any, ...]:
target_path = Path(saving_path + filename).with_suffix('.pkl')
# loading DataFrame or Series from pickle
data = load_pickle(target_path)
return data
def prep_run(self) -> None:
logger.info('Starting processing pipeline >>%s<<...', self.name)
# progress tracking
self.curr_proc_idx = 1
# check if performable actions available
if len(self.actions) == 0:
raise NoPerformableActionError(
'The pipeline does not contain any performable actions.'
)
def run(
@override
def logic(
self,
starting_values: tuple[Any, ...],
) -> tuple[Any, ...]:
# prepare start
self.prep_run()
for idx, (action, action_kwargs) in enumerate(zip(self.actions, self.actions_kwargs)):
# loading
if self.load_results[idx][0]:
filename = self.load_results[idx][1]
ret = self.load_step(action_idx=idx, filename=filename)
logger.info(
'[No Calculation] Loaded result for action >>%s<< successfully',
self.action_names[idx],
)
self.curr_proc_idx += 1
continue
# calculation
if idx == 0:
ret = action(*starting_values, **action_kwargs)
else:
ret = action(*ret, **action_kwargs)
if not isinstance(ret, tuple):
ret = (ret,)
ret = cast(tuple[Any, ...], ret)
# save intermediate result
self._intermediate_result = ret
# check if result should be saved locally
if self.is_save_result[idx]:
self.save_curr_result(filename=self.action_names[idx])
# saving result locally, always save last action
if self.save_results[idx][0] or idx == (len(self.actions) - 1):
filename = self.save_results[idx][1]
self.save_step(action_idx=idx, filename=filename)
# processing tracking
self.curr_proc_idx += 1
logger.info('Processing pipeline >>%s<< successfully ended.', self.name)
return ret

View File

@@ -1,9 +1,11 @@
from lang_main.analysis import graphs
from lang_main.analysis.preprocessing import (
analyse_feature,
clean_string_slim,
entry_wise_cleansing,
load_raw_data,
merge_similarity_dupl,
numeric_pre_filter_feature,
remove_duplicates,
remove_NA,
)
@@ -23,40 +25,50 @@ from lang_main.constants import (
SAVE_PATH_FOLDER,
SPCY_MODEL,
STFR_MODEL,
THRESHOLD_AMOUNT_CHARACTERS,
THRESHOLD_EDGE_WEIGHT,
THRESHOLD_NUM_ACTIVITIES,
THRESHOLD_SIMILARITY,
THRESHOLD_TIMELINE_SIMILARITY,
THRESHOLD_UNIQUE_TEXTS,
UNIQUE_CRITERION_FEATURE,
)
from lang_main.pipelines.base import BasePipeline
from lang_main.pipelines.base import Pipeline
from lang_main.types import EntryPoints
# ** pipeline configuration
# ** target feature preparation
pipe_target_feat = BasePipeline(name='TargetFeature', working_dir=SAVE_PATH_FOLDER)
pipe_target_feat.add(
load_raw_data,
{
'date_cols': DATE_COLS,
},
)
pipe_target_feat.add(remove_duplicates)
pipe_target_feat.add(remove_NA, save_result=True)
pipe_target_feat.add(
entry_wise_cleansing,
{
'target_feature': 'VorgangsBeschreibung',
'cleansing_func': clean_string_slim,
},
save_result=True,
)
pipe_target_feat.add(
analyse_feature,
{
'target_feature': 'VorgangsBeschreibung',
},
save_result=True,
)
def build_base_target_feature_pipe() -> Pipeline:
pipe_target_feat = Pipeline(name='TargetFeature', working_dir=SAVE_PATH_FOLDER)
pipe_target_feat.add(
load_raw_data,
{
'date_cols': DATE_COLS,
},
)
pipe_target_feat.add(remove_duplicates)
pipe_target_feat.add(remove_NA, save_result=True)
pipe_target_feat.add(
entry_wise_cleansing,
{
'target_feature': 'VorgangsBeschreibung',
'cleansing_func': clean_string_slim,
},
save_result=True,
filename=EntryPoints.TIMELINE,
)
pipe_target_feat.add(
analyse_feature,
{
'target_feature': 'VorgangsBeschreibung',
},
save_result=True,
)
return pipe_target_feat
# output: DataFrame containing target feature with
# number of occurrences and associated ObjectIDs
@@ -81,68 +93,114 @@ pipe_target_feat.add(
# save_result=True,
# )
# ** Merge duplicates
pipe_merge = BasePipeline(name='Merge_Duplicates', working_dir=SAVE_PATH_FOLDER)
# pipe_merge.add(merge_similarity_dupl, save_result=True)
pipe_merge.add(
merge_similarity_dupl,
{
'model': STFR_MODEL,
'cos_sim_threshold': THRESHOLD_SIMILARITY,
},
save_result=True,
)
def build_merge_duplicates_pipe() -> Pipeline:
pipe_merge = Pipeline(name='Merge_Duplicates', working_dir=SAVE_PATH_FOLDER)
# pipe_merge.add(merge_similarity_dupl, save_result=True)
pipe_merge.add(
numeric_pre_filter_feature,
{
'feature': 'len',
'bound_lower': THRESHOLD_AMOUNT_CHARACTERS,
'bound_upper': None,
},
)
pipe_merge.add(
merge_similarity_dupl,
{
'model': STFR_MODEL,
'cos_sim_threshold': THRESHOLD_SIMILARITY,
},
save_result=True,
filename=EntryPoints.TOKEN_ANALYSIS,
)
return pipe_merge
# ** token analysis
pipe_token_analysis = BasePipeline(name='Token_Analysis', working_dir=SAVE_PATH_FOLDER)
pipe_token_analysis.add(
build_token_graph,
{
'model': SPCY_MODEL,
'target_feature': 'entry',
'weights_feature': 'num_occur',
'batch_idx_feature': 'batched_idxs',
'build_map': True,
'batch_size_model': 50,
},
save_result=True,
)
def build_tk_graph_pipe() -> Pipeline:
pipe_token_analysis = Pipeline(name='Token_Analysis', working_dir=SAVE_PATH_FOLDER)
pipe_token_analysis.add(
build_token_graph,
{
'model': SPCY_MODEL,
'target_feature': 'entry',
'weights_feature': 'num_occur',
'batch_idx_feature': 'batched_idxs',
'build_map': False,
'batch_size_model': 50,
},
save_result=True,
filename=EntryPoints.TK_GRAPH_POST,
)
return pipe_token_analysis
def build_tk_graph_post_pipe() -> Pipeline:
pipe_graph_postprocessing = Pipeline(
name='Graph_Postprocessing', working_dir=SAVE_PATH_FOLDER
)
pipe_graph_postprocessing.add(
graphs.filter_graph_by_edge_weight,
{
'bound_lower': THRESHOLD_EDGE_WEIGHT,
'bound_upper': None,
},
)
pipe_graph_postprocessing.add(
graphs.filter_graph_by_node_degree,
{
'bound_lower': 1,
'bound_upper': None,
},
save_result=True,
filename=EntryPoints.TK_GRAPH_ANALYSIS,
)
return pipe_graph_postprocessing
# ** timeline analysis
pipe_timeline = BasePipeline(name='Timeline_Analysis', working_dir=SAVE_PATH_FOLDER)
pipe_timeline.add(
remove_non_relevant_obj_ids,
{
'thresh_unique_feat_per_id': THRESHOLD_UNIQUE_TEXTS,
'feature_uniqueness': UNIQUE_CRITERION_FEATURE,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
},
save_result=True,
)
pipe_timeline.add(
generate_model_input,
{
'target_feature_name': 'nlp_model_input',
'model_input_features': MODEL_INPUT_FEATURES,
},
)
pipe_timeline.add(
filter_activities_per_obj_id,
{
'activity_feature': ACTIVITY_FEATURE,
'relevant_activity_types': ACTIVITY_TYPES,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
'threshold_num_activities': THRESHOLD_NUM_ACTIVITIES,
},
)
pipe_timeline.add(
get_timeline_candidates,
{
'model': STFR_MODEL,
'cos_sim_threshold': THRESHOLD_TIMELINE_SIMILARITY,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
'model_input_feature': 'nlp_model_input',
},
save_result=True,
)
def build_timeline_pipe() -> Pipeline:
pipe_timeline = Pipeline(name='Timeline_Analysis', working_dir=SAVE_PATH_FOLDER)
pipe_timeline.add(
remove_non_relevant_obj_ids,
{
'thresh_unique_feat_per_id': THRESHOLD_UNIQUE_TEXTS,
'feature_uniqueness': UNIQUE_CRITERION_FEATURE,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
},
save_result=True,
)
pipe_timeline.add(
generate_model_input,
{
'target_feature_name': 'nlp_model_input',
'model_input_features': MODEL_INPUT_FEATURES,
},
)
pipe_timeline.add(
filter_activities_per_obj_id,
{
'activity_feature': ACTIVITY_FEATURE,
'relevant_activity_types': ACTIVITY_TYPES,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
'threshold_num_activities': THRESHOLD_NUM_ACTIVITIES,
},
)
pipe_timeline.add(
get_timeline_candidates,
{
'model': STFR_MODEL,
'cos_sim_threshold': THRESHOLD_TIMELINE_SIMILARITY,
'feature_obj_id': FEATURE_NAME_OBJ_ID,
'model_input_feature': 'nlp_model_input',
},
save_result=True,
filename=EntryPoints.TIMELINE_POST,
)
return pipe_timeline

View File

@@ -17,8 +17,20 @@ class LoggingLevels(enum.IntEnum):
# ** devices
class STFRDeviceTypes(enum.StrEnum):
CPU = 'cpu'
GPU = 'cuda'
CPU = enum.auto()
GPU = enum.auto()
# ** pipelines
ResultHandling: TypeAlias = list[tuple[bool, str | None]]
class EntryPoints(enum.StrEnum):
TIMELINE = 'TIMELINE'
TIMELINE_POST = 'TIMELINE_POSTPROCESSING'
TK_GRAPH_POST = 'TK-GRAPH_POSTPROCESSING'
TK_GRAPH_ANALYSIS = 'TK-GRAPH_ANALYSIS'
TOKEN_ANALYSIS = 'TOKEN_ANALYSIS'
# ** datasets