Java源码示例:org.apache.beam.sdk.values.TupleTagList
示例1
/**
 * Wraps each incoming {@link PubsubMessage} in a {@link FailsafeElement} and then transforms it,
 * either with the JavaScript UDF at {@code javascriptTextTransformGcsPath()} (when one is set) or
 * with {@code ProcessFailsafePubSubFn}.
 *
 * @param input the raw Pub/Sub messages
 * @return a tuple whose {@code TRANSFORM_OUT} entry holds the transformed elements and whose
 *     {@code TRANSFORM_DEADLETTER_OUT} entry holds the elements emitted on the dead-letter tag
 */
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
  // Wrapping the messages keeps the original payload attached, so failures in any of the
  // following transforms can still be recovered.
  PCollection<FailsafeElement<PubsubMessage, String>> wrapped =
      input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

  boolean udfConfigured = javascriptTextTransformGcsPath() != null;
  if (!udfConfigured) {
    // No UDF path: process the wrapped messages with the built-in DoFn instead.
    return wrapped.apply(
        "ProcessPubSubMessages",
        ParDo.of(new ProcessFailsafePubSubFn())
            .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
  }

  // A UDF path was supplied, so the JavaScript UDF parses the messages.
  return wrapped.apply(
      "InvokeUDF",
      FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
          .setFileSystemPath(javascriptTextTransformGcsPath())
          .setFunctionName(javascriptTextTransformFunctionName())
          .setSuccessTag(TRANSFORM_OUT)
          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
          .build());
}
示例2
/**
 * Converts Pub/Sub messages into {@link FailsafeElement}s and transforms them with either the
 * configured JavaScript UDF or {@code ProcessFailsafePubSubFn}.
 *
 * @param input the messages read from Pub/Sub
 * @return the transformed elements under {@code TRANSFORM_OUT} and the dead-lettered elements
 *     under {@code TRANSFORM_DEADLETTER_OUT}
 */
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
  // FailsafeElement carries the original message alongside the current payload, which makes
  // failures recoverable across the chain of transforms.
  PCollection<FailsafeElement<PubsubMessage, String>> records =
      input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

  PCollectionTuple transformed;
  if (javascriptTextTransformGcsPath() == null) {
    // No UDF was configured: use the built-in processing DoFn.
    transformed =
        records.apply(
            "ProcessPubSubMessages",
            ParDo.of(new ProcessFailsafePubSubFn())
                .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
  } else {
    // Parse the messages with the user-supplied JavaScript UDF.
    transformed =
        records.apply(
            "InvokeUDF",
            JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                .setFileSystemPath(javascriptTextTransformGcsPath())
                .setFunctionName(javascriptTextTransformFunctionName())
                .setSuccessTag(TRANSFORM_OUT)
                .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                .build());
  }
  return transformed;
}
示例3
/**
 * Parses the JSON payload of each {@link FailsafeElement} into a {@link TableRow}.
 *
 * <p>Successfully converted rows are emitted on {@code successTag()}. If conversion (or emitting
 * the row) throws, the original element is emitted on {@code failureTag()} together with the
 * exception's message and stack trace.
 *
 * @param failsafeElements elements whose current payload is a JSON string
 * @return a tuple keyed by {@code successTag()} and {@code failureTag()}
 */
@Override
public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> failsafeElements) {
  DoFn<FailsafeElement<T, String>, TableRow> jsonToRow =
      new DoFn<FailsafeElement<T, String>, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          FailsafeElement<T, String> current = context.element();
          String payloadJson = current.getPayload();
          try {
            context.output(convertJsonToTableRow(payloadJson));
          } catch (Exception e) {
            // Keep the original element and attach diagnostics so it can be dead-lettered.
            FailsafeElement<T, String> failed =
                FailsafeElement.of(current)
                    .setErrorMessage(e.getMessage())
                    .setStacktrace(Throwables.getStackTraceAsString(e));
            context.output(failureTag(), failed);
          }
        }
      };

  return failsafeElements.apply(
      "JsonToTableRow",
      ParDo.of(jsonToRow).withOutputTags(successTag(), TupleTagList.of(failureTag())));
}
示例4
/**
 * Supplies the parameterized-test inputs: multi-output {@link ParDo}s with empty and non-empty
 * additional output tags, with and without side inputs, plus the splittable and state/timer
 * DoFn variants.
 */
