import re
from typing import List
from datetime import datetime, UTC
import itertools
import sqlalchemy as sql
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime
from app.model.orm.orm_base import OrmBase
[docs]
class Study(OrmBase):
    """
    The main container for a particular scientific study.

    Updates to experiments and measurements can only be done by issuing a
    ``Submission`` to update a particular study. Access control over the
    individual experiments is also done at the study level.

    It has a fixed ``publicId`` identifier starting with the prefix "SMGDB".
    """

    __tablename__ = 'Studies'

    # A relationship representing ownership of these records. Clearing them out
    # should directly delete them so they can be replaced.
    #
    # Only used as a factory while the class body is being evaluated; extra
    # keyword arguments (e.g. `order_by`) are forwarded to `relationship()`.
    owner_relationship = lambda **kwargs: relationship(
        back_populates='study',
        cascade='all, delete-orphan',
        **kwargs
    )

    # --- Identity -----------------------------------------------------------

    # Primary key; the public "SMGDB..." identifier described in the docstring.
    publicId: Mapped[str] = mapped_column(sql.String(100), primary_key=True)
    # Required secondary identifier, stored as a string.
    uuid: Mapped[str] = mapped_column(sql.String(100), nullable=False)

    # --- Owner --------------------------------------------------------------

    ownerUuid: Mapped[str] = mapped_column(sql.ForeignKey('Users.uuid'))
    owner: Mapped['User'] = relationship(back_populates='ownedStudies')

    # --- Descriptive metadata -----------------------------------------------

    name: Mapped[str] = mapped_column(sql.String(255))
    description: Mapped[str] = mapped_column(sql.String, nullable=True)
    url: Mapped[str] = mapped_column(sql.String, nullable=True)
    licenseUrl: Mapped[str] = mapped_column(sql.String, nullable=True)
    timeUnits: Mapped[str] = mapped_column(sql.String(100))

    # Author data is stored as JSON; `authorCache` is a plain string column
    # kept alongside it (presumably a flattened copy for display or search --
    # confirm against the code that writes it).
    authors: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
    authorCache: Mapped[str] = mapped_column(sql.String)

    # --- Parent project -----------------------------------------------------

    projectUuid: Mapped[str] = mapped_column(sql.ForeignKey('Projects.uuid'))
    project: Mapped['Project'] = relationship(back_populates="studies")

    # --- Timestamps ---------------------------------------------------------

    # `FetchedValue()` marks these as populated on the database side.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())

    # Optional publication-related timestamps; see `isPublished`,
    # `isPublishable` and `publish()` for how they are interpreted.
    publishableAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)
    publishedAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)
    embargoExpiresAt: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True)

    # --- Owned collections (cascade 'all, delete-orphan') ---------------------

    studyUsers: Mapped[List['StudyUser']] = owner_relationship()
    experiments: Mapped[List['Experiment']] = owner_relationship()
    strains: Mapped[List['StudyStrain']] = owner_relationship(order_by='StudyStrain.name')
    communities: Mapped[List['Community']] = owner_relationship()
    compartments: Mapped[List['Compartment']] = owner_relationship()
    studyTechniques: Mapped[List['StudyTechnique']] = owner_relationship(
        order_by='StudyTechnique.subjectTypeOrdering, StudyTechnique.typeOrdering',
    )
    measurementContexts: Mapped[List['MeasurementContext']] = owner_relationship()
    customModels: Mapped[List['CustomModel']] = owner_relationship()

    # --- Read-only collections reached through an intermediate table ---------

    bioreplicates: Mapped[List['Bioreplicate']] = relationship(
        secondary='Experiments',
        viewonly=True,
    )
    measurementTechniques: Mapped[List['MeasurementTechnique']] = relationship(
        secondary='StudyTechniques',
        viewonly=True,
    )
    measurements: Mapped[List['Measurement']] = relationship(
        order_by='Measurement.timeInSeconds',
        secondary='MeasurementContexts',
        viewonly=True,
    )
    modelingResults: Mapped[List['ModelingResult']] = relationship(
        secondary='MeasurementContexts',
        viewonly=True,
    )

    # --- Most recent submission (optional) -----------------------------------

    lastSubmissionId: Mapped[int] = mapped_column(sql.ForeignKey('Submissions.id'), nullable=True)
    lastSubmission: Mapped['Submission'] = relationship()
