Java源码示例:org.apache.flink.api.common.functions.JoinFunction

示例1
@Test
public void testInputInferenceWithCustomTupleAndRichFunction() {
	JoinFunction<CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>> function = new JoinWithCustomTuple2WithArray<>();

	TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
		function,
		new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo(),
		new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo());

	Assert.assertTrue(ti.isTupleType());
	TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
	Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1));

	Assert.assertTrue(tti.getTypeAt(0) instanceof ObjectArrayTypeInfo<?, ?>);
	ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) tti.getTypeAt(0);
	Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, oati.getComponentInfo());
}
 
示例2
private void executeTaskWithGenerator(
		JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
		int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Integer, Integer>> input1 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
	DataSet<Tuple2<Integer, Integer>> input2 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(joiner)
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

	env.setParallelism(PARALLELISM);

	runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
 
示例3
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	HeavyCompareGeneratorInputFormat input = new HeavyCompareGeneratorInputFormat(100);
	DataSet<Tuple2<HeavyCompare, Integer>> input1 = env.createInput(input);
	DataSet<Tuple2<HeavyCompare, Integer>> input2 = env.createInput(input);

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
				@Override
				public Tuple2<HeavyCompare, Integer> join(
					Tuple2<HeavyCompare, Integer> first,
					Tuple2<HeavyCompare, Integer> second) throws Exception {
					throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
				}
			})
			.output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

	runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
 
示例4
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
	return getBinaryOperatorReturnType(
		(Function) joinInterface,
		JoinFunction.class,
		0,
		1,
		2,
		NO_INDEX,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
 
示例5
private static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
	JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
	ListStateDescriptor<STATE> listStateDescriptor) {

	JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
		List<STATE> newState = new ArrayList<>();
		for (STATE s : second) {
			newState.add(inputAndOldStateToNewState.join(first, s));
		}
		return newState;
	};

	return new ArtificialListStateBuilder<>(
		listStateDescriptor.getName(),
		listStateGenerator,
		listStateGenerator,
		listStateDescriptor);
}
 
