Java源码示例:org.apache.flink.annotation.PublicEvolving

示例1
/**
 * Publishes the keyed stream as a queryable ValueState instance.
 *
 * <p>A queryable-state operator is attached to this stream through {@code transform(...)},
 * the descriptor's serializer is initialized from the execution config unless it is already
 * set, and a {@link QueryableStateStream} handle for the state is returned.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
		String queryableStateName,
		ValueStateDescriptor<T> stateDescriptor) {

	// Attach the queryable-state operator; the stream's element type is kept as output type.
	final String operatorName = "Queryable state: " + queryableStateName;
	transform(
			operatorName,
			getType(),
			new QueryableValueStateOperator<>(queryableStateName, stateDescriptor));

	// Only takes effect when the descriptor has no serializer yet.
	stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

	// The handle carries the state name, the descriptor and a serializer for the key type.
	return new QueryableStateStream<>(
			queryableStateName,
			stateDescriptor,
			getKeyType().createSerializer(getExecutionConfig()));
}
 
示例2
/**
 * Writes a DataStream to the file specified by the path parameter, using a
 * {@code CsvOutputFormat} configured with the given row and field delimiters.
 *
 * <p>For every field of an element of the DataStream the result of {@link Object#toString()}
 * is written. This method can only be used on data streams of tuples; the argument check at
 * the start of the method rejects any other stream type.
 *
 * @param path
 *            the path pointing to the location the text file is written to
 * @param writeMode
 *            Controls the behavior for existing files. Options are
 *            NO_OVERWRITE and OVERWRITE. When {@code null}, no write mode is set
 *            on the output format.
 * @param rowDelimiter
 *            the delimiter for two rows
 * @param fieldDelimiter
 *            the delimiter for two fields
 * @param <X>
 *            the tuple type the CSV output format is created for
 *
 * @return the closed DataStream
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public <X extends Tuple> DataStreamSink<T> writeAsCsv(
		String path,
		WriteMode writeMode,
		String rowDelimiter,
		String fieldDelimiter) {
	final boolean isTupleStream = getType().isTupleType();
	Preconditions.checkArgument(
		isTupleStream,
		"The writeAsCsv() method can only be used on data streams of tuples.");

	final CsvOutputFormat<X> csvFormat =
		new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);

	// A null write mode leaves the format's own setting untouched.
	if (writeMode != null) {
		csvFormat.setWriteMode(writeMode);
	}

	// Unchecked cast: the tuple check above is the only guard that T is a tuple type.
	return writeUsingOutputFormat((OutputFormat<T>) csvFormat);
}
 
示例3
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
 * that the window function typically has only a single value to process when called.
 *
 * <p>This overload extracts the accumulator type, the aggregate result type and the window
 * function result type, then delegates to the overload that takes explicit type information.
 *
 * @param aggFunction The aggregate function that is used for incremental aggregation.
 * @param windowFunction The window function.
 *
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            WindowFunction's result type
 */
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
		AggregateFunction<T, ACC, V> aggFunction,
		AllWindowFunction<V, R, W> windowFunction) {

	checkNotNull(aggFunction, "aggFunction");
	checkNotNull(windowFunction, "windowFunction");

	// Both extractions run against the input stream's element type.
	final TypeInformation<ACC> accType =
			TypeExtractor.getAggregateFunctionAccumulatorType(aggFunction, input.getType(), null, false);
	final TypeInformation<V> aggregatedType =
			TypeExtractor.getAggregateFunctionReturnType(aggFunction, input.getType(), null, false);

	// The window function consumes the aggregate's result type.
	final TypeInformation<R> outputType =
			getAllWindowFunctionReturnType(windowFunction, aggregatedType);

	return aggregate(aggFunction, windowFunction, accType, outputType);
}
 
