import time import webbrowser from pathlib import Path from threading import Thread from typing import Any, Final, cast # import dash_cytoscape as cyto import plotly.express as px from dash import ( Dash, Input, Output, State, callback, dash_table, dcc, html, ) from pandas import DataFrame from plotly.graph_objects import Figure import lang_main.io from lang_main.analysis import graphs, tokens from lang_main.constants import SAVE_PATH_FOLDER, SPCY_MODEL from lang_main.errors import EmptyEdgesError, EmptyGraphError from lang_main.pipelines.predefined import ( build_tk_graph_render_pipe, build_tk_graph_rescaling_pipe, ) from lang_main.types import EntryPoints, ObjectID, TimelineCandidates # ** data # p_df = Path(r'../results/test_20240619/TIMELINE.pkl').resolve() p_df = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE) (data,) = cast(tuple[DataFrame], lang_main.io.load_pickle(p_df)) # p_tl = Path(r'../results/test_20240619/TIMELINE_POSTPROCESSING.pkl').resolve() p_tl = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE_POST) cands, texts = cast( tuple[TimelineCandidates, dict[ObjectID, str]], lang_main.io.load_pickle(p_tl) ) # ** necessary pipelines rescaling_pipe = build_tk_graph_rescaling_pipe( exit_point=EntryPoints.TIMELINE_TK_GRAPH_RESCALED, save_result=False, ) BASE_NETWORK_NAME: Final[str] = 'test_timeline' # RENDER_FOLDER: Final[Path] = Path.cwd() / 'assets/' graph_render_pipe = build_tk_graph_render_pipe( with_subgraphs=False, base_network_name=BASE_NETWORK_NAME, ) # PTH_RENDERED_GRAPH = f'assets/{BASE_NETWORK_NAME}.svg' PTH_RENDERED_GRAPH = lang_main.io.get_entry_point( SAVE_PATH_FOLDER, BASE_NETWORK_NAME, file_ext='.svg', ) TABLE_FEATS: Final[list[str]] = [ 'ErstellungsDatum', 'ErledigungsDatum', 'VorgangsTypName', 'VorgangsBeschreibung', ] TABLE_FEATS_DATES: Final[list[str]] = [ 'ErstellungsDatum', 'ErledigungsDatum', ] # ** figure config MARKERS_OCCURRENCES: Final[dict[str, Any]] = { 'size': 12, 'color': 'yellow', 'line': { 'width': 2, 'color': 'red', }, } MARKERS_DELTA: Final[dict[str, Any]] = { 'size': 8, 'color': 'red', 'symbol': 'cross', } HOVER_DATA: Final[dict[str, Any]] = { 'ErstellungsDatum': '|%d.%m.%Y', 'ErledigungsDatum': '|%d.%m.%Y', 'VorgangsBeschreibung': True, } HOVER_DATA_DELTA: Final[dict[str, Any]] = { 'ErstellungsDatum': '|%d.%m.%Y', 'ErledigungsDatum': '|%d.%m.%Y', 'VorgangsDatum': '|%d.%m.%Y', 'delta': True, 'VorgangsBeschreibung': True, } # ** graph p_tk_graph = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST) ret = lang_main.io.load_pickle(p_tk_graph) tk_graph = cast(graphs.TokenGraph, ret[0]) tk_graph_filtered = graphs.filter_graph_by_edge_weight(tk_graph, 150, None) tk_graph_filtered = graphs.filter_graph_by_node_degree(tk_graph_filtered, 1, None) graph_layout = html.Div( [ dcc.Store(id='graph-store', storage_type='memory'), # dcc.Store(id='graph-store-cyto-curr_cands', storage_type='memory'), html.Div(id='output'), html.Div( [ html.H2('Token Graph', style={'margin': 0}), ], style={ 'display': 'flex', 'marginBottom': '1em', }, ), html.Div( [ html.H3('Graph'), html.Button( 'Download Bild', id='bt-reset', style={ 'marginLeft': 'auto', 'width': '300px', }, ), dcc.Download(id='static-graph-download'), dcc.Loading( id='loading-graph-render', children=html.Div( [ html.Img( id='static-graph-img', alt='static rendered graph', # style={ # 'width': 'auto', # 'height': 'auto', # }, ), html.P(id='info-graph-errors', children=[]), ], style={ 'border': '3px solid black', 'borderRadius': '25px', 'marginTop': '1em', 'marginBottom': '2em', 'padding': '7px', }, ), ), ], style={'marginTop': '1em'}, ), ], ) # ** app external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'] app = Dash(__name__, external_stylesheets=external_stylesheets) app.layout = html.Div( [ html.H1(children='Demo Zeitreihenanalyse', style={'textAlign': 'center'}), html.Div( children=[ html.H2('Wählen Sie ein Objekt aus (ObjektID):'), dcc.Dropdown( list(cands.keys()), id='selector-obj_id', placeholder='ObjektID auswählen...', ), ] ), html.Div( children=[ html.H3(id='object-text'), dcc.Dropdown(id='selector-candidates'), dcc.Graph(id='figure-occurrences'), dcc.Graph(id='figure-delta'), ] ), html.Div( [dash_table.DataTable(id='table-candidates')], style={'marginBottom': '2em'} ), graph_layout, ], style={'margin': '2em'}, ) # ** selectors of candidates @callback( Output('object-text', 'children'), Input('selector-obj_id', 'value'), prevent_initial_call=True, ) def update_obj_text(obj_id): obj_id = int(obj_id) obj_text = texts[obj_id] headline = f'HObjektText: {obj_text}' return headline @callback( [ Output('selector-candidates', 'options'), Output('selector-candidates', 'value'), ], Input('selector-obj_id', 'value'), prevent_initial_call=True, ) def update_choice_candidates(obj_id): obj_id = int(obj_id) choices = list(range(1, len(cands[obj_id]) + 1)) return choices, choices[0] # ** helpers to filter DataFrame def pre_filter_data( data: DataFrame, idx: int, obj_id: ObjectID, ) -> DataFrame: idx = int(idx) obj_id = int(obj_id) # data = data.copy() cands_for_obj_id = cands[obj_id] cands_choice = cands_for_obj_id[int(idx) - 1] # data data = data.loc[list(cands_choice)].sort_index() # type: ignore data['delta'] = data['ErledigungsDatum'] - data['ErstellungsDatum'] data['delta'] = data['delta'].dt.days return data # ** figure generation # TODO check possible storage of pre-filtered result # TODO change input of ``update_table_candidates`` and ``display_candidates_as_graph`` # TODO to storage component @callback( [ Output('figure-occurrences', 'figure'), Output('figure-delta', 'figure'), ], Input('selector-candidates', 'value'), State('selector-obj_id', 'value'), prevent_initial_call=True, ) def update_timeline(index, obj_id): obj_id = int(obj_id) obj_text = texts[obj_id] title_occurrences = f'HObjektText: {obj_text}' title_delta = f'HObjektText: {obj_text}, Differenz Erstellung und Erledigung' df = pre_filter_data(data, idx=index, obj_id=obj_id) # figure fig_occurrences = fig_timeline_occurrences(df, title_occurrences) fig_delta = fig_timeline_delta(df, title_delta) return fig_occurrences, fig_delta def fig_timeline_occurrences( df: DataFrame, title: str, ) -> Figure: fig = px.line( data_frame=df, x='ErstellungsDatum', y='ObjektID', title=title, hover_data=HOVER_DATA, ) fig.update_traces( mode='markers+lines', marker=MARKERS_OCCURRENCES, marker_symbol='diamond' ) fig.update_xaxes( tickformat='%B\n%Y', rangeslider_visible=True, ) fig.update_yaxes(type='category') fig.update_layout(hovermode='x unified') return fig def fig_timeline_delta( df: DataFrame, title: str, ) -> Figure: fig = px.scatter( data_frame=df, x='ErstellungsDatum', y='delta', title=title, hover_data=HOVER_DATA_DELTA, ) fig.update_traces(marker=MARKERS_DELTA) fig.update_xaxes(tickformat='%B\n%Y') fig.update_yaxes(dtick=1) fig.update_layout(hovermode='x unified') return fig # ** HTML table @callback( [Output('table-candidates', 'data'), Output('table-candidates', 'columns')], Input('selector-candidates', 'value'), State('selector-obj_id', 'value'), prevent_initial_call=True, ) def update_table_candidates(index, obj_id): df = pre_filter_data(data, idx=index, obj_id=obj_id) df = df.filter(items=TABLE_FEATS, axis=1).sort_values( by='ErstellungsDatum', ascending=True ) cols = [{'name': i, 'id': i} for i in df.columns] # convert dates to strings for col in TABLE_FEATS_DATES: df[col] = df[col].dt.strftime(r'%Y-%m-%d') table_data = df.to_dict('records') return table_data, cols # ** graph callbacks @app.callback( [ Output('graph-store', 'data'), Output('static-graph-img', 'src'), Output('info-graph-errors', 'children'), ], # Input('graph-build-btn', 'n_clicks'), Input('selector-candidates', 'value'), State('selector-obj_id', 'value'), prevent_initial_call=True, ) def display_candidates_as_graph(index, obj_id): error_msg = '' t1 = time.perf_counter() df = pre_filter_data(data, idx=index, obj_id=obj_id) t2 = time.perf_counter() print(f'Time for filtering: {t2 - t1} s') t1 = time.perf_counter() tk_graph_cands, _ = tokens.build_token_graph( data=df, model=SPCY_MODEL, target_feature='VorgangsBeschreibung', build_map=False, logging_graph=False, ) t2 = time.perf_counter() print(f'Time for graph building: {t2 - t1} s') # ** now start rendering pipeline in Cytoscape # rescale graph try: t1 = time.perf_counter() _, tk_graph_rescaled_undirected = cast( tuple[graphs.TokenGraph, graphs.Graph], rescaling_pipe.run(starting_values=(tk_graph_cands,)), ) # render graph in Cytoscape and export image _ = graph_render_pipe.run(starting_values=(tk_graph_rescaled_undirected,)) # load image as b64 encoded string b64_img = lang_main.io.encode_file_to_base64_str(PTH_RENDERED_GRAPH) static_img = f'data:image/svg+xml;base64,{b64_img}' graph_to_store = lang_main.io.encode_to_base64_str(tk_graph_cands) # place image in browser t2 = time.perf_counter() print(f'Time for graph rescaling and rendering: {t2 - t1} s') except (EmptyGraphError, EmptyEdgesError): graph_to_store = '' static_img = '' error_msg = 'Graph ist leer und konnte nicht generiert werden!' finally: return graph_to_store, static_img, error_msg @callback( Output('static-graph-download', 'data'), Input('bt-reset', 'n_clicks'), prevent_initial_call=True, ) def func(n_clicks): return dcc.send_file(path=PTH_RENDERED_GRAPH) def _start_webbrowser(): host = '127.0.0.1' port = '8050' adress = f'http://{host}:{port}/' time.sleep(2) webbrowser.open_new(adress) def main(): webbrowser_thread = Thread(target=_start_webbrowser, daemon=True) webbrowser_thread.start() app.run(debug=True) if __name__ == '__main__': main()