Java源码示例:org.apache.beam.sdk.values.TupleTagList

示例1
/**
 * Wraps every incoming Pub/Sub message in a {@link FailsafeElement} and then parses it, either
 * with the configured JavaScript UDF or, when no UDF path is set, with
 * {@code ProcessFailsafePubSubFn}.
 *
 * @param input the Pub/Sub messages to parse
 * @return a tuple holding successful results under {@code TRANSFORM_OUT} and failures under
 *     {@code TRANSFORM_DEADLETTER_OUT}
 */
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
  // FailsafeElement carries the original message alongside the payload, so failures can be
  // recovered across multiple transforms.
  PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
      input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

  // No UDF configured: use the built-in message processing.
  if (javascriptTextTransformGcsPath() == null) {
    return failsafeElements.apply(
        "ProcessPubSubMessages",
        ParDo.of(new ProcessFailsafePubSubFn())
            .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
  }

  return failsafeElements.apply(
      "InvokeUDF",
      FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
          .setFileSystemPath(javascriptTextTransformGcsPath())
          .setFunctionName(javascriptTextTransformFunctionName())
          .setSuccessTag(TRANSFORM_OUT)
          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
          .build());
}
 
示例2
/**
 * Converts Pub/Sub messages into {@link FailsafeElement}s and parses them. When a JavaScript UDF
 * path is configured the UDF is invoked; otherwise {@code ProcessFailsafePubSubFn} is used.
 *
 * @param input the Pub/Sub messages to parse
 * @return a tuple with results under {@code TRANSFORM_OUT} and failures under
 *     {@code TRANSFORM_DEADLETTER_OUT}
 */
@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {
  // Keeping the original message next to the payload lets downstream transforms recover from
  // failures.
  PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
      input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

  boolean udfConfigured = javascriptTextTransformGcsPath() != null;
  if (!udfConfigured) {
    return failsafeElements.apply(
        "ProcessPubSubMessages",
        ParDo.of(new ProcessFailsafePubSubFn())
            .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
  }

  return failsafeElements.apply(
      "InvokeUDF",
      JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
          .setFileSystemPath(javascriptTextTransformGcsPath())
          .setFunctionName(javascriptTextTransformFunctionName())
          .setSuccessTag(TRANSFORM_OUT)
          .setFailureTag(TRANSFORM_DEADLETTER_OUT)
          .build());
}
 
示例3
/**
 * Converts the JSON payload of each {@link FailsafeElement} into a {@link TableRow}.
 *
 * <p>Converted rows go to {@code successTag()}. If conversion throws, the original element is
 * emitted to {@code failureTag()} with the exception message and stack trace attached, instead
 * of the exception propagating.
 *
 * @param failsafeElements elements whose payload is a JSON string
 * @return a tuple containing the success and failure outputs
 */
@Override
public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> failsafeElements) {
  DoFn<FailsafeElement<T, String>, TableRow> jsonToTableRowFn =
      new DoFn<FailsafeElement<T, String>, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          FailsafeElement<T, String> current = context.element();
          String payload = current.getPayload();
          try {
            context.output(convertJsonToTableRow(payload));
          } catch (Exception e) {
            // Dead-letter the element with diagnostics rather than rethrowing.
            context.output(
                failureTag(),
                FailsafeElement.of(current)
                    .setErrorMessage(e.getMessage())
                    .setStacktrace(Throwables.getStackTraceAsString(e)));
          }
        }
      };

  return failsafeElements.apply(
      "JsonToTableRow",
      ParDo.of(jsonToTableRowFn).withOutputTags(successTag(), TupleTagList.of(failureTag())));
}
 
示例4
/**
 * Supplies the {@link ParDo.MultiOutput} transforms this parameterized test runs against.
 *
 * <p>Covers a plain {@code DropElementsFn}; the same with side inputs; with two additional typed
 * output tags, with and without side inputs; and {@code SplittableDropElementsFn} and
 * {@code StateTimerDropElementsFn} variants.
 *
 * @return the transforms, in a fixed order
 */
