# Source code for app.view.forms.experiment_export_form

import sqlalchemy as sql

from app.model.orm import (
    Bioreplicate,
    Compartment,
    Experiment,
    Measurement,
    MeasurementContext,
)
from app.model.lib.db import execute_into_df
from app.model.lib.conversion import (
    convert_df_units,
    CELL_COUNT_UNITS,
    CFU_COUNT_UNITS,
)


class ExperimentExportForm:
    """Collect export options from request args and build per-experiment tables.

    The form reads the selected bioreplicate ids, the CSV delimiter, whether
    standard deviations should be included and the target measurement units
    from ``args``, then loads the experiments that own the selected
    bioreplicates.
    """

    def __init__(self, db_session, args):
        """Parse ``args`` and load the matching experiments.

        Args:
            db_session: SQLAlchemy session used for every query of this form.
            args: Request arguments; must provide ``get(key, default)`` and
                ``getlist(key)``.

        Raises:
            ValueError: If ``args`` requests an unknown delimiter.
        """
        self.db_session = db_session

        self.bioreplicate_uuids = []
        self._extract_bioreplicate_args(args)

        self.csv_separator = ','
        self._extract_csv_args(args)

        self.include_std = False
        self._extract_std_args(args)

        # Defaults, used when the request does not specify target units.
        self.cell_count_units = 'Cells/mL'
        self.cfu_count_units = 'CFUs/mL'
        self.metabolite_units = 'mM'
        self._extract_measurement_unit_args(args)

        self.experiments = self.db_session.scalars(
            sql.select(Experiment)
            .join(Bioreplicate)
            .where(Bioreplicate.id.in_(self.bioreplicate_uuids))
            .group_by(Experiment.publicId)
            .order_by(Experiment.publicId)
        ).all()

    def get_experiment_data(self):
        """Build one merged dataframe of measurements per experiment.

        Each measured target (community-level technique, strain + technique,
        metabolite + technique) becomes one value column (plus a STD column
        when ``include_std`` is set). The columns are outer-joined on time,
        biological replicate and compartment.

        Returns:
            dict mapping each experiment to its merged dataframe. Experiments
            without any measurements or with an empty result are omitted.
        """
        experiment_data = {}

        for experiment in self.experiments:
            measurement_dfs = self._collect_measurement_dfs(experiment)
            if not measurement_dfs:
                continue

            # Join the separate dataframes, one per measured column.
            experiment_df = measurement_dfs[0]
            for df in measurement_dfs[1:]:
                experiment_df = experiment_df.merge(
                    df,
                    how='outer',
                    on=['Time (hours)', 'Biological Replicate', 'Compartment'],
                    validate='one_to_one',
                    # Duplicate column names must raise instead of being
                    # silently renamed.
                    suffixes=(None, None),
                )

            if experiment_df.empty:
                continue

            experiment_df.sort_values(
                inplace=True,
                by=['Biological Replicate', 'Compartment', 'Time (hours)'],
            )
            experiment_data[experiment] = experiment_df

        return experiment_data

    def _collect_measurement_dfs(self, experiment):
        """Return the list of single-column dataframes for ``experiment``."""
        measurement_targets = {
            'bioreplicate': set(),
            'metabolite': set(),
            'strain': set(),
        }

        # Collect the target of each column of measurements.
        for measurement_context in experiment.measurementContexts:
            subject_type = measurement_context.subjectType
            technique = measurement_context.technique

            if subject_type == 'bioreplicate':
                measurement_targets['bioreplicate'].add(technique)
            else:
                subject = measurement_context.get_subject(self.db_session)
                measurement_targets[subject_type].add((subject, technique))

        measurement_dfs = []

        # Bioreplicate-level measurements. Sorted by id so that the column
        # order of the export is stable between runs (set order is not).
        bioreplicate_techniques = sorted(
            measurement_targets['bioreplicate'],
            key=lambda technique: technique.id,
        )
        for technique in bioreplicate_techniques:
            measurement_dfs.append(self._get_bioreplicate_df(experiment, technique))

        # Strain-level measurements.
        for strain, technique in sorted(measurement_targets['strain']):
            measurement_dfs.append(self._get_strain_df(experiment, strain, technique))

        # Metabolite measurements.
        for metabolite, technique in sorted(measurement_targets['metabolite']):
            measurement_dfs.append(self._get_metabolite_df(experiment, metabolite, technique))

        return measurement_dfs

    def _get_bioreplicate_df(self, experiment, technique):
        """Return the community-level measurements of ``technique``."""
        condition = (
            MeasurementContext.subjectType == 'bioreplicate',
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = self._convert_count_units(df, technique)
        value_label, std_label = self._format_labels(
            f"Community {technique.short_name}",
            units,
        )

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _get_strain_df(self, experiment, metabolite, technique):
        """Return the measurements of one strain taken with ``technique``.

        NOTE: the ``metabolite`` parameter actually receives a strain; the
        name is kept unchanged for backward compatibility.
        """
        strain = metabolite

        condition = (
            MeasurementContext.subjectType == 'strain',
            MeasurementContext.subjectId == strain.id,
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = self._convert_count_units(df, technique)
        value_label, std_label = self._format_labels(
            f"{strain.name} {technique.short_name}",
            units,
        )

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _get_metabolite_df(self, experiment, metabolite, technique):
        """Return the measurements of one metabolite taken with ``technique``."""
        condition = (
            MeasurementContext.subjectType == 'metabolite',
            MeasurementContext.subjectId == metabolite.id,
            # Restrict to the given technique, like the other column builders,
            # so rows are converted from the units they were recorded in.
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = convert_df_units(
            df,
            technique.units,
            self.metabolite_units,
            metabolite.averageMass,
        )
        value_label, std_label = self._format_labels(metabolite.name, units)

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _convert_count_units(self, df, technique):
        """Convert cell/CFU counts in ``df`` to the requested units.

        Returns the units that apply to ``df`` afterwards. Techniques that
        are neither cell counts nor CFU counts keep their own units.
        Presumably ``convert_df_units`` updates ``df`` in place and returns
        the resulting units -- confirm against its implementation.
        """
        if technique.units in CELL_COUNT_UNITS:
            return convert_df_units(df, technique.units, self.cell_count_units)
        if technique.units in CFU_COUNT_UNITS:
            return convert_df_units(df, technique.units, self.cfu_count_units)
        return technique.units

    @staticmethod
    def _format_labels(name, units):
        """Return ``(value_label, std_label)`` for a measurement column.

        The units are appended in parentheses only when they are present, so
        that unit-less measurements do not end up labelled "(None)" or "()".
        """
        value_label = name
        std_label = f"{name} STD"

        if units is not None and units != '':
            value_label += f" ({units})"
            std_label += f" ({units})"

        return value_label, std_label

    def _base_bioreplicate_query(self, experiment):
        """Return the shared measurement query for ``experiment``.

        The query is limited to the selected bioreplicates; callers add the
        conditions that pick a single measurement target.
        """
        select_list = [
            Measurement.timeInHours.label("Time (hours)"),
            Bioreplicate.name.label("Biological Replicate"),
            Compartment.name.label("Compartment"),
            Measurement.value.label("value"),
        ]

        if self.include_std:
            select_list.append(Measurement.std.label("std"))

        return (
            sql.select(*select_list)
            .select_from(Measurement)
            .join(MeasurementContext)
            .join(Bioreplicate)
            .join(Compartment)
            .join(Experiment)
            .where(
                Experiment.publicId == experiment.publicId,
                Bioreplicate.id.in_(self.bioreplicate_uuids),
            )
            .order_by(
                Bioreplicate.name,
                Compartment.name,
                Measurement.timeInSeconds,
            )
        )

    def _extract_bioreplicate_args(self, args):
        """Append every requested bioreplicate id to ``bioreplicate_uuids``."""
        self.bioreplicate_uuids.extend(args.getlist('bioreplicates'))

    def _extract_csv_args(self, args):
        """Set ``csv_separator`` from the ``delimiter`` argument.

        Raises:
            ValueError: If the delimiter is not 'comma', 'tab' or 'custom'.
        """
        delimiter = args.get('delimiter', 'comma')

        if delimiter == 'comma':
            self.csv_separator = ','
        elif delimiter == 'tab':
            self.csv_separator = '\t'
        elif delimiter == 'custom':
            self.csv_separator = args.get('custom_delimiter', '|')
            # An empty custom delimiter falls back to a single space.
            if self.csv_separator == '':
                self.csv_separator = ' '
        else:
            raise ValueError(f"Unknown delimiter requested: {delimiter}")

    def _extract_std_args(self, args):
        """Enable ``include_std`` when ``includeStd`` has a truthy value."""
        self.include_std = bool(args.get('includeStd', False))

    def _extract_measurement_unit_args(self, args):
        """Override the target units with the ones given in ``args``, if any."""
        self.cell_count_units = args.get('cellCountUnits', self.cell_count_units)
        self.cfu_count_units = args.get('cfuCountUnits', self.cfu_count_units)
        self.metabolite_units = args.get('metaboliteUnits', self.metabolite_units)