366 lines
14 KiB
Python
366 lines
14 KiB
Python
import networkx as nx
|
|
import pytest
|
|
|
|
from lang_main.analysis import graphs
|
|
from lang_main.errors import (
|
|
EdgePropertyNotContainedError,
|
|
EmptyEdgesError,
|
|
EmptyGraphError,
|
|
NodePropertyNotContainedError,
|
|
)
|
|
|
|
TK_GRAPH_NAME = 'TEST_TOKEN_GRAPH'
|
|
|
|
|
|
def build_init_graph(token_graph: bool):
    """Build the small weighted, directed reference graph used by the tests.

    The graph consists of the nodes 1-4 and six directed edges whose
    ``weight`` attributes are 1-6 in edge order. The pairs (1, 2) and (2, 1)
    form an antiparallel edge pair.

    Args:
        token_graph: if true, the edges are added to a ``graphs.TokenGraph``
            named ``TK_GRAPH_NAME`` with logging disabled; otherwise to a
            plain ``nx.DiGraph``.

    Returns:
        The populated graph object.
    """
    node_pairs = ((1, 2), (1, 3), (2, 4), (3, 4), (1, 4), (2, 1))
    # Each edge becomes a (source, target, attribute-dict) triple; the weight
    # is the 1-based position of the edge in ``node_pairs``.
    weighted_edges = [
        (source, target, {'weight': weight})
        for weight, (source, target) in enumerate(node_pairs, start=1)
    ]

    result = (
        graphs.TokenGraph(name=TK_GRAPH_NAME, enable_logging=False)
        if token_graph
        else nx.DiGraph()
    )
    result.add_edges_from(weighted_edges)

    return result
|
|
|
|
|
|
@pytest.fixture(scope='module')
def graph() -> graphs.DiGraph:
    """Module-scoped fixture: the reference graph as a plain directed graph."""
    directed_graph = build_init_graph(token_graph=False)
    return directed_graph
|
|
|
|
|
|
@pytest.fixture(scope='module')
def tk_graph() -> graphs.TokenGraph:
    """Module-scoped fixture: the reference graph as a ``TokenGraph``."""
    token_graph = build_init_graph(token_graph=True)
    return token_graph  # type: ignore
|
|
|
|
|
|
@pytest.fixture(scope='module')
def tk_graph_undirected(tk_graph) -> graphs.Graph:
    """Module-scoped fixture: the ``undirected`` attribute of ``tk_graph``."""
    undirected_graph = tk_graph.undirected
    return undirected_graph
|
|
|
|
|
|
def test_graph_size(graph):
    """The reference graph must contain four nodes and six edges."""
    node_count = len(graph.nodes)
    assert node_count == 4
    edge_count = len(graph.edges)
    assert edge_count == 6
|
|
|
|
|
|
def test_save_to_GraphML(graph, tmp_path):
    """Saving a graph must create ``<filename>.graphml`` in the target folder."""
    stem = 'test_graphML'
    expected_file = (tmp_path / stem).with_suffix('.graphml')

    graphs.save_to_GraphML(graph, saving_path=tmp_path, filename=stem)

    assert expected_file.exists()
|
|
|
|
|
|
def test_save_load_pickle_tk_graph(tk_graph, tmp_path):
    """Round-trip a ``TokenGraph`` through pickle files.

    Two rounds are run: one with an explicit filename and one with
    ``filename=None``, where the file is expected under the graph's name.
    """
    for filename in ('test_save_tkg', None):
        tk_graph.to_pickle(tmp_path, filename)

        # Without an explicit filename the graph name is the expected stem.
        stem = tk_graph.name if filename is None else filename
        pickle_path = (tmp_path / stem).with_suffix('.pkl')
        assert pickle_path.exists()

        restored = graphs.TokenGraph.from_file(pickle_path)
        assert restored.nodes == tk_graph.nodes
        assert restored.edges == tk_graph.edges
