Java源码示例:org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
示例1
@Test
public void testWindowAssignment() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));
when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
示例2
@Test
public void testWindowAssignmentWithOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
示例3
@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
示例4
@Test
public void testTimeUnits() {
// sanity check with one other time unit
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
示例5
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 构建输入数据
List<Tuple2<String, Long>> data = new ArrayList<>();
Tuple2<String, Long> a = new Tuple2<>("first event", 1L);
Tuple2<String, Long> b = new Tuple2<>("second event", 2L);
data.add(a);
data.add(b);
DataStreamSource<Tuple2<String, Long>> input = env.fromCollection(data);
// 使用 ProcessTime 滚动窗口, 10s 为一个窗口长度
input.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new MyWindowFunction());
env.execute();
}
示例6
@Test
public void testWindowAssignment() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));
when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
示例7
@Test
public void testWindowAssignmentWithOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
示例8
@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
示例9
@Test
public void testTimeUnits() {
// sanity check with one other time unit
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
示例10
@Test
public void testWindowAssignment() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));
when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
示例11
@Test
public void testWindowAssignmentWithStagger() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(0), WindowStagger.NATURAL);
when(mockContext.getCurrentProcessingTime()).thenReturn(150L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(150, 5150)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5049L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(150, 5150)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5150L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5150, 10150)));
}
示例12
@Test
public void testWindowAssignmentWithGlobalOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
示例13
@Test
public void testWindowAssignmentWithNegativeGlobalOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
示例14
@Test
public void testTimeUnits() {
// sanity check with one other time unit
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));
when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
示例15
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String tuple,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例16
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
Tuple3.of("hello", "hallo", 1),
Tuple3.of("hello", "hallo", 2));
DataStream<String> window = source
.keyBy(new Tuple3KeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestWindowFunction());
final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
示例17
@Test
public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
Tuple3.of("hello", "hallo", 1),
Tuple3.of("hello", "hallo", 2));
DataStream<String> window = source
.keyBy(new Tuple3KeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
示例18
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key,
Context ctx,
Iterable<Tuple3<String, String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple3<String, String, Integer> in : values) {
out.collect(new Tuple2<>(in.f0, in.f2));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例19
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例20
@Test
public void testProperties() {
TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));
assertFalse(assigner.isEventTime());
assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
}
示例21
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例22
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例23
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例24
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(
Context ctx,
Iterable<Tuple3<String, String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple3<String, String, Integer> in : values) {
out.collect(new Tuple2<>(in.f0, in.f2));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例25
@Test
@SuppressWarnings("rawtypes")
public void testApplyProcessingTimeTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例26
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple3<String, String, Integer>> window = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String tuple,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple3<String, String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例27
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
Tuple3.of("hello", "hallo", 1),
Tuple3.of("hello", "hallo", 2));
DataStream<String> window = source
.keyBy(new Tuple3KeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestWindowFunction());
final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
示例28
@Test
public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
Tuple3.of("hello", "hallo", 1),
Tuple3.of("hello", "hallo", 2));
DataStream<String> window = source
.keyBy(new Tuple3KeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
processElementAndEnsureOutput(
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
示例29
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key,
Context ctx,
Iterable<Tuple3<String, String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple3<String, String, Integer> in : values) {
out.collect(new Tuple2<>(in.f0, in.f2));
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
示例30
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(new TupleKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key,
Context ctx,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (Tuple2<String, Integer> in : values) {
out.collect(in);
}
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
Assert.assertTrue(operator instanceof WindowOperator);
WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}