Source code for app.model.orm.workspace_entry

import itertools
from io import BytesIO
from datetime import datetime
from typing import List, Literal

import pandas as pd
import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
)
from sqlalchemy_utc.sqltypes import UtcDateTime

from app.model.orm.orm_base import OrmBase


class WorkspaceEntry(OrmBase):
    """
    Data uploaded by a user that is not linked to a study.

    This data is meant to be shown on a user's dashboard, or attached to a
    study post-upload. It could be observational measurements, or it could be
    modeling results.
    """

    __tablename__ = 'WorkspaceEntries'

    id: Mapped[int] = mapped_column(primary_key=True)

    # Display label; `from_upload` sets it to the name of the value column.
    label: Mapped[str] = mapped_column(sql.String(255), nullable=False)

    # The series itself, stored as CSV text (written by `from_upload`, parsed
    # back by `get_df`).
    data: Mapped[str] = mapped_column(sql.String, nullable=False)

    # How the entry was created; `from_upload` always stores 'upload'.
    sourceType: Mapped[Literal[
        'upload',
        'api',
    ]] = mapped_column(sql.String(100))

    dataType: Mapped[Literal[
        'measurement',
        'model',
        'other',
    ]] = mapped_column(sql.String(100))

    subjectType: Mapped[Literal[
        'community',
        'strain',
        'metabolite',
    ]] = mapped_column(sql.String(100))

    # NOTE(review): presumably the id of the record that `subjectType` refers
    # to; there is no foreign key on this column -- confirm against callers.
    subjectId: Mapped[int] = mapped_column(sql.Integer)

    units: Mapped[str] = mapped_column(sql.String(100))

    workspaceId: Mapped[int] = mapped_column(sql.ForeignKey('Workspaces.id'), nullable=False)
    workspace: Mapped['Workspace'] = relationship(back_populates="entries")

    # Read-only (`viewonly`) relationship that goes through the Workspaces
    # table.
    user: Mapped['User'] = relationship(secondary='Workspaces', viewonly=True)

    # Both timestamps are declared with `FetchedValue`, i.e. the values are
    # generated on the database side rather than by this model.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())

    modelingResults: Mapped[List['ModelingResult']] = relationship(
        back_populates='workspaceEntry',
        cascade='all, delete-orphan',
    )

    @classmethod
    def from_upload(cls, df, workspace, metadata=None, include_error=False):
        """
        Construct workspace entry records from the data in a CSV file.

        The first column will be parsed as time values, every other column
        will be considered to represent measurements. If `include_error` is
        truthy, columns will be parsed as pairs of value and error
        measurements.

        :param df: data frame holding the uploaded table.
        :param workspace: workspace that every created entry is attached to.
        :param metadata: optional dict of extra keyword arguments passed to
            the constructor of every entry.
        :param include_error: whether measurement columns come in
            (value, error) pairs.
        :returns: a list with one entry per value column.
        :raises ValueError: if `include_error` is truthy and the number of
            measurement columns is odd, so they cannot be paired.
        """
        if metadata is None:
            metadata = {}

        time_col = df.columns[0]
        measurement_columns = list(df.columns[1:])

        if include_error:
            # Validate before constructing anything: building an entry with
            # `workspace=...` already links it to the workspace, so failing
            # half-way through would leave a partial set of entries behind.
            if len(measurement_columns) % 2 != 0:
                raise ValueError(
                    "Expected measurement columns in (value, error) pairs, "
                    f"but got an odd number of them: {len(measurement_columns)}"
                )
            value_and_error_columns = list(itertools.batched(measurement_columns, 2))
        else:
            value_and_error_columns = [(c, None) for c in measurement_columns]

        entries = []
        for (value_column, error_column) in value_and_error_columns:
            subset = [time_col, value_column]
            renamed_columns = {time_col: 'time', value_column: 'value'}

            if error_column is not None:
                subset.append(error_column)
                renamed_columns[error_column] = 'error'

            csv_data = df[subset].rename(columns=renamed_columns).to_csv(index=False)

            entries.append(cls(
                label=value_column,
                workspace=workspace,
                data=csv_data,
                sourceType='upload',
                **metadata,
            ))

        return entries

    @property
    def isGrowth(self):
        """Whether the subject of this entry is a community or a strain."""
        return self.subjectType in ('community', 'strain')

    @property
    def canBeModeled(self):
        """Whether this entry is an uploaded growth measurement."""
        return self.sourceType == 'upload' and self.dataType == 'measurement' and self.isGrowth

    @property
    def readyModelingResults(self):
        """The modeling results of this entry whose state is 'ready'."""
        return [mr for mr in self.modelingResults if mr.state == 'ready']

    def get_df(self, db_session=None):
        """Parse the stored CSV text back into a data frame."""
        # The `db_session` parameter is provided for compatibility with other
        # types of records
        return pd.read_csv(BytesIO(self.data.encode('utf-8')))

    def get_chart_label(self, model_name=None):
        """
        Return the label to show for this entry on a chart.

        If `model_name` is given, it is escaped and appended to the label as
        the name of the fit.
        """
        from markupsafe import escape

        if model_name:
            return f"{self.label} ({escape(model_name)} fit)"
        else:
            return self.label