@hybrid_property
def isPublished(self):
    """Whether the study has a ``publishedAt`` timestamp set."""
    # `!= None` rather than `is not None` is deliberate: a hybrid property is
    # also evaluated against the mapped column at class level, where the
    # overloaded `!=` operator produces an SQL "IS NOT NULL" expression.
    return self.publishedAt != None  # noqa: E711
@property
def nameWithId(self):
    """Return the study name prefixed by its public ID in square brackets."""
    return f"[{self.publicId}] {self.name}"
@property
[docs]
def isPublishable(self):
now = datetime.now(UTC)
if self.embargoExpiresAt:
return self.embargoExpiresAt <= now
elif self.publishableAt:
return self.publishableAt <= now
else:
return False
@property
def managerUuids(self):
    """Return the set of ``userUniqueID`` values from this study's ``studyUsers``."""
    return {study_user.userUniqueID for study_user in self.studyUsers}
[docs]
def visible_to_user(self, user):
    """
    Decide whether ``user`` is allowed to see this study.

    Published studies are visible to everyone, including anonymous visitors.
    Unpublished ones are visible only to admins and to users whose UUID is
    among ``managerUuids``; a missing user or a user without a UUID is
    always refused.
    """
    if self.isPublished:
        return True

    if not user or not user.uuid:
        return False

    return True if user.isAdmin else user.uuid in self.managerUuids
[docs]
def manageable_by_user(self, user):
    """
    Decide whether ``user`` may manage this study.

    Only users whose UUID appears in ``managerUuids`` qualify; a missing
    user or a user without a UUID is refused.
    """
    has_identity = bool(user) and bool(user.uuid)
    if not has_identity:
        return False

    return user.uuid in self.managerUuids
[docs]
def get_model_info_list(self):
    """
    Collect the distinct ``info`` objects of all published modeling results.

    Unpublished results are ignored. The result is a list sorted by each
    info object's ``name``.
    """
    published_infos = {
        result.info
        for result in self.modelingResults
        if result.isPublished
    }
    return sorted(published_infos, key=lambda info: info.name)
[docs]
def find_last_submission(self, db_session):
    """
    Look up the most recently updated ``Submission`` for this study.

    Submissions are matched on ``studyUniqueID == self.uuid`` and ordered by
    ``updatedAt`` descending. Returns the newest one, or ``None`` when the
    study has no submissions.
    """
    # Imported at call time rather than module level (presumably to avoid a
    # circular import between ORM modules -- confirm).
    from app.model.orm import Submission

    newest_first = (
        sql.select(Submission)
        .where(Submission.studyUniqueID == self.uuid)
        .order_by(Submission.updatedAt.desc())
        .limit(1)
    )
    return db_session.scalars(newest_first).one_or_none()
[docs]
def fetch_grouped_measurement_subjects(self, db_session):
    """
    Fetch the distinct measurement subjects of this study, grouped by type.

    Returns a list of ``(subjectType, [(subjectId, subjectName), ...])``
    pairs. Groups appear in the fixed order bioreplicate, strain,
    metabolite; within a group, subjects whose name starts with
    ``Average(`` come first, and the remaining order follows ``subjectId``
    as returned by the query.
    """
    from app.model.orm import MeasurementContext, MeasurementTechnique, StudyTechnique

    query = (
        sql.select(
            MeasurementContext.subjectType,
            MeasurementContext.subjectId,
            MeasurementContext.subjectName,
        )
        .join(MeasurementContext.technique)
        .join(MeasurementTechnique.studyTechnique)
        .where(StudyTechnique.studyId == self.publicId)
        .distinct()
        .order_by(MeasurementContext.subjectId)
    )
    rows = db_session.execute(query).all()

    type_ranking = ["bioreplicate", "strain", "metabolite"]

    def sort_key(row):
        # Hack: sort averages first by checking the name. Annoying, but we
        # don't have `calculationType` here
        return (type_ranking.index(row[0]), not row[2].startswith('Average('))

    # The sort is stable, so the query's subjectId ordering is kept among
    # rows that share a sort key.
    ordered_rows = sorted(rows, key=sort_key)

    grouped = []
    for subject_type, members in itertools.groupby(ordered_rows, key=lambda row: row[0]):
        subjects = [(subject_id, subject_name) for (_, subject_id, subject_name) in members]
        grouped.append((subject_type, subjects))

    return grouped