@Parameters(name = "{index}: {0}")
public static Iterable<ParDo.MultiOutput<?, ?>> data() {
  return ImmutableList.<ParDo.MultiOutput<?, ?>>builder()
      // No side inputs, no additional outputs.
      .add(ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty()))
      // Side inputs only.
      .add(
          ParDo.of(new DropElementsFn())
              .withOutputTags(new TupleTag<>(), TupleTagList.empty())
              .withSideInputs(singletonSideInput, multimapSideInput))
      // Additional typed outputs plus side inputs.
      .add(
          ParDo.of(new DropElementsFn())
              .withOutputTags(
                  new TupleTag<>(),
                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
              .withSideInputs(singletonSideInput, multimapSideInput))
      // Additional typed outputs only.
      .add(
          ParDo.of(new DropElementsFn())
              .withOutputTags(
                  new TupleTag<>(),
                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})))
      // SplittableDropElementsFn variant.
      .add(
          ParDo.of(new SplittableDropElementsFn())
              .withOutputTags(new TupleTag<>(), TupleTagList.empty()))
      // StateTimerDropElementsFn variant.
      .add(
          ParDo.of(new StateTimerDropElementsFn())
              .withOutputTags(new TupleTag<>(), TupleTagList.empty()))
      .build();
}
 
示例5
/**
 * Reads CSV input matching {@code inputFileSpec()}.
 *
 * <p>Without headers, the files are read line by line with {@link TextIO} and the lines are
 * placed under {@code lineTag()}. With headers, each matched file is processed by
 * {@code GetCsvHeadersFn}, whose main output is {@code headerTag()} and whose additional output
 * is {@code lineTag()}.
 *
 * @param input the pipeline root
 * @return a tuple containing {@code lineTag()} and, when headers are present, {@code headerTag()}
 */
@Override
public PCollectionTuple expand(PBegin input) {
  if (!hasHeaders()) {
    return PCollectionTuple.of(
        lineTag(), input.apply("ReadCsvWithoutHeaders", TextIO.read().from(inputFileSpec())));
  }

  return input
      .apply("MatchFilePattern", FileIO.match().filepattern(inputFileSpec()))
      .apply("ReadMatches", FileIO.readMatches())
      .apply(
          "ReadCsvWithHeaders",
          ParDo.of(new GetCsvHeadersFn(headerTag(), lineTag(), csvFormat(), delimiter()))
              .withOutputTags(headerTag(), TupleTagList.of(lineTag())));
}
 
示例6
/**
 * Turns each {@link FailsafeElement}'s JSON payload into a {@link TableRow}.
 *
 * <p>Successful conversions are emitted to {@code successTag()}. A conversion that throws sends
 * the original element, annotated with the error message and stack trace, to
 * {@code failureTag()}.
 *
 * @param failsafeElements elements carrying JSON payloads
 * @return a tuple with the success and failure outputs
 */
@Override
public PCollectionTuple expand(PCollection<FailsafeElement<T, String>> failsafeElements) {
  DoFn<FailsafeElement<T, String>, TableRow> toTableRow =
      new DoFn<FailsafeElement<T, String>, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          FailsafeElement<T, String> input = context.element();
          String jsonPayload = input.getPayload();
          try {
            context.output(convertJsonToTableRow(jsonPayload));
          } catch (Exception e) {
            // Keep the pipeline running: route the bad record to the failure output.
            context.output(
                failureTag(),
                FailsafeElement.of(input)
                    .setErrorMessage(e.getMessage())
                    .setStacktrace(Throwables.getStackTraceAsString(e)));
          }
        }
      };

  return failsafeElements.apply(
      "JsonToTableRow",
      ParDo.of(toTableRow).withOutputTags(successTag(), TupleTagList.of(failureTag())));
}
 
示例7
/**
 * Splits the summaries into two branches with {@code SplitAB}, runs only branch B through
 * {@code EnrichWithCNLPEntities} (presumably Cloud Natural Language entity enrichment, per the
 * name), and merges both branches back into a single collection.
 *
 * @param filteredIndexes the content index summaries to split and enrich
 * @param ratio split ratio handed to {@code SplitAB}; how it maps to branch sizes is defined
 *     there (TODO confirm)
 * @return all summaries: branch A unchanged plus branch B after enrichment
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
		PCollection<ContentIndexSummary> filteredIndexes, Float ratio) {

	PCollectionTuple splitAB = filteredIndexes
		.apply(ParDo.of(new SplitAB(ratio))
			.withOutputTags(PipelineTags.BranchA,
				TupleTagList.of(PipelineTags.BranchB)));

	PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
	PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);

	// Only branch B is enriched; branch A passes through untouched.
	PCollection<ContentIndexSummary> enrichedBCol = branchBCol.apply(
		ParDo.of(new EnrichWithCNLPEntities()));

	// Merge the untouched branch A with the enriched branch B into one collection.
	return PCollectionList.of(branchACol).and(enrichedBCol)
		.apply(Flatten.<ContentIndexSummary>pCollections());
}
 
示例8
/**
 * Translates a multi-output {@code ParDo} over {@code FinishBundleDoFn} and checks that the
 * resulting {@link ParDoPayload} reports {@code getRequestsFinalization()} as true.
 */
