from typing import cast

from pandas import DataFrame

from lang_main.analysis.graphs import (
    Graph,
    TokenGraph,
    save_to_GraphML,
)
from lang_main.constants import (
    CYTO_BASE_NETWORK_NAME,
    PATH_TO_DATASET,
    SAVE_PATH_FOLDER,
    SKIP_GRAPH_POSTPROCESSING,
    SKIP_GRAPH_RESCALING,
    SKIP_GRAPH_STATIC_RENDERING,
    SKIP_PREPROCESSING,
    SKIP_TIME_ANALYSIS,
    SKIP_TOKEN_ANALYSIS,
)
from lang_main.io import create_saving_folder, get_entry_point, load_pickle
from lang_main.pipelines.base import PipelineContainer
from lang_main.pipelines.predefined import (
    build_base_target_feature_pipe,
    build_merge_duplicates_pipe,
    build_timeline_pipe,
    build_tk_graph_pipe,
    build_tk_graph_post_pipe,
    build_tk_graph_render_pipe,
    build_tk_graph_rescaling_pipe,
)
from lang_main.types import (
    EntryPoints,
    ObjectID,
    PandasIndex,
    SpacyDoc,
    TimelineCandidates,
)

# ** build pipelines
pipe_target_feat = build_base_target_feature_pipe()
pipe_merge = build_merge_duplicates_pipe()
pipe_token_analysis = build_tk_graph_pipe()
pipe_graph_postprocessing = build_tk_graph_post_pipe()
pipe_graph_rescaling = build_tk_graph_rescaling_pipe(
    save_result=True,
    exit_point=EntryPoints.TK_GRAPH_ANALYSIS_RESCALED,
)
pipe_static_graph_rendering = build_tk_graph_render_pipe(
    with_subgraphs=True,
    base_network_name=CYTO_BASE_NETWORK_NAME,
)
pipe_timeline = build_timeline_pipe()


# ** preprocessing pipeline
def run_preprocessing() -> None:
    create_saving_folder(
        saving_path_folder=SAVE_PATH_FOLDER,
        overwrite_existing=False,
    )
    # run pipelines: extract target features, then merge duplicate entries
    ret = cast(
        tuple[DataFrame],
        pipe_target_feat.run(starting_values=(PATH_TO_DATASET,)),
    )
    target_feat_data = ret[0]
    _ = cast(tuple[DataFrame], pipe_merge.run(starting_values=(target_feat_data,)))


# ** token analysis
def run_token_analysis() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TOKEN_ANALYSIS)
    loaded_results = cast(tuple[DataFrame], load_pickle(entry_point_path))
    preprocessed_data = loaded_results[0]
    # build token graph
    (tk_graph, _) = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        pipe_token_analysis.run(starting_values=(preprocessed_data,)),
    )
    tk_graph.to_GraphML(SAVE_PATH_FOLDER, filename='TokenGraph', directed=False)


def run_graph_postprocessing() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST)
    loaded_results = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        load_pickle(entry_point_path),
    )
    tk_graph = loaded_results[0]
    # filter graph by edge weight and remove single nodes (no connection)
    ret = cast(
        tuple[TokenGraph],
        pipe_graph_postprocessing.run(starting_values=(tk_graph,)),
    )
    tk_graph_filtered = ret[0]
    tk_graph_filtered.to_GraphML(
        SAVE_PATH_FOLDER, filename='TokenGraph-filtered', directed=False
    )


def run_graph_edge_rescaling() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_ANALYSIS)
    loaded_results = cast(tuple[TokenGraph], load_pickle(entry_point_path))
    tk_graph = loaded_results[0]
    # rescale edge weights; the pipeline returns the rescaled directed graph
    # and an undirected counterpart
    tk_graph_rescaled, tk_graph_rescaled_undirected = cast(
        tuple[TokenGraph, Graph],
        pipe_graph_rescaling.run(starting_values=(tk_graph,)),
    )
    tk_graph_rescaled.to_GraphML(
        SAVE_PATH_FOLDER, filename='TokenGraph-directed-rescaled', directed=False
    )
    save_to_GraphML(
        tk_graph_rescaled_undirected,
        saving_path=SAVE_PATH_FOLDER,
        filename='TokenGraph-undirected-rescaled',
    )
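
# The run_* stages in this module share a load-entry-point pattern: resolve
# the stage's entry point, unpickle the previous results, cast to the expected
# tuple. A small helper like the following could factor that out (hypothetical
# sketch, not part of lang_main; the stages keep the explicit form for clarity):
def _load_stage_results(entry_point: EntryPoints) -> tuple[object, ...]:
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, entry_point)
    return cast(tuple[object, ...], load_pickle(entry_point_path))
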

def run_static_graph_rendering() -> None:
    # load entry point
    entry_point_path = get_entry_point(
        SAVE_PATH_FOLDER,
        EntryPoints.TK_GRAPH_ANALYSIS_RESCALED,
    )
    loaded_results = cast(
        tuple[TokenGraph, Graph],
        load_pickle(entry_point_path),
    )
    tk_graph_rescaled_undirected = loaded_results[1]
    _ = pipe_static_graph_rendering.run(starting_values=(tk_graph_rescaled_undirected,))


# ** time analysis
def run_time_analysis() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE)
    loaded_results = cast(tuple[DataFrame], load_pickle(entry_point_path))
    preprocessed_data = loaded_results[0]
    # compute timeline candidates
    _ = cast(
        tuple[TimelineCandidates, dict[ObjectID, str]],
        pipe_timeline.run(starting_values=(preprocessed_data,)),
    )


def build_pipeline_container() -> PipelineContainer:
    # register the stages in execution order; the SKIP_* flags control which
    # stages run
    container = PipelineContainer(
        name='Pipeline-Container-Base',
        working_dir=SAVE_PATH_FOLDER,
    )
    container.add(run_preprocessing, skip=SKIP_PREPROCESSING)
    container.add(run_token_analysis, skip=SKIP_TOKEN_ANALYSIS)
    container.add(run_graph_postprocessing, skip=SKIP_GRAPH_POSTPROCESSING)
    container.add(run_graph_edge_rescaling, skip=SKIP_GRAPH_RESCALING)
    container.add(run_static_graph_rendering, skip=SKIP_GRAPH_STATIC_RENDERING)
    container.add(run_time_analysis, skip=SKIP_TIME_ANALYSIS)
    return container


def main() -> None:
    procedure = build_pipeline_container()
    procedure.run()


if __name__ == '__main__':
    main()
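
# Usage note: every stage reloads its input from a pickled entry point, so a
# single stage (e.g. run_graph_postprocessing) can also be re-run on its own
# once an earlier run has written the corresponding pickle to SAVE_PATH_FOLDER;
# the SKIP_* constants toggle stages when running the full container.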