您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Python多进程通信Queue、Pipe、Value、Array实例

5b51 2022/1/14 8:17:28 python 字数 9089 阅读 318 来源 www.jb51.cc/python

queue和pipe的区别:pipe用来在两个进程间通信。queue用来在多个进程间实现通信。此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。

概述

queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法

1)Queue & JoinableQueue

queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。

multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法

task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

join() 阻塞直到queue中的所有的task都被处理(即task_done方法调用)。

代码

class Consumer(multiprocessing.Process):
   
    def __init__(self,task_queue,result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print ('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print ('%s: %s' % (proc_name,next_task))
            answer = next_task() # __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self,a,b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a,self.b,self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a,self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
   
    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print ('Creating %d consumers' % num_consumers)
    consumers = [ Consumer(tasks,results)
                  for i in range(num_consumers) ]
    for w in consumers:
        w.start()
   
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i,i))
   
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
   
    # Start printing results
    while num_jobs:
        result = results.get()
        print ('Result:',result)
        num_jobs -= 1

注意小技巧: 使用None来表示task处理完毕。

运行结果:

2)pipe

pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法

代码

def f(conn):
    conn.send([42,None,'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f,args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())   # prints "[42,'hello']"

3)Value + Array

Value + Array 是python中共享内存 映射文件方法,速度比较快。

def f(n,a):
    n.value = n.value + 1
    for i in range(len(a)):
        a[i] = a[i] * 10

if __name__ == '__main__':
    num = Value('i',1)
    arr = Array('i',range(10))

    p = Process(target=f,args=(num,arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
   
    p2 = Process(target=f,arr))
    p2.start()
    p2.join()

    print(num.value)
    print(arr[:])

# the output is :
# 2
# [0,10,20,30,40,50,60,70,80,90]
# 3
# [0,100,200,300,400,500,600,700,800,900]

总结

以上是编程之家为你收集整理的Python多进程通信Queue、Pipe、Value、Array实例全部内容,希望文章能够帮你解决Python多进程通信Queue、Pipe、Value、Array实例所遇到的程序开发问题。


如果您也喜欢它,动动您的小指点个赞吧

除非注明,文章均由 laddyq.com 整理发布,欢迎转载。

转载请注明:
链接:http://laddyq.com
来源:laddyq.com
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


联系我
置顶