@Parameters(name = "{index}: {0}")
public static Iterable<ParDo.MultiOutput<?, ?>> data() {
  ImmutableList.Builder<ParDo.MultiOutput<?, ?>> cases = ImmutableList.builder();

  // No additional outputs, no side inputs.
  cases.add(ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  // No additional outputs, with side inputs.
  cases.add(
      ParDo.of(new DropElementsFn())
          .withOutputTags(new TupleTag<>(), TupleTagList.empty())
          .withSideInputs(singletonSideInput, multimapSideInput));

  // Two additional outputs, with side inputs.
  cases.add(
      ParDo.of(new DropElementsFn())
          .withOutputTags(
              new TupleTag<>(),
              TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
          .withSideInputs(singletonSideInput, multimapSideInput));

  // Two additional outputs, no side inputs.
  cases.add(
      ParDo.of(new DropElementsFn())
          .withOutputTags(
              new TupleTag<>(),
              TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));

  // Splittable DoFn.
  cases.add(
      ParDo.of(new SplittableDropElementsFn())
          .withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  // DoFn with state and timers.
  cases.add(
      ParDo.of(new StateTimerDropElementsFn())
          .withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  return cases.build();
}
示例5
/**
 * Reads the CSV files matching {@code inputFileSpec()}.
 *
 * <p>Without headers, every line is read with {@link TextIO} and returned under
 * {@code lineTag()}. With headers, the matched files are read by {@code GetCsvHeadersFn}, whose
 * outputs are tagged {@code headerTag()} (main output) and {@code lineTag()}.
 */
@Override
public PCollectionTuple expand(PBegin input) {
  if (!hasHeaders()) {
    return PCollectionTuple.of(
        lineTag(), input.apply("ReadCsvWithoutHeaders", TextIO.read().from(inputFileSpec())));
  }

  // Header-aware path: match the files, open them, and let GetCsvHeadersFn split headers from
  // data lines.
  return input
      .apply("MatchFilePattern", FileIO.match().filepattern(inputFileSpec()))
      .apply("ReadMatches", FileIO.readMatches())
      .apply(
          "ReadCsvWithHeaders",
          ParDo.of(new GetCsvHeadersFn(headerTag(), lineTag(), csvFormat(), delimiter()))
              .withOutputTags(headerTag(), TupleTagList.of(lineTag())));
}
示例6
/**
 * Converts the JSON payload of every failsafe element into a {@link TableRow}.
 *
 * <p>Converted rows go to {@code successTag()}. Any exception raised while converting or
 * emitting sends the untouched element, annotated with the error message and stack trace, to
 * {@code failureTag()}.
 *
 * @param failsafeElements elements carrying a JSON string payload
 * @return the success and failure outputs keyed by {@code successTag()} and {@code failureTag()}
 */
@Override
public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> failsafeElements) {
  ParDo.MultiOutput<FailsafeElement<T, String>, TableRow> toTableRows =
      ParDo.of(
              new DoFn<FailsafeElement<T, String>, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext ctx) {
                  FailsafeElement<T, String> record = ctx.element();
                  String jsonPayload = record.getPayload();
                  try {
                    TableRow tableRow = convertJsonToTableRow(jsonPayload);
                    ctx.output(tableRow);
                  } catch (Exception ex) {
                    // Route the original element, plus error details, to the failure output.
                    ctx.output(
                        failureTag(),
                        FailsafeElement.of(record)
                            .setErrorMessage(ex.getMessage())
                            .setStacktrace(Throwables.getStackTraceAsString(ex)));
                  }
                }
              })
          .withOutputTags(successTag(), TupleTagList.of(failureTag()));
  return failsafeElements.apply("JsonToTableRow", toTableRows);
}
示例7
/**
 * Splits the summaries into two branches with {@code SplitAB}, enriches only branch B with
 * {@code EnrichWithCNLPEntities}, and flattens both branches back into one collection.
 *
 * @param filteredIndexes the content index summaries to split and partially enrich
 * @param ratio passed to {@code SplitAB}; presumably controls the share of elements routed to
 *     each branch (confirm against {@code SplitAB})
 * @return branch A unchanged, flattened together with the enriched branch B
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
    PCollection<ContentIndexSummary> filteredIndexes, Float ratio) {
  PCollectionTuple splitAB =
      filteredIndexes.apply(
          ParDo.of(new SplitAB(ratio))
              .withOutputTags(PipelineTags.BranchA, TupleTagList.of(PipelineTags.BranchB)));
  PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
  PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);

  // Only branch B goes through entity enrichment; branch A passes through as-is.
  PCollection<ContentIndexSummary> enrichedBCol =
      branchBCol.apply(ParDo.of(new EnrichWithCNLPEntities()));

  // Recombine both branches into a single collection of summaries.
  return PCollectionList.of(branchACol)
      .and(enrichedBCol)
      .apply(Flatten.<ContentIndexSummary>pCollections());
}
示例8
/**
 * Checks that translating a multi-output ParDo of {@code FinishBundleDoFn} yields a payload whose
 * {@code getRequestsFinalization()} is true.
 */
@Test
public void testFinishBundle() throws Exception {
  Pipeline pipeline = Pipeline.create();
  SdkComponents components = SdkComponents.create();
  components.registerEnvironment(Environments.createDockerEnvironment("java"));

  // A bounded, globally windowed String collection serves as the ParDo's main input.
  PCollection<String> mainInput =
      PCollection.createPrimitiveOutputInternal(
          pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());

  ParDoPayload translated =
      ParDoTranslation.translateParDo(
          ParDo.of(new FinishBundleDoFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty()),
          mainInput,
          DoFnSchemaInformation.create(),
          TestPipeline.create(),
          components);
  assertTrue(translated.getRequestsFinalization());
}
示例9
/**
 * Splits {@code numbers} into two tagged outputs: values {@code <= 100} go to
 * {@code numBelow100Tag} (the main output) and values {@code > 100} go to {@code numAbove100Tag}.
 */
static PCollectionTuple applyTransform(
    PCollection<Integer> numbers, TupleTag<Integer> numBelow100Tag,
    TupleTag<Integer> numAbove100Tag) {
  DoFn<Integer, Integer> splitAt100 =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(@Element Integer number, MultiOutputReceiver out) {
          TupleTag<Integer> target = number > 100 ? numAbove100Tag : numBelow100Tag;
          out.get(target).output(number);
        }
      };
  return numbers.apply(
      ParDo.of(splitAt100).withOutputTags(numBelow100Tag, TupleTagList.of(numAbove100Tag)));
}
示例10
/**
 * Parses each JSON string into a {@link Row}, routing unparseable input to a separate collection.
 *
 * <p>The failure collection uses {@code ERROR_ROW_WITH_ERR_MSG_SCHEMA} when
 * {@code getExtendedErrorInfo()} is true and {@code ERROR_ROW_SCHEMA} otherwise. Parsed rows use
 * {@code getSchema()}.
 */
