Source code for app.model.orm.experiment

import re
from typing import List

import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
)

from app.model.lib.db import execute_into_df
from app.model.orm.orm_base import OrmBase


[docs] class Experiment(OrmBase): """ An entity that describes the design of a particular experiment. The specific measurements of an experiment are connected to its biological replicates (``Bioreplicate``), which are the concrete implementations of the experimental design. A published study contains experiments with fixed ``publicId`` identifiers starting with the prefix "EMGDB". """
[docs] __tablename__ = "Experiments"
[docs] publicId: Mapped[str] = mapped_column(sql.String(100), primary_key=True)
[docs] name: Mapped[str] = mapped_column(sql.String(100), nullable=False)
[docs] description: Mapped[str] = mapped_column(sql.String)
[docs] bioreplicates: Mapped[List['Bioreplicate']] = relationship( order_by='Bioreplicate.calculationType.is_(None), Bioreplicate.id', back_populates='experiment', cascade="all, delete-orphan" )
[docs] communityId: Mapped[int] = mapped_column(sql.ForeignKey('Communities.id'))
[docs] community: Mapped['Community'] = relationship(back_populates='experiments')
[docs] studyId: Mapped[str] = mapped_column(sql.ForeignKey('Studies.publicId'), nullable=False)
[docs] study: Mapped['Study'] = relationship(back_populates='experiments')
[docs] cultivationMode: Mapped[str] = mapped_column(sql.String(50))
[docs] experimentCompartments: Mapped[List['ExperimentCompartment']] = relationship( back_populates='experiment', cascade='all, delete-orphan' )
[docs] compartments: Mapped[List['Compartment']] = relationship( secondary='ExperimentCompartments', viewonly=True, )
[docs] perturbations: Mapped[List['Perturbation']] = relationship( back_populates='experiment', cascade="all, delete-orphan" )
[docs] measurementContexts: Mapped[List['MeasurementContext']] = relationship( order_by='MeasurementContext.subjectTypeOrdering, MeasurementContext.subjectName', secondary='Bioreplicates', viewonly=True, )
[docs] def get_df(self, db_session): from app.model.orm import ( Bioreplicate, Compartment, Measurement, MeasurementTechnique, MeasurementContext, ) query = ( sql.select( Bioreplicate.id.label("bioreplicateId"), Bioreplicate.name.label("bioreplicateName"), Compartment.name.label("compartmentName"), MeasurementTechnique.type.label("techniqueType"), MeasurementContext.id.label("measurementContextId"), MeasurementContext.subjectType.label("subjectType"), MeasurementContext.subjectName.label("subjectName"), MeasurementContext.subjectExternalId.label("subjectExternalId"), Measurement.timeInHours.label("time"), Measurement.value, Measurement.std, ) .distinct() .select_from(Measurement) .join(MeasurementContext) .join(Compartment) .join(Bioreplicate) .join(Experiment) .where( Experiment.publicId == self.publicId, Measurement.value.is_not(None), ) .order_by( Bioreplicate.id, Compartment.name, MeasurementTechnique.type, MeasurementContext.id, Measurement.timeInHours, ) ) return execute_into_df(db_session, query)
@staticmethod
[docs] def generate_public_id(db_session): last_string_id = db_session.scalars( sql.select(Experiment.publicId) .order_by(Experiment.publicId.desc()) .limit(1) ).one_or_none() if last_string_id: last_numeric_id = int(re.sub(r'EMGDB0*', '', last_string_id)) else: last_numeric_id = 0 return "EMGDB{:09d}".format(last_numeric_id + 1)