from datetime import datetime
from typing import List
import itertools
import sqlalchemy as sql
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from sqlalchemy.schema import FetchedValue
from sqlalchemy_utc.sqltypes import UtcDateTime
from app.model.orm.orm_base import OrmBase
# Abbreviated display label for each measurement technique, keyed by the
# technique's `type` string (see `MeasurementTechnique.short_name`).
TECHNIQUE_SHORT_NAMES = {
    'ph': 'pH',
    'fc': 'FC',
    'od': 'OD',
    'plates': 'PC',
    '16s': '16S-rRNA reads',
    'metabolite': 'Metabolite',
}
# Full display label for each measurement technique, keyed by the
# technique's `type` string (see `MeasurementTechnique.long_name`).
TECHNIQUE_LONG_NAMES = {
    'ph': 'pH',
    'fc': 'Flow Cytometry',
    'od': 'Optical Density',
    'plates': 'Plate Counts',
    '16s': '16S-rRNA reads',
    'metabolite': 'Metabolites',
}
[docs]
class MeasurementTechnique(OrmBase):
[docs]
__tablename__ = "MeasurementTechniques"
[docs]
id: Mapped[int] = mapped_column(primary_key=True)
[docs]
type: Mapped[str] = mapped_column(sql.String(100), nullable=False)
[docs]
units: Mapped[str] = mapped_column(sql.String(100), nullable=False)
[docs]
subjectType: Mapped[str] = mapped_column(sql.String(100), nullable=False)
[docs]
description: Mapped[str] = mapped_column(sql.String)
[docs]
includeStd: Mapped[bool] = mapped_column(sql.Boolean, nullable=False, default=False)
[docs]
strainIds: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs]
studyUniqueID: Mapped[str] = mapped_column(sql.ForeignKey('Studies.studyUniqueID'), nullable=False)
[docs]
study: Mapped['Study'] = relationship(back_populates="measurementTechniques")
[docs]
createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=FetchedValue())
[docs]
updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=FetchedValue())
[docs]
measurementContexts: Mapped[List['MeasurementContext']] = relationship(
back_populates="technique",
order_by='MeasurementContext.calculationType.is_(None), MeasurementContext.bioreplicateId, MeasurementContext.compartmentId',
)
[docs]
measurements: Mapped[List['Measurement']] = relationship(
secondary='MeasurementContexts',
viewonly=True,
)
[docs]
def __lt__(self, other):
return self.id < other.id
@property
[docs]
def short_name(self):
return TECHNIQUE_SHORT_NAMES[self.type]
@property
[docs]
def long_name(self):
return TECHNIQUE_LONG_NAMES[self.type]
@property
[docs]
def long_name_with_subject_type(self):
parts = [self.long_name]
if self.subjectType != 'metabolite':
parts.append(self.subject_short_name)
return ' per '.join(parts)
@property
[docs]
def subject_short_name(self):
match self.subjectType:
case 'bioreplicate': return 'community'
case 'strain': return 'strain'
case 'metabolite': return 'metabolite'
@property
[docs]
def is_growth(self):
return self.type not in ('ph', 'metabolite')
[docs]
def get_bioreplicates(self, db_session):
from app.model.orm import Bioreplicate, MeasurementContext
return db_session.scalars(
sql.select(Bioreplicate)
.distinct()
.join(MeasurementContext)
.where(MeasurementContext.techniqueId == self.id)
).all()
[docs]
def get_subjects_for_bioreplicate(self, db_session, bioreplicate):
from app.model.orm import MeasurementContext, Measurement
match self.subjectType:
case 'bioreplicate':
from app.model.orm import Bioreplicate
subject_class = Bioreplicate
case 'strain':
from app.model.orm import Strain
subject_class = Strain
case 'metabolite':
from app.model.orm import Metabolite
subject_class = Metabolite
return db_session.scalars(
sql.select(subject_class)
.where(subject_class.id.in_(
sql.select(MeasurementContext.subjectId)
.join(Measurement)
.distinct()
.where(
MeasurementContext.techniqueId == self.id,
MeasurementContext.bioreplicateId == bioreplicate.id,
Measurement.value.is_not(None),
)
))
).all()
[docs]
def csv_column_name(self, subject_name=None):
if self.subjectType == 'bioreplicate':
return f"Community {TECHNIQUE_SHORT_NAMES[self.type]}"
elif self.subjectType == 'metabolite':
return subject_name
elif self.subjectType == 'strain':
if self.type == '16s':
suffix = 'rRNA reads'
elif self.type == 'fc':
suffix = 'FC counts'
elif self.type == 'plates':
suffix = 'plate counts'
else:
raise ValueError(f"Incompatible type and subjectType: {self.type}, {self.subjectType}")
return f"{subject_name} {suffix}"
[docs]
def get_grouped_contexts(self):
grouper = lambda mc: (mc.bioreplicate, mc.compartment)
for ((bioreplicate, compartment), group) in itertools.groupby(self.measurementContexts, grouper):
contexts = list(group)
yield ((bioreplicate, compartment), contexts)
[docs]
def __str__(self):
return f"<MeasurementTechnique type={self.type}, id={self.id}>"