import networkx as nx
import pytest

from lang_main.analysis import graphs
from lang_main.errors import (
    EdgePropertyNotContainedError,
    EmptyEdgesError,
    EmptyGraphError,
    NodePropertyNotContainedError,
)

TK_GRAPH_NAME = 'TEST_TOKEN_GRAPH'


def build_init_graph(token_graph: bool):
    """Build the small weighted, directed reference graph used by the tests.

    The graph has the four nodes 1-4 and six directed edges with the weights
    1-6. Edge ``(1, 2)`` and its reverse ``(2, 1)`` are both present.

    Args:
        token_graph: If true, the edges are added to a ``graphs.TokenGraph``
            named ``TK_GRAPH_NAME``; otherwise to a plain ``nx.DiGraph``.

    Returns:
        The freshly built graph object.
    """
    edge_weights = [
        {'weight': 1},
        {'weight': 2},
        {'weight': 3},
        {'weight': 4},
        {'weight': 5},
        {'weight': 6},
    ]
    edges = [
        (1, 2),
        (1, 3),
        (2, 4),
        (3, 4),
        (1, 4),
        (2, 1),
    ]
    # pair every edge with its attribute dict: (source, target, {'weight': w})
    edges_to_add = [
        (source, target, attrs) for (source, target), attrs in zip(edges, edge_weights)
    ]

    if token_graph:
        G = graphs.TokenGraph(name=TK_GRAPH_NAME, enable_logging=False)
    else:
        G = nx.DiGraph()
    G.add_edges_from(edges_to_add)

    return G


@pytest.fixture(scope='module')
def graph() -> graphs.DiGraph:
    """Module-scoped plain directed reference graph."""
    return build_init_graph(token_graph=False)


@pytest.fixture(scope='module')
def tk_graph() -> graphs.TokenGraph:
    """Module-scoped ``TokenGraph`` variant of the reference graph."""
    return build_init_graph(token_graph=True)  # type: ignore


@pytest.fixture(scope='module')
def tk_graph_undirected(tk_graph) -> graphs.Graph:
    """Undirected view exposed by the ``tk_graph`` fixture."""
    return tk_graph.undirected


def test_graph_size(graph):
    """The reference graph has four nodes and six edges."""
    assert len(graph.nodes) == 4
    assert len(graph.edges) == 6


def test_save_to_GraphML(graph, tmp_path):
    """``save_to_GraphML`` writes ``<filename>.graphml`` into the target folder."""
    filename = 'test_graphML'
    graphs.save_to_GraphML(graph, saving_path=tmp_path, filename=filename)
    saved_file = (tmp_path / filename).with_suffix('.graphml')
    assert saved_file.exists()


def test_save_load_pickle_tk_graph(tk_graph, tmp_path):
    """Pickle round trip keeps nodes and edges of a ``TokenGraph``.

    Checked once with an explicit filename and once with ``None``, in which
    case the file is expected under the graph's own name.
    """
    # (filename handed to the writer, file stem expected on disk)
    cases = (
        ('test_save_tkg', 'test_save_tkg'),
        (None, tk_graph.name),
    )
    for filename, expected_stem in cases:
        tk_graph.to_pickle(tmp_path, filename)
        load_pth = (tmp_path / expected_stem).with_suffix('.pkl')
        assert load_pth.exists()
        loaded_graph = graphs.TokenGraph.from_file(load_pth)
        assert loaded_graph.nodes == tk_graph.nodes
        assert loaded_graph.edges == tk_graph.edges


@pytest.mark.parametrize(
    'import_graph,directed', [('tk_graph', True), ('tk_graph_undirected', False)]
)
def test_save_load_GraphML_tk_graph(import_graph, tk_graph, directed, tmp_path, request):
    """GraphML round trip keeps nodes and edges, directed and undirected.

    The export is always triggered on ``tk_graph``; the loaded result is
    compared against the fixture named by ``import_graph``. Checked once with
    an explicit filename and once with ``None``, in which case the file is
    expected under the graph's own name.
    """
    test_graph = request.getfixturevalue(import_graph)
    # (filename handed to the writer, file stem expected on disk)
    cases = (
        ('test_save_tkg', 'test_save_tkg'),
        (None, tk_graph.name),
    )
    for filename, expected_stem in cases:
        tk_graph.to_GraphML(tmp_path, filename, directed=directed)
        load_pth = (tmp_path / expected_stem).with_suffix('.graphml')
        assert load_pth.exists()
        loaded_graph = graphs.TokenGraph.from_file(load_pth, node_type_graphml=int)
        assert loaded_graph.nodes == test_graph.nodes
        assert loaded_graph.edges == test_graph.edges


