Java源码示例:org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction
示例1
/**
 * Verifies that {@link InternalIterableAllWindowFunction} forwards every call it receives
 * ({@code setOutputType}, {@code open}, {@code setRuntimeContext}, {@code process} and
 * {@code close}) to the {@code AllWindowFunction} it wraps.
 */
@SuppressWarnings("unchecked")
@Test
public void testInternalIterableAllWindowFunction() throws Exception {
    AllWindowFunctionMock wrappedFunction = mock(AllWindowFunctionMock.class);
    InternalIterableAllWindowFunction<Long, String, TimeWindow> internalFunction =
        new InternalIterableAllWindowFunction<>(wrappedFunction);

    // setOutputType must reach the wrapped function with the same arguments
    TypeInformation<String> outputType = BasicTypeInfo.STRING_TYPE_INFO;
    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setParallelism(42);
    StreamingFunctionUtils.setOutputType(internalFunction, outputType, executionConfig);
    verify(wrappedFunction).setOutputType(outputType, executionConfig);

    // open must be forwarded with the very same configuration object
    Configuration configuration = new Configuration();
    internalFunction.open(configuration);
    verify(wrappedFunction).open(configuration);

    // setRuntimeContext must be forwarded
    RuntimeContext runtimeContext = mock(RuntimeContext.class);
    internalFunction.setRuntimeContext(runtimeContext);
    verify(wrappedFunction).setRuntimeContext(runtimeContext);

    // process must end up as apply(window, elements, out); key and context are not passed on
    TimeWindow window = mock(TimeWindow.class);
    Iterable<Long> elements = (Iterable<Long>) mock(Iterable.class);
    Collector<String> out = (Collector<String>) mock(Collector.class);
    InternalWindowFunction.InternalWindowContext windowContext =
        mock(InternalWindowFunction.InternalWindowContext.class);
    internalFunction.process(((byte) 0), window, windowContext, elements, out);
    verify(wrappedFunction).apply(window, elements, out);

    // close must be forwarded
    internalFunction.close();
    verify(wrappedFunction).close();
}
示例2
/**
 * Exercises the delegation behaviour of {@link InternalIterableAllWindowFunction}: each
 * call made on the wrapper is expected to be observed on the mocked
 * {@code AllWindowFunction} it was constructed with.
 */
@SuppressWarnings("unchecked")
@Test
public void testInternalIterableAllWindowFunction() throws Exception {
    // --- fixtures ---------------------------------------------------------
    AllWindowFunctionMock delegate = mock(AllWindowFunctionMock.class);
    InternalIterableAllWindowFunction<Long, String, TimeWindow> wrapper =
        new InternalIterableAllWindowFunction<>(delegate);

    TypeInformation<String> resultTypeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    ExecutionConfig execConfig = new ExecutionConfig();
    execConfig.setParallelism(42);

    Configuration openConfig = new Configuration();
    RuntimeContext runtimeCtx = mock(RuntimeContext.class);

    TimeWindow timeWindow = mock(TimeWindow.class);
    Iterable<Long> input = (Iterable<Long>) mock(Iterable.class);
    Collector<String> collector = (Collector<String>) mock(Collector.class);
    InternalWindowFunction.InternalWindowContext internalCtx =
        mock(InternalWindowFunction.InternalWindowContext.class);

    // --- setOutputType ----------------------------------------------------
    StreamingFunctionUtils.setOutputType(wrapper, resultTypeInfo, execConfig);
    verify(delegate).setOutputType(resultTypeInfo, execConfig);

    // --- open -------------------------------------------------------------
    wrapper.open(openConfig);
    verify(delegate).open(openConfig);

    // --- setRuntimeContext ------------------------------------------------
    wrapper.setRuntimeContext(runtimeCtx);
    verify(delegate).setRuntimeContext(runtimeCtx);

    // --- process -> apply -------------------------------------------------
    wrapper.process(((byte) 0), timeWindow, internalCtx, input, collector);
    verify(delegate).apply(timeWindow, input, collector);

    // --- close ------------------------------------------------------------
    wrapper.close();
    verify(delegate).close();
}
示例3
/**
 * Checks that an {@link InternalIterableAllWindowFunction} built around a mocked
 * {@code AllWindowFunction} passes output-type configuration, lifecycle calls
 * ({@code open}/{@code close}), the runtime context and window evaluation through
 * to that mock.
 */
