list(file_obj)
大的时候可能需要很多内存fileobj
。我们可以通过使用itertools根据需要拉出几行代码来减少内存需求。
特别是,我们可以使用
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
将文件拆分为可处理的块,以及
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
使多处理池一次处理多个num_chunks
块。
这样,我们大约只需要足够的内存即可在内存中保存几个(num_chunks
)块,而不是整个文件。
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
# print(chunk)
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name column.
return row[0]
def main():
pool = mp.Pool()
largefile = 'test.dat'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()