Java源码示例:org.apache.flink.annotation.PublicEvolving
示例1
/**
 * Publishes the keyed stream as a queryable {@code ValueState} instance.
 *
 * <p>A transformation named {@code "Queryable state: " + queryableStateName}, backed by a
 * {@code QueryableValueStateOperator}, is registered first. Afterwards
 * {@code initializeSerializerUnlessSet} is invoked on the descriptor with the execution config.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ValueStateDescriptor<T> stateDescriptor) {

    final String operatorName = "Queryable state: " + queryableStateName;
    transform(
        operatorName,
        getType(),
        new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));

    // Serializer initialization deliberately stays after the transformation is registered.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    return new QueryableStateStream<>(
        queryableStateName,
        stateDescriptor,
        getKeyType().createSerializer(getExecutionConfig()));
}
示例2
/**
 * Writes a DataStream in CSV form to the file specified by the path parameter.
 *
 * <p>For every field of an element of the DataStream the result of {@link Object#toString()}
 * is written. This method can only be used on data streams of tuples; any other element
 * type fails the argument check at the top of the method.
 *
 * @param path
 *            the path pointing to the location the text file is written to
 * @param writeMode
 *            Controls the behavior for existing files. Options are
 *            NO_OVERWRITE and OVERWRITE. When {@code null}, no write mode is set
 *            on the output format.
 * @param rowDelimiter
 *            the delimiter for two rows
 * @param fieldDelimiter
 *            the delimiter for two fields
 * @param <X>
 *            the tuple type the CSV output format is parameterized with
 *
 * @return the sink produced by {@code writeUsingOutputFormat}
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public <X extends Tuple> DataStreamSink<T> writeAsCsv(
        String path,
        WriteMode writeMode,
        String rowDelimiter,
        String fieldDelimiter) {

    final boolean isTupleStream = getType().isTupleType();
    Preconditions.checkArgument(
        isTupleStream,
        "The writeAsCsv() method can only be used on data streams of tuples.");

    final CsvOutputFormat<X> csvFormat =
        new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);
    if (writeMode != null) {
        csvFormat.setWriteMode(writeMode);
    }

    // Unchecked cast; the tuple check above is the only guard for it.
    return writeUsingOutputFormat((OutputFormat<T>) csvFormat);
}
示例3
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
 * that the window function typically has only a single value to process when called.
 *
 * <p>The accumulator type, the aggregate result type and the final result type are extracted
 * here and handed to the overload that takes explicit type information.
 *
 * @param aggFunction The aggregate function that is used for incremental aggregation.
 * @param windowFunction The window function.
 *
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            WindowFunction's result type
 */
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggFunction,
        AllWindowFunction<V, R, W> windowFunction) {

    checkNotNull(aggFunction, "aggFunction");
    checkNotNull(windowFunction, "windowFunction");

    final TypeInformation<ACC> accType =
        TypeExtractor.getAggregateFunctionAccumulatorType(aggFunction, input.getType(), null, false);
    final TypeInformation<V> intermediateType =
        TypeExtractor.getAggregateFunctionReturnType(aggFunction, input.getType(), null, false);

    // The window function consumes the aggregate's result, so its output type is derived from it.
    final TypeInformation<R> outType =
        getAllWindowFunctionReturnType(windowFunction, intermediateType);

    return aggregate(aggFunction, windowFunction, accType, outType);
}
示例4
/**
 * Publishes the keyed stream as a queryable {@code ReducingState} instance.
 *
 * <p>Registers a transformation named {@code "Queryable state: " + queryableStateName} that
 * uses a {@code QueryableAppendingStateOperator}, then invokes
 * {@code initializeSerializerUnlessSet} on the descriptor with the execution config.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ReducingStateDescriptor<T> stateDescriptor) {

    final String transformationName = "Queryable state: " + queryableStateName;
    transform(
        transformationName,
        getType(),
        new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));

    // Kept after transform(...) so the statement order matches the established behavior.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    return new QueryableStateStream<>(
        queryableStateName,
        stateDescriptor,
        getKeyType().createSerializer(getExecutionConfig()));
}
示例5
/**
 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * <p>The output type is extracted from the function and then passed, together with the
 * function, to the overload that takes explicit type information.
 *
 * @param processFunction The {@link ProcessFunction} that is called for each element
 *                        in the stream.
 *
 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 *
 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
 */
