# Source code for app.model.orm.submission

import os
from typing import Optional
from datetime import datetime, UTC
from pathlib import Path
import shutil
import subprocess

import simplejson as json
import sqlalchemy as sql
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime

from app.model.orm.orm_base import OrmBase


class Submission(OrmBase):
    """
    A temporary container for the data of a ``Study``, uploaded by a
    particular ``User``.

    The study design is stored in a JSON field, built up over several steps
    in a frontend form. The study measurements are stored in an uploaded
    excel file. Both of these are processed to create individual entities
    that are accessible to the public.
    """

    __tablename__ = 'Submissions'

    id: Mapped[int] = mapped_column(primary_key=True)

    # UUIDs of the target project and study. These are plain string columns
    # (no database-level foreign key is declared here); the relationships
    # below join on them explicitly, so the referenced rows may not exist.
    projectUniqueID: Mapped[str] = mapped_column(sql.String(100), nullable=False)
    studyUniqueID: Mapped[str] = mapped_column(sql.String(100), nullable=False)

    project: Mapped[Optional['Project']] = relationship(
        foreign_keys=[projectUniqueID],
        primaryjoin="Submission.projectUniqueID == Project.uuid",
    )
    study: Mapped[Optional['Study']] = relationship(
        foreign_keys=[studyUniqueID],
        primaryjoin="Submission.studyUniqueID == Study.uuid",
    )

    # The uploading user.
    userUniqueID: Mapped[str] = mapped_column(sql.ForeignKey('Users.uuid'), nullable=False)
    user: Mapped['User'] = relationship(back_populates='submissions')

    # JSON document describing the study design. The methods below read the
    # keys 'strains', 'custom_strains', 'techniques', 'compartments',
    # 'communities' and 'experiments' from it.
    studyDesign: Mapped[dict] = mapped_column(sql.JSON, nullable=False)

    # Uploaded spreadsheet with the measurements; absent until uploaded.
    # The file row is owned by this submission (delete-orphan cascade).
    dataFileId: Mapped[Optional[int]] = mapped_column(sql.ForeignKey('ExcelFiles.id'), nullable=True)
    dataFile: Mapped[Optional['ExcelFile']] = relationship(
        foreign_keys=[dataFileId],
        cascade='all, delete-orphan',
        single_parent=True,
    )

    # Timestamps are declared with a FetchedValue server default, i.e. the
    # database is expected to supply them.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    publishedAt: Mapped[Optional[datetime]] = mapped_column(
        UtcDateTime,
        server_default=sql.FetchedValue(),
        nullable=True,
    )

    changelogText: Mapped[Optional[str]] = mapped_column(sql.String, nullable=True)

    @hybrid_property
    def isPublished(self):
        """
        Whether the submission has a publication timestamp.

        Works on instances (returns a bool) and on the class (produces a SQL
        ``IS NOT NULL`` expression usable in queries).
        """
        # `!= None` is deliberate: at class level this must go through the
        # column's overloaded operator, which `is not None` would bypass.
        return self.publishedAt != None  # noqa: E711

    @property
    def completedStepCount(self) -> int:
        """
        Count how many of the seven submission steps have been completed.

        A step counts as done when the data it collects is present:
        project/study ids, strains (standard or custom), techniques,
        compartments together with communities, experiments, an uploaded
        data file, and finally a published study.
        """
        design = self.studyDesign

        # Truthiness is used instead of len() so that a key holding JSON
        # null counts as "not done" rather than raising TypeError.
        steps = (
            self.projectUniqueID and self.studyUniqueID,
            design.get('strains') or design.get('custom_strains'),
            design.get('techniques'),
            design.get('compartments') and design.get('communities'),
            design.get('experiments'),
            self.dataFileId,
            self.study and self.study.isPublished,
        )

        return sum(1 for done in steps if done)

    def build_techniques(self) -> list:
        """
        Build unsaved ``StudyTechnique`` objects from the study design.

        Each entry of ``studyDesign['techniques']`` yields one
        ``StudyTechnique``. It receives one ``MeasurementTechnique`` per
        entry in the technique's ``cellTypes`` list, or a single one without
        a cell type if that list is missing or empty.

        Returns:
            The list of new ``StudyTechnique`` objects, in design order.

        Raises:
            KeyError: if the study design has no ``'techniques'`` key.
        """
        # Imported locally, as in the original code (presumably to avoid a
        # circular import between ORM modules).
        from app.model.orm import StudyTechnique, MeasurementTechnique

        study_techniques = []

        for technique_data in self.studyDesign['techniques']:
            cell_types = technique_data.get('cellTypes') or []

            study_technique = StudyTechnique(**StudyTechnique.filter_keys(technique_data))

            # The shared attributes are the same for every cell type, so
            # they are filtered once per technique.
            measurement_attrs = MeasurementTechnique.filter_keys(technique_data)

            if cell_types:
                for cell_type in cell_types:
                    study_technique.measurementTechniques.append(
                        MeasurementTechnique(**measurement_attrs, cellType=cell_type)
                    )
            else:
                study_technique.measurementTechniques.append(
                    MeasurementTechnique(**measurement_attrs)
                )

            study_techniques.append(study_technique)

        return study_techniques

    def export_data(self, message: str, timestamp: Optional[datetime] = None) -> None:
        """
        Write the published study's data to the static export directory.

        Creates ``<base>/<study publicId>/`` containing ``study_design.json``,
        one CSV per sheet of the uploaded data file, and an appended
        ``changes.log`` entry. If a ``zip`` executable is on the PATH, the
        study directory is also added to ``<publicId>.zip`` and
        ``all_studies.zip`` in the base directory.

        Args:
            message: Text of the changelog entry to record.
            timestamp: Time of the change; defaults to the current UTC time.

        Raises:
            AssertionError: if the submission has no study, or the study is
                not published.
        """
        # Raised explicitly (instead of `assert`) so the checks also run
        # under `python -O`; the exception type is unchanged for callers.
        if self.study is None:
            raise AssertionError("Cannot export a submission without a study")
        if not self.study.isPublished:
            raise AssertionError("Cannot export a study that is not published")

        if timestamp is None:
            timestamp = datetime.now(UTC)

        public_id = self.study.publicId

        in_test_env = os.getenv('APP_ENV', 'development') == 'test'
        base_dir = Path("var/test/export") if in_test_env else Path("static/export")

        study_dir = base_dir / public_id
        study_dir.mkdir(parents=True, exist_ok=True)

        # Clean up previous files (the changelog is kept and appended to):
        for pattern in ('*.csv', '*.json'):
            for stale_file in study_dir.glob(pattern):
                stale_file.unlink()

        # Export study design:
        with open(study_dir / 'study_design.json', 'w', encoding='utf-8') as f:
            json.dump(self.studyDesign, f, use_decimal=True, indent=2)

        # Export data files:
        # (Note: file should always exist, but it might not in tests)
        if self.dataFile:
            for name, df in self.dataFile.extract_sheets().items():
                # "Sheet Name" -> "sheet_name.csv"
                file_name = '_'.join(name.lower().split()) + '.csv'
                df.to_csv(study_dir / file_name, index=False)

        # Record a changelog entry
        with open(study_dir / 'changes.log', 'a', encoding='utf-8') as f:
            print(f"[{timestamp.isoformat()}] {message}", file=f)

        # Zip data for batch downloads. Equivalent shell command, run from
        # base_dir: zip <publicId>.zip -r <publicId>/
        #
        # Best-effort: skipped when `zip` is not installed, and the exit
        # status is not checked.
        #
        # NOTE(review): plain `zip -r` adds/updates entries but does not
        # appear to drop archive entries for files removed above -- confirm
        # whether that matters (e.g. `-FS`).
        if zip_exe := shutil.which('zip'):
            for archive_name in (f"{public_id}.zip", 'all_studies.zip'):
                subprocess.run(
                    [zip_exe, archive_name, '-r', f"{public_id}/"],
                    cwd=base_dir,
                )