|
|
|
|
|
|
@pytest.mark.parametrize(
    'import_graph,directed', [('tk_graph', True), ('tk_graph_undirected', False)]
)
def test_save_load_GraphML_tk_graph(import_graph, tk_graph, directed, tmp_path, request):
    """Round-trip a ``TokenGraph`` through GraphML files.

    The graph is exported via ``tk_graph.to_GraphML`` (directed or
    undirected) and the re-imported graph is compared with the fixture named
    by ``import_graph``. Two rounds are run: one with an explicit filename
    and one with ``filename=None``, where the file is expected under the
    graph's name.
    """
    reference_graph = request.getfixturevalue(import_graph)

    for filename in ('test_save_tkg', None):
        tk_graph.to_GraphML(tmp_path, filename, directed=directed)

        # Without an explicit filename the graph name is the expected stem.
        stem = tk_graph.name if filename is None else filename
        graphml_path = (tmp_path / stem).with_suffix('.graphml')
        assert graphml_path.exists()

        restored = graphs.TokenGraph.from_file(graphml_path, node_type_graphml=int)
        assert restored.nodes == reference_graph.nodes
        assert restored.edges == reference_graph.edges
|
|
|
|
|
|
def test_get_graph_metadata(graph):
    """Check every metadata entry reported for the directed reference graph."""
    expected_metadata = (
        ('num_nodes', 4),
        ('num_edges', 6),
        ('min_edge_weight', 1),
        ('max_edge_weight', 6),
        ('node_memory', 112),
        ('edge_memory', 336),
        ('total_memory', 448),
    )

    metadata = graphs.get_graph_metadata(graph)

    for key, expected in expected_metadata:
        assert metadata[key] == expected
|
|
|
|
|
|
def test_update_graph_batch():
    """Batch update with two new edges, (4, 5) and (5, 6), each of weight 8."""
    fresh_graph = build_init_graph(token_graph=False)
    new_edges = ((4, 5), (5, 6))

    graphs.update_graph(fresh_graph, batch=new_edges, weight_connection=8)

    metadata = graphs.get_graph_metadata(fresh_graph)
    expected_metadata = (
        ('num_nodes', 6),
        ('num_edges', 8),
        ('min_edge_weight', 1),
        ('max_edge_weight', 8),
    )
    for key, expected in expected_metadata:
        assert metadata[key] == expected
|
|
|
|
|
|
def test_update_graph_single_new():
    """Single update with the new edge 4 -> 5 of weight 7."""
    fresh_graph = build_init_graph(token_graph=False)

    graphs.update_graph(fresh_graph, parent=4, child=5, weight_connection=7)

    metadata = graphs.get_graph_metadata(fresh_graph)
    expected_metadata = (
        ('num_nodes', 5),
        ('num_edges', 7),
        ('min_edge_weight', 1),
        ('max_edge_weight', 7),
    )
    for key, expected in expected_metadata:
        assert metadata[key] == expected
|
|
|
|
|
|
def test_update_graph_single_existing():
    """Single update on the already existing edge 1 -> 4.

    The edge starts with weight 5 and is updated with a connection weight of
    5; node and edge counts are expected to stay unchanged while the maximum
    edge weight is expected to become 10.
    """
    fresh_graph = build_init_graph(token_graph=False)

    graphs.update_graph(fresh_graph, parent=1, child=4, weight_connection=5)

    metadata = graphs.get_graph_metadata(fresh_graph)
    expected_metadata = (
        ('num_nodes', 4),
        ('num_edges', 6),
        ('min_edge_weight', 1),
        ('max_edge_weight', 10),
    )
    for key, expected in expected_metadata:
        assert metadata[key] == expected
|
|
|
|
|
|
@pytest.mark.parametrize('cast_int', [True, False])
def test_convert_graph_to_undirected(graph, cast_int):
    """The antiparallel edges between nodes 1 and 2 must be merged."""
    undirected = graphs.convert_graph_to_undirected(graph, cast_int=cast_int)
    # edges: (1, 2, w=1) and (2, 1, w=6) --> undirected: (1, 2, w=7)
    merged_weight = undirected[1][2]['weight']
    assert merged_weight == pytest.approx(7.0)
