165 lines
5.1 KiB
Python
165 lines
5.1 KiB
Python
import typing
|
|
from typing import cast
|
|
|
|
from pandas import DataFrame
|
|
|
|
from lang_main.analysis.graphs import (
|
|
Graph,
|
|
TokenGraph,
|
|
save_to_GraphML,
|
|
)
|
|
from lang_main.constants import (
|
|
PATH_TO_DATASET,
|
|
SAVE_PATH_FOLDER,
|
|
SKIP_GRAPH_POSTPROCESSING,
|
|
SKIP_GRAPH_RESCALING,
|
|
SKIP_GRAPH_STATIC_RENDERING,
|
|
SKIP_PREPROCESSING,
|
|
SKIP_TIME_ANALYSIS,
|
|
SKIP_TOKEN_ANALYSIS,
|
|
)
|
|
from lang_main.io import create_saving_folder, get_entry_point, load_pickle
|
|
from lang_main.pipelines.base import PipelineContainer
|
|
from lang_main.pipelines.predefined import (
|
|
build_base_target_feature_pipe,
|
|
build_merge_duplicates_pipe,
|
|
build_timeline_pipe,
|
|
build_tk_graph_pipe,
|
|
build_tk_graph_post_pipe,
|
|
build_tk_graph_rendering_pipe,
|
|
build_tk_graph_rescaling_pipe,
|
|
)
|
|
from lang_main.types import (
|
|
EntryPoints,
|
|
ObjectID,
|
|
PandasIndex,
|
|
SpacyDoc,
|
|
TimelineCandidates,
|
|
)
|
|
|
|
# ** build pipelines
# Every pipeline object is created once, at import time, by its factory from
# ``lang_main.pipelines.predefined``; the ``run_*`` functions in this module
# call ``.run(...)`` on these module-level instances.

# -- preprocessing
pipe_target_feat = build_base_target_feature_pipe()
pipe_merge = build_merge_duplicates_pipe()

# -- token graph: build, post-process, rescale, render
pipe_token_analysis = build_tk_graph_pipe()
pipe_graph_postprocessing = build_tk_graph_post_pipe()
pipe_graph_rescaling = build_tk_graph_rescaling_pipe()
pipe_static_graph_rendering = build_tk_graph_rendering_pipe()

# -- timeline
pipe_timeline = build_timeline_pipe()
|
|
|
|
|
|
# ** preprocessing pipeline
|
|
def run_preprocessing() -> None:
    """Run the target-feature pipeline and then the duplicate-merge pipeline.

    The saving folder is created first (``overwrite_existing=False``). The
    dataset path ``PATH_TO_DATASET`` is the single starting value of
    ``pipe_target_feat``; the first element of that pipeline's result is the
    single starting value of ``pipe_merge``. The merge result is not used by
    this function.
    """
    create_saving_folder(saving_path_folder=SAVE_PATH_FOLDER, overwrite_existing=False)

    # stage 1: pipeline built by ``build_base_target_feature_pipe``
    feature_results = cast(
        tuple[DataFrame],
        pipe_target_feat.run(starting_values=(PATH_TO_DATASET,)),
    )
    feature_frame = feature_results[0]

    # stage 2: pipeline built by ``build_merge_duplicates_pipe``; the return
    # value is intentionally discarded
    pipe_merge.run(starting_values=(feature_frame,))
|
|
|
|
|
|
# ** token analysis
|
|
def run_token_analysis() -> None:
    """Build the token graph from the ``TOKEN_ANALYSIS`` entry point and export it.

    The entry point is unpickled from ``SAVE_PATH_FOLDER``; its first element
    is passed to ``pipe_token_analysis``. The pipeline result is unpacked as a
    pair whose first item is the token graph (the second item is ignored). The
    graph is written via ``to_GraphML`` under the file name ``'TokenGraph'``
    with ``directed=False``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TOKEN_ANALYSIS)
    stored = cast(tuple[DataFrame], load_pickle(pickle_path))
    dataset = stored[0]

    # build token graph; the optional document mapping is not needed here
    token_graph, _ = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        pipe_token_analysis.run(starting_values=(dataset,)),
    )
    token_graph.to_GraphML(SAVE_PATH_FOLDER, filename='TokenGraph', directed=False)
|
|
|
|
|
|
def run_graph_postprocessing() -> None:
    """Post-process the token graph from the ``TK_GRAPH_POST`` entry point.

    The first element of the unpickled entry point is handed to
    ``pipe_graph_postprocessing``; the first element of the pipeline result is
    exported via ``to_GraphML`` as ``'TokenGraph-filtered'`` with
    ``directed=False``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST)
    stored = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        load_pickle(pickle_path),
    )
    token_graph = stored[0]

    # filter graph by edge weight and remove single nodes (no connection)
    postprocessed = cast(
        tuple[TokenGraph],
        pipe_graph_postprocessing.run(starting_values=(token_graph,)),
    )
    filtered_graph = postprocessed[0]
    filtered_graph.to_GraphML(
        SAVE_PATH_FOLDER,
        filename='TokenGraph-filtered',
        directed=False,
    )