@Override
public ParseResult expand(PCollection<String> jsonStrings) {
  PCollectionTuple parsed =
      jsonStrings.apply(
          ParDo.of(ParseWithError.create(this))
              .withOutputTags(PARSED_LINE, TupleTagList.of(PARSE_ERROR)));

  boolean includeErrorMessage = getExtendedErrorInfo();
  PCollection<Row> failedRows =
      parsed
          .get(PARSE_ERROR)
          .setRowSchema(
              includeErrorMessage
                  ? JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA
                  : JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
  PCollection<Row> parsedRows = parsed.get(PARSED_LINE).setRowSchema(this.getSchema());

  return ParseResult.resultBuilder()
      .setCallingPipeline(jsonStrings.getPipeline())
      .setJsonToRowWithErrFn(this)
      .setParsedLine(parsedRows)
      .setFailedParse(failedRows)
      .build();
}
示例11
/**
 * Applies {@code partitionDoFn} as a multi-output ParDo and returns one collection per output
 * tag. The collections follow the order of {@code partitionDoFn.getOutputTags()}, and each one is
 * given the input's coder.
 */
@Override
public PCollectionList<T> expand(PCollection<T> in) {
  final TupleTagList partitionTags = partitionDoFn.getOutputTags();
  // The main output is typed Void; the partitions are carried by the additional output tags.
  PCollectionTuple tagged =
      in.apply(
          ParDo.of(partitionDoFn)
              .withOutputTags(new TupleTag<Void>() {}, partitionTags)
              .withSideInputs(partitionDoFn.getSideInputs()));

  PCollectionList<T> partitions = PCollectionList.empty(in.getPipeline());
  Coder<T> elementCoder = in.getCoder();
  for (TupleTag<?> tag : partitionTags.getAll()) {
    // Every partition tag is a TupleTag<T> and every partition a PCollection<T>.
    @SuppressWarnings("unchecked")
    TupleTag<T> partitionTag = (TupleTag<T>) tag;
    PCollection<T> partition = tagged.get(partitionTag);
    partitions = partitions.and(partition.setCoder(elementCoder));
  }
  return partitions;
}
示例12
/**
 * Constructs a PartitionDoFn that emits to {@code numPartitions} freshly created output tags.
 *
 * @param numPartitions number of partitions; must be positive
 * @param ctxFn the partitioning function, mapping an element to an {@link Integer}, wrapped in a
 *     {@link Contextful}
 * @param originalFnClassForDisplayData object retained for display data
 * @throws IllegalArgumentException if {@code numPartitions <= 0}
 */
private PartitionDoFn(
    int numPartitions,
    Contextful<Contextful.Fn<X, Integer>> ctxFn,
    Object originalFnClassForDisplayData) {
  if (numPartitions <= 0) {
    throw new IllegalArgumentException("numPartitions must be > 0");
  }
  this.ctxFn = ctxFn;
  this.originalFnClassForDisplayData = originalFnClassForDisplayData;
  this.numPartitions = numPartitions;

  // Append one fresh tag per partition, in iteration order.
  TupleTagList tags = TupleTagList.empty();
  int remaining = numPartitions;
  while (remaining-- > 0) {
    tags = tags.and(new TupleTag<X>());
  }
  outputTags = tags;
}
示例13
@Test
@Ignore(
    "TODO: BEAM-2902 Add support for user state in a ParDo.Multi once PTransformMatcher "
        + "exposes a way to know when the replacement is not required by checking that the "
        + "preceding ParDos to a GBK are key preserving.")
public void testFnApiMultiOutputOverrideNonCrashing() throws Exception {
  // Same scenario as the non-FnAPI variant, but with the beam_fn_api experiment enabled.
  DataflowPipelineOptions opts = buildPipelineOptions("--experiments=beam_fn_api");
  opts.setRunner(DataflowRunner.class);
  Pipeline p = Pipeline.create(opts);

  DummyStatefulDoFn statefulFn = new DummyStatefulDoFn();
  TupleTag<Integer> mainTag = new TupleTag<Integer>() {};
  TupleTag<Integer> sideTag = new TupleTag<Integer>() {};
  p.apply(Create.of(KV.of(1, 2)))
      .apply(ParDo.of(statefulFn).withOutputTags(mainTag, TupleTagList.of(sideTag)));

  // Replacement must not crash, and findBatchStatefulDoFn must return the very same DoFn.
  DataflowRunner dataflowRunner = DataflowRunner.fromOptions(opts);
  dataflowRunner.replaceTransforms(p);
  assertThat(findBatchStatefulDoFn(p), equalTo((DoFn) statefulFn));
}
示例14
/**
 * Checks that the requires-stable-input matchers accept only a ParDo of the matching shape
 * (single or multi output) whose DoFn is annotated with {@code @RequiresStableInput}.
 */
@Test
public void parDoRequiresStableInput() {
  DoFn<Object, Object> stableInputFn =
      new DoFn<Object, Object>() {
        @RequiresStableInput
        @ProcessElement
        public void process(ProcessContext c) {}
      };

  AppliedPTransform<?, ?, ?> plainSingle = getAppliedTransform(ParDo.of(doFn));
  AppliedPTransform<?, ?, ?> stableSingle = getAppliedTransform(ParDo.of(stableInputFn));
  AppliedPTransform<?, ?, ?> plainMulti =
      getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  AppliedPTransform<?, ?, ?> stableMulti =
      getAppliedTransform(
          ParDo.of(stableInputFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  // Single-output matcher: only the annotated single-output ParDo.
  assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(plainSingle), is(false));
  assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(stableSingle), is(true));
  assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(plainMulti), is(false));
  assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(stableMulti), is(false));

  // Multi-output matcher: only the annotated multi-output ParDo.
  assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(plainSingle), is(false));
  assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(stableSingle), is(false));
  assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(plainMulti), is(false));
  assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(stableMulti), is(true));
}
示例15
/**
 * Builds an {@link AppliedPTransform} named "MultiParDoInAndOut" for a multi-output ParDo that
 * reads a {@code GenerateSequence} and has a singleton side input.
 */
