import re
from datetime import datetime
from decimal import Decimal
import numpy as np
import pandas as pd
import sqlalchemy as sql
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
validates,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime
from sqlalchemy.orm.attributes import flag_modified
from app.model.orm.orm_base import OrmBase
from app.model.lib.modeling import (
MODEL_NAMES,
SHORT_MODEL_NAMES,
MODEL_DESCRIPTIONS,
ALL_COEFFICIENTS,
FIT_PARAMETERS,
)
_VALID_TYPES = [
'easy_linear',
'logistic',
'baranyi_roberts',
]
_VALID_STATES = [
'pending',
'ready',
'error',
]
[docs]
class ModelInfo:
def __init__(self, *, type, name, url, description, params):
[docs]
self.description = description
@property
[docs]
def is_custom(self):
return self.type.startswith('custom_')
[docs]
def __eq__(self, other):
return self.type == other.type
[docs]
def __hash__(self):
return self.type.__hash__()
[docs]
class ModelingResult(OrmBase):
"""
The results of fitting a model onto a set of measurements.
The measurements are represented by a ``ModelingContext`` and the results
of the calculation are stored in the ``params`` field. The ``state`` of the
record describes the status of the job that runs the calculations.
"""
[docs]
__tablename__ = "ModelingResults"
[docs]
id: Mapped[int] = mapped_column(primary_key=True)
[docs]
type: Mapped[str] = mapped_column(sql.String(100), nullable=False)
# A modeling result should either have a target MeasurementContext or
# WorkspaceEntry. Both are marked as nullable, but at least one should be
# present:
#
[docs]
measurementContextId: Mapped[int] = mapped_column(
sql.ForeignKey('MeasurementContexts.id'),
nullable=True,
)
[docs]
measurementContext: Mapped['MeasurementContext'] = relationship(back_populates='modelingResults')
[docs]
study: Mapped['Study'] = relationship(
secondary='MeasurementContexts',
viewonly=True
)
[docs]
workspaceEntryId: Mapped[int] = mapped_column(
sql.ForeignKey('WorkspaceEntries.id'),
nullable=True,
)
[docs]
workspaceEntry: Mapped['WorkspaceEntry'] = relationship(back_populates='modelingResults')
[docs]
customModelId: Mapped[int] = mapped_column(sql.ForeignKey('CustomModels.id'))
[docs]
customModel: Mapped['CustomModel'] = relationship()
[docs]
params: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs]
state: Mapped[str] = mapped_column(sql.String(100), default='pending')
[docs]
error: Mapped[str] = mapped_column(sql.String)
[docs]
rSummary: Mapped[str] = mapped_column(sql.String)
[docs]
createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
[docs]
updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
[docs]
calculatedAt: Mapped[datetime] = mapped_column(UtcDateTime)
[docs]
publishedAt: Mapped[datetime] = mapped_column(UtcDateTime)
# For custom models:
[docs]
xValues: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs]
yValues: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
[docs]
yErrors: Mapped[sql.JSON] = mapped_column(sql.JSON, nullable=False)
@validates('type')
def _validate_type(self, key, value):
if re.fullmatch(r'custom_\d+', value):
return value
else:
return self._validate_inclusion(key, value, _VALID_TYPES)
@validates('state')
def _validate_state(self, key, value):
return self._validate_inclusion(key, value, _VALID_STATES)
@classmethod
[docs]
def empty_params(Self, model_type):
fit = {
'r2': None,
'rss': None,
}
if model_type == 'easy_linear':
inputs = {'pointCount': '5'}
coefficients = {
'y0': None,
'y0_lm': None,
'mumax': None,
'lag': None,
}
elif model_type == 'logistic':
inputs = {'endTime': ''}
coefficients = {
'y0': None,
'mumax': None,
'K': None,
}
elif model_type == 'baranyi_roberts':
inputs = {'endTime': ''}
coefficients = {
'y0': None,
'mumax': None,
'K': None,
'h0': None,
}
elif model_type.startswith('custom_'):
inputs = {}
coefficients = {}
fit = {}
else:
raise ValueError(f"Don't know what the coefficients are for model type: {repr(model_type)}")
return {
'coefficients': coefficients,
'inputs': inputs,
'fit': fit,
}
@hybrid_property
[docs]
def isPublished(self):
return self.publishedAt != None
@property
[docs]
def info(self):
return ModelInfo(
type=self.type,
name=self.model_name,
url=self.model_url,
description=self.model_description,
params=self.model_params,
)
@property
[docs]
def model_name(self):
if self.type.startswith('custom_'):
return self.customModel.name
else:
return MODEL_NAMES[self.type]
@property
[docs]
def short_model_name(self):
if self.type.startswith('custom_'):
return self.customModel.shortName
else:
return SHORT_MODEL_NAMES[self.type]
@property
[docs]
def model_url(self):
if self.type.startswith('custom_'):
return self.customModel.url
@property
[docs]
def model_description(self):
if self.type.startswith('custom_'):
return self.customModel.description
else:
return MODEL_DESCRIPTIONS[self.type]
@property
[docs]
def model_params(self):
coefficient_names = []
fit_names = []
if self.type.startswith('custom_'):
coefficient_names = self.customModel.coefficientNames
fit_names = self.customModel.fitNames
else:
empty_params = self.__class__.empty_params(self.type)
coefficient_names = empty_params['coefficients'].keys()
fit_names = empty_params['fit'].keys()
return {
'coefficients': [ALL_COEFFICIENTS[c] for c in coefficient_names],
'fit': [FIT_PARAMETERS[f] for f in fit_names],
}
@property
[docs]
def target(self):
if self.measurementContextId:
return self.measurementContext
elif self.workspaceEntryId:
return self.workspaceEntry
else:
raise ValueError("None of measurementContextId or workspaceEntryId are present")
@property
[docs]
def units(self):
return self.target.units
[docs]
def get_chart_label(self):
model_name = self.short_model_name or self.model_name
return self.target.get_chart_label(model_name=model_name)
[docs]
def generate_chart_df(self, measurements_df=None):
if self.type.startswith('custom_'):
timepoints = _map_float(self.xValues)
values = _map_float(self.yValues)
errors = _map_float(self.yErrors)
else:
if measurements_df is None:
raise ValueError("A dataframe with measurements need to be provided")
start_time = measurements_df['time'].min()
end_time = measurements_df['time'].max()
timepoints = np.linspace(start_time, end_time, 200)
values = self._predict(timepoints)
errors = None
data = {
'time': timepoints,
'value': values,
'std': errors or [float('nan') for _ in range(len(timepoints))],
}
df = pd.DataFrame.from_dict(data)
return df
[docs]
def update_model_params(self, form):
if not self.customModel:
raise RuntimeError("Tried to update non-custom modeling result")
coefficients = self.params.get('coefficients', {})
fit = self.params.get('fit', {})
for name in self.customModel.coefficientNames:
key = f"coefficients[{name}]"
if key in form:
coefficients[name] = form[key]
for name in self.customModel.fitNames:
key = f"fit[{name}]"
if key in form:
fit[name] = form[key]
self.params['coefficients'] = coefficients
self.params['fit'] = fit
flag_modified(self, 'params')
[docs]
def visible_to_user(self, user):
if self.measurementContextId is not None:
return self.study.visible_to_user(user)
elif self.workspaceEntryId is not None:
return self.workspaceEntry.workspace.visible_to_user(user)
else:
raise ValueError("None of measurementContextId or workspaceEntryId are present")
def _predict(self, timepoints):
if self.type == 'easy_linear':
return self._predict_easy_linear(timepoints)
elif self.type == 'logistic':
return self._predict_logistic(timepoints)
elif self.type == 'baranyi_roberts':
return self._predict_baranyi_roberts(timepoints)
else:
raise ValueError(f"Don't know how to predict values for model type: {repr(self.type)}")
def _predict_easy_linear(self, time):
coefficients = self.params['coefficients']
# y0 = float(coefficients['y0'])
y0_lm = float(coefficients['y0_lm'])
mumax = float(coefficients['mumax'])
# lag = float(coefficients['lag'])
# No lag:
# return y0 * np.exp(time * mumax)
# Exponential:
return y0_lm * np.exp(time * mumax)
def _predict_logistic(self, time):
coefficients = self.params['coefficients']
y0 = float(coefficients['y0'])
mumax = float(coefficients['mumax'])
K = float(coefficients['K'])
return (K * y0)/(y0 + (K - y0) * np.exp(-mumax * time))
def _predict_baranyi_roberts(self, time):
coefficients = self.params['coefficients']
y0 = float(coefficients['y0'])
mumax = float(coefficients['mumax'])
K = float(coefficients['K'])
h0 = float(coefficients['h0'])
# Formula taken from the "growthrates" documentation under `grow_baranyi`:
# https://cran.r-project.org/web/packages/growthrates/growthrates.pdf
#
A = time + 1/mumax * np.log(np.exp(-mumax * time) + np.exp(-h0) - np.exp(-mumax * time - h0))
log_y = np.log(y0) + mumax * A - np.log(1 + (np.exp(mumax * A) - 1)/np.exp(np.log(K) - np.log(y0)))
return np.exp(log_y)
def _map_float(decimal_list):
result = []
if len(decimal_list) == 1 and isinstance(decimal_list[0], list):
# Fix for an odd serialization bug where a list of nulls gets stored as
# a nested list:
decimal_list = decimal_list[0]
for value in decimal_list:
if isinstance(value, Decimal):
result.append(float(value))
else:
result.append(value)
return result