Java源码示例:org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

示例1
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));

	when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
 
示例2
@Test
public void testWindowAssignmentWithOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
 
示例3
@Test
public void testWindowAssignmentWithNegativeOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
		mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
 
示例4
@Test
public void testTimeUnits() {
	// sanity check with one other time unit

	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));

	when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
 
示例5
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 构建输入数据
    List<Tuple2<String, Long>> data = new ArrayList<>();
    Tuple2<String, Long> a = new Tuple2<>("first event", 1L);
    Tuple2<String, Long> b = new Tuple2<>("second event", 2L);
    data.add(a);
    data.add(b);
    DataStreamSource<Tuple2<String, Long>> input = env.fromCollection(data);

    // 使用 ProcessTime 滚动窗口, 10s 为一个窗口长度
    input.keyBy(x -> x.f1)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .reduce(new MyWindowFunction());

    env.execute();
}
 
示例6
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));

	when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
 
示例7
@Test
public void testWindowAssignmentWithOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
 
示例8
@Test
public void testWindowAssignmentWithNegativeOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
		mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
 
示例9
@Test
public void testTimeUnits() {
	// sanity check with one other time unit

	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));

	when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
 
示例10
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000));

	when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000)));
}
 
示例11
@Test
public void testWindowAssignmentWithStagger() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(0), WindowStagger.NATURAL);

	when(mockContext.getCurrentProcessingTime()).thenReturn(150L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(150, 5150)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5049L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(150, 5150)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5150L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5150, 10150)));
}
 
示例12
@Test
public void testWindowAssignmentWithGlobalOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5099L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(100, 5100)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}
 
示例13
@Test
public void testWindowAssignmentWithNegativeGlobalOffset() {
	WindowAssigner.WindowAssignerContext mockContext =
		mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));

	when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}
 
示例14
@Test
public void testTimeUnits() {
	// sanity check with one other time unit

	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));

	when(mockContext.getCurrentProcessingTime()).thenReturn(1000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(5999L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(1000, 6000)));

	when(mockContext.getCurrentProcessingTime()).thenReturn(6000L);
	assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(6000, 11000)));
}
 
示例15
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple3<String, String, Integer>> window = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String tuple,
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple3<String, String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);

	processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例16
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
		Tuple3.of("hello", "hallo", 1),
		Tuple3.of("hello", "hallo", 2));

	DataStream<String> window = source
			.keyBy(new Tuple3KeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.aggregate(new DummyAggregationFunction(), new TestWindowFunction());

	final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
		(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();

	final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();

	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
		(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;

	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);

	processElementAndEnsureOutput(
			operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
 
示例17
@Test
public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
		Tuple3.of("hello", "hallo", 1),
		Tuple3.of("hello", "hallo", 2));

	DataStream<String> window = source
			.keyBy(new Tuple3KeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());

	final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
		(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();

	final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();

	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
		(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;

	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);

	processElementAndEnsureOutput(
			operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
 
示例18
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String key,
						Context ctx,
						Iterable<Tuple3<String, String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple3<String, String, Integer> in : values) {
						out.collect(new Tuple2<>(in.f0, in.f2));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例19
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String key,
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

}
 
示例20
@Test
public void testProperties() {
	TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.seconds(5), Time.milliseconds(100));

	assertFalse(assigner.isEventTime());
	assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
	assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
}
 
示例21
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple3<String, String, Integer>> window = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple3<String, String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);

	processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例22
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple3<String, String, Integer>> window = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.aggregate(new DummyAggregationFunction(), new TestAllWindowFunction());

	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();

	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();

	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
			(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;

	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);

	processElementAndEnsureOutput(
			operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例23
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

}
 
示例24
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(
						Context ctx,
						Iterable<Tuple3<String, String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple3<String, String, Integer> in : values) {
						out.collect(new Tuple2<>(in.f0, in.f2));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例25
@Test
@SuppressWarnings("rawtypes")
public void testApplyProcessingTimeTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void apply(
						TimeWindow window,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

}
 
示例26
@Test
@SuppressWarnings("rawtypes")
public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple3<String, String, Integer>> window = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String tuple,
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple3<String, String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);

	processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例27
@Test
public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
		Tuple3.of("hello", "hallo", 1),
		Tuple3.of("hello", "hallo", 2));

	DataStream<String> window = source
			.keyBy(new Tuple3KeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.aggregate(new DummyAggregationFunction(), new TestWindowFunction());

	final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
		(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();

	final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();

	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
		(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;

	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);

	processElementAndEnsureOutput(
			operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
 
示例28
@Test
public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
		Tuple3.of("hello", "hallo", 1),
		Tuple3.of("hello", "hallo", 2));

	DataStream<String> window = source
			.keyBy(new Tuple3KeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());

	final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
		(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();

	final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();

	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
		(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;

	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);

	processElementAndEnsureOutput(
			operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
}
 
示例29
@Test
@SuppressWarnings("rawtypes")
public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String key,
						Context ctx,
						Iterable<Tuple3<String, String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple3<String, String, Integer> in : values) {
						out.collect(new Tuple2<>(in.f0, in.f2));
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
			(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
示例30
@Test
@SuppressWarnings("rawtypes")
public void testProcessProcessingTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.keyBy(new TupleKeySelector())
			.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void process(String key,
						Context ctx,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

}