示例4
/**
 * Publishes the keyed stream as a queryable ReducingState instance.
 *
 * <p>A queryable-state operator is attached to this stream through {@code transform(...)},
 * the descriptor's serializer is initialized from the execution config unless it is already
 * set, and a {@link QueryableStateStream} handle for the state is returned.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
		String queryableStateName,
		ReducingStateDescriptor<T> stateDescriptor) {

	// Attach the queryable-state operator; the stream's element type is kept as output type.
	final String operatorName = "Queryable state: " + queryableStateName;
	transform(
			operatorName,
			getType(),
			new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));

	// Only takes effect when the descriptor has no serializer yet.
	stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

	// The handle carries the state name, the descriptor and a serializer for the key type.
	return new QueryableStateStream<>(
			queryableStateName,
			stateDescriptor,
			getKeyType().createSerializer(getExecutionConfig()));
}
 
示例5
/**
 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input stream and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * <p>The output type is extracted from the function and then passed to the overload that
 * takes explicit output type information.
 *
 * @param processFunction The {@link ProcessFunction} that is called for each element
 *                      in the stream.
 *
 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 *
 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
 */
@Deprecated
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
	// NOTE(review): 0 and 1 are presumably the positions of the input and output type
	// arguments of ProcessFunction; confirm against getUnaryOperatorReturnType.
	final TypeInformation<R> outputType = TypeExtractor.getUnaryOperatorReturnType(
		processFunction,
		ProcessFunction.class,
		0,
		1,
		TypeExtractor.NO_INDEX,
		getType(),
		Utils.getCallLocationName(),
		true);

	return process(processFunction, outputType);
}
 
示例6
/**
 * Applies the given {@code AggregateFunction} to each window. The AggregateFunction
 * aggregates all elements of a window into a single result element. The stream of these
 * result elements (one per window) is interpreted as a regular non-windowed stream.
 *
 * <p>Accumulator and result types are extracted from the function, then the call is
 * delegated to the overload that takes explicit type information.
 *
 * @param function The aggregation function.
 * @return The data stream that is the result of applying the aggregate function to the window.
 * @throws UnsupportedOperationException if {@code function} is a {@code RichFunction}
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            AggregateFunction's result type
 */
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
	checkNotNull(function, "function");

	// Rich functions are rejected up front.
	final boolean isRich = function instanceof RichFunction;
	if (isRich) {
		throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
	}

	// Both extractions run against the input stream's element type.
	final TypeInformation<ACC> accType =
			TypeExtractor.getAggregateFunctionAccumulatorType(function, input.getType(), null, false);
	final TypeInformation<R> outputType =
			TypeExtractor.getAggregateFunctionReturnType(function, input.getType(), null, false);

	return aggregate(function, accType, outputType);
}
 
示例7
/**
 * Determines the accumulator type of the given {@code AggregateFunction} by delegating to
 * {@code getUnaryOperatorReturnType} with {@code AggregateFunction.class} as base class.
 *
 * <p>NOTE(review): the literals {@code 0} and {@code 1} presumably select the {@code IN} and
 * {@code ACC} type-argument positions of {@code AggregateFunction<IN, ACC, OUT>}; confirm
 * against the parameter list of {@code getUnaryOperatorReturnType}.
 *
 * @param function The aggregate function whose accumulator type is extracted.
 * @param inType Type information of the function's input.
 * @param functionName Passed through unchanged to the delegate.
 * @param allowMissing Passed through unchanged to the delegate.
 * @param <IN> The input type of the aggregate function.
 * @param <ACC> The accumulator type of the aggregate function.
 * @return The type information the delegate produces for the accumulator.
 */
