您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Celery撤销任务,然后使用Django数据库执行

Celery撤销任务,然后使用Django数据库执行

由于Celery通过ID跟踪任务,因此您真正需要的就是能够知道哪些ID已被取消。kombu您可以创建自己的表(或memcached类似表)来跟踪已取消的ID,而不用修改内部信息,然后检查当前可取消任务的ID是否在其中。

这是支持远程revoke命令的传输在内部执行的操作:

所有工作节点都在内存中或持久性磁盘上保留已撤销任务ID的内存(请参阅持久性撤销)。 (来自Celery文档)

当您使用django传输时,您要自己负责。在这种情况下,由每个任务来检查它是否已被取消。

因此,任务的基本形式(代替实际操作而添加日志)变为:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

您可以通过多种方式来扩展和改进它,例如通过创建基本的CancelableTask类,就像您链接到的其他答案之一一样,但这是基本形式。您现在缺少的是模型和检查它的功能

请注意,这种情况下的ID将是字符串ID,例如a5644f08-7d30-43ff-a61e-81c165ad9e19而不是 整数。您的模型可以像这样简单:

from django.db import models

class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

现在,您可以通过查看celery服务的调试日志来检查行为,例如:

my_task.delay()
models.cancel_task(my_task.delay())
Go 2022/1/1 18:40:48 有480人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