Java源码示例:org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator
示例1
/**
 * Publishes the keyed stream as a queryable ValueState instance.
 *
 * <p>A {@code QueryableValueStateOperator} built from the given name and descriptor is added
 * to this stream via {@code transform}, under the operator name
 * {@code "Queryable state: " + queryableStateName}. Afterwards the descriptor's serializer is
 * initialized from the execution config unless one has already been set.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @return Queryable state instance, carrying the name, the descriptor and a key serializer
 *         created from this stream's key type
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName, ValueStateDescriptor<T> stateDescriptor) {
    final String operatorName = "Queryable state: " + queryableStateName;

    // The value returned by transform(...) is intentionally not used here.
    transform(
            operatorName,
            getType(),
            new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));

    // Runs after the operator has been added, same as before.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    return new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor,
            getKeyType().createSerializer(getExecutionConfig()));
}
示例2
/**
 * Publishes the keyed stream as a queryable ValueState instance.
 *
 * <p>The method performs three steps in order: it adds a {@code QueryableValueStateOperator}
 * to this stream, initializes the serializer of {@code stateDescriptor} from the execution
 * config unless it is already set, and wraps the name, the descriptor and a freshly created
 * key serializer in a {@code QueryableStateStream}.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ValueStateDescriptor<T> stateDescriptor) {

    // Add the operator; its transformation name embeds the queryable state name.
    this.transform(
            "Queryable state: " + queryableStateName,
            this.getType(),
            new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));

    // No-op for the serializer if the descriptor already has one ("unless set").
    stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());

    final QueryableStateStream<KEY, T> queryableStream =
            new QueryableStateStream<>(
                    queryableStateName,
                    stateDescriptor,
                    this.getKeyType().createSerializer(this.getExecutionConfig()));
    return queryableStream;
}
示例3
/**
 * Publishes the keyed stream as a queryable ValueState instance.
 *
 * <p>The returned {@code QueryableStateStream} is built from the given name, the given
 * descriptor, and a key serializer obtained from this stream's key type and execution config.
 * Before it is built, a {@code QueryableValueStateOperator} is added to this stream and the
 * descriptor's serializer is initialized unless it was set beforehand.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create the state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ValueStateDescriptor<T> stateDescriptor) {
    // Step 1: add the queryable-state operator under a descriptive transformation name.
    final String transformationName = "Queryable state: " + queryableStateName;
    transform(transformationName, getType(),
            new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));

    // Step 2: initialize the descriptor's serializer from the execution config, unless set.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    // Step 3: hand back a stream handle made of name, descriptor and key serializer.
    final QueryableStateStream<KEY, T> published = new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor,
            getKeyType().createSerializer(getExecutionConfig()));
    return published;
}