import time from collections.abc import Iterable from pathlib import Path from typing import Literal, cast import py4cytoscape as p4c from networkx import DiGraph, Graph from py4cytoscape.exceptions import CyError from requests.exceptions import RequestException from lang_main.constants import ( CYTO_BASE_NETWORK_NAME, CYTO_COLLECTION_NAME, CYTO_ITER_NEIGHBOUR_DEPTH, CYTO_LAYOUT_NAME, CYTO_LAYOUT_PROPERTIES, CYTO_NETWORK_ZOOM_FACTOR, CYTO_NUMBER_SUBGRAPHS, CYTO_PATH_STYLESHEET, CYTO_SANDBOX_NAME, CYTO_SELECTION_PROPERTY, CYTO_STYLESHEET_NAME, PROPERTY_NAME_DEGREE_WEIGHTED, SAVE_PATH_FOLDER, ) from lang_main.loggers import logger_rendering as logger from lang_main.types import ( CytoExportFileTypes, CytoExportPageSizes, CytoLayoutProperties, CytoLayouts, CytoNodeID, ) # ** Cytoscape API related, using py4cytoscape def verify_connection(): """Cytoscape: checks if CyREST and Cytoscape versions are compatible nad if Cytoscape API endpoint is reachable Raises ------ CyError incompatible CyREST or Cytoscape versions RequestException API endpoint not reachable """ try: p4c.cytoscape_ping() except CyError as error: logger.error('[CyError] CyREST or Cytoscape version not supported.') raise error except RequestException as error: logger.error('[CytoAPIConnection] Connection to CyREST API failed.') raise error def import_to_cytoscape( graph: DiGraph | Graph, network_name: str = CYTO_BASE_NETWORK_NAME, sandbox_name: str = CYTO_SANDBOX_NAME, reinitialise_sandbox: bool = True, ) -> None: """Cytoscape: import NetworkX graph as new network collection Parameters ---------- graph : DiGraph | Graph NetworkX graph object """ logger.debug('Checking Cytoscape connection...') verify_connection() logger.debug('Setting Cytoscape sandbox...') p4c.sandbox_set( sandbox_name=sandbox_name, reinitialize=reinitialise_sandbox, copy_samples=False, ) logger.debug('Importing to and analysing network in Cytoscape...') p4c.delete_all_networks() p4c.create_network_from_networkx( graph, title=network_name, collection=CYTO_COLLECTION_NAME, ) analyse_network(network_name=network_name) logger.debug('Import and analysis of network to Cytoscape successful.') def verify_table_property( property: str, table_type: Literal['node', 'edge', 'network'] = 'node', network_name: str = CYTO_BASE_NETWORK_NAME, ) -> bool: table = p4c.get_table_columns(table=table_type, network=network_name) return property in table.columns def analyse_network( property_degree_weighted: str = PROPERTY_NAME_DEGREE_WEIGHTED, network_name: str = CYTO_BASE_NETWORK_NAME, ) -> None: node_table = p4c.get_table_columns(table='node', network=network_name) net_analyse_possible: bool = True if len(node_table) < 4: net_analyse_possible = False if net_analyse_possible: p4c.analyze_network(directed=False) node_table = p4c.get_table_columns(table='node', network=network_name) node_table['stress_norm'] = node_table['Stress'] / node_table['Stress'].max() node_table[CYTO_SELECTION_PROPERTY] = ( node_table[property_degree_weighted] * node_table['BetweennessCentrality'] * node_table['stress_norm'] ) else: node_table[CYTO_SELECTION_PROPERTY] = 1 p4c.load_table_data(node_table, data_key_column='name', network=network_name) def reset_current_network_to_base() -> None: """resets to currently selected network in Cytoscape back to the base one""" p4c.set_current_network(CYTO_BASE_NETWORK_NAME) def fit_content( zoom_factor: float = CYTO_NETWORK_ZOOM_FACTOR, network_name: str = CYTO_BASE_NETWORK_NAME, ) -> None: p4c.hide_all_panels() p4c.fit_content(selected_only=False, network=network_name) zoom_current = p4c.get_network_zoom(network=network_name) zoom_new = zoom_current * zoom_factor p4c.set_network_zoom_bypass(zoom_new, bypass=False, network=network_name) def export_network_to_image( filename: str, target_folder: Path = SAVE_PATH_FOLDER, filetype: CytoExportFileTypes = 'SVG', network_name: str = CYTO_BASE_NETWORK_NAME, pdf_export_page_size: CytoExportPageSizes = 'A4', sandbox_name: str = CYTO_SANDBOX_NAME, ) -> None: """Cytoscape: export current selected view as image Parameters ---------- filename : str export filename filetype : CytoExportFileTypes, optional export filetype supported by Cytoscape, by default 'SVG' network_name : str, optional network to export, by default CYTO_BASE_NETWORK_NAME pdf_export_page_size : CytoExportPageSizes, optional page size which should be used for PDF exports supported by Cytoscape, by default 'A4' """ logger.debug('Exporting image to file...') if not target_folder.exists(): target_folder.mkdir(parents=True) dst_file_pth = (target_folder / filename).with_suffix(f'.{filetype.lower()}') text_as_font = True if filetype == 'SVG': text_as_font = False # close non-necessary windows and fit graph in frame before image display fit_content(network_name=network_name) # image is generated in sandbox directory and transferred to target destination # (preparation for remote instances of Cytoscape) p4c.export_image( filename=filename, type=filetype, network=network_name, overwrite_file=True, all_graphics_details=True, export_text_as_font=text_as_font, page_size=pdf_export_page_size, ) # TODO remove if Cytoscape >= 3.10.* is running in container # p4c.export_image( # filename=filename, # type=filetype, # network=network_name, # overwrite_file=True, # ) logger.debug('Exported image to sandbox.') logger.debug('Transferring image from sandbox to target destination...') sandbox_filename = f'{filename}.{filetype.lower()}' p4c.sandbox_get_from( source_file=sandbox_filename, dest_file=str(dst_file_pth), overwrite=True, sandbox_name=sandbox_name, ) logger.debug('Transfer of image from sandbox to target destination successful.') def layout_network( layout_name: CytoLayouts = CYTO_LAYOUT_NAME, layout_properties: CytoLayoutProperties = CYTO_LAYOUT_PROPERTIES, network_name: str = CYTO_BASE_NETWORK_NAME, ) -> None: """Cytoscape: apply a supported layout algorithm to currently selected network Parameters ---------- layout_name : CytoLayouts, optional layout algorithm supported by Cytoscape (name of the CyREST API, does not necessarily match the name in the Cytoscape UI), by default CYTO_LAYOUT_NAME layout_properties : CytoLayoutProperties, optional configuration of parameters for the given layout algorithm, by default CYTO_LAYOUT_PROPERTIES network_name : str, optional network to apply the layout algorithm on, by default CYTO_BASE_NETWORK_NAME """ logger.debug('Applying layout to network...') p4c.set_layout_properties(layout_name, layout_properties) p4c.layout_network(layout_name=layout_name, network=network_name) fit_content(network_name=network_name) logger.debug('Layout application to network successful.') def apply_style_to_network( style_name: str = CYTO_STYLESHEET_NAME, pth_to_stylesheet: Path = CYTO_PATH_STYLESHEET, network_name: str = CYTO_BASE_NETWORK_NAME, node_size_property: str = 'node_selection', min_node_size: int = 15, max_node_size: int = 40, sandbox_name: str = CYTO_SANDBOX_NAME, ) -> None: """Cytoscape: apply a chosen Cytoscape style to the defined network Parameters ---------- style_name : str, optional Cytoscape name of the style which should be applied, by default CYTO_STYLESHEET_NAME pth_to_stylesheet : Path, optional path where the stylesheet definition in Cytoscape's XML format can be found, by default CYTO_PATH_STYLESHEET network_name : str, optional network to apply the style on, by default CYTO_BASE_NETWORK_NAME Raises ------ FileNotFoundError if provided stylesheet can not be found under the provided path """ logger.debug('Applying style to network...') styles_avail = cast(list[str], p4c.get_visual_style_names()) if style_name not in styles_avail: if not pth_to_stylesheet.exists(): # existence for standard path verified at import, but not for other # provided paths raise FileNotFoundError( f'Visual stylesheet for Cytoscape not found under: >>{pth_to_stylesheet}<<' ) # send to sandbox sandbox_filename = pth_to_stylesheet.name p4c.sandbox_send_to( source_file=pth_to_stylesheet, dest_file=sandbox_filename, overwrite=True, sandbox_name=sandbox_name, ) # load stylesheet p4c.import_visual_styles(sandbox_filename) p4c.set_visual_style(style_name, network=network_name) # node size mapping, only if needed property is available # TODO check removal # size_prop_available = verify_table_property( # property=node_size_property, # network_name=network_name, # ) # if size_prop_available: scheme = p4c.scheme_c_number_continuous( start_value=min_node_size, end_value=max_node_size ) node_size_map = p4c.gen_node_size_map( node_size_property, number_scheme=scheme, mapping_type='c', style_name=style_name, default_number=min_node_size, ) p4c.set_node_size_mapping(**node_size_map) # TODO removal # else: # node_table = p4c.get_table_columns(table='node', network=network_name) # nodes_SUID = node_table['SUID'].to_list() # p4c.set_node_size_bypass(nodes_SUID, new_sizes=min_node_size, network=network_name) # p4c.set_visual_style(style_name, network=network_name) # time.sleep(1) # if not waited image export could be without applied style fit_content(network_name=network_name) logger.debug('Style application to network successful.') def get_subgraph_node_selection( network_name: str = CYTO_BASE_NETWORK_NAME, num_subgraphs: int = CYTO_NUMBER_SUBGRAPHS, ) -> list[CytoNodeID]: """Cytoscape: obtain the relevant nodes for iterative subgraph generation Parameters ---------- network_name : str, optional network to retrieve the nodes from, by default CYTO_BASE_NETWORK_NAME property_degree_weighted : str, optional property name which contains the weighted degree, by default PROPERTY_NAME_DEGREE_WEIGHTED num_subgraphs : int, optional number of relevant nodes which form the basis to generate subgraphs from, by default CYTO_NUMBER_SUBGRAPHS Returns ------- list[CytoNodeID] list containing all relevant Cytoscape nodes """ logger.debug('Selecting nodes for subgraph generation...') node_table = p4c.get_table_columns(table='node', network=network_name) node_table = node_table.sort_values(by=CYTO_SELECTION_PROPERTY, ascending=False) p4c.load_table_data(node_table, data_key_column='name', network=network_name) node_table_choice = node_table.iloc[:num_subgraphs, :] logger.debug('Selection of nodes for subgraph generation successful.') return node_table_choice['SUID'].to_list() def select_neighbours_of_node( node: CytoNodeID, neighbour_iter_depth: int = CYTO_ITER_NEIGHBOUR_DEPTH, network_name: str = CYTO_BASE_NETWORK_NAME, ) -> None: """Cytoscape: iterative selection of a node's neighbouring nodes and their connecting edges Parameters ---------- node : CytoNodeID node which neighbours should be selected neighbour_iter_depth : int, optional indicates how many levels of neighbours should be choosen, e.g. 1 --> only first-level neighbours are considered which are directly connected to the node, 2 --> all nodes with iteration depth of 1 are chosen and additionally their direct neighbours, by default CYTO_ITER_NEIGHBOUR_DEPTH network_name : str, optional network to perform action on, by default CYTO_BASE_NETWORK_NAME """ logger.debug('Selecting node neighbours for %s...', node) p4c.clear_selection(network=network_name) p4c.select_nodes(node, network=network_name) for _ in range(neighbour_iter_depth): _ = p4c.select_first_neighbors(network=network_name) _ = p4c.select_edges_connecting_selected_nodes() logger.debug('Selection of node neighbours for %s successful.', node) def make_subnetwork( index: int, network_name: str = CYTO_BASE_NETWORK_NAME, export_image: bool = True, target_folder: Path = SAVE_PATH_FOLDER, ) -> None: """Cytoscape: generate a new subnetwork based on the currently selected nodes and edges Parameters ---------- index : int id-like property to identify the subnetwork relative to its parent network_name : str, optional network to generate subnetwork from, by default CYTO_BASE_NETWORK_NAME export_image : bool, optional trigger image export of newly generated subnetwork, by default True """ logger.debug('Generating subnetwork with index %d...', index) subnetwork_name = network_name + f'_sub_{index+1}' p4c.create_subnetwork( nodes='selected', edges='selected', subnetwork_name=subnetwork_name, network=network_name, ) p4c.set_current_network(subnetwork_name) if export_image: time.sleep(1) export_network_to_image( filename=subnetwork_name, target_folder=target_folder, network_name=subnetwork_name, ) logger.debug('Generation of subnetwork with index %d successful.', index) def build_subnetworks( nodes_to_analyse: Iterable[CytoNodeID], network_name: str = CYTO_BASE_NETWORK_NAME, export_image: bool = True, target_folder: Path = SAVE_PATH_FOLDER, ) -> None: """Cytoscape: iteratively build subnetworks from a collection of nodes and their respective neighbouring nodes Parameters ---------- nodes_to_analyse : Iterable[CytoNodeID] collection of nodes to make subnetworks from, for each node a dedicated subnetwork will be generated network_name : str, optional network which contains the provided nodes, by default CYTO_BASE_NETWORK_NAME export_image : bool, optional trigger image export of newly generated subnetworks, by default True """ logger.debug('Generating all subnetworks for node selection...') for idx, node in enumerate(nodes_to_analyse): select_neighbours_of_node(node=node, network_name=network_name) make_subnetwork( index=idx, network_name=network_name, export_image=export_image, target_folder=target_folder, ) logger.debug('Generation of all subnetworks for node selection successful.')