您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

将远程调用排队到Python Twisted透视图代理?

将远程调用排队到Python Twisted透视图代理?

一个可能对此有帮助的现成选项是twisted.internet.defer.DeferredSemaphore。这是正常(计数)信号量的异步版本,您可能已经知道如果您执行了很多线程编程。

(计数)信号量很像互斥锁(锁)。但是,在一个互斥锁只能获取一次直到相应的发行版的情况下,(计数)信号灯可以配置为允许任意数量(但指定的)的获取成功之前需要任何相应的发行版。

这是一个DeferredSemaphore用于运行十个异步操作,但一次最多运行三个异步操作的示例:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore也有显式acquirerelease方法,但是该run方法非常方便,几乎总是您想要的。它调用acquire方法,该方法返回一个Deferred。首先Deferred,它添加一个回调,该回调调用您传入的函数(以及任何位置或关键字参数)。如果该函数返回一个Deferred,则在第二秒Deferred添加一个回调,以调用release方法

同步案例也可以通过立即调用来处理release。还可以通过允许错误传播来处理错误,但请确保release已进行了必要的操作以使错误DeferredSemaphore状态保持一致。传递给函数的结果run(或的结果Deferred返回)变的结果Deferred通过返回run

另一种可能的方法可能基于DeferredQueuecooperateDeferredQueue与通常的队列一样,但是其get方法返回Deferred。如果在调用时队列中没有项目,则在Deferred添加项目之前不会触发。

这是一个例子:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

请注意,asyncworker函数与第一个示例中的相同。不过,这一次,还有一个worker被明确地拉动了就业的功能DeferredQueue与和处理它们async(通过添加async作为回调到Deferred由返回get)。该worker发电机由驱动cooperate,其中每个后迭代一次Deferred它产生火灾。然后,主循环启动了这些工作者生成器中的三个,以便在任何给定时间进行三个作业。

这种方法比该方法包含更多的代码DeferredSemaphore,但是具有一些有趣的好处。首先,cooperate返回一个CooperativeTask具有有用的方法类似的例子pauseresume和几个人。同样,分配给同一合作者的所有作业都将在调度中相互协作 ,以免使事件循环过载(这就是给API命名的原因)。在DeferredQueue方面,它也有可能设限的许多项目是如何待处理,这样你就可以完全避免超载您的服务器(例如,如果你的图像处理器卡住,停止完成任务)。如果代码调用put处理队列溢出异常,您可以以此为压力来尝试停止接受新作业(也许将它们分流到另一台服务器,或者警告管理员)。使用类似的方法DeferredSemaphore比较棘手,因为没有办法限制等待多少工作才能获取信号量。

python 2022/1/1 18:37:02 有221人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