Java source code examples: org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
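NestedValueProvider wraps an existing ValueProvider and applies a SerializableFunction to its value when get() is called. The translation runs lazily and its result is cached after the first get() (see Example 18), while isAccessible() simply reflects whether the wrapped provider's value is available yet (see Examples 16 and 17). As a minimal sketch of the pattern the examples below build on (the identifiers here are illustrative, not taken from any of the quoted projects):

ValueProvider<String> tableName = StaticValueProvider.of("my_table");
// Derive a new provider; the lambda runs lazily, on the first call to get().
ValueProvider<String> tableSpec =
    NestedValueProvider.of(tableName, name -> "my-project:my_dataset." + name);
tableSpec.get(); // returns "my-project:my_dataset.my_table"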

Example 1
/** Public constructor. */
public BigQueryOutput(ValueProvider<String> tableSpecTemplate, BigQueryWriteMethod writeMethod,
    Duration triggeringFrequency, InputType inputType, int numShards, long maxBytesPerPartition,
    ValueProvider<List<String>> streamingDocTypes,
    ValueProvider<List<String>> strictSchemaDocTypes, ValueProvider<String> schemasLocation,
    ValueProvider<TableRowFormat> tableRowFormat, ValueProvider<String> partitioningField,
    ValueProvider<List<String>> clusteringFields) {
  this.tableSpecTemplate = tableSpecTemplate;
  this.writeMethod = writeMethod;
  this.triggeringFrequency = triggeringFrequency;
  this.inputType = inputType;
  this.numShards = numShards;
  this.maxBytesPerPartition = maxBytesPerPartition;
  this.streamingDocTypes = NestedValueProvider.of(streamingDocTypes,
      value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
  this.strictSchemaDocTypes = NestedValueProvider.of(strictSchemaDocTypes,
      value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
  this.schemasLocation = schemasLocation;
  this.tableRowFormat = tableRowFormat;
  this.partitioningField = NestedValueProvider.of(partitioningField,
      f -> f != null ? f : Attribute.SUBMISSION_TIMESTAMP);
  this.clusteringFields = NestedValueProvider.of(clusteringFields,
      f -> f != null ? f : Collections.singletonList(Attribute.SUBMISSION_TIMESTAMP));
}
 
Example 2
/**
 * Returns a {@link ValueProvider} that reads a BigQuery schema file from GCS and supplies the
 * file contents as a string.
 *
 * @param gcsPath Path string for the schema file in GCS.
 * @return A ValueProvider for the file contents as a string.
 */
private static ValueProvider<String> getSchemaFromGCS(ValueProvider<String> gcsPath) {
  return NestedValueProvider.of(
      gcsPath,
      new SimpleFunction<String, String>() {
        @Override
        public String apply(String input) {
          ResourceId sourceResourceId = FileSystems.matchNewResource(input, false);

          String schema;
          try (ReadableByteChannel rbc = FileSystems.open(sourceResourceId);
              ByteArrayOutputStream baos = new ByteArrayOutputStream();
              WritableByteChannel wbc = Channels.newChannel(baos)) {
            ByteStreams.copy(rbc, wbc);
            schema = baos.toString(Charsets.UTF_8.name());
            LOG.info("Extracted schema: " + schema);
          } catch (IOException e) {
            LOG.error("Error extracting schema: " + e.getMessage());
            throw new RuntimeException(e);
          }
          return schema;
        }
      });
}
 
Example 3
@Test
@Category(NeedsRunner.class)
public void testCreateOfProvider() throws Exception {
  PAssert.that(
          p.apply(
              "Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
      .containsInAnyOrder("foo");
  PAssert.that(
          p.apply(
              "Static nested",
              Create.ofProvider(
                  NestedValueProvider.of(StaticValueProvider.of("foo"), input -> input + "bar"),
                  StringUtf8Coder.of())))
      .containsInAnyOrder("foobar");
  PAssert.that(
          p.apply(
              "Runtime", Create.ofProvider(p.newProvider("runtimeFoo"), StringUtf8Coder.of())))
      .containsInAnyOrder("runtimeFoo");

  p.run();
}
 
Example 4
/**
 * Returns the table to write, or {@code null} if writing with {@code tableFunction}.
 *
 * <p>If the table's project is not specified, use the executing project.
 */
@Nullable
ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
  ValueProvider<TableReference> table = getTable();
  if (table == null) {
    return table;
  }

  if (!table.isAccessible()) {
    LOG.info(
        "Using a dynamic value for table input. This must contain a project"
            + " in the table reference: {}",
        table);
    return table;
  }
  if (Strings.isNullOrEmpty(table.get().getProjectId())) {
    // If user does not specify a project we assume the table to be located in
    // the default project.
    TableReference tableRef = table.get();
    tableRef.setProjectId(bqOptions.getProject());
    return NestedValueProvider.of(
        StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
        new JsonTableRefToTableRef());
  }
  return table;
}
 
Example 5
/** Returns an IO transform that writes the overall invoice to a single CSV file. */
private TextIO.Write writeInvoice(ValueProvider<String> yearMonthProvider) {
  return TextIO.write()
      .to(
          NestedValueProvider.of(
              yearMonthProvider,
              yearMonth ->
                  String.format(
                      "%s/%s/%s/%s-%s",
                      billingBucketUrl,
                      BillingModule.INVOICES_DIRECTORY,
                      yearMonth,
                      invoiceFilePrefix,
                      yearMonth)))
      .withHeader(InvoiceGroupingKey.invoiceHeader())
      .withoutSharding()
      .withSuffix(".csv");
}
 
Example 6
/**
 * Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
 *
 * <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
 *
 * @param outputBucket the GCS bucket we're outputting reports to
 * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
 */
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
    String outputBucket, ValueProvider<String> yearMonthProvider) {
  return billingEvent ->
      new Params()
          .withShardTemplate("")
          .withSuffix(".csv")
          .withBaseFilename(
              NestedValueProvider.of(
                  yearMonthProvider,
                  yearMonth ->
                      FileBasedSink.convertToFileResourceIfPossible(
                          String.format(
                              "%s/%s/%s",
                              outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
}
 
Example 7
/**
 * Returns a provider that creates a BigQuery query for a given project and yearMonth at runtime.
 *
 * <p>We only know yearMonth at runtime, so this provider fills in the {@code
 * sql/billing_events.sql} template at runtime.
 *
 * @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
 * @param projectId the project we're generating invoices for.
 */
static ValueProvider<String> makeQueryProvider(
    ValueProvider<String> yearMonthProvider, String projectId) {
  return NestedValueProvider.of(
      yearMonthProvider,
      (yearMonth) -> {
        // Get the timestamp endpoints capturing the entire month with microsecond precision
        YearMonth reportingMonth = YearMonth.parse(yearMonth);
        LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
        LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
        // Construct the month's query by filling in the billing_events.sql template
        return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql"))
            .put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
            .put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
            .put("PROJECT_ID", projectId)
            .put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
            .put("ONETIME_TABLE", "OneTime")
            .put("REGISTRY_TABLE", "Registry")
            .put("REGISTRAR_TABLE", "Registrar")
            .put("CANCELLATION_TABLE", "Cancellation")
            .build();
      });
}
 
Example 8
public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation().as(WordCountOptions.class);

  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply("Read lines", TextIO.read().from(options.getInputFile()))
      .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
          .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
      .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
      .apply("Filter with substring", ParDo.of(new FilterWithSubstring(
          options.getWithSubstring(), options.getIsCaseSensitive())))
      .apply("Count words", Count.perElement())
      .apply("Format results", MapElements.into(TypeDescriptors.strings())
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      // [START nested_value_provider]
      .apply("Write results", TextIO.write().to(NestedValueProvider.of(
          options.getOutputBucket(),
          (String bucket) -> String.format("gs://%s/samples/dataflow/wordcount/outputs", bucket)
      )));
      // [END nested_value_provider]
  pipeline.run();
}
 
Example 9
/**
 * Set all the derived fields of a {@link RepublisherOptions.Parsed} instance.
 */
static void enrichRepublisherOptions(Parsed options) {
  SinkOptions.enrichSinkOptions(options);
  options.setDeduplicateExpireSeconds(NestedValueProvider.of(
      options.getDeduplicateExpireDuration(),
      value -> Ints.checkedCast(Time.parseSeconds(value))));
  options.setParsedRedisUri(NestedValueProvider.of(options.getRedisUri(),
      s -> Optional.ofNullable(s).map(URI::create).orElse(null)));
}
 
Example 10
@Override
public WithFailures.Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage> input) {
  ValueProvider<DynamicPathTemplate> pathTemplate = NestedValueProvider.of(outputPrefix,
      DynamicPathTemplate::new);
  ValueProvider<String> staticPrefix = NestedValueProvider.of(pathTemplate,
      value -> value.staticPrefix);

  FileIO.Write<List<String>, PubsubMessage> write = FileIO
      .<List<String>, PubsubMessage>writeDynamic()
      // We can't pass the attribute map to by() directly since MapCoder isn't deterministic;
      // instead, we extract an ordered list of the needed placeholder values.
      // That list is later available to withNaming() to determine output location.
      .by(message -> pathTemplate.get()
          .extractValuesFrom(DerivedAttributesMap.of(message.getAttributeMap())))
      .withDestinationCoder(ListCoder.of(StringUtf8Coder.of())) //
      .withCompression(compression) //
      .via(Contextful.fn(format::encodeSingleMessage), TextIO.sink()) //
      .to(staticPrefix) //
      .withNaming(placeholderValues -> NoColonFileNaming.defaultNaming(
          pathTemplate.get().replaceDynamicPart(placeholderValues), format.suffix()));

  if (inputType == InputType.pubsub) {
    // Passing a ValueProvider to withNumShards disables runner-determined sharding, so we
    // need to be careful to pass this only for streaming input (where runner-determined
    // sharding is not an option).
    write = write.withNumShards(numShards);
  }

  input //
      .apply(Window.<PubsubMessage>into(FixedWindows.of(windowDuration))
          // We allow lateness up to the maximum Cloud Pub/Sub retention of 7 days documented in
          // https://cloud.google.com/pubsub/docs/subscriber
          .withAllowedLateness(Duration.standardDays(7)) //
          .discardingFiredPanes())
      .apply(write);
  return WithFailures.Result.of(PDone.in(input.getPipeline()),
      EmptyErrors.in(input.getPipeline()));
}
 
Example 11
/** Public constructor. */
public AvroOutput(ValueProvider<String> outputPrefix, Duration windowDuration,
    ValueProvider<Integer> numShards, Compression compression, InputType inputType,
    ValueProvider<String> schemasLocation) {
  this.outputPrefix = outputPrefix;
  this.windowDuration = windowDuration;
  this.numShards = numShards;
  this.compression = compression;
  this.inputType = inputType;
  this.schemasLocation = schemasLocation;
  this.pathTemplate = NestedValueProvider.of(outputPrefix, DynamicPathTemplate::new);
}
 
Example 12
/**
 * Runs the pipeline with the supplied options.
 *
 * @param options The execution parameters to the pipeline.
 * @return  The result of the pipeline execution.
 */
public static PipelineResult run(Options options) {
  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);

  /*
   * Steps:
   *   1) Read string messages from PubSub
   *   2) Window the messages into minute intervals specified by the executor.
   *   3) Output the windowed files to GCS
   */
  pipeline
      .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      .apply(
          options.getWindowDuration() + " Window",
          Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

      // Apply windowed file writes. Use a NestedValueProvider because the filename
      // policy requires a resourceId generated from the input value at runtime.
      .apply(
          "Write File(s)",
          TextIO.write()
              .withWindowedWrites()
              .withNumShards(options.getNumShards())
              .to(
                  new WindowedFilenamePolicy(
                      options.getOutputDirectory(),
                      options.getOutputFilenamePrefix(),
                      options.getOutputShardTemplate(),
                      options.getOutputFilenameSuffix()))
              .withTempDirectory(NestedValueProvider.of(
                  maybeUseUserTempLocation(
                      options.getUserTempLocation(),
                      options.getOutputDirectory()),
                  (SerializableFunction<String, ResourceId>) input ->
                      FileBasedSink.convertToFileResourceIfPossible(input))));

  // Execute the pipeline and return the result.
  return pipeline.run();
}
 
Example 13
@Override
public PCollection<Export> expand(PBegin input) {
  NestedValueProvider<String, String> manifestFile =
      NestedValueProvider.of(importDirectory, s -> GcsUtil.joinPath(s, "spanner-export.json"));
  return input
      .apply("Read manifest", FileIO.match().filepattern(manifestFile))
      .apply(
          "Resource id",
          MapElements.into(TypeDescriptor.of(ResourceId.class))
              .via(MatchResult.Metadata::resourceId))
      .apply(
          "Read manifest json",
          MapElements.into(TypeDescriptor.of(Export.class))
              .via(ReadExportManifestFile::readManifest));
}
 
Example 14
/** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
    ValueProvider<ResourceId> tempDirectoryProvider,
    DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
    WritableByteChannelFactory writableByteChannelFactory) {
  this.tempDirectoryProvider =
      NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
  this.dynamicDestinations = checkNotNull(dynamicDestinations);
  this.writableByteChannelFactory = writableByteChannelFactory;
}
 
Example 15
/** Like {@link #to(String)}. */
public TypedWrite<UserT, DestinationT, OutputT> to(ValueProvider<String> outputPrefix) {
  return toResource(
      NestedValueProvider.of(
          outputPrefix,
          // The function cannot be created as an anonymous class here since the enclosed class
          // may contain unserializable members.
          new OutputPrefixToResourceId()));
}
 
Example 16
@Test
public void testNestedValueProviderStatic() throws Exception {
  SerializableFunction<String, String> function = from -> from + "bar";
  ValueProvider<String> svp = StaticValueProvider.of("foo");
  ValueProvider<String> nvp = NestedValueProvider.of(svp, function);
  assertTrue(nvp.isAccessible());
  assertEquals("foobar", nvp.get());
  assertEquals("foobar", nvp.toString());
  assertEquals(nvp, NestedValueProvider.of(svp, function));
}
 
Example 17
@Test
public void testNestedValueProviderRuntime() throws Exception {
  TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
  ValueProvider<String> rvp = options.getBar();
  ValueProvider<String> nvp = NestedValueProvider.of(rvp, from -> from + "bar");
  ValueProvider<String> doubleNvp = NestedValueProvider.of(nvp, from -> from);
  assertEquals("bar", ((NestedValueProvider) nvp).propertyName());
  assertEquals("bar", ((NestedValueProvider) doubleNvp).propertyName());
  assertFalse(nvp.isAccessible());
  expectedException.expect(RuntimeException.class);
  expectedException.expectMessage("Value only available at runtime");
  nvp.get();
}
 
Example 18
@Test
public void testNestedValueProviderCached() throws Exception {
  AtomicInteger increment = new AtomicInteger();
  ValueProvider<Integer> nvp =
      NestedValueProvider.of(
          StaticValueProvider.of(increment), new IncrementAtomicIntegerTranslator());
  Integer originalValue = nvp.get();
  Integer cachedValue = nvp.get();
  Integer incrementValue = increment.incrementAndGet();
  Integer secondCachedValue = nvp.get();
  assertEquals(originalValue, cachedValue);
  assertEquals(secondCachedValue, cachedValue);
  assertNotEquals(originalValue, incrementValue);
}
 
Example 19
/** Return a displayable string representation for a {@link TableReference}. */
@Nullable
static ValueProvider<String> displayTable(@Nullable ValueProvider<TableReference> table) {
  if (table == null) {
    return null;
  }
  return NestedValueProvider.of(table, new TableRefToTableSpec());
}
 
Example 20
@Nullable
static ValueProvider<String> displayTableRefProto(
    @Nullable ValueProvider<TableReferenceProto.TableReference> table) {
  if (table == null) {
    return null;
  }

  return NestedValueProvider.of(table, new TableRefProtoToTableSpec());
}
 
Example 21
/** See {@link Read#getTableProvider()}. */
@Nullable
public ValueProvider<TableReference> getTableProvider() {
  return getJsonTableRef() == null
      ? null
      : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
 
Example 22
/** See {@link Read#from(ValueProvider)}. */
public TypedRead<T> from(ValueProvider<String> tableSpec) {
  ensureFromNotCalledYet();
  return toBuilder()
      .setJsonTableRef(
          NestedValueProvider.of(
              NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
              new TableRefToJson()))
      .build();
}
 
Example 23
/** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> tableSpec) {
  checkArgument(tableSpec != null, "tableSpec can not be null");
  return toBuilder()
      .setJsonTableRef(
          NestedValueProvider.of(
              NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
              new TableRefToJson()))
      .build();
}
 
Example 24
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
  return getJsonTableRef() == null
      ? null
      : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
 
Example 25
/** Like {@code subscription()} but with a {@link ValueProvider}. */
public Read<T> fromSubscription(ValueProvider<String> subscription) {
  if (subscription.isAccessible()) {
    // Validate.
    PubsubSubscription.fromPath(subscription.get());
  }
  return toBuilder()
      .setSubscriptionProvider(
          NestedValueProvider.of(subscription, PubsubSubscription::fromPath))
      .build();
}
 
Example 26
/** Like {@code topic()} but with a {@link ValueProvider}. */
public Read<T> fromTopic(ValueProvider<String> topic) {
  if (topic.isAccessible()) {
    // Validate.
    PubsubTopic.fromPath(topic.get());
  }
  return toBuilder()
      .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
      .build();
}
 
Example 27
@Override
public PCollection<T> expand(PBegin input) {
  if (getTopicProvider() == null && getSubscriptionProvider() == null) {
    throw new IllegalStateException(
        "Need to set either the topic or the subscription for a PubsubIO.Read transform");
  }
  if (getTopicProvider() != null && getSubscriptionProvider() != null) {
    throw new IllegalStateException(
        "Can't set both the topic and the subscription for a PubsubIO.Read transform");
  }

  @Nullable
  ValueProvider<TopicPath> topicPath =
      getTopicProvider() == null
          ? null
          : NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
  @Nullable
  ValueProvider<SubscriptionPath> subscriptionPath =
      getSubscriptionProvider() == null
          ? null
          : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
  PubsubUnboundedSource source =
      new PubsubUnboundedSource(
          getClock(),
          getPubsubClientFactory(),
          null /* always get project from runtime PipelineOptions */,
          topicPath,
          subscriptionPath,
          getTimestampAttribute(),
          getIdAttribute(),
          getNeedsAttributes(),
          getNeedsMessageId());
  PCollection<T> read =
      input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
  return read.setCoder(getCoder());
}
 
Example 28
@Override
public PDone expand(PCollection<T> input) {
  if (getTopicProvider() == null) {
    throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
  }

  switch (input.isBounded()) {
    case BOUNDED:
      input.apply(
          ParDo.of(
              new PubsubBoundedWriter(
                  MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE),
                  MoreObjects.firstNonNull(
                      getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
      return PDone.in(input.getPipeline());
    case UNBOUNDED:
      return input
          .apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {}).via(getFormatFn()))
          .apply(
              new PubsubUnboundedSink(
                  getPubsubClientFactory(),
                  NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
                  getTimestampAttribute(),
                  getIdAttribute(),
                  100 /* numShards */,
                  MoreObjects.firstNonNull(
                      getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
                  MoreObjects.firstNonNull(
                      getMaxBatchBytesSize(),
                      PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
  }
  throw new RuntimeException(); // cases are exhaustive.
}
 
Example 29
@Override
public void translate(StreamingPubsubIOWrite transform, TranslationContext context) {
  checkArgument(
      context.getPipelineOptions().isStreaming(),
      "StreamingPubsubIOWrite is only for streaming pipelines.");
  PubsubUnboundedSink overriddenTransform = transform.getOverriddenTransform();
  StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
  stepContext.addInput(PropertyNames.FORMAT, "pubsub");
  if (overriddenTransform.getTopicProvider().isAccessible()) {
    stepContext.addInput(
        PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
  } else {
    stepContext.addInput(
        PropertyNames.PUBSUB_TOPIC_OVERRIDE,
        ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
  }
  if (overriddenTransform.getTimestampAttribute() != null) {
    stepContext.addInput(
        PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
  }
  if (overriddenTransform.getIdAttribute() != null) {
    stepContext.addInput(
        PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
  }
  stepContext.addInput(
      PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
      byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
  // No coder is needed in this case since the collection being written is already of
  // PubsubMessage; however, the Dataflow backend requires a coder to be set.
  stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
  stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
 
Example 30
static <T> ValueProvider<T> providerWithDefault(ValueProvider<T> inner, T defaultValue) {
  return NestedValueProvider.of(inner, value -> value == null ? defaultValue : value);
}