import re
import json
from flask import (
g,
request,
url_for,
)
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
import sqlalchemy as sql
from app.model.lib.conversion import (
convert_df_units,
CELL_COUNT_UNITS,
CFU_COUNT_UNITS,
METABOLITE_UNITS,
)
from app.model.orm import (
Bioreplicate,
Experiment,
Measurement,
MeasurementContext,
Metabolite,
ModelingResult,
Project,
Study,
StudyStrain,
User,
Workspace,
WorkspaceEntry,
)
from app.model.lib.errors import ClientError
[docs]
def project_json(publicId):
    """Build the JSON payload describing a single project.

    The project is fetched with ``g.db_session.get_one(Project, publicId)``;
    NOTE(review): assuming ``g.db_session`` is a SQLAlchemy session, a missing
    project raises from ``get_one`` rather than returning ``None``.

    Returns a dict with the project's ``id`` (its public id), ``name``,
    ``description`` and a ``studies`` list holding an ``id``/``name`` pair
    for every study attached to the project.
    """
    record = g.db_session.get_one(Project, publicId)

    payload = {
        'id': record.publicId,
        'name': record.name,
        'description': record.description,
    }

    study_summaries = []
    for study in record.studies:
        study_summaries.append({'id': study.publicId, 'name': study.name})
    payload['studies'] = study_summaries

    return payload
[docs]
def study_json(publicId):
    """Build the JSON payload describing a single study.

    Every study exposes ``id``, ``name`` and ``projectId``. The description,
    URL, upload/publication timestamps (ISO 8601 strings) and the list of
    experiments are only added when ``study.isPublished`` is truthy.
    """
    study = g.db_session.get_one(Study, publicId)

    payload = {
        'id': study.publicId,
        'name': study.name,
        'projectId': study.project.publicId,
    }

    # Unpublished studies only reveal the minimal identifying fields.
    if not study.isPublished:
        return payload

    payload['description'] = study.description
    payload['url'] = study.url
    payload['uploadedAt'] = study.createdAt.isoformat()
    payload['publishedAt'] = study.publishedAt.isoformat()

    experiment_summaries = []
    for experiment in study.experiments:
        experiment_summaries.append({'id': experiment.publicId, 'name': experiment.name})
    payload['experiments'] = experiment_summaries

    return payload
[docs]
def experiment_json(publicId):
    """Build the JSON payload describing a published experiment.

    Raises ``NotFound`` when the experiment's study is not published.

    The payload contains the experiment's own fields, the strains of its
    community (an empty list when it has no community), one dict per
    compartment, and one dict per bioreplicate including a summary of each
    of the bioreplicate's measurement contexts.
    """
    experiment = g.db_session.get_one(Experiment, publicId)
    if not experiment.study.isPublished:
        raise NotFound

    community = experiment.community
    strains = community.strains if community else []

    payload = {
        'id': experiment.publicId,
        'name': experiment.name,
        'description': experiment.description,
        'studyId': experiment.study.publicId,
        'cultivationMode': experiment.cultivationMode,
    }

    strain_rows = []
    for strain in strains:
        strain_rows.append({
            'id': strain.id,
            'NCBId': strain.ncbiId,
            'custom': not strain.defined,
            'name': strain.name,
        })
    payload['communityStrains'] = strain_rows

    # Each compartment key is exported under the same name as the attribute
    # it is read from, so the attribute names double as the JSON keys.
    compartment_fields = (
        'name',
        'volume',
        'pressure',
        'stirringSpeed',
        'stirringMode',
        'O2',
        'CO2',
        'H2',
        'N2',
        'inoculumConcentration',
        'inoculumVolume',
        'initialPh',
        'dilutionRate',
        'initialTemperature',
        'mediumName',
        'mediumUrl',
    )
    payload['compartments'] = [
        {field: getattr(compartment, field) for field in compartment_fields}
        for compartment in experiment.compartments
    ]

    bioreplicate_rows = []
    for bioreplicate in experiment.bioreplicates:
        row = {
            'id': bioreplicate.id,
            'name': bioreplicate.name,
            'biosampleUrl': bioreplicate.biosampleUrl,
            'isAverage': bioreplicate.calculationType == 'average',
        }

        context_rows = []
        for context in bioreplicate.measurementContexts:
            context_row = {'id': context.id}
            context_row.update(_measurement_technique_fields(context))
            context_row.update(_measurement_subject_fields(context))
            context_rows.append(context_row)
        row['measurementContexts'] = context_rows

        bioreplicate_rows.append(row)
    payload['bioreplicates'] = bioreplicate_rows

    return payload