示例6
/**
 * Completes the join operation with the user function that is executed
 * for each combination of elements with the same key in a window.
 *
 * <p>Note: This method's return type does not support setting an operator-specific parallelism.
 * Due to binary backwards compatibility, this cannot be altered. Use the {@link #with(JoinFunction)}
 * method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
	TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
		function,
		JoinFunction.class,
		0,
		1,
		2,
		TypeExtractor.NO_INDEX,
		input1.getType(),
		input2.getType(),
		"Join",
		false);

	return apply(function, resultType);
}
 
示例7
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
		DataStream<Tuple2<String, Integer>> grades,
		DataStream<Tuple2<String, Integer>> salaries,
		long windowSize) {

	return grades.join(salaries)
			.where(new NameKeySelector())
			.equalTo(new NameKeySelector())

			.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))

			.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {

				@Override
				public Tuple3<String, Integer, Integer> join(
								Tuple2<String, Integer> first,
								Tuple2<String, Integer> second) {
					return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
				}
			});
}
 
示例8
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // item dataSet
    String itemPath = "item.csv";
    String[] itemField = new String[]{"id", "price"};
    DataSet<Item> items = getSource(env, itemPath, itemField, Item.class);

    // info dataSet
    String infoPath = "info.csv";
    String[] infoField = new String[]{"id", "color", "country"};
    DataSet<Info> infos = getSource(env, infoPath, infoField, Info.class);
    // 关联两个dataset
    JoinOperator.DefaultJoin<Item, Info> dataSet = items.join(infos).where("id").equalTo("id");
    // 使用 joinFunction 处理合并后的两个dataSet
    dataSet.with(new JoinFunction<Item, Info, String>() {
        @Override
        public String join(Item item, Info info) throws Exception {
            return "商品ID:" + item.getId() + " 价格:"+item.getPrice() + " 颜色:"+ info.getColor() + " 国家:" + info.getCountry();
        }
    }).print();

}
 
示例9
private void executeTaskWithGenerator(
		JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
		int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Integer, Integer>> input1 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
	DataSet<Tuple2<Integer, Integer>> input2 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(joiner)
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

	env.setParallelism(PARALLELISM);

	runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
 
示例10
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	HeavyCompareGeneratorInputFormat input = new HeavyCompareGeneratorInputFormat(100);
	DataSet<Tuple2<HeavyCompare, Integer>> input1 = env.createInput(input);
	DataSet<Tuple2<HeavyCompare, Integer>> input2 = env.createInput(input);

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
				@Override
				public Tuple2<HeavyCompare, Integer> join(
					Tuple2<HeavyCompare, Integer> first,
					Tuple2<HeavyCompare, Integer> second) throws Exception {
					throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
				}
			})
			.output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

	runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
 
示例11
/**
 * Completes the join operation with the user function that is executed
 * for each combination of elements with the same key in a window.
 *
 * <p>Note: This method's return type does not support setting an operator-specific parallelism.
 * Due to binary backwards compatibility, this cannot be altered. Use the {@link #with(JoinFunction)}
 * method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
	TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
		function,
		JoinFunction.class,
		0,
		1,
		2,
		TypeExtractor.NO_INDEX,
		input1.getType(),
		input2.getType(),
		"Join",
		false);

	return apply(function, resultType);
}
 
示例12
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
	return getBinaryOperatorReturnType(
		(Function) joinInterface,
		JoinFunction.class,
		0,
		1,
		2,
		NO_INDEX,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
 
示例13
@Test
public void testInputInferenceWithCustomTupleAndRichFunction() {
	JoinFunction<CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>> function = new JoinWithCustomTuple2WithArray<>();

	TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
		function,
		new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo(),
		new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo());

	Assert.assertTrue(ti.isTupleType());
	TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
	Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1));

	Assert.assertTrue(tti.getTypeAt(0) instanceof ObjectArrayTypeInfo<?, ?>);
	ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) tti.getTypeAt(0);
	Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, oati.getComponentInfo());
}
 
示例14
private static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
	JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
	ListStateDescriptor<STATE> listStateDescriptor) {

	JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
		List<STATE> newState = new ArrayList<>();
		for (STATE s : second) {
			newState.add(inputAndOldStateToNewState.join(first, s));
		}
		return newState;
	};

	return new ArtificialListStateBuilder<>(
		listStateDescriptor.getName(),
		listStateGenerator,
		listStateGenerator,
		listStateDescriptor);
}
 
示例15
/**
 * Completes the join operation with the user function that is executed
 * for each combination of elements with the same key in a window.
 *
 * <p>Note: This method's return type does not support setting an operator-specific parallelism.
 * Due to binary backwards compatibility, this cannot be altered. Use the {@link #with(JoinFunction)}
 * method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
	TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
		function,
		JoinFunction.class,
		0,
		1,
		2,
		TypeExtractor.NO_INDEX,
		input1.getType(),
		input2.getType(),
		"Join",
		false);

	return apply(function, resultType);
}
 
示例16
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
		DataStream<Tuple2<String, Integer>> grades,
		DataStream<Tuple2<String, Integer>> salaries,
		long windowSize) {

	return grades.join(salaries)
			.where(new NameKeySelector())
			.equalTo(new NameKeySelector())

			.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))

			.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {

				@Override
				public Tuple3<String, Integer, Integer> join(
								Tuple2<String, Integer> first,
								Tuple2<String, Integer> second) {
					return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
				}
			});
}
 
示例17
private void executeTaskWithGenerator(
		JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
		int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Integer, Integer>> input1 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
	DataSet<Tuple2<Integer, Integer>> input2 = env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(joiner)
			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

	env.setParallelism(PARALLELISM);

	runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
 
示例18
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	HeavyCompareGeneratorInputFormat input = new HeavyCompareGeneratorInputFormat(100);
	DataSet<Tuple2<HeavyCompare, Integer>> input1 = env.createInput(input);
	DataSet<Tuple2<HeavyCompare, Integer>> input2 = env.createInput(input);

	input1.join(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
				@Override
				public Tuple2<HeavyCompare, Integer> join(
					Tuple2<HeavyCompare, Integer> first,
					Tuple2<HeavyCompare, Integer> second) throws Exception {
					throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
				}
			})
			.output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

	runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
 
示例19
@Override
protected void testProgram() throws Exception {

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(verticesPath).fieldDelimiter(" ").types(Long.class).name("Vertices");

	DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).name("Edges");

	DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.map(new MapFunction<Tuple1<Long>, Tuple2<Long, Long>>() {
		@Override
		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
			return new Tuple2<>(value.f0, value.f0);
		}
	}).name("Assign Vertex Ids");

	DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = verticesWithId.iterateDelta(verticesWithId, MAX_ITERATIONS, 0);

	JoinOperator<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset()
			.join(edges).where(0).equalTo(0)
			.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
					return new Tuple2<>(second.f1, first.f1);
				}
			})
			.name("Join Candidate Id With Neighbor");

	CoGroupOperator<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors
			.coGroup(iteration.getSolutionSet()).where(0).equalTo(0)
			.with(new MinIdAndUpdate())
			.name("min Id and Update");

	iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(resultPath, "\n", " ").name("Result");

	env.execute("Workset Connected Components");
}
 
示例20
@Test
public void testJoinLambda() {
	JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;

	TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
	if (!(ti instanceof MissingTypeInfo)) {
		assertTrue(ti.isTupleType());
		assertEquals(2, ti.getArity());
		assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
		assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
	}
}
 
示例21
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
		Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
		TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
	super(input1, input2, keys1, keys2, returnType, hint, type);

	this.joinLocationName = joinLocationName;

	if (function == null) {
		throw new NullPointerException();
	}

	this.function = generatedFunction;

	UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
}
 
示例22
public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
	if (function == null) {
		throw new NullPointerException("Join function must not be null.");
	}
	FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<>(clean(function));
	TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type(), Utils.getCallLocationName(), true);
	return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
}
 
示例23
@Test
public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
	compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
 
示例24
private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
	return (Event first, ComplexPayload second) -> {
		verifyState(strPayload, second);
		boolean isLastEvent = second != null && first.getEventTime() <= second.getEventTime();
		return isLastEvent ? second : new ComplexPayload(first, strPayload);
	};
}
 
示例25
@Test
public void testJoinLambda() {
	JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;

	TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
	if (!(ti instanceof MissingTypeInfo)) {
		assertTrue(ti.isTupleType());
		assertEquals(2, ti.getArity());
		assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
		assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
	}
}
 
示例26
/**
 * Computes the intersection between the edge set and the given edge set. For all matching pairs,
 * only one edge will be in the resulting data set.
 *
 * @param edges edges to compute intersection with
 * @return edge set containing one edge for all matching pairs of the same edge
 */