|
|
|
|
|
|
def test_convert_graph_to_cytoscape(graph):
    """Check the first and last Cytoscape elements and the weight range.

    The first element is expected to describe node 1, the last element the
    edge 3 -> 4 with weight 4.
    """
    elements, weight_range = graphs.convert_graph_to_cytoscape(graph)

    first_data = elements[0]['data']  # type: ignore
    assert first_data['id'] == 1

    last_data = elements[-1]['data']  # type: ignore
    assert last_data['source'] == 3
    assert last_data['target'] == 4
    assert last_data['weight'] == 4

    assert weight_range['min'] == 1
    assert weight_range['max'] == 6
|
|
|
|
|
|
def test_tk_graph_properties(tk_graph):
    """Check name, graph views and both metadata dicts of the ``TokenGraph``."""
    assert tk_graph.name == TK_GRAPH_NAME
    assert isinstance(tk_graph.directed, graphs.TokenGraph)
    assert isinstance(tk_graph.undirected, nx.Graph)

    tk_graph.update_metadata()

    expected_directed = (
        ('num_nodes', 4),
        ('num_edges', 6),
        ('min_edge_weight', 1),
        ('max_edge_weight', 6),
        ('node_memory', 112),
        ('edge_memory', 336),
        ('total_memory', 448),
    )
    directed_meta = tk_graph.metadata_directed
    for key, expected in expected_directed:
        assert directed_meta[key] == expected

    # The undirected view has one edge less: (1, 2) and (2, 1) collapse into
    # a single edge of weight 7, which also raises the minimum weight to 2.
    expected_undirected = (
        ('num_nodes', 4),
        ('num_edges', 5),
        ('min_edge_weight', 2),
        ('max_edge_weight', 7),
        ('node_memory', 112),
        ('edge_memory', 280),
        ('total_memory', 392),
    )
    undirected_meta = tk_graph.metadata_undirected
    for key, expected in expected_undirected:
        assert undirected_meta[key] == expected
|
|
|
|
|
|
def test_filter_graph_by_edge_weight(tk_graph):
    """With weight bounds 2 and 5, edges 1 -> 2 (w=1) and 2 -> 1 (w=6) must be gone."""
    bounded_graph = graphs.filter_graph_by_edge_weight(
        tk_graph,
        bound_lower=2,
        bound_upper=5,
    )
    for source, target in ((1, 2), (2, 1)):
        assert not bounded_graph.has_edge(source, target)
|
|
|
|
|
|
def test_filter_graph_by_node_degree(tk_graph):
    """With both degree bounds set to 3, two nodes are expected to remain."""
    degree_bound = 3
    bounded_graph = graphs.filter_graph_by_node_degree(
        tk_graph,
        bound_lower=degree_bound,
        bound_upper=degree_bound,
    )
    remaining_nodes = len(bounded_graph.nodes)
    assert remaining_nodes == 2
|
|
|
|
|
|
def test_filter_graph_by_number_edges(tk_graph):
    """Limit the graph to one edge, then filter by a lower degree bound of 1.

    After the second filter step exactly two nodes are expected to remain.
    """
    edge_limit = 1
    limited_graph = graphs.filter_graph_by_number_edges(tk_graph, limit=edge_limit)
    assert len(limited_graph.edges) == edge_limit

    connected_graph = graphs.filter_graph_by_node_degree(
        limited_graph,
        bound_lower=1,
        bound_upper=None,
    )
    assert len(connected_graph.nodes) == 2, 'one edge should result in only two nodes'
|
|
|
|
|
|
def test_add_weighted_degree():
    """The weighted degree must be stored as a node property for nodes 1-3."""
    fresh_graph = build_init_graph(token_graph=False)
    target_property = 'degree_weighted'

    graphs.add_weighted_degree(fresh_graph, 'weight', target_property)

    for node, expected in ((1, 14), (2, 10), (3, 6)):
        assert fresh_graph.nodes[node][target_property] == expected
