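"""Tests for lang_main.analysis.timeline.

Covers description cleanup, repair-time deltas, filtering of non-relevant
object IDs, NLP model-input generation, and timeline-candidate extraction
based on sentence-transformer cosine similarity.
"""
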
from pathlib import Path
from typing import cast

import numpy as np
import pandas as pd
import pytest

from lang_main import io, model_loader
from lang_main.analysis import timeline as tl
from lang_main.types import STFRModelTypes, TimelineCandidates

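# Module-scoped fixtures. The pickles under tests/_comparison_results/ are
# pre-computed intermediate results used as regression baselines. The
# `data_pre_cleaned` fixture referenced by the tests below is presumably
# provided by a shared conftest.py.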
@pytest.fixture(scope='module')
def data_timeline_filter_activities() -> pd.DataFrame:
    pth_data = Path('./tests/_comparison_results/timeline_01_df_filtered.pkl')
    return pd.read_pickle(pth_data)

@pytest.fixture(scope='module')
def data_timeline_number_activities() -> pd.Series:
    pth_data = Path('./tests/_comparison_results/timeline_01_act_per_objid.pkl')
    return pd.read_pickle(pth_data)

@pytest.fixture(scope='module')
def STFR_model():
    model = model_loader.load_sentence_transformer(
        model_name=STFRModelTypes.ALL_MINI_LM_L6_V2,
    )
    return model

@pytest.fixture(scope='module')
def data_timeline_cands_dict() -> TimelineCandidates:
    pth_data = './tests/_comparison_results/timeline_01_timeline_cands_dict.pkl'
    return cast(TimelineCandidates, io.load_pickle(pth_data))

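# Missing (NaN) descriptions must be replaced by the 'N.V.' placeholder.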
def test_cleanup_descriptions(data_pre_cleaned):
    data = data_pre_cleaned.copy()
    data.at[0, 'VorgangsBeschreibung'] = np.nan
    data.at[3, 'VorgangsBeschreibung'] = np.nan
    data.at[5, 'ErledigungsBeschreibung'] = np.nan
    properties = ('VorgangsBeschreibung', 'ErledigungsBeschreibung')
    (data_proc,) = tl.cleanup_descriptions(data, properties=properties)
    assert data_proc.at[0, 'VorgangsBeschreibung'] == 'N.V.'
    assert data_proc.at[3, 'VorgangsBeschreibung'] == 'N.V.'
    assert data_proc.at[5, 'ErledigungsBeschreibung'] == 'N.V.'

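# The repair delta spans creation to completion date, returned either as a
# raw Timedelta or converted to whole days.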
@pytest.mark.parametrize('convert_to_days', [True, False])
def test_calc_delta_to_repair(data_pre_cleaned, convert_to_days):
    feat_start = 'ErstellungsDatum'
    feat_end = 'ErledigungsDatum'
    name_delta_feature = 'Test'
    (data,) = tl.calc_delta_to_repair(
        data_pre_cleaned,
        date_feature_start=feat_start,
        date_feature_end=feat_end,
        name_delta_feature=name_delta_feature,
        convert_to_days=convert_to_days,
    )
    assert name_delta_feature in data.columns
    expected_delta = data_pre_cleaned.at[0, feat_end] - data_pre_cleaned.at[0, feat_start]
    if convert_to_days:
        assert expected_delta.days == data.at[0, name_delta_feature]
    else:
        assert expected_delta == data.at[0, name_delta_feature]

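# Reassigning row 0 links ObjektID 1 to two distinct HObjektText values, so
# the ID is flagged as non-relevant under the uniqueness threshold of 2.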
def test_non_relevant_obj_ids(data_pre_cleaned):
    feature_uniqueness = 'HObjektText'
    feature_obj_id = 'ObjektID'
    threshold = 2
    data = data_pre_cleaned.copy()
    data.at[0, feature_obj_id] = 1
    ids_to_ignore = tl._non_relevant_obj_ids(
        data,
        thresh_unique_feat_per_id=threshold,
        feature_uniqueness=feature_uniqueness,
        feature_obj_id=feature_obj_id,
    )
    assert len(ids_to_ignore) == 1
    assert ids_to_ignore == (1,)

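# Same setup as above, but here the flagged ObjektID must actually be
# removed from the returned DataFrame.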
def test_remove_non_relevant_obj_ids(data_pre_cleaned):
    feature_uniqueness = 'HObjektText'
    feature_obj_id = 'ObjektID'
    threshold = 2
    data = data_pre_cleaned.copy()
    data.at[0, feature_obj_id] = 1

    (data_proc,) = tl.remove_non_relevant_obj_ids(
        data,
        thresh_unique_feat_per_id=threshold,
        feature_uniqueness=feature_uniqueness,
        feature_obj_id=feature_obj_id,
    )
    unique_obj_ids = data_proc[feature_obj_id].unique()
    assert 1 not in unique_obj_ids
    assert len(unique_obj_ids) == 2

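# The model input is the selected features joined with ' - '.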
def test_generate_model_input(data_pre_cleaned):
    target_feature_name = 'nlp_model_input'
    model_input_features = (
        'VorgangsTypName',
        'VorgangsBeschreibung',
    )
    data = data_pre_cleaned.copy()
    (data_proc,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    feat1 = data.at[0, model_input_features[0]]
    feat2 = data.at[0, model_input_features[1]]
    test_result = f'{feat1} - {feat2}'
    assert data_proc.at[0, target_feature_name] == test_result

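# Keeps only 'Störungsmeldung' rows for ObjIDs whose activity count exceeds
# the threshold; also returns the activity count per ObjID.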
def test_filter_activities_per_obj_id(data_pre_cleaned):
    activity_feature = 'VorgangsTypName'
    relevant_activity_types = ('Störungsmeldung',)
    feature_obj_id = 'ObjektID'
    threshold_num_activities = 1  # at least 2 occurrences per ObjID

    data = data_pre_cleaned.iloc[:5].copy()

    df_filtered, act_per_obj_id = tl.filter_activities_per_obj_id(
        data,
        activity_feature=activity_feature,
        relevant_activity_types=relevant_activity_types,
        feature_obj_id=feature_obj_id,
        threshold_num_activities=threshold_num_activities,
    )
    assert len(df_filtered) == 2
    assert df_filtered.iat[0, 1] == act_per_obj_id.index[0]
    assert act_per_obj_id.iat[0] == 2

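# Candidate pairs are row indices (per ObjektID) whose description embeddings
# reach the cosine-similarity threshold.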
def test_get_timeline_candidates_index(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    data = data_timeline_filter_activities.copy()
    target_feature_name = 'nlp_model_input'
    model_input_features = ('VorgangsBeschreibung',)
    (data,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    data_num_act = data_timeline_number_activities.copy()

    # strict threshold: no pair of descriptions is similar enough
    cos_sim_threshold = 0.8
    tl_cands_iter = tl._get_timeline_candidates_index(
        data=data,
        num_activities_per_obj_id=data_num_act,
        model=STFR_model,
        cos_sim_threshold=cos_sim_threshold,
        feature_obj_id='ObjektID',
        model_input_feature=target_feature_name,
    )
    tl_cands_idx = tuple(tl_cands_iter)  # format: ((ObjID, (Idx1, Idx2)), ...)
    assert len(tl_cands_idx) == 0

    # zero threshold: every comparison passes, yielding one candidate pair
    cos_sim_threshold = 0.0
    tl_cands_iter = tl._get_timeline_candidates_index(
        data=data,
        num_activities_per_obj_id=data_num_act,
        model=STFR_model,
        cos_sim_threshold=cos_sim_threshold,
        feature_obj_id='ObjektID',
        model_input_feature=target_feature_name,
    )
    tl_cands_idx = tuple(tl_cands_iter)
    assert len(tl_cands_idx) == 1
    assert tl_cands_idx == ((3, (np.int64(3), np.int64(4))),)

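# The (ObjID, index-pair) stream is regrouped into a dict keyed by ObjID.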
def test_transform_timeline_candidates(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    data = data_timeline_filter_activities.copy()
    target_feature_name = 'nlp_model_input'
    model_input_features = ('VorgangsBeschreibung',)
    (data,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    data_num_act = data_timeline_number_activities.copy()
    cos_sim_threshold = 0.0
    tl_cands_iter = tl._get_timeline_candidates_index(
        data=data,
        num_activities_per_obj_id=data_num_act,
        model=STFR_model,
        cos_sim_threshold=cos_sim_threshold,
        feature_obj_id='ObjektID',
        model_input_feature=target_feature_name,
    )
    tl_cands_dict = tl._transform_timeline_candidates(tl_cands_iter)

    assert 3 in tl_cands_dict
    assert tl_cands_dict[3] == ((np.int64(3), np.int64(4)),)

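# Maps each ObjektID to its HObjektText label.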
def test_map_obj_id_to_texts(data_pre_cleaned):
    data = data_pre_cleaned.iloc[:5].copy()
    feature_obj_id = 'ObjektID'
    feature_obj_text = 'HObjektText'
    map_text = tl._map_obj_id_to_texts(
        data=data,
        feature_obj_id=feature_obj_id,
        feature_obj_text=feature_obj_text,
    )
    assert len(map_text) == 3  # three unique IDs
    assert map_text[1] == 'Fräsmaschine-FS435X'
    assert map_text[2] == 'Schleifmaschine-S4x87'
    assert map_text[3] == 'Bohrbearbeitungszentrum-BBZ35'

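# End-to-end wrapper: yields the candidate dict plus the ID-to-text map
# restricted to candidate ObjIDs.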
def test_get_timeline_candidates(
    data_timeline_filter_activities,
    data_timeline_number_activities,
    STFR_model,
):
    data = data_timeline_filter_activities.copy()
    target_feature_name = 'nlp_model_input'
    model_input_features = ('VorgangsBeschreibung',)
    (data,) = tl.generate_model_input(
        data,
        target_feature_name=target_feature_name,
        model_input_features=model_input_features,
    )
    data_num_act = data_timeline_number_activities.copy()
    cos_sim_threshold = 0.0
    tl_cands_dict, map_text = tl.get_timeline_candidates(
        data=data,
        num_activities_per_obj_id=data_num_act,
        model=STFR_model,
        cos_sim_threshold=cos_sim_threshold,
        feature_obj_id='ObjektID',
        feature_obj_text='HObjektText',
        model_input_feature=target_feature_name,
    )
    assert 3 in tl_cands_dict
    assert tl_cands_dict[3] == ((np.int64(3), np.int64(4)),)
    assert len(map_text) == 1  # only the single candidate ObjID is mapped
    assert map_text[3] == 'Bohrbearbeitungszentrum-BBZ35'

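# Selecting candidate entry 0 of ObjektID 3 must yield exactly the two
# matching 'Störungsmeldung' rows, sorted by creation date.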
def test_filter_timeline_cands(data_pre_cleaned, data_timeline_cands_dict):
    obj_id = 3
    entry_idx = 0
    sort_feature = 'ErstellungsDatum'
    data_proc = tl.filter_timeline_cands(
        data_pre_cleaned,
        data_timeline_cands_dict,
        obj_id=obj_id,
        entry_idx=entry_idx,
        sort_feature=sort_feature,
    )
    assert 3 in data_proc.index
    assert 4 in data_proc.index
    assert data_proc.at[3, 'ObjektID'] == 3
    assert data_proc.at[4, 'ObjektID'] == 3
    assert data_proc.at[3, 'VorgangsTypName'] == 'Störungsmeldung'
    assert data_proc.at[4, 'VorgangsTypName'] == 'Störungsmeldung'
    assert (
        data_proc.at[3, 'ErledigungsBeschreibung']
        == 'Beseitigung der Blockierung und Überprüfung des Antriebs'
    )
    assert (
        data_proc.at[4, 'ErledigungsBeschreibung']
        == 'Reinigung der Leitungen und Austausch des Kühlmittels'
    )

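# Time to next failure per row; the last row of a timeline has no successor
# and gets a delta of zero.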
@pytest.mark.parametrize('convert_to_days', [True, False])
def test_calc_delta_to_next_failure(
    data_pre_cleaned,
    data_timeline_cands_dict,
    convert_to_days,
):
    obj_id = 3
    entry_idx = 0
    sort_feature = 'ErstellungsDatum'
    data_tl_filtered = tl.filter_timeline_cands(
        data_pre_cleaned,
        data_timeline_cands_dict,
        obj_id=obj_id,
        entry_idx=entry_idx,
        sort_feature=sort_feature,
    )
    name_delta_feature = 'test_delta'
    data_proc = tl.calc_delta_to_next_failure(
        data_tl_filtered,
        date_feature=sort_feature,
        name_delta_feature=name_delta_feature,
        convert_to_days=convert_to_days,
    )
    expected_delta = data_proc.at[4, sort_feature] - data_pre_cleaned.at[3, sort_feature]
    expected_delta_last = pd.Timedelta(0)  # last entry has no successor
    if convert_to_days:
        assert expected_delta.days == data_proc.at[3, name_delta_feature]
        assert expected_delta_last.days == data_proc.at[4, name_delta_feature]
    else:
        assert expected_delta == data_proc.at[3, name_delta_feature]
        assert expected_delta_last == data_proc.at[4, name_delta_feature]