lang-main/scripts/dash_timeline_static.py
2024-11-07 17:30:33 +01:00

645 lines
19 KiB
Python

import time
import webbrowser
from collections.abc import Collection, Iterable
from pathlib import Path
from threading import Thread
from typing import Any, Final, cast
# import dash_cytoscape as cyto
import plotly.express as px
import plotly.io
from dash import (
Dash,
Input,
Output,
State,
callback,
dash_table,
dcc,
html,
)
from pandas import DataFrame
from plotly.graph_objects import Figure
import lang_main.io
from lang_main import model_loader as m_load
from lang_main.analysis import graphs, tokens
from lang_main.analysis.timeline import (
calc_delta_to_next_failure,
filter_timeline_cands,
)
from lang_main.constants import (
MODEL_LOADER_MAP,
NAME_DELTA_FEAT_TO_NEXT_FAILURE,
NAME_DELTA_FEAT_TO_REPAIR,
SAVE_PATH_FOLDER,
)
from lang_main.errors import EmptyEdgesError, EmptyGraphError
from lang_main.pipelines.predefined import (
build_tk_graph_render_pipe,
build_tk_graph_rescaling_pipe,
)
from lang_main.types import (
DataFrameTLFiltered,
EntryPoints,
HTMLColumns,
HTMLTable,
LanguageModels,
ObjectID,
TimelineCandidates,
)
# ** model
# Language model instance created once at import time; the graph callback in
# this file hands it to ``tokens.build_token_graph``.
SPACY_MODEL = m_load.instantiate_model(
    model_load_map=MODEL_LOADER_MAP,
    model=LanguageModels.SPACY,
)
# ** data
# Timeline data: the pickle is unpacked as a 1-tuple holding the DataFrame.
# NOTE(review): presumably ``load_pickle`` unpickles arbitrary objects, so the
# files must stem from a trusted pipeline run -- confirm.
# p_df = Path(r'../results/test_20240619/TIMELINE.pkl').resolve()
p_df = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE_POST)
(data,) = cast(tuple[DataFrame], lang_main.io.load_pickle(p_df))
# data = cleanup_descriptions(data, properties=['ErledigungsBeschreibung'])
# Timeline candidates (indexed by object ID, one sequence of candidate
# entries per object) and the descriptive text of every object.
# p_tl = Path(r'../results/test_20240619/TIMELINE_POSTPROCESSING.pkl').resolve()
p_tl = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TIMELINE_CANDS)
cands, texts = cast(
    tuple[TimelineCandidates, dict[ObjectID, str]], lang_main.io.load_pickle(p_tl)
)
# ** necessary pipelines
# Pipeline used by the graph callback to rescale a token graph before it is
# rendered; ``save_result=False`` is passed, so presumably nothing is
# persisted by this step -- confirm against the pipeline builder.
rescaling_pipe = build_tk_graph_rescaling_pipe(
    exit_point=EntryPoints.TIMELINE_TK_GRAPH_RESCALED,
    save_result=False,
)
# Base file name shared by the render pipeline and ``PTH_RENDERED_GRAPH``.
BASE_NETWORK_NAME: Final[str] = 'timeline_candidates'
# RENDER_FOLDER: Final[Path] = Path.cwd() / 'assets/'
# Pipeline that renders the rescaled graph; the callback afterwards reads the
# image from ``PTH_RENDERED_GRAPH``.
graph_render_pipe = build_tk_graph_render_pipe(
    with_subgraphs=False,
    base_network_name=BASE_NETWORK_NAME,
)
# PTH_RENDERED_GRAPH = f'assets/{BASE_NETWORK_NAME}.svg'
# Export targets below are resolved with ``check_existence=False`` because the
# files are only produced later on by the callbacks of this dashboard.