def test_get_graph_metadata(graph):
    """Metadata of the reference graph matches the expected figures."""
    metadata = graphs.get_graph_metadata(graph)
    assert metadata['num_nodes'] == 4
    assert metadata['num_edges'] == 6
    assert metadata['min_edge_weight'] == 1
    assert metadata['max_edge_weight'] == 6
    assert metadata['node_memory'] == 112
    assert metadata['edge_memory'] == 336
    assert metadata['total_memory'] == 448


def test_update_graph_batch():
    """A batch update with two new edges adds two nodes and two edges."""
    graph_obj = build_init_graph(token_graph=False)
    graphs.update_graph(graph_obj, batch=((4, 5), (5, 6)), weight_connection=8)
    metadata = graphs.get_graph_metadata(graph_obj)
    assert metadata['num_nodes'] == 6
    assert metadata['num_edges'] == 8
    assert metadata['min_edge_weight'] == 1
    assert metadata['max_edge_weight'] == 8


def test_update_graph_single_new():
    """A single update with a not yet existing edge adds one node and one edge."""
    graph_obj = build_init_graph(token_graph=False)
    graphs.update_graph(graph_obj, parent=4, child=5, weight_connection=7)
    metadata = graphs.get_graph_metadata(graph_obj)
    assert metadata['num_nodes'] == 5
    assert metadata['num_edges'] == 7
    assert metadata['min_edge_weight'] == 1
    assert metadata['max_edge_weight'] == 7


def test_update_graph_single_existing():
    """Updating the existing edge (1, 4) keeps the graph size unchanged.

    The edge starts with weight 5 and is updated with ``weight_connection=5``;
    the maximum edge weight is expected to become 10 afterwards.
    """
    graph_obj = build_init_graph(token_graph=False)
    graphs.update_graph(graph_obj, parent=1, child=4, weight_connection=5)
    metadata = graphs.get_graph_metadata(graph_obj)
    assert metadata['num_nodes'] == 4
    assert metadata['num_edges'] == 6
    assert metadata['min_edge_weight'] == 1
    assert metadata['max_edge_weight'] == 10


@pytest.mark.parametrize('cast_int', [True, False])
def test_convert_graph_to_undirected(graph, cast_int):
    """Opposing directed edges are merged into one undirected edge."""
    graph_undir = graphs.convert_graph_to_undirected(graph, cast_int=cast_int)
    # edges: (1, 2, w=1) and (2, 1, w=6) --> undirected: (1, 2, w=7)
    assert graph_undir[1][2]['weight'] == pytest.approx(7.0)


def test_convert_graph_to_cytoscape(graph):
    """Cytoscape export starts with node 1 and ends with edge 3 -> 4.

    Additionally the returned weight data must report the minimum and maximum
    edge weight of the reference graph.
    """
    cyto_graph, weight_data = graphs.convert_graph_to_cytoscape(graph)
    node = cyto_graph[0]
    edge = cyto_graph[-1]
    assert node['data']['id'] == 1  # type: ignore
    assert edge['data']['source'] == 3  # type: ignore
    assert edge['data']['target'] == 4  # type: ignore
    assert edge['data']['weight'] == 4  # type: ignore
    assert weight_data['min'] == 1
    assert weight_data['max'] == 6


def test_tk_graph_properties(tk_graph):
    """Name, directed/undirected views and both metadata sets of a ``TokenGraph``."""
    assert tk_graph.name == TK_GRAPH_NAME
    assert isinstance(tk_graph.directed, graphs.TokenGraph)
    assert isinstance(tk_graph.undirected, nx.Graph)

    tk_graph.update_metadata()

    metadata_directed = tk_graph.metadata_directed
    assert metadata_directed['num_nodes'] == 4
    assert metadata_directed['num_edges'] == 6
    assert metadata_directed['min_edge_weight'] == 1
    assert metadata_directed['max_edge_weight'] == 6
    assert metadata_directed['node_memory'] == 112
    assert metadata_directed['edge_memory'] == 336
    assert metadata_directed['total_memory'] == 448

    # undirected: (1, 2) and (2, 1) collapse into one edge, leaving five edges
    metadata_undirected = tk_graph.metadata_undirected
    assert metadata_undirected['num_nodes'] == 4
    assert metadata_undirected['num_edges'] == 5
    assert metadata_undirected['min_edge_weight'] == 2
    assert metadata_undirected['max_edge_weight'] == 7
    assert metadata_undirected['node_memory'] == 112
    assert metadata_undirected['edge_memory'] == 280
    assert metadata_undirected['total_memory'] == 392


