Java source code examples: org.apache.beam.sdk.transforms.GroupByKey
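GroupByKey transforms a PCollection<KV<K, V>> into a PCollection<KV<K, Iterable<V>>>: for each distinct key (per window), it emits one element pairing the key with an iterable of all values seen for that key. As a primer for the examples below, here is a minimal self-contained sketch of that contract; the class name and sample values are illustrative, not taken from any example on this page, and it assumes the direct runner is on the classpath.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupByKeyMinimal { // illustrative class name
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Two values share the key "cat"; one value has the key "dog".
    PCollection<KV<String, Integer>> pairs =
        p.apply(Create.of(Arrays.asList(
            KV.of("cat", 1), KV.of("dog", 5), KV.of("cat", 9))));

    // Grouped result: KV("cat", [1, 9]) and KV("dog", [5]).
    // (The order of values within each iterable is not guaranteed.)
    PCollection<KV<String, Iterable<Integer>>> grouped =
        pairs.apply(GroupByKey.create());

    p.run().waitUntilFinish();
  }
}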

Example 1
@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
  return input
      .apply(
          Window.<KV<K, V>>into(new GlobalWindows())
              .discardingFiredPanes()
              .triggering(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.ZERO)
                          .alignedTo(intervalDuration, org.joda.time.Instant.now()))))
      .apply(GroupByKey.create())
      .apply(
          ParDo.of(
              new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  LOG.debug(
                      "TS: {} | Element: {} | Pane: {}", c.timestamp(), c.element(), c.pane());
                  Iterator<V> it = c.element().getValue().iterator();
                  if (it.hasNext()) {
                    c.output(KV.of(c.element().getKey(), it.next()));
                  }
                }
              }));
}
 
Example 2
/**
 * @param indexes document indexes to be checked for duplicates
 * @return a POJO containing two PCollections: unique docs and duplicates
 */
private static ContentDuplicateOrNot filterSoftDuplicates(
    PCollection<ContentIndexSummary> indexes) {
  PCollectionTuple dedupeOrNot =
      indexes
          .apply("Extract Text grouping key",
              ParDo.of(new GetContentIndexSummaryKeyFn()))
          .apply("Group by Text grouping key",
              GroupByKey.<ContentSoftDeduplicationKey, ContentIndexSummary>create())
          .apply("Eliminate Text dupes",
              ParDo.of(new EliminateTextDupes())
                  .withOutputTags(PipelineTags.indexedContentNotToDedupeTag,
                      TupleTagList.of(PipelineTags.indexedContentToDedupeTag)));

  PCollection<TableRow> dedupedWebresources =
      dedupeOrNot.get(PipelineTags.indexedContentToDedupeTag)
          .apply(ParDo.of(new CreateWebresourceTableRowFromDupeIndexSummaryFn()));

  return new ContentDuplicateOrNot(
      dedupeOrNot.get(PipelineTags.indexedContentNotToDedupeTag),
      dedupedWebresources);
}
 
Example 3
@Test
public void testSecondaryKeySorting() {
  // Create a PCollection of <Key, <SecondaryKey, Value>> pairs.
  PCollection<KV<String, KV<String, Integer>>> input =
      p.apply(
          Create.of(
              Arrays.asList(
                  KV.of("key1", KV.of("secondaryKey2", 20)),
                  KV.of("key2", KV.of("secondaryKey2", 200)),
                  KV.of("key1", KV.of("secondaryKey3", 30)),
                  KV.of("key1", KV.of("secondaryKey1", 10)),
                  KV.of("key2", KV.of("secondaryKey1", 100)))));

  // Group by Key, bringing <SecondaryKey, Value> pairs for the same Key together.
  PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
      input.apply(GroupByKey.create());

  // For every Key, sort the iterable of <SecondaryKey, Value> pairs by SecondaryKey.
  PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
      grouped.apply(SortValues.create(BufferedExternalSorter.options()));

  PAssert.that(groupedAndSorted)
      .satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting());

  p.run();
}
 
Example 4
@Override
void loadTest() throws IOException {
  Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());

  PCollection<KV<byte[], byte[]>> input =
      pipeline
          .apply("Read input", readFromSource(sourceOptions))
          .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
          .apply(
              "Total bytes monitor",
              ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));

  input = applyWindowing(input);

  for (int branch = 0; branch < options.getFanout(); branch++) {
    applyStepIfPresent(input, format("Synthetic step (%s)", branch), syntheticStep)
        .apply(format("Group by key (%s)", branch), GroupByKey.create())
        .apply(
            format("Ungroup and reiterate (%s)", branch),
            ParDo.of(new UngroupAndReiterate(options.getIterations())))
        .apply(format("Collect end time metrics (%s)", branch), ParDo.of(runtimeMonitor));
  }
}
 
Example 5
@Override
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
  WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

  return input
      .apply(Reify.windows())
      .apply(
          WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
              .withKeyType(new TypeDescriptor<Integer>() {}))
      .apply(
          Window.into(
                  new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
                      originalWindowFn.windowCoder()))
              .triggering(Never.ever())
              .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
              .discardingFiredPanes())
      // all values have the same key so they all appear as a single output element
      .apply(GroupByKey.create())
      .apply(Values.create())
      .setWindowingStrategyInternal(input.getWindowingStrategy());
}
 
Example 6
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
  List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(getSideInputs());
  if (numShardsView != null) {
    shardingSideInputs.add(numShardsView);
  }

  ShardingFunction<UserT, DestinationT> shardingFunction =
      getShardingFunction() == null
          ? new RandomShardingFunction(destinationCoder)
          : getShardingFunction();

  return input
      .apply(
          "ApplyShardingKey",
          ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView))
              .withSideInputs(shardingSideInputs))
      .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
      .apply("GroupIntoShards", GroupByKey.create())
      .apply(
          "WriteShardsIntoTempFiles",
          ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
      .setCoder(fileResultCoder);
}
 
Example 7
@Override
public PDone expand(PCollection<KV<K, V>> in) {
    // Make sure that a window has been applied.
    in = ofDefaultWindow(in);

    // Add an artificial GroupByKey to collect the window results together.
    PCollection<KV<Instant, KV<K, V>>> pc2 =
            in.apply("GroupToOneShard", ParDo.of(new GroupToOneShard<KV<K, V>>())).setCoder(
                    KvCoder.of(InstantCoder.of(), in.getCoder()));

    PCollection<KV<Instant, Iterable<KV<K, V>>>> pc3 = pc2.apply(GroupByKey.<Instant, KV<K, V>> create());

    pc3.apply("UnboundedWrite", ParDo.of(new UnboundedWriteToFile<K, V>(sink)));

    return PDone.in(in.getPipeline());
}
 
Example 8
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testElementsAtAlmostPositiveInfinity() {
  Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
  TestStream<String> stream =
      TestStream.create(StringUtf8Coder.of())
          .addElements(
              TimestampedValue.of("foo", endOfGlobalWindow),
              TimestampedValue.of("bar", endOfGlobalWindow))
          .advanceWatermarkToInfinity();

  FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
  PCollection<String> windowedValues =
      p.apply(stream)
          .apply(into(windows))
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());

  PAssert.that(windowedValues)
      .inWindow(windows.assignWindow(endOfGlobalWindow))
      .containsInAnyOrder("foo", "bar");
  p.run();
}
 
Example 9
@Test
public void keyedBundleWorkingCoderSucceedsClonesOutput() {
  PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));

  PCollection<KV<String, Iterable<Integer>>> keyed =
      created
          .apply(WithKeys.of("foo"))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
          .apply(GroupByKey.create());
  WindowedValue<KV<String, Iterable<Integer>>> foos =
      WindowedValue.valueInGlobalWindow(
          KV.<String, Iterable<Integer>>of("foo", ImmutableList.of(1, 3)));
  CommittedBundle<KV<String, Iterable<Integer>>> keyedBundle =
      factory
          .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), keyed)
          .add(foos)
          .commit(Instant.now());

  assertThat(keyedBundle.getElements(), containsInAnyOrder(foos));
  assertThat(
      Iterables.getOnlyElement(keyedBundle.getElements()).getValue(),
      not(theInstance(foos.getValue())));
  assertThat(keyedBundle.getPCollection(), equalTo(keyed));
  assertThat(keyedBundle.getKey(), equalTo(StructuralKey.of("foo", StringUtf8Coder.of())));
}
 
Example 10
@Test
public void testGroupByKey() {
  List<KV<Integer, Integer>> elems = new ArrayList<>();
  elems.add(KV.of(1, 1));
  elems.add(KV.of(1, 3));
  elems.add(KV.of(1, 5));
  elems.add(KV.of(2, 2));
  elems.add(KV.of(2, 4));
  elems.add(KV.of(2, 6));

  PCollection<KV<Integer, Iterable<Integer>>> input =
      pipeline.apply(Create.of(elems)).apply(GroupByKey.create());
  PAssert.thatMap(input)
      .satisfies(
          results -> {
            assertThat(results.get(1), containsInAnyOrder(1, 3, 5));
            assertThat(results.get(2), containsInAnyOrder(2, 4, 6));
            return null;
          });
  pipeline.run();
}
 
Example 11
@Test
public void shouldCacheTest() {
  SparkPipelineOptions options = createOptions();
  options.setCacheDisabled(true);
  Pipeline pipeline = Pipeline.create(options);

  Values<String> valuesTransform = Create.of("foo", "bar");
  PCollection pCollection = mock(PCollection.class);

  JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
  EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
  ctxt.getCacheCandidates().put(pCollection, 2L);

  assertFalse(ctxt.shouldCache(valuesTransform, pCollection));

  options.setCacheDisabled(false);
  assertTrue(ctxt.shouldCache(valuesTransform, pCollection));

  GroupByKey<String, String> gbkTransform = GroupByKey.create();
  assertFalse(ctxt.shouldCache(gbkTransform, pCollection));
}
 
Example 12
@Test
public void testElementsAtAlmostPositiveInfinity() throws IOException {
  Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
  CreateStream<String> source =
      CreateStream.of(StringUtf8Coder.of(), batchDuration())
          .nextBatch(
              TimestampedValue.of("foo", endOfGlobalWindow),
              TimestampedValue.of("bar", endOfGlobalWindow))
          .advanceNextBatchWatermarkToInfinity();

  FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
  PCollection<String> windowedValues =
      p.apply(source)
          .apply(Window.into(windows))
          .apply(WithKeys.of(1))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(Flatten.iterables());

  PAssert.that(windowedValues)
      .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
      .containsInAnyOrder("foo", "bar");
  p.run();
}
 
Example 13
@Test
public void testDisabledReIterationThrowsAnException() {
  // If output during closing is not supported, we cannot chain DoFns, and results
  // are therefore materialized during output serialization.
  Assume.assumeTrue(FlinkCapabilities.supportsOutputDuringClosing());
  final Pipeline p = FlinkTestPipeline.createForBatch();
  p.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("b", 2), KV.of("c", 3))))
      .apply(GroupByKey.create())
      .apply(ParDo.of(new ReiterateDoFn<>()));
  Pipeline.PipelineExecutionException resultException = null;
  try {
    p.run().waitUntilFinish();
  } catch (Pipeline.PipelineExecutionException exception) {
    resultException = exception;
  }
  Assert.assertEquals(
      IllegalStateException.class, Objects.requireNonNull(resultException).getCause().getClass());
  Assert.assertTrue(
      resultException.getCause().getMessage().contains("GBK result is not re-iterable."));
}
 
Example 14
@SuppressWarnings("unchecked")
private static <K, InputT, OutputT>
    SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
        Pipeline pipeline,
        KvCoder<K, InputT> kvInputCoder) {
  if (transform instanceof GroupByKey) {
    return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
        SystemReduceFn.buffering(kvInputCoder.getValueCoder());
  } else if (transform instanceof Combine.PerKey) {
    final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> combineFn =
        ((Combine.PerKey) transform).getFn();
    return SystemReduceFn.combining(
        kvInputCoder.getKeyCoder(),
        AppliedCombineFn.withInputCoder(combineFn, pipeline.getCoderRegistry(), kvInputCoder));
  } else {
    throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
  }
}
 
Example 15
@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
  return input
      .apply("GroupAll", GroupByKey.create())
      .apply(
          "SplitIntoBatches",
          ParDo.of(
              new DoFn<KV<K, Iterable<V>>, KV<K, Iterable<V>>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  // Iterators.partition lazily creates the partitions as they are accessed,
                  // allowing it to partition very large iterators.
                  Iterator<List<V>> iterator =
                      Iterators.partition(c.element().getValue().iterator(), (int) batchSize);

                  // Note that GroupIntoBatches only outputs when the batch is non-empty.
                  while (iterator.hasNext()) {
                    c.output(KV.of(c.element().getKey(), iterator.next()));
                  }
                }
              }));
}
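For reference, the Guava Iterators.partition call used above chunks an iterator lazily, one batch at a time. A small standalone sketch of that behavior (the class name and values are illustrative):

import com.google.common.collect.Iterators;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionDemo { // illustrative class name
  public static void main(String[] args) {
    // Partition five elements into batches of at most two.
    Iterator<List<Integer>> batches =
        Iterators.partition(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
    while (batches.hasNext()) {
      System.out.println(batches.next()); // [1, 2], then [3, 4], then [5]
    }
  }
}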
 
Example 16
@Test
public void testInvalidWindowsService() {
  Pipeline p = createTestServiceRunner();

  List<KV<String, Integer>> ungroupedPairs = Arrays.asList();

  PCollection<KV<String, Integer>> input =
      p.apply(
              Create.of(ungroupedPairs)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))));

  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("GroupByKey must have a valid Window merge function");
  input.apply("GroupByKey", GroupByKey.create()).apply("GroupByKeyAgain", GroupByKey.create());
}
 
Example 17
/** Creates a simple pipeline with a {@link Combine.GroupedValues} with side inputs. */
private static TestPipeline createCombineGroupedValuesWithSideInputsPipeline() {
  TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
  PCollection<KV<String, Integer>> input =
      pipeline
          .apply(Create.of(KV.of("key", 1)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
  PCollection<String> sideInput = pipeline.apply(Create.of("side input"));
  PCollectionView<String> sideInputView = sideInput.apply(View.asSingleton());

  input
      .apply(GroupByKey.create())
      .apply(
          Combine.<String, Integer, Integer>groupedValues(new SumCombineFnWithContext())
              .withSideInputs(sideInputView));

  return pipeline;
}
 
Example 18
/**
 * @param input PCollection of variants to process.
 * @return PCollection of variant-only Variant objects with calls from non-variant-segments
 *     merged into the SNP variants with which they overlap.
 */
@Override
public PCollection<Variant> expand(PCollection<Variant> input) {
  return input
      .apply(ParDo.of(new BinVariantsFn()))
      .apply(GroupByKey.<KV<String, Long>, Variant>create())
      .apply(ParDo.of(new CombineVariantsFn()));
}
 
Example 19
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {

  int numShards = spec.getNumShards();
  if (numShards <= 0) {
    try (Consumer<?, ?> consumer = openConsumer(spec)) {
      numShards = consumer.partitionsFor(spec.getTopic()).size();
      LOG.info(
          "Using {} shards for exactly-once writer, matching number of partitions "
              + "for topic '{}'",
          numShards,
          spec.getTopic());
    }
  }
  checkState(numShards > 0, "Could not set number of shards");

  return input
      .apply(
          Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .discardingFiredPanes())
      .apply(
          String.format("Shuffle across %d shards", numShards),
          ParDo.of(new Reshard<>(numShards)))
      .apply("Persist sharding", GroupByKey.create())
      .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
      .apply("Persist ids", GroupByKey.create())
      .apply(
          String.format("Write to Kafka topic '%s'", spec.getTopic()),
          ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}
 
Example 20
/**
 * @param contentToIndexNotSkipped candidate content to index (not skipped upstream)
 * @param contentNotToIndexSkipped content already excluded (skipped) upstream
 * @param pipeline pipeline used to read the set of already processed documents
 * @param options pipeline options controlling truncate mode and the BigQuery query
 * @return a POJO splitting the content into items to index and items not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
    PCollection<InputContent> contentToIndexNotSkipped,
    PCollection<InputContent> contentNotToIndexSkipped,
    Pipeline pipeline, IndexerPipelineOptions options) {

  PCollection<KV<String, Long>> alreadyProcessedDocs;

  if (!options.getWriteTruncate()) {
    String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
    alreadyProcessedDocs = pipeline
        .apply("Get already processed Documents", BigQueryIO.read().fromQuery(query))
        .apply(ParDo.of(new GetDocumentHashFn()));
  } else {
    Map<String, Long> map = new HashMap<>();
    alreadyProcessedDocs = pipeline
        .apply("Create empty side input of Docs",
            Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  }

  final PCollectionView<Map<String, Long>> alreadyProcessedDocsSideInput =
      alreadyProcessedDocs.apply(View.<String, Long>asMap());

  PCollectionTuple indexOrNotBasedOnExactDupes = contentToIndexNotSkipped
      .apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
      .apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
      .apply("Eliminate InputContent Dupes",
          ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
              .withSideInputs(alreadyProcessedDocsSideInput)
              .withOutputTags(PipelineTags.contentToIndexNotExactDupesTag, // main output collection
                  TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection

  PCollection<InputContent> contentToIndexNotExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
  PCollection<InputContent> contentNotToIndexExactDupes =
      indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);

  // Merge the sets of items that are dupes or skipped.
  PCollectionList<InputContent> contentNotToIndexList =
      PCollectionList.of(contentNotToIndexExactDupes).and(contentNotToIndexSkipped);

  return new ContentToIndexOrNot(
      contentToIndexNotExactDupes,
      contentNotToIndexList.apply(Flatten.<InputContent>pCollections()));
}
 
Example 21
@Override
public void translateNode(GroupByKey<K, V> transform, Twister2BatchTranslationContext context) {
  PCollection<KV<K, V>> input = context.getInput(transform);
  BatchTSetImpl<WindowedValue<KV<K, V>>> inputTTSet = context.getInputDataSet(input);
  final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
  Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
  WindowingStrategy windowingStrategy = input.getWindowingStrategy();
  WindowFn<KV<K, V>, BoundedWindow> windowFn =
      (WindowFn<KV<K, V>, BoundedWindow>) windowingStrategy.getWindowFn();
  final WindowedValue.WindowedValueCoder<V> wvCoder =
      WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
  KeyedTSet<byte[], byte[]> keyedTSet =
      inputTTSet.mapToTuple(new MapToTupleFunction<K, V>(inputKeyCoder, wvCoder));

  // TODO: add support for specifying a partition function; that would use
  // keyedPartition instead of keyedGather.
  ComputeTSet<KV<K, Iterable<WindowedValue<V>>>, Iterator<Tuple<byte[], Iterator<byte[]>>>>
      groupedbyKeyTset =
          keyedTSet.keyedGather().map(new ByteToWindowFunction(inputKeyCoder, wvCoder));

  // Now also group by window.
  ComputeTSet<WindowedValue<KV<K, Iterable<V>>>, Iterable<KV<K, Iterator<WindowedValue<V>>>>>
      outputTset =
          groupedbyKeyTset
              .direct()
              .<WindowedValue<KV<K, Iterable<V>>>>flatmap(
                  new GroupByWindowFunction(
                      windowingStrategy, SystemReduceFn.buffering(coder.getValueCoder())));
  PCollection output = context.getOutput(transform);
  context.setOutputDataSet(output, outputTset);
}
 
Example 22
@Override
public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
  // Apply a GroupByKey transform to collect a list of all station
  // readings for a given route.
  PCollection<KV<String, Iterable<StationSpeed>>> timeGroup =
      stationSpeed.apply(GroupByKey.create());

  // Analyze 'slowdown' over the route readings.
  PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));

  // Format the results for writing to BigQuery
  PCollection<TableRow> results = stats.apply(ParDo.of(new FormatStatsFn()));

  return results;
}
 
Example 23
static PCollection<KV<String, Iterable<String>>> applyTransform(PCollection<String> input) {
  return input
      .apply(MapElements.into(kvs(strings(), strings()))
          .via(word -> KV.of(word.substring(0, 1), word)))
      .apply(GroupByKey.create());
}
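A hedged usage sketch for this helper, assuming a TestPipeline named p is in scope and the imports from the snippet above are available; the input words and the expected contents shown in comments are illustrative:

// Hypothetical usage: group words by their first letter.
PCollection<String> words = p.apply(Create.of("apple", "ant", "bee"));
PCollection<KV<String, Iterable<String>>> grouped = applyTransform(words);
// Expected contents, with value order within each group not guaranteed:
//   KV("a", ["apple", "ant"]) and KV("b", ["bee"])
p.run();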
 
Example 24
@Test
public void traverseMultipleTimesThrows() {
  p.apply(
          Create.of(KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null))
              .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of())))
      .apply(GroupByKey.create())
      .apply(Keys.create());

  p.traverseTopologically(visitor);
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("already been finalized");
  thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
  p.traverseTopologically(visitor);
}
 
Example 25
@Override
public PCollection<Iterable<InputT>> expand(PCollection<InputT> input) {
  return input
      .apply("addNullKey", WithKeys.of((Void) null))
      .apply("group", GroupByKey.create())
      .apply("extractValues", Values.create());
}
 
Example 26
@Override
public PCollection<KV<Row, Iterable<Row>>> expand(PCollection<InputT> input) {
  Schema schema = input.getSchema();
  FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
  rowSelector = new RowSelectorContainer(schema, resolved, true);
  Schema keySchema = getKeySchema(schema);

  return input
      .apply("toRow", Convert.toRows())
      .apply(
          "selectKeys",
          WithKeys.of((Row e) -> rowSelector.select(e)).withKeyType(TypeDescriptors.rows()))
      .setCoder(KvCoder.of(SchemaCoder.of(keySchema), SchemaCoder.of(schema)))
      .apply("GroupByKey", GroupByKey.create());
}
 
Example 27
/**
 * Tests that when two elements are grouped via a GroupByKey, their output timestamp agrees
 * with the windowing function's default, the end of the window.
 */
@Test
@Category(ValidatesRunner.class)
public void testTimestampCombinerDefault() {
  pipeline.enableAbandonedNodeEnforcement(true);

  pipeline
      .apply(
          Create.timestamped(
              TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
              TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
      .apply(GroupByKey.create())
      .apply(
          ParDo.of(
              new DoFn<KV<Integer, Iterable<String>>, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                  assertThat(
                      c.timestamp(),
                      equalTo(
                          new IntervalWindow(
                                  new Instant(0),
                                  new Instant(0).plus(Duration.standardMinutes(10)))
                              .maxTimestamp()));
                }
              }));

  pipeline.run();
}
 
Example 28
@Override
public PCollection<Read> expand(PCollection<String> readGroupSetIds) {
  return readGroupSetIds.apply(ParDo.of(new CreateReadRequests()))
      // Force a shuffle operation here to break the fusion of these steps.
      // By breaking fusion, the work will be distributed to all available workers.
      .apply(GroupByKey.<Integer, StreamReadsRequest>create())
      .apply(ParDo.of(new ConvergeStreamReadsRequestList()))
      .apply(new ReadStreamer(auth, ShardBoundary.Requirement.STRICT, fields));
}
 
Example 29
@Test
@Category(NeedsRunner.class)
public void singlePaneSingleReifiedPane() {
  PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes =
      p.apply(GenerateSequence.from(0).to(20000))
          .apply(WithTimestamps.of(input -> new Instant(input * 10)))
          .apply(
              Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .withAllowedLateness(Duration.ZERO)
                  .discardingFiredPanes())
          .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
          .apply(GroupByKey.create())
          .apply(Values.create())
          .apply(GatherAllPanes.globally());

  PAssert.that(accumulatedPanes)
      .satisfies(
          input -> {
            for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) {
              if (Iterables.size(windowedInput) > 1) {
                fail("Expected all windows to have exactly one pane, got " + windowedInput);
                return null;
              }
            }
            return null;
          });

  p.run();
}