Source code for studies

import io

from flask import (
    g,
    render_template,
    send_file,
    request,
)
from werkzeug.exceptions import Forbidden
import sqlalchemy as sql

from app.model.orm import (
    Bioreplicate,
    Experiment,
    MeasurementContext,
    ModelingRequest,
    ModelingResult,
    Study,
)
from app.view.forms.experiment_export_form import ExperimentExportForm
from app.view.forms.comparative_chart_form import ComparativeChartForm
from app.model.lib.chart import Chart
from app.model.lib.modeling_tasks import process_modeling_request
from app.model.lib.model_export import export_model_csv
from app.model.lib.log_transform import apply_log_transform
import app.model.lib.util as util


[docs] def study_show_page(studyId): study = _fetch_study( studyId, check_user_visibility=False, sql_options=( sql.orm.selectinload( Study.experiments, Experiment.bioreplicates, Bioreplicate.measurementContexts, MeasurementContext.technique, ), sql.orm.selectinload( Study.experiments, Experiment.bioreplicates, Bioreplicate.measurementContexts, MeasurementContext.measurements, ), ) ) if study.visible_to_user(g.current_user): return render_template("pages/studies/show.html", study=study) else: return render_template("pages/studies/show_unpublished.html", study=study)
[docs] def study_manage_page(studyId): study = _fetch_study(studyId) if not study.manageable_by_user(g.current_user): raise Forbidden() return render_template("pages/studies/manage.html", study=study)
[docs] def study_export_page(studyId): study = _fetch_study(studyId) return render_template( "pages/studies/export.html", study=study, studyId=studyId, )
[docs] def study_export_preview_fragment(studyId): # We only need the id here, but we call it to apply visibility checks: _fetch_study(studyId) csv_previews = [] export_form = ExperimentExportForm(g.db_session, request.args) experiment_data = export_form.get_experiment_data() for experiment, experiment_df in experiment_data.items(): csv = experiment_df[:5].to_csv(index=False, sep=export_form.csv_separator) csv_previews.append(f""" <h3>{experiment.name}.csv ({len(experiment_df)} rows)</h3> <pre>{csv}</pre> """) return '\n'.join(csv_previews)
[docs] def study_download_data_zip(studyId): study = _fetch_study(studyId) csv_data = [] export_form = ExperimentExportForm(g.db_session, request.args) experiment_data = export_form.get_experiment_data() for experiment, experiment_df in experiment_data.items(): csv_bytes = experiment_df.to_csv(index=False, sep=export_form.csv_separator) csv_name = f"{experiment.name}.csv" csv_data.append((csv_name, csv_bytes)) readme_text = render_template( 'pages/studies/export_readme.md', study=study, experiments=experiment_data.keys(), ) csv_data.append(('README.md', readme_text.encode('utf-8'))) zip_file = util.createzip(csv_data) return send_file( zip_file, as_attachment=True, download_name=f"{studyId}.zip", )
[docs] def study_download_models_csv(studyId): study = _fetch_study(studyId) csv_data = export_model_csv(g.db_session, study) return send_file( io.BytesIO(csv_data), as_attachment=True, download_name=f"{studyId}_models.csv", )
[docs] def study_visualize_page(studyId): study = _fetch_study(studyId) left_axis_ids = [int(s) for s in request.args.get('l', '').split(',') if s != ''] right_axis_ids = [int(s) for s in request.args.get('r', '').split(',') if s != ''] chart_form = ComparativeChartForm( g.db_session, time_units=study.timeUnits, left_axis_ids=left_axis_ids, right_axis_ids=right_axis_ids, ) return render_template( "pages/studies/visualize.html", study=study, chart_form=chart_form, )
[docs] def study_chart_fragment(studyId): study = _fetch_study(studyId) args = request.form.to_dict() width = request.args.get('width', None) chart_form = ComparativeChartForm(g.db_session, time_units=study.timeUnits) chart = chart_form.build_chart(args, width) return render_template( 'pages/studies/visualize/_chart.html', chart_form=chart_form, chart=chart, study=study, )
[docs] def study_modeling_submit_action(studyId): study = _fetch_study(studyId) args = request.form.to_dict() modeling_type = args.pop('modelingType') measurement_context_id = int(args.pop('selectedContext').removeprefix('measurementContext|')) modeling_request = g.db_session.scalars( sql.select(ModelingRequest) .where( ModelingRequest.type == modeling_type, ModelingRequest.studyId == study.publicId, ) ).one_or_none() if modeling_request is None: modeling_request = ModelingRequest( type=modeling_type, study=study, ) g.db_session.add(modeling_request) g.db_session.commit() result = process_modeling_request.delay(modeling_request.id, [measurement_context_id], args) modeling_request.jobUuid = result.task_id g.db_session.commit() return {'modelingRequestId': modeling_request.id}
[docs] def study_modeling_check_json(studyId): study = _fetch_study(studyId) # TODO (2025-05-20) Return counts of pending requests? ready = all([mr.state in ('ready', 'error') for mr in study.modelingRequests]) successful = all([mr.state != 'error' for mr in study.modelingRequests]) return { "ready": ready, "successful": successful, }
[docs] def study_modeling_chart_fragment(studyId, measurementContextId): study = _fetch_study(studyId) args = request.args.to_dict() # TODO (2025-06-12) Unused? # width = args.pop('width') # height = args.pop('height') modeling_type = args.pop('modelingType') log_transform = args.pop('logTransform', 'false') == 'true' measurement_context = g.db_session.get(MeasurementContext, measurementContextId) measurement_df = measurement_context.get_df(g.db_session) chart = Chart( time_units=study.timeUnits, title=measurement_context.get_chart_label(g.db_session), legend_position='right', log_left=log_transform, ) units = measurement_context.technique.units if units == '': units = measurement_context.technique.short_name if log_transform: apply_log_transform(measurement_df) chart.add_df( measurement_df, units=units, label="Measurements", ) modeling_result = g.db_session.scalars( sql.select(ModelingResult) .join(ModelingRequest) .where( ModelingRequest.type == modeling_type, ModelingResult.measurementContextId == measurement_context.id, ModelingResult.state == 'ready', ) ).one_or_none() if modeling_result: df = modeling_result.generate_chart_df(measurement_df) if log_transform: apply_log_transform(df) label = modeling_result.model_name chart.add_model_df(df, units=units, label=label) model_inputs = modeling_result.inputs model_coefficients = modeling_result.coefficients model_fit = modeling_result.fit r_summary = modeling_result.rSummary else: model_inputs = ModelingResult.empty_inputs(modeling_type) model_coefficients = ModelingResult.empty_coefficients(modeling_type) model_fit = ModelingResult.empty_fit() r_summary = None return render_template( 'pages/studies/manage/_modeling_chart.html', chart=chart, form_data=request.form, model_type=modeling_type, model_inputs=model_inputs, model_coefficients=model_coefficients, model_fit=model_fit, r_summary=r_summary, measurement_context=measurement_context, log_transform=log_transform, )
[docs] def _fetch_study(studyId, check_user_visibility=True, sql_options=None): sql_options = sql_options or () study = g.db_session.scalars( sql.select(Study) .where(Study.studyId == studyId) .options(*sql_options) .limit(1) ).one() if check_user_visibility and not study.visible_to_user(g.current_user): raise Forbidden() return study