# SVG export of the occurrences chart (written by ``download_timeline``)
PTH_RENDERED_TIMELINE = lang_main.io.get_entry_point(
    SAVE_PATH_FOLDER,
    'chart_timeline',
    file_ext='.svg',
    check_existence=False,
)
# Excel export of the overview table (written while the tables are updated)
PTH_TABLE_TIMELINE = lang_main.io.get_entry_point(
    SAVE_PATH_FOLDER,
    'table_timeline',
    file_ext='.xlsx',
    check_existence=False,
)
# SVG export of the delta chart (written by ``download_delta_repair``)
PTH_RENDERED_DELTA_REPAIR = lang_main.io.get_entry_point(
    SAVE_PATH_FOLDER,
    'chart_delta_repair',
    file_ext='.svg',
    check_existence=False,
)
# Excel export of the best-actions table (written while the tables are updated)
PTH_TABLE_DELTA_REPAIR = lang_main.io.get_entry_point(
    SAVE_PATH_FOLDER,
    'table_delta_repair',
    file_ext='.xlsx',
    check_existence=False,
)
# Rendered graph image; presumably created by ``graph_render_pipe`` -- the
# graph callback reads it right after running that pipeline.
PTH_RENDERED_GRAPH = lang_main.io.get_entry_point(
    SAVE_PATH_FOLDER,
    BASE_NETWORK_NAME,
    file_ext='.svg',
    check_existence=False,
)
# NAME_DELTA_FEAT_TO_NEXT_FAILURE: Final[str] = 'Zeitspanne bis zum nächsten Ereignis [Tage]'
# Columns shown in the overview table of similar entries
TABLE_FEATS_OVERVIEW: Final[list[str]] = [
    'ErstellungsDatum',
    'ErledigungsDatum',
    NAME_DELTA_FEAT_TO_REPAIR,
    'VorgangsTypName',
    'VorgangsBeschreibung',
    'ErledigungsBeschreibung',
]
# Date columns that are converted to ``YYYY-MM-DD`` strings before display
TABLE_FEATS_DATES: Final[list[str]] = [
    'ErstellungsDatum',
    'ErledigungsDatum',
]
# Columns shown in the best-actions table (includes the delta to the next
# failure, which is added by ``calc_delta_to_next_failure`` under that name)
TABLE_FEATS_BEST_ACTIONS: Final[list[str]] = [
    'ErstellungsDatum',
    'ErledigungsDatum',
    'VorgangsTypName',
    'VorgangsBeschreibung',
    'ErledigungsBeschreibung',
    NAME_DELTA_FEAT_TO_NEXT_FAILURE,
]
# ** figure config
# Marker style applied to the traces of the occurrences chart
MARKERS_OCCURRENCES: Final[dict[str, Any]] = {
    'size': 12,
    'color': 'yellow',
    'line': {
        'width': 2,
        'color': 'red',
    },
}
# Marker style applied to the traces of the delta chart
MARKERS_DELTA: Final[dict[str, Any]] = {
    'size': 8,
    'color': 'red',
    'symbol': 'cross',
}
# Hover configuration of the occurrences chart, passed to Plotly Express as
# ``hover_data``: ``True`` shows the column, a string is used as its format.
HOVER_DATA: Final[dict[str, Any]] = {
    'ErstellungsDatum': '|%d.%m.%Y',
    'ErledigungsDatum': '|%d.%m.%Y',
    'VorgangsBeschreibung': True,
    'ErledigungsBeschreibung': True,
}
# Hover configuration of the delta chart (same conventions as above)
HOVER_DATA_DELTA: Final[dict[str, Any]] = {
    'ErstellungsDatum': '|%d.%m.%Y',
    'ErledigungsDatum': '|%d.%m.%Y',
    'VorgangsDatum': '|%d.%m.%Y',
    NAME_DELTA_FEAT_TO_REPAIR: True,
    'VorgangsBeschreibung': True,
    'ErledigungsBeschreibung': True,
}
# ** graph
# Load the post-processed token graph (first element of the pickled result)
p_tk_graph = lang_main.io.get_entry_point(SAVE_PATH_FOLDER, EntryPoints.TK_GRAPH_POST)
ret = lang_main.io.load_pickle(p_tk_graph)
tk_graph = cast(graphs.TokenGraph, ret[0])
# Two-stage filtering: first by edge weight (bound 150), then by node degree
# (bound 1); the ``None`` arguments presumably mean "no upper bound" -- confirm.
# NOTE(review): ``tk_graph_filtered`` is not referenced anywhere else in the
# visible part of this file.
tk_graph_filtered = graphs.filter_graph_by_edge_weight(tk_graph, 150, None)
tk_graph_filtered = graphs.filter_graph_by_node_degree(tk_graph_filtered, 1, None)
# Layout section for the token graph; it is appended to ``app.layout``.
graph_layout = html.Div(
    [
        # receives the base64-encoded token graph of the current selection
        dcc.Store(id='graph-store', storage_type='memory'),
        # dcc.Store(id='graph-store-cyto-curr_cands', storage_type='memory'),
        html.Div(id='output'),
        html.Div(
            [
                html.H2('Token Graph', style={'margin': 0}),
            ],
            style={
                'display': 'flex',
                'marginBottom': '1em',
            },
        ),
        html.Div(
            [
                html.H3('Graph'),
                # NOTE: despite its id 'bt-reset' this button triggers the
                # download of the rendered graph image
                html.Button(
                    'Download Bild',
                    id='bt-reset',
                    style={
                        'marginLeft': 'auto',
                        'width': '300px',
                    },
                ),
                dcc.Download(id='static-graph-download'),
                # loading indicator wrapped around the image and error text,
                # both of which are outputs of the graph callback
                dcc.Loading(
                    id='loading-graph-render',
                    children=html.Div(
                        [
                            # ``src`` is set to a base64 data URI by the callback
                            html.Img(
                                id='static-graph-img',
                                alt='static rendered graph',
                                style={
                                    'width': '900px',
                                    'height': 'auto',
                                },
                            ),
                            # shows the message for empty graphs
                            html.P(id='info-graph-errors', children=[]),
                        ],
                        style={
                            'border': '3px solid black',
                            'borderRadius': '25px',
                            'marginTop': '1em',
                            'marginBottom': '2em',
                            'padding': '7px',
                        },
                    ),
                ),
            ],
            style={'marginTop': '1em'},
        ),
    ],
)
# ** app
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = Dash(__name__, external_stylesheets=external_stylesheets)
# Page structure: object selection, candidate selection with both charts,
# the two tables and finally the token graph section.
app.layout = html.Div(
    [
        html.H1(children='Demo Zeitreihenanalyse', style={'textAlign': 'center'}),
        # --- object selection: one option per object ID with candidates
        html.Div(
            children=[
                html.H2('Wählen Sie ein Objekt aus (ObjektID):'),
                dcc.Dropdown(
                    list(cands.keys()),
                    id='selector-obj_id',
                    placeholder='ObjektID auswählen...',
                ),
            ]
        ),
        # --- candidate selection and charts
        html.Div(
            children=[
                # headline, filled by ``update_obj_text``
                html.H3(id='object-text'),
                # options and value are filled by ``update_choice_candidates``
                dcc.Dropdown(id='selector-candidates'),
                html.Button(
                    'Download Diagramm',
                    id='bt-dl-timeline',
                    style={
                        'marginLeft': 'auto',
                        'width': '300px',
                        'marginTop': '1em',
                    },
                ),
                dcc.Download(id='dl-timeline'),
                dcc.Graph(id='figure-occurrences'),
                html.Button(
                    'Download Diagramm',
                    id='bt-dl-deltarepair',
                    style={
                        'marginLeft': 'auto',
                        'width': '300px',
                        'marginTop': '1em',
                    },
                ),
                dcc.Download(id='dl-deltarepair'),
                dcc.Graph(id='figure-delta'),
            ]
        ),
        # --- tables
        html.Div(
            children=[
                # overview of similar entries
                html.Div(
                    [
                        html.H5('Überblick ähnlicher Vorgänge'),
                        dcc.Download(id='dl-table-timeline'),
                        html.Button(
                            'Download Table',
                            id='bt-table-timeline',
                            style={
                                'marginLeft': 'auto',
                                'width': '300px',
                                'marginTop': '1em',
                            },
                        ),
                        dash_table.DataTable(id='table-candidates'),
                    ],
                    style={'paddingBottom': '1em'},
                ),
                # best actions
                html.Div(
                    [
                        html.H5(
                            (
                                'Maßnahmen sortiert nach längstem Zeitraum '
                                'bis zum nächsten Ereignis'
                            )
                        ),
                        dcc.Download(id='dl-table-deltarepair'),
                        html.Button(
                            'Download Table',
                            id='bt-table-deltarepair',
                            style={
                                'marginLeft': 'auto',
                                'width': '300px',
                                'marginTop': '1em',
                            },
                        ),
                        dash_table.DataTable(id='table-best-actions'),
                    ]
                ),
            ],
            style={'marginBottom': '2em', 'padding': '2em'},
        ),
        # --- token graph section defined above
        graph_layout,
    ],
    style={'margin': '2em'},
)
# ** selectors of candidates
@callback(
    Output('object-text', 'children'),
    Input('selector-obj_id', 'value'),
    prevent_initial_call=True,
)
def update_obj_text(obj_id):
    """Build the headline for the currently selected object.

    Args:
        obj_id: Value of the object dropdown. It is ``None`` when the user
            clears the selection.

    Returns:
        The headline containing the object's text, or an empty string when
        no object is selected.
    """
    # A cleared dropdown reports ``None``; ``int(None)`` would raise and leave
    # the old headline on screen.
    if obj_id is None:
        return ''
    obj_text = texts[int(obj_id)]
    return f'HObjektText: {obj_text}'
