Module lang_main.analysis.preprocessing
Functions
def analyse_feature(data: DataFrame, target_feature: str) -> tuple[pandas.core.frame.DataFrame]
def analyse_feature(
    data: DataFrame,
    target_feature: str,
) -> tuple[DataFrame]:
    # feature columns
    feature_entries = data[target_feature]
    logger.info(
        'Number of entries for feature >>%s<<: %d',
        target_feature,
        len(feature_entries),
    )
    # obtain unique entries
    unique_feature_entries = feature_entries.unique()
    # prepare result DataFrame
    cols = ['batched_idxs', 'entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
    result_df = pd.DataFrame(columns=cols)

    for entry in tqdm(unique_feature_entries, mininterval=1.0):
        len_entry = len(entry)
        filt = data[target_feature] == entry
        temp = data[filt]
        batched_idxs = temp.index.to_numpy()
        assoc_obj_ids = temp['ObjektID'].unique()
        assoc_obj_ids = np.sort(assoc_obj_ids, kind='stable')
        num_assoc_obj_ids = len(assoc_obj_ids)
        num_dupl = filt.sum()
        conc_df = pd.DataFrame(
            data=[
                [batched_idxs, entry, len_entry, num_dupl, assoc_obj_ids, num_assoc_obj_ids]
            ],
            columns=cols,
        )
        result_df = pd.concat([result_df, conc_df], ignore_index=True)

    result_df = result_df.sort_values(
        by=['num_occur', 'len'], ascending=[False, False]
    ).copy()
    return (result_df,)
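A rough usage sketch: the toy data below is purely illustrative, but it uses the two columns the function actually touches (the target feature and 'ObjektID').

import pandas as pd
from lang_main.analysis.preprocessing import analyse_feature

# illustrative toy data, not a real IHM export
data = pd.DataFrame(
    {
        'VorgangsBeschreibung': ['Pumpe defekt', 'Pumpe defekt', 'Filter tauschen'],
        'ObjektID': [101, 102, 101],
    }
)
(feature_stats,) = analyse_feature(data, target_feature='VorgangsBeschreibung')
# one row per unique entry, sorted by number of occurrences, then entry length
print(feature_stats[['entry', 'num_occur', 'num_assoc_obj_ids']])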
def load_raw_data(path: Path, date_cols: Collection[str] = ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')) -> tuple[pandas.core.frame.DataFrame]
def load_raw_data(
    path: Path,
    date_cols: Collection[str] = (
        'VorgangsDatum',
        'ErledigungsDatum',
        'Arbeitsbeginn',
        'ErstellungsDatum',
    ),
) -> tuple[DataFrame]:
    """load IHM dataset with standard structure

    Parameters
    ----------
    path : Path
        path to dataset file, usually a CSV file
    date_cols : Collection[str], optional
        columns which contain dates and are parsed as such, by default
        ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')

    Returns
    -------
    tuple[DataFrame]
        raw dataset as DataFrame
    """
    # load dataset
    date_cols = list(date_cols)
    data = pd.read_csv(
        filepath_or_buffer=path,
        sep=';',
        encoding='cp1252',
        parse_dates=list(date_cols),
        dayfirst=True,
    )
    logger.info('Loaded dataset successfully.')
    logger.info(
        (
            f'Dataset properties: number of entries: {len(data)}, '
            f'number of features {len(data.columns)}'
        )
    )
    return (data,)

load IHM dataset with standard structure
Parameters
path : Path
    path to dataset file, usually a CSV file
date_cols : Collection[str], optional
    columns which contain dates and are parsed as such, by default ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')
Returns
tuple[DataFrame]
    raw dataset as DataFrame
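A minimal usage sketch; the file path is hypothetical, and the loader expects a ';'-separated, cp1252-encoded CSV as in the standard IHM export.

from pathlib import Path
from lang_main.analysis.preprocessing import load_raw_data

# hypothetical export file; adjust the path to the actual dataset location
(raw_data,) = load_raw_data(Path('data/ihm_export.csv'))
# the four default date columns are parsed as datetimes (day-first format)
print(raw_data.dtypes)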
def merge_similarity_duplicates(data: DataFrame, model: SentenceTransformer, cos_sim_threshold: float) -> tuple[pandas.core.frame.DataFrame]
def merge_similarity_duplicates(
    data: DataFrame,
    model: SentenceTransformer,
    cos_sim_threshold: float,
) -> tuple[DataFrame]:
    logger.info('Start merging of similarity candidates...')
    # data
    merged_data = data.copy()
    model_input = merged_data['entry']
    candidates_idx = candidates_by_index(
        data_model_input=model_input,
        model=model,
        cos_sim_threshold=cos_sim_threshold,
    )
    # graph of similar ids
    similar_id_graph, _ = similar_index_connection_graph(candidates_idx)

    for similar_id_group in similar_index_groups(similar_id_graph):
        similar_id_group = list(similar_id_group)
        similar_data = merged_data.loc[similar_id_group, :]
        # keep first entry with max number occurrences, then number of
        # associated objects, then length of entry
        similar_data = similar_data.sort_values(
            by=['num_occur', 'num_assoc_obj_ids', 'len'],
            ascending=[False, False, False],
        )
        # merge information to first entry
        data_idx = cast(PandasIndex, similar_data.index[0])
        similar_data.at[data_idx, 'num_occur'] = similar_data['num_occur'].sum()
        assoc_obj_ids = similar_data['assoc_obj_ids'].to_numpy()
        assoc_obj_ids = np.concatenate(assoc_obj_ids)
        assoc_obj_ids = np.unique(assoc_obj_ids)
        similar_data.at[data_idx, 'assoc_obj_ids'] = assoc_obj_ids
        similar_data.at[data_idx, 'num_assoc_obj_ids'] = len(assoc_obj_ids)
        # remaining indices, should be removed
        similar_id_group.remove(data_idx)
        merged_similar_data = similar_data.drop(index=similar_id_group)
        # update entry in main dataset, drop remaining entries
        merged_data.update(merged_similar_data)
        merged_data = merged_data.drop(index=similar_id_group)

    logger.info('Similarity candidates merged successfully.')
    return (merged_data,)
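A usage sketch continuing from the analyse_feature example above; the SentenceTransformer model name and the threshold value are illustrative choices, not defaults prescribed by this module.

from sentence_transformers import SentenceTransformer
from lang_main.analysis.preprocessing import merge_similarity_duplicates

# model name and threshold are illustrative, not project defaults
model = SentenceTransformer('all-MiniLM-L6-v2')
# feature_stats: the DataFrame returned by analyse_feature in the sketch above
(merged_stats,) = merge_similarity_duplicates(
    feature_stats,
    model=model,
    cos_sim_threshold=0.85,
)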
def numeric_pre_filter_feature(data: DataFrame, feature: str, bound_lower: int | None, bound_upper: int | None) -> tuple[pandas.core.frame.DataFrame]
def numeric_pre_filter_feature(
    data: DataFrame,
    feature: str,
    bound_lower: int | None,
    bound_upper: int | None,
) -> tuple[DataFrame]:
    """filter a DataFrame on a given numerical feature by lower and upper bounds

    bounds are inclusive: entries (bound_lower <= entry <= bound_upper) are retained

    Parameters
    ----------
    data : DataFrame
        DataFrame to filter
    feature : str
        feature name to filter
    bound_lower : int | None
        lower bound of values to retain
    bound_upper : int | None
        upper bound of values to retain

    Returns
    -------
    tuple[DataFrame]
        filtered DataFrame

    Raises
    ------
    ValueError
        if no bounds are provided; at least one bound must be set
    """
    if not any([bound_lower, bound_upper]):
        raise ValueError('No bounds for filtering provided')

    data = data.copy()
    if bound_lower is None:
        bound_lower = cast(int, data[feature].min())
    if bound_upper is None:
        bound_upper = cast(int, data[feature].max())

    filter_lower = data[feature] >= bound_lower
    filter_upper = data[feature] <= bound_upper
    filter = filter_lower & filter_upper
    data = data.loc[filter]
    return (data,)

filter a DataFrame on a given numerical feature by lower and upper bounds
bounds are inclusive: entries (bound_lower <= entry <= bound_upper) are retained
Parameters
data : DataFrame
    DataFrame to filter
feature : str
    feature name to filter
bound_lower : int | None
    lower bound of values to retain
bound_upper : int | None
    upper bound of values to retain
Returns
tuple[DataFrame]
    filtered DataFrame
Raises
ValueError
    if no bounds are provided; at least one bound must be set
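A short sketch with toy data; the column name and bound values are illustrative.

import pandas as pd
from lang_main.analysis.preprocessing import numeric_pre_filter_feature

df = pd.DataFrame({'num_occur': [1, 5, 12, 30]})
# keep entries with 2 <= num_occur <= 20; bounds are inclusive
(filtered,) = numeric_pre_filter_feature(df, feature='num_occur', bound_lower=2, bound_upper=20)
print(filtered)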
def remove_NA(data: DataFrame, target_features: Collection[str] = ('VorgangsBeschreibung',)) -> tuple[pandas.core.frame.DataFrame]
def remove_NA(
    data: DataFrame,
    target_features: Collection[str] = ('VorgangsBeschreibung',),
) -> tuple[DataFrame]:
    """drop NA entries based on a subset of features to be analysed

    Parameters
    ----------
    data : DataFrame
        standard IHM dataset, perhaps pre-cleaned
    target_features : Collection[str], optional
        subset of features used to define an NA entry, by default
        ('VorgangsBeschreibung',)

    Returns
    -------
    tuple[DataFrame]
        dataset with removed NA entries for given subset of features
    """
    target_features = list(target_features)
    wo_NA = data.dropna(axis=0, subset=target_features, ignore_index=True).copy()  # type: ignore
    logger.info(
        f'Removed NA entries for features >>{target_features}<< from dataset successfully.'
    )
    return (wo_NA,)

drop NA entries based on a subset of features to be analysed
Parameters
data : DataFrame
    standard IHM dataset, perhaps pre-cleaned
target_features : Collection[str], optional
    subset of features used to define an NA entry, by default ('VorgangsBeschreibung',)
Returns
tuple[DataFrame]
    dataset with removed NA entries for given subset of features
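A minimal sketch, assuming a raw IHM DataFrame such as the one returned by load_raw_data above.

from lang_main.analysis.preprocessing import remove_NA

# raw_data: DataFrame as returned by load_raw_data in the sketch above
(clean_data,) = remove_NA(raw_data, target_features=('VorgangsBeschreibung',))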
def remove_duplicates(data: DataFrame) -> tuple[pandas.core.frame.DataFrame]
def remove_duplicates(
    data: DataFrame,
) -> tuple[DataFrame]:
    """removes duplicated entries in the given dataset, first over all features,
    then over the subset ('VorgangsID', 'ObjektID')

    Parameters
    ----------
    data : DataFrame
        read data with standard structure

    Returns
    -------
    tuple[DataFrame]
        dataset with removed duplicates over all features and the
        ('VorgangsID', 'ObjektID') subset
    """
    # obtain info about duplicates over all features
    duplicates_filt = data.duplicated()
    logger.info(f'Number of duplicates over all features: {duplicates_filt.sum()}')
    # drop duplicates
    wo_duplicates = data.drop_duplicates(ignore_index=True)

    duplicates_subset: list[str] = [
        'VorgangsID',
        'ObjektID',
    ]
    duplicates_subset_filt = wo_duplicates.duplicated(subset=duplicates_subset)
    logger.info(
        (
            'Number of duplicates over subset '
            f'>>{duplicates_subset}<<: {duplicates_subset_filt.sum()}'
        )
    )
    wo_duplicates = wo_duplicates.drop_duplicates(
        subset=duplicates_subset, ignore_index=True
    ).copy()
    logger.info('Removed all duplicates from dataset successfully.')
    logger.info(
        'New Dataset properties: number of entries: %d, number of features %d',
        len(wo_duplicates),
        len(wo_duplicates.columns),
    )
    return (wo_duplicates,)

removes duplicated entries in the given dataset, first over all features, then over the subset ('VorgangsID', 'ObjektID')
Parameters
data : DataFrame
    read data with standard structure
Returns
tuple[DataFrame]
    dataset with removed duplicates over all features and the ('VorgangsID', 'ObjektID') subset
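Taken together, a typical preprocessing sketch might chain the functions via their one-element return tuples; the file path and the chosen feature below are illustrative, not prescribed by the module.

from pathlib import Path
from lang_main.analysis.preprocessing import (
    analyse_feature,
    load_raw_data,
    remove_NA,
    remove_duplicates,
)

# hypothetical path; each step unpacks the 1-tuple returned by the previous one
(raw_data,) = load_raw_data(Path('data/ihm_export.csv'))
(deduplicated,) = remove_duplicates(raw_data)
(cleaned,) = remove_NA(deduplicated)
(feature_stats,) = analyse_feature(cleaned, target_feature='VorgangsBeschreibung')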