@PublicEvolving
public static <IN, ACC> TypeInformation<ACC> getAggregateFunctionAccumulatorType(
		AggregateFunction<IN, ACC, ?> function,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	final TypeInformation<ACC> accumulatorType = getUnaryOperatorReturnType(
		function,
		AggregateFunction.class,
		0,
		1,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
	return accumulatorType;
}
 
示例8
/**
 * Determines the result type of the given {@code AggregateFunction} by delegating to
 * {@code getUnaryOperatorReturnType} with {@code AggregateFunction.class} as base class.
 *
 * <p>NOTE(review): the literals {@code 0} and {@code 2} presumably select the {@code IN} and
 * {@code OUT} type-argument positions of {@code AggregateFunction<IN, ACC, OUT>}; confirm
 * against the parameter list of {@code getUnaryOperatorReturnType}.
 *
 * @param function The aggregate function whose result type is extracted.
 * @param inType Type information of the function's input.
 * @param functionName Passed through unchanged to the delegate.
 * @param allowMissing Passed through unchanged to the delegate.
 * @param <IN> The input type of the aggregate function.
 * @param <OUT> The result type of the aggregate function.
 * @return The type information the delegate produces for the result.
 */
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getAggregateFunctionReturnType(
		AggregateFunction<IN, ?, OUT> function,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	final TypeInformation<OUT> aggregateResultType = getUnaryOperatorReturnType(
		function,
		AggregateFunction.class,
		0,
		2,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
	return aggregateResultType;
}
 
示例9
/**
 * Publishes the keyed stream as a queryable FoldingState instance.
 *
 * <p>A queryable-state operator is attached to this stream through {@code transform(...)},
 * the descriptor's serializer is initialized from the execution config unless it is already
 * set, and a {@link QueryableStateStream} handle for the state is returned.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @param <ACC> The accumulator type of the folding state descriptor
 * @return Queryable state instance
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
		String queryableStateName,
		FoldingStateDescriptor<T, ACC> stateDescriptor) {

	// Attach the queryable-state operator; the stream's element type is kept as output type.
	final String operatorName = "Queryable state: " + queryableStateName;
	transform(
			operatorName,
			getType(),
			new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));

	// Only takes effect when the descriptor has no serializer yet.
	stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

	// The handle carries the state name, the descriptor and a serializer for the key type.
	return new QueryableStateStream<>(
			queryableStateName,
			stateDescriptor,
			getKeyType().createSerializer(getExecutionConfig()));
}
 
示例10
/**
 * Executes the job remotely.
 *
 * <p>This method can be used independent of the {@link StreamExecutionEnvironment} type.
 * The stream graph is taken from the given environment, named with {@code jobName}, and
 * handed to the stream-graph-based {@code executeRemotely} overload together with the class
 * loader of the environment's class and the environment's config.
 *
 * @param streamExecutionEnvironment The environment that supplies the stream graph, class loader and config.
 * @param jarFiles Passed through unchanged to the delegate overload.
 * @param host Passed through unchanged to the delegate overload.
 * @param port Passed through unchanged to the delegate overload.
 * @param clientConfiguration Passed through unchanged to the delegate overload.
 * @param globalClasspaths Passed through unchanged to the delegate overload.
 * @param jobName The name set on the stream graph before execution.
 * @param savepointRestoreSettings Passed through unchanged to the delegate overload.
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws ProgramInvocationException Declared by this method; presumably raised by the
 *         delegate overload (verify there).
 */
@PublicEvolving
public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
	List<URL> jarFiles,
	String host,
	int port,
	Configuration clientConfiguration,
	List<URL> globalClasspaths,
	String jobName,
	SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
	final StreamGraph namedGraph = streamExecutionEnvironment.getStreamGraph();
	namedGraph.setJobName(jobName);

	// The class loader is the one that loaded the environment's own class.
	final ClassLoader environmentClassLoader = streamExecutionEnvironment.getClass().getClassLoader();

	return executeRemotely(
		namedGraph,
		environmentClassLoader,
		streamExecutionEnvironment.getConfig(),
		jarFiles,
		host,
		port,
		clientConfiguration,
		globalClasspaths,
		savepointRestoreSettings);
}
 
示例11
/**
 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 *
 * <p>The function will be called for every element in the input stream and can produce zero
 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
 * <p>The output type is extracted from the function and then passed to the overload that
 * takes explicit output type information.
 *
 * @param processFunction The {@link ProcessFunction} that is called for each element
 *                      in the stream.
 *
 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
 *
 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
 */
@Deprecated
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
	// NOTE(review): 0 and 1 are presumably the positions of the input and output type
	// arguments of ProcessFunction; confirm against getUnaryOperatorReturnType.
	final TypeInformation<R> outputType = TypeExtractor.getUnaryOperatorReturnType(
		processFunction,
		ProcessFunction.class,
		0,
		1,
		TypeExtractor.NO_INDEX,
		getType(),
		Utils.getCallLocationName(),
		true);

	return process(processFunction, outputType);
}
 
