import itertools
from datetime import datetime, UTC
import pandas as pd
import sqlalchemy as sql
from flask import (
g,
render_template,
redirect,
request,
url_for,
session,
)
from werkzeug.exceptions import Forbidden
from app.view.forms.comparative_chart_form import ComparativeChartForm
from app.model.lib.chart import Chart
from app.model.lib.compare import init_compare_data
import app.model.lib.util as util
from app.model.tasks.modeling import process_modeling_request
from app.model.orm import (
MeasurementContext,
ModelingResult,
User,
Workspace,
WorkspaceEntry,
)
[docs]
def workspaces_root_page():
if g.current_user:
return redirect(url_for('workspaces_index_page', orcidId=g.current_user.orcidId, name="default"))
else:
return render_template('pages/workspaces/no_user.html')
[docs]
def workspaces_index_page(orcidId, name="default"):
errors = {}
workspace = _find_workspace(orcidId, name)
if request.method == 'POST':
file = request.files['data']
df, errors = _process_upload(file)
if df is not None:
metadata = _extract_entry_metadata()
new_entries = WorkspaceEntry.from_upload(
df,
workspace,
include_error=request.form.get('includeError', False),
metadata=metadata,
)
g.db_session.add_all(new_entries)
g.db_session.commit()
return render_template(
"pages/workspaces/index.html",
workspace=workspace,
errors=errors,
)
[docs]
def workspaces_data_preview_fragment():
file = request.files['file']
include_error = request.form.get('includeError', 'false') == 'true'
df, errors = _process_upload(file)
return render_template(
"pages/workspaces/_data_preview.html",
df=df,
include_error=include_error,
errors=errors,
)
[docs]
def workspaces_create_action():
if g.current_user is None:
raise Forbidden
name = request.form["name"]
existing_workspace = g.db_session.scalars(
sql.select(Workspace)
.where(Workspace.userId == g.current_user.id, Workspace.name == name)
.limit(1)
).one_or_none()
if existing_workspace is None:
g.db_session.add(Workspace(userId=g.current_user.id, name=name))
g.db_session.commit()
return redirect(url_for('workspaces_index_page', orcidId=g.current_user.orcidId, name=name))
[docs]
def workspaces_delete_action(id):
workspace = g.db_session.get_one(Workspace, id)
if g.current_user is None or g.current_user != workspace.user:
raise Forbidden
g.db_session.delete(workspace)
g.db_session.commit()
return {
'url': url_for('workspaces_index_page', orcidId=g.current_user.orcidId, name="default"),
}
[docs]
def workspaces_visualize_page(orcidId, name="default"):
workspace = _find_workspace(orcidId, name)
# TODO (2026-06-01) Extract logic, maybe inside of ComparativeChartForm?
left_axis_ids = util.parse_comma_separated_request_ids('l')
right_axis_ids = util.parse_comma_separated_request_ids('r')
left_axis_workspace_ids = util.parse_comma_separated_request_ids('lw')
right_axis_workspace_ids = util.parse_comma_separated_request_ids('rw')
left_axis_model_ids = util.parse_comma_separated_request_ids('lm')
right_axis_model_ids = util.parse_comma_separated_request_ids('rm')
compare_data = init_compare_data(session)
context_ids = compare_data['contexts'] + left_axis_ids + right_axis_ids
comparable_measurement_contexts = g.db_session.scalars(
sql.select(MeasurementContext)
.where(MeasurementContext.id.in_(context_ids))
).all()
model_ids = compare_data['models'] + left_axis_model_ids + right_axis_model_ids
comparable_modeling_results = g.db_session.scalars(
sql.select(ModelingResult)
.where(ModelingResult.id.in_(model_ids))
).all()
comparable_records_by_study = {}
mcs_by_study = itertools.groupby(comparable_measurement_contexts, lambda mc: mc.study)
for study, measurement_context_group in mcs_by_study:
if not study.visible_to_user(g.current_user):
continue
if study not in comparable_records_by_study:
comparable_records_by_study[study] = {'measurement_contexts': [], 'modeling_results': []}
comparable_records_by_study[study]['measurement_contexts'] = list(measurement_context_group)
mrs_by_study = itertools.groupby(comparable_modeling_results, lambda mr: mr.study)
for study, modeling_result_group in mrs_by_study:
if not study or not study.visible_to_user(g.current_user):
continue
if study not in comparable_records_by_study:
comparable_records_by_study[study] = {'measurement_contexts': [], 'modeling_results': []}
comparable_records_by_study[study]['modeling_results'] = list(modeling_result_group)
chart_form = ComparativeChartForm(
g.db_session,
left_axis_ids=left_axis_ids,
right_axis_ids=right_axis_ids,
left_axis_workspace_ids=left_axis_workspace_ids,
right_axis_workspace_ids=right_axis_workspace_ids,
left_axis_model_ids=left_axis_model_ids,
right_axis_model_ids=right_axis_model_ids,
)
return render_template(
"pages/workspaces/visualize.html",
workspace=workspace,
chart_form=chart_form,
comparable_records_by_study=comparable_records_by_study,
)
[docs]
def workspaces_chart_fragment(orcidId, name="default"):
workspace = _find_workspace(orcidId, name)
args = request.form.to_dict()
width = args.get('width', None)
chart_form = ComparativeChartForm(
g.db_session,
show_std=args.get('showStd', None) is not None,
)
chart = chart_form.build_chart(args, width, user=g.current_user)
return render_template(
'pages/workspaces/visualize/_chart.html',
workspace=workspace,
chart_form=chart_form,
chart=chart,
)
[docs]
def workspaces_update_entry_action(id):
workspace_entry = g.db_session.get_one(WorkspaceEntry, id)
workspace = workspace_entry.workspace
if workspace.user != g.current_user:
raise Forbidden
metadata = _extract_entry_metadata()
workspace_entry.update(**metadata)
g.db_session.add(workspace_entry)
g.db_session.commit()
return render_template('pages/workspaces/update.html', workspace_entry=workspace_entry)
[docs]
def workspaces_toggle_published_action(id):
workspace = g.db_session.get_one(Workspace, id)
if workspace.user != g.current_user:
raise Forbidden
if workspace.isPublished:
workspace.publishedAt = None
else:
workspace.publishedAt = datetime.now(UTC)
g.db_session.add(workspace)
g.db_session.commit()
return {'status': 'ok'}
[docs]
def workspaces_delete_entry_action(id):
workspace_entry = g.db_session.get_one(WorkspaceEntry, id)
workspace = workspace_entry.workspace
if workspace.user != g.current_user:
raise Forbidden
g.db_session.delete(workspace_entry)
g.db_session.commit()
return {'status': 'ok'}
[docs]
def workspaces_delete_all_action(id):
workspace = g.db_session.get_one(Workspace, id)
if workspace.user != g.current_user:
raise Forbidden
workspace.entries.clear()
g.db_session.add(workspace)
g.db_session.commit()
return {'status': 'ok'}
[docs]
def workspaces_modeling_page(orcidId, name="default"):
workspace = _find_workspace(orcidId, name, public=False)
return render_template(
"pages/workspaces/modeling.html",
workspace=workspace,
)
[docs]
def workspaces_modeling_chart_fragment(orcidId, name):
workspace = _find_workspace(orcidId, name, public=False)
args = request.args.to_dict()
modeling_type = args.pop('modelingType')
log_transform = args.pop('logTransform', 'false') == 'true'
workspace_entry = g.db_session.get_one(WorkspaceEntry, args['workspaceEntryId'])
measurement_df = workspace_entry.get_df(g.db_session)
chart = Chart(
time_units='h',
log_left=log_transform,
)
units = workspace_entry.units
chart.add_df(
measurement_df,
units=units,
label=workspace_entry.label,
)
modeling_record = g.db_session.scalars(
sql.select(ModelingResult)
.where(
ModelingResult.type == modeling_type,
ModelingResult.workspaceEntryId == workspace_entry.id,
ModelingResult.state == 'ready',
)
).one_or_none()
if modeling_record:
df = modeling_record.generate_chart_df(measurement_df)
label = modeling_record.model_name
chart.add_model_df(df, units=units, label=label)
model_params = modeling_record.params
r_summary = modeling_record.rSummary
else:
model_params = ModelingResult.empty_params(modeling_type)
r_summary = None
return render_template(
'pages/workspaces/modeling/_chart.html',
workspace=workspace,
chart=chart,
modeling_record=modeling_record,
model_params=model_params,
r_summary=r_summary,
units=units,
log_transform=log_transform,
)
[docs]
def workspaces_modeling_submit_action(orcidId, name="default"):
# Invoked in order to check permissions:
_workspace = _find_workspace(orcidId, name, public=False)
args = request.form.to_dict()
modeling_type = args.pop('modelingType')
workspace_entry_id = int(args.pop('selectedEntry').removeprefix('workspaceEntry|'))
modeling_result = g.db_session.scalars(
sql.select(ModelingResult)
.where(
ModelingResult.type == modeling_type,
ModelingResult.workspaceEntryId == workspace_entry_id,
)
).one_or_none()
if modeling_result is None:
modeling_result = ModelingResult(
type=modeling_type,
workspaceEntryId=workspace_entry_id,
)
modeling_result.state = 'pending'
g.db_session.add(modeling_result)
g.db_session.commit()
process_modeling_request.delay(
modeling_result.id,
target_type='WorkspaceEntry',
target_id=workspace_entry_id,
args=args,
)
return {'modelingResultId': modeling_result.id}
[docs]
def workspaces_modeling_check_json(orcidId, name):
workspace = _find_workspace(orcidId, name, public=False)
result_states = {}
for modeling_result in workspace.modelingResults:
result_states[modeling_result.id] = modeling_result.state
return result_states
def _find_workspace(orcidId, name, public=True):
workspace = g.db_session.scalars(
sql.select(Workspace)
.join(User)
.where(User.orcidId == orcidId)
.where(Workspace.name == name)
.limit(1)
).one()
if public:
if g.current_user != workspace.user and not workspace.isPublished:
raise Forbidden
else:
if g.current_user != workspace.user:
raise Forbidden
return workspace
def _process_upload(file):
errors = []
try:
df = pd.read_csv(file)
except RuntimeError:
errors.append(f"Could not process file {file.filename}")
return None, errors
column_count = len(df.columns)
if column_count < 2:
errors.append(f"At least 2 columns are expected, {column_count} were found")
row_count = df.shape[0]
if row_count <= 0:
errors.append("No data rows were found")
return df, errors
def _extract_entry_metadata():
subject_type = request.form.get('subjectType')
if subject_type in ('community', 'strain'):
units = request.form.get('growthUnits')
elif subject_type == 'metabolite':
units = request.form.get('metaboliteUnits')
else:
units = None
metadata = {
'dataType': request.form.get('dataType'),
'subjectType': subject_type,
'units': units,
}
if label := request.form.get('label'):
metadata['label'] = label
return metadata