@callback(
    [
        Output('selector-candidates', 'options'),
        Output('selector-candidates', 'value'),
    ],
    Input('selector-obj_id', 'value'),
    prevent_initial_call=True,
)
def update_choice_candidates(obj_id):
    """Offer the candidate numbers of the selected object and preselect the first.

    Args:
        obj_id: Value of the object dropdown. It is ``None`` when the user
            clears the selection.

    Returns:
        A pair ``(options, value)``: the 1-based candidate numbers and the
        preselected one. ``([], None)`` when no object is selected or the
        object has no candidates.
    """
    # A cleared dropdown reports ``None``; reset the dependent dropdown
    # instead of failing on ``int(None)``.
    if obj_id is None:
        return [], None
    # numbering shown in the dashboard starts with 1
    choices = list(range(1, len(cands[int(obj_id)]) + 1))
    if not choices:
        # nothing to preselect; ``choices[0]`` would raise IndexError
        return [], None
    return choices, choices[0]
# ** helpers to filter DataFrame
def filter_candidates(
    data: DataFrame,
    idx: int,
    obj_id: ObjectID,
) -> DataFrameTLFiltered:
    """Reduce the timeline data to one candidate entry of one object.

    Args:
        data: Timeline data to filter.
        idx: 1-based candidate number as shown in the dashboard.
        obj_id: ID of the object whose candidates are looked up.

    Returns:
        The result of ``filter_timeline_cands`` for the chosen candidate.
    """
    # values coming from Dash components are coerced to ``int`` explicitly;
    # the dashboard counts from 1, the candidate sequence from 0
    zero_based_idx = int(idx) - 1
    object_id = int(obj_id)
    return filter_timeline_cands(
        data=data,
        cands=cands,
        obj_id=object_id,
        entry_idx=zero_based_idx,
    )