[docs]
def experiment_csv(publicId):
    """Return the measurements of a published experiment as CSV text.

    Raises ``NotFound`` when no experiment matches ``publicId`` or when its
    study is not published. The CSV is produced from the data frame returned
    by ``experiment.get_df`` and omits the index column.
    """
    experiment = g.db_session.get(Experiment, publicId)
    if not (experiment and experiment.study.isPublished):
        raise NotFound

    return experiment.get_df(g.db_session).to_csv(index=False)
[docs]
def measurement_context_json(id):
    """Build the JSON payload describing a published measurement context.

    Raises ``NotFound`` when no context matches ``id`` or when its study is
    not published. Besides the identifying fields, the payload reports how
    many ``Measurement`` rows reference the context, the ids of its modeling
    results, and the technique/subject fields produced by the shared
    ``_measurement_*_fields`` helpers.
    """
    context = g.db_session.get(MeasurementContext, id)
    if not (context and context.study.isPublished):
        raise NotFound

    count_query = (
        sql.select(sql.func.count(Measurement.id))
        .where(Measurement.contextId == context.id)
    )
    measurement_count = g.db_session.scalars(count_query).one()

    payload = {
        'id': context.id,
        'experimentId': context.bioreplicate.experimentId,
        'studyId': context.studyId,
        'bioreplicateId': context.bioreplicate.id,
        'bioreplicateName': context.bioreplicate.name,
        'measurementCount': measurement_count,
        'measurementTimeUnits': 'h',
        'modelPredictionIds': [result.id for result in context.modelingResults],
    }
    payload.update(_measurement_technique_fields(context))
    payload.update(_measurement_subject_fields(context))
    return payload
[docs]
def measurement_context_csv(id):
    """Return the measurements of a published measurement context as CSV.

    Raises ``NotFound`` when no context matches ``id`` or when its study is
    not published. Values are converted from the technique's units to the
    units requested through the query string. When the ``withLabel`` query
    parameter is truthy, the ``value`` column is renamed to the context's
    chart label with its ``<b>``/``<sub>`` tags stripped.
    """
    context = g.db_session.get(MeasurementContext, id)
    if not (context and context.study.isPublished):
        raise NotFound

    df = context.get_df(g.db_session)
    _convert_to_requested_units(
        df,
        context.technique.units,
        _get_metabolite_mass(context),
    )

    wants_label = request.args.get('withLabel')
    if wants_label:
        # The chart label carries HTML markup that has no place in a CSV header.
        column_name = re.sub(r'</?(b|sub)>', '', context.get_chart_label())
        df.rename(columns={'value': column_name}, inplace=True)

    return df.to_csv(index=False)
[docs]
def model_prediction_json(id):
    """Build the JSON payload describing a modeling result.

    Raises ``NotFound`` when no modeling result matches ``id`` or when its
    study is not published. ``calculatedAt`` is passed through exactly as
    stored on the record.
    """
    result = g.db_session.get(ModelingResult, id)
    if not (result and result.study.isPublished):
        raise NotFound

    payload = {}
    payload['id'] = result.id
    payload['measurementContextId'] = result.measurementContextId
    payload['studyId'] = result.study.publicId
    payload['type'] = result.type
    payload['params'] = result.params
    payload['calculatedAt'] = result.calculatedAt
    return payload
[docs]
def model_prediction_csv(id):
    """Return the chart data of a modeling result as CSV text.

    Raises ``NotFound`` when no modeling result matches ``id`` or when
    ``visible_to_user`` rejects the current user. The measurements fed into
    ``generate_chart_df`` come from the result's measurement context when it
    has one, and from its workspace entry otherwise.
    """
    result = g.db_session.get(ModelingResult, id)
    if not result:
        raise NotFound

    viewer = _get_current_user()
    if not result.visible_to_user(viewer):
        raise NotFound

    context = result.measurementContext
    measurements_df = (
        context.get_df(g.db_session)
        if context
        else result.workspaceEntry.get_df()
    )

    return result.generate_chart_df(measurements_df).to_csv(index=False)
