Java code examples: org.apache.beam.sdk.io.WriteFilesResult
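
The examples below come from Beam pipelines, tests, and runner overrides. A WriteFilesResult is what a WriteFiles (or FileIO.write()) transform produces; its main accessor, getPerDestinationOutputFilenames(), exposes the names of the files that were written, keyed by destination. As a minimal self-contained sketch of producing and consuming one (the class name, sample data, and /tmp/out path are illustrative, not from the examples):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;

public class WriteFilesResultSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Without dynamic destinations, FileIO.write() expands to a
    // WriteFilesResult<Void>.
    WriteFilesResult<Void> result =
        p.apply(Create.of("apple", "banana", "cherry"))
            .apply(
                FileIO.<String>write()
                    .via(TextIO.sink())
                    .to("/tmp/out") // placeholder output directory
                    .withSuffix(".txt"));

    // Filenames arrive as KV<DestinationT, String>; Values.create()
    // keeps just the file paths, as in Example 7 below.
    PCollection<String> filenames =
        result.getPerDestinationOutputFilenames().apply(Values.<String>create());

    p.run().waitUntilFinish();
  }
}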

Example 1: Convert KV<String, String> Kafka records to GenericRecord and write them as Parquet files with FileIO and ParquetIO.
@Override
public WriteFilesResult<Void> expand(PCollection<KV<String, String>> kafkaRecords) {
  return kafkaRecords
      /*
       * Convert KV<String, String> records to GenericRecord using the {@link
       * KeyValueToGenericRecordFn} DoFn.
       */
      .apply("Create GenericRecord(s)", ParDo.of(new KeyValueToGenericRecordFn()))
      .setCoder(AvroCoder.of(GenericRecord.class, KeyValueToGenericRecordFn.SCHEMA))
      /*
       * Write the records as Parquet files using {@link FileIO} and {@link ParquetIO}.
       *
       * The {@link WindowedFilenamePolicy} class specifies the file path for writing the file,
       * and withNumShards() sets the number of shards supplied by the user.
       */
      .apply(
          "Writing as Parquet",
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(KeyValueToGenericRecordFn.SCHEMA))
              .to(outputDirectory())
              .withPrefix(outputFilenamePrefix())
              .withSuffix(
                  WriteToGCSUtility.FILE_SUFFIX_MAP.get(WriteToGCSUtility.FileFormat.PARQUET))
              .withNumShards(numShards()));
}
 
Example 2: Build a WriteFiles transform over a custom JdbcAvroSink with a standard filename policy.
public static PTransform<PCollection<String>, WriteFilesResult<Void>> createWrite(
    String filenamePrefix, String filenameSuffix, Schema schema, JdbcAvroArgs jdbcAvroArgs) {
  filenamePrefix = filenamePrefix.replaceAll("/+$", "") + "/part";
  ValueProvider<ResourceId> prefixProvider =
      StaticValueProvider.of(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
  FileBasedSink.FilenamePolicy filenamePolicy =
      DefaultFilenamePolicy.fromStandardParameters(
          prefixProvider, DEFAULT_SHARD_TEMPLATE, filenameSuffix, false);

  final DynamicAvroDestinations<String, Void, String> destinations =
      AvroIO.constantDestinations(
          filenamePolicy,
          schema,
          ImmutableMap.of(),
          // use the null codec, since Beam does not support zstandard
          CodecFactory.nullCodec(),
          SerializableFunctions.identity());
  final FileBasedSink<String, Void, String> sink =
      new JdbcAvroSink<>(prefixProvider, destinations, jdbcAvroArgs);
  return WriteFiles.to(sink);
}
 
Example 3: Test that reads sharding, windowing, and sink properties directly from an applied WriteFiles transform.
@Test
public void testExtractionDirectFromTransform() throws Exception {
  PCollection<String> input = p.apply(Create.of("hello"));
  WriteFilesResult<Void> output = input.apply(writeFiles);

  AppliedPTransform<PCollection<String>, WriteFilesResult<Void>, WriteFiles<String, Void, String>>
      appliedPTransform =
          AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p);

  assertThat(
      WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
      equalTo(
          writeFiles.getNumShardsProvider() == null && writeFiles.getComputeNumShards() == null));

  assertThat(
      WriteFilesTranslation.isWindowedWrites(appliedPTransform),
      equalTo(writeFiles.getWindowedWrites()));
  assertThat(
      WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform),
      equalTo(writeFiles.getSink()));
}
 
Example 4: Flink test verifying that the streaming sharded-write factory replaces runner-determined sharding with parallelism * 2 shards.
@Test
public void testRunnerDeterminedSharding() {
  FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
  options.setRunner(TestFlinkRunner.class);
  options.setFlinkMaster("[auto]");
  options.setParallelism(5);

  TestPipeline p = TestPipeline.fromOptions(options);

  StreamingShardedWriteFactory<Object, Void, Object> factory =
      new StreamingShardedWriteFactory<>(p.getOptions());

  WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
  @SuppressWarnings("unchecked")
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
      originalApplication =
          AppliedPTransform.of("writefiles", objs.expand(), Collections.emptyMap(), original, p);

  WriteFiles<Object, Void, Object> replacement =
      (WriteFiles<Object, Void, Object>)
          factory.getReplacementTransform(originalApplication).getTransform();

  assertThat(replacement, not(equalTo((Object) original)));
  assertThat(replacement.getNumShardsProvider().get(), is(10));
}
 
Example 5: Test helper verifying that the streaming write override sets the expected shard count and maps its outputs back to the original transform.
private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
  TestPipeline p = TestPipeline.fromOptions(options);

  StreamingShardedWriteFactory<Object, Void, Object> factory =
      new StreamingShardedWriteFactory<>(p.getOptions());
  WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<PCollection<Object>, WriteFilesResult<Void>, WriteFiles<Object, Void, Object>>
      originalApplication =
          AppliedPTransform.of("writefiles", objs.expand(), Collections.emptyMap(), original, p);

  WriteFiles<Object, Void, Object> replacement =
      (WriteFiles<Object, Void, Object>)
          factory.getReplacementTransform(originalApplication).getTransform();
  assertThat(replacement, not(equalTo((Object) original)));
  assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards));

  WriteFilesResult<Void> originalResult = objs.apply(original);
  WriteFilesResult<Void> replacementResult = objs.apply(replacement);
  Map<PValue, ReplacementOutput> res =
      factory.mapOutputs(originalResult.expand(), replacementResult);
  assertEquals(1, res.size());
  assertEquals(
      originalResult.getPerDestinationOutputFilenames(),
      res.get(replacementResult.getPerDestinationOutputFilenames()).getOriginal().getValue());
}
 
Example 6: Replacement factory that rebuilds WriteFiles from the translated sink and side inputs and installs a custom sharding function.
@Override
public PTransformReplacement<PCollection<InputT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<InputT>,
                WriteFilesResult<DestinationT>,
                PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>>
            transform) {
  try {
    WriteFiles<InputT, DestinationT, ?> replacement =
        WriteFiles.to(WriteFilesTranslation.getSink(transform))
            .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform))
            .withSharding(new LogElementShardsWithDrift<>());
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform), replacement);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
 
Example 7: Map elements to CSV lines, write them with FileIO, and return the names of the written files.
private PCollection<String> writeFiles(PCollection<T> input, String stagingBucketDir) {

  PCollection<String> mappedUserData =
      input
          .apply(
              MapElements.via(
                  new SimpleFunction<T, Object[]>() {
                    @Override
                    public Object[] apply(T element) {
                      return getUserDataMapper().mapRow(element);
                    }
                  }))
          .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
          .setCoder(StringUtf8Coder.of());

  WriteFilesResult<Void> filesResult =
      mappedUserData.apply(
          "Write files to specified location",
          FileIO.<String>write()
              .via(TextIO.sink())
              .to(stagingBucketDir)
              .withPrefix(getFileNameTemplate())
              .withSuffix(".csv")
              .withCompression(Compression.GZIP));

  return filesResult
      .getPerDestinationOutputFilenames()
      .apply("Parse KV filenames to Strings", Values.<String>create());
}
 
Example 8: Extract the FileBasedSink from an applied WriteFiles transform's proto payload.
public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
    AppliedPTransform<
            PCollection<UserT>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return (FileBasedSink<UserT, DestinationT, OutputT>)
      sinkFromProto(getWriteFilesPayload(transform).getSink());
}
 
Example 9: Rehydrate a WriteFiles transform's dynamic-destination side inputs from its proto representation.
public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
    AppliedPTransform<
            PCollection<UserT>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents sdkComponents = SdkComponents.create(transform.getPipeline().getOptions());
  RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
  List<PCollectionView<?>> views = Lists.newArrayList();
  Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
  for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
    PCollection<?> originalPCollection =
        checkNotNull(
            (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
            "no input with tag %s",
            entry.getKey());
    views.add(
        PCollectionViewTranslation.viewFromProto(
            entry.getValue(),
            entry.getKey(),
            originalPCollection,
            transformProto,
            RehydratedComponents.forComponents(sdkComponents.toComponents())));
  }
  return views;
}
 
Example 10: Read the windowed-writes flag from the WriteFilesPayload.
public static <T, DestinationT> boolean isWindowedWrites(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return getWriteFilesPayload(transform).getWindowedWrites();
}
 
Example 11: Read the runner-determined-sharding flag from the WriteFilesPayload.
public static <T, DestinationT> boolean isRunnerDeterminedSharding(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return getWriteFilesPayload(transform).getRunnerDeterminedSharding();
}
 
Example 12: Parse the WriteFilesPayload out of an applied transform's proto spec.
private static <T, DestinationT> WriteFilesPayload getWriteFilesPayload(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents components = SdkComponents.create(transform.getPipeline().getOptions());
  return WriteFilesPayload.parseFrom(
      PTransformTranslation.toProto(transform, Collections.emptyList(), components)
          .getSpec()
          .getPayload());
}
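
Examples 8 through 11 all funnel through the payload helper above. Taken together, the translation utilities let a runner inspect an applied WriteFiles without touching its Java internals; a hypothetical inspection method (not part of Beam, using only the calls shown in Examples 8 through 11) might read:

// Hypothetical helper combining the translation utilities from Examples 8-11.
private static <T, DestinationT> void describeWriteFiles(
    AppliedPTransform<
            PCollection<T>,
            WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  System.out.println("sink: " + WriteFilesTranslation.getSink(transform));
  System.out.println("windowed writes: " + WriteFilesTranslation.isWindowedWrites(transform));
  System.out.println(
      "runner-determined sharding: "
          + WriteFilesTranslation.isRunnerDeterminedSharding(transform));
  System.out.println(
      "dynamic-destination side inputs: "
          + WriteFilesTranslation.getDynamicDestinationSideInputs(transform).size());
}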
 
Example 13: Streaming override that picks a shard count from the configured worker counts instead of runner-determined sharding.
@Override
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<UserT>,
                WriteFilesResult<DestinationT>,
                WriteFiles<UserT, DestinationT, OutputT>>
            transform) {
  // By default, if numShards is not set, WriteFiles will produce one file per bundle. In
  // streaming, there are large numbers of small bundles, resulting in many tiny files.
  // Instead we pick max workers * 2 to ensure full parallelism while preventing too many
  // files. (current_num_workers * 2 might be a better choice, but that value is not easily
  // available today.)
  // If the user sets neither numWorkers nor maxNumWorkers, default to 10 shards.
  int numShards;
  if (options.getMaxNumWorkers() > 0) {
    numShards = options.getMaxNumWorkers() * 2;
  } else if (options.getNumWorkers() > 0) {
    numShards = options.getNumWorkers() * 2;
  } else {
    numShards = DEFAULT_NUM_SHARDS;
  }

  try {
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);
    WriteFiles<UserT, DestinationT, OutputT> replacement =
        WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform),
        replacement.withNumShards(numShards));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
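
The shard-count fallback above is easy to restate and check in isolation; a sketch of the same rule (the method name and sample values are illustrative):

// Restates the fallback in Example 13: prefer maxNumWorkers, then numWorkers,
// then a fixed default, doubling the worker count to keep all workers busy.
static int chooseNumShards(int maxNumWorkers, int numWorkers, int defaultNumShards) {
  if (maxNumWorkers > 0) {
    return maxNumWorkers * 2;
  }
  if (numWorkers > 0) {
    return numWorkers * 2;
  }
  return defaultNumShards;
}

// chooseNumShards(50, 5, 10) == 100; chooseNumShards(0, 5, 10) == 10; chooseNumShards(0, 0, 10) == 10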
 
Example 14: Map the outputs of the replacement WriteFiles back to the outputs of the original transform.
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
    Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
  // We must connect the new output from WriteFilesResult to the outputs provided by the original
  // transform.
  return ReplacementOutputs.tagged(outputs, newOutput);
}
 
Example 15: Test that a WriteFiles transform with no sharding specified is replaced with a new transform.
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
  ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);

  PTransform<PCollection<Object>, WriteFilesResult<Void>> original =
      WriteFiles.to(
          new FileBasedSink<Object, Void, Object>(
              StaticValueProvider.of(outputDirectory),
              DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
            @Override
            public WriteOperation<Void, Object> createWriteOperation() {
              throw new IllegalArgumentException("Should not be used");
            }
          });
  @SuppressWarnings("unchecked")
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

  AppliedPTransform<
          PCollection<Object>,
          WriteFilesResult<Void>,
          PTransform<PCollection<Object>, WriteFilesResult<Void>>>
      originalApplication =
          AppliedPTransform.of("write", objs.expand(), Collections.emptyMap(), original, p);

  assertThat(
      factory.getReplacementTransform(originalApplication).getTransform(),
      not(equalTo((Object) original)));
}
 
Example 16: Flink streaming override that derives the shard count from job parallelism and can auto-balance shard keys.
@Override
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<UserT>,
                WriteFilesResult<DestinationT>,
                WriteFiles<UserT, DestinationT, OutputT>>
            transform) {
  // By default, if numShards is not set, WriteFiles will produce one file per bundle. In
  // streaming, there are large numbers of small bundles, resulting in many tiny files.
  // Instead we pick parallelism * 2 to ensure full parallelism while preventing too many files.
  Integer jobParallelism = options.getParallelism();

  Preconditions.checkArgument(
      jobParallelism > 0,
      "Parallelism of a job should be greater than 0. Currently set: %s",
      jobParallelism);
  int numShards = jobParallelism * 2;

  try {
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);

    @SuppressWarnings("unchecked")
    WriteFiles<UserT, DestinationT, OutputT> replacement =
        WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }

    if (WriteFilesTranslation.isRunnerDeterminedSharding(transform)) {
      replacement = replacement.withNumShards(numShards);
    } else {
      if (transform.getTransform().getNumShardsProvider() != null) {
        replacement =
            replacement.withNumShards(transform.getTransform().getNumShardsProvider());
      }
      if (transform.getTransform().getComputeNumShards() != null) {
        replacement = replacement.withSharding(transform.getTransform().getComputeNumShards());
      }
    }

    if (options.isAutoBalanceWriteFilesShardingEnabled()) {

      replacement =
          replacement.withShardingFunction(
              new FlinkAutoBalancedShardKeyShardingFunction<>(
                  jobParallelism,
                  options.getMaxParallelism(),
                  sink.getDynamicDestinations().getDestinationCoder()));
    }

    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform), replacement);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
 
Example 17: The same tagged mapOutputs override as Example 14.
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
    Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
  return ReplacementOutputs.tagged(outputs, newOutput);
}
 