# ** figure generation
# TODO check possible storage of pre-filtered result
# TODO change input of ``update_table_candidates`` and ``display_candidates_as_graph``
# TODO to storage component
@callback(
    [
        Output('figure-occurrences', 'figure'),
        Output('figure-delta', 'figure'),
    ],
    Input('selector-candidates', 'value'),
    State('selector-obj_id', 'value'),
    prevent_initial_call=True,
)
def update_timeline(index, obj_id):
    """Redraw both charts for the selected candidate of the selected object.

    Args:
        index: 1-based candidate number from the candidate dropdown, ``None``
            when that dropdown is cleared.
        obj_id: Value of the object dropdown, ``None`` when nothing is selected.

    Returns:
        A pair ``(occurrences figure, delta figure)``. Two empty figures are
        returned while the selection is incomplete.
    """
    # Dropdowns report ``None`` once cleared; show empty charts instead of
    # failing in ``int(None)`` and leaving stale charts on screen.
    if index is None or obj_id is None:
        return Figure(), Figure()

    obj_id = int(obj_id)
    obj_text = texts[obj_id]
    title_occurrences = f'HObjektText: {obj_text}'
    title_delta = f'HObjektText: {obj_text}, Differenz Erstellung und Erledigung'
    df = filter_candidates(data, idx=index, obj_id=obj_id)
    # figure
    fig_occurrences = fig_timeline_occurrences(df, title_occurrences)
    fig_delta = fig_timeline_delta(df, title_delta, delta_feature=NAME_DELTA_FEAT_TO_REPAIR)
    return fig_occurrences, fig_delta