[docs]
def bioreplicate_json(id):
    """Build the JSON payload describing a published bioreplicate.

    Raises ``NotFound`` when no bioreplicate matches ``id`` or when its study
    is not published. The payload lists every measurement context of the
    bioreplicate with its id plus technique and subject fields.
    """
    bioreplicate = g.db_session.get(Bioreplicate, id)
    if not (bioreplicate and bioreplicate.study.isPublished):
        raise NotFound

    payload = {
        'id': bioreplicate.id,
        'experimentId': bioreplicate.experiment.publicId,
        'studyId': bioreplicate.experiment.studyId,
        'name': bioreplicate.name,
        'biosampleUrl': bioreplicate.biosampleUrl,
        'isAverage': bioreplicate.calculationType == 'average',
        'measurementTimeUnits': 'h',
    }

    context_rows = []
    for context in bioreplicate.measurementContexts:
        row = {'id': context.id}
        row.update(_measurement_technique_fields(context))
        row.update(_measurement_subject_fields(context))
        context_rows.append(row)
    payload['measurementContexts'] = context_rows

    return payload
[docs]
def bioreplicate_csv(id):
    """Return all measurements of a published bioreplicate as CSV text.

    Raises ``NotFound`` when no bioreplicate matches ``id`` or when its study
    is not published. Each measurement context may use different source
    units, so the rows belonging to each context are converted separately
    and written back into the combined data frame.
    """
    bioreplicate = g.db_session.get(Bioreplicate, id)
    if not (bioreplicate and bioreplicate.study.isPublished):
        raise NotFound

    df = bioreplicate.get_df(g.db_session)

    # Re-select the contexts so their techniques are loaded alongside them.
    context_ids = [context.id for context in bioreplicate.measurementContexts]
    context_query = (
        sql.select(MeasurementContext)
        .where(MeasurementContext.id.in_(context_ids))
        .options(sql.orm.selectinload(MeasurementContext.technique))
    )

    for context in g.db_session.scalars(context_query):
        # Convert a copy of this context's rows, then put them back in place.
        row_mask = df['measurementContextId'] == context.id
        context_df = df[row_mask].copy()

        source_units = context.technique.units
        metabolite_mass = _get_metabolite_mass(context)
        _convert_to_requested_units(context_df, source_units, metabolite_mass)

        df.loc[row_mask] = context_df

    return df.to_csv(index=False)
[docs]
def search_json():
    """Search measurement contexts by strain NCBI ids or metabolite ChEBI ids.

    Supported query parameters, each a comma-separated list of ids:

    - ``strainNcbiIds``: NCBI ids matched against ``StudyStrain.ncbiId``.
    - ``metaboliteChebiIds``: numeric ChEBI ids; each is prefixed with
      ``CHEBI:`` before being matched against ``Metabolite.chebiId``.

    Matches from all given parameters are combined as a union.

    Returns a dict listing the matching study ids, experiment ids and
    measurement contexts. When no parameter is given, or an unknown one is
    encountered, returns an ``({"error": ...}, 400)`` tuple instead.

    Only contexts whose study is published are returned, in line with the
    other endpoints in this module, which raise ``NotFound`` for unpublished
    studies. Contexts are ordered by id so the response is stable between
    requests (the matches are collected in a set, which has no order).
    """
    request_args = request.args.to_dict()
    if not request_args:
        return {"error": "No search query parameters"}, 400

    results = set()

    for key, value in request_args.items():
        if key == 'strainNcbiIds':
            values = value.split(',')
            study_strain_ids = g.db_session.scalars(
                sql.select(StudyStrain.id)
                .where(StudyStrain.ncbiId.in_(values)),
            ).all()
            results.update(_contexts_by_subject('strain', study_strain_ids))
        elif key == 'metaboliteChebiIds':
            values = [f"CHEBI:{v}" for v in value.split(',')]
            metabolite_ids = g.db_session.scalars(
                sql.select(Metabolite.id)
                .where(Metabolite.chebiId.in_(values)),
            ).all()
            results.update(_contexts_by_subject('metabolite', metabolite_ids))
        else:
            return {"error": f"Unknown search parameter: {key}"}, 400

    # Hide data from unpublished studies and make the output deterministic.
    measurement_contexts = sorted(
        (mc for mc in results if mc.study.isPublished),
        key=lambda mc: mc.id,
    )

    experiment_ids = sorted({mc.experiment.publicId for mc in measurement_contexts})
    study_ids = sorted({mc.experiment.studyId for mc in measurement_contexts})

    return {
        'studies': study_ids,
        'experiments': experiment_ids,
        'measurementTimeUnits': 'h',
        'measurementContexts': [
            {
                'id': mc.id,
                'experimentId': mc.experiment.publicId,
                'studyId': mc.studyId,
                'bioreplicateId': mc.bioreplicate.id,
                'bioreplicateName': mc.bioreplicate.name,
                **_measurement_technique_fields(mc),
                **_measurement_subject_fields(mc),
            }
            for mc in measurement_contexts
        ]
    }
