Module lang_main.analysis.preprocessing

Functions

def analyse_feature(data: DataFrame, target_feature: str) ‑> tuple[pandas.core.frame.DataFrame]
def analyse_feature(
    data: DataFrame,
    target_feature: str,
) -> tuple[DataFrame]:
    """collect per-entry statistics for a single feature of the dataset

    For each unique entry of the target feature, the resulting DataFrame holds
    the original row indices, the entry itself, its length, its number of
    occurrences and the associated object IDs ('ObjektID').
    """
    # feature column
    feature_entries = data[target_feature]
    logger.info(
        'Number of entries for feature >>%s<<: %d', target_feature, len(feature_entries)
    )
    # obtain unique entries
    unique_feature_entries = feature_entries.unique()

    # prepare result DataFrame
    cols = ['batched_idxs', 'entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
    result_df = pd.DataFrame(columns=cols)

    for entry in tqdm(unique_feature_entries, mininterval=1.0):
        len_entry = len(entry)
        filt = data[target_feature] == entry
        temp = data[filt]
        batched_idxs = temp.index.to_numpy()
        assoc_obj_ids = temp['ObjektID'].unique()
        assoc_obj_ids = np.sort(assoc_obj_ids, kind='stable')
        num_assoc_obj_ids = len(assoc_obj_ids)
        num_dupl = filt.sum()

        conc_df = pd.DataFrame(
            data=[
                [batched_idxs, entry, len_entry, num_dupl, assoc_obj_ids, num_assoc_obj_ids]
            ],
            columns=cols,
        )

        result_df = pd.concat([result_df, conc_df], ignore_index=True)

    result_df = result_df.sort_values(
        by=['num_occur', 'len'], ascending=[False, False]
    ).copy()

    return (result_df,)
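
Usage sketch (the toy DataFrame below is illustrative; the real dataset provides 'ObjektID' plus the feature of interest):

import pandas as pd

from lang_main.analysis.preprocessing import analyse_feature

# toy frame standing in for a pre-cleaned IHM dataset
data = pd.DataFrame(
    {
        'ObjektID': [1, 1, 2, 3],
        'VorgangsBeschreibung': [
            'Pumpe defekt',
            'Pumpe defekt',
            'Pumpe defekt',
            'Lager getauscht',
        ],
    }
)
(feature_stats,) = analyse_feature(data=data, target_feature='VorgangsBeschreibung')
# one row per unique entry, sorted by number of occurrences, then entry length
print(feature_stats[['entry', 'num_occur', 'num_assoc_obj_ids']])
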
def load_raw_data(path: Path, date_cols: Collection[str] = ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')) ‑> tuple[pandas.core.frame.DataFrame]
def load_raw_data(
    path: Path,
    date_cols: Collection[str] = (
        'VorgangsDatum',
        'ErledigungsDatum',
        'Arbeitsbeginn',
        'ErstellungsDatum',
    ),
) -> tuple[DataFrame]:
    """load IHM dataset with standard structure

    Parameters
    ----------
    path : Path
        path to dataset file, usually a CSV file
    date_cols : Collection[str], optional
        columns which contain dates and are parsed as such,
        by default (
            'VorgangsDatum',
            'ErledigungsDatum',
            'Arbeitsbeginn',
            'ErstellungsDatum',
        )

    Returns
    -------
    tuple[DataFrame]
        raw dataset as DataFrame
    """
    # load dataset
    data = pd.read_csv(
        filepath_or_buffer=path,
        sep=';',
        encoding='cp1252',
        parse_dates=list(date_cols),
        dayfirst=True,
    )
    logger.info('Loaded dataset successfully.')
    logger.info(
        (
            f'Dataset properties: number of entries: {len(data)}, '
            f'number of features: {len(data.columns)}'
        )
    )
    return (data,)

load IHM dataset with standard structure

Parameters

path : Path
path to dataset file, usually a CSV file
date_cols : Collection[str], optional
columns which contain dates and are parsed as such, by default ('VorgangsDatum', 'ErledigungsDatum', 'Arbeitsbeginn', 'ErstellungsDatum')

Returns

tuple[DataFrame]
raw dataset as DataFrame
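
A minimal usage sketch; the file path is hypothetical, any ';'-separated, cp1252-encoded export with the standard column names should work:

from pathlib import Path

from lang_main.analysis.preprocessing import load_raw_data

# path is illustrative only
(raw,) = load_raw_data(path=Path('data/ihm_export.csv'))
# the configured date columns are parsed to datetime64 on load
print(raw[['VorgangsDatum', 'ErledigungsDatum']].dtypes)
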
def merge_similarity_duplicates(data: DataFrame, model: SentenceTransformer, cos_sim_threshold: float) ‑> tuple[pandas.core.frame.DataFrame]
def merge_similarity_duplicates(
    data: DataFrame,
    model: SentenceTransformer,
    cos_sim_threshold: float,
) -> tuple[DataFrame]:
    """merge entries of an analysed feature whose embeddings are similar

    Entries whose sentence embeddings reach the given cosine similarity
    threshold are grouped; each group is collapsed into its most frequent
    entry, accumulating occurrence counts and associated object IDs.
    """
    logger.info('Start merging of similarity candidates...')

    # data
    merged_data = data.copy()
    model_input = merged_data['entry']
    candidates_idx = candidates_by_index(
        data_model_input=model_input,
        model=model,
        cos_sim_threshold=cos_sim_threshold,
    )
    # graph of similar ids
    similar_id_graph, _ = similar_index_connection_graph(candidates_idx)

    for similar_id_group in similar_index_groups(similar_id_graph):
        similar_id_group = list(similar_id_group)
        similar_data = merged_data.loc[similar_id_group, :]
        # keep the entry with the most occurrences; break ties by number of
        # associated objects, then by entry length
        similar_data = similar_data.sort_values(
            by=['num_occur', 'num_assoc_obj_ids', 'len'],
            ascending=[False, False, False],
        )
        # merge information to first entry
        data_idx = cast(PandasIndex, similar_data.index[0])
        similar_data.at[data_idx, 'num_occur'] = similar_data['num_occur'].sum()
        assoc_obj_ids = similar_data['assoc_obj_ids'].to_numpy()
        assoc_obj_ids = np.concatenate(assoc_obj_ids)
        assoc_obj_ids = np.unique(assoc_obj_ids)
        similar_data.at[data_idx, 'assoc_obj_ids'] = assoc_obj_ids
        similar_data.at[data_idx, 'num_assoc_obj_ids'] = len(assoc_obj_ids)
        # remaining indices have been merged and are removed below
        similar_id_group.remove(data_idx)
        merged_similar_data = similar_data.drop(index=similar_id_group)
        # update entry in main dataset, drop remaining entries
        merged_data.update(merged_similar_data)
        merged_data = merged_data.drop(index=similar_id_group)

    logger.info('Similarity candidates merged successfully.')

    return (merged_data,)
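
A hedged end-to-end sketch: the file path, model name and similarity threshold are illustrative choices, not values prescribed by this module; the input is expected to be the per-entry DataFrame produced by analyse_feature:

from pathlib import Path

from sentence_transformers import SentenceTransformer

from lang_main.analysis.preprocessing import (
    analyse_feature,
    load_raw_data,
    merge_similarity_duplicates,
)

(data,) = load_raw_data(path=Path('data/ihm_export.csv'))  # hypothetical file
(feature_stats,) = analyse_feature(data=data, target_feature='VorgangsBeschreibung')
# example model and threshold; tune both for the data at hand
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
(merged,) = merge_similarity_duplicates(
    data=feature_stats,
    model=model,
    cos_sim_threshold=0.85,
)
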
def numeric_pre_filter_feature(data: DataFrame, feature: str, bound_lower: int | None, bound_upper: int | None) ‑> tuple[pandas.core.frame.DataFrame]
def numeric_pre_filter_feature(
    data: DataFrame,
    feature: str,
    bound_lower: int | None,
    bound_upper: int | None,
) -> tuple[DataFrame]:
    """filter DataFrame for a given numerical feature regarding their bounds
    bounds are inclusive: entries (bound_lower <= entry <= bound_upper) are retained

    Parameters
    ----------
    data : DataFrame
        DataFrame to filter
    feature : str
        feature name to filter
    bound_lower : int | None
        lower bound of values to retain
    bound_upper : int | None
        upper bound of values to retain

    Returns
    -------
    tuple[DataFrame]
        filtered DataFrame

    Raises
    ------
    ValueError
        if no bounds are provided, at least one bound must be set
    """
    # explicit None checks: a bound of 0 is a valid value and must not be
    # treated as a missing bound
    if bound_lower is None and bound_upper is None:
        raise ValueError('No bounds for filtering provided')

    data = data.copy()
    if bound_lower is None:
        bound_lower = cast(int, data[feature].min())
    if bound_upper is None:
        bound_upper = cast(int, data[feature].max())

    filter_lower = data[feature] >= bound_lower
    filter_upper = data[feature] <= bound_upper
    filt = filter_lower & filter_upper

    data = data.loc[filt]

    return (data,)

filter a DataFrame on a numerical feature by lower and upper bounds. Bounds are inclusive: entries with bound_lower <= entry <= bound_upper are retained.

Parameters

data : DataFrame
DataFrame to filter
feature : str
feature name to filter
bound_lower : int | None
lower bound of values to retain
bound_upper : int | None
upper bound of values to retain

Returns

tuple[DataFrame]
filtered DataFrame

Raises

ValueError
if no bounds are provided, at least one bound must be set
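
A self-contained sketch on a toy frame; the column name 'len' mirrors the output of analyse_feature, and the bounds are illustrative:

import pandas as pd

from lang_main.analysis.preprocessing import numeric_pre_filter_feature

df = pd.DataFrame({'entry': ['a', 'bb', 'cccc'], 'len': [1, 2, 4]})
# keep entries with len >= 2; leaving bound_upper as None keeps everything above
(filtered,) = numeric_pre_filter_feature(
    data=df, feature='len', bound_lower=2, bound_upper=None
)
print(filtered['entry'].tolist())  # ['bb', 'cccc']
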
def remove_NA(data: DataFrame, target_features: Collection[str] = ('VorgangsBeschreibung',)) ‑> tuple[pandas.core.frame.DataFrame]
def remove_NA(
    data: DataFrame,
    target_features: Collection[str] = ('VorgangsBeschreibung',),
) -> tuple[DataFrame]:
    """function to drop NA entries based on a subset of features to be analysed

    Parameters
    ----------
    data : DataFrame
        standard IHM dataset, perhaps pre-cleaned
    target_features : Collection[str], optional
        features whose missing values cause a row to be dropped, by default ('VorgangsBeschreibung',)

    Returns
    -------
    tuple[DataFrame]
        dataset with removed NA entries for given subset of features
    """
    target_features = list(target_features)
    wo_NA = data.dropna(axis=0, subset=target_features, ignore_index=True).copy()  # type: ignore
    logger.info(
        f'Removed NA entries for features >>{target_features}<< from dataset successfully.'
    )

    return (wo_NA,)

drop NA entries based on a subset of features

Parameters

data : DataFrame
standard IHM dataset, perhaps pre-cleaned
target_features : Collection[str], optional
features whose missing values cause a row to be dropped, by default ('VorgangsBeschreibung',)

Returns

tuple[DataFrame]
dataset with removed NA entries for given subset of features
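
A small sketch on a toy frame; the column name matches the default target feature:

import pandas as pd

from lang_main.analysis.preprocessing import remove_NA

df = pd.DataFrame({'VorgangsBeschreibung': ['Pumpe defekt', None, 'Lager getauscht']})
(cleaned,) = remove_NA(df)
print(len(cleaned))  # 2 (the NA row is dropped)
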
def remove_duplicates(data: DataFrame) ‑> tuple[pandas.core.frame.DataFrame]
def remove_duplicates(
    data: DataFrame,
) -> tuple[DataFrame]:
    """removes duplicated entries over all features in the given dataset

    Parameters
    ----------
    data : DataFrame
        read data with standard structure

    Returns
    -------
    tuple[DataFrame]
        dataset with duplicates removed in both passes
    """
    # obtain info about duplicates over all features
    duplicates_filt = data.duplicated()
    logger.info(f'Number of duplicates over all features: {duplicates_filt.sum()}')
    # drop duplicates
    wo_duplicates = data.drop_duplicates(ignore_index=True)
    duplicates_subset: list[str] = [
        'VorgangsID',
        'ObjektID',
    ]
    duplicates_subset_filt = wo_duplicates.duplicated(subset=duplicates_subset)
    logger.info(
        (
            'Number of duplicates over subset '
            f'>>{duplicates_subset}<<: {duplicates_subset_filt.sum()}'
        )
    )
    wo_duplicates = wo_duplicates.drop_duplicates(
        subset=duplicates_subset, ignore_index=True
    ).copy()
    logger.info('Removed all duplicates from dataset successfully.')
    logger.info(
        'New dataset properties: number of entries: %d, number of features: %d',
        len(wo_duplicates),
        len(wo_duplicates.columns),
    )

    return (wo_duplicates,)

remove duplicate entries from the given dataset, first over all features, then over the subset 'VorgangsID' and 'ObjektID'

Parameters

data : DataFrame
read data with standard structure

Returns

tuple[DataFrame]
dataset with duplicates removed in both passes
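
A usage sketch on a toy frame containing the two ID columns used for the subset pass; all values are illustrative:

import pandas as pd

from lang_main.analysis.preprocessing import remove_duplicates

df = pd.DataFrame(
    {
        'VorgangsID': [1, 1, 2],
        'ObjektID': [10, 10, 20],
        'VorgangsBeschreibung': ['Pumpe defekt', 'Pumpe defekt', 'Lager getauscht'],
    }
)
(unique,) = remove_duplicates(df)
print(len(unique))  # 2 (the fully duplicated row is dropped)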