lang-main/tests/analysis/test_graphs.py
2024-12-19 16:26:01 +01:00

357 lines
13 KiB
Python

import networkx as nx
import pytest
from lang_main.analysis import graphs
from lang_main.errors import (
EdgePropertyNotContainedError,
EmptyEdgesError,
EmptyGraphError,
NodePropertyNotContainedError,
)
# Name given to every TokenGraph built by ``build_init_graph``; the tests also
# rely on it as the fallback file stem when a graph is saved without a filename.
TK_GRAPH_NAME = 'TEST_TOKEN_GRAPH'
def build_init_graph(token_graph: bool):
    """Build the small weighted, directed graph shared by the tests.

    The graph has four nodes (1-4) and six directed edges; the edge weights
    run from 1 to 6 in the order the node pairs are listed below.

    Args:
        token_graph: If true, build a ``graphs.TokenGraph`` named
            ``TK_GRAPH_NAME`` with logging disabled; otherwise build a plain
            ``networkx.DiGraph``.

    Returns:
        The newly created graph with all six weighted edges added.
    """
    node_pairs = [(1, 2), (1, 3), (2, 4), (3, 4), (1, 4), (2, 1)]
    # One fresh attribute dict per edge, weights 1..6 in listing order.
    weighted_edges = [
        (source, target, {'weight': weight})
        for weight, (source, target) in enumerate(node_pairs, start=1)
    ]
    graph_obj = (
        graphs.TokenGraph(name=TK_GRAPH_NAME, enable_logging=False)
        if token_graph
        else nx.DiGraph()
    )
    graph_obj.add_edges_from(weighted_edges)
    return graph_obj
@pytest.fixture(scope='module')
def graph() -> graphs.DiGraph:
    """Module-scoped fixture: the initial test graph as a plain directed graph."""
    directed_graph = build_init_graph(token_graph=False)
    return directed_graph
@pytest.fixture(scope='module')
def tk_graph() -> graphs.TokenGraph:
    """Module-scoped fixture: the initial test graph as a ``TokenGraph``."""
    token_graph_obj = build_init_graph(token_graph=True)
    return token_graph_obj  # type: ignore
@pytest.fixture(scope='module')
def tk_graph_undirected(tk_graph) -> graphs.Graph:
    """Module-scoped fixture: the ``undirected`` attribute of ``tk_graph``."""
    undirected_view = tk_graph.undirected
    return undirected_view
def test_graph_size(graph):
    """The initial graph must contain four nodes and six edges."""
    node_count = len(graph.nodes)
    assert node_count == 4
    edge_count = len(graph.edges)
    assert edge_count == 6
def test_save_to_GraphML(graph, tmp_path):
    """Saving via ``save_to_GraphML`` must create ``<filename>.graphml``."""
    stem = 'test_graphML'
    expected_file = (tmp_path / stem).with_suffix('.graphml')
    graphs.save_to_GraphML(graph, saving_path=tmp_path, filename=stem)
    assert expected_file.exists()
def test_save_load_pickle_tk_graph(tk_graph, tmp_path):
    """Round-trip a ``TokenGraph`` through pickle, with and without a filename.

    The first pass uses an explicit filename. The second passes ``None`` and
    expects the file to be named after ``tk_graph.name``.
    """
    for filename in ('test_save_tkg', None):
        tk_graph.to_pickle(tmp_path, filename)
        # Without an explicit filename the graph name is the expected stem.
        stem = tk_graph.name if filename is None else filename
        pickle_path = (tmp_path / stem).with_suffix('.pkl')
        assert pickle_path.exists()

        restored = graphs.TokenGraph.from_file(pickle_path)
        assert restored.nodes == tk_graph.nodes
        assert restored.edges == tk_graph.edges
@pytest.mark.parametrize(
    'import_graph,directed', [('tk_graph', True), ('tk_graph_undirected', False)]
)
def test_save_load_GraphML_tk_graph(import_graph, tk_graph, directed, tmp_path, request):
    """Round-trip a ``TokenGraph`` through GraphML, directed and undirected.

    ``import_graph`` names the fixture the reloaded graph is compared with.
    The export always goes through ``tk_graph.to_GraphML`` using the
    ``directed`` flag. The first pass uses an explicit filename; the second
    passes ``None`` and expects the file to be named after ``tk_graph.name``.
    """
    reference_graph = request.getfixturevalue(import_graph)

    for filename in ('test_save_tkg', None):
        tk_graph.to_GraphML(tmp_path, filename, directed=directed)
        # Without an explicit filename the graph name is the expected stem.
        stem = tk_graph.name if filename is None else filename
        graphml_path = (tmp_path / stem).with_suffix('.graphml')
        assert graphml_path.exists()

        restored = graphs.TokenGraph.from_file(graphml_path, node_type_graphml=int)
        assert restored.nodes == reference_graph.nodes
        assert restored.edges == reference_graph.edges
