# lang-main/scripts/analyse_dataset.py
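"""Run the dataset analysis pipelines end to end.

Steps: target-feature preprocessing and duplicate merging, token-graph
construction and GraphML export, graph post-processing (edge-weight filtering),
edge rescaling, and timeline analysis. Each step after preprocessing loads its
input from a pickled entry point in SAVE_PATH_FOLDER, and every step can be
skipped via the corresponding SKIP_* constant.
"""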

from typing import cast

from pandas import DataFrame

from lang_main.analysis.graphs import Graph, TokenGraph, save_to_GraphML
from lang_main.constants import (
    PATH_TO_DATASET,
    SAVE_PATH_FOLDER,
    SKIP_GRAPH_POSTPROCESSING,
    SKIP_GRAPH_RESCALING,
    SKIP_PREPROCESSING,
    SKIP_TIME_ANALYSIS,
    SKIP_TOKEN_ANALYSIS,
)
from lang_main.io import create_saving_folder, get_entry_point, load_pickle
from lang_main.pipelines.base import PipelineContainer
from lang_main.pipelines.predefined import (
    build_base_target_feature_pipe,
    build_merge_duplicates_pipe,
    build_timeline_pipe,
    build_tk_graph_pipe,
    build_tk_graph_post_pipe,
    build_tk_graph_rescaling,
)
from lang_main.types import (
    EntryPoints,
    ObjectID,
    PandasIndex,
    SpacyDoc,
    TimelineCandidates,
)

# ** build pipelines
pipe_target_feat = build_base_target_feature_pipe()
pipe_merge = build_merge_duplicates_pipe()
pipe_token_analysis = build_tk_graph_pipe()
pipe_graph_postprocessing = build_tk_graph_post_pipe()
pipe_graph_rescaling = build_tk_graph_rescaling()
pipe_timeline = build_timeline_pipe()


# ** preprocessing pipeline
def run_preprocessing() -> None:
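    # create the output folder (without overwriting), then extract target
    # features from the raw dataset and merge duplicate entries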
    create_saving_folder(
        saving_path_folder=SAVE_PATH_FOLDER,
        overwrite_existing=False,
    )
    # run pipelines
    ret = cast(
        tuple[DataFrame], pipe_target_feat.run(starting_values=(PATH_TO_DATASET,))
    )
    target_feat_data = ret[0]
    _ = cast(tuple[DataFrame], pipe_merge.run(starting_values=(target_feat_data,)))


# ** token analysis
def run_token_analysis() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TOKEN_ANALYSIS)
    loaded_results = cast(tuple[DataFrame], load_pickle(entry_point_path))
    preprocessed_data = loaded_results[0]
    # build token graph
    (tk_graph, docs_mapping) = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        pipe_token_analysis.run(starting_values=(preprocessed_data,)),
    )
    tk_graph.to_GraphML(SAVE_PATH_FOLDER, filename='TokenGraph', directed=False)


def run_graph_postprocessing() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST)
    loaded_results = cast(
        tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None],
        load_pickle(entry_point_path),
    )
    tk_graph = loaded_results[0]
    # filter graph by edge weight and remove single nodes (no connection)
    ret = cast(
        tuple[TokenGraph], pipe_graph_postprocessing.run(starting_values=(tk_graph,))
    )
    tk_graph_filtered = ret[0]
    tk_graph_filtered.to_GraphML(
        SAVE_PATH_FOLDER, filename='TokenGraph-filtered', directed=False
    )


def run_graph_edge_rescaling() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_ANALYSIS)
    loaded_results = cast(
        tuple[TokenGraph],
        load_pickle(entry_point_path),
    )
    tk_graph = loaded_results[0]
    # rescale edge weights and export the undirected graph
    ret = cast(
        tuple[TokenGraph, Graph], pipe_graph_rescaling.run(starting_values=(tk_graph,))
    )
    undirected_rescaled_graph = ret[1]
    save_to_GraphML(
        undirected_rescaled_graph,
        saving_path=SAVE_PATH_FOLDER,
        filename='TokenGraph-undirected-rescaled',
    )


# ** time analysis
def run_time_analysis() -> None:
    # load entry point
    entry_point_path = get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE)
    loaded_results = cast(tuple[DataFrame], load_pickle(entry_point_path))
    preprocessed_data = loaded_results[0]
    _ = cast(
        tuple[TimelineCandidates, dict[ObjectID, str]],
        pipe_timeline.run(starting_values=(preprocessed_data,)),
    )


def build_pipeline_container() -> PipelineContainer:
    container = PipelineContainer(
        name='Pipeline-Container-Base', working_dir=SAVE_PATH_FOLDER
    )
    container.add(run_preprocessing, skip=SKIP_PREPROCESSING)
    container.add(run_token_analysis, skip=SKIP_TOKEN_ANALYSIS)
    container.add(run_graph_postprocessing, skip=SKIP_GRAPH_POSTPROCESSING)
    container.add(run_graph_edge_rescaling, skip=SKIP_GRAPH_RESCALING)
    container.add(run_time_analysis, skip=SKIP_TIME_ANALYSIS)
    return container


def main() -> None:
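    # assemble the pipeline container and execute all non-skipped steps in order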
    procedure = build_pipeline_container()
    procedure.run()


if __name__ == '__main__':
    main()