Source code for initialization.admin

import io
from datetime import datetime, timezone

import simplejson as json
from wtforms import fields
from werkzeug.exceptions import NotFound
from sqlalchemy.orm import configure_mappers
from markupsafe import Markup
from flask import (
    g,
    send_file,
    request,
)
from flask_admin import Admin, form, AdminIndexView, expose
from flask_admin.model.form import converts
from flask_admin.contrib.sqla import ModelView
from flask_admin.contrib.sqla.form import AdminModelConverter
from flask_admin._compat import as_unicode
from flask_admin.model.template import EndpointLinkRowAction

from db import FLASK_DB
from app.model.orm.orm_base import OrmBase
from app.model.orm import (
    Bioreplicate,
    Community,
    Compartment,
    CustomModel,
    ExcelFile,
    Experiment,
    ExperimentCompartment,
    Measurement,
    MeasurementContext,
    MeasurementTechnique,
    Metabolite,
    ModelingResult,
    PageError,
    PageVisit,
    PageVisitCounter,
    Perturbation,
    Project,
    ProjectUser,
    Study,
    StudyMetabolite,
    StudyStrain,
    StudyTechnique,
    StudyUser,
    Submission,
    SubmissionBackup,
    Taxon,
    User,
)
from app.model.lib.util import humanize_camelcased_string


