# Source code for app.model.orm.study

import re
from typing import List
from datetime import datetime, UTC
import itertools

import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime

from app.model.orm.orm_base import OrmBase


class Study(OrmBase):
    """
    The main container for a particular scientific study.

    Updates to experiments and measurements can only be done by issuing a
    ``Submission`` to update a particular study. Access control over the
    individual experiments is also done at the study level.

    It has a fixed ``publicId`` identifier starting with the prefix "SMGDB".
    """

    __tablename__ = 'Studies'

    # A relationship representing ownership of these records. Clearing them out
    # should directly delete them so they can be replaced.
    owner_relationship = lambda **kwargs: relationship(  # noqa: E731
        back_populates='study',
        cascade='all, delete-orphan',
        **kwargs
    )

    # Identity
    publicId: Mapped[str] = mapped_column(sql.String(100), primary_key=True)
    uuid: Mapped[str] = mapped_column(sql.String(100), nullable=False)

    # Owning user
    ownerUuid: Mapped[str] = mapped_column(sql.ForeignKey('Users.uuid'))
    owner: Mapped['User'] = relationship(back_populates='ownedStudies')

    # Descriptive metadata
    name: Mapped[str] = mapped_column(sql.String(255))
    description: Mapped[str] = mapped_column(sql.String, nullable=True)
    url: Mapped[str] = mapped_column(sql.String, nullable=True)
    licenseUrl: Mapped[str] = mapped_column(sql.String, nullable=True)
    timeUnits: Mapped[str] = mapped_column(sql.String(100))

    authors: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
    authorCache: Mapped[str] = mapped_column(sql.String)

    # Parent project
    projectUuid: Mapped[str] = mapped_column(sql.ForeignKey('Projects.uuid'))
    project: Mapped['Project'] = relationship(back_populates="studies")

    # Timestamps. `createdAt`/`updatedAt` are declared with `FetchedValue()`,
    # i.e. their values are produced on the database side.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    publishableAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)
    publishedAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)
    embargoExpiresAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)

    # Owned collections (deleted together with the study, see
    # `owner_relationship` above).
    studyUsers: Mapped[List['StudyUser']] = owner_relationship()
    experiments: Mapped[List['Experiment']] = owner_relationship()
    strains: Mapped[List['StudyStrain']] = owner_relationship(order_by='StudyStrain.name')
    communities: Mapped[List['Community']] = owner_relationship()
    compartments: Mapped[List['Compartment']] = owner_relationship()
    studyTechniques: Mapped[List['StudyTechnique']] = owner_relationship(
        order_by='StudyTechnique.subjectTypeOrdering, StudyTechnique.typeOrdering',
    )
    measurementContexts: Mapped[List['MeasurementContext']] = owner_relationship()
    customModels: Mapped[List['CustomModel']] = owner_relationship()

    # Read-only collections reached through an intermediate table.
    bioreplicates: Mapped[List['Bioreplicate']] = relationship(
        secondary='Experiments',
        viewonly=True,
    )
    measurementTechniques: Mapped[List['MeasurementTechnique']] = relationship(
        secondary='StudyTechniques',
        viewonly=True,
    )
    measurements: Mapped[List['Measurement']] = relationship(
        order_by='Measurement.timeInSeconds',
        secondary='MeasurementContexts',
        viewonly=True,
    )
    modelingResults: Mapped[List['ModelingResult']] = relationship(
        secondary='MeasurementContexts',
        viewonly=True,
    )

    studyMetabolites: Mapped[List['StudyMetabolite']] = owner_relationship()
    metabolites: Mapped[List['Metabolite']] = relationship(
        order_by='Metabolite.name',
        secondary='StudyMetabolites',
        viewonly=True,
    )

    lastSubmissionId: Mapped[int] = mapped_column(sql.ForeignKey('Submissions.id'), nullable=True)
    lastSubmission: Mapped['Submission'] = relationship()

    @hybrid_property
    def isPublished(self):
        """
        Whether ``publishedAt`` is set.

        Declared as a hybrid property, so the same expression is used for
        instance access and for class-level (query) access.
        """
        # `!= None` is deliberate: at class level SQLAlchemy overloads `!=` to
        # build an `IS NOT NULL` expression, which `is not None` cannot do.
        return self.publishedAt != None  # noqa: E711

    @property
    def nameWithId(self):
        """The study name prefixed with its public id in square brackets."""
        return f"[{self.publicId}] {self.name}"

    @property
    def isPublishable(self):
        """
        Whether the study may be published right now.

        If an embargo expiry is set, it alone decides; otherwise
        ``publishableAt`` decides. With neither set the study is not
        publishable.
        """
        now = datetime.now(UTC)

        if self.embargoExpiresAt:
            return self.embargoExpiresAt <= now
        if self.publishableAt:
            return self.publishableAt <= now

        return False

    @property
    def managerUuids(self):
        """The set of ``userUniqueID`` values of this study's ``studyUsers``."""
        return {study_user.userUniqueID for study_user in self.studyUsers}

    def visible_to_user(self, user):
        """
        Return whether ``user`` may see this study.

        Published studies are visible to everyone, including a missing
        ``user``. Unpublished ones are visible only to admins and to users
        whose uuid is in ``managerUuids``.
        """
        if self.isPublished:
            return True
        if not user or not user.uuid:
            return False
        if user.isAdmin:
            return True

        return user.uuid in self.managerUuids

    def manageable_by_user(self, user):
        """
        Return whether ``user`` is one of this study's managers.

        Unlike ``visible_to_user``, admin status is not consulted here.
        """
        if not user or not user.uuid:
            return False

        return user.uuid in self.managerUuids

    def get_model_info_list(self):
        """
        Return the distinct ``info`` objects of all published modeling
        results, sorted by their ``name``.
        """
        published_infos = {
            modeling_result.info
            for modeling_result in self.modelingResults
            if modeling_result.isPublished
        }

        return sorted(published_infos, key=lambda info: info.name)

    def find_last_submission(self, db_session):
        """
        Return the most recently updated ``Submission`` whose
        ``studyUniqueID`` equals this study's ``uuid``, or ``None`` if there
        is none.
        """
        # Imported locally, presumably to avoid a circular import.
        from app.model.orm import Submission

        return db_session.scalars(
            sql.select(Submission)
            .where(Submission.studyUniqueID == self.uuid)
            .order_by(Submission.updatedAt.desc())
            .limit(1)
        ).one_or_none()

    def fetch_grouped_measurement_subjects(self, db_session):
        """
        Return this study's distinct measurement subjects grouped by type.

        The result is a list of ``(subjectType, [(subjectId, subjectName),
        ...])`` pairs, with types ordered bioreplicate, strain, metabolite.
        Within a type, subjects whose name starts with ``Average(`` come
        first; otherwise the query's ``subjectId`` ordering is kept.

        Raises ``ValueError`` if a record has a subject type outside those
        three.
        """
        from app.model.orm import MeasurementContext, MeasurementTechnique, StudyTechnique

        records = db_session.execute(
            sql.select(
                MeasurementContext.subjectType,
                MeasurementContext.subjectId,
                MeasurementContext.subjectName,
            )
            .join(MeasurementContext.technique)
            .join(MeasurementTechnique.studyTechnique)
            .where(StudyTechnique.studyId == self.publicId)
            .distinct()
            .order_by(MeasurementContext.subjectId)
        ).all()

        # Hack: sort averages first by checking the name. Annoying, but we
        # don't have `calculationType` here
        sort_order = ["bioreplicate", "strain", "metabolite"]
        sorted_records = sorted(
            records,
            key=lambda r: (sort_order.index(r[0]), not r[2].startswith('Average('))
        )

        # `groupby` only merges adjacent rows; the sort above already puts
        # rows of the same subject type next to each other.
        return [
            (subject_type, [(subject_id, subject_name) for (_, subject_id, subject_name) in group])
            for subject_type, group in itertools.groupby(sorted_records, key=lambda r: r[0])
        ]

    def fetch_experiment_ids_by_measurement_subject(self, db_session):
        """
        Map each measurement subject to the experiments it was measured in.

        Returns a dict keyed by ``(subjectType, subjectId)`` whose values are
        sorted lists of experiment ``publicId`` values.
        """
        from app.model.orm import Bioreplicate, Experiment, MeasurementContext

        # The ORDER BY matches the `groupby` key below, so all rows for one
        # subject arrive adjacent to each other.
        records = db_session.execute(
            sql.select(
                MeasurementContext.subjectType,
                MeasurementContext.subjectId,
                Experiment.publicId,
            )
            .join(MeasurementContext.bioreplicate)
            .join(Bioreplicate.experiment)
            .where(Experiment.studyId == self.publicId)
            .order_by(
                MeasurementContext.subjectType,
                MeasurementContext.subjectId,
            )
            .distinct()
        ).all()

        return {
            (subject_type, subject_id): sorted(record[2] for record in group)
            for (subject_type, subject_id), group
            in itertools.groupby(records, key=lambda r: (r[0], r[1]))
        }

    def publish(self, db_session):
        """
        Publish the study if ``isPublishable`` allows it.

        Sets ``publishedAt`` on the study and, when present, on its
        ``lastSubmission``, using one shared timestamp and a single commit.

        Returns ``True`` if the study was published, ``False`` otherwise.
        """
        if not self.isPublishable:
            return False

        published_at = datetime.now(UTC)

        self.publishedAt = published_at
        db_session.add(self)

        if submission := self.lastSubmission:
            submission.publishedAt = published_at
            db_session.add(submission)

        # One commit, so study and submission are published together or not
        # at all.
        db_session.commit()

        return True

    def get_cc_code(self):
        """
        If the license URL is to a Creative Commons license, get the
        corresponding code to render the appropriate image.

        Returns one of ``by``, ``by-sa``, ``by-nd``, ``by-nc``, ``by-nc-sa``,
        ``by-nc-nd``, ``cc-zero``, or ``None`` when there is no license URL or
        it is not a recognised Creative Commons URL.
        """
        from urllib.parse import urlsplit

        if self.licenseUrl is None:
            return None

        parts = urlsplit(self.licenseUrl)

        # `hostname` is lower-cased and has no port, unlike `netloc`.
        if parts.hostname not in ('creativecommons.org', 'www.creativecommons.org'):
            return None

        # CC0 is published under /publicdomain/zero/, not under /licenses/.
        if parts.path.startswith('/publicdomain/zero/'):
            return 'cc-zero'

        # The trailing slash in the prefix keeps `by` from matching `by-sa` etc.
        for code in ('by', 'by-sa', 'by-nd', 'by-nc', 'by-nc-sa', 'by-nc-nd', 'cc-zero'):
            if parts.path.startswith(f'/licenses/{code}/'):
                return code

        return None

    @staticmethod
    def generate_public_id(db_session):
        """
        Return the next free public id: "SMGDB" followed by an 8-digit,
        zero-padded number one higher than the current maximum.

        The maximum is found by string ordering of ``publicId``, which
        matches numeric ordering as long as ids keep the fixed-width padding.

        NOTE(review): nothing here reserves the id, so concurrent callers
        could presumably receive the same value -- confirm callers serialize.
        """
        last_string_id = db_session.scalars(
            sql.select(Study.publicId)
            .order_by(Study.publicId.desc())
            .limit(1)
        ).one_or_none()

        if last_string_id:
            # `int()` accepts leading zeros, so only the prefix is removed.
            last_numeric_id = int(last_string_id.removeprefix('SMGDB'))
        else:
            last_numeric_id = 0

        return "SMGDB{:08d}".format(last_numeric_id + 1)