# Source code for app.model.orm.measurement_technique

from datetime import datetime
from typing import List
import itertools

import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
    column_property,
)
from sqlalchemy_utc.sqltypes import UtcDateTime

from app.model.orm.orm_base import OrmBase
from app.model.lib.techniques import (
    TECHNIQUE_SHORT_NAMES,
    TECHNIQUE_LONG_NAMES,
    TECHNIQUE_SUBJECT_NAMES,
)


class MeasurementTechnique(OrmBase):
    """The technique used for a particular set of measurements.

    Each record belongs to one ``StudyTechnique`` and owns an ordered list of
    ``MeasurementContext`` records. Besides the mapped columns, the class
    offers display helpers (short/long names, CSV column names) and helpers
    for reaching the bioreplicates connected through its contexts.
    """

    __tablename__ = "MeasurementTechniques"

    id: Mapped[int] = mapped_column(primary_key=True)

    # `type` is the key into TECHNIQUE_SHORT_NAMES / TECHNIQUE_LONG_NAMES;
    # `subjectType` is the key into TECHNIQUE_SUBJECT_NAMES.
    type: Mapped[str] = mapped_column(sql.String(100), nullable=False)
    cellType: Mapped[str] = mapped_column(sql.String(100), nullable=True)
    subjectType: Mapped[str] = mapped_column(sql.String(100), nullable=False)

    metaboliteIds: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)

    # Both timestamps are produced by the database (FetchedValue), not by
    # Python code in this class.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())

    studyTechniqueId: Mapped[int] = mapped_column(sql.ForeignKey('StudyTechniques.id'))
    studyTechnique: Mapped['StudyTechnique'] = relationship(
        back_populates="measurementTechniques"
    )

    # Contexts come back with a non-NULL calculationType first, then ordered
    # by bioreplicate, compartment, subject type and subject name.
    measurementContexts: Mapped[List['MeasurementContext']] = relationship(
        back_populates="technique",
        order_by=(
            'MeasurementContext.calculationType.is_(None), '
            'MeasurementContext.bioreplicateId, '
            'MeasurementContext.compartmentId, '
            'MeasurementContext.subjectTypeOrdering, '
            'MeasurementContext.subjectName'
        ),
    )

    # Read-only shortcuts that go through the MeasurementContexts table.
    measurements: Mapped[List['Measurement']] = relationship(
        secondary='MeasurementContexts',
        viewonly=True,
    )
    connectedBioreplicates: Mapped[List['Bioreplicate']] = relationship(
        secondary='MeasurementContexts',
        viewonly=True
    )

    # Order techniques based on their type, according to the way they're
    # defined in the name index: FC first, then OD, etc.
    typeOrdering = column_property(
        OrmBase.list_ordering(type, TECHNIQUE_SHORT_NAMES.keys()),
        deferred=True,
    )

    # Order records based on their subject type
    subjectTypeOrdering = column_property(
        OrmBase.list_ordering(subjectType, ('bioreplicate', 'strain', 'metabolite')),
        deferred=True,
    )

    def __lt__(self, other):
        """Compare two techniques by primary key."""
        return self.id < other.id

    @property
    def units(self):
        """The units of the parent study technique."""
        return self.studyTechnique.units

    @property
    def _label_suffix(self):
        """``" (<label>)"`` when the study technique has a label, else ``""``."""
        study_label = self.studyTechnique.label
        return f" ({study_label})" if study_label else ""

    @property
    def short_name(self):
        """Short technique name, followed by the optional label and cell type."""
        cell_type = f" {self.cellType}" if self.cellType else ""
        return f"{TECHNIQUE_SHORT_NAMES[self.type]}{self._label_suffix}{cell_type}"

    @property
    def long_name(self):
        """Long, human-readable name looked up from the technique type."""
        return TECHNIQUE_LONG_NAMES[self.type]

    @property
    def long_name_with_subject_type(self):
        """Long name plus the optional label and a ``per <subject>`` suffix.

        The ``per ...`` suffix is left out for metabolite techniques.
        """
        parts = [self.long_name]

        if self.studyTechnique.label:
            parts.append(f"({self.studyTechnique.label})")

        if self.subjectType != 'metabolite':
            parts.append(f"per {TECHNIQUE_SUBJECT_NAMES[self.subjectType]}")

        return ' '.join(parts)

    @property
    def is_growth(self):
        """True for every technique type other than ``'ph'`` and ``'metabolite'``."""
        return self.type not in ('ph', 'metabolite')

    @property
    def connectedExperimentIds(self):
        """Sorted, de-duplicated experiment ids of the connected bioreplicates."""
        return sorted({b.experimentId for b in self.connectedBioreplicates})

    def get_bioreplicates(self, db_session):
        """Fetch the distinct bioreplicates that have a context for this technique.

        Args:
            db_session: session used to run the SELECT.

        Returns:
            The result of ``db_session.scalars(...).all()``.
        """
        # Imported here rather than at module level, as in the original code.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from app.model.orm import Bioreplicate, MeasurementContext

        query = (
            sql.select(Bioreplicate)
            .distinct()
            .join(MeasurementContext)
            .where(MeasurementContext.techniqueId == self.id)
        )
        return db_session.scalars(query).all()

    def csv_column_name(self, subject_name=None):
        """Build the CSV column header for this technique.

        Args:
            subject_name: name of the measured subject. Used for the
                ``'metabolite'`` and ``'strain'`` subject types and ignored
                for ``'bioreplicate'``.

        Returns:
            * bioreplicate: ``"Community <cellType ><short name>< (label)>"``
            * metabolite: ``"<subject_name>< (label)>"``
            * strain: ``"<subject_name> <cellType ><suffix>< (label)>"``, where
              the suffix depends on the technique type.

        Raises:
            ValueError: if the subject type is ``'strain'`` and the type has no
                strain-level suffix, or if the subject type is not one of the
                three handled values. The latter case used to fall through and
                return ``None``.
        """
        cell_type = f"{self.cellType} " if self.cellType else ""
        label = self._label_suffix

        if self.subjectType == 'bioreplicate':
            return f"Community {cell_type}{TECHNIQUE_SHORT_NAMES[self.type]}{label}"

        if self.subjectType == 'metabolite':
            return f"{subject_name}{label}"

        if self.subjectType == 'strain':
            strain_suffixes = {
                '16s': 'rRNA reads',
                'qpcr': 'qPCR counts',
                'fc': 'FC counts',
                'plates': 'plate counts',
            }
            suffix = strain_suffixes.get(self.type)
            if suffix is not None:
                return f"{subject_name} {cell_type}{suffix}{label}"

        # Either a strain technique whose type has no strain-level suffix, or
        # an unrecognised subject type: fail loudly instead of handing back a
        # None column name.
        raise ValueError(f"Incompatible type and subjectType: {self.type}, {self.subjectType}")

    def get_grouped_contexts(self):
        """Yield ``((bioreplicate, compartment), contexts)`` groups.

        ``contexts`` is a list of consecutive measurement contexts that share
        the same bioreplicate and compartment.
        """
        def bioreplicate_and_compartment(context):
            return (context.bioreplicate, context.compartment)

        # NOTE(review): groupby only merges *adjacent* items, so this relies on
        # the relationship's order_by keeping each (bioreplicate, compartment)
        # pair contiguous. The leading `calculationType.is_(None)` sort term
        # could split a pair into two groups if its contexts differ in
        # calculationType -- confirm that cannot happen.
        grouped = itertools.groupby(self.measurementContexts, bioreplicate_and_compartment)
        for (bioreplicate, compartment), group in grouped:
            yield ((bioreplicate, compartment), list(group))

    def __str__(self):
        """Short debugging representation showing the type and id."""
        return f"<MeasurementTechnique type={self.type}, id={self.id}>"