您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

在Python中的进程之间共享许多队列

在Python中的进程之间共享许多队列

当您尝试multiprocessing.Queue()通过传递a作为参数共享a时,听起来好像问题开始了。您可以通过创建托管队列解决此问题:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

当使用管理器创建它时,您正在存储并传递一个 代理 到队列,而不是队列本身,因此即使传递给工作进程的对象是复制的,它仍将指向相同的基础数据结构:您的队列。在概念上,它与C / C ++中的指针非常相似。如果以这种方式创建队列,则在启动工作进程时将能够通过它们。

由于您现在可以传递队列,因此不再需要管理字典。在main中保留一个普通字典,该字典将存储所有映射,并且仅为您的工作进程提供所需的队列,因此他们无需访问任何映射。

在这里写了一个例子。看起来您在工作人员之间传递对象,这就是在这里完成的工作。假设我们有两个处理阶段,并且数据在的控制下开始和结束main。看看我们如何创建像管道一样连接工人的队列,但是通过 给他们提供 他们需要的队列 ,就不需要他们知道任何映射:

import multiprocessing as mp

def stage1(q_in, q_out):

    q_out.put(q_in.get()+"Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):

    q_out.put(q_in.get()+"Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()

代码产生以下输出

Main开始了工作。 第一阶段做了一些工作。 第二阶段做了一些工作。 Main完成了工作。

我没有提供AsyncResults在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作。但是,既然您可以自由地传递队列,则可以构建字典来根据需要存储队列/进程映射。

实际上,如果您确实在多个工作人员之间建立了一条管道,那么您甚至不需要保留对中“工作人员之间”队列的引用main。创建队列,将其传递给您的工作人员,然后仅保留对main将要使用的队列的引用。如果您确实有“任意数量”的队列,我绝对建议您尝试尽快让旧队列被垃圾回收。

python 2022/1/1 18:36:12 有224人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