这是一个与flask应用程序工厂模式一起使用的解决方案,并且无需使用即可创建具有上下文的celery任务app.app_context()。在避免循环导入的同时获取该应用程序确实很棘手,但这可以解决问题。这是针对celery4.2的,这是撰写本文时的最新版本。
结构体:
repo_name/
manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
因此base是主要的应用程序包在这个例子中。在中,base/__init__.py
我们创建celery实例,如下所示:
from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')
该base/app.py文件包含flask应用程序工厂,create_app并注意init_celery(app, celery)其中包含:
from base import celery
from base.utility.celery_util import init_celery
def create_app(config_obj):
"""An application factory, as explained here:
http://flask.pocoo.org/docs/patterns/appfactories/.
:param config_object: The configuration object to use.
"""
app = Flask('base')
app.config.from_object(config_obj)
init_celery(app, celery=celery)
register_extensions(app)
register_blueprints(app)
register_errorhandlers(app)
register_app_context_processors(app)
return app
转到base/runcelery.py内容:
from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONfig = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONfig)
init_celery(app, celery)
接下来,base/celeryconfig.py
文件(作为示例):
# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0
# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)
## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False
accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'workers.add_together',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
# }
现在在base/utility/celery_util.py文件中定义init_celery :
# -*- coding: utf-8 -*-
def init_celery(app, celery):
"""Add flask app context to celery.Task"""
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
对于以下方面的工人base/tasks/workers.py:
from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail # this is the flask-mail
@celery_app.task
def send_async_email(msg):
"""Background task to send an email with Flask-mail."""
#with app.app_context():
mail.send(msg)
@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
"""Background task to send a welcome email with flask-security's mail.
You don't need to use with app.app_context() here. Task has context.
"""
user = User.query.filter_by(id=user_id).first()
print(f'sending user {user} a welcome email')
send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
email,
'welcome', user=user,
confirmation_link=confirmation_link)
然后,你需要从文件夹内部repo_name以两个不同的cmd提示符启动celery beat和celery worker 。
在一个cmd提示符下,执行a celery -A base.runcelery:celery beat
和另一个celery -A base.runcelery:celery worker
。
然后,执行需要flask上下文的任务。应该管用。
Flask-mail需要Flask应用程序上下文才能正常工作。在celery端实例化app对象,并使用app.app_context,如下所示:
with app.app_context():
celery.start()