示例12
/**
 * Creates a comparator by instantiating the configured comparator class.
 *
 * @param sortOrderAscending Sort direction handed to the instantiated comparator.
 * @param executionConfig Not used by this implementation.
 * @return The comparator instantiated from {@code comparatorClass}.
 * @throws InvalidTypesException if no comparator class is set, i.e. the type cannot be used as a key.
 */
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
	// Without a comparator class the type is not usable as a key.
	if (comparatorClass == null) {
		throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
	}
	return instantiateComparator(comparatorClass, sortOrderAscending);
}
 
示例13
/**
 * Sets the time by which elements are allowed to be late. Elements that
 * arrive behind the watermark by more than the specified time will be dropped.
 * By default, the allowed lateness is {@code 0L}.
 *
 * <p>Setting an allowed lateness is only valid for event-time windows.
 *
 * @param lateness The allowed lateness; its millisecond value must not be negative.
 * @return This stream, so that calls can be chained.
 */
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
	final long latenessMillis = lateness.toMilliseconds();

	// Validate before touching the field so a rejected value leaves the stream unchanged.
	checkArgument(latenessMillis >= 0, "The allowed lateness cannot be negative.");
	this.allowedLateness = latenessMillis;

	return this;
}
 
示例14
/**
 * Tries to get the PrimitiveArrayTypeInfo for an array. Returns null, if the type is an array,
 * but the component type is not a primitive type.
 *
 * <p>The result is whatever the {@code TYPES} lookup holds for the given class.
 *
 * @param type The class of the array.
 * @param <X> The array type.
 * @return The corresponding PrimitiveArrayTypeInfo, or null, if the array is not an array of primitives.
 * @throws InvalidTypesException Thrown, if the given class does not represent an array.
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
	if (type.isArray()) {
		// basic type arrays
		return (PrimitiveArrayTypeInfo<X>) TYPES.get(type);
	}

	throw new InvalidTypesException("The given class is no array.");
}
 
示例15
/**
 * Creates a {@code WritableComparator} for the described type, provided the type class
 * implements {@link Comparable}.
 *
 * @param sortOrderAscending Sort direction handed to the comparator.
 * @param executionConfig Not used by this implementation.
 * @return A new {@code WritableComparator} for {@code typeClass}.
 * @throws UnsupportedOperationException if {@code typeClass} does not implement {@link Comparable}.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
	// Only Comparable types can be compared.
	if (!Comparable.class.isAssignableFrom(typeClass)) {
		throw new UnsupportedOperationException(
				"Cannot create Comparator for " + typeClass.getCanonicalName()
						+ ". Class does not implement Comparable interface.");
	}

	return new WritableComparator(sortOrderAscending, typeClass);
}
 
示例16
/**
 * Returns a future holding the request result.
 *
 * <p>The key's type information is taken from the given type hint, and the call is then
 * delegated to the overload that accepts {@link TypeInformation} for the key.
 *
 * @param jobId               JobID of the job the queryable state belongs to.
 * @param queryableStateName  Name under which the state is queryable.
 * @param key                 The key we are interested in.
 * @param keyTypeHint         A {@link TypeHint} used to extract the type of the key.
 * @param stateDescriptor     The {@link StateDescriptor} of the state we want to query.
 * @param <K>                 The type of the key.
 * @param <S>                 The type of the state held by the returned future.
 * @param <V>                 The value type of the state descriptor.
 * @return Future holding the immutable {@link State} object containing the result.
 */
