lang-main/tests/analysis/test_timeline.py
2025-01-22 16:54:15 +01:00

317 lines
10 KiB
Python

from pathlib import Path
from typing import cast
import numpy as np
import pandas as pd
import pytest
from lang_main import io, model_loader
from lang_main.analysis import timeline as tl
from lang_main.types import STFRModelTypes, TimelineCandidates
@pytest.fixture(scope='module')
def data_timeline_filter_activities() -> pd.DataFrame:
    """Load the pre-filtered timeline DataFrame comparison fixture."""
    fixture_path = Path('./tests/_comparison_results/timeline_01_df_filtered.pkl')
    return pd.read_pickle(fixture_path)
@pytest.fixture(scope='module')
def data_timeline_number_activities() -> pd.Series:
    """Load the expected activities-per-ObjektID Series comparison fixture."""
    return pd.read_pickle(
        Path('./tests/_comparison_results/timeline_01_act_per_objid.pkl')
    )
@pytest.fixture(scope='module')
def STFR_model():
    """Provide one sentence-transformer model, shared across this module's tests."""
    return model_loader.load_sentence_transformer(
        model_name=STFRModelTypes.ALL_MINI_LM_L6_V2,
    )
@pytest.fixture(scope='module')
def data_timeline_cands_dict() -> TimelineCandidates:
    """Load the expected timeline-candidate mapping comparison fixture."""
    pickle_path = './tests/_comparison_results/timeline_01_timeline_cands_dict.pkl'
    return cast(TimelineCandidates, io.load_pickle(pickle_path))
def test_cleanup_descriptions(data_pre_cleaned):
    """Missing description values must be replaced with the 'N.V.' placeholder."""
    nan_cells = (
        (0, 'VorgangsBeschreibung'),
        (3, 'VorgangsBeschreibung'),
        (5, 'ErledigungsBeschreibung'),
    )
    data = data_pre_cleaned.copy()
    # inject NaNs that cleanup_descriptions is expected to fill
    for row, col in nan_cells:
        data.at[row, col] = np.nan
    properties = ('VorgangsBeschreibung', 'ErledigungsBeschreibung')
    (data_proc,) = tl.cleanup_descriptions(data, properties=properties)
    for row, col in nan_cells:
        assert data_proc.at[row, col] == 'N.V.'
@pytest.mark.parametrize('convert_to_days', [True, False])
def test_calc_delta_to_repair(data_pre_cleaned, convert_to_days):
    """The new delta column must equal end date minus start date (row 0 checked)."""
    feat_start = 'ErstellungsDatum'
    feat_end = 'ErledigungsDatum'
    name_delta_feature = 'Test'
    (data,) = tl.calc_delta_to_repair(
        data_pre_cleaned,
        date_feature_start=feat_start,
        date_feature_end=feat_end,
        name_delta_feature=name_delta_feature,
        convert_to_days=convert_to_days,
    )
    assert name_delta_feature in data.columns
    expected = data_pre_cleaned.at[0, feat_end] - data_pre_cleaned.at[0, feat_start]
    actual = data.at[0, name_delta_feature]
    if convert_to_days:
        assert actual == expected.days
    else:
        assert actual == expected
def test_non_relevant_obj_ids(data_pre_cleaned):
    """An ID whose unique-text count falls below the threshold must be flagged."""
    data = data_pre_cleaned.copy()
    # force a single row onto ObjektID 1 so it falls under the threshold
    data.at[0, 'ObjektID'] = 1
    ids_to_ignore = tl._non_relevant_obj_ids(
        data,
        thresh_unique_feat_per_id=2,
        feature_uniqueness='HObjektText',
        feature_obj_id='ObjektID',
    )
    assert ids_to_ignore == (1,)
    assert len(ids_to_ignore) == 1