def test_get_graph_metadata(graph):
    """``get_graph_metadata`` must report the expected counts, weights and sizes."""
    expected = {
        'num_nodes': 4,
        'num_edges': 6,
        'min_edge_weight': 1,
        'max_edge_weight': 6,
        'node_memory': 112,
        'edge_memory': 336,
        'total_memory': 448,
    }
    metadata = graphs.get_graph_metadata(graph)
    for key, value in expected.items():
        assert metadata[key] == value
def test_update_graph_batch():
    """A batch update with two new edges must add two nodes and two edges."""
    fresh_graph = build_init_graph(token_graph=False)
    new_edges = ((4, 5), (5, 6))
    graphs.update_graph(fresh_graph, batch=new_edges, weight_connection=8)

    expected = {
        'num_nodes': 6,
        'num_edges': 8,
        'min_edge_weight': 1,
        'max_edge_weight': 8,
    }
    metadata = graphs.get_graph_metadata(fresh_graph)
    for key, value in expected.items():
        assert metadata[key] == value
def test_update_graph_single_new():
    """Adding one previously unknown edge must add one node and one edge."""
    fresh_graph = build_init_graph(token_graph=False)
    graphs.update_graph(fresh_graph, parent=4, child=5, weight_connection=7)

    expected = {
        'num_nodes': 5,
        'num_edges': 7,
        'min_edge_weight': 1,
        'max_edge_weight': 7,
    }
    metadata = graphs.get_graph_metadata(fresh_graph)
    for key, value in expected.items():
        assert metadata[key] == value
def test_update_graph_single_existing():
    """Updating an existing edge must keep the graph size and raise its weight.

    Edge (1, 4) starts with weight 5; after an update with
    ``weight_connection=5`` the maximum edge weight is expected to be 10.
    """
    fresh_graph = build_init_graph(token_graph=False)
    graphs.update_graph(fresh_graph, parent=1, child=4, weight_connection=5)

    expected = {
        'num_nodes': 4,
        'num_edges': 6,
        'min_edge_weight': 1,
        'max_edge_weight': 10,
    }
    metadata = graphs.get_graph_metadata(fresh_graph)
    for key, value in expected.items():
        assert metadata[key] == value
@pytest.mark.parametrize('cast_int', [True, False])
def test_convert_graph_to_undirected(graph, cast_int):
    """Opposite directed edges must merge into one undirected edge.

    Edges (1, 2, w=1) and (2, 1, w=6) are expected to become the undirected
    edge (1, 2, w=7), regardless of ``cast_int``.
    """
    undirected = graphs.convert_graph_to_undirected(graph, cast_int=cast_int)
    merged_weight = undirected[1][2]['weight']
    assert merged_weight == pytest.approx(7.0)
def test_convert_graph_to_cytoscape(graph):
    """Check the first and last Cytoscape elements and the weight summary.

    The first element is expected to be node 1 and the last element the edge
    3 -> 4 with weight 4; the weight summary must span 1 to 6.
    """
    elements, weight_summary = graphs.convert_graph_to_cytoscape(graph)
    first_element, last_element = elements[0], elements[-1]

    assert first_element['data']['id'] == 1  # type: ignore
    expected_edge_data = {'source': 3, 'target': 4, 'weight': 4}
    for key, value in expected_edge_data.items():
        assert last_element['data'][key] == value  # type: ignore

    assert weight_summary['min'] == 1
    assert weight_summary['max'] == 6
def test_tk_graph_properties(tk_graph):
    """Check name, graph views and metadata of the ``TokenGraph`` fixture.

    The metadata is refreshed via ``update_metadata`` before the directed and
    undirected metadata are compared with the expected values.
    """
    assert tk_graph.name == TK_GRAPH_NAME
    assert isinstance(tk_graph.directed, graphs.TokenGraph)
    assert isinstance(tk_graph.undirected, nx.Graph)

    tk_graph.update_metadata()

    expected_directed = {
        'num_nodes': 4,
        'num_edges': 6,
        'min_edge_weight': 1,
        'max_edge_weight': 6,
        'node_memory': 112,
        'edge_memory': 336,
        'total_memory': 448,
    }
    directed_meta = tk_graph.metadata_directed
    for key, value in expected_directed.items():
        assert directed_meta[key] == value

    expected_undirected = {
        'num_nodes': 4,
        'num_edges': 5,
        'min_edge_weight': 2,
        'max_edge_weight': 7,
        'node_memory': 112,
        'edge_memory': 280,
        'total_memory': 392,
    }
    undirected_meta = tk_graph.metadata_undirected
    for key, value in expected_undirected.items():
        assert undirected_meta[key] == value