private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K, EV>> edges) {
	return this.getEdges()
			.join(edges)
			.where(0, 1, 2)
			.equalTo(0, 1, 2)
			.with(new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>() {
				@Override
				public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
					return first;
				}
			}).withForwardedFieldsFirst("*").name("Intersect edges")
			.distinct()
				.name("Edges");
}
 
示例27
private static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
	JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
	ValueStateDescriptor<STATE> valueStateDescriptor) {

	return new ArtificialValueStateBuilder<>(
		valueStateDescriptor.getName(),
		inputAndOldStateToNewState,
		valueStateDescriptor);
}
 
示例28
public ArtificialMapStateBuilder(
	String stateName,
	JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
	TypeSerializer<K> keySerializer,
	TypeSerializer<V> valueSerializer) {

	super(stateName);
	this.keySerializer = keySerializer;
	this.valueSerializer = valueSerializer;
	this.stateValueGenerator = stateValueGenerator;
}
 
示例29
public ArtificialListStateBuilder(
	String stateName,
	JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
	JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
	ListStateDescriptor<STATE> listStateDescriptor) {
	super(stateName);
	this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor);
	this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator);
	this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator);
}
 
示例30
public ArtificialValueStateBuilder(
	String stateName,
	JoinFunction<IN, STATE, STATE> stateValueGenerator,
	ValueStateDescriptor<STATE> valueStateDescriptor) {
	super(stateName);
	this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor);
	this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator);
}