using dash-cytoscape
This commit is contained in:
51
src/lang_main/__init__ copy.py
Normal file
51
src/lang_main/__init__ copy.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import inspect
|
||||
import logging
|
||||
import shutil
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from time import gmtime
|
||||
from typing import Any, Final
|
||||
import warnings
|
||||
|
||||
from lang_main.io import load_toml_config
|
||||
|
||||
__all__ = [
|
||||
'CALLER_PATH',
|
||||
]
|
||||
|
||||
logging.Formatter.converter = gmtime
|
||||
LOG_FMT: Final[str] = '%(asctime)s | %(module)s:%(levelname)s | %(message)s'
|
||||
LOG_DATE_FMT: Final[str] = '%Y-%m-%d %H:%M:%S +0000'
|
||||
logging.basicConfig(
|
||||
stream=sys.stdout,
|
||||
format=LOG_FMT,
|
||||
datefmt=LOG_DATE_FMT,
|
||||
)
|
||||
|
||||
CONFIG_FILENAME: Final[str] = 'lang_main_config.toml'
|
||||
USE_INTERNAL_CONFIG: Final[bool] = True
|
||||
pkg_dir = Path(__file__).parent
|
||||
cfg_path_internal = pkg_dir / CONFIG_FILENAME
|
||||
caller_file = Path(inspect.stack()[-1].filename)
|
||||
CALLER_PATH: Final[Path] = caller_file.parent.resolve()
|
||||
|
||||
# load config data: internal/external
|
||||
if USE_INTERNAL_CONFIG:
|
||||
loaded_cfg = load_toml_config(path_to_toml=cfg_path_internal)
|
||||
else:
|
||||
cfg_path_external = CALLER_PATH / CONFIG_FILENAME
|
||||
if not caller_file.exists():
|
||||
warnings.warn('Caller file could not be correctly retrieved.')
|
||||
if not cfg_path_external.exists():
|
||||
shutil.copy(cfg_path_internal, cfg_path_external)
|
||||
sys.exit(
|
||||
(
|
||||
'No config file was found. A new one with default values was created '
|
||||
'in the execution path. Please fill in the necessary values and '
|
||||
'restart the programm.'
|
||||
)
|
||||
)
|
||||
# raise NotImplementedError("External config data not implemented yet.")
|
||||
loaded_cfg = load_toml_config(path_to_toml=cfg_path_external)
|
||||
|
||||
CONFIG: Final[dict[str, Any]] = loaded_cfg.copy()
|
||||
@@ -1,4 +1,3 @@
|
||||
import inspect
|
||||
import logging
|
||||
import shutil
|
||||
import sys
|
||||
@@ -8,10 +7,6 @@ from typing import Any, Final
|
||||
|
||||
from lang_main.io import load_toml_config
|
||||
|
||||
__all__ = [
|
||||
'CALLER_PATH',
|
||||
]
|
||||
|
||||
logging.Formatter.converter = gmtime
|
||||
LOG_FMT: Final[str] = '%(asctime)s | %(module)s:%(levelname)s | %(message)s'
|
||||
LOG_DATE_FMT: Final[str] = '%Y-%m-%d %H:%M:%S +0000'
|
||||
@@ -24,17 +19,15 @@ logging.basicConfig(
|
||||
CONFIG_FILENAME: Final[str] = 'lang_main_config.toml'
|
||||
USE_INTERNAL_CONFIG: Final[bool] = False
|
||||
pkg_dir = Path(__file__).parent
|
||||
cfg_path_internal = pkg_dir / CONFIG_FILENAME
|
||||
caller_file = Path(inspect.stack()[-1].filename)
|
||||
CALLER_PATH: Final[Path] = caller_file.parent.resolve()
|
||||
cfg_path_internal = (pkg_dir / CONFIG_FILENAME).resolve()
|
||||
# caller_file = Path(inspect.stack()[-1].filename)
|
||||
# CALLER_PATH: Final[Path] = caller_file.parent.resolve()
|
||||
|
||||
# load config data: internal/external
|
||||
if USE_INTERNAL_CONFIG:
|
||||
loaded_cfg = load_toml_config(path_to_toml=cfg_path_internal)
|
||||
else:
|
||||
cfg_path_external = CALLER_PATH / CONFIG_FILENAME
|
||||
if not caller_file.exists():
|
||||
raise FileNotFoundError('Caller file could not be correctly retrieved.')
|
||||
cfg_path_external = (Path.cwd() / CONFIG_FILENAME).resolve()
|
||||
if not cfg_path_external.exists():
|
||||
shutil.copy(cfg_path_internal, cfg_path_external)
|
||||
sys.exit(
|
||||
|
||||
@@ -3,7 +3,7 @@ import sys
|
||||
import typing
|
||||
from collections.abc import Hashable, Iterable
|
||||
from pathlib import Path
|
||||
from typing import Any, Final, Literal, Self, overload
|
||||
from typing import Any, Final, Literal, Self, cast, overload
|
||||
|
||||
import networkx as nx
|
||||
import numpy as np
|
||||
@@ -13,6 +13,12 @@ from pandas import DataFrame
|
||||
|
||||
from lang_main.io import load_pickle, save_pickle
|
||||
from lang_main.loggers import logger_graphs as logger
|
||||
from lang_main.types import (
|
||||
CytoscapeData,
|
||||
EdgeWeight,
|
||||
NodeTitle,
|
||||
WeightData,
|
||||
)
|
||||
|
||||
# TODO change logging behaviour, add logging to file
|
||||
LOGGING_DEFAULT: Final[bool] = False
|
||||
@@ -67,7 +73,7 @@ def update_graph(
|
||||
batch: Iterable[tuple[Hashable, Hashable]] | None = None,
|
||||
parent: Hashable | None = None,
|
||||
child: Hashable | None = None,
|
||||
weight_connection: int = 1,
|
||||
weight_connection: int | None = None,
|
||||
) -> None:
|
||||
# !! not necessary to check for existence of nodes
|
||||
# !! feature already implemented in NetworkX ``add_edge``
|
||||
@@ -78,6 +84,8 @@ def update_graph(
|
||||
if child not in graph:
|
||||
graph.add_node(child)
|
||||
"""
|
||||
if weight_connection is None:
|
||||
weight_connection = 1
|
||||
# check if edge not in Graph
|
||||
if batch is not None:
|
||||
graph.add_edges_from(batch, weight=weight_connection)
|
||||
@@ -116,6 +124,51 @@ def convert_graph_to_undirected(
|
||||
return graph_undir
|
||||
|
||||
|
||||
def convert_graph_to_cytoscape(
|
||||
graph: Graph | DiGraph,
|
||||
) -> tuple[list[CytoscapeData], WeightData]:
|
||||
cyto_data: list[CytoscapeData] = []
|
||||
# iterate over nodes
|
||||
nodes = cast(Iterable[NodeTitle], graph.nodes)
|
||||
for i, node in enumerate(nodes):
|
||||
node_data: CytoscapeData = {
|
||||
'data': {
|
||||
'id': node,
|
||||
'label': node,
|
||||
}
|
||||
}
|
||||
cyto_data.append(node_data)
|
||||
# iterate over edges
|
||||
weights: set[int] = set()
|
||||
|
||||
edges = cast(
|
||||
Iterable[
|
||||
tuple[
|
||||
NodeTitle,
|
||||
NodeTitle,
|
||||
EdgeWeight,
|
||||
]
|
||||
],
|
||||
graph.edges.data('weight', default=1), # type: ignore
|
||||
)
|
||||
for i, (source, target, weight) in enumerate(edges):
|
||||
weights.add(weight)
|
||||
edge_data: CytoscapeData = {
|
||||
'data': {
|
||||
'source': source,
|
||||
'target': target,
|
||||
'weight': weight,
|
||||
}
|
||||
}
|
||||
cyto_data.append(edge_data)
|
||||
|
||||
min_weight = min(weights)
|
||||
max_weight = max(weights)
|
||||
weight_metadata: WeightData = {'min': min_weight, 'max': max_weight}
|
||||
|
||||
return cyto_data, weight_metadata
|
||||
|
||||
|
||||
class TokenGraph(DiGraph):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -200,7 +253,9 @@ class TokenGraph(DiGraph):
|
||||
|
||||
@overload
|
||||
def to_undirected(
|
||||
self, inplace: bool = ..., logging: bool | None = ...
|
||||
self,
|
||||
inplace: bool = ...,
|
||||
logging: bool | None = ...,
|
||||
) -> Graph | None: ...
|
||||
|
||||
def to_undirected(
|
||||
|
||||
@@ -214,20 +214,23 @@ def analyse_feature(
|
||||
unique_feature_entries = feature_entries.unique()
|
||||
|
||||
# prepare result DataFrame
|
||||
cols = ['entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
|
||||
cols = ['batched_idxs', 'entry', 'len', 'num_occur', 'assoc_obj_ids', 'num_assoc_obj_ids']
|
||||
result_df = pd.DataFrame(columns=cols)
|
||||
|
||||
for entry in tqdm(unique_feature_entries, mininterval=1.0):
|
||||
len_entry = len(entry)
|
||||
filt = data[target_feature] == entry
|
||||
temp = data[filt]
|
||||
batched_idxs = temp.index.to_numpy()
|
||||
assoc_obj_ids = temp['ObjektID'].unique()
|
||||
assoc_obj_ids = np.sort(assoc_obj_ids, kind='stable')
|
||||
num_assoc_obj_ids = len(assoc_obj_ids)
|
||||
num_dupl = filt.sum()
|
||||
|
||||
conc_df = pd.DataFrame(
|
||||
data=[[entry, len_entry, num_dupl, assoc_obj_ids, num_assoc_obj_ids]],
|
||||
data=[
|
||||
[batched_idxs, entry, len_entry, num_dupl, assoc_obj_ids, num_assoc_obj_ids]
|
||||
],
|
||||
columns=cols,
|
||||
)
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ from networkx import Graph
|
||||
from pandas import Series
|
||||
from sentence_transformers import SentenceTransformer
|
||||
from torch import Tensor
|
||||
from tqdm.auto import tqdm
|
||||
|
||||
from lang_main.analysis.graphs import get_graph_metadata, update_graph
|
||||
from lang_main.types import PandasIndex
|
||||
@@ -40,9 +39,8 @@ def candidates_by_index(
|
||||
|
||||
Yields
|
||||
------
|
||||
Iterator[tuple[ObjectID, tuple[PandasIndex, PandasIndex]]]
|
||||
ObjectID and tuple of index pairs which meet the cosine
|
||||
similarity threshold
|
||||
Iterator[tuple[PandasIndex, PandasIndex]]
|
||||
tuple of index pairs which meet the cosine similarity threshold
|
||||
"""
|
||||
# embeddings
|
||||
batch = cast(list[str], data_model_input.to_list())
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import re
|
||||
from collections.abc import Iterator
|
||||
from itertools import combinations
|
||||
from typing import cast
|
||||
from typing import Literal, cast, overload
|
||||
|
||||
from dateutil.parser import parse
|
||||
from pandas import DataFrame
|
||||
from spacy.lang.de import German as GermanSpacyModel
|
||||
from spacy.language import Language as GermanSpacyModel
|
||||
from spacy.tokens.doc import Doc as SpacyDoc
|
||||
from spacy.tokens.token import Token as SpacyToken
|
||||
from tqdm.auto import tqdm
|
||||
@@ -15,6 +15,7 @@ from lang_main.analysis.graphs import (
|
||||
update_graph,
|
||||
)
|
||||
from lang_main.loggers import logger_token_analysis as logger
|
||||
from lang_main.types import PandasIndex
|
||||
|
||||
# ** POS
|
||||
# POS_OF_INTEREST: frozenset[str] = frozenset(['NOUN', 'PROPN', 'ADJ', 'VERB', 'AUX'])
|
||||
@@ -104,7 +105,7 @@ def obtain_relevant_descendants(
|
||||
def add_doc_info_to_graph(
|
||||
graph: TokenGraph,
|
||||
doc: SpacyDoc,
|
||||
weight: int,
|
||||
weight: int | None,
|
||||
) -> None:
|
||||
# iterate over sentences
|
||||
for sent in doc.sents:
|
||||
@@ -142,9 +143,121 @@ def add_doc_info_to_graph(
|
||||
)
|
||||
|
||||
|
||||
@overload
|
||||
def build_token_graph(
|
||||
data: DataFrame,
|
||||
model: GermanSpacyModel,
|
||||
*,
|
||||
target_feature: str = ...,
|
||||
weights_feature: str | None = ...,
|
||||
batch_idx_feature: str = ...,
|
||||
build_map: Literal[False],
|
||||
batch_size_model: int = ...,
|
||||
) -> tuple[TokenGraph, None]: ...
|
||||
|
||||
|
||||
@overload
|
||||
def build_token_graph(
|
||||
data: DataFrame,
|
||||
model: GermanSpacyModel,
|
||||
*,
|
||||
target_feature: str = ...,
|
||||
weights_feature: str | None = ...,
|
||||
batch_idx_feature: str = ...,
|
||||
build_map: Literal[True] = ...,
|
||||
batch_size_model: int = ...,
|
||||
) -> tuple[TokenGraph, dict[PandasIndex, SpacyDoc]]: ...
|
||||
|
||||
|
||||
def build_token_graph(
|
||||
data: DataFrame,
|
||||
model: GermanSpacyModel,
|
||||
*,
|
||||
target_feature: str = 'entry',
|
||||
weights_feature: str | None = None,
|
||||
batch_idx_feature: str = 'batched_idxs',
|
||||
build_map: bool = True,
|
||||
batch_size_model: int = 50,
|
||||
) -> tuple[TokenGraph, dict[PandasIndex, SpacyDoc] | None]:
|
||||
graph = TokenGraph()
|
||||
model_input = cast(tuple[str], tuple(data[target_feature].to_list()))
|
||||
if weights_feature is not None:
|
||||
weights = cast(tuple[int], tuple(data[weights_feature].to_list()))
|
||||
else:
|
||||
weights = None
|
||||
|
||||
docs_mapping: dict[PandasIndex, SpacyDoc] | None
|
||||
if build_map:
|
||||
indices = cast(tuple[list[PandasIndex]], tuple(data[batch_idx_feature].to_list()))
|
||||
docs_mapping = {}
|
||||
else:
|
||||
indices = None
|
||||
docs_mapping = None
|
||||
|
||||
index: int = 0
|
||||
|
||||
for doc in tqdm(
|
||||
model.pipe(model_input, batch_size=batch_size_model), total=len(model_input)
|
||||
):
|
||||
if weights is not None:
|
||||
weight = weights[index]
|
||||
else:
|
||||
weight = None
|
||||
add_doc_info_to_graph(
|
||||
graph=graph,
|
||||
doc=doc,
|
||||
weight=weight,
|
||||
)
|
||||
# build map if option chosen
|
||||
if indices is not None and docs_mapping is not None:
|
||||
corresponding_indices = indices[index]
|
||||
for idx in corresponding_indices:
|
||||
docs_mapping[idx] = doc
|
||||
|
||||
index += 1
|
||||
|
||||
# metadata
|
||||
graph.update_metadata()
|
||||
# convert to undirected
|
||||
graph.to_undirected()
|
||||
|
||||
return graph, docs_mapping
|
||||
|
||||
|
||||
def build_token_graph_simple(
|
||||
data: DataFrame,
|
||||
model: GermanSpacyModel,
|
||||
) -> tuple[TokenGraph, dict[PandasIndex, SpacyDoc]]:
|
||||
graph = TokenGraph()
|
||||
model_input = cast(tuple[str], tuple(data['entry'].to_list()))
|
||||
weights = cast(tuple[int], tuple(data['num_occur'].to_list()))
|
||||
indices = cast(tuple[list[PandasIndex]], tuple(data['batched_idxs'].to_list()))
|
||||
index: int = 0
|
||||
docs_mapping: dict[PandasIndex, SpacyDoc] = {}
|
||||
|
||||
for doc in tqdm(model.pipe(model_input, batch_size=50), total=len(model_input)):
|
||||
add_doc_info_to_graph(
|
||||
graph=graph,
|
||||
doc=doc,
|
||||
weight=weights[index],
|
||||
)
|
||||
corresponding_indices = indices[index]
|
||||
for idx in corresponding_indices:
|
||||
docs_mapping[idx] = doc
|
||||
|
||||
index += 1
|
||||
|
||||
# metadata
|
||||
graph.update_metadata()
|
||||
# convert to undirected
|
||||
graph.to_undirected()
|
||||
|
||||
return graph, docs_mapping
|
||||
|
||||
|
||||
def build_token_graph_old(
|
||||
data: DataFrame,
|
||||
model: GermanSpacyModel,
|
||||
) -> tuple[TokenGraph]:
|
||||
# empty NetworkX directed graph
|
||||
# graph = nx.DiGraph()
|
||||
|
||||
@@ -1,15 +1,28 @@
|
||||
from pathlib import Path
|
||||
from typing import Final
|
||||
|
||||
from lang_main import CALLER_PATH, CONFIG
|
||||
import spacy
|
||||
from sentence_transformers import SentenceTransformer
|
||||
from spacy.language import Language as GermanSpacyModel
|
||||
|
||||
from lang_main import CONFIG
|
||||
from lang_main.types import STFRDeviceTypes
|
||||
|
||||
# ** paths
|
||||
input_path_conf = Path(CONFIG['paths']['inputs'])
|
||||
INPUT_PATH_FOLDER: Final[Path] = (CALLER_PATH / input_path_conf).resolve()
|
||||
save_path_conf = Path(CONFIG['paths']['results'])
|
||||
SAVE_PATH_FOLDER: Final[Path] = (CALLER_PATH / save_path_conf).resolve()
|
||||
path_dataset_conf = Path(CONFIG['paths']['dataset'])
|
||||
PATH_TO_DATASET: Final[Path] = (CALLER_PATH / path_dataset_conf).resolve()
|
||||
input_path_conf = Path.cwd() / Path(CONFIG['paths']['inputs'])
|
||||
INPUT_PATH_FOLDER: Final[Path] = input_path_conf.resolve()
|
||||
# INPUT_PATH_FOLDER: Final[Path] = (CALLER_PATH / input_path_conf).resolve()
|
||||
# TODO reactivate later
|
||||
# if not INPUT_PATH_FOLDER.exists():
|
||||
# raise FileNotFoundError(f'Input path >>{INPUT_PATH_FOLDER}<< does not exist.')
|
||||
save_path_conf = Path.cwd() / Path(CONFIG['paths']['results'])
|
||||
SAVE_PATH_FOLDER: Final[Path] = save_path_conf.resolve()
|
||||
# SAVE_PATH_FOLDER: Final[Path] = (CALLER_PATH / save_path_conf).resolve()
|
||||
path_dataset_conf = Path.cwd() / Path(CONFIG['paths']['dataset'])
|
||||
PATH_TO_DATASET: Final[Path] = path_dataset_conf.resolve()
|
||||
# PATH_TO_DATASET: Final[Path] = (CALLER_PATH / path_dataset_conf).resolve()
|
||||
# if not PATH_TO_DATASET.exists():
|
||||
# raise FileNotFoundError(f'Dataset path >>{PATH_TO_DATASET}<< does not exist.')
|
||||
# ** control
|
||||
DO_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing']
|
||||
SKIP_PREPROCESSING: Final[bool] = CONFIG['control']['preprocessing_skip']
|
||||
@@ -19,8 +32,18 @@ DO_GRAPH_POSTPROCESSING: Final[bool] = CONFIG['control']['graph_postprocessing']
|
||||
SKIP_GRAPH_POSTPROCESSING: Final[bool] = CONFIG['control']['graph_postprocessing_skip']
|
||||
DO_TIME_ANALYSIS: Final[bool] = CONFIG['control']['time_analysis']
|
||||
SKIP_TIME_ANALYSIS: Final[bool] = CONFIG['control']['time_analysis_skip']
|
||||
# ** export
|
||||
|
||||
# ** models
|
||||
# ** sentence_transformers
|
||||
STFR_DEVICE: Final[STFRDeviceTypes] = STFRDeviceTypes.CPU
|
||||
STFR_MODEL: Final[SentenceTransformer] = SentenceTransformer(
|
||||
'sentence-transformers/all-mpnet-base-v2', device=STFR_DEVICE
|
||||
)
|
||||
|
||||
# ** spacy
|
||||
SPCY_MODEL: Final[GermanSpacyModel] = spacy.load('de_dep_news_trf')
|
||||
|
||||
# ** export
|
||||
# ** preprocessing
|
||||
FILENAME_COSSIM_FILTER_CANDIDATES: Final[str] = CONFIG['preprocess'][
|
||||
'filename_cossim_filter_candidates'
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
import spacy
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
from lang_main.analysis.preprocessing import (
|
||||
analyse_feature,
|
||||
clean_string_slim,
|
||||
@@ -24,6 +21,8 @@ from lang_main.constants import (
|
||||
FEATURE_NAME_OBJ_ID,
|
||||
MODEL_INPUT_FEATURES,
|
||||
SAVE_PATH_FOLDER,
|
||||
SPCY_MODEL,
|
||||
STFR_MODEL,
|
||||
THRESHOLD_NUM_ACTIVITIES,
|
||||
THRESHOLD_SIMILARITY,
|
||||
THRESHOLD_TIMELINE_SIMILARITY,
|
||||
@@ -49,6 +48,7 @@ pipe_target_feat.add(
|
||||
'target_feature': 'VorgangsBeschreibung',
|
||||
'cleansing_func': clean_string_slim,
|
||||
},
|
||||
save_result=True,
|
||||
)
|
||||
pipe_target_feat.add(
|
||||
analyse_feature,
|
||||
@@ -64,8 +64,7 @@ pipe_target_feat.add(
|
||||
# ?? still needed?
|
||||
# using similarity between entries to catch duplicates with typo or similar content
|
||||
# pipe_embds = BasePipeline(name='Embedding1', working_dir=SAVE_PATH_FOLDER)
|
||||
model_spacy = spacy.load('de_dep_news_trf')
|
||||
model_stfr = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
|
||||
|
||||
|
||||
# pipe_embds.add(build_cosSim_matrix, {'model': model_stfr}, save_result=True)
|
||||
# pipe_embds.add(
|
||||
@@ -88,7 +87,7 @@ pipe_merge = BasePipeline(name='Merge_Duplicates', working_dir=SAVE_PATH_FOLDER)
|
||||
pipe_merge.add(
|
||||
merge_similarity_dupl,
|
||||
{
|
||||
'model': model_stfr,
|
||||
'model': STFR_MODEL,
|
||||
'cos_sim_threshold': THRESHOLD_SIMILARITY,
|
||||
},
|
||||
save_result=True,
|
||||
@@ -99,7 +98,12 @@ pipe_token_analysis = BasePipeline(name='Token_Analysis', working_dir=SAVE_PATH_
|
||||
pipe_token_analysis.add(
|
||||
build_token_graph,
|
||||
{
|
||||
'model': model_spacy,
|
||||
'model': SPCY_MODEL,
|
||||
'target_feature': 'entry',
|
||||
'weights_feature': 'num_occur',
|
||||
'batch_idx_feature': 'batched_idxs',
|
||||
'build_map': True,
|
||||
'batch_size_model': 50,
|
||||
},
|
||||
save_result=True,
|
||||
)
|
||||
@@ -135,7 +139,7 @@ pipe_timeline.add(
|
||||
pipe_timeline.add(
|
||||
get_timeline_candidates,
|
||||
{
|
||||
'model': model_stfr,
|
||||
'model': STFR_MODEL,
|
||||
'cos_sim_threshold': THRESHOLD_TIMELINE_SIMILARITY,
|
||||
'feature_obj_id': FEATURE_NAME_OBJ_ID,
|
||||
'model_input_feature': 'nlp_model_input',
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import enum
|
||||
from typing import TypeAlias
|
||||
from typing import Required, TypeAlias, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from spacy.tokens.doc import Doc as SpacyDoc
|
||||
from torch import Tensor
|
||||
|
||||
|
||||
# ** logging
|
||||
class LoggingLevels(enum.IntEnum):
|
||||
DEBUG = 10
|
||||
INFO = 20
|
||||
@@ -14,8 +15,50 @@ class LoggingLevels(enum.IntEnum):
|
||||
CRITICAL = 50
|
||||
|
||||
|
||||
# ** devices
|
||||
class STFRDeviceTypes(enum.StrEnum):
|
||||
CPU = 'cpu'
|
||||
GPU = 'cuda'
|
||||
|
||||
|
||||
# ** datatsets
|
||||
PandasIndex: TypeAlias = int | np.int64
|
||||
ObjectID: TypeAlias = int
|
||||
Embedding: TypeAlias = SpacyDoc | Tensor
|
||||
|
||||
# ** graphs
|
||||
NodeTitle: TypeAlias = str
|
||||
EdgeWeight: TypeAlias = int
|
||||
|
||||
|
||||
class NodeData(TypedDict):
|
||||
id: NodeTitle
|
||||
label: NodeTitle
|
||||
|
||||
|
||||
class EdgeData(TypedDict):
|
||||
source: NodeTitle
|
||||
target: NodeTitle
|
||||
weight: EdgeWeight
|
||||
|
||||
|
||||
class WeightData(TypedDict):
|
||||
min: EdgeWeight
|
||||
max: EdgeWeight
|
||||
|
||||
|
||||
class CytoscapePosition(TypedDict):
|
||||
x: int
|
||||
y: int
|
||||
|
||||
|
||||
class CytoscapeData(TypedDict, total=False):
|
||||
data: Required[EdgeData | NodeData]
|
||||
position: CytoscapePosition
|
||||
grabbable: bool
|
||||
locked: bool
|
||||
classes: str
|
||||
|
||||
|
||||
# ** timeline
|
||||
TimelineCandidates: TypeAlias = dict[ObjectID, tuple[tuple[PandasIndex, ...], ...]]
|
||||
|
||||
Reference in New Issue
Block a user