|
|
|
|
|
|
def run_graph_edge_rescaling() -> None:
    """Rescale the token graph from the ``TK_GRAPH_ANALYSIS`` entry point.

    The first element of the unpickled entry point is passed to
    ``pipe_graph_rescaling``, whose result is treated as a
    ``(TokenGraph, Graph)`` pair. Both items are exported to
    ``SAVE_PATH_FOLDER``: the first through its ``to_GraphML`` method, the
    second through ``save_to_GraphML``.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_ANALYSIS)
    stored = cast(tuple[TokenGraph], load_pickle(pickle_path))
    token_graph = stored[0]

    rescaling_output = cast(
        tuple[TokenGraph, Graph],
        pipe_graph_rescaling.run(starting_values=(token_graph,)),
    )
    rescaled_token_graph = rescaling_output[0]
    rescaled_undirected = rescaling_output[1]

    # NOTE(review): the file name says "directed" while ``directed=False`` is
    # passed -- confirm against ``TokenGraph.to_GraphML`` which of the two is
    # intended. Behaviour is left unchanged here.
    rescaled_token_graph.to_GraphML(
        SAVE_PATH_FOLDER,
        filename='TokenGraph-directed-rescaled',
        directed=False,
    )
    save_to_GraphML(
        rescaled_undirected,
        saving_path=SAVE_PATH_FOLDER,
        filename='TokenGraph-undirected-rescaled',
    )
|
|
|
|
|
|
def run_static_graph_rendering() -> None:
    """Render the rescaled undirected graph with the static rendering pipeline.

    The ``TK_GRAPH_ANALYSIS_RESCALED`` entry point is unpickled from
    ``SAVE_PATH_FOLDER`` and treated as a ``(TokenGraph, Graph)`` pair. Only
    the second element is used: it is the single starting value of
    ``pipe_static_graph_rendering``. The pipeline's return value is discarded.
    """
    entry_point_path = get_entry_point(
        SAVE_PATH_FOLDER,
        EntryPoints.TK_GRAPH_ANALYSIS_RESCALED,
    )
    loaded_results = cast(
        tuple[TokenGraph, Graph],
        load_pickle(entry_point_path),
    )
    # Only the second element feeds the renderer; the first element was
    # previously bound to a local that was never read and has been dropped.
    tk_graph_rescaled_undirected = loaded_results[1]

    _ = pipe_static_graph_rendering.run(starting_values=(tk_graph_rescaled_undirected,))
|
|
|
|
|
|
# ** time analysis
|
|
def run_time_analysis() -> None:
    """Run the timeline pipeline on the data stored at the ``TIMELINE`` entry point.

    The entry point is unpickled from ``SAVE_PATH_FOLDER`` and its first
    element is the single starting value of ``pipe_timeline``. The pipeline
    result is not used by this function.
    """
    pickle_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE)
    stored = cast(tuple[DataFrame], load_pickle(pickle_path))
    dataset = stored[0]

    # Result type per the original annotation:
    # tuple[TimelineCandidates, dict[ObjectID, str]] -- discarded here.
    pipe_timeline.run(starting_values=(dataset,))
|
|
|
|
|
|
def build_pipeline_container() -> PipelineContainer:
    """Assemble the container holding all ``run_*`` steps of this module.

    Returns:
        A ``PipelineContainer`` named ``'Pipeline-Container-Base'`` with
        ``SAVE_PATH_FOLDER`` as working directory. Each step is registered in
        the order listed below, together with its ``SKIP_*`` flag.
    """
    container = PipelineContainer(
        name='Pipeline-Container-Base',
        working_dir=SAVE_PATH_FOLDER,
    )
    # (step, skip flag) pairs; registration order is the order given here
    steps = (
        (run_preprocessing, SKIP_PREPROCESSING),
        (run_token_analysis, SKIP_TOKEN_ANALYSIS),
        (run_graph_postprocessing, SKIP_GRAPH_POSTPROCESSING),
        (run_graph_edge_rescaling, SKIP_GRAPH_RESCALING),
        (run_static_graph_rendering, SKIP_GRAPH_STATIC_RENDERING),
        (run_time_analysis, SKIP_TIME_ANALYSIS),
    )
    for step, skip_flag in steps:
        container.add(step, skip=skip_flag)

    return container
|
|
|
|
|
|
def main() -> None:
    """Build the pipeline container and run it."""
    build_pipeline_container().run()
|
|
|
|
|
|
# script entry point: run the full procedure only when executed directly
if __name__ == '__main__':
    main()
|