Java Source Code Examples: org.apache.beam.sdk.transforms.GroupByKey
Example 1
@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
return input
.apply(
Window.<KV<K, V>>into(new GlobalWindows())
.discardingFiredPanes()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.ZERO)
.alignedTo(intervalDuration, org.joda.time.Instant.now()))))
.apply(GroupByKey.create())
.apply(
ParDo.of(
new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
@ProcessElement
public void process(ProcessContext c) {
LOG.debug(
"TS: {} | Element: {} | Pane: {}", c.timestamp(), c.element(), c.pane());
Iterator<V> it = c.element().getValue().iterator();
if (it.hasNext()) {
c.output(KV.of(c.element().getKey(), it.next()));
}
}
}));
}
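For orientation before the more involved examples below, here is a minimal, self-contained pipeline showing the core GroupByKey contract: it turns a PCollection<KV<K, V>> into a PCollection<KV<K, Iterable<V>>> with one output element per key. This is a sketch assuming the Beam Java SDK with a default runner on the classpath; the input data is illustrative.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupByKeyMinimal {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<KV<String, Integer>> kvs =
        p.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))));
    // GroupByKey collects all values sharing a key into a single Iterable,
    // e.g. "a" -> [1, 2] and "b" -> [3] (iteration order is not guaranteed).
    PCollection<KV<String, Iterable<Integer>>> grouped = kvs.apply(GroupByKey.create());
    p.run().waitUntilFinish();
  }
}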
Example 2
/**
 * @param indexes document indexes to check for soft duplicates
 * @return a POJO containing two PCollections: unique docs and duplicates
 */
private static ContentDuplicateOrNot filterSoftDuplicates(
PCollection<ContentIndexSummary> indexes) {
PCollectionTuple dedupeOrNot = indexes
.apply("Extract Text grouping key",
ParDo.of(new GetContentIndexSummaryKeyFn()))
.apply("Group by Text grouping key",
GroupByKey.<ContentSoftDeduplicationKey, ContentIndexSummary>create())
.apply("Eliminate Text dupes",
ParDo.of(new EliminateTextDupes())
.withOutputTags(PipelineTags.indexedContentNotToDedupeTag,
TupleTagList.of(PipelineTags.indexedContentToDedupeTag)));
PCollection<TableRow> dedupedWebresources =
dedupeOrNot.get(PipelineTags.indexedContentToDedupeTag)
.apply(ParDo.of(new CreateWebresourceTableRowFromDupeIndexSummaryFn()));
ContentDuplicateOrNot contentDuplicateOrNot = new ContentDuplicateOrNot(
dedupeOrNot.get(PipelineTags.indexedContentNotToDedupeTag),
dedupedWebresources);
return contentDuplicateOrNot;
}
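GetContentIndexSummaryKeyFn and EliminateTextDupes are not shown in the source. As a rough illustration of the multi-output pattern they rely on, a DoFn downstream of the GroupByKey can route one representative per key to the main output and the rest to a side output tag. This is a sketch only: it assumes grouped is the result of the "Group by Text grouping key" step, and the first-element-wins policy is an assumption, not the library's behavior.

// Hypothetical tags standing in for PipelineTags.indexedContentNotToDedupeTag
// and PipelineTags.indexedContentToDedupeTag.
final TupleTag<ContentIndexSummary> uniqueTag = new TupleTag<ContentIndexSummary>() {};
final TupleTag<ContentIndexSummary> dupeTag = new TupleTag<ContentIndexSummary>() {};

PCollectionTuple branches = grouped.apply(
    ParDo.of(
            new DoFn<KV<ContentSoftDeduplicationKey, Iterable<ContentIndexSummary>>,
                ContentIndexSummary>() {
              @ProcessElement
              public void process(ProcessContext c) {
                boolean first = true;
                for (ContentIndexSummary summary : c.element().getValue()) {
                  if (first) {
                    c.output(summary); // main output: one representative per key
                    first = false;
                  } else {
                    c.output(dupeTag, summary); // side output: soft duplicates
                  }
                }
              }
            })
        .withOutputTags(uniqueTag, TupleTagList.of(dupeTag)));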
Example 3
@Test
public void testSecondaryKeySorting() {
// Create a PCollection of <Key, <SecondaryKey, Value>> pairs.
PCollection<KV<String, KV<String, Integer>>> input =
p.apply(
Create.of(
Arrays.asList(
KV.of("key1", KV.of("secondaryKey2", 20)),
KV.of("key2", KV.of("secondaryKey2", 200)),
KV.of("key1", KV.of("secondaryKey3", 30)),
KV.of("key1", KV.of("secondaryKey1", 10)),
KV.of("key2", KV.of("secondaryKey1", 100)))));
// Group by Key, bringing <SecondaryKey, Value> pairs for the same Key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.create());
// For every Key, sort the iterable of <SecondaryKey, Value> pairs by SecondaryKey.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(SortValues.create(BufferedExternalSorter.options()));
PAssert.that(groupedAndSorted)
.satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting());
p.run();
}
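AssertThatHasExpectedContentsForTestSecondaryKeySorting is not shown in the source. A plausible minimal version walks each key's values and checks that the secondary keys are non-decreasing, which is exactly what SortValues guarantees:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

// A sketch of the assertion class used above (the real one is not shown).
static class AssertThatHasExpectedContentsForTestSecondaryKeySorting
    implements SerializableFunction<Iterable<KV<String, Iterable<KV<String, Integer>>>>, Void> {
  @Override
  public Void apply(Iterable<KV<String, Iterable<KV<String, Integer>>>> input) {
    for (KV<String, Iterable<KV<String, Integer>>> keyGroup : input) {
      String previousSecondaryKey = null;
      for (KV<String, Integer> pair : keyGroup.getValue()) {
        if (previousSecondaryKey != null) {
          // SortValues must have ordered the pairs by secondary key.
          assertThat(previousSecondaryKey.compareTo(pair.getKey()), lessThanOrEqualTo(0));
        }
        previousSecondaryKey = pair.getKey();
      }
    }
    return null;
  }
}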
Example 4
@Override
void loadTest() throws IOException {
Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", readFromSource(sourceOptions))
.apply("Collect start time metrics", ParDo.of(runtimeMonitor))
.apply(
"Total bytes monitor",
ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));
input = applyWindowing(input);
for (int branch = 0; branch < options.getFanout(); branch++) {
applyStepIfPresent(input, format("Synthetic step (%s)", branch), syntheticStep)
.apply(format("Group by key (%s)", branch), GroupByKey.create())
.apply(
format("Ungroup and reiterate (%s)", branch),
ParDo.of(new UngroupAndReiterate(options.getIterations())))
.apply(format("Collect end time metrics (%s)", branch), ParDo.of(runtimeMonitor));
}
}
Example 5
@Override
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
.apply(Reify.windows())
.apply(
WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
.withKeyType(new TypeDescriptor<Integer>() {}))
.apply(
Window.into(
new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
originalWindowFn.windowCoder()))
.triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.discardingFiredPanes())
// all values have the same key so they all appear as a single output element
.apply(GroupByKey.create())
.apply(Values.create())
.setWindowingStrategyInternal(input.getWindowingStrategy());
}
Example 6
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(getSideInputs());
if (numShardsView != null) {
shardingSideInputs.add(numShardsView);
}
ShardingFunction<UserT, DestinationT> shardingFunction =
getShardingFunction() == null
? new RandomShardingFunction(destinationCoder)
: getShardingFunction();
return input
.apply(
"ApplyShardingKey",
ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView))
.withSideInputs(shardingSideInputs))
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
.apply("GroupIntoShards", GroupByKey.create())
.apply(
"WriteShardsIntoTempFiles",
ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
.setCoder(fileResultCoder);
}
Example 7
@Override
public PDone expand(PCollection<KV<K, V>> in) {
// Make sure that a window has been applied.
in = ofDefaultWindow(in);
// Add an artificial GroupByKey to collect the window results together.
PCollection<KV<Instant, KV<K, V>>> pc2 =
in.apply("GroupToOneShard", ParDo.of(new GroupToOneShard<KV<K, V>>())).setCoder(
KvCoder.of(InstantCoder.of(), in.getCoder()));
PCollection<KV<Instant, Iterable<KV<K, V>>>> pc3 = pc2.apply(GroupByKey.<Instant, KV<K, V>> create());
pc3.apply("UnboundedWrite", ParDo.of(new UnboundedWriteToFile<K, V>(sink)));
return PDone.in(in.getPipeline());
}
Example 8
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testElementsAtAlmostPositiveInfinity() {
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
TestStream<String> stream =
TestStream.create(StringUtf8Coder.of())
.addElements(
TimestampedValue.of("foo", endOfGlobalWindow),
TimestampedValue.of("bar", endOfGlobalWindow))
.advanceWatermarkToInfinity();
FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
PCollection<String> windowedValues =
p.apply(stream)
.apply(into(windows))
.apply(WithKeys.of(1))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables());
PAssert.that(windowedValues)
.inWindow(windows.assignWindow(endOfGlobalWindow))
.containsInAnyOrder("foo", "bar");
p.run();
}
Example 9
@Test
public void keyedBundleWorkingCoderSucceedsClonesOutput() {
PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of()));
PCollection<KV<String, Iterable<Integer>>> keyed =
created
.apply(WithKeys.of("foo"))
.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
.apply(GroupByKey.create());
WindowedValue<KV<String, Iterable<Integer>>> foos =
WindowedValue.valueInGlobalWindow(
KV.<String, Iterable<Integer>>of("foo", ImmutableList.of(1, 3)));
CommittedBundle<KV<String, Iterable<Integer>>> keyedBundle =
factory
.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), keyed)
.add(foos)
.commit(Instant.now());
assertThat(keyedBundle.getElements(), containsInAnyOrder(foos));
assertThat(
Iterables.getOnlyElement(keyedBundle.getElements()).getValue(),
not(theInstance(foos.getValue())));
assertThat(keyedBundle.getPCollection(), equalTo(keyed));
assertThat(keyedBundle.getKey(), equalTo(StructuralKey.of("foo", StringUtf8Coder.of())));
}
Example 10
@Test
public void testGroupByKey() {
List<KV<Integer, Integer>> elems = new ArrayList<>();
elems.add(KV.of(1, 1));
elems.add(KV.of(1, 3));
elems.add(KV.of(1, 5));
elems.add(KV.of(2, 2));
elems.add(KV.of(2, 4));
elems.add(KV.of(2, 6));
PCollection<KV<Integer, Iterable<Integer>>> input =
pipeline.apply(Create.of(elems)).apply(GroupByKey.create());
PAssert.thatMap(input)
.satisfies(
results -> {
assertThat(results.get(1), containsInAnyOrder(1, 3, 5));
assertThat(results.get(2), containsInAnyOrder(2, 4, 6));
return null;
});
pipeline.run();
}
Example 11
@Test
public void shouldCacheTest() {
SparkPipelineOptions options = createOptions();
options.setCacheDisabled(true);
Pipeline pipeline = Pipeline.create(options);
Values<String> valuesTransform = Create.of("foo", "bar");
PCollection pCollection = mock(PCollection.class);
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
ctxt.getCacheCandidates().put(pCollection, 2L);
assertFalse(ctxt.shouldCache(valuesTransform, pCollection));
options.setCacheDisabled(false);
assertTrue(ctxt.shouldCache(valuesTransform, pCollection));
GroupByKey<String, String> gbkTransform = GroupByKey.create();
assertFalse(ctxt.shouldCache(gbkTransform, pCollection));
}
Example 12
@Test
public void testElementsAtAlmostPositiveInfinity() throws IOException {
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
CreateStream<String> source =
CreateStream.of(StringUtf8Coder.of(), batchDuration())
.nextBatch(
TimestampedValue.of("foo", endOfGlobalWindow),
TimestampedValue.of("bar", endOfGlobalWindow))
.advanceNextBatchWatermarkToInfinity();
FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
PCollection<String> windowedValues =
p.apply(source)
.apply(Window.into(windows))
.apply(WithKeys.of(1))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables());
PAssert.that(windowedValues)
.inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
.containsInAnyOrder("foo", "bar");
p.run();
}
Example 13
@Test
public void testDisabledReIterationThrowsAnException() {
// If output during closing is not supported, we cannot chain DoFns, and results
// are therefore materialized during output serialization.
Assume.assumeTrue(FlinkCapabilities.supportsOutputDuringClosing());
final Pipeline p = FlinkTestPipeline.createForBatch();
p.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("b", 2), KV.of("c", 3))))
.apply(GroupByKey.create())
.apply(ParDo.of(new ReiterateDoFn<>()));
Pipeline.PipelineExecutionException resultException = null;
try {
p.run().waitUntilFinish();
} catch (Pipeline.PipelineExecutionException exception) {
resultException = exception;
}
Assert.assertEquals(
IllegalStateException.class, Objects.requireNonNull(resultException).getCause().getClass());
Assert.assertTrue(
resultException.getCause().getMessage().contains("GBK result is not re-iterable."));
}
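ReiterateDoFn is not shown in the source. "Re-iteration" just means reading the grouped Iterable more than once; a plausible minimal version is:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// A sketch of ReiterateDoFn: it traverses the grouped values twice, which
// throws on configurations where the GBK result is single-pass only.
static class ReiterateDoFn<K, V> extends DoFn<KV<K, Iterable<V>>, KV<K, Long>> {
  @ProcessElement
  public void process(ProcessContext c) {
    long firstPass = 0;
    long secondPass = 0;
    for (V ignored : c.element().getValue()) {
      firstPass++; // first traversal
    }
    for (V ignored : c.element().getValue()) {
      secondPass++; // second traversal: the re-iteration under test
    }
    c.output(KV.of(c.element().getKey(), firstPass + secondPass));
  }
}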
Example 14
@SuppressWarnings("unchecked")
private static <K, InputT, OutputT>
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
Pipeline pipeline,
KvCoder<K, InputT> kvInputCoder) {
if (transform instanceof GroupByKey) {
return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
SystemReduceFn.buffering(kvInputCoder.getValueCoder());
} else if (transform instanceof Combine.PerKey) {
final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> combineFn =
((Combine.PerKey) transform).getFn();
return SystemReduceFn.combining(
kvInputCoder.getKeyCoder(),
AppliedCombineFn.withInputCoder(combineFn, pipeline.getCoderRegistry(), kvInputCoder));
} else {
throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
}
}
Example 15
@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
return input
.apply("GroupAll", GroupByKey.create())
.apply(
"SplitIntoBatches",
ParDo.of(
new DoFn<KV<K, Iterable<V>>, KV<K, Iterable<V>>>() {
@ProcessElement
public void process(ProcessContext c) {
// Iterators.partition lazily creates the partitions as they are accessed,
// allowing it to partition very large iterators.
Iterator<List<V>> iterator =
Iterators.partition(c.element().getValue().iterator(), (int) batchSize);
// Note that GroupIntoBatches only outputs when the batch is non-empty.
while (iterator.hasNext()) {
c.output(KV.of(c.element().getKey(), iterator.next()));
}
}
}));
}
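The batching here is done by Guava's Iterators.partition, which lazily wraps an iterator into fixed-size List chunks (the final chunk may be smaller). Its behavior in isolation:

import com.google.common.collect.Iterators;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionDemo {
  public static void main(String[] args) {
    Iterator<List<Integer>> batches =
        Iterators.partition(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
    while (batches.hasNext()) {
      System.out.println(batches.next()); // prints [1, 2], then [3, 4], then [5]
    }
  }
}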
Example 16
@Test
public void testInvalidWindowsService() {
Pipeline p = createTestServiceRunner();
List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
PCollection<KV<String, Integer>> input =
p.apply(
Create.of(ungroupedPairs)
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))));
thrown.expect(IllegalStateException.class);
thrown.expectMessage("GroupByKey must have a valid Window merge function");
input.apply("GroupByKey", GroupByKey.create()).apply("GroupByKeyAgain", GroupByKey.create());
}
Example 17
/** Creates a simple pipeline with a {@link Combine.GroupedValues} with side inputs. */
private static TestPipeline createCombineGroupedValuesWithSideInputsPipeline() {
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
PCollection<KV<String, Integer>> input =
pipeline
.apply(Create.of(KV.of("key", 1)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
PCollection<String> sideInput = pipeline.apply(Create.of("side input"));
PCollectionView<String> sideInputView = sideInput.apply(View.asSingleton());
input
.apply(GroupByKey.create())
.apply(
Combine.<String, Integer, Integer>groupedValues(new SumCombineFnWithContext())
.withSideInputs(sideInputView));
return pipeline;
}
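SumCombineFnWithContext is not shown in the source. A plausible minimal version extends CombineFnWithContext, whose Context parameter is what makes withSideInputs(...) usable; a real implementation could call c.sideInput(view) inside any of these methods:

import org.apache.beam.sdk.transforms.CombineWithContext;

// A sketch of SumCombineFnWithContext (the real class is not shown).
static class SumCombineFnWithContext
    extends CombineWithContext.CombineFnWithContext<Integer, Integer, Integer> {
  @Override
  public Integer createAccumulator(CombineWithContext.Context c) {
    return 0;
  }

  @Override
  public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
    return accumulator + input;
  }

  @Override
  public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
    int sum = 0;
    for (int accumulator : accumulators) {
      sum += accumulator;
    }
    return sum;
  }

  @Override
  public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
    return accumulator;
  }
}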
Example 18
/**
* @param input PCollection of variants to process.
* @return PCollection of variant-only Variant objects with calls from non-variant-segments
* merged into the SNP variants with which they overlap.
*/
@Override
public PCollection<Variant> expand(PCollection<Variant> input) {
return input
.apply(ParDo.of(new BinVariantsFn()))
.apply(GroupByKey.<KV<String, Long>, Variant>create())
.apply(ParDo.of(new CombineVariantsFn()));
}
Example 19
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
int numShards = spec.getNumShards();
if (numShards <= 0) {
try (Consumer<?, ?> consumer = openConsumer(spec)) {
numShards = consumer.partitionsFor(spec.getTopic()).size();
LOG.info(
"Using {} shards for exactly-once writer, matching number of partitions "
+ "for topic '{}'",
numShards,
spec.getTopic());
}
}
checkState(numShards > 0, "Could not set number of shards");
return input
.apply(
Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes())
.apply(
String.format("Shuffle across %d shards", numShards),
ParDo.of(new Reshard<>(numShards)))
.apply("Persist sharding", GroupByKey.create())
.apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
.apply("Persist ids", GroupByKey.create())
.apply(
String.format("Write to Kafka topic '%s'", spec.getTopic()),
ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}
Example 20
/**
 * @param contentToIndexNotSkipped content that passed the skip checks and is eligible for indexing
 * @param contentNotToIndexSkipped content that was already marked as skipped
 * @param pipeline the pipeline used to create the side input of already processed documents
 * @param options pipeline options, used to build the BigQuery query of processed documents
 * @return a POJO containing the content to index and the content not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
PCollection<InputContent> contentToIndexNotSkipped, PCollection<InputContent> contentNotToIndexSkipped,
Pipeline pipeline, IndexerPipelineOptions options) {
PCollection<KV<String,Long>> alreadyProcessedDocs = null;
if (!options.getWriteTruncate()) {
String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
alreadyProcessedDocs = pipeline
.apply("Get already processed Documents",BigQueryIO.read().fromQuery(query))
.apply(ParDo.of(new GetDocumentHashFn()));
} else {
Map<String, Long> map = new HashMap<String,Long>();
alreadyProcessedDocs = pipeline
.apply("Create empty side input of Docs",
Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(),VarLongCoder.of())));
}
final PCollectionView<Map<String,Long>> alreadyProcessedDocsSideInput =
alreadyProcessedDocs.apply(View.<String,Long>asMap());
PCollectionTuple indexOrNotBasedOnExactDupes = contentToIndexNotSkipped
.apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
.apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
.apply("Eliminate InputContent Dupes", ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
.withSideInputs(alreadyProcessedDocsSideInput)
.withOutputTags(PipelineTags.contentToIndexNotExactDupesTag, // main output collection
TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection
PCollection<InputContent> contentToIndexNotExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
PCollection<InputContent> contentNotToIndexExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);
// Merge the sets of items that are dupes or skipped
PCollectionList<InputContent> contentNotToIndexList = PCollectionList.of(contentNotToIndexExactDupes).and(contentNotToIndexSkipped);
ContentToIndexOrNot content = new ContentToIndexOrNot(contentToIndexNotExactDupes, contentNotToIndexList.apply(Flatten.<InputContent>pCollections()));
return content;
}
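EliminateInputContentDupes is not shown in the source. A sketch of its likely shape, assuming a multi-output DoFn that consults the map side input and treats anything already processed, or anything beyond one representative per hash, as a duplicate (the exact routing policy is an assumption):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

static class EliminateInputContentDupes
    extends DoFn<KV<String, Iterable<InputContent>>, InputContent> {
  private final PCollectionView<Map<String, Long>> alreadyProcessedDocs;

  EliminateInputContentDupes(PCollectionView<Map<String, Long>> alreadyProcessedDocs) {
    this.alreadyProcessedDocs = alreadyProcessedDocs;
  }

  @ProcessElement
  public void process(ProcessContext c) {
    Map<String, Long> processed = c.sideInput(alreadyProcessedDocs);
    boolean keepNext = !processed.containsKey(c.element().getKey());
    for (InputContent content : c.element().getValue()) {
      if (keepNext) {
        c.output(content); // main output: first unseen document for this hash
        keepNext = false;
      } else {
        c.output(PipelineTags.contentNotToIndexExactDupesTag, content); // side output: dupes
      }
    }
  }
}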
Example 21
@Override
public void translateNode(GroupByKey<K, V> transform, Twister2BatchTranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
BatchTSetImpl<WindowedValue<KV<K, V>>> inputTTSet = context.getInputDataSet(input);
final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
WindowingStrategy windowingStrategy = input.getWindowingStrategy();
WindowFn<KV<K, V>, BoundedWindow> windowFn =
(WindowFn<KV<K, V>, BoundedWindow>) windowingStrategy.getWindowFn();
final WindowedValue.WindowedValueCoder<V> wvCoder =
WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
KeyedTSet<byte[], byte[]> keyedTSet =
inputTTSet.mapToTuple(new MapToTupleFunction<K, V>(inputKeyCoder, wvCoder));
// TODO: add support for specifying a partition function, which would use
// keyedPartition instead of keyedGather.
ComputeTSet<KV<K, Iterable<WindowedValue<V>>>, Iterator<Tuple<byte[], Iterator<byte[]>>>>
groupedbyKeyTset =
keyedTSet.keyedGather().map(new ByteToWindowFunction(inputKeyCoder, wvCoder));
// --- now group also by window.
ComputeTSet<WindowedValue<KV<K, Iterable<V>>>, Iterable<KV<K, Iterator<WindowedValue<V>>>>>
outputTset =
groupedbyKeyTset
.direct()
.<WindowedValue<KV<K, Iterable<V>>>>flatmap(
new GroupByWindowFunction(
windowingStrategy,
SystemReduceFn.buffering(coder.getValueCoder())));
PCollection output = context.getOutput(transform);
context.setOutputDataSet(output, outputTset);
}
Example 22
@Override
public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
// Apply a GroupByKey transform to collect a list of all station
// readings for a given route.
PCollection<KV<String, Iterable<StationSpeed>>> timeGroup =
stationSpeed.apply(GroupByKey.create());
// Analyze 'slowdown' over the route readings.
PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
// Format the results for writing to BigQuery
PCollection<TableRow> results = stats.apply(ParDo.of(new FormatStatsFn()));
return results;
}
Example 23
static PCollection<KV<String, Iterable<String>>> applyTransform(PCollection<String> input) {
return input
.apply(MapElements.into(kvs(strings(), strings()))
.via(word -> KV.of(word.substring(0, 1), word)))
.apply(GroupByKey.create());
}
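A usage sketch for the transform above: grouping words by their first letter. It assumes kvs and strings are statically imported from org.apache.beam.sdk.values.TypeDescriptors (as the transform requires) and that the snippet runs inside a method with a Pipeline named pipeline in scope:

PCollection<String> words =
    pipeline.apply(Create.of(Arrays.asList("apple", "banana", "avocado")));
// Produces "a" -> [apple, avocado] and "b" -> [banana]
// (iteration order within each group is not guaranteed).
PCollection<KV<String, Iterable<String>>> grouped = applyTransform(words);
pipeline.run().waitUntilFinish();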
Example 24
@Test
public void traverseMultipleTimesThrows() {
p.apply(
Create.of(KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null))
.withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of())))
.apply(GroupByKey.create())
.apply(Keys.create());
p.traverseTopologically(visitor);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("already been finalized");
thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
p.traverseTopologically(visitor);
}
Example 25
@Override
public PCollection<Iterable<InputT>> expand(PCollection<InputT> input) {
return input
.apply("addNullKey", WithKeys.of((Void) null))
.apply("group", GroupByKey.create())
.apply("extractValues", Values.create());
}
Example 26
@Override
public PCollection<KV<Row, Iterable<Row>>> expand(PCollection<InputT> input) {
Schema schema = input.getSchema();
FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
rowSelector = new RowSelectorContainer(schema, resolved, true);
Schema keySchema = getKeySchema(schema);
return input
.apply("toRow", Convert.toRows())
.apply(
"selectKeys",
WithKeys.of((Row e) -> rowSelector.select(e)).withKeyType(TypeDescriptors.rows()))
.setCoder(KvCoder.of(SchemaCoder.of(keySchema), SchemaCoder.of(schema)))
.apply("GroupByKey", GroupByKey.create());
}
Example 27
/**
 * Tests that when two elements are grouped via a GroupByKey, their output timestamp agrees with
 * the windowing function's default: the end of the window.
 */
@Test
@Category(ValidatesRunner.class)
public void testTimestampCombinerDefault() {
pipeline.enableAbandonedNodeEnforcement(true);
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
.apply(GroupByKey.create())
.apply(
ParDo.of(
new DoFn<KV<Integer, Iterable<String>>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(
c.timestamp(),
equalTo(
new IntervalWindow(
new Instant(0),
new Instant(0).plus(Duration.standardMinutes(10)))
.maxTimestamp()));
}
}));
pipeline.run();
}
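The end-of-window timestamp asserted here is only the default; GroupByKey's output timestamp can be changed on the Window transform. For instance, keeping the earliest input timestamp instead (a sketch; with EARLIEST, the grouped output above would carry the timestamp of "hello", Instant 0, rather than the end of the window):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

Window<KV<Integer, String>> window =
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
        .withTimestampCombiner(TimestampCombiner.EARLIEST);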
Example 28
@Override
public PCollection<Read> expand(PCollection<String> readGroupSetIds) {
return readGroupSetIds.apply(ParDo.of(new CreateReadRequests()))
// Force a shuffle operation here to break the fusion of these steps.
// By breaking fusion, the work will be distributed to all available workers.
.apply(GroupByKey.<Integer, StreamReadsRequest>create())
.apply(ParDo.of(new ConvergeStreamReadsRequestList()))
.apply(new ReadStreamer(auth, ShardBoundary.Requirement.STRICT, fields));
}
Example 29
@Test
@Category(NeedsRunner.class)
public void singlePaneSingleReifiedPane() {
PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes =
p.apply(GenerateSequence.from(0).to(20000))
.apply(WithTimestamps.of(input -> new Instant(input * 10)))
.apply(
Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(GatherAllPanes.globally());
PAssert.that(accumulatedPanes)
.satisfies(
input -> {
for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) {
if (Iterables.size(windowedInput) > 1) {
fail("Expected all windows to have exactly one pane, got " + windowedInput);
return null;
}
}
return null;
});
p.run();
}