您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

python – 使用Celery在部分任务中具有位置参数的链组

5b51 2022/1/14 8:21:48 python 字数 4553 阅读 528 来源 www.jb51.cc/python

我正在编写一个应用程序,它将异步执行一组多个同步任务链.换句话说,我可能有管道foo(a,b,c) - >一些bs列表的boo(a,b,c).我的理解是创建一个foo链(a,b,c)| boo(a,b,c)表示此列表中的每个b.然后这些链形成一个芹菜组,可以异步应用.我的代码如下: my_app.py #!/usr/bin/env python3 im

概述

我正在编写一个应用程序,它将异步执行一组多个同步任务链.

换句话说,我可能有管道foo(a,b,c) – >一些bs列表的boo(a,c).

我的理解是创建一个foo链(a,c)| boo(a,c)表示此列表中的每个b.然后这些链形成一个芹菜组,可以异步应用.

我的代码如下:

#!/usr/bin/env python3

import functools
import time

from celery import chain,group,Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery("my_app",broker='redis://localhost:6379/0',backend='redis://localhost:6379/0')

@app.task
def foo(a,c):
    logger.info("foo from {0}!".format(b))
    return b

@app.task
def boo(a,c):
    logger.info("boo from {0}!".format(b))
    return b

def break_up_tasks(tasks):
    try:
        first_task,*remaining_tasks = tasks
    except ValueError as e:
        first_task,remaining_tasks = [],[]
    return first_task,remaining_tasks

def do_tasks(a,bs,c,opts):
    tasks = [foo,boo]

    # There should be an option for each task
    if len(opts) != len(tasks):
        raise ValueError("There should be {0} provided options".format(len(tasks)))

    # Create a list of tasks that should be included per the list of options' boolean values
    tasks = [task for opt,task in zip(opts,tasks) if opt]

    first_task,remaining_tasks = break_up_tasks(tasks)

    # If there are no tasks,we're done.
    if not first_task: return

    chains = (
        functools.reduce(
            # `a` should be provided by `apply_async`'s `args` kwarg
            # `b` should be provided by prevIoUs partials in chain
            lambda x,y: x | y.s(c),remaining_tasks,first_task.s(a,c)
        ) for b in bs
    )

    g = group(*chains)
    res = g.apply_async(args=(a,),queue="default")
    print("Applied async... waiting for termination.")

    total_tasks = len(tasks)

    while not res.ready():
        print("Waiting... {0}/{1} tasks complete".format(res.completed_count(),total_tasks))
        time.sleep(1)

if __name__ == "__main__":
    a = "whatever"
    bs = ["hello","world"]
    c = "baz"

    opts = [
        # do "foo"
        True,# do "boo"
        True
    ]

    do_tasks(a,opts)
celery worker -A my_app -l info -c 5 -Q default

但是,我发现,当我运行上述操作时,我的服务器客户端运行无限循环,因为boo缺少一个参数:

TypeError: boo() missing 1 required positional argument: 'c'

我的理解是apply_async将为每个链提供args kwarg,并且链中的先前链接将为后续链接提供其返回值.

为什么嘘声没有正确接受论点?我确信这些任务写得不好,因为这是我第一次涉足Celery.如果您有其他建议,我很乐意接受他们.

chains = (
    functools.reduce(
        # `a` should be provided by `apply_async`'s `args` kwarg
        # `b` should be provided by prevIoUs partials in chain
        lambda x,y: x | y.s(b,c),# <- here is the 'new guy'
        remaining_tasks,c)
    ) for b in bs
)

希望能帮助到你.

总结

以上是编程之家为你收集整理的python – 使用Celery在部分任务中具有位置参数的链组全部内容,希望文章能够帮你解决python – 使用Celery在部分任务中具有位置参数的链组所遇到的程序开发问题。


如果您也喜欢它,动动您的小指点个赞吧

除非注明,文章均由 laddyq.com 整理发布,欢迎转载。

转载请注明:
链接:http://laddyq.com
来源:laddyq.com
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


联系我
置顶