Source code for initialization.celery

import os

from celery import Celery, Task
from celery.schedules import crontab

from app.model.tasks.tracking import aggregate_page_visits
from app.model.tasks.submissions import publish_eligible_studies


[docs] def init_celery(app): """ Main entry point of the module. The redis config defaults to port ``6379`` of ``localhost`` to match the one used in the "services" docker image. This can be overridden by two environment variables: * ``REDIS_HOST`` * ``REDIS_PORT`` """ class AppTask(Task): def __call__(self, *args: object, **kwargs: object) -> object: with app.app_context(): return self.run(*args, **kwargs) redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = os.getenv('REDIS_PORT', '6379') redis_url = f"redis://{redis_host}:{redis_port}/0" celery_app = Celery(app.name, task_cls=AppTask) celery_app.config_from_object({ 'broker_url': redis_url, 'result_backend': redis_url, 'task_ignore_result': True, 'timezone': 'UTC', 'beat_schedule': { 'aggregate-page-visits': { 'task': 'app.model.tasks.tracking.aggregate_page_visits', # 1pm UTC on Sunday: 'schedule': crontab(hour=13, minute=0, day_of_week='sunday'), 'args': (), }, 'publish-eligible-studies': { 'task': 'app.model.tasks.submissions.publish_eligible_studies', # Daily at 2am UTC 'schedule': crontab(hour=2, minute=0), 'args': (), }, } }) celery_app.set_default() app.extensions["celery"] = celery_app return app