Source code for app.view.forms.submission_form
import json
import copy
import itertools
from uuid import uuid4
import sqlalchemy as sql
from sqlalchemy.orm.attributes import flag_modified
from app.model.orm import (
Taxon,
Metabolite,
Project,
Study,
Submission,
SubmissionBackup
)
[docs]
DEFAULT_STUDY_DESIGN = {
'project': {
'name': None,
'description': None,
},
'study': {
'name': None,
'description': None,
'url': None,
'licenseUrl': None,
'authors': [],
'authorCache': None,
'embargoExpiresAt': None,
},
'timeUnits': 'h',
'strains': [],
'custom_strains': [],
'techniques': [],
'compartments': [],
'communities': [],
'experiments': [],
}
"""
The structure of a Submission's `studyDesign` field. Any parameters given to
the form will be merged with this as a default. Changing the structure here
will allow stored submissions to be reused and made compatible with the new
structure.
"""
[docs]
class SubmissionForm:
@classmethod
[docs]
def create(Self, db_session, user_uuid, study_uuid=None):
form = Self(
db_session=db_session,
user_uuid=user_uuid,
study_uuid=study_uuid,
)
form.init_from_existing_study()
form.save()
return form
@classmethod
[docs]
def load(Self, db_session, submission_id, step=0):
return Self(
db_session=db_session,
submission_id=submission_id,
step=step,
)
def __init__(self, db_session=None, submission_id=None, step=0, user_uuid=None, study_uuid=None):
self._default_study_design = copy.deepcopy(DEFAULT_STUDY_DESIGN)
if submission_id is not None:
# Find existing submission:
self.submission = self.db_session.get_one(Submission, submission_id)
self.submission.studyDesign = {
**self._default_study_design,
**self.submission.studyDesign,
}
else:
# Initialize a brand new submission:
self.submission = Submission(
projectUniqueID=None,
studyUniqueID=(study_uuid if study_uuid != '_new' else str(uuid4())),
userUniqueID=user_uuid,
studyDesign=self._default_study_design,
)
# Check for an existing project/study and set the submission "type" accordingly:
[docs]
def init_from_existing_study(self):
if self.study_id is None:
return
if study := self.db_session.get(Study, self.study_id):
self.submission.projectUniqueID = study.project.uuid
self.project_id = self._find_project_id()
# Reuse its last published design:
if previous_submission := study.lastSubmission:
self.submission.studyDesign = {
**self._default_study_design,
**previous_submission.studyDesign,
}
self.submission.dataFileId = previous_submission.dataFileId
@property
[docs]
def show_embargo_date_input(self):
"Embargo date input is shown if the study is not published yet"
if not self.submission.study:
return True
if not self.submission.study.isPublished:
return True
return False
@property
[docs]
def show_reuse_study_input(self):
"Input for reusing a study design is shown for new studies"
return not self.study_id
[docs]
def update_study_info(self, data):
# Update IDs:
if data['project_uuid'] == '_new':
self.submission.projectUniqueID = str(uuid4())
else:
self.submission.projectUniqueID = data['project_uuid']
# If study to reuse has been given, copy its last submission's study
# design:
if data.get('reuse_study_uuid', '') != '':
previous_submission = self.db_session.scalars(
sql.select(Submission)
.where(Submission.studyUniqueID == data['reuse_study_uuid'])
.order_by(Submission.updatedAt.desc())
.limit(1)
).one_or_none()
if previous_submission:
self.submission.studyDesign = previous_submission.studyDesign
# Clear out experiment ids:
for experiment_data in self.submission.studyDesign.get('experiments', []):
experiment_data['publicId'] = None
# Update text fields:
self.submission.studyDesign['project'] = {
'name': data['project_name'],
'description': data.get('project_description', ''),
}
self.submission.studyDesign['study'] = {
'name': data['study_name'],
'description': data.get('study_description', ''),
'url': data.get('study_url', ''),
'licenseUrl': data.get('license_url', ''),
'authors': json.loads(data.get('authors', '[]')),
'authorCache': data.get('authorCache', ''),
'embargoExpiresAt': data.get('embargo_expires_at', None),
}
# Validate uniqueness:
self._validate_unique_project_names()
# Check whether project exists:
self.project_id = self._find_project_id()
self.study_id = self._find_study_id()
[docs]
def update_strains(self, data):
# Existing strains
self.submission.studyDesign['strains'] = data['strains']
# Add parent species name to custom strain data:
for strain in data['custom_strains']:
if 'species_name' in strain:
continue
strain['species_name'] = self.db_session.scalars(
sql.select(Taxon.name)
.where(Taxon.ncbiId == strain['species'])
.limit(1)
).one_or_none()
# Save new strains
self.submission.studyDesign['custom_strains'] = data['custom_strains']
# Clean up strain names:
for strain_data in self.submission.studyDesign['custom_strains']:
strain_data['name'] = strain_data['name'].strip()
[docs]
def update_techniques(self, data):
for i in range(len(data['techniques'])):
technique_data = data['techniques'][i]
cell_types = []
if technique_data.get('includeLive', False):
cell_types.append('live')
if technique_data.get('includeDead', False):
cell_types.append('dead')
if technique_data.get('includeTotal', False):
cell_types.append('total')
technique_data['cellTypes'] = cell_types
self.update_study_design(data)
[docs]
def update_study_design(self, data):
study_design = {**self.submission.studyDesign, **data}
if 'csrf_token' in study_design:
del study_design['csrf_token']
self.submission.studyDesign = study_design
[docs]
def fetch_taxa(self):
strains = self.submission.studyDesign['strains']
return self.db_session.scalars(
sql.select(Taxon)
.where(Taxon.ncbiId.in_(strains))
).all()
[docs]
def fetch_metabolites_for_technique(self, technique_index=None):
if technique_index is None:
# In a new form, we don't have any metabolites to list
return []
techniques = self.submission.studyDesign['techniques']
metabolites = techniques[technique_index]['metaboliteIds']
return self.db_session.scalars(
sql.select(Metabolite)
.where(Metabolite.chebiId.in_(metabolites))
).all()
[docs]
def fetch_all_metabolites(self):
ids = [
m_id
for t in self.submission.studyDesign['techniques']
for m_id in t['metaboliteIds']
]
return self.db_session.scalars(
sql.select(Metabolite)
.where(Metabolite.chebiId.in_(ids))
).all()
[docs]
def save(self):
# When the `submission.studyDesign` is modified, we need to use
# `flag_modified` to tell the ORM to persist the field. We always do
# this before saving, because almost all updates to the submission are
# updates to the study design.
#
flag_modified(self.submission, 'studyDesign')
self.db_session.add(self.submission)
self.db_session.commit()
return self.submission.id
[docs]
def save_backup(self, study_id, project_id):
self.db_session.add(SubmissionBackup(
projectId=project_id,
studyId=study_id,
userUuid=self.submission.userUniqueID,
studyDesign=self.submission.studyDesign,
dataFileId=self.submission.dataFileId,
))
self.db_session.commit()
@property
[docs]
def error_messages(self):
# Flatten messages per property:
return list(itertools.chain.from_iterable(self.errors.values()))
[docs]
def technique_descriptions(self):
ordering = ('bioreplicate', 'strain', 'metabolite')
study_techniques = self.submission.build_techniques()
sorted_techniques = sorted(study_techniques, key=lambda t: ordering.index(t.subjectType))
for (subject_type, grouped_techniques) in itertools.groupby(sorted_techniques, lambda t: t.subjectType):
match subject_type:
case 'bioreplicate': type = 'Community-level'
case 'strain': type = 'Strain-level'
case 'metabolite': type = 'Metabolite'
yield (type, list(grouped_techniques))
[docs]
def html_step_classes(self, target_step):
if self.step < target_step:
return 'disabled'
elif self.step == target_step:
return 'active'
else:
return ''
[docs]
def has_valid_project_data(self):
if self.submission.studyDesign['project']['name'] is None:
return False
return self._validate_unique_project_names()
[docs]
def has_valid_study_data(self):
return self.submission.studyDesign['study']['name'] is not None
def _find_project_id(self):
if self.submission.projectUniqueID is None:
return None
return self.db_session.scalars(
sql.select(Project.publicId)
.where(Project.uuid == self.submission.projectUniqueID)
).one_or_none()
def _find_study_id(self):
if self.submission.studyUniqueID is None:
return None
return self.db_session.scalars(
sql.select(Study.publicId)
.where(Study.uuid == self.submission.studyUniqueID)
).one_or_none()
def _validate_unique_project_names(self):
self.errors = {}
project_name = self.submission.studyDesign['project']['name']
if len(project_name) > 0:
project_exists = self.db_session.query(
sql.exists()
.where(
Project.name == project_name,
Project.uuid != self.submission.projectUniqueID
)
).scalar()
if project_exists:
self.errors['project_name'] = ["Project name is taken"]
return len(self.errors) == 0