improved test coverage, automation scripts

This commit is contained in:
Florian Förster
2024-11-26 16:11:25 +01:00
parent 9291b53f93
commit 38aa0739ad
33 changed files with 979 additions and 297 deletions

View File

@@ -1,177 +1,32 @@
import logging
import os
from pathlib import Path
from typing import Any, Final
from lang_main.config import load_toml_config
_has_py4cyto: bool = True
try:
import py4cytoscape as p4c
except ImportError:
_has_py4cyto = False
# ** external packages config
# ** Huggingface Hub caching
os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = 'set'
# ** py4cytoscape config
if _has_py4cyto:
p4c.set_summary_logger(False)
p4c.py4cytoscape_logger.detail_logger.setLevel('ERROR')
p4c.py4cytoscape_logger.detail_logger.removeHandler(
p4c.py4cytoscape_logger.detail_handler
)
p4c.py4cytoscape_logger.detail_logger.addHandler(logging.NullHandler())
from lang_main.config import (
BASE_FOLDERNAME,
CONFIG_FILENAME,
CYTO_STYLESHEET_FILENAME,
PKG_DIR,
PREFER_INTERNAL_CONFIG,
STOP_FOLDER,
get_config_paths,
load_cfg,
)
from lang_main.search import search_base_path
# ** lang-main config
BASE_FOLDERNAME: Final[str] = 'lang-main'
CONFIG_FILENAME: Final[str] = 'lang_main_config.toml'
CYTO_STYLESHEET_FILENAME: Final[str] = r'cytoscape_config/lang_main.xml'
PREFER_INTERNAL_CONFIG: Final[bool] = False
pkg_dir = Path(__file__).parent
cfg_path_internal = (pkg_dir / CONFIG_FILENAME).resolve()
cyto_stylesheet_path = (pkg_dir / CYTO_STYLESHEET_FILENAME).resolve()
cfg_path_internal, cyto_stylesheet_path = get_config_paths(
PKG_DIR, CONFIG_FILENAME, CYTO_STYLESHEET_FILENAME
)
# ** load config data: internal/external
# look for external config first, if not found use internal one
def search_cwd(
    glob_pattern: str = CONFIG_FILENAME,
) -> Path | None:
    """Search the current working directory for entries matching the
    glob pattern and return the first match encountered.

    Parameters
    ----------
    glob_pattern : str, optional
        pattern to look for, first match will be returned,
        by default CONFIG_FILENAME

    Returns
    -------
    Path | None
        Path if corresponding object was found, None otherwise
    """
    # only the first hit is needed: pull a single item from the lazy glob
    # iterator instead of collecting every match beforehand
    return next(Path.cwd().glob(glob_pattern), None)
def search_iterative(
    starting_path: Path,
    glob_pattern: str = CONFIG_FILENAME,
    stop_folder_name: str | None = None,
) -> Path | None:
    """Iteratively search the parent directories of the starting path
    for entries matching the glob pattern. The starting path itself is not
    searched, only its parents. Therefore the starting path can also point
    to a file; the folder in which it is placed will be searched.
    Returns the first match encountered.

    If a stop folder name is given, the search ends after the direct parent
    of the first folder with this name has been searched.

    Parameters
    ----------
    starting_path : Path
        non-inclusive starting path
    glob_pattern : str, optional
        pattern to look for, first match will be returned,
        by default CONFIG_FILENAME
    stop_folder_name : str, optional
        name of the folder which limits the search: this folder and its
        direct parent are the last ones searched, by default None

    Returns
    -------
    Path | None
        Path if corresponding object was found, None otherwise
    """
    stop_folder_reached: bool = False

    for search_path in starting_path.parents:  # do not look in library folder
        # take only the first hit of the lazy glob iterator
        match = next(search_path.glob(glob_pattern), None)
        if match is not None:
            return match
        if stop_folder_reached:
            # the parent of the stop folder was the last candidate
            break
        if stop_folder_name is not None and search_path.name == stop_folder_name:
            # library is placed inside a whole python installation for deployment
            # if this folder is reached, only look up one parent above
            stop_folder_reached = True

    return None