@Deprecated
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
    // NOTE(review): Utils.getCallLocationName() is kept inside this method on purpose; it
    // presumably inspects the call stack, so do not move it into a helper without checking.
    final TypeInformation<R> producedType =
        TypeExtractor.getUnaryOperatorReturnType(
            processFunction,
            ProcessFunction.class,
            0,
            1,
            TypeExtractor.NO_INDEX,
            getType(),
            Utils.getCallLocationName(),
            true);

    return process(processFunction, producedType);
}
示例6
/**
 * Applies the given {@code AggregateFunction} to each window. The AggregateFunction
 * aggregates all elements of a window into a single result element. The stream of these
 * result elements (one per window) is interpreted as a regular non-windowed stream.
 *
 * <p>A function that is also a {@code RichFunction} is rejected with an
 * {@link UnsupportedOperationException}.
 *
 * @param function The aggregation function.
 * @return The data stream that is the result of applying the aggregate function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            AggregateFunction's result type
 */
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
    checkNotNull(function, "function");

    final boolean isRich = function instanceof RichFunction;
    if (isRich) {
        throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
    }

    final TypeInformation<ACC> accType =
        TypeExtractor.getAggregateFunctionAccumulatorType(function, input.getType(), null, false);
    final TypeInformation<R> outType =
        TypeExtractor.getAggregateFunctionReturnType(function, input.getType(), null, false);

    return aggregate(function, accType, outType);
}
示例7
/**
 * Determines the accumulator type of the given {@code AggregateFunction}.
 *
 * <p>Delegates to {@code getUnaryOperatorReturnType} with {@code AggregateFunction.class} as
 * the base class and the positional indices {@code 0} and {@code 1}. Presumably these are the
 * positions of {@code IN} and {@code ACC} among the type parameters of
 * {@code AggregateFunction}; confirm against {@code getUnaryOperatorReturnType}.
 *
 * @param function The aggregate function whose accumulator type is requested.
 * @param inType Type information of the function's input.
 * @param functionName Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param allowMissing Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param <IN> The input type of the aggregate function.
 * @param <ACC> The accumulator type of the aggregate function.
 * @return The type information produced by {@code getUnaryOperatorReturnType}.
 */
@PublicEvolving
public static <IN, ACC> TypeInformation<ACC> getAggregateFunctionAccumulatorType(
        AggregateFunction<IN, ACC, ?> function,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    return getUnaryOperatorReturnType(
        function,
        AggregateFunction.class,
        0,
        1,
        NO_INDEX,
        inType,
        functionName,
        allowMissing);
}
示例8
/**
 * Determines the result type of the given {@code AggregateFunction}.
 *
 * <p>Delegates to {@code getUnaryOperatorReturnType} with {@code AggregateFunction.class} as
 * the base class and the positional indices {@code 0} and {@code 2}. Presumably these are the
 * positions of {@code IN} and {@code OUT} among the type parameters of
 * {@code AggregateFunction}; confirm against {@code getUnaryOperatorReturnType}.
 *
 * @param function The aggregate function whose result type is requested.
 * @param inType Type information of the function's input.
 * @param functionName Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param allowMissing Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param <IN> The input type of the aggregate function.
 * @param <OUT> The result type of the aggregate function.
 * @return The type information produced by {@code getUnaryOperatorReturnType}.
 */
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getAggregateFunctionReturnType(
        AggregateFunction<IN, ?, OUT> function,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    return getUnaryOperatorReturnType(
        function,
        AggregateFunction.class,
        0,
        2,
        NO_INDEX,
        inType,
        functionName,
        allowMissing);
}
示例9
/**
 * Publishes the keyed stream as a queryable {@code FoldingState} instance.
 *
 * <p>Registers a transformation named {@code "Queryable state: " + queryableStateName} that
 * uses a {@code QueryableAppendingStateOperator}, then invokes
 * {@code initializeSerializerUnlessSet} on the descriptor with the execution config.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @param <ACC> The accumulator type of the folding state
 * @return Queryable state instance
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
        String queryableStateName,
        FoldingStateDescriptor<T, ACC> stateDescriptor) {

    final String label = "Queryable state: " + queryableStateName;
    transform(
        label,
        getType(),
        new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));

    // Statement order is preserved: operator registration first, serializer setup second.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    return new QueryableStateStream<>(
        queryableStateName,
        stateDescriptor,
        getKeyType().createSerializer(getExecutionConfig()));
}
示例10
/**
 * Executes the job remotely.
 *
 * <p>This method can be used independent of the {@link StreamExecutionEnvironment} type.
 * The stream graph is obtained from the given environment, the job name is set on it, and
 * the work is handed to the overload that accepts a stream graph.
 *
 * @param streamExecutionEnvironment Environment that supplies the stream graph, the class
 *        loader (the one that loaded the environment's class) and the execution config.
 * @param jarFiles Forwarded unchanged to the stream-graph overload.
 * @param host Forwarded unchanged to the stream-graph overload.
 * @param port Forwarded unchanged to the stream-graph overload.
 * @param clientConfiguration Forwarded unchanged to the stream-graph overload.
 * @param globalClasspaths Forwarded unchanged to the stream-graph overload.
 * @param jobName Name that is set on the stream graph before execution.
 * @param savepointRestoreSettings Forwarded unchanged to the stream-graph overload.
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws ProgramInvocationException declared by this method; presumably raised by the
 *         delegated overload (not visible here).
 */