def fig_timeline_occurrences(
    df: DataFrame,
    title: str,
) -> Figure:
    """Create the line chart of entry creation dates per object.

    Args:
        df: Filtered timeline data; ``ErstellungsDatum`` is plotted on the
            x-axis and ``ObjektID`` on the y-axis.
        title: Chart title.

    Returns:
        The configured Plotly figure.
    """
    trace_options = {
        'mode': 'markers+lines',
        'marker': MARKERS_OCCURRENCES,
        'marker_symbol': 'diamond',
    }
    # the ``update_*`` methods return the figure itself, which allows chaining
    return (
        px.line(
            data_frame=df,
            x='ErstellungsDatum',
            y='ObjektID',
            title=title,
            hover_data=HOVER_DATA,
        )
        .update_traces(**trace_options)
        .update_xaxes(tickformat='%B\n%Y', rangeslider_visible=True)
        # object IDs are labels, not quantities
        .update_yaxes(type='category')
        .update_layout(hovermode='x unified')
    )
def fig_timeline_delta(
    df: DataFrame,
    title: str,
    delta_feature: str,
) -> Figure:
    """Create the scatter chart of a delta feature over the creation date.

    Args:
        df: Filtered timeline data; ``ErstellungsDatum`` is plotted on the
            x-axis.
        title: Chart title.
        delta_feature: Name of the column plotted on the y-axis.

    Returns:
        The configured Plotly figure.
    """
    scatter_options = {
        'data_frame': df,
        'x': 'ErstellungsDatum',
        'y': delta_feature,
        'title': title,
        'hover_data': HOVER_DATA_DELTA,
    }
    chart = px.scatter(**scatter_options)
    chart.update_traces(marker=MARKERS_DELTA)
    chart.update_xaxes(tickformat='%B\n%Y')
    # one tick per unit of the delta feature
    chart.update_yaxes(dtick=1)
    chart.update_layout(hovermode='x unified')
    return chart
def transform_to_HTML_table(
data: DataFrame,
target_features: Collection[str],
date_cols: Iterable[str] | None = None,
sorting_feature: str | None = None,
sorting_ascending: bool = True,
save_path: Path | None = None,
) -> tuple[HTMLColumns, HTMLTable]:
target_features = list(target_features)
data = data.copy()
data = data.filter(items=target_features, axis=1)
if sorting_feature is not None:
data = data.sort_values(by='ErstellungsDatum', ascending=sorting_ascending)
if date_cols is not None:
for col in date_cols:
data[col] = data[col].dt.strftime(r'%Y-%m-%d')
columns = [{'name': col, 'id': col} for col in data.columns]
table_data = data.to_dict('records')
if save_path is not None:
data.to_excel(save_path)
return columns, table_data
# 'table-best-actions'
# ** HTML table
@callback(
    [
        Output('table-candidates', 'columns'),
        Output('table-candidates', 'data'),
        Output('table-best-actions', 'columns'),
        Output('table-best-actions', 'data'),
    ],
    Input('selector-candidates', 'value'),
    State('selector-obj_id', 'value'),
    prevent_initial_call=True,
)
def update_tables_candidates(
    index,
    obj_id,
) -> tuple[HTMLColumns, HTMLTable, HTMLColumns, HTMLTable]:
    """Fill the overview table and the best-actions table for the selection.

    Both tables are also written to their Excel export files so that the
    download buttons can serve them.

    Args:
        index: 1-based candidate number from the candidate dropdown, ``None``
            when that dropdown is cleared.
        obj_id: Value of the object dropdown, ``None`` when nothing is selected.

    Returns:
        ``(overview columns, overview records, best-actions columns,
        best-actions records)``. Four empty lists are returned while the
        selection is incomplete.
    """
    # Dropdowns report ``None`` once cleared; empty the tables instead of
    # failing in ``int(None)``.
    if index is None or obj_id is None:
        return [], [], [], []

    # named differently from the module-level ``cands`` to avoid shadowing it
    selected_entries = filter_candidates(data, idx=index, obj_id=obj_id)
    overview_cols, overview_table = transform_to_HTML_table(
        data=selected_entries,
        target_features=TABLE_FEATS_OVERVIEW,
        date_cols=TABLE_FEATS_DATES,
        sorting_feature='ErstellungsDatum',
        sorting_ascending=True,
        save_path=PTH_TABLE_TIMELINE,
    )

    best_actions = calc_delta_to_next_failure(
        data=selected_entries,
        date_feature='ErstellungsDatum',
        name_delta_feature=NAME_DELTA_FEAT_TO_NEXT_FAILURE,
    )
    # NOTE(review): no sorting is requested here; the row order is whatever
    # ``calc_delta_to_next_failure`` returns -- confirm it matches the
    # "sorted by longest period" headline of this table.
    best_actions_cols, best_actions_table = transform_to_HTML_table(
        data=best_actions,
        target_features=TABLE_FEATS_BEST_ACTIONS,
        date_cols=TABLE_FEATS_DATES,
        save_path=PTH_TABLE_DELTA_REPAIR,
    )
    return overview_cols, overview_table, best_actions_cols, best_actions_table