def test_filter_graph_by_edge_weight(tk_graph):
    """Edges with weight 1 and weight 6 are gone after filtering to [2, 5]."""
    filtered_graph = graphs.filter_graph_by_edge_weight(
        tk_graph,
        bound_lower=2,
        bound_upper=5,
    )
    assert not filtered_graph.has_edge(1, 2)
    assert not filtered_graph.has_edge(2, 1)


def test_filter_graph_by_node_degree(tk_graph):
    """Filtering to node degree exactly 3 leaves two nodes."""
    filtered_graph = graphs.filter_graph_by_node_degree(
        tk_graph,
        bound_lower=3,
        bound_upper=3,
    )
    assert len(filtered_graph.nodes) == 2


def test_filter_graph_by_number_edges(tk_graph):
    """Limiting to one edge leaves one edge and, after degree filtering, two nodes."""
    number_edges_limit = 1
    filtered_graph = graphs.filter_graph_by_number_edges(
        tk_graph,
        limit=number_edges_limit,
    )
    assert len(filtered_graph.edges) == number_edges_limit
    filtered_graph = graphs.filter_graph_by_node_degree(
        filtered_graph,
        bound_lower=1,
        bound_upper=None,
    )
    assert len(filtered_graph.nodes) == 2, 'one edge should result in only two nodes'


def test_add_weighted_degree():
    """Weighted degree is stored as node property.

    The expected values equal the summed weights of all edges touching a node,
    e.g. node 1: 1 + 2 + 5 + 6 = 14.
    """
    graph_obj = build_init_graph(token_graph=False)
    property_name = 'degree_weighted'
    graphs.add_weighted_degree(graph_obj, 'weight', property_name)
    assert graph_obj.nodes[1][property_name] == 14
    assert graph_obj.nodes[2][property_name] == 10
    assert graph_obj.nodes[3][property_name] == 6


def test_add_betweenness_centrality():
    """Betweenness centrality is stored as node property."""
    graph_obj = build_init_graph(token_graph=False)
    property_name = 'betweenness_centrality'
    graphs.add_betweenness_centrality(graph_obj, property_name=property_name)
    assert round(graph_obj.nodes[1][property_name], 4) == pytest.approx(0.1667)
    assert graph_obj.nodes[2][property_name] == 0
    assert graph_obj.nodes[3][property_name] == 0


def test_add_importance_metric():
    """Importance metric is stored as node property and validates its inputs.

    The metric needs the weighted degree and the betweenness centrality to be
    present; a missing one must raise ``NodePropertyNotContainedError``.
    """
    graph_obj = build_init_graph(token_graph=False)
    property_name_WD = 'degree_weighted'
    graphs.add_weighted_degree(graph_obj, 'weight', property_name_WD)
    property_name_BC = 'betweenness_centrality'
    graphs.add_betweenness_centrality(graph_obj, property_name=property_name_BC)
    property_name = 'importance'
    graphs.add_importance_metric(
        graph_obj,
        property_name=property_name,
        property_name_weighted_degree=property_name_WD,
        property_name_betweenness=property_name_BC,
    )
    assert round(graph_obj.nodes[1][property_name], 4) == pytest.approx(2.3333)
    assert graph_obj.nodes[2][property_name] == 0
    assert graph_obj.nodes[3][property_name] == 0

    # unknown weighted degree property
    with pytest.raises(NodePropertyNotContainedError):
        graphs.add_importance_metric(
            graph_obj,
            property_name=property_name,
            property_name_weighted_degree='prop_not_contained',
            property_name_betweenness=property_name_BC,
        )
    # unknown betweenness property
    with pytest.raises(NodePropertyNotContainedError):
        graphs.add_importance_metric(
            graph_obj,
            property_name=property_name,
            property_name_weighted_degree=property_name_WD,
            property_name_betweenness='prop_not_contained',
        )


def test_static_graph_analysis():
    """Static analysis adds the weighted degree to directed and undirected graph."""
    graph_obj = build_init_graph(token_graph=True)
    (graph_obj,) = graphs.static_graph_analysis(graph_obj)  # type: ignore
    property_name = 'degree_weighted'
    assert graph_obj.nodes[1][property_name] == 14
    assert graph_obj.nodes[2][property_name] == 10
    assert graph_obj.nodes[3][property_name] == 6
    assert graph_obj.undirected.nodes[1][property_name] == 14
    assert graph_obj.undirected.nodes[2][property_name] == 10
    assert graph_obj.undirected.nodes[3][property_name] == 6


