import re
from collections.abc import Iterable
from itertools import combinations
from math import factorial
from pathlib import Path
from typing import Callable, cast

import numpy as np
import pandas as pd
import sentence_transformers
import sentence_transformers.util
from pandas import DataFrame, Series
from sentence_transformers import SentenceTransformer
from spacy.lang.de import German as GermanSpacyModel
from spacy.tokens.doc import Doc as SpacyDoc
from torch import Tensor
from tqdm import tqdm

from lang_main.analysis.shared import (
    candidates_by_index,
    similar_index_connection_graph,
    similar_index_groups,
)
from lang_main.loggers import logger_preprocess as logger
from lang_main.pipelines.base import BasePipeline
from lang_main.types import Embedding, PandasIndex

# ** RE patterns
pattern_special_chars = re.compile(r'[\t\n\r\f\v]+')
pattern_repeated_chars = re.compile(r'([,;.:!?\-_+]){2,}')
pattern_dates = re.compile(r'(\d{1,2}\.)?(\d{1,2}\.)([\d]{2,4})?')
pattern_whitespace = re.compile(r'[ ]{2,}')


# ** (1) dataset preparation: loading and simple preprocessing
# the following functions load a given dataset and perform simple
# duplicate cleansing based on all properties
def load_raw_data(
    path: Path,
    date_cols: Iterable[str] = (
        'VorgangsDatum',
        'ErledigungsDatum',
        'Arbeitsbeginn',
        'ErstellungsDatum',
    ),
) -> tuple[DataFrame]:
    """load IHM dataset with standard structure

    Parameters
    ----------
    path : Path
        path to dataset file, usually a CSV file
    date_cols : Iterable[str], optional
        columns which contain dates and are parsed as such, by default
        ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')

    Returns
    -------
    DataFrame
        raw dataset as DataFrame
    """
    # load dataset
    date_cols = list(date_cols)
    data = pd.read_csv(
        filepath_or_buffer=path,
        sep=';',
        encoding='cp1252',
        parse_dates=date_cols,
        dayfirst=True,
    )
    logger.info('Loaded dataset successfully.')
    logger.info(
        (
            f'Dataset properties: number of entries: {len(data)}, '
            f'number of features {len(data.columns)}'
        )
    )

    return (data,)


def remove_duplicates(
    data: DataFrame,
) -> tuple[DataFrame]:
    """removes duplicated entries over all features in the given dataset

    Parameters
    ----------
    data : DataFrame
        read data with standard structure

    Returns
    -------
    DataFrame
        dataset with removed duplicates over all features
    """
    # obtain info about duplicates over all features
    duplicates_filt = data.duplicated()
    logger.info(f'Number of duplicates over all features: {duplicates_filt.sum()}')
    # drop duplicates
    wo_duplicates = data.drop_duplicates(ignore_index=True)
    duplicates_subset: list[str] = [
        'VorgangsID',
        'ObjektID',
    ]
    duplicates_subset_filt = wo_duplicates.duplicated(subset=duplicates_subset)
    logger.info(
        (
            'Number of duplicates over subset '
            f'>>{duplicates_subset}<<: {duplicates_subset_filt.sum()}'
        )
    )
    wo_duplicates = wo_duplicates.drop_duplicates(
        subset=duplicates_subset, ignore_index=True
    ).copy()
    logger.info('Removed all duplicates from dataset successfully.')
    logger.info(
        (
            f'New Dataset properties: number of entries: {len(wo_duplicates)}, '
            f'number of features {len(wo_duplicates.columns)}'
        )
    )

    return (wo_duplicates,)


def remove_NA(
    data: DataFrame,
    target_features: list[str] = [
        'VorgangsBeschreibung',
    ],
) -> tuple[DataFrame]:
    """drop NA entries based on a subset of features to be analysed

    Parameters
    ----------
    data : DataFrame
        standard IHM dataset, perhaps pre-cleaned
    target_features : list[str], optional
        subset to analyse to define an NA entry, by default ['VorgangsBeschreibung']

    Returns
    -------
    DataFrame
        dataset with removed NA entries for given subset of features
    """
    wo_NA = data.dropna(axis=0, subset=target_features, ignore_index=True).copy()  # type: ignore
    logger.info(
        f'Removed NA entries for features >>{target_features}<< from dataset successfully.'
    )

    return (wo_NA,)
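
# Illustrative sketch of the typical call order for the loading stage: the CSV path
# and the function name are placeholders for documentation purposes; each step returns
# a single-element tuple that is unpacked before the next call.
def _example_loading_stage() -> tuple[DataFrame]:
    """Usage sketch: load the raw export, then remove duplicates and NA entries."""
    (raw,) = load_raw_data(path=Path('data/ihm_export.csv'))  # placeholder path
    (deduped,) = remove_duplicates(raw)
    (complete,) = remove_NA(deduped, target_features=['VorgangsBeschreibung'])
    return (complete,)
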
# ** (2) entry-based cleansing
# following functions clean and prepare specific entries, not the whole dataset
def clean_string_slim(string: str) -> str:
    """mapping function to clean single string entries in a series (feature-wise)
    of the dataset, intended to be applied element-wise for string features

    Parameters
    ----------
    string : str
        dataset entry feature

    Returns
    -------
    str
        cleaned entry
    """
    # remove special chars
    string = pattern_special_chars.sub(' ', string)
    string = pattern_repeated_chars.sub(r'\1', string)
    # string = pattern_dates.sub('', string)
    string = pattern_whitespace.sub(' ', string)
    # remove whitespaces at the beginning and the end
    string = string.strip()

    return string


def entry_wise_cleansing(
    data: DataFrame,
    target_feature: str,
    cleansing_func: Callable[[str], str],
) -> tuple[DataFrame]:
    # apply given cleansing function to target feature
    data[target_feature] = data[target_feature].map(cleansing_func)
    logger.info(
        'Successfully applied entry-wise cleansing procedure >>%s<< for feature >>%s<<',
        cleansing_func.__name__,
        target_feature,
    )

    return (data,)


# ** in-depth analysis of one feature
# following functions try to gain insights on a given feature of the IHM dataset such
# as number of occurrences or associated Object IDs
def analyse_feature(
    data: DataFrame,
    target_feature: str,
) -> tuple[DataFrame]:
    # feature column
    feature_entries = data[target_feature]
    logger.info(
        'Number of entries for feature >>%s<<: %d', target_feature, len(feature_entries)
    )
    # obtain unique entries
    unique_feature_entries = feature_entries.unique()
    # prepare result DataFrame
    cols = ['entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
    result_df = pd.DataFrame(columns=cols)

    for entry in tqdm(unique_feature_entries, mininterval=1.0):
        len_entry = len(entry)
        filt = data[target_feature] == entry
        temp = data[filt]
        assoc_obj_ids = temp['ObjektID'].unique()
        assoc_obj_ids = np.sort(assoc_obj_ids, kind='stable')
        num_assoc_obj_ids = len(assoc_obj_ids)
        num_dupl = filt.sum()
        conc_df = pd.DataFrame(
            data=[[entry, len_entry, num_dupl, assoc_obj_ids, num_assoc_obj_ids]],
            columns=cols,
        )
        result_df = pd.concat([result_df, conc_df], ignore_index=True)

    result_df = result_df.sort_values(by='num_occur', ascending=False).copy()

    return (result_df,)
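
# Illustrative sketch combining the entry-wise cleansing with the feature analysis.
# Assumptions: ``data`` is an IHM DataFrame as produced by the loading stage above
# and contains the standard 'VorgangsBeschreibung' and 'ObjektID' columns; the
# function name is a placeholder.
def _example_entry_cleansing(data: DataFrame) -> tuple[DataFrame]:
    """Usage sketch: clean the task descriptions, then build the feature overview."""
    (cleaned,) = entry_wise_cleansing(
        data=data,
        target_feature='VorgangsBeschreibung',
        cleansing_func=clean_string_slim,
    )
    return analyse_feature(cleaned, target_feature='VorgangsBeschreibung')
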
# ** embedding based similarity
# the following functions identify similar entries so that duplicates are detected
# more robustly despite disturbances like typos, escape characters, etc.


# build mapping of embeddings for given model
def build_embedding_map(
    data: Series,
    model: GermanSpacyModel | SentenceTransformer,
) -> tuple[dict[int, tuple[Embedding, str]], tuple[bool, bool]]:
    # dictionary with embeddings
    embeddings: dict[int, tuple[Embedding, str]] = {}
    is_spacy = False
    is_STRF = False
    if isinstance(model, GermanSpacyModel):
        is_spacy = True
    elif isinstance(model, SentenceTransformer):
        is_STRF = True
    if not any((is_spacy, is_STRF)):
        raise NotImplementedError('Model type unknown')

    for idx, text in tqdm(data.items(), total=len(data), mininterval=1.0):
        # verbose code: Pyright not inferring types correctly
        idx = cast(int, idx)
        text = cast(str, text)
        if is_spacy:
            model = cast(GermanSpacyModel, model)
            embd = cast(SpacyDoc, model(text))
            embeddings[idx] = (embd, text)
            # check for empty vectors
            if not embd.vector_norm:
                logger.debug('--- Unknown Words ---')
                logger.debug('embd.text: %s has no vector', embd.text)
        elif is_STRF:
            model = cast(SentenceTransformer, model)
            embd = cast(Tensor, model.encode(text, show_progress_bar=False))
            embeddings[idx] = (embd, text)

    return embeddings, (is_spacy, is_STRF)


# adapt interface
# use candidates by index function
# merges: build_embedding_map, build_cosSim_matrix, filt_thresh_cosSim_matrix
# build similarity matrix out of embeddings
def build_cosSim_matrix(
    data: Series,
    model: GermanSpacyModel | SentenceTransformer,
) -> tuple[DataFrame, dict[int, tuple[Embedding, str]]]:
    # build empty matrix
    df_index = data.index
    cosineSim_idx_matrix = pd.DataFrame(
        data=0.0, columns=df_index, index=df_index, dtype=np.float32
    )
    logger.info('Start building embedding map...')
    # obtain embeddings based on used model
    embds, (is_spacy, is_STRF) = build_embedding_map(
        data=data,
        model=model,
    )
    logger.info('Embedding map built successfully.')
    # apply index based mapping for efficient handling of large texts
    combs = combinations(df_index, 2)
    total_combs = factorial(len(df_index)) // factorial(2) // factorial(len(df_index) - 2)
    logger.info('Start calculation of similarity scores...')

    for idx1, idx2 in tqdm(combs, total=total_combs, mininterval=1.0):
        # print(f"{idx1=}, {idx2=}")
        embd1 = embds[idx1][0]
        embd2 = embds[idx2][0]
        # calculate similarity based on model type
        if is_spacy:
            embd1 = cast(SpacyDoc, embds[idx1][0])
            embd2 = cast(SpacyDoc, embds[idx2][0])
            cosSim = embd1.similarity(embd2)
        elif is_STRF:
            embd1 = cast(Tensor, embds[idx1][0])
            embd2 = cast(Tensor, embds[idx2][0])
            cosSim = sentence_transformers.util.cos_sim(embd1, embd2)
            cosSim = cast(float, cosSim.item())
        cosineSim_idx_matrix.at[idx1, idx2] = cosSim

    logger.info('Similarity scores calculated successfully.')

    return cosineSim_idx_matrix, embds


# obtain index pairs with cosine similarity
# greater than or equal to given threshold value
def filt_thresh_cosSim_matrix(
    cosineSim_idx_matrix: DataFrame,
    embds: dict[int, tuple[Embedding, str]],
    threshold: float,
) -> tuple[Series, dict[int, tuple[Embedding, str]]]:
    """filter similarity matrix by threshold value and return index pairs
    with a similarity score greater than or equal to the provided threshold

    Parameters
    ----------
    cosineSim_idx_matrix : DataFrame
        similarity matrix
    embds : dict[int, tuple[Embedding, str]]
        mapping of indices to embeddings and their corresponding texts
    threshold : float
        similarity threshold

    Returns
    -------
    Series
        series with multi index (index pairs) and corresponding similarity score
    """
    cosineSim_filt = cast(
        Series, cosineSim_idx_matrix.where(cosineSim_idx_matrix >= threshold).stack()
    )

    return cosineSim_filt, embds
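
# Illustrative sketch of the similarity filtering step. Assumptions: the
# SentenceTransformer checkpoint name, the 0.85 threshold and the function name are
# placeholders chosen for illustration, not values prescribed by this module.
def _example_similarity_filtering(texts: Series) -> Series:
    """Usage sketch: embed texts, build the pairwise matrix, keep close pairs."""
    model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    cos_sim_matrix, embds = build_cosSim_matrix(data=texts, model=model)
    cos_sim_filt, _ = filt_thresh_cosSim_matrix(
        cosineSim_idx_matrix=cos_sim_matrix,
        embds=embds,
        threshold=0.85,
    )
    return cos_sim_filt
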
def list_cosSim_dupl_candidates(
    cosineSim_filt: Series,
    embds: dict[int, tuple[Embedding, str]],
    save_candidates: bool = False,
    saving_path: Path | None = None,
    filename: str = 'CosSim-FilterCandidates',
    pipeline: BasePipeline | None = None,
) -> tuple[list[tuple[PandasIndex, PandasIndex]], dict[int, tuple[Embedding, str]]]:
    """provides an overview of candidates with a similarity score greater than
    the given threshold; more suitable for debugging purposes

    Returns
    -------
    list[tuple[PandasIndex, PandasIndex]]
        list containing relevant index pairs for entries with similarity score
        greater than the given threshold
    dict[int, tuple[Embedding, str]]
        mapping of indices to embeddings and their corresponding texts
    """
    logger.info('Start gathering of similarity candidates...')
    # compare found duplicates
    columns: list[str] = ['idx1', 'text1', 'idx2', 'text2', 'score']
    df_candidates = pd.DataFrame(columns=columns)
    index_pairs: list[tuple[PandasIndex, PandasIndex]] = []

    for (idx1, idx2), score in tqdm(cosineSim_filt.items(), total=len(cosineSim_filt)):  # type: ignore
        # get text content from embedding as second tuple entry
        content = [
            [
                idx1,
                embds[idx1][1],
                idx2,
                embds[idx2][1],
                score,
            ]
        ]
        # add candidates to collection DataFrame
        df_conc = pd.DataFrame(columns=columns, data=content)
        if df_candidates.empty:
            df_candidates = df_conc.copy()
        else:
            df_candidates = pd.concat([df_candidates, df_conc])
        # save index pairs
        index_pairs.append((idx1, idx2))

    logger.info('Similarity candidates gathered successfully.')

    if save_candidates:
        if saving_path is None:
            raise ValueError(
                'Saving path must be provided if duplicate candidates should be saved.'
            )
        elif pipeline is not None:
            target_filename = (
                f'Pipe-{pipeline.name}_Step_{pipeline.curr_proc_idx}_' + filename + '.xlsx'
            )
        else:
            target_filename = f'{filename}.xlsx'
        logger.info('Saving similarity candidates...')
        target_path = saving_path.joinpath(target_filename)
        df_candidates.to_excel(target_path)
        logger.info('Similarity candidates saved successfully to >>%s<<.', target_path)

    return index_pairs, embds
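
# Illustrative sketch of exporting the candidate overview for manual review.
# Assumptions: ``cos_sim_filt`` and ``embds`` come from the two functions above; the
# output directory and the function name are placeholders.
def _example_candidate_export(
    cos_sim_filt: Series,
    embds: dict[int, tuple[Embedding, str]],
) -> list[tuple[PandasIndex, PandasIndex]]:
    """Usage sketch: list candidate pairs and write them to an Excel overview."""
    index_pairs, _ = list_cosSim_dupl_candidates(
        cosineSim_filt=cos_sim_filt,
        embds=embds,
        save_candidates=True,
        saving_path=Path('./results'),  # placeholder directory
        filename='CosSim-FilterCandidates',
    )
    return index_pairs
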
# TODO: change implementation fully to SentenceTransformer
# usage of batch processing for embeddings, use candidate idx function
# from time analysis --> moved to ``helpers.py``
"""
def similar_ids_connection_graph(
    similar_idx_pairs: list[tuple[PandasIndex, PandasIndex]],
) -> tuple[Graph, dict[str, int]]:
    # build index graph to obtain graph of connected (similar) indices
    # use this graph to get connected components (indices which belong together)
    # retain semantic connection on whole dataset
    similar_id_graph = nx.Graph()

    for (idx1, idx2) in similar_idx_pairs:
        # inplace operation, parent/child do not really exist in undirected graph
        update_graph(graph=similar_id_graph, parent=idx1, child=idx2)

    graph_info = get_graph_metadata(graph=similar_id_graph, logging=True)

    return similar_id_graph, graph_info


def similar_ids_groups(
    dupl_id_graph: Graph,
) -> Iterator[list[PandasIndex]]:
    # groups of connected indices
    ids_groups = cast(Iterator[set[PandasIndex]], nx.connected_components(G=dupl_id_graph))

    for id_group in ids_groups:
        yield list(id_group)
"""


def merge_similarity_dupl(
    data: DataFrame,
    model: SentenceTransformer,
    cos_sim_threshold: float,
) -> tuple[DataFrame]:
    logger.info('Start merging of similarity candidates...')
    # data
    merged_data = data.copy()
    model_input = merged_data['entry']
    candidates_idx = candidates_by_index(
        data_model_input=model_input,
        model=model,
        cos_sim_threshold=cos_sim_threshold,
    )
    # graph of similar ids
    similar_id_graph, _ = similar_index_connection_graph(candidates_idx)

    for similar_id_group in similar_index_groups(similar_id_graph):
        similar_id_group = list(similar_id_group)
        similar_data = merged_data.loc[similar_id_group, :]
        # keep first entry with max number of occurrences, then number of
        # associated objects, then length of entry
        similar_data = similar_data.sort_values(
            by=['num_occur', 'num_assoc_obj_ids', 'len'],
            ascending=[False, False, False],
        )
        # merge information to first entry
        data_idx = cast(PandasIndex, similar_data.index[0])
        similar_data.at[data_idx, 'num_occur'] = similar_data['num_occur'].sum()
        assoc_obj_ids = similar_data['assoc_obj_ids'].to_numpy()
        assoc_obj_ids = np.concatenate(assoc_obj_ids)
        assoc_obj_ids = np.unique(assoc_obj_ids)
        similar_data.at[data_idx, 'assoc_obj_ids'] = assoc_obj_ids
        similar_data.at[data_idx, 'num_assoc_obj_ids'] = len(assoc_obj_ids)
        # remaining indices, should be removed
        similar_id_group.remove(data_idx)
        merged_similar_data = similar_data.drop(index=similar_id_group)
        # update entry in main dataset, drop remaining entries
        merged_data.update(merged_similar_data)
        merged_data = merged_data.drop(index=similar_id_group)

    logger.info('Similarity candidates merged successfully.')

    return (merged_data.copy(),)


# merge duplicates
def merge_similarity_dupl_old(
    data: DataFrame,
    dupl_idx_pairs: list[tuple[PandasIndex, PandasIndex]],
) -> tuple[DataFrame]:
    # copy pre-cleaned data
    temp = data.copy()
    index = temp.index
    # logger.info("Start merging of similarity candidates...")

    # iterate over index pairs
    for i1, i2 in tqdm(dupl_idx_pairs):
        # if an entry does not exist any more, skip this pair
        if i1 not in index or i2 not in index:
            continue
        # merge num occur
        num_occur1 = temp.at[i1, 'num_occur']
        num_occur2 = temp.at[i2, 'num_occur']
        new_num_occur = num_occur1 + num_occur2
        # merge associated object ids
        assoc_ids1 = temp.at[i1, 'assoc_obj_ids']
        assoc_ids2 = temp.at[i2, 'assoc_obj_ids']
        new_assoc_ids = np.append(assoc_ids1, assoc_ids2)
        new_assoc_ids = np.unique(new_assoc_ids.flatten())
        # recalculate num associated obj ids
        new_num_assoc_obj_ids = len(new_assoc_ids)
        # write properties to first entry
        temp.at[i1, 'num_occur'] = new_num_occur
        temp.at[i1, 'assoc_obj_ids'] = new_assoc_ids
        temp.at[i1, 'num_assoc_obj_ids'] = new_num_assoc_obj_ids
        # drop second entry
        temp = temp.drop(index=i2)
        index = temp.index

    # logger.info("Similarity candidates merged successfully.")

    return (temp,)
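
# Illustrative sketch of the merging step. Assumptions: ``feature_overview`` is the
# result of ``analyse_feature`` and therefore provides the 'entry', 'len', 'num_occur',
# 'assoc_obj_ids' and 'num_assoc_obj_ids' columns used above; the checkpoint name,
# threshold and function name are placeholders.
def _example_similarity_merge(feature_overview: DataFrame) -> tuple[DataFrame]:
    """Usage sketch: collapse near-duplicate entries of the feature overview."""
    model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    return merge_similarity_dupl(
        data=feature_overview,
        model=model,
        cos_sim_threshold=0.85,
    )
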
# ** debugging and evaluation
def choose_cosSim_dupl_candidates(
    cosineSim_filt: Series,
    embds: dict[int, tuple[Embedding, str]],
) -> tuple[DataFrame, list[tuple[PandasIndex, PandasIndex]]]:
    """provides an overview of candidates with a similarity score greater than
    the given threshold; the decision is made manually by iterating through the
    candidates with user interaction. More suitable for debugging purposes.

    Returns
    -------
    DataFrame
        contains indices, corresponding texts and similarity score to evaluate results
    list[tuple[PandasIndex, PandasIndex]]
        list containing relevant index pairs for entries with similarity score
        greater than the given threshold
    """
    # compare found duplicates
    columns = ['idx1', 'text1', 'idx2', 'text2', 'score']
    df_candidates = pd.DataFrame(columns=columns)
    index_pairs: list[tuple[PandasIndex, PandasIndex]] = []

    for (idx1, idx2), score in cosineSim_filt.items():  # type: ignore
        # get texts for comparison
        text1 = embds[idx1][1]
        text2 = embds[idx2][1]
        # get decision
        print('---------- New Decision ----------')
        print('text1:\n', text1, '\n', flush=True)
        print('text2:\n', text2, '\n', flush=True)
        decision = input('Please enter >>y<< if this is a duplicate, else hit enter:')
        if not decision == 'y':
            continue
        # get text content from embedding as second tuple entry
        content = [
            [
                idx1,
                text1,
                idx2,
                text2,
                score,
            ]
        ]
        df_conc = pd.DataFrame(columns=columns, data=content)
        df_candidates = pd.concat([df_candidates, df_conc])
        index_pairs.append((idx1, idx2))

    return df_candidates, index_pairs