@Test
public void testFinishBundle() throws Exception {
  SdkComponents components = SdkComponents.create();
  components.registerEnvironment(Environments.createDockerEnvironment("java"));
  Pipeline pipeline = Pipeline.create();

  ParDo.MultiOutput<?, ?> parDo =
      ParDo.of(new FinishBundleDoFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty());
  PCollection<String> mainInput =
      PCollection.createPrimitiveOutputInternal(
          pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());

  // NOTE(review): mainInput belongs to `pipeline`, but a separate TestPipeline is passed as the
  // pipeline argument below; confirm this mismatch is intentional.
  ParDoPayload payload =
      ParDoTranslation.translateParDo(
          parDo, mainInput, DoFnSchemaInformation.create(), TestPipeline.create(), components);

  assertTrue(payload.getRequestsFinalization());
}
 
示例9
/**
 * Splits {@code numbers} into two outputs: values {@code <= 100} go to {@code numBelow100Tag}
 * (the main output) and values {@code > 100} go to {@code numAbove100Tag}.
 *
 * @param numbers the integers to split
 * @param numBelow100Tag tag for values less than or equal to 100
 * @param numAbove100Tag tag for values greater than 100
 * @return a tuple holding both outputs
 */
static PCollectionTuple applyTransform(
    PCollection<Integer> numbers, TupleTag<Integer> numBelow100Tag,
    TupleTag<Integer> numAbove100Tag) {

  DoFn<Integer, Integer> splitAt100 =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(@Element Integer number, MultiOutputReceiver out) {
          TupleTag<Integer> target = number <= 100 ? numBelow100Tag : numAbove100Tag;
          out.get(target).output(number);
        }
      };

  return numbers.apply(
      ParDo.of(splitAt100).withOutputTags(numBelow100Tag, TupleTagList.of(numAbove100Tag)));
}
 
示例10
/**
 * Parses JSON strings into {@link Row}s and separates the failures.
 *
 * <p>Parsed rows get this transform's schema. Failed rows get
 * {@code ERROR_ROW_WITH_ERR_MSG_SCHEMA} when {@code getExtendedErrorInfo()} is true and
 * {@code ERROR_ROW_SCHEMA} otherwise.
 *
 * @param jsonStrings the JSON documents to parse
 * @return a {@code ParseResult} exposing both the parsed and the failed outputs
 */
@Override
public ParseResult expand(PCollection<String> jsonStrings) {
  PCollectionTuple parsed =
      jsonStrings.apply(
          ParDo.of(ParseWithError.create(this))
              .withOutputTags(PARSED_LINE, TupleTagList.of(PARSE_ERROR)));

  // The extended schema (per its name) also carries the error message.
  PCollection<Row> failures =
      parsed
          .get(PARSE_ERROR)
          .setRowSchema(
              getExtendedErrorInfo()
                  ? JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA
                  : JsonToRowWithErrFn.ERROR_ROW_SCHEMA);

  return ParseResult.resultBuilder()
      .setCallingPipeline(jsonStrings.getPipeline())
      .setJsonToRowWithErrFn(this)
      .setParsedLine(parsed.get(PARSED_LINE).setRowSchema(this.getSchema()))
      .setFailedParse(failures)
      .build();
}
 
示例11
/**
 * Applies {@code partitionDoFn} as a multi-output {@code ParDo} and returns one
 * {@link PCollection} per output tag of the DoFn, in tag order, each using the input's coder.
 *
 * <p>The main output is a throwaway {@code TupleTag<Void>}; only the DoFn's own output tags are
 * collected into the result.
 *
 * @param in the collection to partition
 * @return the partitions, in the order of {@code partitionDoFn.getOutputTags()}
 */
@Override
public PCollectionList<T> expand(PCollection<T> in) {
  final TupleTagList partitionTags = partitionDoFn.getOutputTags();

  PCollectionTuple partitioned =
      in.apply(
          ParDo.of(partitionDoFn)
              .withOutputTags(new TupleTag<Void>() {}, partitionTags)
              .withSideInputs(partitionDoFn.getSideInputs()));

  PCollectionList<T> result = PCollectionList.empty(in.getPipeline());
  Coder<T> elementCoder = in.getCoder();

  for (TupleTag<?> rawTag : partitionTags.getAll()) {
    // Every partition tag is a TupleTag<T>, and every partition is a PCollection<T>.
    @SuppressWarnings("unchecked")
    TupleTag<T> partitionTag = (TupleTag<T>) rawTag;
    PCollection<T> partition = partitioned.get(partitionTag).setCoder(elementCoder);
    result = result.and(partition);
  }
  return result;
}
 
