一个可能对此有帮助的现成选项是twisted.internet.defer.DeferredSemaphore
。这是正常(计数)信号量的异步版本,您可能已经知道如果您执行了很多线程编程。
(计数)信号量很像互斥锁(锁)。但是,在一个互斥锁只能获取一次直到相应的发行版的情况下,(计数)信号灯可以配置为允许任意数量(但指定的)的获取成功之前需要任何相应的发行版。
这是一个DeferredSemaphore
用于运行十个异步操作,但一次最多运行三个异步操作的示例:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
也有显式acquire
和release
方法,但是该run
方法非常方便,几乎总是您想要的。它调用acquire
方法,该方法返回一个Deferred
。首先Deferred
,它添加了一个回调,该回调调用您传入的函数(以及任何位置或关键字参数)。如果该函数返回一个Deferred
,则在第二秒Deferred
添加一个回调,以调用该release
方法。
同步案例也可以通过立即调用来处理release
。还可以通过允许错误传播来处理错误,但请确保release
已进行了必要的操作以使错误DeferredSemaphore
状态保持一致。传递给函数的结果run
(或的结果Deferred
返回)变的结果Deferred
通过返回run
。
另一种可能的方法可能基于DeferredQueue
和cooperate
。DeferredQueue
与通常的队列一样,但是其get
方法返回Deferred
。如果在调用时队列中没有项目,则在Deferred
添加项目之前不会触发。
这是一个例子:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
请注意,async
worker函数与第一个示例中的相同。不过,这一次,还有一个worker
被明确地拉动了就业的功能,DeferredQueue
与和处理它们async
(通过添加async
作为回调到Deferred
由返回get
)。该worker
发电机由驱动cooperate
,其中每个后迭代一次Deferred
它产生火灾。然后,主循环启动了这些工作者生成器中的三个,以便在任何给定时间进行三个作业。
这种方法比该方法包含更多的代码DeferredSemaphore
,但是具有一些有趣的好处。首先,cooperate
返回一个CooperativeTask
具有有用的方法类似的例子pause
,resume
和几个人。同样,分配给同一合作者的所有作业都将在调度中相互协作 ,以免使事件循环过载(这就是给API命名的原因)。在DeferredQueue
方面,它也有可能设限的许多项目是如何待处理,这样你就可以完全避免超载您的服务器(例如,如果你的图像处理器卡住,停止完成任务)。如果代码调用put
处理队列溢出异常,您可以以此为压力来尝试停止接受新作业(也许将它们分流到另一台服务器,或者警告管理员)。使用类似的方法DeferredSemaphore
比较棘手,因为没有办法限制等待多少工作才能获取信号量。