@PublicEvolving
public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
        List<URL> jarFiles,
        String host,
        int port,
        Configuration clientConfiguration,
        List<URL> globalClasspaths,
        String jobName,
        SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {

    final StreamGraph graph = streamExecutionEnvironment.getStreamGraph();
    graph.setJobName(jobName);

    final ClassLoader environmentClassLoader = streamExecutionEnvironment.getClass().getClassLoader();

    return executeRemotely(
        graph,
        environmentClassLoader,
        streamExecutionEnvironment.getConfig(),
        jarFiles,
        host,
        port,
        clientConfiguration,
        globalClasspaths,
        savepointRestoreSettings);
}
示例11
/**
 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input streams and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * <p>This variant extracts the output type from the function and forwards both to the
 * overload that takes explicit type information.
 *
 * @param processFunction The {@link ProcessFunction} that is called for each element
 *                        in the stream.
 *
 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 *
 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
 */
@Deprecated
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
    // NOTE(review): the call-location lookup stays in this method body; it presumably depends
    // on the call stack, so extracting it into a helper needs to be verified first.
    final TypeInformation<R> extractedOutputType =
        TypeExtractor.getUnaryOperatorReturnType(
            processFunction,
            ProcessFunction.class,
            0,
            1,
            TypeExtractor.NO_INDEX,
            getType(),
            Utils.getCallLocationName(),
            true);

    return process(processFunction, extractedOutputType);
}
示例12
/**
 * Creates a comparator for this type by instantiating the configured comparator class.
 *
 * @param sortOrderAscending Passed on to {@code instantiateComparator}.
 * @param executionConfig Not used by this implementation.
 * @return The comparator returned by {@code instantiateComparator}.
 * @throws InvalidTypesException if no comparator class is available for this type.
 */
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
    // Without a comparator class the type cannot act as a key.
    if (comparatorClass == null) {
        throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
    }
    return instantiateComparator(comparatorClass, sortOrderAscending);
}
示例13
/**
 * Sets the time by which elements are allowed to be late. Elements that
 * arrive behind the watermark by more than the specified time will be dropped.
 * By default, the allowed lateness is {@code 0L}.
 *
 * <p>Setting an allowed lateness is only valid for event-time windows.
 *
 * @param lateness The allowed lateness; its millisecond value must not be negative,
 *                 otherwise the {@code checkArgument} precondition fails.
 * @return This windowed stream, to allow call chaining.
 */
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
    final long latenessMillis = lateness.toMilliseconds();
    checkArgument(latenessMillis >= 0, "The allowed lateness cannot be negative.");

    // The value is stored in milliseconds.
    this.allowedLateness = latenessMillis;
    return this;
}
示例14
/**
 * Tries to get the PrimitiveArrayTypeInfo for an array. Returns null, if the type is an array,
 * but the component type is not a primitive type.
 *
 * @param type The class of the array.
 * @param <X> The array type.
 * @return The corresponding PrimitiveArrayTypeInfo, or null, if the array is not an array of primitives.
 * @throws InvalidTypesException Thrown, if the given class does not represent an array.
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
    if (type.isArray()) {
        // basic type arrays: the lookup yields null when there is no entry for this class
        return (PrimitiveArrayTypeInfo<X>) TYPES.get(type);
    }
    throw new InvalidTypesException("The given class is no array.");
}
示例15
/**
 * Creates a {@code WritableComparator} for this type, provided the type class implements
 * {@link Comparable}.
 *
 * @param sortOrderAscending Passed to the {@code WritableComparator} constructor.
 * @param executionConfig Not used by this implementation.
 * @return A new {@code WritableComparator} for the type class.
 * @throws UnsupportedOperationException if the type class does not implement {@link Comparable}.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
    if (!Comparable.class.isAssignableFrom(typeClass)) {
        throw new UnsupportedOperationException("Cannot create Comparator for " + typeClass.getCanonicalName() + ". " +
            "Class does not implement Comparable interface.");
    }
    return new WritableComparator(sortOrderAscending, typeClass);
}
示例16
/**
 * Returns a future holding the request result.
 *
 * <p>The key's type information is taken from the given type hint; the request itself is
 * issued by the overload that accepts the key type information directly.
 *
 * @param jobId                     JobID of the job the queryable state belongs to.
 * @param queryableStateName        Name under which the state is queryable.
 * @param key                       The key we are interested in.
 * @param keyTypeHint               A {@link TypeHint} used to extract the type of the key.
 *                                  Must not be {@code null}.
 * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
 * @param <K>                       The type of the key.
 * @param <S>                       The type of the state.
 * @param <V>                       The value type of the state descriptor.
 * @return Future holding the immutable {@link State} object containing the result.
 */