示例12
/**
 * Constructs a PartitionDoFn with one freshly created output tag per partition.
 *
 * @param numPartitions number of partitions (and output tags); must be positive
 * @param ctxFn the contextful function, presumably mapping each element to a partition index
 * @param originalFnClassForDisplayData object kept for display data
 * @throws IllegalArgumentException if {@code numPartitions <= 0}
 */
private PartitionDoFn(
    int numPartitions,
    Contextful<Contextful.Fn<X, Integer>> ctxFn,
    Object originalFnClassForDisplayData) {
  if (numPartitions <= 0) {
    throw new IllegalArgumentException("numPartitions must be > 0");
  }

  this.numPartitions = numPartitions;
  this.ctxFn = ctxFn;
  this.originalFnClassForDisplayData = originalFnClassForDisplayData;

  TupleTagList tags = TupleTagList.empty();
  for (int i = 0; i < numPartitions; i++) {
    tags = tags.and(new TupleTag<X>());
  }
  this.outputTags = tags;
}
 
示例13
/**
 * With the {@code beam_fn_api} experiment enabled, checks that replacing transforms on a pipeline
 * containing a multi-output stateful {@code ParDo} does not crash, and that
 * {@code findBatchStatefulDoFn} then finds the original DoFn.
 */
@Test
@Ignore(
    "TODO: BEAM-2902 Add support for user state in a ParDo.Multi once PTransformMatcher "
        + "exposes a way to know when the replacement is not required by checking that the "
        + "preceding ParDos to a GBK are key preserving.")
public void testFnApiMultiOutputOverrideNonCrashing() throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions("--experiments=beam_fn_api");
  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
  TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
  DummyStatefulDoFn statefulFn = new DummyStatefulDoFn();

  pipeline
      .apply(Create.of(KV.of(1, 2)))
      .apply(
          ParDo.of(statefulFn).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));

  DataflowRunner.fromOptions(options).replaceTransforms(pipeline);
  assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) statefulFn));
}
 
示例14
/**
 * Checks that the stable-input matchers distinguish single- from multi-output {@code ParDo}s and
 * only match DoFns annotated with {@code @RequiresStableInput}.
 */
@Test
public void parDoRequiresStableInput() {
  DoFn<Object, Object> stableInputFn =
      new DoFn<Object, Object>() {
        @RequiresStableInput
        @ProcessElement
        public void process(ProcessContext ctxt) {}
      };

  AppliedPTransform<?, ?, ?> single = getAppliedTransform(ParDo.of(doFn));
  AppliedPTransform<?, ?, ?> singleRSI = getAppliedTransform(ParDo.of(stableInputFn));
  AppliedPTransform<?, ?, ?> multi =
      getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  AppliedPTransform<?, ?, ?> multiRSI =
      getAppliedTransform(
          ParDo.of(stableInputFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  // Single-output matcher: only the annotated single-output ParDo.
  PTransformMatcher singleMatcher = PTransformMatchers.requiresStableInputParDoSingle();
  assertThat(singleMatcher.matches(single), is(false));
  assertThat(singleMatcher.matches(singleRSI), is(true));
  assertThat(singleMatcher.matches(multi), is(false));
  assertThat(singleMatcher.matches(multiRSI), is(false));

  // Multi-output matcher: only the annotated multi-output ParDo.
  PTransformMatcher multiMatcher = PTransformMatchers.requiresStableInputParDoMulti();
  assertThat(multiMatcher.matches(single), is(false));
  assertThat(multiMatcher.matches(singleRSI), is(false));
  assertThat(multiMatcher.matches(multi), is(false));
  assertThat(multiMatcher.matches(multiRSI), is(true));
}
 
示例15
/**
 * Builds an {@link AppliedPTransform} named {@code "MultiParDoInAndOut"} for a multi-output
 * {@code ParDo} over a {@code GenerateSequence}, with a singleton side input.
 *
 * @param pipeline the pipeline to build on
 * @return the applied transform, whose inputs include both the main input and the side input
 */
private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
  PCollectionView<String> sideView = pipeline.apply(Create.of("foo")).apply(View.asSingleton());
  PCollection<Long> numbers = pipeline.apply(GenerateSequence.from(0));

  ParDo.MultiOutput<Long, KV<Long, String>> parDo =
      ParDo.of(new TestDoFn())
          .withSideInputs(sideView)
          .withOutputTags(
              new TupleTag<KV<Long, String>>() {},
              TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
  PCollectionTuple outputs = numbers.apply(parDo);

  // Side inputs first, then the main input, so main-input entries win on any key clash.
  Map<TupleTag<?>, PValue> allInputs = new HashMap<>(parDo.getAdditionalInputs());
  allInputs.putAll(numbers.expand());

  return AppliedPTransform
      .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
          "MultiParDoInAndOut", allInputs, outputs.expand(), parDo, pipeline);
}
 
示例16
/**
 * Checks that replacing transforms on a pipeline containing a multi-output stateful
 * {@code ParDo} does not crash, and that {@code findBatchStatefulDoFn} then finds the original
 * DoFn.
 */
@Test
public void testMultiOutputOverrideNonCrashing() throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions();
  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
  TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
  DummyStatefulDoFn statefulFn = new DummyStatefulDoFn();

  pipeline
      .apply(Create.of(KV.of(1, 2)))
      .apply(
          ParDo.of(statefulFn).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));

  DataflowRunner.fromOptions(options).replaceTransforms(pipeline);
  assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) statefulFn));
}
 
