# Source code for app.view.forms.experiment_export_form
import sqlalchemy as sql
from app.model.orm import (
Bioreplicate,
Compartment,
Experiment,
Measurement,
MeasurementContext,
)
from app.model.lib.db import execute_into_df
from app.model.lib.conversion import (
convert_df_units,
CELL_COUNT_UNITS,
CFU_COUNT_UNITS,
)
[docs]
class ExperimentExportForm:
    """Parse export options from request arguments and build export tables.

    The form reads the selected bioreplicates, the CSV delimiter, the STD
    flag and the requested measurement units from ``args``, loads the
    experiments that own the selected bioreplicates, and can then assemble
    one wide dataframe per experiment via :meth:`get_experiment_data`.
    """

    def __init__(self, db_session, args):
        """Store the session, parse ``args`` and load matching experiments.

        Args:
            db_session: Database session used for every query issued here.
            args: Request arguments; must provide ``get(key, default)`` and
                ``getlist(key)`` (presumably a werkzeug ``MultiDict`` --
                confirm against the caller).

        Raises:
            ValueError: If an unknown ``delimiter`` option is requested.
        """
        # These must exist before the ``_extract_*`` helpers and the query
        # below run, since both read them from ``self``.
        self.db_session = db_session
        self.bioreplicate_uuids = []

        self._extract_bioreplicate_args(args)
        self._extract_csv_args(args)
        self._extract_std_args(args)
        self._extract_measurement_unit_args(args)

        # Experiments owning at least one selected bioreplicate, one entry
        # per experiment, ordered by public id.
        self.experiments = self.db_session.scalars(
            sql.select(Experiment)
            .join(Bioreplicate)
            .where(Bioreplicate.id.in_(self.bioreplicate_uuids))
            .group_by(Experiment.publicId)
            .order_by(Experiment.publicId)
        ).all()

    def get_experiment_data(self):
        """Build one merged measurement dataframe per experiment.

        Every (subject, technique) combination found in an experiment's
        measurement contexts contributes one value column (plus an STD
        column when requested). The per-column dataframes are outer-joined
        on time, bioreplicate and compartment.

        Returns:
            dict: Maps each experiment to its dataframe. Experiments with no
            measurement columns, or whose merged dataframe has no rows, are
            left out.
        """
        experiment_data = {}

        for experiment in self.experiments:
            measurement_targets = {
                'bioreplicate': set(),
                'metabolite': set(),
                'strain': set(),
            }

            # Collect targets for each column of measurements:
            for measurement_context in experiment.measurementContexts:
                if measurement_context.subjectType == 'bioreplicate':
                    measurement_targets['bioreplicate'].add(measurement_context.technique)
                else:
                    subject = measurement_context.get_subject(self.db_session)
                    measurement_targets[measurement_context.subjectType].add((
                        subject,
                        measurement_context.technique,
                    ))

            measurement_dfs = []

            # Bioreplicate-level measurements. Sorted by id so that the
            # column order does not depend on set iteration order.
            bioreplicate_techniques = sorted(
                measurement_targets['bioreplicate'],
                key=lambda technique: technique.id,
            )
            for technique in bioreplicate_techniques:
                measurement_dfs.append(self._get_bioreplicate_df(experiment, technique))

            # Strain-level measurements:
            for (strain, technique) in sorted(measurement_targets['strain']):
                measurement_dfs.append(self._get_strain_df(experiment, strain, technique))

            # Metabolite measurements:
            for (metabolite, technique) in sorted(measurement_targets['metabolite']):
                measurement_dfs.append(self._get_metabolite_df(experiment, metabolite, technique))

            if not measurement_dfs:
                continue

            # Join separate dataframes, one per column
            experiment_df = measurement_dfs[0]
            for df in measurement_dfs[1:]:
                experiment_df = experiment_df.merge(
                    df,
                    how='outer',
                    on=['Time (hours)', 'Biological Replicate', 'Compartment'],
                    validate='one_to_one',
                    suffixes=(None, None),
                )

            if len(experiment_df) == 0:
                continue

            experiment_df = experiment_df.sort_values(
                by=['Biological Replicate', 'Compartment', 'Time (hours)'],
            )
            experiment_data[experiment] = experiment_df

        return experiment_data

    def _convert_count_units(self, df, technique):
        """Convert cell/CFU count values in ``df`` to the requested units.

        Returns the value handed back by ``convert_df_units`` for count
        techniques, or ``technique.units`` unchanged for anything else. The
        result is used by callers as the unit label of the column.
        """
        # NOTE(review): ``convert_df_units`` presumably updates ``df`` in
        # place, since only its return value (the unit label) is used.
        if technique.units in CELL_COUNT_UNITS:
            return convert_df_units(df, technique.units, self.cell_count_units)
        if technique.units in CFU_COUNT_UNITS:
            return convert_df_units(df, technique.units, self.cfu_count_units)
        return technique.units

    def _get_bioreplicate_df(self, experiment, technique):
        """Return community-level measurements of ``technique`` as a dataframe.

        The ``value`` and ``std`` columns are renamed to
        ``Community <technique>`` labels, with the units appended in
        parentheses when the technique has any.
        """
        condition = (
            MeasurementContext.subjectType == 'bioreplicate',
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = self._convert_count_units(df, technique)

        value_label = f"Community {technique.short_name}"
        std_label = f"Community {technique.short_name} STD"

        # Unitless techniques get no parenthesised suffix
        if units is not None and units != '':
            std_label += f" ({units})"
            value_label += f" ({units})"

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _get_strain_df(self, experiment, metabolite, technique):
        """Return measurements of one strain with ``technique`` as a dataframe.

        Note that the ``metabolite`` parameter actually receives the strain
        (see the call in :meth:`get_experiment_data`); the name is kept so
        the signature stays unchanged.
        """
        strain = metabolite

        condition = (
            MeasurementContext.subjectType == 'strain',
            MeasurementContext.subjectId == strain.id,
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = self._convert_count_units(df, technique)

        # NOTE(review): unlike the community labels, units are always
        # appended here, even when empty -- kept as-is to preserve the
        # existing column names.
        value_label = f"{strain.name} {technique.short_name} ({units})"
        std_label = f"{strain.name} {technique.short_name} STD ({units})"

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _get_metabolite_df(self, experiment, metabolite, technique):
        """Return measurements of one metabolite as a dataframe.

        Values are converted from ``technique.units`` to the requested
        metabolite units, passing the metabolite's average mass to the
        conversion helper.
        """
        condition = (
            MeasurementContext.subjectType == 'metabolite',
            MeasurementContext.subjectId == metabolite.id,
            # Restrict to this technique, as the sibling queries do, so rows
            # measured with another technique are not converted using the
            # wrong source units or duplicated in the merge.
            MeasurementContext.techniqueId == technique.id,
        )
        query = self._base_bioreplicate_query(experiment).where(*condition)
        df = execute_into_df(self.db_session, query)

        units = convert_df_units(df, technique.units, self.metabolite_units, metabolite.averageMass)

        value_label = f"{metabolite.name} ({units})"
        std_label = f"{metabolite.name} STD ({units})"

        return df.rename(columns={'value': value_label, 'std': std_label})

    def _base_bioreplicate_query(self, experiment):
        """Build the shared SELECT used by all per-column queries.

        Selects time, bioreplicate name, compartment name and value (plus
        ``std`` when ``include_std`` is set) for the selected bioreplicates
        of ``experiment``. Callers add their own subject/technique filters.
        """
        select_list = [
            Measurement.timeInHours.label("Time (hours)"),
            Bioreplicate.name.label("Biological Replicate"),
            Compartment.name.label("Compartment"),
            Measurement.value.label("value"),
        ]

        if self.include_std:
            select_list.append(Measurement.std.label("std"))

        return (
            sql.select(*select_list)
            .select_from(Measurement)
            .join(MeasurementContext)
            .join(Bioreplicate)
            .join(Compartment)
            .join(Experiment)
            .where(
                Experiment.publicId == experiment.publicId,
                Bioreplicate.id.in_(self.bioreplicate_uuids),
            )
            .order_by(
                Bioreplicate.name,
                Compartment.name,
                Measurement.timeInSeconds,
            )
        )

    def _extract_bioreplicate_args(self, args):
        """Append every ``bioreplicates`` argument to the selected ids."""
        self.bioreplicate_uuids.extend(args.getlist('bioreplicates'))

    def _extract_csv_args(self, args):
        """Set ``csv_separator`` from the ``delimiter`` argument.

        Raises:
            ValueError: If ``delimiter`` is not ``comma``, ``tab`` or
                ``custom``.
        """
        delimiter = args.get('delimiter', 'comma')

        if delimiter == 'comma':
            self.csv_separator = ','
        elif delimiter == 'tab':
            self.csv_separator = '\t'
        elif delimiter == 'custom':
            self.csv_separator = args.get('custom_delimiter', '|')
            # An empty custom delimiter falls back to a single space
            if self.csv_separator == '':
                self.csv_separator = ' '
        else:
            raise ValueError(f"Unknown delimiter requested: {delimiter}")

    def _extract_std_args(self, args):
        """Set ``include_std`` to whether ``includeStd`` was given and truthy."""
        self.include_std = bool(args.get('includeStd', False))

    def _extract_measurement_unit_args(self, args):
        """Set the target units for cell counts, CFU counts and metabolites.

        A unit that is missing from ``args`` keeps whatever value is already
        set on the instance, or becomes ``None`` when nothing was set.
        """
        # NOTE(review): how ``convert_df_units`` treats a ``None`` target is
        # not visible here -- confirm, or have callers always pass units.
        unit_args = (
            ('cell_count_units', 'cellCountUnits'),
            ('cfu_count_units', 'cfuCountUnits'),
            ('metabolite_units', 'metaboliteUnits'),
        )
        for attribute, key in unit_args:
            # getattr() avoids the AttributeError raised when no default has
            # been assigned yet, while still honouring an existing one.
            setattr(self, attribute, args.get(key, getattr(self, attribute, None)))