您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何在python中使用pyarrow从S3读取分区实木复合地板文件

如何在python中使用pyarrow从S3读取分区实木复合地板文件

我设法使它与最新版本的fastparquet&s3fs一起使用。下面是相同的代码

import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet 
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

感谢马丁通过我们的对话为我指明了正确的方向

:根据基准,这比使用pyarrow慢。一旦通过ARROW-1213在pyarrow中实现了s3fs支持,我将更新我的答案

我使用pyarrow对单个迭代进行了快速基准测试,并将文件列表作为全局文件发送到fastparquet。使用s3fs和pyarrow +我的hackish代码,fastparquet更快。但是我认为pyarrow + s3fs一旦实施便会更快。

代码和基准如下:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

在完成所有PR之后,诸如Arrow-2038Fast Parquet-PR#182之类的问题已解决

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' #if its a directory omit the traling /
>>> bucket_uri = f's3://{bucket}/{path}'
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wild card represents the 1st,2nd column partitions columns of your data & so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()

这可能不是基准测试的最佳方法。请阅读博客文章以获得通过基准测试

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

有关Pyarrow速度的更多阅读

参考:

python 2022/1/1 18:39:16 有259人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