您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

python – 使用concurrent.futures一次消耗许多出列的消息

5b51 2022/1/14 8:21:48 python 字数 2953 阅读 571 来源 www.jb51.cc/python

我正在使用来自RabbitMQ频道的消息,我希望我一次可以消耗n个元素.我想我可以使用ProcessPoolExecutor(或ThreadPoolExecutor).我只是想知道是否可以知道池中是否有免费执行程序.这就是我想写的:executor = futures.ProcessPoolExecutor(max_workers=5) running =

概述

我正在使用来自RabbitMQ频道的消息,我希望我一次可以消耗n个元素.我想我可以使用ProcessPoolExecutor(或ThreadPoolExecutor).
我只是想知道是否可以知道池中是否有免费执行程序.

这就是我想写的:

executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
    print "actually consuming a single message"

def on_message(channel,method_frame,header_frame,message):
    # this method is called once per incoming message
    future = executor.submit(consume,message)
    block_until_a_free_worker(executor,future)

def block_until_a_free_worker(executor,future):
    running.append(future) # this grows forever!
    futures.wait(running,timeout=5,return_when=futures.FIRST_COMPLETED)

[...]
channel.basic_consume(on_message,'my_queue')
channel.start_consuming()

我需要编写函数block_until_a_free_worker.
方法应该能够检查是否所有正在运行的工作程序都在使用中.

在替代方案中,我可以使用任何阻塞executor.submit选项(如果可用).

我尝试了一种不同的方法,并在完成后改变期货清单.
我试图从列表中明确添加删除期货,然后像这样等待:

futures.wait(running,return_when=futures.FIRST_COMPLETED)

这似乎不是解决方案.

我可以设置future.add_done_callback,并可能计算正在运行的实例…

任何提示或想法?
谢谢.

信号量用于限制对一组工作者的资源访问.

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor 

class TaskManager:
    def __init__(self,workers):
        self.pool = ProcessPoolExecutor(max_workers=workers)
        self.workers = Semaphore(workers)

    def new_task(self,function):
        """Start a new task,blocks if all workers are busy."""
        self.workers.acquire()  # flag a worker as busy

        future = self.pool.submit(function,... )

        future.add_task_done(self.task_done)

    def task_done(self,future):
        """Called once task is done,releases one worker."""
        self.workers.release()

总结

以上是编程之家为你收集整理的python – 使用concurrent.futures一次消耗许多出列的消息全部内容,希望文章能够帮你解决python – 使用concurrent.futures一次消耗许多出列的消息所遇到的程序开发问题。


如果您也喜欢它,动动您的小指点个赞吧

除非注明,文章均由 laddyq.com 整理发布,欢迎转载。

转载请注明:
链接:http://laddyq.com
来源:laddyq.com
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


联系我
置顶