您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

无法在Python中创建新线程

无法在Python中创建新线程

问题在于,没有一个主要平台(截至2013年中)允许您在此数量的线程附近创建任何位置。您可能会遇到各种各样的限制,并且在不了解平台,平台配置以及确切错误的情况下,无法知道遇到了哪个限制。但是这里有两个例子:

因此,我如何控制要创建的线程数或以其他方式使它工作(如超时等)?

使用尽可能多的线程实际上并不是您想要的。在8核计算机上运行800个线程意味着您需要花费大量时间在线程之间进行上下文切换,并且缓存在被启动之前一直保持刷新状态,依此类推。

最有可能的,您真正想要的是以下之一:

但这当然是 可能的

一旦您达到了所要达到的极限,那么很可能再次尝试将失败,直到线程完成其工作并被加入为止,而且很可能再次尝试将在此之后成功。因此,鉴于您显然要获取异常,因此可以使用Python中的其他方式来处理此异常:使用try/except块。例如,如下所示:

threads = []
for n in range(0, 60000):
    while True:
        t = threading.Thread(target=function,args=(x, n))
        try:
            t.start()
            threads.append(t)
        except WhateverTheExceptionIs as e:
            if threads:
                threads[0].join()
                del threads[0]
            else:
                raise
        else:
            break
for t in threads:
    t.join()

当然,这假设启动的第一个任务很可能是完成的第一个任务之一。如果不是这样,则需要某种方式来显式表示完成状态(条件,信号量,队列等),或者需要使用一些较低级别的(特定于平台的)库来提供等待整个列表,直到至少一个线程完成。

另外,请注意,在某些平台(例如Windows XP)上,您 接近 极限时会出现奇怪的行为。

除了做得更好之外,做正确的事情也可能会容易得多。例如,这是每个cpu的进程池:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

…和固定线程数池:

with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

…和带有numpy矢量化的平衡cpu并行处理批处理池:

with concurrent.futures.ThreadPoolExecutor() as executor:
    batchsize = 60000 // os.cpu_count()
    fs = [executor.submit(np.vector_function, x, 
                          np.arange(n, min(n+batchsize, 60000)))
          for n in range(0, 60000, batchsize)]
    concurrent.futures.wait(fs)

在上面的示例中,我使用列表推导来提交所有工作并收集他们的未来,因为在循环中我们没有做任何其他事情。但是从您的评论看来,您似乎还需要在循环中执行其他操作。因此,让我们将其转换回显式for语句:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = []
    for n in range(60000):
        fs.append(executor.submit(function, x, n))
    concurrent.futures.wait(fs)

现在,无论您想在该循环中添加什么,都可以。

但是,我不认为您实际上要在该循环内添加任何内容。循环只是尽可能快地提交所有作业。它是wait等待它们全部完成的功能,您可能想早点退出

为此,您可以将wait其与FIRST_COMPLETED标志一起使用,但使用起来要简单得多as_completed

另外,我假设error任务会设置某种值。在这种情况下,您将需要加上一个Lock线程,就像线程之间共享的任何其他可变值一样。(这是一个地方,aProcessPoolExecutor和a之间的差异仅比单行多一点ThreadPoolExecutor-如果您使用流程,则需要multiprocessing.Lock而不是threading.Lock。)

所以:

error_lock = threading.Lock
error = []

def function(x, n):
    # blah blah
    try:
        # blah blah
    except Exception as e:
        with error_lock:
            error.append(e)
    # blah blah

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    for f in concurrent.futures.as_completed(fs):
        do_something_with(f.result())
        with error_lock:
            if len(error) > 1: exit()

但是,您可能需要考虑其他设计。通常,如果您可以避免线程之间的共享,那么您的生活会变得更加轻松。期货旨在通过让您返回值或引发异常来简化这一过程,就像常规函数调用一样。这f.result()将为您提供返回的值或引发引发的异常。因此,您可以将该代码重写为:

def function(x, n):
    # blah blah
    # don't bother to catch exceptions here, let them propagate out

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    error = []
    for f in concurrent.futures.as_completed(fs):
        try:
            result = f.result()
        except Exception as e:
            error.append(e)
            if len(error) > 1: exit()
        else:
            do_something_with(result)

请注意,这与文档中的ThreadPoolExecutor示例看起来有多相似。只要任务不需要彼此交互,这种简单的模式就足以处理几乎所有没有锁的东西。

python 2022/1/1 18:36:18 有231人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