# ** graph callbacks
@app.callback(
    [
        Output('graph-store', 'data'),
        Output('static-graph-img', 'src'),
        Output('info-graph-errors', 'children'),
    ],
    Input('selector-candidates', 'value'),
    State('selector-obj_id', 'value'),
    prevent_initial_call=True,
)
def display_candidates_as_graph(index, obj_id):
    """Build, render and display the token graph of the selected candidate.

    Args:
        index: 1-based candidate number from the candidate dropdown, ``None``
            when that dropdown is cleared.
        obj_id: Value of the object dropdown, ``None`` when nothing is selected.

    Returns:
        ``(encoded graph, image data URI, error message)``. Graph and image
        are empty strings when the graph is empty or nothing is selected; the
        error message is only set for an empty graph.
    """
    # Dropdowns report ``None`` once cleared; clear the graph area instead of
    # failing in ``int(None)``.
    if index is None or obj_id is None:
        return '', '', ''

    t1 = time.perf_counter()
    df = filter_candidates(data, idx=index, obj_id=obj_id)
    t2 = time.perf_counter()
    print(f'Time for filtering: {t2 - t1} s')

    t1 = time.perf_counter()
    tk_graph_cands, _ = tokens.build_token_graph(
        data=df,
        model=SPACY_MODEL,
        target_feature='VorgangsBeschreibung',
        build_map=False,
        logging_graph=False,
    )
    t2 = time.perf_counter()
    print(f'Time for graph building: {t2 - t1} s')

    # ** now start rendering pipeline in Cytoscape
    try:
        t1 = time.perf_counter()
        # rescale graph
        _, tk_graph_rescaled_undirected = cast(
            tuple[graphs.TokenGraph, graphs.Graph],
            rescaling_pipe.run(starting_values=(tk_graph_cands,)),
        )
        # render graph in Cytoscape and export image
        _ = graph_render_pipe.run(starting_values=(tk_graph_rescaled_undirected,))
        # load image as b64 encoded string
        b64_img = lang_main.io.encode_file_to_base64_str(PTH_RENDERED_GRAPH)
        static_img = f'data:image/svg+xml;base64,{b64_img}'
        graph_to_store = lang_main.io.encode_to_base64_str(tk_graph_cands)
        t2 = time.perf_counter()
        print(f'Time for graph rescaling and rendering: {t2 - t1} s')
    except (EmptyGraphError, EmptyEdgesError):
        return '', '', 'Graph ist leer und konnte nicht generiert werden!'

    # The return deliberately lives outside of a ``finally`` clause: returning
    # from ``finally`` would discard any unexpected exception (or replace it
    # with an UnboundLocalError) and hide the real cause of a failure.
    return graph_to_store, static_img, ''