@PublicEvolving
public <K, S extends State, V> CompletableFuture<S> getKvState(
        final JobID jobId,
        final String queryableStateName,
        final K key,
        final TypeHint<K> keyTypeHint,
        final StateDescriptor<S, V> stateDescriptor) {

    // checkNotNull hands back its argument, so the null check and the lookup can be chained.
    final TypeInformation<K> keyTypeInfo = Preconditions.checkNotNull(keyTypeHint).getTypeInfo();

    return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
}
示例17
/**
 * Creates a windowed stream over the given keyed input using the given window assigner.
 *
 * <p>The trigger is initialized to the assigner's default trigger, obtained for the
 * execution environment of the input stream.
 *
 * @param input The keyed stream that is windowed.
 * @param windowAssigner The assigner stored for this stream; it also supplies the initial trigger.
 */
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.windowAssigner = windowAssigner;
// start out with the assigner's default trigger
this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
示例18
/**
 * Returns the current reading state of this input format as a pair of longs.
 *
 * @return A tuple whose first field is the last read index in the block and whose second
 *         field is the number of records read.
 * @throws IOException declared by this method; not thrown by the visible code.
 * @throws RuntimeException if the block-based input has not been set, which the error
 *         message attributes to a missing {@code open()} call.
 */
@PublicEvolving
@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
    if (this.blockBasedInput == null) {
        throw new RuntimeException("You must have forgotten to call open() on your input format.");
    }

    // first: the last read index in the block, second: the number of records read
    return new Tuple2<Long, Long>(this.blockBasedInput.getCurrBlockPos(), this.readRecords);
}
示例19
@PublicEvolving
public EnumTypeInfo(Class<T> typeClass) {
checkNotNull(typeClass, "Enum type class must not be null.");
if (!Enum.class.isAssignableFrom(typeClass) ) {
throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
}
this.typeClass = typeClass;
}
示例20
/**
 * Creates type information for the given {@code Value} class.
 *
 * @param type The class described by this type information; must not be {@code null} and
 *             must be {@code Value} itself or a subtype of it.
 */
