Java源码示例:org.apache.flink.api.common.functions.CombineFunction

示例1
private boolean checkCombinability() {
	if (function instanceof GroupCombineFunction || function instanceof CombineFunction) {

		// check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e.,
		//   GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>.
		// This is a best effort check. If the check cannot be done, we might fail at runtime.
		Type[] reduceTypes = null;
		Type[] combineTypes = null;

		Type[] genInterfaces = function.getClass().getGenericInterfaces();
		for (Type genInterface : genInterfaces) {
			if (genInterface instanceof ParameterizedType) {
				// get parameters of GroupReduceFunction
				if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) {
					reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				// get parameters of GroupCombineFunction
				} else if ((((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) ||
					(((ParameterizedType) genInterface).getRawType().equals(CombineFunction.class))) {

					combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				}
			}
		}

		if (reduceTypes != null && reduceTypes.length == 2 &&
			combineTypes != null && combineTypes.length == 2) {

			if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) {
				return true;
			} else {
				LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " +
					"Generic types are incompatible.");
				return false;
			}
		}
		else if (reduceTypes == null || reduceTypes.length != 2) {
			LOG.warn("Cannot check generic types of GroupReduceFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
		else {
			LOG.warn("Cannot check generic types of GroupCombineFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
	}
	return false;
}
 
示例2
private boolean checkCombinability() {
	if (function instanceof GroupCombineFunction || function instanceof CombineFunction) {

		// check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e.,
		//   GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>.
		// This is a best effort check. If the check cannot be done, we might fail at runtime.
		Type[] reduceTypes = null;
		Type[] combineTypes = null;

		Type[] genInterfaces = function.getClass().getGenericInterfaces();
		for (Type genInterface : genInterfaces) {
			if (genInterface instanceof ParameterizedType) {
				// get parameters of GroupReduceFunction
				if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) {
					reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				// get parameters of GroupCombineFunction
				} else if ((((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) ||
					(((ParameterizedType) genInterface).getRawType().equals(CombineFunction.class))) {

					combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				}
			}
		}

		if (reduceTypes != null && reduceTypes.length == 2 &&
			combineTypes != null && combineTypes.length == 2) {

			if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) {
				return true;
			} else {
				LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " +
					"Generic types are incompatible.");
				return false;
			}
		}
		else if (reduceTypes == null || reduceTypes.length != 2) {
			LOG.warn("Cannot check generic types of GroupReduceFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
		else {
			LOG.warn("Cannot check generic types of GroupCombineFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
	}
	return false;
}
 
示例3
private boolean checkCombinability() {
	if (function instanceof GroupCombineFunction || function instanceof CombineFunction) {

		// check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e.,
		//   GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>.
		// This is a best effort check. If the check cannot be done, we might fail at runtime.
		Type[] reduceTypes = null;
		Type[] combineTypes = null;

		Type[] genInterfaces = function.getClass().getGenericInterfaces();
		for (Type genInterface : genInterfaces) {
			if (genInterface instanceof ParameterizedType) {
				// get parameters of GroupReduceFunction
				if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) {
					reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				// get parameters of GroupCombineFunction
				} else if ((((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) ||
					(((ParameterizedType) genInterface).getRawType().equals(CombineFunction.class))) {

					combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments();
				}
			}
		}

		if (reduceTypes != null && reduceTypes.length == 2 &&
			combineTypes != null && combineTypes.length == 2) {

			if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) {
				return true;
			} else {
				LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " +
					"Generic types are incompatible.");
				return false;
			}
		}
		else if (reduceTypes == null || reduceTypes.length != 2) {
			LOG.warn("Cannot check generic types of GroupReduceFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
		else {
			LOG.warn("Cannot check generic types of GroupCombineFunction. " +
				"Enabling combiner but combine function might fail at runtime.");
			return true;
		}
	}
	return false;
}