当您尝试multiprocessing.Queue()
通过传递a作为参数共享a时,听起来好像问题开始了。您可以通过创建托管队列来解决此问题:
import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
当使用管理器创建它时,您正在存储并传递一个 代理 到队列,而不是队列本身,因此即使传递给工作进程的对象是复制的,它仍将指向相同的基础数据结构:您的队列。在概念上,它与C / C ++中的指针非常相似。如果以这种方式创建队列,则在启动工作进程时将能够通过它们。
由于您现在可以传递队列,因此不再需要管理字典。在main中保留一个普通字典,该字典将存储所有映射,并且仅为您的工作进程提供所需的队列,因此他们无需访问任何映射。
我在这里写了一个例子。看起来您在工作人员之间传递对象,这就是在这里完成的工作。假设我们有两个处理阶段,并且数据在的控制下开始和结束main
。看看我们如何创建像管道一样连接工人的队列,但是通过只 给他们提供 他们需要的队列 ,就不需要他们知道任何映射:
import multiprocessing as mp
def stage1(q_in, q_out):
q_out.put(q_in.get()+"Stage 1 did some work.\n")
return
def stage2(q_in, q_out):
q_out.put(q_in.get()+"Stage 2 did some work.\n")
return
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
Main开始了工作。 第一阶段做了一些工作。 第二阶段做了一些工作。 Main完成了工作。
我没有提供AsyncResults
在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作。但是,既然您可以自由地传递队列,则可以构建字典来根据需要存储队列/进程映射。
实际上,如果您确实在多个工作人员之间建立了一条管道,那么您甚至不需要保留对中“工作人员之间”队列的引用main
。创建队列,将其传递给您的工作人员,然后仅保留对main
将要使用的队列的引用。如果您确实有“任意数量”的队列,我绝对建议您尝试尽快让旧队列被垃圾回收。