[docs] def json_formatter(_view, data, _name): "Format JSON using ``simplejson`` with ``use_decimal=True``" return Markup(f"<pre>{json.dumps(data, indent=2, use_decimal=True)}</pre>")
[docs] def json_page_visit_counter_formatter(_view, data, _name): "Format nested JSON as an HTML table, specifically for Page Visit records" sorted_entries = sorted( ((k, v) for k, v in data.items() if v['visitCount'] > 0), key=lambda pair: -pair[1]['visitCount'], ) rows = [] for key, entry in sorted_entries: rows.append(f""" <tr> <td>{key}</td> <td>{entry['visitCount']}</td> <td>{entry['visitorCount']}</td> <td>{entry['userCount']}</td> </tr> """) html = f""" <table style="font-size: 14px;"> <thead> <tr> <th>Key ({len(sorted_entries)})</th> <th>Visits</th> <th>Visitors</th> <th>Users</th> </tr> </thead> <tbody> {"\n".join(rows)} </tbody> </table> """ return Markup(html)
[docs] def record_formatter(_view, record, *args): "Format ORM records to make them easier to read" if hasattr(record, 'publicId'): return record.publicId elif hasattr(record, 'name'): return record.name else: return str(record)
_FORMATTERS = { dict: json_formatter, OrmBase: record_formatter, }
[docs] class AppJSONField(fields.TextAreaField): "Copied from Flask-admin to replace ``json`` with ``simplejson`` with ``use_decimal=True``" def _value(self): if self.raw_data: return self.raw_data[0] elif self.data: # prevent utf8 characters from being converted to ascii return as_unicode(json.dumps(self.data, use_decimal=True, ensure_ascii=False)) else: return '{}'
[docs] def process_formdata(self, valuelist): "Process data received over the wire from a form." if valuelist: value = valuelist[0] # allow saving blank field as None if not value: self.data = None return try: self.data = json.loads(valuelist[0], use_decimal=True) except ValueError: raise ValueError(self.gettext('Invalid JSON'))
[docs] class AppModelConverter(AdminModelConverter): "Custom converter for JSON and datetime fields" @converts('JSON')
[docs] def convert_JSON(self, field_args, **extra): return AppJSONField(**field_args)
@converts('UtcDateTime')
[docs] def convert_datetime(self, field_args, **extra): return AppDateTimeField(**field_args, default=datetime.utcnow)
[docs] class AppDateTimeField(form.DateTimeField): "Ensures that timestamps in the database are stored in the UTC timezone"
[docs] def process_formdata(self, valuelist): "Process data received over the wire from a form." if not valuelist: return date_str = " ".join(valuelist) for format in self.strptime_format: try: self.data = datetime.strptime(date_str, format).replace(tzinfo=timezone.utc) return except ValueError: self.data = None raise ValueError(self.gettext("Not a valid datetime value."))
[docs] class AppAdminIndexView(AdminIndexView): "Blocks the entire admin for non-admin users" @expose('/')
[docs] def index(self): if not g.current_user or not g.current_user.isAdmin: raise NotFound() return super(AppAdminIndexView, self).index()
[docs] class AppView(ModelView): "Custom view with common functionality for ORM records"
[docs] can_export = True
[docs] can_view_details = True
# By default, sort by id DESC, so the newest records show up first. This is # overridden below for the three records that use "publicId" as the primary # key.
[docs] column_default_sort = ('id', True)
[docs] model_form_converter = AppModelConverter
"Custom converter for JSON and datetime" def _prettify_name(self, name): "Overridden to render camelCased strings nicely in various places" return humanize_camelcased_string(name).title()
[docs] column_type_formatters = _FORMATTERS
"Custom formatters for the list view"
[docs] column_type_formatters_export = _FORMATTERS
"Custom formatters for the export view"
[docs] column_type_formatters_detail = _FORMATTERS
"Custom formatters for the detail view"
[docs] def is_accessible(self): "Only allow admin users to the admin" return g.current_user and g.current_user.isAdmin
[docs] def inaccessible_callback(self, name, **kwargs): "Raise 404 instead of 403 to hide the presence of the endpoint for bots" raise NotFound()
[docs] def init_admin(app): """ Main entry point of the module, initializes Flask-Admin for our Flask app """ configure_mappers() admin = Admin( app, name='mGrowthDB admin', index_view=AppAdminIndexView(), ) db_session = FLASK_DB.session class StudyView(AppView): column_searchable_list = ['name'] column_exclude_list = ['description', 'authors'] column_default_sort = ('publicId', True) form_excluded_columns = [ 'measurements', 'measurementContexts', 'studyTechniques', 'measurementTechniques', 'studyUsers', 'experiments', 'strains', 'communities', 'compartments', 'modelingResults', 'bioreplicates', 'studyMetabolites', 'metabolites', 'authorCache', ] class ProjectView(AppView): column_default_sort = ('publicId', True) class SubmissionView(AppView): column_exclude_list = ['studyDesign', 'dataFile'] form_excluded_columns = ['project', 'study'] class ExcelFileView(AppView): column_exclude_list = ['content'] column_details_exclude_list = ['content'] can_edit = False can_create = False can_export = False column_extra_row_actions = [ EndpointLinkRowAction("fa fa-download", ".download_view"), ] @expose("/download", methods=("GET",)) def download_view(self): file = g.db_session.get(ExcelFile, request.args['id']) return send_file( io.BytesIO(file.content), as_attachment=True, download_name=file.filename ) admin.add_view(ProjectView(Project, db_session, category="Studies")) admin.add_view(StudyView(Study, db_session, category="Studies")) admin.add_view(SubmissionView(Submission, db_session, category="Studies")) admin.add_view(SubmissionView(SubmissionBackup, db_session, category="Studies")) admin.add_view(AppView(StudyStrain, db_session, category="Studies")) admin.add_view(AppView(StudyMetabolite, db_session, category="Studies")) admin.add_view(ExcelFileView(ExcelFile, db_session, category="Studies")) class ExperimentEntityView(AppView): # We ignore all of these "child" entities, because they can't # practically be updated through the form. form_excluded_columns = [ 'bioreplicates', 'compartments', 'perturbations', 'experimentCompartments', 'experiments', 'measurementContexts', 'measurements', ] class ExperimentView(ExperimentEntityView): column_default_sort = ('publicId', True) admin.add_view(ExperimentView(Experiment, db_session, category="Experiments")) admin.add_view(ExperimentEntityView(ExperimentCompartment, db_session, category="Experiments")) admin.add_view(ExperimentEntityView(Compartment, db_session, category="Experiments")) admin.add_view(ExperimentEntityView(Bioreplicate, db_session, category="Experiments")) admin.add_view(ExperimentEntityView(Community, db_session, category="Experiments")) admin.add_view(ExperimentEntityView(Perturbation, db_session, category="Experiments")) class ModelingResultView(AppView): column_exclude_list = ['rSummary', 'xValues', 'yValues', 'yErrors'] admin.add_view(AppView(StudyTechnique, db_session, category="Measurements")) admin.add_view(AppView(MeasurementTechnique, db_session, category="Measurements")) admin.add_view(AppView(MeasurementContext, db_session, category="Measurements")) admin.add_view(AppView(Measurement, db_session, category="Measurements")) admin.add_view(ModelingResultView(ModelingResult, db_session, category="Measurements")) admin.add_view(ModelingResultView(CustomModel, db_session, category="Measurements")) class MetaboliteView(AppView): column_searchable_list = ['name'] form_excluded_columns = ['studyMetabolites'] class TaxonView(AppView): column_searchable_list = ['name'] admin.add_view(MetaboliteView(Metabolite, db_session, category="External data")) admin.add_view(TaxonView(Taxon, db_session, category="External data")) class UserView(AppView): form_excluded_columns = [ 'createdAt', 'lastLoginAt', 'updatedAt', 'submissions', 'orcidToken', 'managedProjects', 'managedStudies', 'ownedProjects', 'ownedStudies', 'projectUsers', 'studyUsers', ] class PageVisitView(AppView): column_list = [ 'createdAt', 'path', 'parsedQuery', 'userAgent', 'ip', 'country', 'uuid', 'isBot', 'isUser', 'isAdmin', ] column_filters = ['isBot', 'isUser', 'isAdmin', 'country', 'path'] class PageVisitCounterView(AppView): column_type_formatters = { **_FORMATTERS, dict: json_page_visit_counter_formatter, } class PageErrorView(AppView): column_list = ['createdAt', 'fullPath', 'traceback', 'uuid', 'userId'] column_formatters = { 'traceback': lambda v, c, m, p: Markup(f"<pre>{m.traceback}</pre>"), } admin.add_view(UserView(User, db_session, category="Users")) admin.add_view(AppView(StudyUser, db_session, category="Users")) admin.add_view(AppView(ProjectUser, db_session, category="Users")) admin.add_view(PageVisitView(PageVisit, db_session, category="Users")) admin.add_view(PageVisitCounterView(PageVisitCounter, db_session, category="Users")) admin.add_view(PageErrorView(PageError, db_session, category="Users")) return app