Source code for app.view.forms.submission_form

import json
import copy
import itertools
from uuid import uuid4

import sqlalchemy as sql
from sqlalchemy.orm.attributes import flag_modified

from app.model.orm import (
    Taxon,
    Metabolite,
    Project,
    Study,
    Submission,
    SubmissionBackup
)

# Template for the JSON document stored in `Submission.studyDesign`. It is
# deep-copied per form instance and used as the base layer when merging stored
# or submitted study designs.
DEFAULT_STUDY_DESIGN = {
    'project': {
        'name': None,
        'description': None,
    },
    'study': {
        'name': None,
        'description': None,
        'url': None,
        'licenseUrl': None,
        'authors': [],
        'authorCache': None,
        'embargoExpiresAt': None,
    },
    'timeUnits': 'h',
    'strains': [],
    'custom_strains': [],
    'techniques': [],
    'compartments': [],
    'communities': [],
    'experiments': [],
}
"""
The structure of a Submission's `studyDesign` field. Any parameters given to
the form will be merged with this as a default. Changing the structure here
will allow stored submissions to be reused and made compatible with the new
structure.
"""
class SubmissionForm:
    """
    Form object that wraps a ``Submission`` record and edits its
    ``studyDesign`` document step by step.

    The form keeps three pieces of state next to the submission itself:

    - ``step``: the step of the form currently being shown,
    - ``errors``: a dict mapping a field name to a list of error messages,
    - ``project_id`` / ``study_id``: public ids of the already-existing
      project and study that match the submission's UUIDs, or ``None``.
    """

    # Maps the checkbox-style flags of a technique to the cell type they enable.
    _CELL_TYPE_FLAGS = (
        ('includeLive', 'live'),
        ('includeDead', 'dead'),
        ('includeTotal', 'total'),
    )

    # Display label per technique subject type. The key order is also the
    # order in which `technique_descriptions` yields its groups.
    _SUBJECT_TYPE_LABELS = {
        'bioreplicate': 'Community-level',
        'strain': 'Strain-level',
        'metabolite': 'Metabolite',
    }

    @classmethod
    def create(cls, db_session, user_uuid, study_uuid=None):
        """
        Build a form around a brand new submission, seed it from the existing
        study (if ``study_uuid`` points to one) and persist it.

        Passing ``'_new'`` as ``study_uuid`` generates a fresh study UUID.
        Returns the saved form.
        """
        form = cls(
            db_session=db_session,
            user_uuid=user_uuid,
            study_uuid=study_uuid,
        )
        form.init_from_existing_study()
        form.save()

        return form

    @classmethod
    def load(cls, db_session, submission_id, step=0):
        """Build a form around the stored submission with the given id."""
        return cls(
            db_session=db_session,
            submission_id=submission_id,
            step=step,
        )

    def __init__(self, db_session=None, submission_id=None, step=0, user_uuid=None, study_uuid=None):
        """
        With a ``submission_id``, load that submission and fill in any keys
        its study design is missing from the defaults. Without one, create a
        new, unsaved ``Submission`` for ``user_uuid`` / ``study_uuid``.
        """
        self.step = step
        self.db_session = db_session

        # Must be a dict: `error_messages` flattens `.values()` and the
        # validation code assigns message lists per field name.
        self.errors = {}

        self._default_study_design = copy.deepcopy(DEFAULT_STUDY_DESIGN)

        if submission_id is not None:
            # Find existing submission:
            self.submission = self.db_session.get_one(Submission, submission_id)
            self.submission.studyDesign = self._merge_with_defaults(self.submission.studyDesign)
        else:
            # Initialize a brand new submission. It gets its own copy of the
            # defaults so that editing the design never alters the template.
            self.submission = Submission(
                projectUniqueID=None,
                studyUniqueID=(study_uuid if study_uuid != '_new' else str(uuid4())),
                userUniqueID=user_uuid,
                studyDesign=copy.deepcopy(self._default_study_design),
            )

        # Check for an existing project/study and set the submission "type" accordingly:
        self.project_id = self._find_project_id()
        self.study_id = self._find_study_id()

    def init_from_existing_study(self):
        """
        If the submission's study already exists, attach the submission to
        that study's project and start from a copy of the study's last
        submission (study design and data file id). Does nothing otherwise.
        """
        if self.study_id is None:
            return

        if study := self.db_session.get(Study, self.study_id):
            self.submission.projectUniqueID = study.project.uuid
            self.project_id = self._find_project_id()

            # Reuse its last published design:
            if previous_submission := study.lastSubmission:
                self.submission.studyDesign = self._merge_with_defaults(previous_submission.studyDesign)
                self.submission.dataFileId = previous_submission.dataFileId

    @property
    def show_embargo_date_input(self):
        "Embargo date input is shown if the study is not published yet"
        study = self.submission.study
        return not (study and study.isPublished)

    @property
    def show_reuse_study_input(self):
        "Input for reusing a study design is shown for new studies"
        return not self.study_id

    def update_study_info(self, data):
        """
        Apply the project/study step of the form.

        Reads ``project_uuid``, ``project_name`` and ``study_name`` from
        ``data`` (required keys) plus a number of optional text fields.
        ``authors`` is expected to be a JSON-encoded list. When
        ``reuse_study_uuid`` is given and a submission for that study exists,
        its most recently updated study design is copied first. Afterwards the
        project name is validated for uniqueness (see ``self.errors``) and
        ``project_id`` / ``study_id`` are refreshed.
        """
        # Update IDs:
        if data['project_uuid'] == '_new':
            self.submission.projectUniqueID = str(uuid4())
        else:
            self.submission.projectUniqueID = data['project_uuid']

        # If study to reuse has been given, copy its last submission's study
        # design:
        if data.get('reuse_study_uuid', '') != '':
            previous_submission = self.db_session.scalars(
                sql.select(Submission)
                .where(Submission.studyUniqueID == data['reuse_study_uuid'])
                .order_by(Submission.updatedAt.desc())
                .limit(1)
            ).one_or_none()

            if previous_submission:
                # Work on a private copy: the design is mutated right below,
                # and those changes must not leak into the reused submission.
                self.submission.studyDesign = self._merge_with_defaults(previous_submission.studyDesign)

                # Clear out experiment ids:
                for experiment_data in self.submission.studyDesign.get('experiments', []):
                    experiment_data['publicId'] = None

        # Update text fields:
        self.submission.studyDesign['project'] = {
            'name': data['project_name'],
            'description': data.get('project_description', ''),
        }
        self.submission.studyDesign['study'] = {
            'name': data['study_name'],
            'description': data.get('study_description', ''),
            'url': data.get('study_url', ''),
            'licenseUrl': data.get('license_url', ''),
            'authors': json.loads(data.get('authors', '[]')),
            'authorCache': data.get('authorCache', ''),
            'embargoExpiresAt': data.get('embargo_expires_at', None),
        }

        # Validate uniqueness:
        self._validate_unique_project_names()

        # Check whether project exists:
        self.project_id = self._find_project_id()
        self.study_id = self._find_study_id()

    def update_strains(self, data):
        """
        Store ``data['strains']`` and ``data['custom_strains']`` in the study
        design.

        Custom strains without a ``species_name`` get one looked up from the
        ``Taxon`` whose ``ncbiId`` equals the strain's ``species`` (``None``
        if there is no such taxon). Custom strain names are stripped of
        surrounding whitespace. The dicts in ``data`` are modified in place.
        """
        # Existing strains
        self.submission.studyDesign['strains'] = data['strains']

        # Add parent species name to custom strain data:
        for strain in data['custom_strains']:
            if 'species_name' in strain:
                continue

            strain['species_name'] = self.db_session.scalars(
                sql.select(Taxon.name)
                .where(Taxon.ncbiId == strain['species'])
                .limit(1)
            ).one_or_none()

        # Save new strains
        self.submission.studyDesign['custom_strains'] = data['custom_strains']

        # Clean up strain names:
        for strain_data in self.submission.studyDesign['custom_strains']:
            strain_data['name'] = strain_data['name'].strip()

    def update_techniques(self, data):
        """
        Derive each technique's ``cellTypes`` list from its ``includeLive`` /
        ``includeDead`` / ``includeTotal`` flags, then merge ``data`` into the
        study design. The technique dicts in ``data`` are modified in place.
        """
        for technique_data in data['techniques']:
            technique_data['cellTypes'] = [
                cell_type
                for (flag, cell_type) in self._CELL_TYPE_FLAGS
                if technique_data.get(flag, False)
            ]

        self.update_study_design(data)

    def update_study_design(self, data):
        """
        Overwrite top-level keys of the study design with the entries of
        ``data``. A ``csrf_token`` entry is never stored.
        """
        study_design = {**self.submission.studyDesign, **data}
        study_design.pop('csrf_token', None)

        self.submission.studyDesign = study_design

    def fetch_taxa(self):
        """Return the ``Taxon`` records whose ``ncbiId`` is listed in the design's ``strains``."""
        strains = self.submission.studyDesign['strains']

        return self.db_session.scalars(
            sql.select(Taxon)
            .where(Taxon.ncbiId.in_(strains))
        ).all()

    def fetch_metabolites_for_technique(self, technique_index=None):
        """
        Return the ``Metabolite`` records listed in ``metaboliteIds`` of the
        technique at ``technique_index``, or an empty list when no index is
        given.
        """
        if technique_index is None:
            # In a new form, we don't have any metabolites to list
            return []

        techniques = self.submission.studyDesign['techniques']
        metabolites = techniques[technique_index]['metaboliteIds']

        return self.db_session.scalars(
            sql.select(Metabolite)
            .where(Metabolite.chebiId.in_(metabolites))
        ).all()

    def fetch_all_metabolites(self):
        """Return the ``Metabolite`` records referenced by any technique of the design."""
        ids = [
            m_id
            for t in self.submission.studyDesign['techniques']
            for m_id in t['metaboliteIds']
        ]

        return self.db_session.scalars(
            sql.select(Metabolite)
            .where(Metabolite.chebiId.in_(ids))
        ).all()

    def save(self):
        """Persist the submission and return its id."""
        # When the `submission.studyDesign` is modified, we need to use
        # `flag_modified` to tell the ORM to persist the field. We always do
        # this before saving, because almost all updates to the submission are
        # updates to the study design.
        #
        flag_modified(self.submission, 'studyDesign')

        self.db_session.add(self.submission)
        self.db_session.commit()

        return self.submission.id

    def save_backup(self, study_id, project_id):
        """
        Store a ``SubmissionBackup`` holding the submission's current study
        design and data file id under the given study and project ids.
        """
        self.db_session.add(SubmissionBackup(
            projectId=project_id,
            studyId=study_id,
            userUuid=self.submission.userUniqueID,
            studyDesign=self.submission.studyDesign,
            dataFileId=self.submission.dataFileId,
        ))
        self.db_session.commit()

    def has_error(self, key):
        """Tell whether validation recorded an error under ``key``."""
        return key in self.errors

    @property
    def is_published(self):
        """Truthy when the submission has a study and that study is published."""
        return self.submission.study and self.submission.study.isPublished

    def error_messages(self):
        """Return all recorded error messages as one flat list."""
        # Flatten messages per property:
        return list(itertools.chain.from_iterable(self.errors.values()))

    def technique_descriptions(self):
        """
        Yield ``(label, techniques)`` pairs, one per subject type present in
        ``self.submission.build_techniques()``, in the order
        community-level, strain-level, metabolite.

        Raises ``ValueError`` for a technique whose ``subjectType`` is not one
        of ``bioreplicate``, ``strain`` or ``metabolite``.
        """
        ordering = tuple(self._SUBJECT_TYPE_LABELS)

        study_techniques = self.submission.build_techniques()
        # `groupby` only groups adjacent items, so sort by subject type first.
        sorted_techniques = sorted(study_techniques, key=lambda t: ordering.index(t.subjectType))

        for (subject_type, grouped_techniques) in itertools.groupby(sorted_techniques, lambda t: t.subjectType):
            yield (self._SUBJECT_TYPE_LABELS[subject_type], list(grouped_techniques))

    def html_step_classes(self, target_step):
        """
        Return the CSS class for the navigation entry of ``target_step``:
        ``'disabled'`` for steps ahead of the current one, ``'active'`` for
        the current step and an empty string for earlier steps.
        """
        if self.step < target_step:
            return 'disabled'
        elif self.step == target_step:
            return 'active'
        else:
            return ''

    def has_valid_project_data(self):
        """Tell whether a project name has been set and is not taken by another project."""
        if self.submission.studyDesign['project']['name'] is None:
            return False

        return self._validate_unique_project_names()

    def has_valid_study_data(self):
        """Tell whether a study name has been set."""
        return self.submission.studyDesign['study']['name'] is not None

    def _merge_with_defaults(self, study_design):
        """
        Return a new study design made of the defaults overridden by the
        top-level keys of ``study_design`` (which may be ``None``).

        Both layers are deep-copied, so the result shares no nested objects
        with the defaults template or with the design it was built from.
        """
        return {
            **copy.deepcopy(self._default_study_design),
            **copy.deepcopy(study_design or {}),
        }

    def _find_project_id(self):
        """Return the public id of the project with the submission's project UUID, if any."""
        if self.submission.projectUniqueID is None:
            return None

        return self.db_session.scalars(
            sql.select(Project.publicId)
            .where(Project.uuid == self.submission.projectUniqueID)
        ).one_or_none()

    def _find_study_id(self):
        """Return the public id of the study with the submission's study UUID, if any."""
        if self.submission.studyUniqueID is None:
            return None

        return self.db_session.scalars(
            sql.select(Study.publicId)
            .where(Study.uuid == self.submission.studyUniqueID)
        ).one_or_none()

    def _validate_unique_project_names(self):
        """
        Reset ``self.errors`` and record an error under ``project_name`` if a
        different project already uses the submission's project name.

        An unset or empty name is not checked against the database. Returns
        ``True`` when no error was recorded.
        """
        self.errors = {}

        project_name = self.submission.studyDesign['project']['name']

        # The name is `None` until the first step has been filled in.
        if project_name:
            project_exists = self.db_session.query(
                sql.exists()
                .where(
                    Project.name == project_name,
                    Project.uuid != self.submission.projectUniqueID
                )
            ).scalar()

            if project_exists:
                self.errors['project_name'] = ["Project name is taken"]

        return not self.errors