import os
from typing import Optional
from datetime import datetime, UTC
from pathlib import Path
import shutil
import subprocess
import simplejson as json
import sqlalchemy as sql
from sqlalchemy.orm import (
Mapped,
mapped_column,
relationship,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy_utc.sqltypes import UtcDateTime
from app.model.orm.orm_base import OrmBase
# [docs]  (Sphinx source-link marker left over from HTML extraction; not code)
class Submission(OrmBase):
    """
    A temporary container for the data of a ``Study``, uploaded by a particular ``User``.

    The study design is stored in a JSON field, built up over several steps in
    a frontend form. The study measurements are stored in an uploaded excel
    file. Both of these are processed to create individual entities that are
    accessible to the public.
    """

    __tablename__ = 'Submissions'

    id: Mapped[int] = mapped_column(primary_key=True)

    # Identifiers of the target project and study. These are plain string
    # columns (no database-level foreign key); the relationships below join on
    # them explicitly via ``primaryjoin``.
    projectUniqueID: Mapped[str] = mapped_column(sql.String(100), nullable=False)
    studyUniqueID: Mapped[str] = mapped_column(sql.String(100), nullable=False)

    # Optional: a matching ``Project``/``Study`` row may not exist.
    project: Mapped[Optional['Project']] = relationship(
        foreign_keys=[projectUniqueID],
        primaryjoin="Submission.projectUniqueID == Project.uuid",
    )
    study: Mapped[Optional['Study']] = relationship(
        foreign_keys=[studyUniqueID],
        primaryjoin="Submission.studyUniqueID == Study.uuid",
    )

    # The user who owns this submission.
    userUniqueID: Mapped[str] = mapped_column(sql.ForeignKey('Users.uuid'), nullable=False)
    user: Mapped['User'] = relationship(back_populates='submissions')

    # JSON study design; the methods below read it with dict access
    # (``.get(...)``) using keys such as 'strains', 'techniques', 'experiments'.
    studyDesign: Mapped[dict] = mapped_column(sql.JSON, nullable=False)

    # Uploaded spreadsheet with the measurements; may be absent.
    dataFileId: Mapped[Optional[int]] = mapped_column(sql.ForeignKey('ExcelFiles.id'), nullable=True)
    # The file is owned exclusively by this submission (``single_parent``) and
    # is deleted together with it or when detached (``delete-orphan``).
    dataFile: Mapped[Optional['ExcelFile']] = relationship(
        foreign_keys=[dataFileId],
        cascade='all, delete-orphan',
        single_parent=True,
    )

    # ``FetchedValue`` marks these as generated on the database side.
    createdAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    updatedAt: Mapped[datetime] = mapped_column(UtcDateTime, server_default=sql.FetchedValue())
    publishedAt: Mapped[Optional[datetime]] = mapped_column(
        UtcDateTime,
        server_default=sql.FetchedValue(),
        nullable=True,
    )

    changelogText: Mapped[Optional[str]] = mapped_column(sql.String, nullable=True)

    @hybrid_property
    def isPublished(self):
        """
        Whether ``publishedAt`` is set.

        As a hybrid property this works both on instances and on the class
        (where it produces a SQL expression usable in queries).
        """
        # `!= None` rather than `is not None` is deliberate: at class level the
        # comparison must be overloadable to build a SQL expression.
        return self.publishedAt != None  # noqa: E711

    @property
    def completedStepCount(self):
        """
        Number of submission steps (0 to 7) that currently count as completed.

        The steps checked, in order: project and study identifiers, strains
        (regular or custom), techniques, compartments together with
        communities, experiments, uploaded data file, published study.
        """
        design = self.studyDesign or {}

        def filled(key):
            # A missing key, ``None`` or an empty collection all count as "not filled".
            return bool(design.get(key))

        steps = (
            bool(self.projectUniqueID and self.studyUniqueID),
            filled('strains') or filled('custom_strains'),
            filled('techniques'),
            filled('compartments') and filled('communities'),
            filled('experiments'),
            bool(self.dataFileId),
            bool(self.study and self.study.isPublished),
        )
        # Booleans sum as 0/1, so this is the count of completed steps.
        return sum(steps)

    def build_techniques(self):
        """
        Build ``StudyTechnique`` objects from the ``techniques`` list of the study design.

        Each technique entry yields one ``StudyTechnique``. It gets one
        ``MeasurementTechnique`` per entry in its ``cellTypes`` list, or a
        single ``MeasurementTechnique`` without a ``cellType`` if that list is
        missing or empty.

        :return: list of newly created ``StudyTechnique`` objects (empty if
            the design has no techniques).
        """
        # Imported here rather than at module level -- presumably to avoid a
        # circular import with ``app.model.orm``.
        from app.model.orm import StudyTechnique, MeasurementTechnique

        study_techniques = []

        for technique_data in self.studyDesign.get('techniques') or []:
            study_technique = StudyTechnique(**StudyTechnique.filter_keys(technique_data))

            # Same attributes for every measurement technique of this entry.
            measurement_attrs = MeasurementTechnique.filter_keys(technique_data)
            cell_types = technique_data.get('cellTypes') or []

            if cell_types:
                for cell_type in cell_types:
                    study_technique.measurementTechniques.append(
                        MeasurementTechnique(**measurement_attrs, cellType=cell_type)
                    )
            else:
                study_technique.measurementTechniques.append(
                    MeasurementTechnique(**measurement_attrs)
                )

            study_techniques.append(study_technique)

        return study_techniques

    def export_data(self, message, timestamp=None):
        """
        Write the published study's data to its export directory and zip it.

        The export directory is ``<base>/<study.publicId>/``, where ``<base>``
        is ``var/test/export`` when the ``APP_ENV`` environment variable is
        ``test`` and ``static/export`` otherwise (both relative to the current
        working directory). Existing ``*.csv`` and ``*.json`` files in it are
        removed, then the study design is written as ``study_design.json`` and
        each sheet of the data file as a CSV. A line is appended to
        ``changes.log``. If a ``zip`` executable is available, the directory
        is added to ``<publicId>.zip`` and ``all_studies.zip`` in ``<base>``.

        :param message: text of the changelog entry to append.
        :param timestamp: timestamp of the changelog entry; defaults to the
            current UTC time.
        :raises AssertionError: if the submission has no study or the study is
            not published.
        """
        assert self.study is not None, "cannot export a submission without a study"
        assert self.study.isPublished, "cannot export an unpublished study"

        if timestamp is None:
            timestamp = datetime.now(UTC)

        if os.getenv('APP_ENV', 'development') == 'test':
            base_dir = Path("var/test/export")
        else:
            base_dir = Path("static/export")

        public_id = self.study.publicId
        study_dir = base_dir / public_id
        study_dir.mkdir(parents=True, exist_ok=True)

        # Clean up previous files:
        for pattern in ('*.csv', '*.json'):
            for stale_file in study_dir.glob(pattern):
                stale_file.unlink()

        # Export study design:
        with open(study_dir / 'study_design.json', 'w', encoding='utf-8') as f:
            json.dump(self.studyDesign, f, use_decimal=True, indent=2)

        # Export data files:
        # (Note: file should always exist, but it might not in tests)
        if self.dataFile:
            for name, df in self.dataFile.extract_sheets().items():
                # "Sheet Name" -> "sheet_name.csv"
                file_name = '_'.join(name.lower().split()) + '.csv'
                df.to_csv(study_dir / file_name, index=False)

        # Record a changelog entry
        with open(study_dir / 'changes.log', 'a', encoding='utf-8') as f:
            print(f"[{timestamp.isoformat()}] {message}", file=f)

        # Zip data for batch downloads. Equivalent shell command, run from
        # base_dir:  zip <archive>.zip -r <publicId>/
        # Best effort: skipped if `zip` is not installed, and the exit status
        # is not checked.
        if zip_exe := shutil.which('zip'):
            for archive_name in (f"{public_id}.zip", "all_studies.zip"):
                subprocess.run(
                    [zip_exe, archive_name, '-r', f"{public_id}/"],
                    cwd=base_dir,
                )