@PublicEvolving
public <K, S extends State, V> CompletableFuture<S> getKvState(
		final JobID jobId,
		final String queryableStateName,
		final K key,
		final TypeHint<K> keyTypeHint,
		final StateDescriptor<S, V> stateDescriptor) {

	// The hint is dereferenced below, so reject a missing one first.
	Preconditions.checkNotNull(keyTypeHint);

	return getKvState(
			jobId,
			queryableStateName,
			key,
			keyTypeHint.getTypeInfo(),
			stateDescriptor);
}
 
示例17
/**
 * Creates a windowed stream over the given keyed input with the given window assigner.
 *
 * <p>The trigger is initialized to the assigner's default trigger, obtained for the
 * execution environment of the input stream.
 *
 * @param input The keyed stream that is windowed.
 * @param windowAssigner The assigner stored for this stream; also supplies the default trigger.
 */
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
		WindowAssigner<? super T, W> windowAssigner) {
	this.input = input;
	this.windowAssigner = windowAssigner;
	// Start with the assigner's default trigger.
	this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
 
示例18
/**
 * Returns the current read position as a pair of values.
 *
 * @return A tuple whose first field is the last read index in the block and whose second
 *         field is the number of records read.
 * @throws RuntimeException if the block-based input is not set, which the message attributes
 *         to {@code open()} not having been called.
 * @throws IOException Declared by the signature; not thrown directly in this method body.
 */
@PublicEvolving
@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
	if (this.blockBasedInput != null) {
		// first field: the last read index in the block; second field: the number of records read
		return new Tuple2<>(this.blockBasedInput.getCurrBlockPos(), this.readRecords);
	}

	throw new RuntimeException("You must have forgotten to call open() on your input format.");
}
 
示例19
@PublicEvolving
public EnumTypeInfo(Class<T> typeClass) {
	checkNotNull(typeClass, "Enum type class must not be null.");

	if (!Enum.class.isAssignableFrom(typeClass) ) {
		throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
	}

	this.typeClass = typeClass;
}
 
示例20
/**
 * Creates type information for the given {@code Value} class.
 *
 * <p>The class is stored first and then validated; a failed validation aborts construction.
 *
 * @param type The class described by this type information; must not be null and must be
 *             {@code Value} itself or a type assignable to it.
 */
@PublicEvolving
public ValueTypeInfo(Class<T> type) {
	this.type = checkNotNull(type);

	// Class#isAssignableFrom already returns true when both classes are the same, so a
	// separate equals(Value.class) check would be redundant.
	checkArgument(
		Value.class.isAssignableFrom(type),
		"ValueTypeInfo can only be used for subclasses of %s", Value.class.getName());
}
 
示例21
/**
 * Sorts each local partition of a data set on the field(s) specified by the field expression
 * in the specified {@link Order} before it is emitted by the output format.
 *
 * <p><b>Note: Non-composite types can only be sorted on the full element which is specified by
 * a wildcard expression ("*" or "_").</b>
 *
 * <p>Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
 * by chaining {@link #sortLocalOutput(String, Order)} calls. Each call appends its key
 * positions and orders to the ones recorded by earlier calls.
 *
 * @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
 * @param order The Order in which the specified field(s) are locally sorted.
 * @return This data sink operator with specified output order.
 * @throws InvalidProgramException if the selected sort key is not a sortable type.
 *
 * @see Order
 *
 * @deprecated Use {@link DataSet#sortPartition(String, Order)} instead
 */
