您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Python分块CSV文件多处理

Python分块CSV文件多处理

根据评论,我们希望每个进程可以处理10000行的块。这并不是很难做到的。请参阅iter/islice下面的食谱。但是,使用问题

pool.map(worker, ten_thousand_row_chunks)

是 。如果这需要的内存超过可用内存,那么您将获得一个MemoryError。(注意:pool.imap遇到相同的问题)

因此,我们需要对pool.map每个块的各个部分进行迭代调用

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

每个文件chunk最多包含chunksize*num_procs文件中的几行。这些数据足以使池中的所有工作人员都能工作,但又不会太大,以至于引起MemoryError-如果设置的chunksize值没有太大。

每个chunk然后被分解成块,其中每个块由多达的 chunksize行从该文件。然后将这些片段发送到pool.map

这是将迭代器分为长度为chunksize的块的惯用法。让我们看一下它如何工作的例子:

In [111]: iterator = iter(range(10))

请注意,每次IT.islice(iterator, 3)调用时,都会从迭代器中切出一个新的3个项目块:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

当迭代器中剩余的项目少于3个时,仅返回剩余的内容

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

如果再次调用它,则会得到一个空列表:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize))一个list(IT.islice(iterator, chunksize))调用时返回的函数。这是“单线”,相当于

def func():
    return  list(IT.islice(iterator, chunksize))

最后,iter(callable, sentinel)返回另一个迭代器。该迭代器产生的值是可调用函数返回的值。它会不断产生值,直到可调用对象返回等于前哨值的值为止。所以

iter(lambda: list(IT.islice(iterator, chunksize)), [])

将继续返回值,list(IT.islice(iterator, chunksize))直到该值是空列表为止:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
python 2022/1/1 18:33:26 有226人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