private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
  PCollectionView<String> sideView =
      pipeline.apply(Create.of("foo")).apply(View.asSingleton());
  PCollection<Long> numbers = pipeline.apply(GenerateSequence.from(0));

  ParDo.MultiOutput<Long, KV<Long, String>> parDo =
      ParDo.of(new TestDoFn())
          .withSideInputs(sideView)
          .withOutputTags(
              new TupleTag<KV<Long, String>>() {},
              TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
  PCollectionTuple outputs = numbers.apply(parDo);

  // Side inputs are added first and the main input last, so the main input wins on a key clash.
  Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
  allInputs.putAll(parDo.getAdditionalInputs());
  allInputs.putAll(numbers.expand());

  return AppliedPTransform
      .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
          "MultiParDoInAndOut", allInputs, outputs.expand(), parDo, pipeline);
}
示例16
/**
 * Checks that transform replacement on the Dataflow runner does not crash for a stateful
 * multi-output ParDo, and that {@code findBatchStatefulDoFn} afterwards returns the original DoFn.
 */
@Test
public void testMultiOutputOverrideNonCrashing() throws Exception {
  DataflowPipelineOptions pipelineOptions = buildPipelineOptions();
  pipelineOptions.setRunner(DataflowRunner.class);
  Pipeline testPipeline = Pipeline.create(pipelineOptions);

  TupleTag<Integer> mainOutput = new TupleTag<Integer>() {};
  TupleTag<Integer> sideOutput = new TupleTag<Integer>() {};
  DummyStatefulDoFn stateful = new DummyStatefulDoFn();
  testPipeline
      .apply(Create.of(KV.of(1, 2)))
      .apply(ParDo.of(stateful).withOutputTags(mainOutput, TupleTagList.of(sideOutput)));

  DataflowRunner.fromOptions(pipelineOptions).replaceTransforms(testPipeline);
  assertThat(findBatchStatefulDoFn(testPipeline), equalTo((DoFn) stateful));
}
示例17
/**
 * Checks that a coder set explicitly on an additional output survives both being consumed
 * ({@code View.asSingleton()}) and a later {@code finishSpecifyingOutput} call.
 */
