lang-main/scripts/analyse_dataset.py
2024-09-04 18:02:12 +02:00

194 lines
6.1 KiB
Python

import cProfile
import pstats
import typing
from pathlib import Path
from typing import Final, cast
from pandas import DataFrame
from lang_main.analysis.graphs import (
Graph,
TokenGraph,
save_to_GraphML,
)
from lang_main.constants import (
CYTO_BASE_NETWORK_NAME,
PATH_TO_DATASET,
SAVE_PATH_FOLDER,
SKIP_GRAPH_POSTPROCESSING,
SKIP_GRAPH_RESCALING,
SKIP_GRAPH_STATIC_RENDERING,
SKIP_PREPROCESSING,
SKIP_TIME_ANALYSIS,
SKIP_TOKEN_ANALYSIS,
)
from lang_main.io import create_saving_folder, get_entry_point, load_pickle
from lang_main.pipelines.base import PipelineContainer
from lang_main.pipelines.predefined import (
build_base_target_feature_pipe,
build_merge_duplicates_pipe,
build_timeline_pipe,
build_tk_graph_pipe,
build_tk_graph_post_pipe,
build_tk_graph_render_pipe,
build_tk_graph_rescaling_pipe,
)
from lang_main.types import (
EntryPoints,
ObjectID,
PandasIndex,
SpacyDoc,
TimelineCandidates,
)
# ** profiling
# Switches read by the ``__main__`` guard at the bottom of this script.
# ONLY_PROFILING_REPORT is checked first: when True, the report file is only
# loaded and printed and ``main()`` is not executed at all.
ONLY_PROFILING_REPORT: Final[bool] = True
# Only relevant when ONLY_PROFILING_REPORT is False: when True, ``main()`` runs
# under cProfile and the statistics are written to the report file and printed;
# when False, ``main()`` runs without profiling.
USE_PROFILING: Final[bool] = True
# File name of the profiling report; the guard places it in the ``profiling``
# folder below the current working directory.
PROFILE_REPORT_NAME: Final[str] = 'prof_report.profdata'
# ** build pipelines
# All pipelines are constructed once at import time and shared as module-level
# objects by the ``run_*`` functions below.
# used by ``run_preprocessing``: receives the dataset path as starting value
pipe_target_feat = build_base_target_feature_pipe()
# used by ``run_preprocessing``: receives the first result of ``pipe_target_feat``
pipe_merge = build_merge_duplicates_pipe()
# used by ``run_token_analysis``
pipe_token_analysis = build_tk_graph_pipe()
# used by ``run_graph_postprocessing``
pipe_graph_postprocessing = build_tk_graph_post_pipe()
# used by ``run_graph_edge_rescaling``
# NOTE(review): presumably ``save_result``/``exit_point`` make the pipeline
# persist its result for ``EntryPoints.TK_GRAPH_ANALYSIS_RESCALED``, which
# ``run_static_graph_rendering`` loads afterwards -- confirm in the builder
pipe_graph_rescaling = build_tk_graph_rescaling_pipe(
save_result=True,
exit_point=EntryPoints.TK_GRAPH_ANALYSIS_RESCALED,
)
# used by ``run_static_graph_rendering``
pipe_static_graph_rendering = build_tk_graph_render_pipe(
with_subgraphs=True,
base_network_name=CYTO_BASE_NETWORK_NAME,
)
# used by ``run_time_analysis``
pipe_timeline = build_timeline_pipe()
# ** preprocessing pipeline
def run_preprocessing() -> None:
    """Run the target-feature and duplicate-merging pipelines on the dataset.

    Calls ``create_saving_folder`` for ``SAVE_PATH_FOLDER`` with
    ``overwrite_existing=False``, feeds ``PATH_TO_DATASET`` into
    ``pipe_target_feat`` and hands the first element of that result to
    ``pipe_merge``. The return value of the merge pipeline is not used here.
    """
    create_saving_folder(saving_path_folder=SAVE_PATH_FOLDER, overwrite_existing=False)

    # stage 1: target feature extraction, started from the dataset path
    feature_results = cast(
        tuple[DataFrame],
        pipe_target_feat.run(starting_values=(PATH_TO_DATASET,)),
    )
    features = feature_results[0]

    # stage 2: duplicate merging; the result is deliberately discarded
    # NOTE(review): later stages reload their input via ``get_entry_point``,
    # so the pipelines presumably persist their results themselves -- confirm
    _ = cast(tuple[DataFrame], pipe_merge.run(starting_values=(features,)))