@PublicEvolving
public ValueTypeInfo(Class<T> type) {
    this.type = checkNotNull(type);

    // Class#isAssignableFrom already returns true when both classes are the same, so the
    // former extra "|| type.equals(Value.class)" test could never change the outcome.
    checkArgument(
        Value.class.isAssignableFrom(type),
        "ValueTypeInfo can only be used for subclasses of %s", Value.class.getName());
}
示例21
/**
 * Sorts each local partition of a data set on the field(s) specified by the field expression
 * in the specified {@link Order} before it is emitted by the output format.
 *
 * <p><b>Note: Non-composite types can only be sorted on the full element which is specified by
 * a wildcard expression ("*" or "_").</b>
 *
 * <p>Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
 * by chaining {@link #sortLocalOutput(String, Order)} calls. Each call appends its key
 * positions and orders to the ones recorded by earlier calls.
 *
 * @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
 * @param order The Order in which the specified field(s) are locally sorted.
 * @return This data sink operator with specified output order.
 *
 * @see Order
 *
 * @deprecated Use {@link DataSet#sortPartition(String, Order)} instead
 */
@Deprecated
@PublicEvolving
public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {
    // compute flat field positions for (nested) sorting fields
    final Keys.ExpressionKeys<T> expressionKeys = new Keys.ExpressionKeys<>(fieldExpression, this.type);
    final int[] addedPositions = expressionKeys.computeLogicalKeyPositions();

    if (!Keys.ExpressionKeys.isSortKey(fieldExpression, this.type)) {
        throw new InvalidProgramException("Selected sort key is not a sortable type");
    }

    // every position contributed by this call is sorted in the same requested order
    final int addedCount = addedPositions.length;
    final Order[] addedOrders = new Order[addedCount];
    Arrays.fill(addedOrders, order);

    if (this.sortKeyPositions == null) {
        // first call: set sorting info
        this.sortKeyPositions = addedPositions;
        this.sortOrders = addedOrders;
        return this;
    }

    // later call: append sorting info to existing info
    final int existingCount = this.sortKeyPositions.length;
    final int combinedCount = existingCount + addedCount;
    this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, combinedCount);
    this.sortOrders = Arrays.copyOf(this.sortOrders, combinedCount);
    System.arraycopy(addedPositions, 0, this.sortKeyPositions, existingCount, addedCount);
    System.arraycopy(addedOrders, 0, this.sortOrders, existingCount, addedCount);

    return this;
}
示例22
/**
 * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
 * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
 *
 * <p>The first input must be a {@link KeyedStream}; otherwise the argument check fails.
 *
 * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
 *                 Must not be {@code null}.
 * @param outTypeInfo The type of the output elements.
 * @param <KS> The type of the keys in the keyed stream.
 * @param <OUT> The type of the output elements.
 * @return The transformed {@link DataStream}.
 */
