Source code for app.model.orm.modeling_result

import re
from datetime import datetime
from decimal import Decimal

import numpy as np
import pandas as pd
import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
    validates,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime
from sqlalchemy.orm.attributes import flag_modified

from app.model.orm.orm_base import OrmBase
from app.model.lib.modeling import (
    MODEL_NAMES,
    SHORT_MODEL_NAMES,
    MODEL_DESCRIPTIONS,
    ALL_COEFFICIENTS,
    FIT_PARAMETERS,
)

_VALID_TYPES = [
    'easy_linear',
    'logistic',
    'baranyi_roberts',
]
_VALID_STATES = [
    'pending',
    'ready',
    'error',
]


[docs] class ModelInfo: def __init__(self, *, type, name, url, description, params):
[docs] self.type = type
[docs] self.name = name
[docs] self.url = url
[docs] self.description = description
[docs] self.params = params
@property
[docs] def is_custom(self): return self.type.startswith('custom_')
[docs] def __eq__(self, other): return self.type == other.type
[docs] def __hash__(self): return self.type.__hash__()
[docs] class ModelingResult(OrmBase): """ The results of fitting a model onto a set of measurements. The measurements are represented by a ``ModelingContext`` and the results of the calculation are stored in the ``params`` field. The ``state`` of the record describes the status of the job that runs the calculations. """
[docs] __tablename__ = "ModelingResults"
[docs] id: Mapped[int] = mapped_column(primary_key=True)
[docs] type: Mapped[str] = mapped_column(sql.String(100), nullable=False)
# A modeling result should either have a target MeasurementContext or # WorkspaceEntry. Both are marked as nullable, but at least one should be # present: #
[docs] measurementContextId: Mapped[int] = mapped_column( sql.ForeignKey('MeasurementContexts.id'), nullable=True, )
[docs] measurementContext: Mapped['MeasurementContext'] = relationship(back_populates='modelingResults')
[docs] study: Mapped['Study'] = relationship( secondary='MeasurementContexts', viewonly=True )
[docs] workspaceEntryId: Mapped[int] = mapped_column( sql.ForeignKey('WorkspaceEntries.id'), nullable=True, )
[docs] workspaceEntry: Mapped['WorkspaceEntry'] = relationship(back_populates='modelingResults')
[docs] customModelId: Mapped[int] = mapped_column(sql.ForeignKey('CustomModels.id'))
[docs] customModel: Mapped['CustomModel'] = relationship()
[docs] params: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs] state: Mapped[str] = mapped_column(sql.String(100), default='pending')
[docs] error: Mapped[str] = mapped_column(sql.String)
[docs] rSummary: Mapped[str] = mapped_column(sql.String)
[docs] createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
[docs] updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
[docs] calculatedAt: Mapped[datetime] = mapped_column(UtcDateTime)
[docs] publishedAt: Mapped[datetime] = mapped_column(UtcDateTime)
# For custom models:
[docs] xValues: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs] yValues: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs] yErrors: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
@validates('type') def _validate_type(self, key, value): if re.fullmatch(r'custom_\d+', value): return value else: return self._validate_inclusion(key, value, _VALID_TYPES) @validates('state') def _validate_state(self, key, value): return self._validate_inclusion(key, value, _VALID_STATES) @classmethod
[docs] def empty_params(Self, model_type): fit = { 'r2': None, 'rss': None, } if model_type == 'easy_linear': inputs = {'pointCount': '5'} coefficients = { 'y0': None, 'y0_lm': None, 'mumax': None, 'lag': None, } elif model_type == 'logistic': inputs = {'endTime': ''} coefficients = { 'y0': None, 'mumax': None, 'K': None, } elif model_type == 'baranyi_roberts': inputs = {'endTime': ''} coefficients = { 'y0': None, 'mumax': None, 'K': None, 'h0': None, } elif model_type.startswith('custom_'): inputs = {} coefficients = {} fit = {} else: raise ValueError(f"Don't know what the coefficients are for model type: {repr(model_type)}") return { 'coefficients': coefficients, 'inputs': inputs, 'fit': fit, }
@hybrid_property
[docs] def isPublished(self): return self.publishedAt != None
@property
[docs] def info(self): return ModelInfo( type=self.type, name=self.model_name, url=self.model_url, description=self.model_description, params=self.model_params, )
@property
[docs] def model_name(self): if self.type.startswith('custom_'): return self.customModel.name else: return MODEL_NAMES[self.type]
@property
[docs] def short_model_name(self): if self.type.startswith('custom_'): return self.customModel.shortName else: return SHORT_MODEL_NAMES[self.type]
@property
[docs] def model_url(self): if self.type.startswith('custom_'): return self.customModel.url
@property
[docs] def model_description(self): if self.type.startswith('custom_'): return self.customModel.description else: return MODEL_DESCRIPTIONS[self.type]
@property
[docs] def model_params(self): coefficient_names = [] fit_names = [] if self.type.startswith('custom_'): coefficient_names = self.customModel.coefficientNames fit_names = self.customModel.fitNames else: empty_params = self.__class__.empty_params(self.type) coefficient_names = empty_params['coefficients'].keys() fit_names = empty_params['fit'].keys() return { 'coefficients': [ALL_COEFFICIENTS[c] for c in coefficient_names], 'fit': [FIT_PARAMETERS[f] for f in fit_names], }
@property
[docs] def target(self): if self.measurementContextId: return self.measurementContext elif self.workspaceEntryId: return self.workspaceEntry else: raise ValueError("None of measurementContextId or workspaceEntryId are present")
@property
[docs] def units(self): return self.target.units
[docs] def get_chart_label(self): model_name = self.short_model_name or self.model_name return self.target.get_chart_label(model_name=model_name)
[docs] def generate_chart_df(self, measurements_df=None): if self.type.startswith('custom_'): timepoints = _map_float(self.xValues) values = _map_float(self.yValues) errors = _map_float(self.yErrors) else: if measurements_df is None: raise ValueError("A dataframe with measurements need to be provided") start_time = measurements_df['time'].min() end_time = measurements_df['time'].max() timepoints = np.linspace(start_time, end_time, 200) values = self._predict(timepoints) errors = None data = { 'time': timepoints, 'value': values, 'std': errors or [float('nan') for _ in range(len(timepoints))], } df = pd.DataFrame.from_dict(data) return df
[docs] def update_model_params(self, form): if not self.customModel: raise RuntimeError("Tried to update non-custom modeling result") coefficients = self.params.get('coefficients', {}) fit = self.params.get('fit', {}) for name in self.customModel.coefficientNames: key = f"coefficients[{name}]" if key in form: coefficients[name] = form[key] for name in self.customModel.fitNames: key = f"fit[{name}]" if key in form: fit[name] = form[key] self.params['coefficients'] = coefficients self.params['fit'] = fit flag_modified(self, 'params')
[docs] def visible_to_user(self, user): if self.measurementContextId is not None: return self.study.visible_to_user(user) elif self.workspaceEntryId is not None: return self.workspaceEntry.workspace.visible_to_user(user) else: raise ValueError("None of measurementContextId or workspaceEntryId are present")
def _predict(self, timepoints): if self.type == 'easy_linear': return self._predict_easy_linear(timepoints) elif self.type == 'logistic': return self._predict_logistic(timepoints) elif self.type == 'baranyi_roberts': return self._predict_baranyi_roberts(timepoints) else: raise ValueError(f"Don't know how to predict values for model type: {repr(self.type)}") def _predict_easy_linear(self, time): coefficients = self.params['coefficients'] # y0 = float(coefficients['y0']) y0_lm = float(coefficients['y0_lm']) mumax = float(coefficients['mumax']) # lag = float(coefficients['lag']) # No lag: # return y0 * np.exp(time * mumax) # Exponential: return y0_lm * np.exp(time * mumax) def _predict_logistic(self, time): coefficients = self.params['coefficients'] y0 = float(coefficients['y0']) mumax = float(coefficients['mumax']) K = float(coefficients['K']) return (K * y0)/(y0 + (K - y0) * np.exp(-mumax * time)) def _predict_baranyi_roberts(self, time): coefficients = self.params['coefficients'] y0 = float(coefficients['y0']) mumax = float(coefficients['mumax']) K = float(coefficients['K']) h0 = float(coefficients['h0']) # Formula taken from the "growthrates" documentation under `grow_baranyi`: # https://cran.r-project.org/web/packages/growthrates/growthrates.pdf # A = time + 1/mumax * np.log(np.exp(-mumax * time) + np.exp(-h0) - np.exp(-mumax * time - h0)) log_y = np.log(y0) + mumax * A - np.log(1 + (np.exp(mumax * A) - 1)/np.exp(np.log(K) - np.log(y0))) return np.exp(log_y)
def _map_float(decimal_list): result = [] if len(decimal_list) == 1 and isinstance(decimal_list[0], list): # Fix for an odd serialization bug where a list of nulls gets stored as # a nested list: decimal_list = decimal_list[0] for value in decimal_list: if isinstance(value, Decimal): result.append(float(value)) else: result.append(value) return result