def test_filter_graph_by_edge_weight(tk_graph):
    """Edges with weights outside [2, 5] must be absent after filtering.

    Edge (1, 2) has weight 1 and edge (2, 1) has weight 6 in the initial graph.
    """
    weight_bounds = {'bound_lower': 2, 'bound_upper': 5}
    filtered = graphs.filter_graph_by_edge_weight(tk_graph, **weight_bounds)

    for source, target in ((1, 2), (2, 1)):
        assert not filtered.has_edge(source, target)
def test_filter_graph_by_node_degree(tk_graph):
    """Filtering for a node degree of exactly 3 must leave two nodes."""
    degree_bounds = {'bound_lower': 3, 'bound_upper': 3}
    filtered = graphs.filter_graph_by_node_degree(tk_graph, **degree_bounds)
    remaining_nodes = len(filtered.nodes)
    assert remaining_nodes == 2
def test_filter_graph_by_number_edges(tk_graph):
    """Limiting the graph to one edge must leave one edge and two nodes.

    After the edge limit is applied, the result is filtered by node degree
    (lower bound 1, no upper bound) before the node count is checked.
    """
    edge_limit = 1
    limited = graphs.filter_graph_by_number_edges(tk_graph, limit=edge_limit)
    assert len(limited.edges) == edge_limit

    connected_only = graphs.filter_graph_by_node_degree(
        limited, bound_lower=1, bound_upper=None
    )
    assert len(connected_only.nodes) == 2, 'one edge should result in only two nodes'
def test_add_weighted_degree():
    """``add_weighted_degree`` must store the expected value on nodes 1-3."""
    prop = 'degree_weighted'
    fresh_graph = build_init_graph(token_graph=False)
    graphs.add_weighted_degree(fresh_graph, 'weight', prop)

    expected_by_node = {1: 14, 2: 10, 3: 6}
    for node, value in expected_by_node.items():
        assert fresh_graph.nodes[node][prop] == value
def test_add_betweenness_centrality():
    """Node 1 must get a betweenness of about 0.1667; nodes 2 and 3 must get 0."""
    prop = 'betweenness_centrality'
    fresh_graph = build_init_graph(token_graph=False)
    graphs.add_betweenness_centrality(fresh_graph, property_name=prop)

    # Round first so the comparison is against the four-decimal reference.
    rounded_node_1 = round(fresh_graph.nodes[1][prop], 4)
    assert rounded_node_1 == pytest.approx(0.1667)
    for node in (2, 3):
        assert fresh_graph.nodes[node][prop] == 0
def test_add_importance_metric():
    """Check the importance values and the errors for missing node properties.

    Weighted degree and betweenness centrality are added first because the
    importance metric is called with their property names. Afterwards, using
    an unknown property name for either input must raise
    ``NodePropertyNotContainedError``.
    """
    prop_wd = 'degree_weighted'
    prop_bc = 'betweenness_centrality'
    prop_importance = 'importance'

    fresh_graph = build_init_graph(token_graph=False)
    graphs.add_weighted_degree(fresh_graph, 'weight', prop_wd)
    graphs.add_betweenness_centrality(fresh_graph, property_name=prop_bc)
    graphs.add_importance_metric(
        fresh_graph,
        property_name=prop_importance,
        property_name_weighted_degree=prop_wd,
        property_name_betweenness=prop_bc,
    )

    assert round(fresh_graph.nodes[1][prop_importance], 4) == pytest.approx(2.3333)
    for node in (2, 3):
        assert fresh_graph.nodes[node][prop_importance] == 0

    # First an unknown weighted-degree property, then an unknown betweenness one.
    invalid_inputs = (
        ('prop_not_contained', prop_bc),
        (prop_wd, 'prop_not_contained'),
    )
    for weighted_degree_name, betweenness_name in invalid_inputs:
        with pytest.raises(NodePropertyNotContainedError):
            graphs.add_importance_metric(
                fresh_graph,
                property_name=prop_importance,
                property_name_weighted_degree=weighted_degree_name,
                property_name_betweenness=betweenness_name,
            )