def search_base_path(
starting_path: Path,
stop_folder_name: str | None = None,
) -> Path | None:
"""Iteratively searches the parent directories of the starting path
and look for folders matching the given name. If a match is encountered,
the parent path will be returned.
Example:
starting_path = path/to/start/folder
stop_folder_name = 'to'
returned path = 'path/'
Parameters
----------
starting_path : Path
non-inclusive starting path
stop_folder_name : str, optional
name of the last folder in the directory tree to search, by default None
Returns
-------
Path | None
Path if corresponding base path was found, None otherwise
"""
stop_folder_path: Path | None = None
base_path: Path | None = None
for it in range(len(starting_path.parents)):
search_path = starting_path.parents[it] # do not look in library folder
if stop_folder_name is not None and search_path.name == stop_folder_name:
# library is placed inside a whole python installation for deployment
# only look up to this folder
stop_folder_path = search_path
break
if stop_folder_path is not None:
base_path = stop_folder_path.parent
return base_path
def load_cfg() -> dict[str, Any]:
    """Select the configuration file to use and load it.

    With ``PREFER_INTERNAL_CONFIG`` set, the internal config path is used
    directly. Otherwise the current working directory is searched first,
    then the parent directories of ``pkg_dir`` (stop folder ``'python'``).
    If nothing is found, the internal config path is the fallback.

    Returns
    -------
    dict[str, Any]
        copy of the data returned by ``load_toml_config``
    """

    def _locate_external() -> Path | None:
        # the working directory takes precedence over the parent folders
        hit = search_cwd(glob_pattern=CONFIG_FILENAME)
        if hit is not None:
            return hit
        return search_iterative(
            starting_path=pkg_dir,
            glob_pattern=CONFIG_FILENAME,
            stop_folder_name='python',
        )

    chosen_path: Path | None = (
        cfg_path_internal if PREFER_INTERNAL_CONFIG else _locate_external()
    )
    # backup: use internal config
    if chosen_path is None:
        chosen_path = cfg_path_internal

    loaded = load_toml_config(path_to_toml=chosen_path)
    return loaded.copy()
CONFIG: Final[dict[str, Any]] = load_cfg()
base_parent_path = search_base_path(pkg_dir, stop_folder_name=BASE_FOLDERNAME)
CONFIG: Final[dict[str, Any]] = load_cfg(
starting_path=PKG_DIR,
glob_pattern=CONFIG_FILENAME,
stop_folder_name=STOP_FOLDER,
cfg_path_internal=cfg_path_internal,
prefer_internal_config=PREFER_INTERNAL_CONFIG,
)
base_parent_path = search_base_path(PKG_DIR, stop_folder_name=BASE_FOLDERNAME)
if base_parent_path is None:
raise FileNotFoundError('Could not resolve base path of library')
BASE_PATH: Final[Path] = base_parent_path
@@ -185,11 +40,3 @@ if not cyto_stylesheet_path.exists():
)
CYTO_PATH_STYLESHEET: Final[Path] = cyto_stylesheet_path
# TODO check removal
# append Graphviz binary folder to system path if not already contained
# if sys.platform == 'win32':
# path = Path(r'C:\Program Files\Graphviz\bin')
# if path.is_dir() and str(path).lower() not in os.environ['PATH'].lower():
# os.environ['PATH'] += f';{path}'

View File