[docs]
def workspace_json(orcidId, name="default"):
    """Build the JSON payload listing the entries of a user's workspace.

    The workspace is resolved from the owner's ``orcidId`` and the workspace
    ``name``; access rules are enforced by ``_get_workspace``. Each entry is
    reported with its ``id`` and ``label``.
    """
    viewer = _get_current_user()
    workspace = _get_workspace(orcidId, name, viewer)

    payload = {"name": workspace.name}

    entry_rows = []
    for entry in workspace.entries:
        entry_rows.append({"id": entry.id, "label": entry.label})
    payload["entries"] = entry_rows

    return payload
[docs]
def workspace_update_json(orcidId, name="default"):
    """Replace the API-sourced entries of a workspace with the posted ones.

    The request body must be a JSON object with:

    - ``apiKey``: key identifying the acting user.
    - ``entries``: list of objects, each with a ``label`` and a ``data`` key.

    All existing entries whose ``sourceType`` is ``'api'`` are deleted and
    one new ``'api'`` entry is created per posted item; entries from other
    sources are left alone.

    Returns a dict with the ``workspaceUrl`` and ``workspaceEntryId``, the id
    of the last entry created, or ``None`` when ``entries`` was empty.

    Raises:
        BadRequest: the body is not a JSON object, ``entries`` is missing,
            or ``entries`` is not a list of objects with ``label``/``data``.
        Forbidden: ``apiKey`` is missing (or, via ``_get_workspace``, the
            user may not access the workspace).
    """
    try:
        request_json = json.loads(request.data)
    except ValueError as err:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        raise BadRequest("Request body is not valid JSON") from err
    if not isinstance(request_json, dict):
        raise BadRequest("Request body must be a JSON object")

    if 'apiKey' not in request_json:
        raise Forbidden
    if 'entries' not in request_json:
        raise BadRequest

    current_user = _get_current_user(request_json['apiKey'])
    workspace = _get_workspace(orcidId, name, current_user)

    # Validate the whole payload before touching the database so that a
    # malformed entry cannot fail halfway through the delete/recreate cycle.
    new_entries = request_json['entries']
    if not isinstance(new_entries, list):
        raise BadRequest("'entries' must be a list")
    for entry in new_entries:
        if not isinstance(entry, dict) or 'label' not in entry or 'data' not in entry:
            raise BadRequest("Each entry must be an object with 'label' and 'data'")

    # Clear out existing "api" entries (iterate over a copy of the collection):
    for existing_entry in list(workspace.entries):
        if existing_entry.sourceType == 'api':
            g.db_session.delete(existing_entry)

    # Recreate "api" entries. Stays None when there is nothing to create, so
    # the response below never refers to an unbound name.
    workspace_entry = None
    for entry in new_entries:
        workspace_entry = WorkspaceEntry(
            workspace=workspace,
            sourceType="api",
            label=entry['label'],
            data=entry['data'],
        )
        g.db_session.add(workspace_entry)

    g.db_session.commit()

    workspace_url = url_for('workspaces_index_page', orcidId=workspace.user.orcidId, name=workspace.name)

    return {
        'workspaceUrl': workspace_url,
        'workspaceEntryId': workspace_entry.id if workspace_entry is not None else None,
    }