@PublicEvolving
public <KS, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {

    Preconditions.checkNotNull(function);

    final boolean firstInputIsKeyed = inputStream1 instanceof KeyedStream;
    Preconditions.checkArgument(firstInputIsKeyed,
        "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

    // The function is passed through clean(...) before it is wrapped in the operator.
    final TwoInputStreamOperator<IN1, IN2, OUT> broadcastOperator =
        new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);

    return transform("Co-Process-Broadcast-Keyed", outTypeInfo, broadcastOperator);
}
示例23
/**
 * Determines the result type of the given {@code FoldFunction}.
 *
 * <p>Delegates to {@code getUnaryOperatorReturnType} with {@code FoldFunction.class} as the
 * base class and the positional indices {@code 0} and {@code 1}. Presumably these are the
 * positions of the input and output type parameters; confirm against
 * {@code getUnaryOperatorReturnType}.
 *
 * @param foldInterface The fold function whose result type is requested.
 * @param inType Type information of the function's input.
 * @param functionName Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param allowMissing Forwarded unchanged to {@code getUnaryOperatorReturnType}.
 * @param <IN> The input type of the fold function.
 * @param <OUT> The result type of the fold function.
 * @return The type information produced by {@code getUnaryOperatorReturnType}.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(
        FoldFunction<IN, OUT> foldInterface,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    return getUnaryOperatorReturnType(
        (Function) foldInterface,
        FoldFunction.class,
        0,
        1,
        NO_INDEX,
        inType,
        functionName,
        allowMissing);
}
示例24
/**
 * Extracts the type information of the value produced by a {@code FoldFunction}.
 *
 * <p>The work is done by {@code getUnaryOperatorReturnType}, called with
 * {@code FoldFunction.class} as the base class and the positional indices {@code 0} and
 * {@code 1}. Presumably these select the input and output type parameters; confirm
 * against {@code getUnaryOperatorReturnType}.
 *
 * @param foldInterface The fold function to analyze.
 * @param inType Type information of the elements fed into the function.
 * @param functionName Passed through to {@code getUnaryOperatorReturnType}.
 * @param allowMissing Passed through to {@code getUnaryOperatorReturnType}.
 * @param <IN> The input type of the fold function.
 * @param <OUT> The result type of the fold function.
 * @return The type information returned by {@code getUnaryOperatorReturnType}.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(
        FoldFunction<IN, OUT> foldInterface,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    return getUnaryOperatorReturnType(
        (Function) foldInterface,
        FoldFunction.class,
        0,
        1,
        NO_INDEX,
        inType,
        functionName,
        allowMissing);
}
示例25
/**
 * Creates the serializer for this array type.
 *
 * <p>Arrays whose component class is {@link String} use the shared
 * {@code StringArraySerializer.INSTANCE}. All other component types get a new
 * {@code GenericArraySerializer} built from the component class and the component's
 * own serializer.
 *
 * @param executionConfig Passed to the component type when its serializer is created.
 * @return The serializer for this array type.
 */
@Override
@SuppressWarnings("unchecked")
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    // special case the string array
    if (componentInfo.getTypeClass().equals(String.class)) {
        return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
    }

    return (TypeSerializer<T>) new GenericArraySerializer<>(
        this.componentInfo.getTypeClass(),
        this.componentInfo.createSerializer(executionConfig));
}
示例26
/**
 * Always rejects the request: this implementation does not provide map state.
 *
 * @param stateProperties Ignored by this implementation.
 * @param <UK> The type of the user keys of the requested map state.
 * @param <UV> The type of the user values of the requested map state.
 * @return Never returns normally.
 * @throws UnsupportedOperationException always; the message says that this state is only
 *         accessible by functions executed on a KeyedStream.
 */
@Override
@PublicEvolving
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
示例27
/**
 * Returns the arity reported for this type.
 *
 * @return Always {@code 1}.
 */
@Override
@PublicEvolving
public int getArity() {
return 1;
}
示例28
/**
 * Reports whether this type is a basic type.
 *
 * @return Always {@code false} for this implementation.
 */
@Override
@PublicEvolving
public boolean isBasicType() {
return false;
}
示例29
/**
 * Sets the {@code Trigger} that should be used to trigger window emission.
 *
 * <p>This call returns a new {@code WithWindow} built from the current inputs, key
 * selectors, key type, window assigner, evictor and allowed lateness, with the given
 * trigger in place; the visible code does not modify this instance.
 *
 * @param newTrigger The trigger to use in the returned {@code WithWindow}.
 * @return A new {@code WithWindow} that uses {@code newTrigger}.
 */
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, newTrigger, evictor, allowedLateness);
}
示例30
/**
 * Reports whether the described class is one of the built-in basic value types.
 *
 * @return {@code true} if the type is exactly one of {@code StringValue}, {@code ByteValue},
 *         {@code ShortValue}, {@code CharValue}, {@code DoubleValue}, {@code FloatValue},
 *         {@code IntValue}, {@code LongValue}, {@code NullValue} or {@code BooleanValue};
 *         {@code false} otherwise.
 */
@PublicEvolving
public boolean isBasicValueType() {
    // Exact class matches only; the comparison order is kept as it was.
    return type.equals(StringValue.class)
        || type.equals(ByteValue.class)
        || type.equals(ShortValue.class)
        || type.equals(CharValue.class)
        || type.equals(DoubleValue.class)
        || type.equals(FloatValue.class)
        || type.equals(IntValue.class)
        || type.equals(LongValue.class)
        || type.equals(NullValue.class)
        || type.equals(BooleanValue.class);
}