def test_static_graph_analysis():
    """``static_graph_analysis`` must add the weighted degree to both views.

    The call is expected to return a one-element collection holding the
    analysed token graph; its directed nodes and its ``undirected`` nodes are
    checked against the same expected values.
    """
    prop = 'degree_weighted'
    expected_by_node = {1: 14, 2: 10, 3: 6}

    token_graph_obj = build_init_graph(token_graph=True)
    (analysed,) = graphs.static_graph_analysis(token_graph_obj)  # type: ignore

    for node, value in expected_by_node.items():
        assert analysed.nodes[node][prop] == value
    for node, value in expected_by_node.items():
        assert analysed.undirected.nodes[node][prop] == value
def test_pipe_add_graph_metrics():
    """``pipe_add_graph_metrics`` must annotate both graphs it is given.

    The directed graph and its undirected conversion are passed in together;
    the first two entries of the result are checked for weighted degree,
    betweenness centrality and importance on nodes 1-3.
    """
    directed = build_init_graph(token_graph=False)
    undirected = graphs.convert_graph_to_undirected(directed, cast_int=True)
    results = graphs.pipe_add_graph_metrics(directed, undirected)

    weighted_degree_by_node = {1: 14, 2: 10, 3: 6}
    for index in (0, 1):
        for node, value in weighted_degree_by_node.items():
            assert results[index].nodes[node]['degree_weighted'] == value

    # For both remaining metrics node 1 has a rounded reference value and
    # nodes 2 and 3 are expected to be exactly zero.
    rounded_reference = (
        ('betweenness_centrality', 0.1667),
        ('importance', 2.3333),
    )
    for prop, node_1_value in rounded_reference:
        for index in (0, 1):
            assert round(results[index].nodes[1][prop], 4) == pytest.approx(node_1_value)
            for node in (2, 3):
                assert results[index].nodes[node][prop] == 0
def test_pipe_rescale_graph_edge_weights(tk_graph):
    """Check the rescaled weights between nodes 1 and 2 in both returned graphs."""
    rescaled_directed, rescaled_undirected = graphs.pipe_rescale_graph_edge_weights(
        tk_graph
    )
    # (graph, source, target, expected rescaled weight)
    cases = (
        (rescaled_directed, 2, 1, 1.0),
        (rescaled_directed, 1, 2, 0.0952),
        (rescaled_undirected, 2, 1, 1.0),
        (rescaled_undirected, 1, 2, 1.0),
    )
    for rescaled, source, target, expected_weight in cases:
        assert rescaled[source][target]['weight'] == pytest.approx(expected_weight)
@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_rescale_edge_weights(import_graph, request):
    """Check the rescaled weights of edges (2, 1) and (1, 2) for both graph types."""
    source_graph = request.getfixturevalue(import_graph)
    rescaled = graphs.rescale_edge_weights(source_graph)

    expected_by_edge = (((2, 1), 1.0), ((1, 2), 0.0952))
    for (source, target), expected_weight in expected_by_edge:
        assert rescaled[source][target]['weight'] == pytest.approx(expected_weight)
@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_verify_property(import_graph, request):
    """An unknown edge property must raise; a known one must yield a falsy result."""
    graph_under_test = request.getfixturevalue(import_graph)

    missing_property = 'centrality'
    with pytest.raises(EdgePropertyNotContainedError):
        graphs.verify_property(graph_under_test, property=missing_property)

    present_property = 'weight'
    outcome = graphs.verify_property(graph_under_test, property=present_property)
    assert not outcome
def test_verify_non_empty_graph():
    """Walk an initially empty graph through all ``verify_non_empty_graph`` outcomes.

    A graph without nodes must raise ``EmptyGraphError``. A graph with nodes
    but no edges must raise ``EmptyEdgesError`` only when edges are included
    in the check. Once edges exist, the check must yield a falsy result.
    """
    nx_graph = nx.Graph()
    with pytest.raises(EmptyGraphError):
        graphs.verify_non_empty_graph(nx_graph)

    nx_graph.add_nodes_from([1, 2, 3, 4])
    with pytest.raises(EmptyEdgesError):
        graphs.verify_non_empty_graph(nx_graph, including_edges=True)
    nodes_only_outcome = graphs.verify_non_empty_graph(nx_graph, including_edges=False)
    assert not nodes_only_outcome

    nx_graph.add_edges_from([(1, 2), (1, 3), (2, 4)])
    with_edges_outcome = graphs.verify_non_empty_graph(nx_graph, including_edges=True)
    assert not with_edges_outcome