我需要从存储在s3中的Parquet文件增量加载数据到熊猫,我试图为此使用PyArrow,但没有任何运气。
将Parquet文件的整个目录写入Pandas就可以了:
import s3fs
import pyarrow.parquet as pq
import pandas as pd
fs = s3fs.S3FileSystem(mykey,mysecret)
p_dataset = pq.ParquetDataset('s3://mys3bucket/directory', filesystem=fs)
df = p_dataset.read().to_pandas()
但是当我尝试加载单个Parquet文件时,我收到一个错误:
fs = s3fs.S3FileSystem(mykey,mysecret)
p_dataset = pq.ParquetDataset('s3://mys3bucket/directory/1_0_00000000000000014012'
, filesystem=fs)
df = p_dataset.read().to_pandas()
抛出错误:
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
<ipython-input-179-3d01b32c60f7> in <module>()
15 p_dataset = pq.ParquetDataset(
16 's3://mys3bucket/directory/1_0_00000000000000014012',
---> 17 filesystem=fs)
18
19 table2.to_pandas()
C:\User\Anaconda3\lib\site-packages\pyarrow\parquet.py in __init__(self, path_or_paths, filesystem, schema, metadata, split_row_groups, validate_schema, filters, metadata_nthreads)
880
881 if validate_schema:
--> 882 self.validate_schemas()
883
884 if filters is not None:
C:\User\Anaconda3\lib\site-packages\pyarrow\parquet.py in validate_schemas(self)
893 self.schema = self.common_metadata.schema
894 else:
--> 895 self.schema = self.pieces[0].get_metadata(open_file).schema
896 elif self.schema is None:
897 self.schema = self.metadata.schema
IndexError: list index out of range
将不胜感激与此错误的任何帮助。
理想情况下,我需要将添加到s3的所有新数据(自上次运行此脚本以来添加)附加到Pandas dataframe,所以我想我将文件名列表传递给ParquetDataset。有更好的方法来实现这一点吗?谢谢
您想使用pq.read_table
(传递文件路径或文件句柄)而不是pq. ParquetDataset
(传递目录)。HTH
对于python 3.6AWS有一个名为aws-data-wrangler的库,它有助于Pandas/S3/Parquet之间的集成
安装do;
pip install awswrangler
要使用awswrangler 1. x.x及更高版本从s3读取单个镶木地板文件,请执行;
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/my-file.parquet")
要读取拼花文件列表,请执行;
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)
通过设置dataset=True awswrangler将读取s3键下方的所有单独的镶木地板文件。