示例17
/**
 * Sets a coder explicitly on an additional output whose element type is {@code TestDummy}. Checks
 * that finishing the output specification neither crashes nor replaces that coder.
 */
@Test
public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
  pipeline.enableAbandonedNodeEnforcement(false);

  PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));

  final TupleTag<Integer> mainOutputTag = new TupleTag<>("main");
  final TupleTag<TestDummy> additionalOutputTag = new TupleTag<>("unregisteredSide");
  ParDo.MultiOutput<Integer, Integer> pardo =
      ParDo.of(new TaggedOutputDummyFn(mainOutputTag, additionalOutputTag))
          .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag));
  PCollectionTuple outputTuple = input.apply(pardo);

  // get(tag) returns the same PCollection each time, so hold on to it.
  PCollection<TestDummy> additional = outputTuple.get(additionalOutputTag);
  additional.setCoder(new TestDummyCoder());
  additional.apply(View.asSingleton());

  assertEquals(new TestDummyCoder(), additional.getCoder());
  additional.finishSpecifyingOutput("ParDo", input, pardo); // Check for crashes
  assertEquals(new TestDummyCoder(), additional.getCoder()); // Check for corruption
}
 
示例18
/**
 * Runs the DoFn built by {@code sdfWithAdditionalOutput} over {0, 1, 2}. Checks that the main
 * output holds {@code "main:N"} and the additional output holds {@code "additional:N"} for
 * every input.
 *
 * @param bounded boundedness passed to {@code sdfWithAdditionalOutput}
 */
private void testAdditionalOutput(IsBounded bounded) {
  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};

  PCollection<Integer> input = p.apply("input", Create.of(0, 1, 2));
  PCollectionTuple outputs =
      input.apply(
          ParDo.of(sdfWithAdditionalOutput(bounded, additionalOutputTag))
              .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

  Iterable<String> expectedMain = Arrays.asList("main:0", "main:1", "main:2");
  Iterable<String> expectedAdditional =
      Arrays.asList("additional:0", "additional:1", "additional:2");
  PAssert.that(outputs.get(mainOutputTag)).containsInAnyOrder(expectedMain);
  PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(expectedAdditional);

  p.run();
}
 
示例19
/**
 * Builds a pipeline with two branches fed from a single {@code Create.of(VALUE)} source. One
 * branch ends in a single-output {@code ParDo} and the other in a multi-output {@code ParDo}.
 * Both run {@code MakeSideEffectAndThenFailFn}, each with its own output prefix.
 *
 * @param options options for the new pipeline
 * @param singleOutputPrefix prefix given to the single-output branch's DoFn
 * @param multiOutputPrefix prefix given to the multi-output branch's DoFn
 * @return the constructed pipeline (not run here)
 */
