Source code for app.model.orm.measurement

import csv
from io import StringIO
from decimal import Decimal

import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
)
from sqlalchemy.ext.hybrid import hybrid_property

from app.model.orm.orm_base import OrmBase
from app.model.lib.conversion import convert_time
from app.model.lib.util import group_by_unique_name, is_non_negative_float


class Measurement(OrmBase):
    """
    A single observed measurement at a particular time point.

    A measurement may be an average of multiple technical replicates with a
    standard deviation. It may also be a "calculated" measurement from an
    average of multiple biological replicates. This information is
    encapsulated in a ``MeasurementContext``, while this record mostly
    contains the time and recorded value.
    """

    __tablename__ = "Measurements"

    # A relationship that goes through the parent measurement context:
    context_relationship = lambda: relationship(
        secondary='MeasurementContexts',
        viewonly=True,
    )

    id: Mapped[int] = mapped_column(primary_key=True)

    studyId: Mapped[str] = mapped_column(sql.ForeignKey('Studies.publicId'), nullable=False)
    study: Mapped['Study'] = relationship(back_populates='measurements')

    contextId: Mapped[int] = mapped_column(sql.ForeignKey('MeasurementContexts.id'))
    context: Mapped['MeasurementContext'] = relationship(back_populates='measurements')

    # Read-only shortcuts to records reachable through the measurement context:
    bioreplicate: Mapped['Bioreplicate'] = context_relationship()
    compartment: Mapped['Compartment'] = context_relationship()
    technique: Mapped['MeasurementTechnique'] = context_relationship()

    timeInSeconds: Mapped[int] = mapped_column(sql.Integer, nullable=False)

    # Both the value and its standard deviation may be NULL (empty CSV cell):
    value: Mapped[Decimal] = mapped_column(sql.Numeric(20, 2), nullable=True)
    std: Mapped[Decimal] = mapped_column(sql.Numeric(20, 2), nullable=True)

    @hybrid_property
    def timeInHours(self):
        """The measurement time, converted from seconds to hours."""
        return self.timeInSeconds / 3600

    @hybrid_property
    def subjectId(self):
        """The subject id, as recorded on the parent measurement context."""
        return self.context.subjectId

    @hybrid_property
    def subjectType(self):
        """The subject type, as recorded on the parent measurement context."""
        return self.context.subjectType

    @classmethod
    def insert_from_csv_string(Self, db_session, study, csv_string):
        """
        Parse ``csv_string`` and add a ``Measurement`` for every value cell.

        Each row is expected to carry the columns ``Biological Replicate``,
        ``Compartment`` and ``Time``. For every measurement technique of the
        study and every subject it applies to, the value is read from the
        column named by ``technique.csv_column_name(subject.name)`` and the
        optional standard deviation from that column name suffixed with
        ``" STD"``. Empty cells are stored as ``None``.

        Measurements are grouped in ``MeasurementContext`` records, one per
        (bioreplicate, compartment, technique, subject) combination. After
        committing, contexts whose measurements are all empty are deleted.

        :param db_session: session used to add, commit and delete records.
        :param study: study that owns the bioreplicates, compartments,
            techniques, strains and metabolites referenced by the CSV.
        :param csv_string: the CSV contents, including the header row.
        :returns: list of the measurements that belong to contexts with at
            least one non-empty value.
        :raises KeyError: if a technique has a subject type other than
            ``bioreplicate``, ``strain`` or ``metabolite``.
        """
        # Imported locally, presumably to avoid a circular import -- TODO confirm
        from app.model.orm import MeasurementContext

        reader = csv.DictReader(StringIO(csv_string), dialect='unix')

        bioreplicates_by_name = group_by_unique_name(study.bioreplicates)
        compartments_by_name = group_by_unique_name(study.compartments)

        # Maps a context key tuple to its MeasurementContext, so that every
        # combination is only created once across all rows:
        context_cache = {}

        for row in reader:
            # NOTE(review): the `is None` check below only skips unknown names
            # if `group_by_unique_name` returns a mapping that yields None for
            # missing keys; a plain dict would raise KeyError here -- confirm.
            bioreplicate = bioreplicates_by_name[row['Biological Replicate'].strip()]
            compartment = compartments_by_name[row['Compartment'].strip()]

            if bioreplicate is None or compartment is None:
                # Missing entry, skip
                continue

            if not is_non_negative_float(row['Time'], isnan_check=True):
                # Missing time, skip
                continue

            time_in_seconds = convert_time(row['Time'], source=study.timeUnits, target='s')

            for technique in study.measurementTechniques:
                if technique.subjectType == 'bioreplicate':
                    subjects = [bioreplicate]
                elif technique.subjectType == 'strain':
                    subjects = study.strains
                elif technique.subjectType == 'metabolite':
                    subjects = study.metabolites
                else:
                    # The original message interpolated an undefined name,
                    # which raised NameError instead of this KeyError.
                    raise KeyError(f"Unexpected subject type: {technique.subjectType}")

                for subject in subjects:
                    value_column_name = technique.csv_column_name(subject.name)
                    if value_column_name not in row:
                        continue

                    value = row[value_column_name]
                    if value == '':
                        value = None

                    std = row.get(f"{value_column_name} STD", None)
                    if std == '':
                        std = None

                    # Create a measurement context only if it doesn't already exist:
                    context_key = (
                        bioreplicate.id,
                        compartment.id,
                        technique.id,
                        subject.id,
                        technique.subjectType,
                    )

                    if context_key not in context_cache:
                        context = MeasurementContext(
                            # Relationships:
                            study=study,
                            bioreplicate=bioreplicate,
                            compartment=compartment,
                            # Subject:
                            subjectId=subject.id,
                            subjectType=technique.subjectType,
                            subjectName=subject.name,
                            subjectExternalId=subject.externalId,
                            # Technique:
                            techniqueId=technique.id,
                        )
                        db_session.add(context)
                        context_cache[context_key] = context

                    context = context_cache[context_key]

                    measurement = Measurement(
                        study=study,
                        context=context,
                        timeInSeconds=time_in_seconds,
                        value=value,
                        std=std,
                    )
                    db_session.add(measurement)

        db_session.commit()

        # Prune measurement contexts that only have empty values:
        # NOTE(review): the deletes are executed after the commit above and
        # are not committed here -- presumably the caller commits; confirm.
        measurements = []
        for context in context_cache.values():
            if all(m.value is None for m in context.measurements):
                db_session.execute(
                    sql.delete(MeasurementContext)
                    .where(MeasurementContext.id == context.id)
                )
            else:
                measurements.extend(context.measurements)

        return measurements