您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Python Flask with celery 脱离了应用程序上下文

Python Flask with celery 脱离了应用程序上下文

这是一个与flask应用程序工厂模式一起使用的解决方案,并且无需使用即可创建具有上下文的celery任务app.app_context()。在避免循环导入的同时获取该应用程序确实很棘手,但这可以解决问题。这是针对celery4.2的,这是撰写本文时的最新版本。

结构体:

repo_name/
    manage.py
    base/
    base/__init__.py
    base/app.py
    base/runcelery.py
    base/celeryconfig.py
    base/utility/celery_util.py
    base/tasks/workers.py

因此base是主要的应用程序包在这个例子中。在中,base/__init__.py我们创建celery实例,如下所示:

from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')

该base/app.py文件包含flask应用程序工厂,create_app并注意init_celery(app, celery)其中包含:

from base import celery
from base.utility.celery_util import init_celery

def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.
    :param config_object: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app

转到base/runcelery.py内容

from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONfig = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONfig)
init_celery(app, celery)

接下来,base/celeryconfig.py文件(作为示例):

# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""

## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0

# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)

## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False

accept_content = ['json', 'application/text']

result_serializer = 'json'
timezone = "UTC"

# define periodic tasks / cron here
# beat_schedule = {
#    'add-every-10-seconds': {
#        'task': 'workers.add_together',
#        'schedule': 10.0,
#        'args': (16, 16)
#    },
# }

现在在base/utility/celery_util.py文件中定义init_celery :

# -*- coding: utf-8 -*-

def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask

对于以下方面的工人base/tasks/workers.py:

from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail # this is the flask-mail

@celery_app.task
def send_async_email(msg):
    """Background task to send an email with Flask-mail."""
    #with app.app_context():
    mail.send(msg)

@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.
    You don't need to use with app.app_context() here. Task has context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link) 

然后,你需要从文件夹内部repo_name以两个不同的cmd提示符启动celery beat和celery worker 。

一个cmd提示符下,执行a celery -A base.runcelery:celery beat和另一个celery -A base.runcelery:celery worker

然后,执行需要flask上下文的任务。应该管用。

Flask-mail需要Flask应用程序上下文才能正常工作。在celery端实例化app对象,并使用app.app_context,如下所示:

with app.app_context():
    celery.start()
Python 2022/1/1 18:24:11 有175人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