[docs]
def workspace_entry_json(id):
    """Build the JSON payload describing a single workspace entry.

    Raises ``NotFound`` when no entry matches ``id``, or when the entry's
    workspace is unpublished and the entry belongs to someone other than the
    current user.
    """
    entry = g.db_session.get(WorkspaceEntry, id)
    # The user is resolved before the existence check, so an invalid API key
    # is reported even for an unknown entry id.
    viewer = _get_current_user()

    if not entry or (not entry.workspace.isPublished and entry.user != viewer):
        raise NotFound

    # Every exported key is read from the attribute of the same name.
    exported_fields = (
        "id",
        "label",
        "units",
        "sourceType",
        "dataType",
        "subjectType",
        "subjectId",
    )
    return {field: getattr(entry, field) for field in exported_fields}
[docs]
def workspace_entry_csv(id):
    """Return the data of a workspace entry as CSV text.

    Raises ``NotFound`` under the same conditions as the JSON variant: the
    entry does not exist, or its workspace is unpublished and the entry is
    not owned by the current user. Values are converted from the entry's
    units to the units requested through the query string. When the
    ``withLabel`` query parameter is truthy, the ``value`` and ``error``
    columns are renamed after the entry's label.
    """
    entry = g.db_session.get(WorkspaceEntry, id)
    viewer = _get_current_user()

    if not entry or (not entry.workspace.isPublished and entry.user != viewer):
        raise NotFound

    df = entry.get_df()

    # TODO (2026-05-25) Allow assigning subjects to workspace entries
    # metabolite_mass = _get_metabolite_mass(workspace_entry)
    _convert_to_requested_units(df, entry.units)

    if request.args.get('withLabel'):
        renamed_columns = {
            'value': entry.label,
            'error': entry.label + ' error',
        }
        df.rename(columns=renamed_columns, inplace=True)

    return df.to_csv(index=False)
def _get_current_user(api_key=None):
    """Resolve the user on whose behalf the current request is made.

    The API key is taken from ``api_key`` when given, otherwise from the
    ``apiKey`` query parameter. Without any key, ``g.current_user`` is
    returned. With a key, the user owning that key is looked up.

    Raises ``ClientError`` when a key was supplied but matches no user.
    """
    effective_key = api_key if api_key is not None else request.args.get('apiKey')
    if effective_key is None:
        return g.current_user

    user_query = sql.select(User).where(User.apiKey == effective_key).limit(1)
    matched_user = g.db_session.scalars(user_query).one_or_none()

    if matched_user is not None:
        return matched_user
    raise ClientError("Given API key did not correspond to an active user")
def _get_workspace(orcidId, name, current_user):
    """Fetch the workspace called ``name`` owned by the user with ``orcidId``.

    Raises ``Forbidden`` when the workspace is unpublished and is not owned
    by ``current_user``.

    NOTE(review): the lookup ends in ``.one()``, which raises when no row
    matches — confirm that a missing workspace is turned into a suitable
    HTTP error somewhere upstream.
    """
    workspace_query = (
        sql.select(Workspace)
        .join(User)
        .where(
            User.orcidId == orcidId,
            Workspace.userId == User.id,
            Workspace.name == name,
        )
        .limit(1)
    )
    workspace = g.db_session.scalars(workspace_query).one()

    if workspace.isPublished or workspace.user == current_user:
        return workspace
    raise Forbidden
def _measurement_technique_fields(measurement_context):
    """Return the technique-related JSON fields of a measurement context.

    Reports the technique type, the units the data was recorded in
    (``techniqueOriginalUnits``) and the units the API will report for this
    request (``techniqueUnits``). ``techniqueCellType`` is only included when
    the technique has a truthy cell type.
    """
    technique = measurement_context.technique
    mass = _get_metabolite_mass(measurement_context)

    result = {
        'techniqueType': technique.type,
        'techniqueOriginalUnits': technique.units,
        'techniqueUnits': _convert_unit_label_to_requested(technique.units, mass),
    }

    cell_type = technique.cellType
    if cell_type:
        result['techniqueCellType'] = cell_type

    return result
def _measurement_subject_fields(measurement_context):
    """Return the ``subject`` JSON field of a measurement context.

    The nested dict always carries the subject's ``type`` and ``name``. When
    the context has an external id, strains additionally get ``NCBId`` and
    metabolites get ``chebiId``: the external id with its ``NCBI:`` or
    ``CHEBI:`` prefix removed, converted to ``int``. Other subject types get
    no extra key.
    """
    kind = measurement_context.subjectType
    subject = {
        'type': kind,
        'name': measurement_context.subjectName,
    }

    external_id = measurement_context.subjectExternalId
    if external_id and kind == 'strain':
        subject['NCBId'] = int(external_id.removeprefix('NCBI:'))
    elif external_id and kind == 'metabolite':
        subject['chebiId'] = int(external_id.removeprefix('CHEBI:'))

    return {'subject': subject}
