提问者:小点点

使用Apache Beam将文件写入Parquet中的动态目标PythonSDK


我试图写Parquet文件使用动态目标通过WriteToFiles类。

我什至找到了一些进一步开发的示例,例如这个,他们在其中构建了一个自定义Avro文件接收器。

我目前正在尝试使用pyarrow库来编写一个可以以分布式方式管理写入操作的Parquet接收器,类似于WriteToParque PTransform的方式。

class ParquetFileSink(fileio.FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec
        self.writer = None

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = pq.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=False
        )

    def write(self, record):
        # This is called on every element.
        row = pa.Table.from_pandas(
            pd.DataFrame(record), schema=self._schema, preserve_index=False
        )
        self.writer.write_table(row)

    def flush(self):
        pass

这里的主要问题是,据我所知,不可能将无界的PCollection写成Parquet文件,所以如果我尝试使用以下类按记录写入,要么我在封闭的文件处理程序上写入错误,要么一些文件根本没有创建。我还尝试使用GroupByKeyPTransform编写批处理,但是由于无法关闭pyarrow. parque.ParquetWriter对象,文件最终只能部分写入并被损坏。此外,这种策略不安全,因为批处理可能非常大,将它们写成单个文件不是一个好主意。

我可以看到这个问题在类apache_beam.io. parquetio._ParquetSink中遇到,但我不认为这可以直接应用于WriteToFiles类,因为我看不出如何用它完全管理文件处理程序。


共2个答案

匿名用户

我遇到了类似的问题,我最终编写了一个ParquetSink,它可以与WriteToFiles一起使用。因此,它会根据您的配置在内存中批处理记录。我已经使用它在依赖于记录中字段的批处理过程中创建动态文件,但我假设它也可以与流式管道一起使用,尽管我还没有测试过它。

你可以在这个要点中找到代码

class ParquetSink(fileio.FileSink):
    def __init__(self,
                file_path_prefix,
                schema,
                row_group_buffer_size=64 * 1024 * 1024,
                record_batch_size=1000,
                codec='none',
                use_deprecated_int96_timestamps=False,
                file_name_suffix='',
                num_shards=0,
                shard_name_template=None,
                mime_type='application/x-parquet'):
        self._inner_sink = beam.io.parquetio._create_parquet_sink(
            file_path_prefix,
            schema,
            codec,
            row_group_buffer_size,
            record_batch_size,
            use_deprecated_int96_timestamps,
            file_name_suffix,
            num_shards,
            shard_name_template,
            mime_type
        )
        self._codec = codec
        self._schema = schema
        self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps

    def open(self, fh):
        self._pw = pyarrow.parquet.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)

    def write(self, record):
        self._inner_sink.write_record(self._pw, record)

    def flush(self):
        if len(self._inner_sink._buffer[0]) > 0:
            self._inner_sink._flush_buffer()
        if self._inner_sink._record_batches_byte_size > 0:
            self._inner_sink._write_batches(self._pw)

        self._pw.close()



   def parquet_compatible_filenaming(suffix=None):
    def _inner(window, pane, shard_index, total_shards, compression, destination):
        return fileio.destination_prefix_naming(suffix )(
            window, pane, shard_index, total_shards, compression, destination).replace(":", ".")

    return _inner


def get_parquet_pipeline(pipeline_options, input, output):
    with beam.Pipeline(options=pipeline_options) as p:
        lines = (p 
                    | 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
                    | 'Transform' >> beam.Map(lambda x: { 'some_key': x['some_key'], 'raw': x})
                    | 'Write to Parquet' >> fileio.WriteToFiles(
                                                path=str(output),
                                                destination=lambda x: x["some_key"],
                                                sink=lambda x: ParquetSink(
                                                                    file_path_prefix=output,
                                                                    file_name_suffix=".parquet",
                                                                    codec="snappy",
                                                                    schema=pyarrow.schema([
                                                                        pyarrow.field("some_key", pyarrow.string()),
                                                                        pyarrow.field("raw", pyarrow.string())
                                                                    ])),
                                                    file_naming=parquet_compatible_filenaming(suffix=".parquet")
                                                )
        )

匿名用户

parket格式针对批量写入数据进行了优化。因此,它不适合流式传输,在流式传输中,您会逐个接收记录。在您的示例中,您在parket文件中逐行写入,这非常低效。

我建议将您的数据保存为适合逐行附加数据的格式,然后有一个常规作业将这些数据批量移动到拼花文件。

或者你可以像apache_beam.io. parquetio._ParquetSink一样。它将记录保存在内存中的缓冲区中,并偶尔批量写入它们。但是,如果您的应用程序崩溃,您将面临丢失缓冲区中记录的风险。