@Test
public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
  pipeline.enableAbandonedNodeEnforcement(false);

  PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  final TupleTag<Integer> mainTag = new TupleTag<>("main");
  final TupleTag<TestDummy> dummyTag = new TupleTag<>("unregisteredSide");
  ParDo.MultiOutput<Integer, Integer> multiDo =
      ParDo.of(new TaggedOutputDummyFn(mainTag, dummyTag))
          .withOutputTags(mainTag, TupleTagList.of(dummyTag));
  PCollectionTuple results = numbers.apply(multiDo);

  // Give the TestDummy output an explicit coder and consume it.
  results.get(dummyTag).setCoder(new TestDummyCoder());
  results.get(dummyTag).apply(View.asSingleton());
  assertEquals(new TestDummyCoder(), results.get(dummyTag).getCoder());

  // Finishing the output specification must not crash...
  results.get(dummyTag).finishSpecifyingOutput("ParDo", numbers, multiDo);
  // ...and must not replace the explicitly set coder.
  assertEquals(new TestDummyCoder(), results.get(dummyTag).getCoder());
}
示例18
/**
 * Runs the DoFn built by {@code sdfWithAdditionalOutput} (presumably splittable) over 0..2 and
 * checks that the main and additional outputs carry the expected prefixed values.
 */
private void testAdditionalOutput(IsBounded bounded) {
  TupleTag<String> mainTag = new TupleTag<String>("main") {};
  TupleTag<String> extraTag = new TupleTag<String>("additional") {};

  PCollectionTuple outputs =
      p.apply("input", Create.of(0, 1, 2))
          .apply(
              ParDo.of(sdfWithAdditionalOutput(bounded, extraTag))
                  .withOutputTags(mainTag, TupleTagList.of(extraTag)));

  PAssert.that(outputs.get(mainTag))
      .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
  PAssert.that(outputs.get(extraTag))
      .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));

  p.run();
}
示例19
/**
 * Builds a pipeline with two branches fed by the same single-value input. One branch applies
 * {@code MakeSideEffectAndThenFailFn} as a single-output ParDo; the other applies it as a
 * multi-output ParDo with no additional tags. Both branches share one {@code firstTime} callback,
 * which counts down {@code latch}.
 */