|
|
|
|
|
|
def test_add_betweenness_centrality():
    """Betweenness centrality must be stored as a node property for nodes 1-3."""
    fresh_graph = build_init_graph(token_graph=False)
    target_property = 'betweenness_centrality'

    graphs.add_betweenness_centrality(fresh_graph, property_name=target_property)

    node_data = fresh_graph.nodes
    # The value is rounded first because the expectation has four decimals only.
    assert round(node_data[1][target_property], 4) == pytest.approx(0.1667)
    for node in (2, 3):
        assert node_data[node][target_property] == 0
|
|
|
|
|
|
def test_add_importance_metric():
    """Check the importance metric and its errors for missing node properties.

    The metric is computed from the weighted-degree and betweenness
    properties added beforehand. Passing the name of a property that was
    never added, for either of the two inputs, must raise
    ``NodePropertyNotContainedError``.
    """
    fresh_graph = build_init_graph(token_graph=False)

    weighted_degree_key = 'degree_weighted'
    graphs.add_weighted_degree(fresh_graph, 'weight', weighted_degree_key)
    betweenness_key = 'betweenness_centrality'
    graphs.add_betweenness_centrality(fresh_graph, property_name=betweenness_key)

    importance_key = 'importance'
    graphs.add_importance_metric(
        fresh_graph,
        property_name=importance_key,
        property_name_weighted_degree=weighted_degree_key,
        property_name_betweenness=betweenness_key,
    )
    node_data = fresh_graph.nodes
    assert round(node_data[1][importance_key], 4) == pytest.approx(2.3333)
    for node in (2, 3):
        assert node_data[node][importance_key] == 0

    # One case per input property: first the weighted degree is unknown,
    # then the betweenness centrality.
    invalid_inputs = (
        {
            'property_name_weighted_degree': 'prop_not_contained',
            'property_name_betweenness': betweenness_key,
        },
        {
            'property_name_weighted_degree': weighted_degree_key,
            'property_name_betweenness': 'prop_not_contained',
        },
    )
    for input_names in invalid_inputs:
        with pytest.raises(NodePropertyNotContainedError):
            graphs.add_importance_metric(
                fresh_graph,
                property_name=importance_key,
                **input_names,
            )
|
|
|
|
|
|
def test_static_graph_analysis():
    """The analysis must add ``degree_weighted`` to directed and undirected nodes."""
    token_graph = build_init_graph(token_graph=True)
    # The analysis result is a one-element collection holding the graph.
    (token_graph,) = graphs.static_graph_analysis(token_graph)  # type: ignore

    metric_key = 'degree_weighted'
    expected_degrees = ((1, 14), (2, 10), (3, 6))

    for node, expected in expected_degrees:
        assert token_graph.nodes[node][metric_key] == expected
    for node, expected in expected_degrees:
        assert token_graph.undirected.nodes[node][metric_key] == expected
|
|
|
|
|
|
def test_pipe_add_graph_metrics():
    """Check all three metrics on the directed and the undirected graph.

    The pipeline receives the directed reference graph and its undirected
    conversion; both returned graphs are expected to carry identical values
    for ``degree_weighted``, ``betweenness_centrality`` and ``importance``
    on the nodes 1-3.
    """
    directed = build_init_graph(token_graph=False)
    undirected = graphs.convert_graph_to_undirected(directed, cast_int=True)
    graph_collection = graphs.pipe_add_graph_metrics(directed, undirected)

    metric_key = 'degree_weighted'
    for position in (0, 1):
        node_data = graph_collection[position].nodes
        for node, expected in ((1, 14), (2, 10), (3, 6)):
            assert node_data[node][metric_key] == expected

    # For the remaining metrics only node 1 is non-zero; its value is rounded
    # first because the expectation has four decimals only.
    rounded_expectations = (
        ('betweenness_centrality', 0.1667),
        ('importance', 2.3333),
    )
    for metric_key, expected_node_1 in rounded_expectations:
        for position in (0, 1):
            node_data = graph_collection[position].nodes
            assert round(node_data[1][metric_key], 4) == pytest.approx(expected_node_1)
            assert node_data[2][metric_key] == 0
            assert node_data[3][metric_key] == 0