@Deprecated
@PublicEvolving
public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {

	// compute flat field positions for (nested) sorting fields
	final Keys.ExpressionKeys<T> expressionKeys = new Keys.ExpressionKeys<>(fieldExpression, this.type);
	final int[] addedPositions = expressionKeys.computeLogicalKeyPositions();

	if (!Keys.ExpressionKeys.isSortKey(fieldExpression, this.type)) {
		throw new InvalidProgramException("Selected sort key is not a sortable type");
	}

	final int addedCount = addedPositions.length;

	if (this.sortKeyPositions == null) {
		// first sort key: every new position gets the requested order
		final Order[] addedOrders = new Order[addedCount];
		Arrays.fill(addedOrders, order);

		this.sortKeyPositions = addedPositions;
		this.sortOrders = addedOrders;
		return this;
	}

	// append sorting info to existing info
	final int oldLength = this.sortKeyPositions.length;
	final int newLength = oldLength + addedCount;

	this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
	this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);

	// fill the grown tail with the new positions and the (uniform) requested order
	System.arraycopy(addedPositions, 0, this.sortKeyPositions, oldLength, addedCount);
	Arrays.fill(this.sortOrders, oldLength, newLength, order);

	return this;
}
 
示例22
/**
 * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
 * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
 *
 * <p>The first input must be a {@link KeyedStream}; otherwise the argument check fails.
 *
 * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
 * @param outTypeInfo The type of the output elements.
 * @param <KS> The type of the keys in the keyed stream.
 * @param <OUT> The type of the output elements.
 * @return The transformed {@link DataStream}.
 */
@PublicEvolving
public <KS, OUT> SingleOutputStreamOperator<OUT> process(
		final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
		final TypeInformation<OUT> outTypeInfo) {

	Preconditions.checkNotNull(function);

	final boolean firstInputIsKeyed = inputStream1 instanceof KeyedStream;
	Preconditions.checkArgument(
			firstInputIsKeyed,
			"A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

	// The function is passed through clean(...) before it is wrapped in the operator.
	final TwoInputStreamOperator<IN1, IN2, OUT> keyedBroadcastOperator =
			new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);

	return transform("Co-Process-Broadcast-Keyed", outTypeInfo, keyedBroadcastOperator);
}
 
示例23
/**
 * Determines the return type of the given fold function by delegating to
 * {@code getUnaryOperatorReturnType} with {@code FoldFunction.class} as base class.
 *
 * <p>NOTE(review): the literals {@code 0} and {@code 1} are presumably type-argument
 * positions of {@code FoldFunction}; confirm against {@code getUnaryOperatorReturnType}.
 *
 * @param foldInterface The fold function whose return type is extracted.
 * @param inType Type information of the function's input.
 * @param functionName Passed through unchanged to the delegate.
 * @param allowMissing Passed through unchanged to the delegate.
 * @param <IN> First type parameter of the fold function.
 * @param <OUT> Second type parameter of the fold function; the extracted type.
 * @return The type information the delegate produces.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(
		FoldFunction<IN, OUT> foldInterface,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	// The raw Function cast is kept exactly as in the original call.
	final TypeInformation<OUT> foldResultType = getUnaryOperatorReturnType(
		(Function) foldInterface,
		FoldFunction.class,
		0,
		1,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
	return foldResultType;
}
 
示例24
/**
 * Determines the return type of the given fold function by delegating to
 * {@code getUnaryOperatorReturnType} with {@code FoldFunction.class} as base class.
 *
 * <p>NOTE(review): the literals {@code 0} and {@code 1} are presumably type-argument
 * positions of {@code FoldFunction}; confirm against {@code getUnaryOperatorReturnType}.
 *
 * @param foldInterface The fold function whose return type is extracted.
 * @param inType Type information of the function's input.
 * @param functionName Passed through unchanged to the delegate.
 * @param allowMissing Passed through unchanged to the delegate.
 * @param <IN> First type parameter of the fold function.
 * @param <OUT> Second type parameter of the fold function; the extracted type.
 * @return The type information the delegate produces.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(
		FoldFunction<IN, OUT> foldInterface,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	// The raw Function cast is kept exactly as in the original call.
	final TypeInformation<OUT> foldResultType = getUnaryOperatorReturnType(
		(Function) foldInterface,
		FoldFunction.class,
		0,
		1,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
	return foldResultType;
}
 
示例25
/**
 * Creates the serializer for this array type.
 *
 * <p>Arrays whose component class is {@link String} use the shared
 * {@code StringArraySerializer.INSTANCE}; all other arrays get a new
 * {@code GenericArraySerializer} built from the component class and the component's serializer.
 *
 * @param executionConfig The config handed to the component type when its serializer is created.
 * @return The serializer for the array type.
 */