private static Pipeline createPipeline(
    PipelineOptions options, String singleOutputPrefix, String multiOutputPrefix) {
  Pipeline pipeline = Pipeline.create(options);
  SerializableFunction<Void, Void> firstTime =
      ignored -> {
        latch.countDown();
        return null;
      };

  PCollection<String> impulse =
      pipeline.apply("CreatePCollectionOfOneValue", Create.of(VALUE));

  // Single-output branch.
  impulse
      .apply(
          "Single-PairWithRandomKey",
          MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
      .apply(
          "Single-MakeSideEffectAndThenFail",
          ParDo.of(
              new RequiresStableInputIT.MakeSideEffectAndThenFailFn(
                  singleOutputPrefix, firstTime)));

  // Multi-output branch: the same DoFn, made a MultiOutput with no additional tags.
  impulse
      .apply(
          "Multi-PairWithRandomKey",
          MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
      .apply(
          "Multi-MakeSideEffectAndThenFail",
          ParDo.of(
                  new RequiresStableInputIT.MakeSideEffectAndThenFailFn(
                      multiOutputPrefix, firstTime))
              .withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  return pipeline;
}
示例20
/**
 * Checks that {@code stateOrTimerParDo()} matches a multi-output ParDo with a stateful DoFn but
 * not one with a splittable DoFn.
 */
@Test
public void parDoWithState() {
  AppliedPTransform<?, ?, ?> stateful =
      getAppliedTransform(
          ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  boolean statefulMatches = PTransformMatchers.stateOrTimerParDo().matches(stateful);
  assertThat(statefulMatches, is(true));

  AppliedPTransform<?, ?, ?> splittable =
      getAppliedTransform(
          ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  boolean splittableMatches = PTransformMatchers.stateOrTimerParDo().matches(splittable);
  assertThat(splittableMatches, is(false));
}
示例21
/**
 * Routes every {@link FeatureRowProto.FeatureRow} to the output of each store that subscribes to
 * the row's feature set.
 *
 * <p>The output tuple has one tag per store, looked up via {@code getStoreTags()]. The first
 * store's tag is the ParDo's main output. A row whose feature set matches no store is dropped, and
 * a row that matches several stores is emitted once per matching store.
 *
 * @param input the feature rows to route
 * @return the rows, keyed by the tag of each store they were routed to
 * @throws IllegalStateException if {@code getStores()} is empty, because a multi-output ParDo
 *     needs at least one (main) output tag
 */
@Override
public PCollectionTuple expand(PCollection<FeatureRowProto.FeatureRow> input) {
  if (getStores().isEmpty()) {
    // The first store's tag becomes the main output below, so at least one store is required.
    // Fail with a clear message instead of an IndexOutOfBoundsException from get(0).
    throw new IllegalStateException(
        "AssignRowToStore requires at least one store, but getStores() returned none");
  }
  return input.apply(
      "AssignRowToStore",
      ParDo.of(
              new DoFn<FeatureRowProto.FeatureRow, FeatureRowProto.FeatureRow>() {
                @ProcessElement
                public void process(ProcessContext c, @Element FeatureRowProto.FeatureRow row) {
                  Pair<String, String> projectAndSetNames =
                      parseFeatureSetReference(row.getFeatureSet());
                  // Emit the row once to every store subscribed to its feature set.
                  getStores().stream()
                      .filter(
                          s ->
                              Store.isSubscribedToFeatureSet(
                                  s.getSubscriptionsList(),
                                  projectAndSetNames.getLeft(),
                                  projectAndSetNames.getRight()))
                      .forEach(s -> c.output(getStoreTags().get(s), row));
                }
              })
          .withOutputTags(
              getStoreTags().get(getStores().get(0)),
              TupleTagList.of(
                  getStores().stream()
                      .skip(1)
                      .map(getStoreTags()::get)
                      .collect(Collectors.toList()))));
}
示例22
/**
 * Wraps each {@link TableRow} in a failsafe element. When a JavaScript UDF path is configured in
 * {@code options()}, the wrapped rows are then run through that UDF.
 *
 * <p>The result is keyed by {@code transformOutTag()} and {@code transformDeadletterOutTag()}.
 * When the UDF runs, its failures and the wrapping step's failures are flattened together under
 * {@code transformDeadletterOutTag()}.
 */
@Override
public PCollectionTuple expand(PCollection<TableRow> input) {
  PCollectionTuple failsafeTableRows =
      input.apply(
          "TableRowToFailsafeElement",
          ParDo.of(new TableRowToFailsafeElementFn(transformDeadletterOutTag()))
              .withOutputTags(transformOutTag(), TupleTagList.of(transformDeadletterOutTag())));

  // Without a UDF, the wrapping step's outputs are already the final result.
  if (options().getJavascriptTextTransformGcsPath() == null) {
    return failsafeTableRows;
  }

  PCollectionTuple udfOut =
      failsafeTableRows
          .get(transformOutTag())
          .apply(
              "ProcessFailsafeRowsUdf",
              JavascriptTextTransformer.FailsafeJavascriptUdf.<TableRow>newBuilder()
                  .setFileSystemPath(options().getJavascriptTextTransformGcsPath())
                  .setFunctionName(options().getJavascriptTextTransformFunctionName())
                  .setSuccessTag(udfOutTag())
                  .setFailureTag(udfDeadletterOutTag())
                  .build());

  // Dead letters can come from either stage; expose them under a single tag.
  PCollection<FailsafeElement<TableRow, String>> failedOut =
      PCollectionList.of(udfOut.get(udfDeadletterOutTag()))
          .and(failsafeTableRows.get(transformDeadletterOutTag()))
          .apply("FlattenFailedOut", Flatten.pCollections());
  return PCollectionTuple.of(transformOutTag(), udfOut.get(udfOutTag()))
      .and(transformDeadletterOutTag(), failedOut);
}
示例23
/**
 * Checks that {@code parDoWithFnType} matches both the single- and multi-output ParDo forms of a
 * DoFn of the requested class.
 */
@Test
public void parDoWithFnTypeWithMatchingType() {
  DoFn<Object, Object> noopFn =
      new DoFn<Object, Object>() {
        @ProcessElement
        public void process(ProcessContext context) {}
      };
  AppliedPTransform<?, ?, ?> single = getAppliedTransform(ParDo.of(noopFn));
  AppliedPTransform<?, ?, ?> multi =
      getAppliedTransform(ParDo.of(noopFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  PTransformMatcher byFnType = PTransformMatchers.parDoWithFnType(noopFn.getClass());
  assertThat(byFnType.matches(single), is(true));
  assertThat(byFnType.matches(multi), is(true));
}
示例24
/**
 * Wraps a {@link PCollectionTuple} as a {@link Result}.
 *
 * @param pct tuple that must contain both the {@code OUT} and {@code DEAD_LETTER} tags
 * @return a {@link Result} backed by {@code pct}
 * @throws IllegalArgumentException if either tag is missing from {@code pct}
 */
public static Result of(PCollectionTuple pct) throws IllegalArgumentException {
  // TupleTagList is not a java.util.Collection. Unwrap it with getAll() before calling
  // containsAll; casting it to Collection fails with a ClassCastException at runtime.
  if (pct.getAll().keySet().containsAll(TupleTagList.of(OUT).and(DEAD_LETTER).getAll())) {
    return new Result(pct);
  }
  throw new IllegalArgumentException(
      "The PCollection tuple must have the HL7v2IO.Read.OUT "
          + "and HL7v2IO.Read.DEAD_LETTER tuple tags");
}
示例25
/**
 * Checks that, of the four matchers below, a stateful multi-output ParDo is accepted only by
 * {@code stateOrTimerParDoMulti()}.
 */
@Test
public void parDoMultiWithState() {
  AppliedPTransform<?, ?, ?> statefulMulti =
      getAppliedTransform(
          ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  boolean stateMulti = PTransformMatchers.stateOrTimerParDoMulti().matches(statefulMulti);
  assertThat(stateMulti, is(true));
  boolean splitMulti = PTransformMatchers.splittableParDoMulti().matches(statefulMulti);
  assertThat(splitMulti, is(false));
  boolean splitSingle = PTransformMatchers.splittableParDoSingle().matches(statefulMulti);
  assertThat(splitSingle, is(false));
  boolean stateSingle = PTransformMatchers.stateOrTimerParDoSingle().matches(statefulMulti);
  assertThat(stateSingle, is(false));
}
示例26
/**
 * Stores the configuration of the stateful ParDo this transform applies.
 *
 * @param doFn the DoFn applied to the keyed input
 * @param mainOutputTag tag of the ParDo's main output
 * @param additionalOutputTags tags of the ParDo's additional outputs
 * @param sideInputs side-input views made available to {@code doFn}
 * @param doFnSchemaInformation schema information for {@code doFn}
 * @param sideInputMapping side-input views keyed by string
 */
public GbkThenStatefulParDo(
    DoFn<KV<K, InputT>, OutputT> doFn,
    TupleTag<OutputT> mainOutputTag,
    TupleTagList additionalOutputTags,
    List<PCollectionView<?>> sideInputs,
    DoFnSchemaInformation doFnSchemaInformation,
    Map<String, PCollectionView<?>> sideInputMapping) {
  this.doFn = doFn;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.sideInputs = sideInputs;
  this.doFnSchemaInformation = doFnSchemaInformation;
  this.sideInputMapping = sideInputMapping;
}
示例27
/**
 * Fetches an HL7v2 message for each message ID via {@code FetchHL7v2Message.HL7v2MessageGetFn}.
 *
 * <p>Registers {@code HL7v2MessageCoder} for {@link HL7v2Message} on the pipeline's coder
 * registry first. The result wraps the ParDo's {@code HL7v2IO.Read.OUT} and
 * {@code HL7v2IO.Read.DEAD_LETTER} outputs.
 */
@Override
public Result expand(PCollection<String> msgIds) {
  msgIds
      .getPipeline()
      .getCoderRegistry()
      .registerCoderForClass(HL7v2Message.class, HL7v2MessageCoder.of());
  return new Result(
      msgIds.apply(
          ParDo.of(new FetchHL7v2Message.HL7v2MessageGetFn())
              .withOutputTags(HL7v2IO.Read.OUT, TupleTagList.of(HL7v2IO.Read.DEAD_LETTER))));
}
示例28
/**
 * Separates the content that should be indexed from exact duplicates, and merges the duplicates
 * with the content already skipped upstream.
 *
 * <p>If {@code options.getWriteTruncate()} is false, a BigQuery query (built by
 * {@code IndexerPipelineUtils.buildBigQueryProcessedDocsQuery}) is read and mapped by
 * {@code GetDocumentHashFn}. The result becomes a map side input for
 * {@code EliminateInputContentDupes}. Otherwise an empty map is used as that side input.
 *
 * @param contentToIndexNotSkipped content that is a candidate for indexing
 * @param contentNotToIndexSkipped content excluded upstream; it is merged unchanged into the
 *     "not to index" output
 * @param pipeline the pipeline on which the side-input source is created
 * @param options pipeline options; {@code getWriteTruncate()} selects the side-input source and
 *     the options also feed the BigQuery query
 * @return the content to index (not exact duplicates) and the content not to index (exact
 *     duplicates flattened with {@code contentNotToIndexSkipped})
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
    PCollection<InputContent> contentToIndexNotSkipped,
    PCollection<InputContent> contentNotToIndexSkipped,
    Pipeline pipeline,
    IndexerPipelineOptions options) {
  PCollection<KV<String, Long>> alreadyProcessedDocs;
  if (!options.getWriteTruncate()) {
    String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
    alreadyProcessedDocs =
        pipeline
            .apply("Get already processed Documents", BigQueryIO.read().fromQuery(query))
            .apply(ParDo.of(new GetDocumentHashFn()));
  } else {
    // NOTE(review): presumably with write-truncate earlier results are discarded, so no document
    // counts as already processed. The explicit coder is needed because Create cannot infer one
    // from an empty map.
    Map<String, Long> noProcessedDocs = new HashMap<>();
    alreadyProcessedDocs =
        pipeline.apply(
            "Create empty side input of Docs",
            Create.of(noProcessedDocs)
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  }
  final PCollectionView<Map<String, Long>> alreadyProcessedDocsSideInput =
      alreadyProcessedDocs.apply(View.<String, Long>asMap());

  // Group candidates by document hash, then let EliminateInputContentDupes split each group into
  // content to index (main output) and exact duplicates (additional output).
  PCollectionTuple indexOrNotBasedOnExactDupes =
      contentToIndexNotSkipped
          .apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
          .apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
          .apply(
              "Eliminate InputContent Dupes",
              ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
                  .withSideInputs(alreadyProcessedDocsSideInput)
                  .withOutputTags(
                      PipelineTags.contentToIndexNotExactDupesTag,
                      TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag)));
  PCollection<InputContent> contentToIndexNotExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
  PCollection<InputContent> contentNotToIndexExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);

  // Merge the exact duplicates with the content that was skipped upstream.
  PCollection<InputContent> contentNotToIndex =
      PCollectionList.of(contentNotToIndexExactDupes)
          .and(contentNotToIndexSkipped)
          .apply(Flatten.<InputContent>pCollections());
  return new ContentToIndexOrNot(contentToIndexNotExactDupes, contentNotToIndex);
}
示例29
/**
 * Checks that for a pipeline built only from a Read and a multi-output ParDo, the primitive
 * transform IDs are exactly the IDs of all transforms in the components.
 */
@Test
public void retainOnlyPrimitivesWithOnlyPrimitivesUnchanged() {
  Pipeline pipeline = Pipeline.create();
  pipeline
      .apply("Read", Read.from(CountingSource.unbounded()))
      .apply(
          "multi-do",
          ParDo.of(new TestFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  Components components = PipelineTranslation.toProto(pipeline).getComponents();
  Collection<String> primitiveIds = QueryablePipeline.getPrimitiveTransformIds(components);
  assertThat(primitiveIds, equalTo(components.getTransformsMap().keySet()));
}
示例30
/**
 * Checks that a multi-output ParDo of {@code doFn} is rejected by all four splittable and
 * state/timer matchers below.
 */
@Test
public void parDoMulti() {
  AppliedPTransform<?, ?, ?> plainMultiOutput =
      getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  boolean splitMulti = PTransformMatchers.splittableParDoMulti().matches(plainMultiOutput);
  assertThat(splitMulti, is(false));
  boolean stateMulti = PTransformMatchers.stateOrTimerParDoMulti().matches(plainMultiOutput);
  assertThat(stateMulti, is(false));
  boolean splitSingle = PTransformMatchers.splittableParDoSingle().matches(plainMultiOutput);
  assertThat(splitSingle, is(false));
  boolean stateSingle = PTransformMatchers.stateOrTimerParDoSingle().matches(plainMultiOutput);
  assertThat(stateSingle, is(false));
}