|
|
|
|
|
|
def test_pipe_rescale_graph_edge_weights(tk_graph):
    """Check rescaled weights between nodes 1 and 2 in both returned graphs."""
    directed_scaled, undirected_scaled = graphs.pipe_rescale_graph_edge_weights(tk_graph)

    expectations = (
        (directed_scaled, 2, 1, 1.0),
        (directed_scaled, 1, 2, 0.095238),
        (undirected_scaled, 2, 1, 1.0),
        (undirected_scaled, 1, 2, 1.0),
    )
    for scaled_graph, source, target, expected in expectations:
        assert scaled_graph[source][target]['weight'] == pytest.approx(expected)
|
|
|
|
|
|
def test_pipe_graph_split(tk_graph):
    """Compare node and edge counts of the two graphs returned by the split.

    The first returned graph must match the input in node and edge count;
    the second must match in node count only and differ in edge count.
    """
    directed_copy, undirected = graphs.pipe_graph_split(tk_graph)

    source_nodes = tk_graph.nodes
    source_edges = tk_graph.edges

    assert len(directed_copy.nodes) == len(source_nodes)
    assert len(directed_copy.edges) == len(source_edges)
    assert len(directed_copy.nodes) == len(undirected.nodes)
    assert len(undirected.nodes) == len(source_nodes)
    assert len(undirected.edges) != len(source_edges)
|
|
|
|
|
|
@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_rescale_edge_weights(import_graph, request):
    """Check rescaled weights of edges 2 -> 1 and 1 -> 2 for both graph fixtures."""
    source_graph = request.getfixturevalue(import_graph)

    scaled_graph = graphs.rescale_edge_weights(source_graph)

    heaviest_edge = scaled_graph[2][1]['weight']
    assert heaviest_edge == pytest.approx(1.0)
    lightest_edge = scaled_graph[1][2]['weight']
    assert lightest_edge == pytest.approx(0.095238)
|
|
|
|
|
|
@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_verify_property(import_graph, request):
    """An unknown property must raise; the existing ``weight`` must pass.

    For the existing property the call is expected to return a falsy value.
    """
    checked_graph = request.getfixturevalue(import_graph)

    missing_property = 'centrality'
    with pytest.raises(EdgePropertyNotContainedError):
        graphs.verify_property(checked_graph, property=missing_property)

    existing_property = 'weight'
    outcome = graphs.verify_property(checked_graph, property=existing_property)
    assert not outcome
|
|
|
|
|
|
def test_verify_non_empty_graph():
    """Walk a graph through the states empty, nodes only, nodes and edges.

    An empty graph must raise ``EmptyGraphError``. A graph with nodes but no
    edges must raise ``EmptyEdgesError`` only when edges are included in the
    check. Every successful check is expected to return a falsy value.
    """
    growing_graph = nx.Graph()

    # State 1: no nodes at all.
    with pytest.raises(EmptyGraphError):
        graphs.verify_non_empty_graph(growing_graph)

    # State 2: nodes only.
    growing_graph.add_nodes_from([1, 2, 3, 4])
    with pytest.raises(EmptyEdgesError):
        graphs.verify_non_empty_graph(growing_graph, including_edges=True)
    nodes_only_outcome = graphs.verify_non_empty_graph(growing_graph, including_edges=False)
    assert not nodes_only_outcome

    # State 3: nodes and edges.
    growing_graph.add_edges_from([(1, 2), (1, 3), (2, 4)])
    with_edges_outcome = graphs.verify_non_empty_graph(growing_graph, including_edges=True)
    assert not with_edges_outcome
|