def test_pipe_add_graph_metrics():
    """The metrics pipeline adds all three node properties to both given graphs."""
    graph_obj = build_init_graph(token_graph=False)
    graph_obj_undir = graphs.convert_graph_to_undirected(graph_obj, cast_int=True)
    graph_collection = graphs.pipe_add_graph_metrics(graph_obj, graph_obj_undir)

    property_name = 'degree_weighted'
    assert graph_collection[0].nodes[1][property_name] == 14
    assert graph_collection[0].nodes[2][property_name] == 10
    assert graph_collection[0].nodes[3][property_name] == 6
    assert graph_collection[1].nodes[1][property_name] == 14
    assert graph_collection[1].nodes[2][property_name] == 10
    assert graph_collection[1].nodes[3][property_name] == 6

    property_name = 'betweenness_centrality'
    assert round(graph_collection[0].nodes[1][property_name], 4) == pytest.approx(0.1667)
    assert graph_collection[0].nodes[2][property_name] == 0
    assert graph_collection[0].nodes[3][property_name] == 0
    assert round(graph_collection[1].nodes[1][property_name], 4) == pytest.approx(0.1667)
    assert graph_collection[1].nodes[2][property_name] == 0
    assert graph_collection[1].nodes[3][property_name] == 0

    property_name = 'importance'
    assert round(graph_collection[0].nodes[1][property_name], 4) == pytest.approx(2.3333)
    assert graph_collection[0].nodes[2][property_name] == 0
    assert graph_collection[0].nodes[3][property_name] == 0
    assert round(graph_collection[1].nodes[1][property_name], 4) == pytest.approx(2.3333)
    assert graph_collection[1].nodes[2][property_name] == 0
    assert graph_collection[1].nodes[3][property_name] == 0


def test_pipe_rescale_graph_edge_weights(tk_graph):
    """The rescaling pipeline returns a rescaled directed and undirected graph.

    In both results the heaviest edge is expected to carry the weight 1.0.
    """
    rescaled_tkg, rescaled_undir = graphs.pipe_rescale_graph_edge_weights(tk_graph)
    assert rescaled_tkg[2][1]['weight'] == pytest.approx(1.0)
    assert rescaled_tkg[1][2]['weight'] == pytest.approx(0.095238)
    assert rescaled_undir[2][1]['weight'] == pytest.approx(1.0)
    assert rescaled_undir[1][2]['weight'] == pytest.approx(1.0)


def test_pipe_graph_split(tk_graph):
    """Splitting yields a same-sized copy and an undirected graph with fewer edges."""
    graph_copy, graph_undir = graphs.pipe_graph_split(tk_graph)
    assert len(graph_copy.nodes) == len(tk_graph.nodes)
    assert len(graph_copy.edges) == len(tk_graph.edges)
    assert len(graph_copy.nodes) == len(graph_undir.nodes)
    assert len(graph_undir.nodes) == len(tk_graph.nodes)
    assert len(graph_undir.edges) != len(tk_graph.edges)


@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_rescale_edge_weights(import_graph, request):
    """Rescaling works for plain graphs and token graphs alike."""
    test_graph = request.getfixturevalue(import_graph)
    rescaled_graph = graphs.rescale_edge_weights(test_graph)
    assert rescaled_graph[2][1]['weight'] == pytest.approx(1.0)
    assert rescaled_graph[1][2]['weight'] == pytest.approx(0.095238)


@pytest.mark.parametrize('import_graph', ['graph', 'tk_graph'])
def test_verify_property(import_graph, request):
    """Unknown edge properties raise, known ones yield a falsy return value."""
    test_graph = request.getfixturevalue(import_graph)
    test_property = 'centrality'
    with pytest.raises(EdgePropertyNotContainedError):
        graphs.verify_property(test_graph, property=test_property)
    test_property = 'weight'
    assert not graphs.verify_property(test_graph, property=test_property)


def test_verify_non_empty_graph():
    """Empty graphs and, on request, graphs without edges are rejected."""
    graph = nx.Graph()
    # no nodes at all
    with pytest.raises(EmptyGraphError):
        graphs.verify_non_empty_graph(graph)
    graph.add_nodes_from([1, 2, 3, 4])
    # nodes, but no edges
    with pytest.raises(EmptyEdgesError):
        graphs.verify_non_empty_graph(graph, including_edges=True)
    assert not graphs.verify_non_empty_graph(graph, including_edges=False)
    graph.add_edges_from([(1, 2), (1, 3), (2, 4)])
    assert not graphs.verify_non_empty_graph(graph, including_edges=True)