private static Pipeline createPipeline(
    PipelineOptions options, String singleOutputPrefix, String multiOutputPrefix) {
  Pipeline p = Pipeline.create(options);

  // Shared by both branches; each invocation counts the shared latch down once.
  SerializableFunction<Void, Void> firstTime =
      ignored -> {
        latch.countDown();
        return null;
      };

  PCollection<String> impulse = p.apply("CreatePCollectionOfOneValue", Create.of(VALUE));

  // Single-output branch.
  impulse
      .apply(
          "Single-PairWithRandomKey",
          MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
      .apply(
          "Single-MakeSideEffectAndThenFail",
          ParDo.of(
              new RequiresStableInputIT.MakeSideEffectAndThenFailFn(
                  singleOutputPrefix, firstTime)));

  // Multi-output branch: same DoFn, applied through withOutputTags.
  impulse
      .apply(
          "Multi-PairWithRandomKey",
          MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
      .apply(
          "Multi-MakeSideEffectAndThenFail",
          ParDo.of(
                  new RequiresStableInputIT.MakeSideEffectAndThenFailFn(
                      multiOutputPrefix, firstTime))
              .withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  return p;
}
 
示例20
/**
 * Checks that {@code stateOrTimerParDo()} matches a multi-output ParDo over a stateful DoFn but
 * not one over a splittable DoFn.
 */
@Test
public void parDoWithState() {
  PTransformMatcher stateOrTimer = PTransformMatchers.stateOrTimerParDo();

  AppliedPTransform<?, ?, ?> stateful =
      getAppliedTransform(
          ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  assertThat(stateOrTimer.matches(stateful), is(true));

  AppliedPTransform<?, ?, ?> splittable =
      getAppliedTransform(
          ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
  assertThat(stateOrTimer.matches(splittable), is(false));
}
 
示例21
/**
 * Routes each {@link FeatureRowProto.FeatureRow} to the output of every store subscribed to the
 * row's feature set.
 *
 * <p>The tag of the first store in {@code getStores()} is the main output; the remaining stores'
 * tags are additional outputs. A row is emitted once per subscribed store, so a row matching no
 * store is dropped and a row matching several stores appears in each of their outputs.
 *
 * @param input feature rows to route
 * @return one output per store, under the tags from {@code getStoreTags()}
 * @throws IllegalStateException if {@code getStores()} is empty, because a multi-output ParDo
 *     needs a main output tag
 */
@Override
public PCollectionTuple expand(PCollection<FeatureRowProto.FeatureRow> input) {
  // Fail fast with a clear message instead of an IndexOutOfBoundsException from get(0) below.
  if (getStores().isEmpty()) {
    throw new IllegalStateException(
        "AssignRowToStore requires at least one store, but none were configured");
  }

  return input.apply(
      "AssignRowToStore",
      ParDo.of(
              new DoFn<FeatureRowProto.FeatureRow, FeatureRowProto.FeatureRow>() {
                @ProcessElement
                public void process(ProcessContext c, @Element FeatureRowProto.FeatureRow row) {
                  Pair<String, String> projectAndSetNames =
                      parseFeatureSetReference(row.getFeatureSet());
                  getStores().stream()
                      .filter(
                          s ->
                              Store.isSubscribedToFeatureSet(
                                  s.getSubscriptionsList(),
                                  projectAndSetNames.getLeft(),
                                  projectAndSetNames.getRight()))
                      .forEach(s -> c.output(getStoreTags().get(s), row));
                }
              })
          .withOutputTags(
              getStoreTags().get(getStores().get(0)),
              TupleTagList.of(
                  getStores().stream()
                      .skip(1)
                      .map(getStoreTags()::get)
                      .collect(Collectors.toList()))));
}
 
示例22
/**
 * Converts {@link TableRow}s into failsafe elements and, when a JavaScript UDF path is
 * configured, runs the UDF over the converted rows.
 *
 * <p>Without a UDF, the conversion's tuple is returned as is. With a UDF, the UDF's successes
 * become {@code transformOutTag()}. Failures from both conversion and the UDF are flattened into
 * {@code transformDeadletterOutTag()}.
 *
 * @param input the table rows to process
 * @return a tuple keyed by {@code transformOutTag()} and {@code transformDeadletterOutTag()}
 */
@Override
public PCollectionTuple expand(PCollection<TableRow> input) {
  PCollectionTuple failsafeTableRows =
      input.apply(
          "TableRowToFailsafeElement",
          ParDo.of(new TableRowToFailsafeElementFn(transformDeadletterOutTag()))
              .withOutputTags(transformOutTag(), TupleTagList.of(transformDeadletterOutTag())));

  // No UDF configured: the conversion output is the final result.
  if (options().getJavascriptTextTransformGcsPath() == null) {
    return failsafeTableRows;
  }

  PCollectionTuple udfOut =
      failsafeTableRows
          .get(transformOutTag())
          .apply(
              "ProcessFailsafeRowsUdf",
              JavascriptTextTransformer.FailsafeJavascriptUdf.<TableRow>newBuilder()
                  .setFileSystemPath(options().getJavascriptTextTransformGcsPath())
                  .setFunctionName(options().getJavascriptTextTransformFunctionName())
                  .setSuccessTag(udfOutTag())
                  .setFailureTag(udfDeadletterOutTag())
                  .build());

  // Combine UDF failures with conversion failures into a single dead-letter output.
  PCollection<FailsafeElement<TableRow, String>> failedOut =
      PCollectionList.of(udfOut.get(udfDeadletterOutTag()))
          .and(failsafeTableRows.get(transformDeadletterOutTag()))
          .apply("FlattenFailedOut", Flatten.pCollections());

  return PCollectionTuple.of(transformOutTag(), udfOut.get(udfOutTag()))
      .and(transformDeadletterOutTag(), failedOut);
}
 
示例23
/**
 * Checks that {@code parDoWithFnType} matches both single- and multi-output ParDos built from a
 * DoFn of the requested class.
 */
@Test
public void parDoWithFnTypeWithMatchingType() {
  DoFn<Object, Object> plainFn =
      new DoFn<Object, Object>() {
        @ProcessElement
        public void process(ProcessContext ctxt) {}
      };
  PTransformMatcher fnTypeMatcher = PTransformMatchers.parDoWithFnType(plainFn.getClass());

  AppliedPTransform<?, ?, ?> single = getAppliedTransform(ParDo.of(plainFn));
  AppliedPTransform<?, ?, ?> multi =
      getAppliedTransform(
          ParDo.of(plainFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  assertThat(fnTypeMatcher.matches(single), is(true));
  assertThat(fnTypeMatcher.matches(multi), is(true));
}
 
示例24
/**
 * Wraps a {@link PCollectionTuple} as a {@code Result}, after checking that it holds both
 * required outputs.
 *
 * @param pct tuple that must contain both the {@code OUT} and {@code DEAD_LETTER} tags
 * @return a {@code Result} backed by {@code pct}
 * @throws IllegalArgumentException if either required tag is missing from {@code pct}
 */
public static Result of(PCollectionTuple pct) throws IllegalArgumentException {
  // TupleTagList does not implement Collection; compare against its tag list via getAll().
  // A (Collection<?>) cast would compile but fail with ClassCastException at runtime.
  if (pct.getAll().keySet().containsAll(TupleTagList.of(OUT).and(DEAD_LETTER).getAll())) {
    return new Result(pct);
  }
  throw new IllegalArgumentException(
      "The PCollection tuple must have the HL7v2IO.Read.OUT "
          + "and HL7v2IO.Read.DEAD_LETTER tuple tags");
}
 
示例25
/**
 * Checks that a multi-output ParDo over a stateful DoFn is matched by
 * {@code stateOrTimerParDoMulti()} and by none of the splittable or single-output matchers.
 */
@Test
public void parDoMultiWithState() {
  AppliedPTransform<?, ?, ?> statefulMulti =
      getAppliedTransform(
          ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(statefulMulti), is(true));

  // None of the other ParDo matchers should claim it.
  assertThat(PTransformMatchers.splittableParDoMulti().matches(statefulMulti), is(false));
  assertThat(PTransformMatchers.splittableParDoSingle().matches(statefulMulti), is(false));
  assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(statefulMulti), is(false));
}
 
示例26
/**
 * Stores the configuration of this composite; judging by the class name, it groups by key and then
 * runs a stateful {@code doFn}. No validation or defensive copying happens here.
 *
 * @param doFn the DoFn to run over the keyed input
 * @param mainOutputTag tag of the DoFn's main output
 * @param additionalOutputTags tags of the DoFn's additional outputs
 * @param sideInputs side inputs the DoFn reads
 * @param doFnSchemaInformation schema information for the DoFn
 * @param sideInputMapping side inputs keyed by string (presumably the side-input name)
 */
public GbkThenStatefulParDo(
    DoFn<KV<K, InputT>, OutputT> doFn,
    TupleTag<OutputT> mainOutputTag,
    TupleTagList additionalOutputTags,
    List<PCollectionView<?>> sideInputs,
    DoFnSchemaInformation doFnSchemaInformation,
    Map<String, PCollectionView<?>> sideInputMapping) {
  this.doFn = doFn;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.sideInputs = sideInputs;
  this.doFnSchemaInformation = doFnSchemaInformation;
  this.sideInputMapping = sideInputMapping;
}
 
示例27
/**
 * Registers {@code HL7v2MessageCoder} for {@code HL7v2Message} on the pipeline's coder registry.
 * Then applies {@code HL7v2MessageGetFn} to each message ID, presumably fetching the
 * corresponding HL7v2 message.
 *
 * @param msgIds the HL7v2 message IDs to process
 * @return a {@code Result} over the {@code OUT} and {@code DEAD_LETTER} outputs
 */
@Override
public Result expand(PCollection<String> msgIds) {
  msgIds
      .getPipeline()
      .getCoderRegistry()
      .registerCoderForClass(HL7v2Message.class, HL7v2MessageCoder.of());

  return new Result(
      msgIds.apply(
          ParDo.of(new FetchHL7v2Message.HL7v2MessageGetFn())
              .withOutputTags(HL7v2IO.Read.OUT, TupleTagList.of(HL7v2IO.Read.DEAD_LETTER))));
}
 
示例28
/**
 * Removes exact duplicates, grouped by document hash, from the content to index. Also gathers
 * everything that will not be indexed into one collection.
 *
 * <p>Grouped items go to {@code EliminateInputContentDupes} with a side-input map of
 * already-processed document hashes. The map is read from BigQuery unless
 * {@code options.getWriteTruncate()} is true, in which case it is empty.
 *
 * @param contentToIndexNotSkipped candidate content that was not skipped earlier
 * @param contentNotToIndexSkipped content already skipped; merged into the not-to-index output
 * @param pipeline pipeline used to read (or create) the already-processed side input
 * @param options options deciding whether previously processed documents are consulted
 * @return the deduplicated content to index, plus the merged content not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
		PCollection<InputContent> contentToIndexNotSkipped, PCollection<InputContent> contentNotToIndexSkipped,
		Pipeline pipeline, IndexerPipelineOptions options) {
	final PCollection<KV<String,Long>> alreadyProcessedDocs;

	if (options.getWriteTruncate()) {
		// Previous results are not consulted: use an empty side input.
		Map<String, Long> noProcessedDocs = new HashMap<>();
		alreadyProcessedDocs = pipeline
			.apply("Create empty side input of Docs",
				Create.of(noProcessedDocs).withCoder(KvCoder.of(StringUtf8Coder.of(),VarLongCoder.of())));
	} else {
		String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
		alreadyProcessedDocs = pipeline
			.apply("Get already processed Documents",BigQueryIO.read().fromQuery(query))
			.apply(ParDo.of(new GetDocumentHashFn()));
	}

	final PCollectionView<Map<String,Long>> alreadyProcessedDocsSideInput =
		alreadyProcessedDocs.apply(View.<String,Long>asMap());

	PCollectionTuple dedupResult = contentToIndexNotSkipped
		.apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
		.apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
		.apply("Eliminate InputContent Dupes", ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
			.withSideInputs(alreadyProcessedDocsSideInput)
			.withOutputTags(PipelineTags.contentToIndexNotExactDupesTag, // main output: keep for indexing
				TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // additional output: exact dupes

	PCollection<InputContent> contentToIndexNotExactDupes = dedupResult.get(PipelineTags.contentToIndexNotExactDupesTag);
	PCollection<InputContent> contentNotToIndexExactDupes = dedupResult.get(PipelineTags.contentNotToIndexExactDupesTag);

	// Everything not indexed: exact duplicates plus content skipped earlier.
	PCollection<InputContent> contentNotToIndex = PCollectionList.of(contentNotToIndexExactDupes)
		.and(contentNotToIndexSkipped)
		.apply(Flatten.<InputContent>pCollections());

	return new ContentToIndexOrNot(contentToIndexNotExactDupes, contentNotToIndex);
}
 
示例29
/**
 * Builds a pipeline of a {@code Read} and a multi-output {@code ParDo}. Checks that
 * {@code getPrimitiveTransformIds} reports exactly the transform IDs in the pipeline's components.
 */
@Test
public void retainOnlyPrimitivesWithOnlyPrimitivesUnchanged() {
  Pipeline pipeline = Pipeline.create();
  pipeline
      .apply("Read", Read.from(CountingSource.unbounded()))
      .apply(
          "multi-do",
          ParDo.of(new TestFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  Components components = PipelineTranslation.toProto(pipeline).getComponents();
  Collection<String> primitiveIds = QueryablePipeline.getPrimitiveTransformIds(components);

  assertThat(primitiveIds, equalTo(components.getTransformsMap().keySet()));
}
 
示例30
/**
 * Checks that a plain multi-output ParDo, with no state, timers or splitting, is matched by none
 * of the splittable or state/timer matchers.
 */
@Test
public void parDoMulti() {
  AppliedPTransform<?, ?, ?> plainMulti =
      getAppliedTransform(
          ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

  assertThat(PTransformMatchers.splittableParDoMulti().matches(plainMulti), is(false));
  assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(plainMulti), is(false));
  assertThat(PTransformMatchers.splittableParDoSingle().matches(plainMulti), is(false));
  assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(plainMulti), is(false));
}