@@ -3,7 +3,7 @@ from typing import cast
from pandas import DataFrame, Series
from sentence_transformers import SentenceTransformer
from tqdm.auto import tqdm # TODO: check deletion
from tqdm.auto import tqdm
from lang_main.analysis.shared import (
candidates_by_index,

View File

@@ -241,35 +241,36 @@ def build_token_graph(
return graph, docs_mapping
def build_token_graph_simple(
data: DataFrame,
model: SpacyModel,
) -> tuple[TokenGraph, dict[PandasIndex, SpacyDoc]]:
graph = TokenGraph()
model_input = cast(tuple[str], tuple(data['entry'].to_list()))
weights = cast(tuple[int], tuple(data['num_occur'].to_list()))
indices = cast(tuple[list[PandasIndex]], tuple(data['batched_idxs'].to_list()))
index: int = 0
docs_mapping: dict[PandasIndex, SpacyDoc] = {}
# TODO check removal
# def build_token_graph_simple(
# data: DataFrame,
# model: SpacyModel,
# ) -> tuple[TokenGraph, dict[PandasIndex, SpacyDoc]]:
# graph = TokenGraph()
# model_input = cast(tuple[str], tuple(data['entry'].to_list()))
# weights = cast(tuple[int], tuple(data['num_occur'].to_list()))
# indices = cast(tuple[list[PandasIndex]], tuple(data['batched_idxs'].to_list()))
# index: int = 0
# docs_mapping: dict[PandasIndex, SpacyDoc] = {}
for doc in tqdm(model.pipe(model_input, batch_size=50), total=len(model_input)):
add_doc_info_to_graph(
graph=graph,
doc=doc,
weight=weights[index],
)
corresponding_indices = indices[index]
for idx in corresponding_indices:
docs_mapping[idx] = doc
# for doc in tqdm(model.pipe(model_input, batch_size=50), total=len(model_input)):
# add_doc_info_to_graph(
# graph=graph,
# doc=doc,
# weight=weights[index],
# )
# corresponding_indices = indices[index]
# for idx in corresponding_indices:
# docs_mapping[idx] = doc
index += 1
# index += 1
# metadata
graph.update_metadata()
# convert to undirected
graph.to_undirected(logging=False)
# # metadata
# graph.update_metadata()
# # convert to undirected
# graph.to_undirected(logging=False)
return graph, docs_mapping
# return graph, docs_mapping
# TODO check removal

View File

@@ -1,11 +1,40 @@
from __future__ import annotations
import logging
import os
import sys
import tomllib
from typing import TYPE_CHECKING, Any
from pathlib import Path
from typing import Any, Final
if TYPE_CHECKING:
from pathlib import Path
from lang_main.search import search_cwd, search_iterative
_has_py4cyto: bool = True
try:
import py4cytoscape as p4c
except ImportError:
_has_py4cyto = False
# ** external packages config
# ** Huggingface Hub caching
os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = 'set'
# ** py4cytoscape config
if _has_py4cyto:
p4c.set_summary_logger(False)
p4c.py4cytoscape_logger.detail_logger.setLevel('ERROR')
p4c.py4cytoscape_logger.detail_logger.removeHandler(
p4c.py4cytoscape_logger.detail_handler
)
p4c.py4cytoscape_logger.detail_logger.addHandler(logging.NullHandler())
# ** lang-main config
BASE_FOLDERNAME: Final[str] = 'lang-main'
CONFIG_FILENAME: Final[str] = 'lang_main_config.toml'
CYTO_STYLESHEET_FILENAME: Final[str] = r'cytoscape_config/lang_main.xml'
PREFER_INTERNAL_CONFIG: Final[bool] = False
PKG_DIR: Final[Path] = Path(__file__).parent
STOP_FOLDER: Final[str] = 'python'
def load_toml_config(
@@ -14,4 +43,46 @@ def load_toml_config(
with open(path_to_toml, 'rb') as f:
data = tomllib.load(f)
print('Loaded TOML config file successfully.', file=sys.stderr, flush=True)
return data
# ** load config data: internal/external
def get_config_paths(
    root_folder: Path,
    cfg_name: str,
    cyto_stylesheet_name: str,
) -> tuple[Path, Path]:
    """Build the resolved paths of the internal config file and the
    Cytoscape stylesheet relative to a root folder.

    Parameters
    ----------
    root_folder : Path
        folder both names are joined to
    cfg_name : str
        relative name of the config file
    cyto_stylesheet_name : str
        relative name of the Cytoscape stylesheet

    Returns
    -------
    tuple[Path, Path]
        resolved config path and resolved stylesheet path, in this order
    """
    internal_cfg, stylesheet = (
        root_folder.joinpath(name).resolve()
        for name in (cfg_name, cyto_stylesheet_name)
    )
    return internal_cfg, stylesheet
def load_cfg(
    starting_path: Path,
    glob_pattern: str,
    stop_folder_name: str | None,
    cfg_path_internal: Path,
    prefer_internal_config: bool = False,
) -> dict[str, Any]:
    """Select the configuration file to use and load it.

    External configs are looked for first (current working directory, then
    the parent directories of the starting path); the internal config is the
    fallback, or the direct choice if ``prefer_internal_config`` is set.

    Parameters
    ----------
    starting_path : Path
        starting path handed to ``search_iterative``
    glob_pattern : str
        pattern of the config file to look for
    stop_folder_name : str | None
        stop folder name handed to ``search_iterative``
    cfg_path_internal : Path
        path of the internal config file
    prefer_internal_config : bool, optional
        skip the search and use the internal config, by default False

    Returns
    -------
    dict[str, Any]
        copy of the data returned by ``load_toml_config``
    """

    def _locate_external() -> Path | None:
        # the working directory takes precedence over the parent folders
        hit = search_cwd(glob_pattern)
        if hit is not None:
            return hit
        return search_iterative(
            starting_path=starting_path,
            glob_pattern=glob_pattern,
            stop_folder_name=stop_folder_name,
        )

    chosen_path: Path | None = (
        cfg_path_internal if prefer_internal_config else _locate_external()
    )
    # backup: use internal config
    if chosen_path is None:
        chosen_path = cfg_path_internal

    loaded = load_toml_config(path_to_toml=chosen_path)
    return loaded.copy()

View File

@@ -1,6 +1,6 @@
# lang_main: Config file
[info]
pkg = 'lang_main'
pkg = 'lang_main_internal'
[paths]
inputs = './inputs/'

View File

@@ -60,7 +60,7 @@ def load_spacy(
model_name: str,
) -> SpacyModel:
try:
spacy_model_obj = importlib.import_module(SPACY_MODEL_NAME)
spacy_model_obj = importlib.import_module(model_name)
except ModuleNotFoundError:
raise LanguageModelNotFoundError(
(

View File

@@ -148,14 +148,10 @@ class Pipeline(BasePipeline):
) -> None:
# init base class
super().__init__(name=name, working_dir=working_dir)
# name of pipeline
self.name = name
# working directory for pipeline == output path
self.working_dir = working_dir
# if not self.working_dir.exists():
# self.working_dir.mkdir(parents=True)
# container for actions to perform during pass
self.actions_kwargs: list[dict[str, Any]] = []
self.save_results: ResultHandling = []
@@ -192,28 +188,6 @@ class Pipeline(BasePipeline):
else:
self.panic_wrong_action_type(action=action, compatible_type=Callable.__name__)
# TODO: add multiple entries by utilising simple add method
"""
def add_multi(
self,
action: FunctionType | Sequence[FunctionType],
action_kwargs: dict[str, Any] | Sequence[dict[str, Any]],
) -> None:
if isinstance(action, Sequence):
if len(action_kwargs) != len(action):
raise ValueError(("Sequences for actions and corresponding keyword "
"arguments must have the same length."))
self.actions.extend(action)
self.actions_kwargs.extend(action_kwargs)
elif isinstance(action, FunctionType):
self.actions.append(action)
self.actions_kwargs.append(action_kwargs)
else:
raise TypeError(("Action must be function or sequence of functions, "
f"but is of type >>{type(action)}<<."))
"""
def get_result_path(
self,
action_idx: int,
@@ -253,11 +227,7 @@ class Pipeline(BasePipeline):
action_idx: int,
filename: str | None,
) -> None:
# target_filename = f'Pipe-{self.name}_Step-{self.curr_proc_idx}_' + filename
# target_path = self.working_dir.joinpath(target_filename)
# target_path = target_path.with_suffix('.pkl')
target_path, _ = self.get_result_path(action_idx, filename)
# saving file locally
save_pickle(obj=self._intermediate_result, path=target_path)
@override
@@ -270,6 +240,7 @@ class Pipeline(BasePipeline):
if self.load_results[idx][0]:
filename = self.load_results[idx][1]
ret = self.load_step(action_idx=idx, filename=filename)
self._intermediate_result = ret
logger.info(
'[No Calculation] Loaded result for action >>%s<< successfully',
self.action_names[idx],
@@ -279,18 +250,12 @@ class Pipeline(BasePipeline):
# calculation
if idx == 0:
args = starting_values
# ret = action(*starting_values, **action_kwargs)
else:
args = ret
if args is not None:
ret = action(*args, **action_kwargs)
# elif args is not None:
# ret = action(*args)
# elif args is None and action_kwargs:
# ret = action(**action_kwargs)
else:
# ret = action()
ret = action(**action_kwargs)
if ret is not None and not isinstance(ret, tuple):

View File

@@ -60,7 +60,7 @@ SPACY_MODEL = m_load.instantiate_model(
# ** pipeline configuration
# ** target feature preparation
def build_base_target_feature_pipe() -> Pipeline:
pipe_target_feat = Pipeline(name='TargetFeature', working_dir=SAVE_PATH_FOLDER)
pipe_target_feat = Pipeline(name='Target_Feature', working_dir=SAVE_PATH_FOLDER)
pipe_target_feat.add(
load_raw_data,
{
@@ -185,7 +185,15 @@ def build_tk_graph_render_pipe(
) -> Pipeline:
# optional dependency: late import
# raises exception if necessary modules are not found
from lang_main.render import cytoscape as cyto
try:
from lang_main.render import cytoscape as cyto
except ImportError:
raise ImportError(
(
'Dependencies for Cytoscape interaction not found.'
'Install package with optional dependencies.'
)
)
pipe_graph_rendering = Pipeline(
name='Graph_Static-Rendering',

View File

@@ -60,7 +60,7 @@ def verify_connection() -> None:
"""
try:
p4c.cytoscape_ping()
except CyError as error:
except CyError as error: # pragma: no cover
logger.error('[CyError] CyREST or Cytoscape version not supported.')
raise error
except RequestException as error:
@@ -164,6 +164,7 @@ def verify_table_property(
network_name: str = CYTO_BASE_NETWORK_NAME,
) -> bool:
table = p4c.get_table_columns(table=table_type, network=network_name)
logger.debug('Table >>%s<< wiht columns: %s', table, table.columns)
return property in table.columns
@@ -174,7 +175,7 @@ def analyse_network(
) -> None:
node_table = p4c.get_table_columns(table='node', network=network_name)
net_analyse_possible: bool = True
if len(node_table) < 4:
if len(node_table) < 4: # pragma: no cover
net_analyse_possible = False
if net_analyse_possible:
@@ -186,7 +187,7 @@ def analyse_network(
* node_table['BetweennessCentrality']
* node_table['stress_norm']
)
else:
else: # pragma: no cover
node_table[CYTO_SELECTION_PROPERTY] = 1
p4c.load_table_data(node_table, data_key_column='name', network=network_name)
@@ -231,7 +232,7 @@ def export_network_to_image(
by default 'A4'
"""
logger.debug('Exporting image to file...')
if not target_folder.exists():
if not target_folder.exists(): # pragma: no cover
target_folder.mkdir(parents=True)
dst_file_pth = (target_folder / filename).with_suffix(f'.{filetype.lower()}')
@@ -252,13 +253,6 @@ def export_network_to_image(
export_text_as_font=text_as_font,
page_size=pdf_export_page_size,
)
# TODO remove if Cytoscape >= 3.10.* is running in container
# p4c.export_image(
# filename=filename,
# type=filetype,
# network=network_name,
# overwrite_file=True,
# )
logger.debug('Exported image to sandbox.')
logger.debug('Transferring image from sandbox to target destination...')
sandbox_filename = f'{filename}.{filetype.lower()}'
@@ -328,6 +322,7 @@ def apply_style_to_network(
"""
logger.debug('Applying style to network...')
styles_avail = cast(list[str], p4c.get_visual_style_names())
logger.debug('Available styles: %s', styles_avail)
if style_name not in styles_avail:
if not pth_to_stylesheet.exists():
# existence for standard path verified at import, but not for other
@@ -348,12 +343,6 @@ def apply_style_to_network(
p4c.set_visual_style(style_name, network=network_name)
# node size mapping, only if needed property is available
# TODO check removal
# size_prop_available = verify_table_property(
# property=node_size_property,
# network_name=network_name,
# )
# if size_prop_available:
scheme = p4c.scheme_c_number_continuous(
start_value=min_node_size, end_value=max_node_size
)
@@ -365,13 +354,6 @@ def apply_style_to_network(
default_number=min_node_size,
)
p4c.set_node_size_mapping(**node_size_map)
# TODO removal
# else:
# node_table = p4c.get_table_columns(table='node', network=network_name)
# nodes_SUID = node_table['SUID'].to_list()
# p4c.set_node_size_bypass(nodes_SUID, new_sizes=min_node_size, network=network_name)
# p4c.set_visual_style(style_name, network=network_name)
# time.sleep(1) # if not waited image export could be without applied style
fit_content(network_name=network_name)
logger.debug('Style application to network successful.')
@@ -402,7 +384,7 @@ def get_subgraph_node_selection(
node_table = p4c.get_table_columns(table='node', network=network_name)
node_table = node_table.sort_values(by=CYTO_SELECTION_PROPERTY, ascending=False)
p4c.load_table_data(node_table, data_key_column='name', network=network_name)
node_table_choice = node_table.iloc[:num_subgraphs, :]
node_table_choice = node_table.iloc[:num_subgraphs]
logger.debug('Selection of nodes for subgraph generation successful.')
return node_table_choice['SUID'].to_list()

View File

@@ -9,8 +9,8 @@ re_parenthesis_1 = re.compile(r'[(]+')
re_parenthesis_2 = re.compile(r'[)]+')
@cy_log
def select_edges_connecting_selected_nodes(network=None, base_url=DEFAULT_BASE_URL): # noqa: F405
@cy_log # pragma: no cover
def select_edges_connecting_selected_nodes(network=None, base_url=DEFAULT_BASE_URL): # noqa: F405 # pragma: no cover
"""Select edges in a Cytoscape Network connecting the selected nodes, including self loops connecting single nodes.
Any edges selected beforehand are deselected before any new edges are selected

110
src/lang_main/search.py Normal file
View File

@@ -0,0 +1,110 @@
from pathlib import Path
def search_cwd(
glob_pattern: str,
) -> Path | None:
"""Searches the current working directory and looks for files
matching the glob pattern.
Returns the first match encountered.
Parameters
----------
glob_pattern : str, optional
pattern to look for, first match will be returned
Returns
-------
Path | None
Path if corresponding object was found, None otherwise
"""
path_found: Path | None = None
res = tuple(Path.cwd().glob(glob_pattern))
if res:
path_found = res[0]
return path_found
def search_iterative(
starting_path: Path,
glob_pattern: str,
stop_folder_name: str | None = None,
) -> Path | None:
"""Iteratively searches the parent directories of the starting path
and look for files matching the glob pattern. The starting path is not
searched, only its parents. Therefore the starting path can also point
to a file. The folder in which it is placed in will be searched.
Returns the first match encountered.
The parent of the stop folder will be searched if it exists.
Parameters
----------
starting_path : Path
non-inclusive starting path
glob_pattern : str, optional
pattern to look for, first match will be returned
stop_folder_name : str, optional
name of the last folder in the directory tree to search, by default None
Returns
-------
Path | None
Path if corresponding object was found, None otherwise
"""
file_path: Path | None = None
stop_folder_reached: bool = False
for search_path in starting_path.parents:
res = tuple(search_path.glob(glob_pattern))
if res:
file_path = res[0]
break
elif stop_folder_reached:
break
if stop_folder_name is not None and search_path.name == stop_folder_name:
# library is placed inside a whole python installation for deployment
# if this folder is reached, only look up one parent above
stop_folder_reached = True
return file_path
def search_base_path(
starting_path: Path,
stop_folder_name: str | None = None,
) -> Path | None:
"""Iteratively searches the parent directories of the starting path
and look for folders matching the given name. If a match is encountered,
the parent path will be returned.
Example:
starting_path = path/to/start/folder
stop_folder_name = 'to'
returned path = 'path/'
Parameters
----------
starting_path : Path
non-inclusive starting path
stop_folder_name : str, optional
name of the last folder in the directory tree to search, by default None
Returns
-------
Path | None
Path if corresponding base path was found, None otherwise
"""
stop_folder_path: Path | None = None
base_path: Path | None = None
for search_path in starting_path.parents:
if stop_folder_name is not None and search_path.name == stop_folder_name:
# library is placed inside a whole python installation for deployment
# only look up to this folder
stop_folder_path = search_path
break
if stop_folder_path is not None:
base_path = stop_folder_path.parent
return base_path