[docs]
def fetch_experiment_ids_by_measurement_subject(self, db_session):
    """
    Map each measurement subject of this study to the experiments it occurs in.

    Returns a dict keyed by ``(subjectType, subjectId)`` whose values are
    sorted lists of experiment ``publicId`` values.
    """
    from app.model.orm import Bioreplicate, Experiment, MeasurementContext

    query = (
        sql.select(
            MeasurementContext.subjectType,
            MeasurementContext.subjectId,
            Experiment.publicId,
        )
        .join(MeasurementContext.bioreplicate)
        .join(Bioreplicate.experiment)
        .where(Experiment.studyId == self.publicId)
        .order_by(
            MeasurementContext.subjectType,
            MeasurementContext.subjectId,
        )
        .distinct()
    )
    rows = db_session.execute(query).all()

    # groupby only merges adjacent rows; the query's ORDER BY on the same two
    # columns is what keeps each subject's rows together.
    experiment_ids_by_subject = {}
    for subject_key, members in itertools.groupby(rows, key=lambda row: (row[0], row[1])):
        experiment_ids_by_subject[subject_key] = sorted(row[2] for row in members)

    return experiment_ids_by_subject
[docs]
def publish(self, db_session):
if not self.isPublishable:
return False
else:
self.publishedAt = datetime.now(UTC)
db_session.add(self)
db_session.commit()
if submission := self.lastSubmission:
submission.publishedAt = datetime.now(UTC)
db_session.add(submission)
db_session.commit()
return True
[docs]
def get_cc_code(self):
    """
    If the license URL is to a Creative Commons license, get the
    corresponding code to render the appropriate image.

    Returns one of ``'by'``, ``'by-sa'``, ``'by-nd'``, ``'by-nc'``,
    ``'by-nc-sa'``, ``'by-nc-nd'`` or ``'cc-zero'``, or ``None`` when the
    URL is missing, is not on creativecommons.org, or does not point to a
    recognised license.
    """
    from urllib.parse import urlsplit

    if not self.licenseUrl:
        return None

    parts = urlsplit(self.licenseUrl)

    # `hostname` is lower-cased and excludes any port, unlike `netloc`.
    if parts.hostname not in ('creativecommons.org', 'www.creativecommons.org'):
        return None

    # CC0 is published under /publicdomain/zero/, not under /licenses/, so
    # it needs its own check to ever be recognised.
    if parts.path.startswith('/publicdomain/zero/'):
        return 'cc-zero'

    # The trailing slash in the prefix keeps 'by' from matching 'by-sa' etc.
    for code in ('by', 'by-sa', 'by-nd', 'by-nc', 'by-nc-sa', 'by-nc-nd', 'cc-zero'):
        if parts.path.startswith(f'/licenses/{code}/'):
            return code

    return None
@staticmethod
def generate_public_id(db_session):
    """
    Produce the next ``publicId``: "SMGDB" followed by an 8-digit,
    zero-padded number one higher than the highest existing ID.

    Returns "SMGDB00000001" when no study exists yet.

    NOTE(review): the highest ID is found by ordering ``publicId`` as a
    string, which matches numeric order only while IDs keep the same
    zero-padded width. Nothing here prevents two concurrent callers from
    receiving the same ID -- confirm that callers handle that.
    """
    last_public_id = db_session.scalars(
        sql.select(Study.publicId)
        .order_by(Study.publicId.desc())
        .limit(1)
    ).one_or_none()

    if last_public_id:
        # Strip only the prefix: int() copes with the leading zeros itself,
        # and an all-zero suffix no longer collapses to an empty string.
        last_number = int(re.sub(r'^SMGDB', '', last_public_id))
    else:
        last_number = 0

    return f"SMGDB{last_number + 1:08d}"