@Override
@SuppressWarnings("unchecked")
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
	final boolean isStringArray = this.componentInfo.getTypeClass().equals(String.class);

	// special case the string array
	if (isStringArray) {
		return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
	}

	return (TypeSerializer<T>) new GenericArraySerializer<>(
		this.componentInfo.getTypeClass(),
		this.componentInfo.createSerializer(executionConfig));
}
 
示例26
/**
 * Always rejects the request: this implementation does not provide map state.
 *
 * @param stateProperties The descriptor of the requested map state; not used.
 * @param <UK> The key type of the requested map state.
 * @param <UV> The value type of the requested map state.
 * @return Never returns normally.
 * @throws UnsupportedOperationException Always, stating that this state is only accessible
 *         by functions executed on a KeyedStream.
 */
@Override
@PublicEvolving
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
	throw new UnsupportedOperationException(
			"This state is only accessible by functions executed on a KeyedStream");
}
 
示例27
/**
 * Reports the arity of this type.
 *
 * @return Always {@code 1}.
 */
@Override
@PublicEvolving
public int getArity() {
	return 1;
}
 
示例28
/**
 * Reports whether this type is a basic type.
 *
 * @return Always {@code false}.
 */
@Override
@PublicEvolving
public boolean isBasicType() {
	return false;
}
 
示例29
/**
 * Sets the {@code Trigger} that should be used to trigger window emission.
 *
 * <p>This object is not modified. A new {@code WithWindow} is created from the current
 * inputs, key selectors, key type, window assigner, evictor and allowed lateness, with the
 * given trigger in place of the current one.
 *
 * @param newTrigger The trigger the returned {@code WithWindow} is created with.
 * @return A new {@code WithWindow} that uses {@code newTrigger}.
 */
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
	final WithWindow<T1, T2, KEY, W> withNewTrigger = new WithWindow<>(
			input1,
			input2,
			keySelector1,
			keySelector2,
			keyType,
			windowAssigner,
			newTrigger,
			evictor,
			allowedLateness);
	return withNewTrigger;
}
 
示例30
/**
 * Checks whether the described class is exactly one of the following value classes:
 * {@code StringValue}, {@code ByteValue}, {@code ShortValue}, {@code CharValue},
 * {@code DoubleValue}, {@code FloatValue}, {@code IntValue}, {@code LongValue},
 * {@code NullValue} or {@code BooleanValue}.
 *
 * <p>The comparison uses class equality, so subclasses of these classes do not match.
 *
 * @return {@code true} if {@code type} equals one of the listed classes, {@code false} otherwise.
 */
@PublicEvolving
public boolean isBasicValueType() {
	return type.equals(StringValue.class)
			|| type.equals(ByteValue.class)
			|| type.equals(ShortValue.class)
			|| type.equals(CharValue.class)
			|| type.equals(DoubleValue.class)
			|| type.equals(FloatValue.class)
			|| type.equals(IntValue.class)
			|| type.equals(LongValue.class)
			|| type.equals(NullValue.class)
			|| type.equals(BooleanValue.class);
}