我试图写Parquet文件使用动态目标通过WriteToFiles类。
我什至找到了一些进一步开发的示例,例如这个,他们在其中构建了一个自定义Avro文件接收器。
我目前正在尝试使用pyarrow
库来编写一个可以以分布式方式管理写入操作的Parquet接收器,类似于WriteToParque PTransform的方式。
class ParquetFileSink(fileio.FileSink):
def __init__(self, schema, codec='deflate'):
self._schema = schema
self._codec = codec
self.writer = None
def open(self, fh):
# This is called on every new bundle.
self.writer = pq.ParquetWriter(
fh,
self._schema,
compression=self._codec,
use_deprecated_int96_timestamps=False
)
def write(self, record):
# This is called on every element.
row = pa.Table.from_pandas(
pd.DataFrame(record), schema=self._schema, preserve_index=False
)
self.writer.write_table(row)
def flush(self):
pass
这里的主要问题是,据我所知,不可能将无界的PCollection写成Parquet文件,所以如果我尝试使用以下类按记录写入,要么我在封闭的文件处理程序上写入错误,要么一些文件根本没有创建。我还尝试使用GroupByKey
PTransform编写批处理,但是由于无法关闭pyarrow. parque.ParquetWriter
对象,文件最终只能部分写入并被损坏。此外,这种策略不安全,因为批处理可能非常大,将它们写成单个文件不是一个好主意。
我可以看到这个问题在类apache_beam.io. parquetio._ParquetSink
中遇到,但我不认为这可以直接应用于WriteToFiles
类,因为我看不出如何用它完全管理文件处理程序。
我遇到了类似的问题,我最终编写了一个ParquetSink
,它可以与WriteToFiles
一起使用。因此,它会根据您的配置在内存中批处理记录。我已经使用它在依赖于记录中字段的批处理过程中创建动态文件,但我假设它也可以与流式管道一起使用,尽管我还没有测试过它。
你可以在这个要点中找到代码
class ParquetSink(fileio.FileSink):
def __init__(self,
file_path_prefix,
schema,
row_group_buffer_size=64 * 1024 * 1024,
record_batch_size=1000,
codec='none',
use_deprecated_int96_timestamps=False,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
mime_type='application/x-parquet'):
self._inner_sink = beam.io.parquetio._create_parquet_sink(
file_path_prefix,
schema,
codec,
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
file_name_suffix,
num_shards,
shard_name_template,
mime_type
)
self._codec = codec
self._schema = schema
self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
def open(self, fh):
self._pw = pyarrow.parquet.ParquetWriter(
fh,
self._schema,
compression=self._codec,
use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
def write(self, record):
self._inner_sink.write_record(self._pw, record)
def flush(self):
if len(self._inner_sink._buffer[0]) > 0:
self._inner_sink._flush_buffer()
if self._inner_sink._record_batches_byte_size > 0:
self._inner_sink._write_batches(self._pw)
self._pw.close()
def parquet_compatible_filenaming(suffix=None):
def _inner(window, pane, shard_index, total_shards, compression, destination):
return fileio.destination_prefix_naming(suffix )(
window, pane, shard_index, total_shards, compression, destination).replace(":", ".")
return _inner
def get_parquet_pipeline(pipeline_options, input, output):
with beam.Pipeline(options=pipeline_options) as p:
lines = (p
| 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
| 'Transform' >> beam.Map(lambda x: { 'some_key': x['some_key'], 'raw': x})
| 'Write to Parquet' >> fileio.WriteToFiles(
path=str(output),
destination=lambda x: x["some_key"],
sink=lambda x: ParquetSink(
file_path_prefix=output,
file_name_suffix=".parquet",
codec="snappy",
schema=pyarrow.schema([
pyarrow.field("some_key", pyarrow.string()),
pyarrow.field("raw", pyarrow.string())
])),
file_naming=parquet_compatible_filenaming(suffix=".parquet")
)
)
parket格式针对批量写入数据进行了优化。因此,它不适合流式传输,在流式传输中,您会逐个接收记录。在您的示例中,您在parket文件中逐行写入,这非常低效。
我建议将您的数据保存为适合逐行附加数据的格式,然后有一个常规作业将这些数据批量移动到拼花文件。
或者你可以像apache_beam.io. parquetio._ParquetSink
一样。它将记录保存在内存中的缓冲区中,并偶尔批量写入它们。但是,如果您的应用程序崩溃,您将面临丢失缓冲区中记录的风险。