Java source code examples: org.apache.beam.sdk.io.WriteFilesResult
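WriteFilesResult is the value a WriteFiles transform (or a FileIO.write(), which expands to WriteFiles) returns when applied; its getPerDestinationOutputFilenames() method exposes the written filenames as a PCollection<KV<DestinationT, String>>. Before the project examples below, here is a minimal hedged sketch of end-to-end usage (the output directory and input values are illustrative, not taken from any example; types come from org.apache.beam.sdk.*):

Pipeline p = Pipeline.create();
WriteFilesResult<Void> result =
    p.apply(Create.of("hello", "world"))
        .apply(
            FileIO.<String>write()
                .via(TextIO.sink())
                .to("/tmp/out") // illustrative output directory
                .withSuffix(".txt"));
// Extract just the filename strings from the per-destination KV pairs.
PCollection<String> filenames =
    result.getPerDestinationOutputFilenames().apply(Values.<String>create());
p.run().waitUntilFinish();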
Example 1
@Override
public WriteFilesResult<Void> expand(PCollection<KV<String, String>> kafkaRecords) {
  return kafkaRecords
      /*
       * Converting KV<String, String> records to GenericRecord using DoFn and {@link
       * KeyValueToGenericRecordFn} class.
       */
      .apply("Create GenericRecord(s)", ParDo.of(new KeyValueToGenericRecordFn()))
      .setCoder(AvroCoder.of(GenericRecord.class, KeyValueToGenericRecordFn.SCHEMA))
      /*
       * Writing as parquet file using {@link FileIO} and {@link ParquetIO}.
       *
       * The {@link WindowedFilenamePolicy} class specifies the file path for writing the file.
       * The {@link withNumShards} option specifies the number of shards passed by the user.
       */
      .apply(
          "Writing as Parquet",
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(KeyValueToGenericRecordFn.SCHEMA))
              .to(outputDirectory())
              .withPrefix(outputFilenamePrefix())
              .withSuffix(
                  WriteToGCSUtility.FILE_SUFFIX_MAP.get(WriteToGCSUtility.FileFormat.PARQUET))
              .withNumShards(numShards()));
}
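Example 1 depends on a KeyValueToGenericRecordFn class supplied by the surrounding template code. A hedged sketch of what such a DoFn might look like, assuming a simple two-string-field Avro schema (the real SCHEMA constant and field names may differ; Schema, SchemaBuilder, and GenericRecordBuilder are from org.apache.avro):

static class KeyValueToGenericRecordFn extends DoFn<KV<String, String>, GenericRecord> {
  // Hypothetical schema; the real template defines its own SCHEMA.
  public static final Schema SCHEMA =
      SchemaBuilder.record("KafkaRecord")
          .fields()
          .requiredString("key")
          .requiredString("value")
          .endRecord();

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, String> kv = c.element();
    c.output(
        new GenericRecordBuilder(SCHEMA)
            .set("key", kv.getKey())
            .set("value", kv.getValue())
            .build());
  }
}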
Example 2
public static PTransform<PCollection<String>, WriteFilesResult<Void>> createWrite(
    String filenamePrefix, String filenameSuffix, Schema schema, JdbcAvroArgs jdbcAvroArgs) {
  filenamePrefix = filenamePrefix.replaceAll("/+$", "") + "/part";
  ValueProvider<ResourceId> prefixProvider =
      StaticValueProvider.of(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
  FileBasedSink.FilenamePolicy filenamePolicy =
      DefaultFilenamePolicy.fromStandardParameters(
          prefixProvider, DEFAULT_SHARD_TEMPLATE, filenameSuffix, false);
  final DynamicAvroDestinations<String, Void, String> destinations =
      AvroIO.constantDestinations(
          filenamePolicy,
          schema,
          ImmutableMap.of(),
          // since Beam does not support zstandard
          CodecFactory.nullCodec(),
          SerializableFunctions.identity());
  final FileBasedSink<String, Void, String> sink =
      new JdbcAvroSink<>(prefixProvider, destinations, jdbcAvroArgs);
  return WriteFiles.to(sink);
}
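A hypothetical call site for createWrite, assuming a PCollection<String> of serialized rows plus an Avro Schema and JdbcAvroArgs are already in scope (all names here are illustrative):

WriteFilesResult<Void> result =
    rows.apply(
        "Write Avro",
        createWrite("gs://my-bucket/output", ".avro", schema, jdbcAvroArgs));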
Example 3
@Test
public void testExtractionDirectFromTransform() throws Exception {
  PCollection<String> input = p.apply(Create.of("hello"));
  WriteFilesResult<Void> output = input.apply(writeFiles);
  AppliedPTransform<PCollection<String>, WriteFilesResult<Void>, WriteFiles<String, Void, String>>
      appliedPTransform =
          AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p);
  assertThat(
      WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
      equalTo(
          writeFiles.getNumShardsProvider() == null && writeFiles.getComputeNumShards() == null));
  assertThat(
      WriteFilesTranslation.isWindowedWrites(appliedPTransform),
      equalTo(writeFiles.getWindowedWrites()));
  assertThat(
      WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform),
      equalTo(writeFiles.getSink()));
}
Example 4
@Test
public void testRunnerDeterminedSharding() {
  FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
  options.setRunner(TestFlinkRunner.class);
  options.setFlinkMaster("[auto]");
  options.setParallelism(5);
  TestPipeline p = TestPipeline.fromOptions(options);
  StreamingShardedWriteFactory<Object, Void, Object> factory =
      new StreamingShardedWriteFactory<>(p.getOptions());
  WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
  @SuppressWarnings("unchecked")
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
      originalApplication =
          AppliedPTransform.of("writefiles", objs.expand(), Collections.emptyMap(), original, p);
  WriteFiles<Object, Void, Object> replacement =
      (WriteFiles<Object, Void, Object>)
          factory.getReplacementTransform(originalApplication).getTransform();
  assertThat(replacement, not(equalTo((Object) original)));
  // With parallelism set to 5, the Flink factory picks parallelism * 2 = 10 shards.
  assertThat(replacement.getNumShardsProvider().get(), is(10));
}
Example 5
private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
  TestPipeline p = TestPipeline.fromOptions(options);
  StreamingShardedWriteFactory<Object, Void, Object> factory =
      new StreamingShardedWriteFactory<>(p.getOptions());
  WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
      originalApplication =
          AppliedPTransform.of("writefiles", objs.expand(), Collections.emptyMap(), original, p);
  WriteFiles<Object, Void, Object> replacement =
      (WriteFiles<Object, Void, Object>)
          factory.getReplacementTransform(originalApplication).getTransform();
  assertThat(replacement, not(equalTo((Object) original)));
  assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards));
  WriteFilesResult<Void> originalResult = objs.apply(original);
  WriteFilesResult<Void> replacementResult = objs.apply(replacement);
  Map<PValue, ReplacementOutput> res =
      factory.mapOutputs(originalResult.expand(), replacementResult);
  assertEquals(1, res.size());
  assertEquals(
      originalResult.getPerDestinationOutputFilenames(),
      res.get(replacementResult.getPerDestinationOutputFilenames()).getOriginal().getValue());
}
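A hedged sketch of how such a helper is typically driven, assuming Dataflow worker-pool options are available; with maxNumWorkers set to 5, the streaming override in Example 13 below picks 5 * 2 = 10 shards:

// Names mirror common Beam test utilities; the helper itself is defined above.
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(5);
testStreamingWriteOverride(options, 10);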
Example 6
@Override
public PTransformReplacement<PCollection<InputT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<InputT>,
                WriteFilesResult<DestinationT>,
                PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>>
            transform) {
  try {
    WriteFiles<InputT, DestinationT, ?> replacement =
        WriteFiles.to(WriteFilesTranslation.getSink(transform))
            .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform))
            .withSharding(new LogElementShardsWithDrift<>());
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform), replacement);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
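WriteFiles.withSharding (used above with LogElementShardsWithDrift) expects a PTransform that produces a singleton PCollectionView<Integer> holding the shard count. A minimal hedged sketch of that shape, with a fixed count chosen purely for illustration:

static class FixedShards<T> extends PTransform<PCollection<T>, PCollectionView<Integer>> {
  @Override
  public PCollectionView<Integer> expand(PCollection<T> input) {
    // Ignore the input and always report three shards.
    return input.getPipeline().apply("ShardCount", Create.of(3)).apply(View.asSingleton());
  }
}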
Example 7
private PCollection<String> writeFiles(PCollection<T> input, String stagingBucketDir) {
  PCollection<String> mappedUserData =
      input
          .apply(
              MapElements.via(
                  new SimpleFunction<T, Object[]>() {
                    @Override
                    public Object[] apply(T element) {
                      return getUserDataMapper().mapRow(element);
                    }
                  }))
          .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
          .setCoder(StringUtf8Coder.of());
  WriteFilesResult filesResult =
      mappedUserData.apply(
          "Write files to specified location",
          FileIO.<String>write()
              .via(TextIO.sink())
              .to(stagingBucketDir)
              .withPrefix(getFileNameTemplate())
              .withSuffix(".csv")
              .withCompression(Compression.GZIP));
  return (PCollection)
      filesResult
          .getPerDestinationOutputFilenames()
          .apply("Parse KV filenames to Strings", Values.<String>create());
}
Example 8
public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
    AppliedPTransform<
            PCollection<UserT>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return (FileBasedSink<UserT, DestinationT, OutputT>)
      sinkFromProto(getWriteFilesPayload(transform).getSink());
}
Example 9
public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
    AppliedPTransform<
            PCollection<UserT>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents sdkComponents = SdkComponents.create(transform.getPipeline().getOptions());
  RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
  List<PCollectionView<?>> views = Lists.newArrayList();
  Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
  for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
    PCollection<?> originalPCollection =
        checkNotNull(
            (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
            "no input with tag %s",
            entry.getKey());
    views.add(
        PCollectionViewTranslation.viewFromProto(
            entry.getValue(),
            entry.getKey(),
            originalPCollection,
            transformProto,
            RehydratedComponents.forComponents(sdkComponents.toComponents())));
  }
  return views;
}
Example 10
public static <T, DestinationT> boolean isWindowedWrites(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return getWriteFilesPayload(transform).getWindowedWrites();
}
Example 11
public static <T, DestinationT> boolean isRunnerDeterminedSharding(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return getWriteFilesPayload(transform).getRunnerDeterminedSharding();
}
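The flag is true exactly when neither a fixed shard count nor a sharding transform was configured on the original WriteFiles. A short hedged illustration, assuming a FileBasedSink<String, Void, String> named sink is in scope:

WriteFiles<String, Void, String> runnerSharded = WriteFiles.to(sink); // runner decides sharding
WriteFiles<String, Void, String> fixed = runnerSharded.withNumShards(4); // explicitly sharded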
Example 12
private static <T, DestinationT> WriteFilesPayload getWriteFilesPayload(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents components = SdkComponents.create(transform.getPipeline().getOptions());
  return WriteFilesPayload.parseFrom(
      PTransformTranslation.toProto(transform, Collections.emptyList(), components)
          .getSpec()
          .getPayload());
}
Example 13
@Override
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<UserT>,
                WriteFilesResult<DestinationT>,
                WriteFiles<UserT, DestinationT, OutputT>>
            transform) {
  // By default, if numShards is not set WriteFiles will produce one file per bundle. In
  // streaming, there are large numbers of small bundles, resulting in many tiny files.
  // Instead we pick max workers * 2 to ensure full parallelism, but prevent too many files.
  // (current_num_workers * 2 might be a better choice, but that value is not easily available
  // today).
  // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
  int numShards;
  if (options.getMaxNumWorkers() > 0) {
    numShards = options.getMaxNumWorkers() * 2;
  } else if (options.getNumWorkers() > 0) {
    numShards = options.getNumWorkers() * 2;
  } else {
    numShards = DEFAULT_NUM_SHARDS;
  }
  try {
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);
    WriteFiles<UserT, DestinationT, OutputT> replacement =
        WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform),
        replacement.withNumShards(numShards));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
Example 14
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
    Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
  // We must connect the new output from WriteFilesResult to the outputs provided by the original
  // transform.
  return ReplacementOutputs.tagged(outputs, newOutput);
}
Example 15
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
  ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
  PTransform<PCollection<Object>, WriteFilesResult<Void>> original =
      WriteFiles.to(
          new FileBasedSink<Object, Void, Object>(
              StaticValueProvider.of(outputDirectory),
              DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
            @Override
            public WriteOperation<Void, Object> createWriteOperation() {
              throw new IllegalArgumentException("Should not be used");
            }
          });
  @SuppressWarnings("unchecked")
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<
          PCollection<Object>,
          WriteFilesResult<Void>,
          PTransform<PCollection<Object>, WriteFilesResult<Void>>>
      originalApplication =
          AppliedPTransform.of("write", objs.expand(), Collections.emptyMap(), original, p);
  assertThat(
      factory.getReplacementTransform(originalApplication).getTransform(),
      not(equalTo((Object) original)));
}
Example 16
@Override
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<UserT>,
                WriteFilesResult<DestinationT>,
                WriteFiles<UserT, DestinationT, OutputT>>
            transform) {
  // By default, if numShards is not set WriteFiles will produce one file per bundle. In
  // streaming, there are large numbers of small bundles, resulting in many tiny files.
  // Instead we pick parallelism * 2 to ensure full parallelism, but prevent too many files.
  Integer jobParallelism = options.getParallelism();
  Preconditions.checkArgument(
      jobParallelism > 0,
      "Parallelism of a job should be greater than 0. Currently set: %s",
      jobParallelism);
  int numShards = jobParallelism * 2;
  try {
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);
    @SuppressWarnings("unchecked")
    WriteFiles<UserT, DestinationT, OutputT> replacement =
        WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    if (WriteFilesTranslation.isRunnerDeterminedSharding(transform)) {
      replacement = replacement.withNumShards(numShards);
    } else {
      if (transform.getTransform().getNumShardsProvider() != null) {
        replacement =
            replacement.withNumShards(transform.getTransform().getNumShardsProvider());
      }
      if (transform.getTransform().getComputeNumShards() != null) {
        replacement = replacement.withSharding(transform.getTransform().getComputeNumShards());
      }
    }
    if (options.isAutoBalanceWriteFilesShardingEnabled()) {
      replacement =
          replacement.withShardingFunction(
              new FlinkAutoBalancedShardKeyShardingFunction<>(
                  jobParallelism,
                  options.getMaxParallelism(),
                  sink.getDynamicDestinations().getDestinationCoder()));
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform), replacement);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
Example 17
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
    Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
  return ReplacementOutputs.tagged(outputs, newOutput);
}