@callback(
    Output('static-graph-download', 'data'),
    Input('bt-reset', 'n_clicks'),
    prevent_initial_call=True,
)
def download_graph(_):
    """Offer the rendered graph image as a browser download.

    The click count is only used as trigger and otherwise ignored.
    """
    graph_file = PTH_RENDERED_GRAPH
    return dcc.send_file(path=graph_file)
@callback(
    Output('dl-timeline', 'data'),
    Input('bt-dl-timeline', 'n_clicks'),
    State('figure-occurrences', 'figure'),
    prevent_initial_call=True,
)
def download_timeline(_, fig: dict):
    """Export the occurrences chart as an image and offer it as a download.

    Args:
        _: Click count of the download button; only used as trigger.
        fig: Current figure state of the chart as delivered by the browser.

    Returns:
        The download payload for the written image file.
    """
    # Drop the range slider's y-axis settings before rebuilding the figure;
    # presumably the browser-side state carries keys there that ``Figure``
    # rejects -- confirm. The lookups are tolerant because the figure state
    # has no 'xaxis' entry before a chart was drawn and a range slider does
    # not necessarily carry a 'yaxis' entry.
    layout = (fig or {}).get('layout') or {}
    rangeslider = (layout.get('xaxis') or {}).get('rangeslider')
    if isinstance(rangeslider, dict):
        rangeslider.pop('yaxis', None)

    figure = Figure(fig)
    figure.write_image(PTH_RENDERED_TIMELINE)
    return dcc.send_file(path=PTH_RENDERED_TIMELINE)
@callback(
    Output('dl-deltarepair', 'data'),
    Input('bt-dl-deltarepair', 'n_clicks'),
    State('figure-delta', 'figure'),
    prevent_initial_call=True,
)
def download_delta_repair(_, fig: dict):
    """Export the delta chart as an image and offer it as a download.

    Args:
        _: Click count of the download button; only used as trigger.
        fig: Current figure state of the chart as delivered by the browser.

    Returns:
        The download payload for the written image file.
    """
    # Drop the range slider's y-axis settings before rebuilding the figure;
    # presumably the browser-side state carries keys there that ``Figure``
    # rejects -- confirm. The lookups are tolerant because the figure state
    # has no 'xaxis' entry before a chart was drawn and a range slider does
    # not necessarily carry a 'yaxis' entry.
    layout = (fig or {}).get('layout') or {}
    rangeslider = (layout.get('xaxis') or {}).get('rangeslider')
    if isinstance(rangeslider, dict):
        rangeslider.pop('yaxis', None)

    figure = Figure(fig)
    figure.write_image(PTH_RENDERED_DELTA_REPAIR)
    return dcc.send_file(path=PTH_RENDERED_DELTA_REPAIR)
@callback(
    Output('dl-table-timeline', 'data'),
    Input('bt-table-timeline', 'n_clicks'),
    prevent_initial_call=True,
)
def download_table_timeline(_):
    """Offer the exported overview table as a browser download.

    The click count is only used as trigger and otherwise ignored.
    """
    table_file = PTH_TABLE_TIMELINE
    return dcc.send_file(path=table_file)
@callback(
    Output('dl-table-deltarepair', 'data'),
    Input('bt-table-deltarepair', 'n_clicks'),
    prevent_initial_call=True,
)
def download_table_delta_repair(_):
    """Offer the exported best-actions table as a browser download.

    The click count is only used as trigger and otherwise ignored.
    """
    table_file = PTH_TABLE_DELTA_REPAIR
    return dcc.send_file(path=table_file)
def _start_webbrowser():
    """Open the dashboard URL in a new browser window after a short delay."""
    url = 'http://{}:{}/'.format('127.0.0.1', '8050')
    # wait two seconds before opening, giving the server time to start
    time.sleep(2)
    webbrowser.open_new(url)
def main():
    """Launch the browser helper thread and run the Dash app in debug mode."""
    # daemon thread: it must not keep the process alive once the server stops
    Thread(target=_start_webbrowser, daemon=True).start()
    app.run(debug=True)


if __name__ == '__main__':
    main()