# ** token analysis
def run_token_analysis() -> None:
    """Build the token graph from stored preprocessing results and export it.

    Loads the pickle registered for ``EntryPoints.TOKEN_ANALYSIS`` below
    ``SAVE_PATH_FOLDER``, runs ``pipe_token_analysis`` on its first element and
    writes the resulting graph via ``to_GraphML`` to ``SAVE_PATH_FOLDER`` using
    the file name ``TokenGraph`` and ``directed=False``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TOKEN_ANALYSIS)
    stored = cast(tuple[DataFrame], load_pickle(pickle_path))
    dataset = stored[0]

    pipe_output = pipe_token_analysis.run(starting_values=(dataset,))
    # the pipeline returns two values: the graph and an optional mapping from
    # index to document; only the graph is needed for the export
    token_graph, _ = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        pipe_output,
    )
    token_graph.to_GraphML(SAVE_PATH_FOLDER, filename='TokenGraph', directed=False)
def run_graph_postprocessing() -> None:
    """Post-process the stored token graph and export the filtered graph.

    Loads the pickle registered for ``EntryPoints.TK_GRAPH_POST`` below
    ``SAVE_PATH_FOLDER``, runs ``pipe_graph_postprocessing`` on the graph found
    in its first element and writes the first pipeline result via
    ``to_GraphML`` using the file name ``TokenGraph-filtered`` and
    ``directed=False``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST)
    stored = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        load_pickle(pickle_path),
    )
    token_graph = stored[0]

    # according to the original author's note, this pipeline filters the graph
    # by edge weight and removes nodes without any connection
    post_results = cast(
        tuple[TokenGraph],
        pipe_graph_postprocessing.run(starting_values=(token_graph,)),
    )
    filtered_graph = post_results[0]
    filtered_graph.to_GraphML(
        SAVE_PATH_FOLDER,
        filename='TokenGraph-filtered',
        directed=False,
    )
def run_graph_edge_rescaling() -> None:
    """Run the rescaling pipeline on the stored token graph and export both results.

    Loads the pickle registered for ``EntryPoints.TK_GRAPH_ANALYSIS`` below
    ``SAVE_PATH_FOLDER`` and runs ``pipe_graph_rescaling`` on its first
    element. The pipeline result is unpacked into a ``TokenGraph`` and a plain
    ``Graph``; the former is written as ``TokenGraph-directed-rescaled``, the
    latter as ``TokenGraph-undirected-rescaled``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_ANALYSIS)
    stored = cast(tuple[TokenGraph], load_pickle(pickle_path))
    token_graph = stored[0]

    pipe_output = pipe_graph_rescaling.run(starting_values=(token_graph,))
    rescaled_graph, rescaled_undirected = cast(tuple[TokenGraph, Graph], pipe_output)

    # NOTE(review): the file name says "directed" but the export is requested
    # with ``directed=False`` -- confirm against ``TokenGraph.to_GraphML``
    # whether ``directed=True`` was intended here
    rescaled_graph.to_GraphML(
        SAVE_PATH_FOLDER,
        filename='TokenGraph-directed-rescaled',
        directed=False,
    )
    save_to_GraphML(
        rescaled_undirected,
        saving_path=SAVE_PATH_FOLDER,
        filename='TokenGraph-undirected-rescaled',
    )
def run_static_graph_rendering() -> None:
    """Render the stored rescaled graph with the static rendering pipeline.

    Loads the pickle registered for ``EntryPoints.TK_GRAPH_ANALYSIS_RESCALED``
    below ``SAVE_PATH_FOLDER`` and passes its second element (the plain
    ``Graph``) to ``pipe_static_graph_rendering``. The pipeline result is not
    used here.
    """
    entry_point_path = get_entry_point(
        SAVE_PATH_FOLDER,
        EntryPoints.TK_GRAPH_ANALYSIS_RESCALED,
    )
    loaded_results = cast(
        tuple[TokenGraph, Graph],
        load_pickle(entry_point_path),
    )
    # only the second element is rendered; the ``TokenGraph`` stored in the
    # first element is not needed in this step
    tk_graph_rescaled_undirected = loaded_results[1]
    _ = pipe_static_graph_rendering.run(starting_values=(tk_graph_rescaled_undirected,))
# ** time analysis
def run_time_analysis() -> None:
    """Run the timeline pipeline on the stored preprocessing results.

    Loads the pickle registered for ``EntryPoints.TIMELINE`` below
    ``SAVE_PATH_FOLDER`` and runs ``pipe_timeline`` on its first element. The
    pipeline result is not used here.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE)
    stored = cast(tuple[DataFrame], load_pickle(pickle_path))
    dataset = stored[0]

    pipe_output = pipe_timeline.run(starting_values=(dataset,))
    # the cast only documents the expected result shape; the value is discarded
    _ = cast(tuple[TimelineCandidates, dict[ObjectID, str]], pipe_output)