@SuppressWarnings("unchecked")
@Test
public void testInternalIterableAllWindowFunction() throws Exception {
    final AllWindowFunctionMock userFunction = mock(AllWindowFunctionMock.class);
    final InternalIterableAllWindowFunction<Long, String, TimeWindow> underTest =
        new InternalIterableAllWindowFunction<>(userFunction);

    // Step 1: output type configuration is handed to the user function.
    final TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    final ExecutionConfig conf = new ExecutionConfig();
    conf.setParallelism(42);
    StreamingFunctionUtils.setOutputType(underTest, typeInfo, conf);
    verify(userFunction).setOutputType(typeInfo, conf);

    // Step 2: open(...) is handed to the user function.
    final Configuration parameters = new Configuration();
    underTest.open(parameters);
    verify(userFunction).open(parameters);

    // Step 3: the runtime context is handed to the user function.
    final RuntimeContext context = mock(RuntimeContext.class);
    underTest.setRuntimeContext(context);
    verify(userFunction).setRuntimeContext(context);

    // Step 4: process(...) results in apply(...) on the user function,
    // called with the window, the elements and the collector only.
    final byte key = (byte) 0;
    final TimeWindow win = mock(TimeWindow.class);
    final Iterable<Long> values = (Iterable<Long>) mock(Iterable.class);
    final Collector<String> output = (Collector<String>) mock(Collector.class);
    final InternalWindowFunction.InternalWindowContext procCtx =
        mock(InternalWindowFunction.InternalWindowContext.class);
    underTest.process(key, win, procCtx, values, output);
    verify(userFunction).apply(win, values, output);

    // Step 5: close() is handed to the user function.
    underTest.close();
    verify(userFunction).close();
}
示例4
/**
 * Applies the given window function to each window. The function is invoked on every
 * evaluation of a window, and whatever it emits is treated as a regular, non-windowed
 * stream.
 *
 * <p>Because the function offers no way to aggregate incrementally, all elements of a
 * window have to be buffered until that window is evaluated.
 *
 * <p>The result type is derived from the function and the input type via
 * {@code getAllWindowFunctionReturnType}.
 *
 * @param function The window function.
 * @param <R> The type of the elements produced by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
    // Must be captured first, directly inside this method.
    final String callLocation = Utils.getCallLocationName();

    final AllWindowFunction<T, R, W> cleanedFunction =
        input.getExecutionEnvironment().clean(function);
    final TypeInformation<R> resultType =
        getAllWindowFunctionReturnType(cleanedFunction, getInputType());

    return apply(
        new InternalIterableAllWindowFunction<>(cleanedFunction),
        resultType,
        callLocation);
}
示例5
/**
 * Evaluates every window with the supplied window function and exposes the function's
 * output as an ordinary (non-windowed) stream. The function runs once per window
 * evaluation.
 *
 * <p>Note: the window contents must be fully buffered until evaluation, since this kind
 * of function cannot aggregate incrementally.
 *
 * @param function The window function.
 * @param <R> The element type emitted by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
    String location = Utils.getCallLocationName();
    AllWindowFunction<T, R, W> cleaned = input.getExecutionEnvironment().clean(function);
    TypeInformation<R> outType = getAllWindowFunctionReturnType(cleaned, getInputType());
    InternalIterableAllWindowFunction<T, R, W> internalFunction =
        new InternalIterableAllWindowFunction<>(cleaned);
    return apply(internalFunction, outType, location);
}
示例6
/**
 * Runs the given window function over each window; the function is called whenever a
 * window is evaluated and its output forms a regular non-windowed stream.
 *
 * <p>All data belonging to a window is buffered until the window is evaluated, because
 * the function provides no means of incremental aggregation.
 *
 * <p>This overload determines the result type itself (through
 * {@code getAllWindowFunctionReturnType}) and then delegates to the three-argument
 * {@code apply}.
 *
 * @param function The window function.
 * @param <R> The type produced by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
    final String callSite = Utils.getCallLocationName();
    final AllWindowFunction<T, R, W> userFunction =
        input.getExecutionEnvironment().clean(function);
    final TypeInformation<R> producedType =
        getAllWindowFunctionReturnType(userFunction, getInputType());
    return apply(new InternalIterableAllWindowFunction<>(userFunction), producedType, callSite);
}
示例7
/**
 * Applies the given window function to each window. The function is invoked on every
 * evaluation of a window, and whatever it emits is treated as a regular, non-windowed
 * stream.
 *
 * <p>Because the function offers no way to aggregate incrementally, all elements of a
 * window have to be buffered until that window is evaluated.
 *
 * @param function The window function.
 * @param resultType Type information for the result type of the window function; it is
 *     passed on unchanged instead of being derived from the function.
 * @param <R> The type of the elements produced by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
    // Must be captured first, directly inside this method.
    final String callLocation = Utils.getCallLocationName();

    final AllWindowFunction<T, R, W> cleanedFunction =
        input.getExecutionEnvironment().clean(function);

    return apply(
        new InternalIterableAllWindowFunction<>(cleanedFunction),
        resultType,
        callLocation);
}
示例8
/**
 * Evaluates every window with the supplied window function and exposes the function's
 * output as an ordinary (non-windowed) stream. The function runs once per window
 * evaluation.
 *
 * <p>Note: the window contents must be fully buffered until evaluation, since this kind
 * of function cannot aggregate incrementally.
 *
 * @param function The window function.
 * @param resultType Type information for the result type of the window function.
 * @param <R> The element type emitted by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
    String location = Utils.getCallLocationName();
    AllWindowFunction<T, R, W> cleaned = input.getExecutionEnvironment().clean(function);
    InternalIterableAllWindowFunction<T, R, W> internalFunction =
        new InternalIterableAllWindowFunction<>(cleaned);
    return apply(internalFunction, resultType, location);
}
示例9
/**
 * Runs the given window function over each window; the function is called whenever a
 * window is evaluated and its output forms a regular non-windowed stream.
 *
 * <p>All data belonging to a window is buffered until the window is evaluated, because
 * the function provides no means of incremental aggregation.
 *
 * <p>The caller supplies the result type explicitly; this method wraps the function in
 * an {@link InternalIterableAllWindowFunction} and delegates to the three-argument
 * {@code apply}.
 *
 * @param function The window function.
 * @param resultType Type information for the result type of the window function.
 * @param <R> The type produced by the window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
    final String callSite = Utils.getCallLocationName();
    final AllWindowFunction<T, R, W> userFunction =
        input.getExecutionEnvironment().clean(function);
    return apply(new InternalIterableAllWindowFunction<>(userFunction), resultType, callSite);
}