def _contexts_by_subject(subject_type, subject_id):
    """Fetch the measurement contexts recorded for the given subject(s).

    ``subject_id`` may be a single id or a ``list`` of ids; a list is matched
    with ``IN``, anything else with equality. The technique and experiment
    of each context are requested via joined loading in the same query.
    """
    if isinstance(subject_id, list):
        id_filter = MeasurementContext.subjectId.in_(subject_id)
    else:
        id_filter = MeasurementContext.subjectId == subject_id

    context_query = (
        sql.select(MeasurementContext)
        .where(
            MeasurementContext.subjectType == subject_type,
            id_filter,
        )
        .options(
            sql.orm.joinedload(MeasurementContext.technique),
            sql.orm.joinedload(MeasurementContext.experiment),
        )
    )
    return g.db_session.scalars(context_query).all()
def _get_metabolite_mass(measurement_context):
    """Return the average mass of the context's metabolite subject.

    Returns ``None`` for any context whose subject type is not
    ``'metabolite'``; the subject is only loaded for metabolites.
    """
    if measurement_context.subjectType == 'metabolite':
        metabolite = measurement_context.get_subject(g.db_session)
        return metabolite.averageMass

    return None
def _convert_unit_label_to_requested(source_units, metabolite_mass=None):
    """Return the unit label the API reports for data stored in ``source_units``.

    Cell-count, CFU-count and metabolite units can each be overridden with a
    query parameter (``cellCountUnits``, ``cfuCountUnits``,
    ``metaboliteUnits``) and fall back to ``Cells/mL``, ``CFUs/mL`` and
    ``mM`` respectively. Metabolite units are only replaced when a truthy
    ``metabolite_mass`` is available. Any other units are returned unchanged.

    The requested label is returned as given; it is not validated here.
    """
    if source_units in CELL_COUNT_UNITS:
        query_param, fallback = 'cellCountUnits', 'Cells/mL'
    elif source_units in CFU_COUNT_UNITS:
        query_param, fallback = 'cfuCountUnits', 'CFUs/mL'
    elif source_units in METABOLITE_UNITS and metabolite_mass:
        query_param, fallback = 'metaboliteUnits', 'mM'
    else:
        return source_units

    return request.args.get(query_param, fallback)
def _convert_to_requested_units(df, source_units, metabolite_mass=None):
    """Convert the values in ``df`` to the units requested in the query string.

    The unit family of ``source_units`` decides which query parameter is
    consulted (``cellCountUnits``, ``cfuCountUnits`` or ``metaboliteUnits``)
    and which default applies (``Cells/mL``, ``CFUs/mL`` or ``mM``).
    Metabolite units are only converted when a truthy ``metabolite_mass`` is
    given. Units outside these families leave ``df`` untouched.

    The conversion is delegated to ``convert_df_units``, presumably in place,
    since its return value is discarded. This function returns ``None``.

    Raises ``ClientError`` when the requested units do not belong to the same
    family as ``source_units``.
    """
    if source_units in CELL_COUNT_UNITS:
        requested = request.args.get('cellCountUnits', 'Cells/mL')
        if requested in CELL_COUNT_UNITS:
            convert_df_units(df, source_units, requested)
            return
        raise ClientError(f"Unexpected cell count units requested: {requested}")

    if source_units in CFU_COUNT_UNITS:
        requested = request.args.get('cfuCountUnits', 'CFUs/mL')
        if requested in CFU_COUNT_UNITS:
            convert_df_units(df, source_units, requested)
            return
        raise ClientError(f"Unexpected CFU count units requested: {requested}")

    if source_units in METABOLITE_UNITS and metabolite_mass:
        requested = request.args.get('metaboliteUnits', 'mM')
        if requested in METABOLITE_UNITS:
            convert_df_units(df, source_units, requested, metabolite_mass)
            return
        raise ClientError(f"Unexpected metabolite count units requested: {requested}")