问题在于,没有一个主要平台(截至2013年中)允许您在此数量的线程附近创建任何位置。您可能会遇到各种各样的限制,并且在不了解平台,平台配置以及确切错误的情况下,无法知道遇到了哪个限制。但是这里有两个例子:
因此,我如何控制要创建的线程数或以其他方式使它工作(如超时等)?
使用尽可能多的线程实际上并不是您想要的。在8核计算机上运行800个线程意味着您需要花费大量时间在线程之间进行上下文切换,并且缓存在被启动之前一直保持刷新状态,依此类推。
最有可能的,您真正想要的是以下之一:
但这当然是 可能的 。
一旦您达到了所要达到的极限,那么很可能再次尝试将失败,直到线程完成其工作并被加入为止,而且很可能再次尝试将在此之后成功。因此,鉴于您显然要获取异常,因此可以使用Python中的其他方式来处理此异常:使用try
/except
块。例如,如下所示:
threads = []
for n in range(0, 60000):
while True:
t = threading.Thread(target=function,args=(x, n))
try:
t.start()
threads.append(t)
except WhateverTheExceptionIs as e:
if threads:
threads[0].join()
del threads[0]
else:
raise
else:
break
for t in threads:
t.join()
当然,这假设启动的第一个任务很可能是完成的第一个任务之一。如果不是这样,则需要某种方式来显式表示完成状态(条件,信号量,队列等),或者需要使用一些较低级别的(特定于平台的)库来提供等待整个列表,直到至少一个线程完成。
另外,请注意,在某些平台(例如Windows XP)上,您 接近 极限时会出现奇怪的行为。
除了做得更好之外,做正确的事情也可能会容易得多。例如,这是每个cpu的进程池:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
…和固定线程数池:
with concurrent.futures.ThreadPoolExecutor(12) as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
…和带有numpy矢量化的平衡cpu并行处理批处理池:
with concurrent.futures.ThreadPoolExecutor() as executor:
batchsize = 60000 // os.cpu_count()
fs = [executor.submit(np.vector_function, x,
np.arange(n, min(n+batchsize, 60000)))
for n in range(0, 60000, batchsize)]
concurrent.futures.wait(fs)
在上面的示例中,我使用列表推导来提交所有工作并收集他们的未来,因为在循环中我们没有做任何其他事情。但是从您的评论看来,您似乎还需要在循环中执行其他操作。因此,让我们将其转换回显式for
语句:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = []
for n in range(60000):
fs.append(executor.submit(function, x, n))
concurrent.futures.wait(fs)
现在,无论您想在该循环中添加什么,都可以。
但是,我不认为您实际上要在该循环内添加任何内容。循环只是尽可能快地提交所有作业。它是wait
等待它们全部完成的功能,您可能想早点退出。
为此,您可以将wait
其与FIRST_COMPLETED
标志一起使用,但使用起来要简单得多as_completed
。
另外,我假设error
任务会设置某种值。在这种情况下,您将需要加上一个Lock
线程,就像线程之间共享的任何其他可变值一样。(这是一个地方,aProcessPoolExecutor
和a之间的差异仅比单行多一点ThreadPoolExecutor
-如果您使用流程,则需要multiprocessing.Lock
而不是threading.Lock
。)
所以:
error_lock = threading.Lock
error = []
def function(x, n):
# blah blah
try:
# blah blah
except Exception as e:
with error_lock:
error.append(e)
# blah blah
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
for f in concurrent.futures.as_completed(fs):
do_something_with(f.result())
with error_lock:
if len(error) > 1: exit()
但是,您可能需要考虑其他设计。通常,如果您可以避免线程之间的共享,那么您的生活会变得更加轻松。期货旨在通过让您返回值或引发异常来简化这一过程,就像常规函数调用一样。这f.result()
将为您提供返回的值或引发引发的异常。因此,您可以将该代码重写为:
def function(x, n):
# blah blah
# don't bother to catch exceptions here, let them propagate out
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
error = []
for f in concurrent.futures.as_completed(fs):
try:
result = f.result()
except Exception as e:
error.append(e)
if len(error) > 1: exit()
else:
do_something_with(result)
请注意,这与文档中的ThreadPoolExecutor示例看起来有多相似。只要任务不需要彼此交互,这种简单的模式就足以处理几乎所有没有锁的东西。