def test_remove_non_relevant_obj_ids(data_pre_cleaned):
    """Rows belonging to non-relevant object IDs must be dropped from the frame."""
    feature_obj_id = 'ObjektID'
    data = data_pre_cleaned.copy()
    # force a single row onto ObjektID 1 so it becomes non-relevant
    data.at[0, feature_obj_id] = 1
    (data_proc,) = tl.remove_non_relevant_obj_ids(
        data,
        thresh_unique_feat_per_id=2,
        feature_uniqueness='HObjektText',
        feature_obj_id=feature_obj_id,
    )
    remaining_ids = data_proc[feature_obj_id].unique()
    assert 1 not in remaining_ids
    assert len(remaining_ids) == 2
def test_generate_model_input(data_pre_cleaned):
    """The NLP input column must join the chosen features with ' - '."""
    target_feature_name = 'nlp_model_input'
    model_input_features = (
        'VorgangsTypName',
        'VorgangsBeschreibung',
    )
    data = data_pre_cleaned.copy()
    (data_proc,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    first = data.at[0, model_input_features[0]]
    second = data.at[0, model_input_features[1]]
    assert data_proc.at[0, target_feature_name] == f'{first} - {second}'
def test_filter_activities_per_obj_id(data_pre_cleaned):
    """Only object IDs with more than the threshold of relevant activities survive."""
    feature_obj_id = 'ObjektID'
    threshold_num_activities = 1  # at least 2 occurrences per ObjID
    data = data_pre_cleaned.iloc[:5].copy()
    df_filtered, act_per_obj_id = tl.filter_activities_per_obj_id(
        data,
        activity_feature='VorgangsTypName',
        relevant_activity_types=('Störungsmeldung',),
        feature_obj_id=feature_obj_id,
        threshold_num_activities=threshold_num_activities,
    )
    assert len(df_filtered) == 2
    # surviving rows carry the ID that the per-ID activity count reports
    assert df_filtered.iat[0, 1] == act_per_obj_id.index[0]
    assert act_per_obj_id.iat[0] == 2
def test_get_timeline_candidates_index(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    """A strict cosine threshold yields no candidates; a zero threshold yields one."""
    target_feature_name = 'nlp_model_input'
    (data,) = tl.generate_model_input(
        data_timeline_filter_activities.copy(),
        target_feature_name=target_feature_name,
        model_input_features=('VorgangsBeschreibung',),
    )
    data_num_act = data_timeline_number_activities.copy()

    def candidates(threshold):
        # run the index search and materialise it: tuple((ObjID, (Idx1, Idx2)))
        return tuple(
            tl._get_timeline_candidates_index(
                data=data,
                num_activities_per_obj_id=data_num_act,
                model=STFR_model,
                cos_sim_threshold=threshold,
                feature_obj_id='ObjektID',
                model_input_feature=target_feature_name,
            )
        )

    assert len(candidates(0.8)) == 0
    low_threshold = candidates(0.0)
    assert len(low_threshold) == 1
    assert low_threshold == ((3, (np.int64(3), np.int64(4))),)
def test_transform_timeline_candidates(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    """The candidate-index iterator must fold into an {ObjID: (pairs, ...)} mapping."""
    target_feature_name = 'nlp_model_input'
    (data,) = tl.generate_model_input(
        data_timeline_filter_activities.copy(),
        target_feature_name=target_feature_name,
        model_input_features=('VorgangsBeschreibung',),
    )
    cands_iter = tl._get_timeline_candidates_index(
        data=data,
        num_activities_per_obj_id=data_timeline_number_activities.copy(),
        model=STFR_model,
        cos_sim_threshold=0.0,
        feature_obj_id='ObjektID',
        model_input_feature=target_feature_name,
    )
    tl_cands_dict = tl._transform_timeline_candidates(cands_iter)
    assert 3 in tl_cands_dict
    assert tl_cands_dict[3] == ((np.int64(3), np.int64(4)),)
def test_map_obj_id_to_texts(data_pre_cleaned):
    """Each object ID must be mapped to its descriptive text."""
    map_text = tl._map_obj_id_to_texts(
        data=data_pre_cleaned.iloc[:5].copy(),
        feature_obj_id='ObjektID',
        feature_obj_text='HObjektText',
    )
    expected = {
        1: 'Fräsmaschine-FS435X',
        2: 'Schleifmaschine-S4x87',
        3: 'Bohrbearbeitungszentrum-BBZ35',
    }
    assert len(map_text) == 3  # three unique IDs
    for obj_id, text in expected.items():
        assert map_text[obj_id] == text
def test_get_timeline_candidates(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    """End-to-end candidate search: returns the candidate dict and the text map.

    Fix: the comment on the ``len(map_text)`` assertion claimed "three unique
    IDs" (copied from ``test_map_obj_id_to_texts``) while the assertion checks
    for exactly one entry — the comment now matches the assertion.
    """
    data = data_timeline_filter_activities.copy()
    target_feature_name = 'nlp_model_input'
    model_input_features = ('VorgangsBeschreibung',)
    (data,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    data_num_act = data_timeline_number_activities.copy()
    cos_sim_threshold = 0.0
    tl_cands_dict, map_text = tl.get_timeline_candidates(
        data=data,
        num_activities_per_obj_id=data_num_act,
        model=STFR_model,
        cos_sim_threshold=cos_sim_threshold,
        feature_obj_id='ObjektID',
        feature_obj_text='HObjektText',
        model_input_feature=target_feature_name,
    )
    assert 3 in tl_cands_dict
    assert tl_cands_dict[3] == ((np.int64(3), np.int64(4)),)
    assert len(map_text) == 1  # only ObjektID 3 yields a candidate entry
    assert map_text[3] == 'Bohrbearbeitungszentrum-BBZ35'
def test_filter_timeline_cands(data_pre_cleaned, data_timeline_cands_dict):
    """Filtering by a candidate entry must return exactly the paired rows."""
    data_proc = tl.filter_timeline_cands(
        data_pre_cleaned,
        data_timeline_cands_dict,
        obj_id=3,
        entry_idx=0,
        sort_feature='ErstellungsDatum',
    )
    expected_rows = {
        3: 'Beseitigung der Blockierung und Überprüfung des Antriebs',
        4: 'Reinigung der Leitungen und Austausch des Kühlmittels',
    }
    for idx, erledigung in expected_rows.items():
        assert idx in data_proc.index
        assert data_proc.at[idx, 'ObjektID'] == 3
        assert data_proc.at[idx, 'VorgangsTypName'] == 'Störungsmeldung'
        assert data_proc.at[idx, 'ErledigungsBeschreibung'] == erledigung
@pytest.mark.parametrize('convert_to_days', [True, False])
def test_calc_delta_to_next_failure(
    data_pre_cleaned,
    data_timeline_cands_dict,
    convert_to_days,
):
    """Each row's delta is the gap to the next failure; the last row gets 0.

    Fix: the expected delta previously mixed lookups from ``data_proc`` and
    ``data_pre_cleaned`` in one subtraction; both operands now come from the
    same processed frame. The date values themselves are not changed by the
    filter/delta steps (the other tests assert the rows pass through intact),
    so the expected value is identical but the comparison is self-consistent.
    """
    obj_id = 3
    entry_idx = 0
    sort_feature = 'ErstellungsDatum'
    data_tl_filtered = tl.filter_timeline_cands(
        data_pre_cleaned,
        data_timeline_cands_dict,
        obj_id=obj_id,
        entry_idx=entry_idx,
        sort_feature=sort_feature,
    )
    name_delta_feature = 'test_delta'
    data_proc = tl.calc_delta_to_next_failure(
        data_tl_filtered,
        date_feature=sort_feature,
        name_delta_feature=name_delta_feature,
        convert_to_days=convert_to_days,
    )
    # expected: gap between consecutive creation dates (row 3 -> row 4)
    test_date = data_proc.at[4, sort_feature] - data_proc.at[3, sort_feature]
    test_date_last = pd.Timedelta(0)  # last entry has no successor
    if convert_to_days:
        assert test_date.days == data_proc.at[3, name_delta_feature]
        assert test_date_last.days == data_proc.at[4, name_delta_feature]
    else:
        assert test_date == data_proc.at[3, name_delta_feature]
        assert test_date_last == data_proc.at[4, name_delta_feature]