import itertools
from io import BytesIO
from datetime import datetime
from typing import List, Literal
import pandas as pd
import sqlalchemy as sql
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from sqlalchemy_utc.sqltypes import UtcDateTime
from app.model.orm.orm_base import OrmBase
class WorkspaceEntry(OrmBase):
    """
    Data uploaded by a user that is not linked to a study.

    This data is meant to be shown on a user's dashboard, or attached to a
    study post-upload. It could be observational measurements, or it could be
    modeling results.
    """

    __tablename__ = 'WorkspaceEntries'

    id: Mapped[int] = mapped_column(primary_key=True)

    # Display name; `from_upload` sets it to the name of the value column.
    label: Mapped[str] = mapped_column(sql.String(255), nullable=False)

    # CSV text. `from_upload` writes it with the columns `time`, `value` and,
    # when error columns were requested, `error`. `get_df` parses it back.
    data: Mapped[str] = mapped_column(sql.String, nullable=False)

    # How the entry was created; `from_upload` always stores 'upload'.
    sourceType: Mapped[Literal[
        'upload',
        'api',
    ]] = mapped_column(sql.String(100))

    dataType: Mapped[Literal[
        'measurement',
        'model',
        'other',
    ]] = mapped_column(sql.String(100))

    # 'community' and 'strain' are the values `isGrowth` treats as growth data.
    subjectType: Mapped[Literal[
        'community',
        'strain',
        'metabolite',
    ]] = mapped_column(sql.String(100))

    # NOTE(review): presumably the id of the record that `subjectType` names;
    # there is no foreign key here, so confirm against the callers.
    subjectId: Mapped[int] = mapped_column(sql.Integer)

    units: Mapped[str] = mapped_column(sql.String(100))

    workspaceId: Mapped[int] = mapped_column(sql.ForeignKey('Workspaces.id'), nullable=False)
    workspace: Mapped['Workspace'] = relationship(back_populates="entries")

    # Read-only shortcut to the user, resolved through the `Workspaces` table.
    user: Mapped['User'] = relationship(secondary='Workspaces', viewonly=True)

    # Both timestamps are declared as server-generated values (`FetchedValue`),
    # so they are expected to be filled in by the database.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())

    # Modeling results are owned by the entry: they are deleted along with it,
    # and also when they are removed from this collection.
    modelingResults: Mapped[List['ModelingResult']] = relationship(
        back_populates='workspaceEntry',
        cascade='all, delete-orphan',
    )

    @classmethod
    def from_upload(cls, df, workspace, metadata=None, include_error=False):
        """
        Construct workspace entry records from the data in a CSV file.

        The first column will be parsed as time values, every other column will
        be considered to represent measurements. If `include_error` is truthy,
        columns will be parsed as pairs of value and error measurements.

        :param df: data frame with the time column first, followed by the
            measurement columns.
        :param workspace: workspace that every created entry is attached to.
        :param metadata: optional mapping of extra keyword arguments passed to
            the constructor of each entry. Defaults to no extra arguments.
        :param include_error: whether measurement columns come in
            (value, error) pairs.
        :returns: a list with one entry per value column, in column order.
        :raises ValueError: if `include_error` is truthy and the number of
            measurement columns is odd, so they cannot be paired up.
        """
        # A `None` default avoids sharing one mutable dict between calls.
        extra_fields = {} if metadata is None else metadata

        time_col = df.columns[0]
        measurement_columns = list(df.columns[1:])

        if include_error:
            # Without this check the trailing, unpaired column would fail
            # below with an opaque tuple-unpacking error.
            if len(measurement_columns) % 2 != 0:
                raise ValueError(
                    "Expected pairs of value and error columns, "
                    f"but got {len(measurement_columns)} measurement column(s)"
                )
            column_pairs = list(itertools.batched(measurement_columns, 2))
        else:
            column_pairs = [(column, None) for column in measurement_columns]

        entries = []
        for value_column, error_column in column_pairs:
            subset = [time_col, value_column]
            renames = {time_col: 'time', value_column: 'value'}

            # Only mention the error column when there actually is one.
            if error_column is not None:
                subset.append(error_column)
                renames[error_column] = 'error'

            csv_data = df[subset].rename(columns=renames).to_csv(index=False)

            entries.append(cls(
                label=value_column,
                workspace=workspace,
                data=csv_data,
                sourceType='upload',
                **extra_fields,
            ))

        return entries

    @property
    def isGrowth(self):
        """Whether the subject type is 'community' or 'strain'."""
        return self.subjectType in ('community', 'strain')

    @property
    def canBeModeled(self):
        """Whether this is an uploaded growth measurement."""
        return self.sourceType == 'upload' and self.dataType == 'measurement' and self.isGrowth

    @property
    def readyModelingResults(self):
        """The modeling results of this entry whose state is 'ready'."""
        return [mr for mr in self.modelingResults if mr.state == 'ready']

    def get_df(self, db_session=None):
        """
        Parse the stored CSV text into a pandas data frame.

        :param db_session: unused; see the comment in the body.
        :returns: the data frame read from `self.data`.
        """
        # The `db_session` parameter is provided for compatibility with other
        # types of records
        return pd.read_csv(BytesIO(self.data.encode('utf-8')))

    def get_chart_label(self, model_name=None):
        """
        Build the label used for this entry in a chart.

        :param model_name: optional name of a model; it is HTML-escaped
            before being added to the label.
        :returns: the plain label, or the label followed by
            "(<model name> fit)" when `model_name` is truthy.
        """
        from markupsafe import escape

        if model_name:
            return f"{self.label} ({escape(model_name)} fit)"
        else:
            return self.label