Java source code examples: org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
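NestedValueProvider wraps an existing ValueProvider and applies a SerializableFunction to its value. The function runs lazily, when get() is first called (typically at pipeline execution time), and the result is cached for subsequent calls. A minimal sketch of the pattern shared by the examples below (the option name getInputPrefix is hypothetical):
// Derive one provider from another; the lambda runs on the first get() and
// the result is cached afterwards. options.getInputPrefix() is a hypothetical
// ValueProvider<String> pipeline option.
ValueProvider<String> fullPath = NestedValueProvider.of(
options.getInputPrefix(), prefix -> prefix + "/part-*");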
Example 1
/** Public constructor. */
public BigQueryOutput(ValueProvider<String> tableSpecTemplate, BigQueryWriteMethod writeMethod,
Duration triggeringFrequency, InputType inputType, int numShards, long maxBytesPerPartition,
ValueProvider<List<String>> streamingDocTypes,
ValueProvider<List<String>> strictSchemaDocTypes, ValueProvider<String> schemasLocation,
ValueProvider<TableRowFormat> tableRowFormat, ValueProvider<String> partitioningField,
ValueProvider<List<String>> clusteringFields) {
this.tableSpecTemplate = tableSpecTemplate;
this.writeMethod = writeMethod;
this.triggeringFrequency = triggeringFrequency;
this.inputType = inputType;
this.numShards = numShards;
this.maxBytesPerPartition = maxBytesPerPartition;
this.streamingDocTypes = NestedValueProvider.of(streamingDocTypes,
value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
this.strictSchemaDocTypes = NestedValueProvider.of(strictSchemaDocTypes,
value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
this.schemasLocation = schemasLocation;
this.tableRowFormat = tableRowFormat;
this.partitioningField = NestedValueProvider.of(partitioningField,
f -> f != null ? f : Attribute.SUBMISSION_TIMESTAMP);
this.clusteringFields = NestedValueProvider.of(clusteringFields,
f -> f != null ? f : Collections.singletonList(Attribute.SUBMISSION_TIMESTAMP));
}
Example 2
/**
* Method to read a BigQuery schema file from GCS and return the file contents as a string.
*
* @param gcsPath Path string for the schema file in GCS.
* @return File contents as a string.
*/
private static ValueProvider<String> getSchemaFromGCS(ValueProvider<String> gcsPath) {
return NestedValueProvider.of(
gcsPath,
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
ResourceId sourceResourceId = FileSystems.matchNewResource(input, false);
String schema;
try (ReadableByteChannel rbc = FileSystems.open(sourceResourceId)) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
try (WritableByteChannel wbc = Channels.newChannel(baos)) {
ByteStreams.copy(rbc, wbc);
schema = baos.toString(Charsets.UTF_8.name());
LOG.info("Extracted schema: " + schema);
}
}
} catch (IOException e) {
LOG.error("Error extracting schema: " + e.getMessage());
throw new RuntimeException(e);
}
return schema;
}
});
}
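Because the returned provider is lazy, the GCS read inside apply() happens only when get() is first called at execution time, not when the pipeline or template is constructed. A hedged usage sketch (getSchemaPath is a hypothetical ValueProvider<String> pipeline option):
// The schema file is not read until some consumer calls schemaJson.get().
ValueProvider<String> schemaJson = getSchemaFromGCS(options.getSchemaPath());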
Example 3
@Test
@Category(NeedsRunner.class)
public void testCreateOfProvider() throws Exception {
PAssert.that(
p.apply(
"Static", Create.ofProvider(StaticValueProvider.of("foo"), StringUtf8Coder.of())))
.containsInAnyOrder("foo");
PAssert.that(
p.apply(
"Static nested",
Create.ofProvider(
NestedValueProvider.of(StaticValueProvider.of("foo"), input -> input + "bar"),
StringUtf8Coder.of())))
.containsInAnyOrder("foobar");
PAssert.that(
p.apply(
"Runtime", Create.ofProvider(p.newProvider("runtimeFoo"), StringUtf8Coder.of())))
.containsInAnyOrder("runtimeFoo");
p.run();
}
Example 4
/**
* Returns the table to write, or {@code null} if writing with {@code tableFunction}.
*
* <p>If the table's project is not specified, use the executing project.
*/
@Nullable
ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
ValueProvider<TableReference> table = getTable();
if (table == null) {
return table;
}
if (!table.isAccessible()) {
LOG.info(
"Using a dynamic value for table input. This must contain a project"
+ " in the table reference: {}",
table);
return table;
}
if (Strings.isNullOrEmpty(table.get().getProjectId())) {
// If user does not specify a project we assume the table to be located in
// the default project.
TableReference tableRef = table.get();
tableRef.setProjectId(bqOptions.getProject());
return NestedValueProvider.of(
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
new JsonTableRefToTableRef());
}
return table;
}
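Serializing the patched TableReference to JSON lets it ride inside a StaticValueProvider<String>; JsonTableRefToTableRef then rebuilds the object on get(). That class is not shown here; a sketch of what it plausibly does, assuming it delegates to Beam's BigQueryHelpers:
// Sketch: deserialize a JSON-encoded TableReference back into the API object.
static class JsonTableRefToTableRef implements SerializableFunction<String, TableReference> {
@Override
public TableReference apply(String from) {
return BigQueryHelpers.fromJsonString(from, TableReference.class);
}
}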
Example 5
/** Returns an IO transform that writes the overall invoice to a single CSV file. */
private TextIO.Write writeInvoice(ValueProvider<String> yearMonthProvider) {
return TextIO.write()
.to(
NestedValueProvider.of(
yearMonthProvider,
yearMonth ->
String.format(
"%s/%s/%s/%s-%s",
billingBucketUrl,
BillingModule.INVOICES_DIRECTORY,
yearMonth,
invoiceFilePrefix,
yearMonth)))
.withHeader(InvoiceGroupingKey.invoiceHeader())
.withoutSharding()
.withSuffix(".csv");
}
Example 6
/**
* Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
*
* <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
*
* @param outputBucket the GCS bucket we're outputting reports to
* @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
*/
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
String outputBucket, ValueProvider<String> yearMonthProvider) {
return billingEvent ->
new Params()
.withShardTemplate("")
.withSuffix(".csv")
.withBaseFilename(
NestedValueProvider.of(
yearMonthProvider,
yearMonth ->
FileBasedSink.convertToFileResourceIfPossible(
String.format(
"%s/%s/%s",
outputBucket, yearMonth, billingEvent.toFilename(yearMonth)))));
}
Example 7
/**
* Returns a provider that creates a BigQuery query for a given project and yearMonth at runtime.
*
* <p>We only know yearMonth at runtime, so this provider fills in the {@code
* sql/billing_events.sql} template at runtime.
*
* @param yearMonthProvider a runtime provider that returns which month we're invoicing for.
* @param projectId the project we're generating invoices for.
*/
static ValueProvider<String> makeQueryProvider(
ValueProvider<String> yearMonthProvider, String projectId) {
return NestedValueProvider.of(
yearMonthProvider,
(yearMonth) -> {
// Get the timestamp endpoints capturing the entire month with microsecond precision
YearMonth reportingMonth = YearMonth.parse(yearMonth);
LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
// Construct the month's query by filling in the billing_events.sql template
return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql"))
.put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
.put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
.put("PROJECT_ID", projectId)
.put("DATASTORE_EXPORT_DATA_SET", "latest_datastore_export")
.put("ONETIME_TABLE", "OneTime")
.put("REGISTRY_TABLE", "Registry")
.put("REGISTRAR_TABLE", "Registrar")
.put("CANCELLATION_TABLE", "Cancellation")
.build();
});
}
Example 8
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WordCountOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read lines", TextIO.read().from(options.getInputFile()))
.apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
.apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
.apply("Filter with substring", ParDo.of(new FilterWithSubstring(
options.getWithSubstring(), options.getIsCaseSensitive())))
.apply("Count words", Count.perElement())
.apply("Format results", MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
.apply("Write results", TextIO.write().to(NestedValueProvider.of(
options.getOutputBucket(),
(String bucket) -> String.format("gs://%s/samples/dataflow/wordcount/outputs", bucket)
)));
pipeline.run();
}
Example 9
/**
* Set all the derived fields of a {@link RepublisherOptions.Parsed} instance.
*/
static void enrichRepublisherOptions(Parsed options) {
SinkOptions.enrichSinkOptions(options);
options.setDeduplicateExpireSeconds(NestedValueProvider.of(
options.getDeduplicateExpireDuration(),
value -> Ints.checkedCast(Time.parseSeconds(value))));
options.setParsedRedisUri(NestedValueProvider.of(options.getRedisUri(),
s -> Optional.ofNullable(s).map(URI::create).orElse(null)));
}
Example 10
@Override
public WithFailures.Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage> input) {
ValueProvider<DynamicPathTemplate> pathTemplate = NestedValueProvider.of(outputPrefix,
DynamicPathTemplate::new);
ValueProvider<String> staticPrefix = NestedValueProvider.of(pathTemplate,
value -> value.staticPrefix);
FileIO.Write<List<String>, PubsubMessage> write = FileIO
.<List<String>, PubsubMessage>writeDynamic()
// We can't pass the attribute map to by() directly since MapCoder isn't
// deterministic; instead, we extract an ordered list of the needed placeholder
// values. That list is later available to withNaming() to determine the output
// location.
.by(message -> pathTemplate.get()
.extractValuesFrom(DerivedAttributesMap.of(message.getAttributeMap())))
.withDestinationCoder(ListCoder.of(StringUtf8Coder.of())) //
.withCompression(compression) //
.via(Contextful.fn(format::encodeSingleMessage), TextIO.sink()) //
.to(staticPrefix) //
.withNaming(placeholderValues -> NoColonFileNaming.defaultNaming(
pathTemplate.get().replaceDynamicPart(placeholderValues), format.suffix()));
if (inputType == InputType.pubsub) {
// Passing a ValueProvider to withNumShards disables runner-determined sharding, so we
// need to be careful to pass this only for streaming input (where runner-determined
// sharding is not an option).
write = write.withNumShards(numShards);
}
input //
.apply(Window.<PubsubMessage>into(FixedWindows.of(windowDuration))
// We allow lateness up to the maximum Cloud Pub/Sub retention of 7 days documented in
// https://cloud.google.com/pubsub/docs/subscriber
.withAllowedLateness(Duration.standardDays(7)) //
.discardingFiredPanes())
.apply(write);
return WithFailures.Result.of(PDone.in(input.getPipeline()),
EmptyErrors.in(input.getPipeline()));
}
Example 11
/** Public constructor. */
public AvroOutput(ValueProvider<String> outputPrefix, Duration windowDuration,
ValueProvider<Integer> numShards, Compression compression, InputType inputType,
ValueProvider<String> schemasLocation) {
this.outputPrefix = outputPrefix;
this.windowDuration = windowDuration;
this.numShards = numShards;
this.compression = compression;
this.inputType = inputType;
this.schemasLocation = schemasLocation;
this.pathTemplate = NestedValueProvider.of(outputPrefix, DynamicPathTemplate::new);
}
Example 12
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
pipeline
.apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider to derive the ResourceId
// required by withTempDirectory from the string-valued option at runtime.
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
maybeUseUserTempLocation(
options.getUserTempLocation(),
options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
Example 13
@Override
public PCollection<Export> expand(PBegin input) {
NestedValueProvider<String, String> manifestFile =
NestedValueProvider.of(importDirectory, s -> GcsUtil.joinPath(s, "spanner-export.json"));
return input
.apply("Read manifest", FileIO.match().filepattern(manifestFile))
.apply(
"Resource id",
MapElements.into(TypeDescriptor.of(ResourceId.class))
.via(MatchResult.Metadata::resourceId))
.apply(
"Read manifest json",
MapElements.into(TypeDescriptor.of(Export.class))
.via(ReadExportManifestFile::readManifest));
}
Example 14
/** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> tempDirectoryProvider,
DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
WritableByteChannelFactory writableByteChannelFactory) {
this.tempDirectoryProvider =
NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
this.dynamicDestinations = checkNotNull(dynamicDestinations);
this.writableByteChannelFactory = writableByteChannelFactory;
}
Example 15
/** Like {@link #to(String)}. */
public TypedWrite<UserT, DestinationT, OutputT> to(ValueProvider<String> outputPrefix) {
return toResource(
NestedValueProvider.of(
outputPrefix,
// The function cannot be created as an anonymous class here since the enclosing
// class may contain unserializable members.
new OutputPrefixToResourceId()));
}
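A named function class keeps the serialized closure free of any reference to the enclosing TypedWrite. OutputPrefixToResourceId is not shown here; a plausible sketch, assuming it delegates to the conversion helper used elsewhere in FileBasedSink:
// Sketch: a standalone function with no reference to an enclosing instance.
static class OutputPrefixToResourceId implements SerializableFunction<String, ResourceId> {
@Override
public ResourceId apply(String input) {
return FileBasedSink.convertToFileResourceIfPossible(input);
}
}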
Example 16
@Test
public void testNestedValueProviderStatic() throws Exception {
SerializableFunction<String, String> function = from -> from + "bar";
ValueProvider<String> svp = StaticValueProvider.of("foo");
ValueProvider<String> nvp = NestedValueProvider.of(svp, function);
assertTrue(nvp.isAccessible());
assertEquals("foobar", nvp.get());
assertEquals("foobar", nvp.toString());
assertEquals(nvp, NestedValueProvider.of(svp, function));
}
Example 17
@Test
public void testNestedValueProviderRuntime() throws Exception {
TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
ValueProvider<String> rvp = options.getBar();
ValueProvider<String> nvp = NestedValueProvider.of(rvp, from -> from + "bar");
ValueProvider<String> doubleNvp = NestedValueProvider.of(nvp, from -> from);
assertEquals("bar", ((NestedValueProvider) nvp).propertyName());
assertEquals("bar", ((NestedValueProvider) doubleNvp).propertyName());
assertFalse(nvp.isAccessible());
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Value only available at runtime");
nvp.get();
}
Example 18
@Test
public void testNestedValueProviderCached() throws Exception {
AtomicInteger increment = new AtomicInteger();
ValueProvider<Integer> nvp =
NestedValueProvider.of(
StaticValueProvider.of(increment), new IncrementAtomicIntegerTranslator());
Integer originalValue = nvp.get();
Integer cachedValue = nvp.get();
Integer incrementValue = increment.incrementAndGet();
Integer secondCachedValue = nvp.get();
assertEquals(originalValue, cachedValue);
assertEquals(secondCachedValue, cachedValue);
assertNotEquals(originalValue, incrementValue);
}
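The assertions hold only because NestedValueProvider caches the first get() result, so the external increment is invisible to later calls. The translator class is not shown; the behavior the test implies is simply:
// Sketch: increments and returns the wrapped AtomicInteger on each apply().
static class IncrementAtomicIntegerTranslator implements SerializableFunction<AtomicInteger, Integer> {
@Override
public Integer apply(AtomicInteger from) {
return from.incrementAndGet();
}
}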
Example 19
/** Return a displayable string representation for a {@link TableReference}. */
@Nullable
static ValueProvider<String> displayTable(@Nullable ValueProvider<TableReference> table) {
if (table == null) {
return null;
}
return NestedValueProvider.of(table, new TableRefToTableSpec());
}
Example 20
@Nullable
static ValueProvider<String> displayTableRefProto(
@Nullable ValueProvider<TableReferenceProto.TableReference> table) {
if (table == null) {
return null;
}
return NestedValueProvider.of(table, new TableRefProtoToTableSpec());
}
Example 21
/** See {@link Read#getTableProvider()}. */
@Nullable
public ValueProvider<TableReference> getTableProvider() {
return getJsonTableRef() == null
? null
: NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
Example 22
/** See {@link Read#from(ValueProvider)}. */
public TypedRead<T> from(ValueProvider<String> tableSpec) {
ensureFromNotCalledYet();
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
.build();
}
Example 23
/** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> tableSpec) {
checkArgument(tableSpec != null, "tableSpec cannot be null");
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
.build();
}
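Examples 22 and 23 nest two providers: the table spec string is first parsed into a TableReference, then re-serialized to JSON for storage in the builder. The composition is roughly equivalent to a single provider (a sketch using Beam's BigQueryHelpers):
// Sketch: collapse the parse-then-serialize chain into one function.
ValueProvider<String> jsonTableRef = NestedValueProvider.of(
tableSpec, spec -> BigQueryHelpers.toJsonString(BigQueryHelpers.parseTableSpec(spec)));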
Example 24
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
return getJsonTableRef() == null
? null
: NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
Example 25
/** Like {@code subscription()} but with a {@link ValueProvider}. */
public Read<T> fromSubscription(ValueProvider<String> subscription) {
if (subscription.isAccessible()) {
// Validate.
PubsubSubscription.fromPath(subscription.get());
}
return toBuilder()
.setSubscriptionProvider(
NestedValueProvider.of(subscription, PubsubSubscription::fromPath))
.build();
}
Example 26
/** Like {@code topic()} but with a {@link ValueProvider}. */
public Read<T> fromTopic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
// Validate.
PubsubTopic.fromPath(topic.get());
}
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
.build();
}
Example 27
@Override
public PCollection<T> expand(PBegin input) {
if (getTopicProvider() == null && getSubscriptionProvider() == null) {
throw new IllegalStateException(
"Need to set either the topic or the subscription for a PubsubIO.Read transform");
}
if (getTopicProvider() != null && getSubscriptionProvider() != null) {
throw new IllegalStateException(
"Can't set both the topic and the subscription for a PubsubIO.Read transform");
}
@Nullable
ValueProvider<TopicPath> topicPath =
getTopicProvider() == null
? null
: NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator());
@Nullable
ValueProvider<SubscriptionPath> subscriptionPath =
getSubscriptionProvider() == null
? null
: NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
PubsubUnboundedSource source =
new PubsubUnboundedSource(
getClock(),
getPubsubClientFactory(),
null /* always get project from runtime PipelineOptions */,
topicPath,
subscriptionPath,
getTimestampAttribute(),
getIdAttribute(),
getNeedsAttributes(),
getNeedsMessageId());
PCollection<T> read =
input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
return read.setCoder(getCoder());
}
Example 28
@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
switch (input.isBounded()) {
case BOUNDED:
input.apply(
ParDo.of(
new PubsubBoundedWriter(
MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
return PDone.in(input.getPipeline());
case UNBOUNDED:
return input
.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {}).via(getFormatFn()))
.apply(
new PubsubUnboundedSink(
getPubsubClientFactory(),
NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
getTimestampAttribute(),
getIdAttribute(),
100 /* numShards */,
MoreObjects.firstNonNull(
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(),
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
}
throw new RuntimeException(); // cases are exhaustive.
}
Example 29
@Override
public void translate(StreamingPubsubIOWrite transform, TranslationContext context) {
checkArgument(
context.getPipelineOptions().isStreaming(),
"StreamingPubsubIOWrite is only for streaming pipelines.");
PubsubUnboundedSink overriddenTransform = transform.getOverriddenTransform();
StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider().isAccessible()) {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
} else {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, overriddenTransform.getTimestampAttribute());
}
if (overriddenTransform.getIdAttribute() != null) {
stepContext.addInput(
PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
}
stepContext.addInput(
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
// No coder is needed in this case since the collection being written already
// contains PubsubMessage elements; however, the Dataflow backend requires a
// coder to be set.
stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
Example 30
static <T> ValueProvider<T> providerWithDefault(ValueProvider<T> inner, T defaultValue) {
return NestedValueProvider.of(inner, value -> value == null ? defaultValue : value);
}
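A usage sketch: falling back to a default when an optional templated option resolves to null (getRegion is a hypothetical ValueProvider<String> option):
// If the option is unset at runtime, substitute "us-central1".
ValueProvider<String> region = providerWithDefault(options.getRegion(), "us-central1");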