def build_pipeline_container() -> PipelineContainer:
    """Create the pipeline container and register all analysis stages.

    The stages are added in a fixed order, each together with its ``SKIP_*``
    flag from ``lang_main.constants``.

    Returns:
        The container named ``Pipeline-Container-Base`` with
        ``SAVE_PATH_FOLDER`` as working directory and all stages registered.
    """
    # (stage function, skip flag) in registration order
    stages = (
        (run_preprocessing, SKIP_PREPROCESSING),
        (run_token_analysis, SKIP_TOKEN_ANALYSIS),
        (run_graph_postprocessing, SKIP_GRAPH_POSTPROCESSING),
        (run_graph_edge_rescaling, SKIP_GRAPH_RESCALING),
        (run_static_graph_rendering, SKIP_GRAPH_STATIC_RENDERING),
        (run_time_analysis, SKIP_TIME_ANALYSIS),
    )

    container = PipelineContainer(
        name='Pipeline-Container-Base',
        working_dir=SAVE_PATH_FOLDER,
    )
    for stage, skip_stage in stages:
        container.add(stage, skip=skip_stage)

    return container
def main() -> None:
    """Build the pipeline container and run it."""
    build_pipeline_container().run()
if __name__ == '__main__':
    # Profiling artefacts are kept in a ``profiling`` folder below the current
    # working directory. ``exist_ok=True`` already covers the case of an
    # existing folder, so no separate existence check is needed.
    report_path = Path.cwd() / 'profiling'
    report_path.mkdir(parents=True, exist_ok=True)
    report_file = report_path / PROFILE_REPORT_NAME

    if not (ONLY_PROFILING_REPORT or USE_PROFILING):
        # plain run without any profiling
        main()
    else:
        if ONLY_PROFILING_REPORT:
            # report-only mode: print a previously recorded report without
            # running the pipelines; fail early with a clear message instead
            # of an opaque error raised from within ``pstats``
            if not report_file.is_file():
                raise FileNotFoundError(
                    f'No profiling report found at "{report_file}". Set '
                    'ONLY_PROFILING_REPORT to False and USE_PROFILING to True '
                    'to record one first.'
                )
            num_entries = 60
        else:
            # ``cProfile.run`` executes the statement in the ``__main__``
            # namespace and writes the raw statistics to the report file
            cProfile.run('main()', str(report_file))
            num_entries = 30

        # print the report twice: ordered by cumulative time, then by the
        # time spent inside each function itself
        p_stats = pstats.Stats(str(report_file))
        p_stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(num_entries)
        p_stats.sort_stats(pstats.SortKey.TIME).print_stats(num_entries)