Module lang_main.analysis.timeline
Functions
def calc_delta_to_next_failure(data: pandas.core.frame.DataFrame,
        date_feature: str = 'ErstellungsDatum',
        name_delta_feature: str = 'Zeitspanne bis zum nächsten Ereignis [Tage]',
        convert_to_days: bool = True) -> pandas.core.frame.DataFrame

Computes the time span from each event to the next one based on the given date feature, sorts the entries by that span in descending order, and optionally converts the deltas to whole days. The last event has no successor and receives a delta of zero.
Source code:

    def calc_delta_to_next_failure(
        data: DataFrameTLFiltered,
        date_feature: str = 'ErstellungsDatum',
        name_delta_feature: str = NAME_DELTA_FEAT_TO_NEXT_FAILURE,
        convert_to_days: bool = True,
    ) -> DataFrameTLFiltered:
        data = data.copy()
        # shift the date column up by one row so each entry is paired with its
        # successor; the last entry is paired with itself (delta of zero)
        last_val = data[date_feature].iat[-1]
        shifted = data[date_feature].shift(-1, fill_value=last_val)
        data[name_delta_feature] = shifted - data[date_feature]
        data = data.sort_values(by=name_delta_feature, ascending=False)
        if convert_to_days:
            data[name_delta_feature] = data[name_delta_feature].dt.days
        return data
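A minimal usage sketch; the sample data below is hypothetical and only assumes a datetime column named 'ErstellungsDatum':

    import pandas as pd
    from lang_main.analysis.timeline import calc_delta_to_next_failure

    events = pd.DataFrame(
        {'ErstellungsDatum': pd.to_datetime(['2021-01-01', '2021-01-11', '2021-02-01'])}
    )
    result = calc_delta_to_next_failure(events)
    # deltas of 10 and 21 days; the last event has no successor and gets 0
    print(result['Zeitspanne bis zum nächsten Ereignis [Tage]'].tolist())  # [21, 10, 0]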
def calc_delta_to_repair(data: pandas.core.frame.DataFrame,
        date_feature_start: str = 'ErstellungsDatum',
        date_feature_end: str = 'ErledigungsDatum',
        name_delta_feature: str = 'Zeitspanne bis zur Behebung [Tage]',
        convert_to_days: bool = True) -> tuple[pandas.core.frame.DataFrame]

Computes the time span between the creation and the completion date of each operation and optionally converts it to whole days. Returns the result as a one-element tuple.
Source code:

    def calc_delta_to_repair(
        data: DataFrame,
        date_feature_start: str = 'ErstellungsDatum',
        date_feature_end: str = 'ErledigungsDatum',
        name_delta_feature: str = NAME_DELTA_FEAT_TO_REPAIR,
        convert_to_days: bool = True,
    ) -> tuple[DataFrame]:
        logger.info('Calculating time differences between start and end of operations...')
        data = data.copy()
        data[name_delta_feature] = data[date_feature_end] - data[date_feature_start]
        if convert_to_days:
            data[name_delta_feature] = data[name_delta_feature].dt.days
        logger.info('Calculation successful.')
        return (data,)
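A minimal sketch, assuming both date columns are already parsed as datetimes (sample values hypothetical):

    import pandas as pd
    from lang_main.analysis.timeline import calc_delta_to_repair

    orders = pd.DataFrame({
        'ErstellungsDatum': pd.to_datetime(['2021-01-01', '2021-01-05']),
        'ErledigungsDatum': pd.to_datetime(['2021-01-04', '2021-01-05']),
    })
    (orders,) = calc_delta_to_repair(orders)
    print(orders['Zeitspanne bis zur Behebung [Tage]'].tolist())  # [3, 0]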
def cleanup_descriptions(data: pandas.core.frame.DataFrame,
        properties: Collection[str] = ('VorgangsBeschreibung', 'ErledigungsBeschreibung')) -> tuple[pandas.core.frame.DataFrame]

Fills missing values in the given description features with the placeholder 'N.V.' and applies entry-wise cleansing to them. Returns the cleaned copy as a one-element tuple.
Source code:

    def cleanup_descriptions(
        data: DataFrame,
        properties: Collection[str] = (
            'VorgangsBeschreibung',
            'ErledigungsBeschreibung',
        ),
    ) -> tuple[DataFrame]:
        logger.info('Cleaning necessary descriptions...')
        data = data.copy()
        features = list(properties)
        # replace missing descriptions with a 'not available' placeholder
        data[features] = data[features].fillna('N.V.')
        (data,) = entry_wise_cleansing(data, target_features=features)
        logger.info('Cleansing successful.')
        return (data.copy(),)
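A usage sketch with hypothetical data; the exact text normalization depends on entry_wise_cleansing:

    import pandas as pd
    from lang_main.analysis.timeline import cleanup_descriptions

    df = pd.DataFrame({
        'VorgangsBeschreibung': ['Pumpe defekt', None],
        'ErledigungsBeschreibung': [None, 'Teil getauscht'],
    })
    (df,) = cleanup_descriptions(df)
    # missing descriptions now carry the placeholder 'N.V.'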
def filter_activities_per_obj_id(data: pandas.core.frame.DataFrame,
        activity_feature: str = 'VorgangsTypName',
        relevant_activity_types: Iterable[str] = ('Reparaturauftrag (Portal)',),
        feature_obj_id: str = 'ObjektID',
        threshold_num_activities: int = 1) -> tuple[pandas.core.frame.DataFrame, pandas.core.series.Series]

Keeps only entries whose activity type is one of relevant_activity_types, counts the remaining activities per ObjectID, and drops all ObjectIDs with at most threshold_num_activities activities. Returns the filtered entries together with the per-ObjectID activity counts.
Source code:

    def filter_activities_per_obj_id(
        data: DataFrame,
        activity_feature: str = 'VorgangsTypName',
        relevant_activity_types: Iterable[str] = ('Reparaturauftrag (Portal)',),
        feature_obj_id: str = 'ObjektID',
        threshold_num_activities: int = 1,
    ) -> tuple[DataFrame, Series]:
        data = data.copy()
        # filter only relevant activities, count occurrences for each ObjectID
        logger.info('Filtering activities per ObjectID...')
        filt_rel_activities = data[activity_feature].isin(relevant_activity_types)
        data_filter_activities = data.loc[filt_rel_activities].copy()
        num_activities_per_obj_id = cast(
            Series,
            data_filter_activities[feature_obj_id].value_counts(sort=True),
        )
        # filter for ObjectIDs with more than given number of activities
        filt_below_thresh = num_activities_per_obj_id <= threshold_num_activities
        # index of series contains ObjectIDs
        obj_ids_below_thresh = num_activities_per_obj_id[filt_below_thresh].index
        filt_entries_below_thresh = data_filter_activities[feature_obj_id].isin(
            obj_ids_below_thresh
        )
        num_activities_per_obj_id = num_activities_per_obj_id.loc[~filt_below_thresh]
        data_filter_activities = data_filter_activities.loc[~filt_entries_below_thresh]
        logger.info('Activities per ObjectID filtered successfully.')
        return data_filter_activities, num_activities_per_obj_id
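A sketch with hypothetical data: ObjectID 1 has two relevant activities and survives the default threshold of 1, while ObjectID 2 has only one and is dropped:

    import pandas as pd
    from lang_main.analysis.timeline import filter_activities_per_obj_id

    df = pd.DataFrame({
        'ObjektID': [1, 1, 2],
        'VorgangsTypName': ['Reparaturauftrag (Portal)'] * 3,
    })
    filtered, counts = filter_activities_per_obj_id(df)
    print(counts.to_dict())  # {1: 2}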
def filter_timeline_cands(data: pandas.core.frame.DataFrame,
        cands: dict[int, tuple[tuple[int | numpy.int64, ...], ...]],
        obj_id: int,
        entry_idx: int,
        sort_feature: str = 'ErstellungsDatum') -> pandas.core.frame.DataFrame

Selects the candidate group entry_idx for the given ObjectID from cands (a mapping from ObjectID to groups of DataFrame index labels) and returns the corresponding entries sorted by sort_feature in ascending order.
Source code:

    def filter_timeline_cands(
        data: DataFrame,
        cands: TimelineCandidates,
        obj_id: ObjectID,
        entry_idx: int,
        sort_feature: str = 'ErstellungsDatum',
    ) -> DataFrameTLFiltered:
        data = data.copy()
        # look up the requested candidate group (a tuple of index labels)
        cands_for_obj_id = cands[obj_id]
        cands_choice = cands_for_obj_id[entry_idx]
        data = data.loc[list(cands_choice)].sort_values(
            by=sort_feature,
            ascending=True,
        )
        return data
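A sketch assuming cands has the shape produced by get_timeline_candidates, i.e. a mapping from ObjectID to groups of DataFrame index labels (all values hypothetical):

    import pandas as pd
    from lang_main.analysis.timeline import filter_timeline_cands

    df = pd.DataFrame(
        {'ErstellungsDatum': pd.to_datetime(['2021-02-01', '2021-01-01'])}
    )
    cands = {1: ((0, 1),)}  # one candidate group for ObjectID 1: rows 0 and 1
    timeline = filter_timeline_cands(df, cands, obj_id=1, entry_idx=0)
    # rows reordered chronologically: 2021-01-01 before 2021-02-01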
def generate_model_input(data: pandas.core.frame.DataFrame,
        target_feature_name: str = 'nlp_model_input',
        model_input_features: Iterable[str] = ('VorgangsTypName', 'VorgangsArtText', 'VorgangsBeschreibung')) -> tuple[pandas.core.frame.DataFrame]

Concatenates the given input features (with missing values replaced by empty strings) into a single ' - '-separated string column that serves as the NLP model input.
Source code:

    def generate_model_input(
        data: DataFrame,
        target_feature_name: str = 'nlp_model_input',
        model_input_features: Iterable[str] = (
            'VorgangsTypName',
            'VorgangsArtText',
            'VorgangsBeschreibung',
        ),
    ) -> tuple[DataFrame]:
        logger.info('Generating concatenation of model input features...')
        data = data.copy()
        model_input_features = list(model_input_features)
        # join the stringified input features row-wise with ' - ' into one text column
        input_features = data[model_input_features].fillna('').astype(str)
        data[target_feature_name] = input_features.apply(
            lambda x: ' - '.join(x),
            axis=1,
        )
        logger.info('Model input generated successfully.')
        return (data,)
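A minimal sketch (sample values hypothetical); note that missing inputs become empty strings, so the separator is still emitted:

    import pandas as pd
    from lang_main.analysis.timeline import generate_model_input

    df = pd.DataFrame({
        'VorgangsTypName': ['Reparaturauftrag (Portal)'],
        'VorgangsArtText': ['Elektrik'],
        'VorgangsBeschreibung': [None],
    })
    (df,) = generate_model_input(df)
    print(df['nlp_model_input'].iat[0])  # 'Reparaturauftrag (Portal) - Elektrik - '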
def get_timeline_candidates(data: pandas.core.frame.DataFrame,
        num_activities_per_obj_id: pandas.core.series.Series,
        *,
        model: sentence_transformers.SentenceTransformer.SentenceTransformer,
        cos_sim_threshold: float,
        feature_obj_id: str = 'ObjektID',
        feature_obj_text: str = 'HObjektText',
        model_input_feature: str = 'nlp_model_input') -> tuple[dict[int, tuple[tuple[int | numpy.int64, ...], ...]], dict[int, str]]

Groups the activities of each ObjectID into timeline candidates by embedding their model-input texts with the given SentenceTransformer and comparing them against the cosine-similarity threshold. Also returns a mapping from each ObjectID to its text descriptor.
Source code:

    def get_timeline_candidates(
        data: DataFrame,
        num_activities_per_obj_id: Series,
        *,
        model: SentenceTransformer,
        cos_sim_threshold: float,
        feature_obj_id: str = 'ObjektID',
        feature_obj_text: str = 'HObjektText',
        model_input_feature: str = 'nlp_model_input',
    ) -> tuple[TimelineCandidates, dict[ObjectID, str]]:
        logger.info('Obtaining timeline candidates...')
        candidates = _get_timeline_candidates_index(
            data=data,
            num_activities_per_obj_id=num_activities_per_obj_id,
            model=model,
            cos_sim_threshold=cos_sim_threshold,
            feature_obj_id=feature_obj_id,
            model_input_feature=model_input_feature,
        )
        tl_candidates = _transform_timeline_candidates(candidates)
        logger.info('Timeline candidates obtained successfully.')
        # text mapping to obtain object descriptors
        logger.info('Mapping ObjectIDs to their respective text descriptor...')
        map_obj_text = _map_obj_id_to_texts(
            data=data,
            feature_obj_id=feature_obj_id,
            feature_obj_text=feature_obj_text,
        )
        logger.info('ObjectIDs successfully mapped to text descriptors.')
        return tl_candidates, map_obj_text
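A sketch of the intended call pattern; the pipeline order, the model choice, and the threshold value are assumptions, not prescribed by this module:

    import pandas as pd
    from sentence_transformers import SentenceTransformer
    from lang_main.analysis.timeline import (
        filter_activities_per_obj_id,
        generate_model_input,
        get_timeline_candidates,
    )

    df = pd.DataFrame({
        'ObjektID': [1, 1, 1],
        'HObjektText': ['Pumpe Halle 3'] * 3,
        'VorgangsTypName': ['Reparaturauftrag (Portal)'] * 3,
        'VorgangsArtText': ['Elektrik', 'Elektrik', 'Mechanik'],
        'VorgangsBeschreibung': ['Motor defekt', 'Motor erneut defekt', 'Lager getauscht'],
    })
    (df,) = generate_model_input(df)
    df, counts = filter_activities_per_obj_id(df)
    model = SentenceTransformer('all-MiniLM-L6-v2')  # model choice is an assumption
    cands, obj_texts = get_timeline_candidates(
        df, counts, model=model, cos_sim_threshold=0.75  # threshold is an assumption
    )
    # cands: ObjectID -> tuples of row-index groups; obj_texts: ObjectID -> 'HObjektText'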
def remove_non_relevant_obj_ids(data: pandas.core.frame.DataFrame,
        thresh_unique_feat_per_id: int,
        *,
        feature_uniqueness: str = 'HObjektText',
        feature_obj_id: str = 'ObjektID') -> tuple[pandas.core.frame.DataFrame]

Determines non-relevant ObjectIDs via the internal helper _non_relevant_obj_ids, based on the number of unique feature_uniqueness values per ObjectID, and drops all entries that belong to them.
Source code:

    def remove_non_relevant_obj_ids(
        data: DataFrame,
        thresh_unique_feat_per_id: int,
        *,
        feature_uniqueness: str = 'HObjektText',
        feature_obj_id: str = 'ObjektID',
    ) -> tuple[DataFrame]:
        logger.info('Removing non-relevant ObjectIDs from dataset...')
        data = data.copy()
        ids_to_ignore = _non_relevant_obj_ids(
            data=data,
            thresh_unique_feat_per_id=thresh_unique_feat_per_id,
            feature_uniqueness=feature_uniqueness,
            feature_obj_id=feature_obj_id,
        )
        # only retain entries with ObjectIDs not in IDs to ignore
        data = data.loc[~(data[feature_obj_id].isin(ids_to_ignore))]
        logger.debug('Ignored ObjectIDs: %s', ids_to_ignore)
        logger.info('Non-relevant ObjectIDs removed successfully.')
        return (data,)
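A call-pattern sketch; the threshold value is hypothetical and the exact relevance criterion lives in the internal helper _non_relevant_obj_ids:

    import pandas as pd
    from lang_main.analysis.timeline import remove_non_relevant_obj_ids

    df = pd.DataFrame({
        'ObjektID': [1, 1, 2],
        'HObjektText': ['Pumpe', 'Ventil', 'Motor'],
    })
    (df,) = remove_non_relevant_obj_ids(df, thresh_unique_feat_per_id=1)
    # entries of ObjectIDs flagged as non-relevant are removed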