Java源码示例:org.apache.flink.api.common.functions.JoinFunction
示例1
/**
 * Verifies the type that {@code TypeExtractor.getJoinReturnTypes} derives for a
 * {@code JoinWithCustomTuple2WithArray} function when both inputs are described as
 * {@code CustomTuple2WithArray<Long>}: the result must be a tuple type whose field 0
 * is an object array with {@code Long} components and whose field 1 is {@code Long}.
 */
@Test
public void testInputInferenceWithCustomTupleAndRichFunction() {
    JoinFunction<CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>> function =
            new JoinWithCustomTuple2WithArray<>();

    TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
            function,
            new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo(),
            new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo());

    // The extracted type has to be a tuple before it can be inspected field by field.
    Assert.assertTrue(ti.isTupleType());
    TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) ti;

    // Field 1 is the plain Long field.
    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleInfo.getTypeAt(1));

    // Field 0 is the array field; its component type must be Long as well.
    TypeInformation<?> arrayField = tupleInfo.getTypeAt(0);
    Assert.assertTrue(arrayField instanceof ObjectArrayTypeInfo<?, ?>);
    ObjectArrayTypeInfo<?, ?> arrayInfo = (ObjectArrayTypeInfo<?, ?>) arrayField;
    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, arrayInfo.getComponentInfo());
}
示例2
/**
 * Defines a repartition sort-merge join of two generated tuple inputs on field 0,
 * applies the given join function, discards the output, and hands the resulting
 * plan to {@code runAndCancelJob}.
 *
 * @param joiner join function applied via {@code with(...)}
 * @param keys first constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of keys; confirm against that class)
 * @param vals second constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of values per key; confirm against that class)
 * @param msecsTillCanceling forwarded unchanged to {@code runAndCancelJob}
 * @param maxTimeTillCanceled forwarded unchanged to {@code runAndCancelJob}
 * @throws Exception propagated from plan creation or {@code runAndCancelJob}
 */
private void executeTaskWithGenerator(
        JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
        int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Two separate input format instances, both built from the same (keys, vals) pair.
    final DataSet<Tuple2<Integer, Integer>> left =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
    final DataSet<Tuple2<Integer, Integer>> right =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(joiner)
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    // Parallelism is set after the data flow is defined but before the plan is created.
    env.setParallelism(PARALLELISM);

    runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
示例3
/**
 * Defines a repartition sort-merge join whose two inputs are created from the same
 * {@code HeavyCompareGeneratorInputFormat(100)} instance and passes the plan to
 * {@code runAndCancelJob} with the arguments {@code 30 * 1000} and {@code 60 * 1000}.
 *
 * <p>The join function throws as soon as it is invoked; per its message, the job is
 * expected to be canceled before any pair reaches it.
 */
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // One input format instance backs both sides of the join.
    final HeavyCompareGeneratorInputFormat format = new HeavyCompareGeneratorInputFormat(100);
    final DataSet<Tuple2<HeavyCompare, Integer>> left = env.createInput(format);
    final DataSet<Tuple2<HeavyCompare, Integer>> right = env.createInput(format);

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
                @Override
                public Tuple2<HeavyCompare, Integer> join(
                        Tuple2<HeavyCompare, Integer> leftRecord,
                        Tuple2<HeavyCompare, Integer> rightRecord) throws Exception {
                    // Reaching this point means the cancellation came too late.
                    throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
                }
            })
            .output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

    runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
示例4
/**
 * Determines the return type of a {@link JoinFunction} by delegating to
 * {@code getBinaryOperatorReturnType} with {@code JoinFunction.class} as the base class.
 *
 * @param joinInterface the join function whose return type is requested
 * @param in1Type type information of the first input
 * @param in2Type type information of the second input
 * @param functionName forwarded unchanged to {@code getBinaryOperatorReturnType}
 * @param allowMissing forwarded unchanged to {@code getBinaryOperatorReturnType}
 * @return whatever {@code getBinaryOperatorReturnType} produces for these arguments
 */
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(
        JoinFunction<IN1, IN2, OUT> joinInterface,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type,
        String functionName,
        boolean allowMissing) {
    final Function function = (Function) joinInterface;

    // NOTE(review): 0, 1, 2 presumably index IN1, IN2 and OUT among JoinFunction's
    // type parameters; confirm against getBinaryOperatorReturnType.
    return getBinaryOperatorReturnType(
            function, JoinFunction.class,
            0, 1, 2, NO_INDEX,
            in1Type, in2Type,
            functionName, allowMissing);
}
示例5
/**
 * Creates an {@code ArtificialListStateBuilder} whose list-level generator applies the
 * given per-element function to every element of the old state.
 *
 * @param inputAndOldStateToNewState maps (input, old element) to the new element
 * @param listStateDescriptor descriptor that supplies the builder's state name and is
 *                            passed on to the builder
 * @return a builder that receives the same list generator for both generator arguments
 */
private static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
        JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
        ListStateDescriptor<STATE> listStateDescriptor) {

    // Lifts the per-element function to a function over the whole list state.
    final JoinFunction<IN, Iterable<STATE>, List<STATE>> elementWiseUpdater = (input, oldElements) -> {
        final List<STATE> updated = new ArrayList<>();
        for (STATE oldElement : oldElements) {
            final STATE newElement = inputAndOldStateToNewState.join(input, oldElement);
            updated.add(newElement);
        }
        return updated;
    };

    final String stateName = listStateDescriptor.getName();
    return new ArtificialListStateBuilder<>(
            stateName,
            elementWiseUpdater,
            elementWiseUpdater,
            listStateDescriptor);
}
示例6
/**
 * Finishes the join by supplying the user function that is invoked for every
 * combination of elements sharing a key within a window.
 *
 * <p>The result type is obtained from {@code TypeExtractor.getBinaryOperatorReturnType}
 * and then handed, together with the function, to the two-argument {@code apply}.
 *
 * <p>Note: The return type of this method does not allow setting an operator-specific
 * parallelism, and it cannot be changed because of binary backwards compatibility.
 * Use the {@link #with(JoinFunction)} method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
    final TypeInformation<T> extractedType = TypeExtractor.getBinaryOperatorReturnType(
            function, JoinFunction.class,
            0, 1, 2, TypeExtractor.NO_INDEX,
            input1.getType(), input2.getType(),
            "Join", false);

    return apply(function, extractedType);
}
示例7
/**
 * Joins the two streams on the key extracted by {@code NameKeySelector} within
 * tumbling event-time windows of {@code windowSize} milliseconds.
 *
 * @param grades first stream; its {@code f0} and {@code f1} become the first two result fields
 * @param salaries second stream; its {@code f1} becomes the third result field
 * @param windowSize window length in milliseconds
 * @return stream of {@code (grades.f0, grades.f1, salaries.f1)} triples
 */
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
        DataStream<Tuple2<String, Integer>> grades,
        DataStream<Tuple2<String, Integer>> salaries,
        long windowSize) {

    // Kept as an anonymous class (not a lambda) exactly like the original.
    final JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> combine =
            new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> join(
                        Tuple2<String, Integer> grade,
                        Tuple2<String, Integer> salary) {
                    return new Tuple3<>(grade.f0, grade.f1, salary.f1);
                }
            };

    return grades.join(salaries)
            .where(new NameKeySelector())
            .equalTo(new NameKeySelector())
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(combine);
}
示例8
/**
 * Reads items and infos through {@code getSource}, joins them on the {@code "id"}
 * field, formats every joined pair as a single string and prints the result.
 *
 * @param args command-line arguments (unused)
 * @throws Exception propagated from the Flink calls or {@code getSource}
 */
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Item data set: columns "id" and "price" read from item.csv.
    final DataSet<Item> items =
            getSource(env, "item.csv", new String[]{"id", "price"}, Item.class);

    // Info data set: columns "id", "color" and "country" read from info.csv.
    final DataSet<Info> infos =
            getSource(env, "info.csv", new String[]{"id", "color", "country"}, Info.class);

    // Join the two data sets on their "id" field.
    final JoinOperator.DefaultJoin<Item, Info> joined = items.join(infos).where("id").equalTo("id");

    // Use a JoinFunction to turn each joined (item, info) pair into one output line.
    joined.with(new JoinFunction<Item, Info, String>() {
        @Override
        public String join(Item item, Info info) throws Exception {
            return "商品ID:" + item.getId() + " 价格:" + item.getPrice() + " 颜色:" + info.getColor() + " 国家:" + info.getCountry();
        }
    }).print();
}
示例9
/**
 * Defines a repartition sort-merge join of two generated tuple inputs on field 0,
 * applies the given join function, discards the output, and hands the resulting
 * plan to {@code runAndCancelJob}.
 *
 * @param joiner join function applied via {@code with(...)}
 * @param keys first constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of keys; confirm against that class)
 * @param vals second constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of values per key; confirm against that class)
 * @param msecsTillCanceling forwarded unchanged to {@code runAndCancelJob}
 * @param maxTimeTillCanceled forwarded unchanged to {@code runAndCancelJob}
 * @throws Exception propagated from plan creation or {@code runAndCancelJob}
 */
private void executeTaskWithGenerator(
        JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
        int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Two separate input format instances, both built from the same (keys, vals) pair.
    final DataSet<Tuple2<Integer, Integer>> left =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
    final DataSet<Tuple2<Integer, Integer>> right =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(joiner)
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    // Parallelism is set after the data flow is defined but before the plan is created.
    env.setParallelism(PARALLELISM);

    runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
示例10
/**
 * Defines a repartition sort-merge join whose two inputs are created from the same
 * {@code HeavyCompareGeneratorInputFormat(100)} instance and passes the plan to
 * {@code runAndCancelJob} with the arguments {@code 30 * 1000} and {@code 60 * 1000}.
 *
 * <p>The join function throws as soon as it is invoked; per its message, the job is
 * expected to be canceled before any pair reaches it.
 */
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // One input format instance backs both sides of the join.
    final HeavyCompareGeneratorInputFormat format = new HeavyCompareGeneratorInputFormat(100);
    final DataSet<Tuple2<HeavyCompare, Integer>> left = env.createInput(format);
    final DataSet<Tuple2<HeavyCompare, Integer>> right = env.createInput(format);

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
                @Override
                public Tuple2<HeavyCompare, Integer> join(
                        Tuple2<HeavyCompare, Integer> leftRecord,
                        Tuple2<HeavyCompare, Integer> rightRecord) throws Exception {
                    // Reaching this point means the cancellation came too late.
                    throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
                }
            })
            .output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

    runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
示例11
/**
 * Finishes the join by supplying the user function that is invoked for every
 * combination of elements sharing a key within a window.
 *
 * <p>The result type is obtained from {@code TypeExtractor.getBinaryOperatorReturnType}
 * and then handed, together with the function, to the two-argument {@code apply}.
 *
 * <p>Note: The return type of this method does not allow setting an operator-specific
 * parallelism, and it cannot be changed because of binary backwards compatibility.
 * Use the {@link #with(JoinFunction)} method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
    final TypeInformation<T> extractedType = TypeExtractor.getBinaryOperatorReturnType(
            function, JoinFunction.class,
            0, 1, 2, TypeExtractor.NO_INDEX,
            input1.getType(), input2.getType(),
            "Join", false);

    return apply(function, extractedType);
}
示例12
/**
 * Determines the return type of a {@link JoinFunction} by delegating to
 * {@code getBinaryOperatorReturnType} with {@code JoinFunction.class} as the base class.
 *
 * @param joinInterface the join function whose return type is requested
 * @param in1Type type information of the first input
 * @param in2Type type information of the second input
 * @param functionName forwarded unchanged to {@code getBinaryOperatorReturnType}
 * @param allowMissing forwarded unchanged to {@code getBinaryOperatorReturnType}
 * @return whatever {@code getBinaryOperatorReturnType} produces for these arguments
 */
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(
        JoinFunction<IN1, IN2, OUT> joinInterface,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type,
        String functionName,
        boolean allowMissing) {
    final Function function = (Function) joinInterface;

    // NOTE(review): 0, 1, 2 presumably index IN1, IN2 and OUT among JoinFunction's
    // type parameters; confirm against getBinaryOperatorReturnType.
    return getBinaryOperatorReturnType(
            function, JoinFunction.class,
            0, 1, 2, NO_INDEX,
            in1Type, in2Type,
            functionName, allowMissing);
}
示例13
/**
 * Verifies the type that {@code TypeExtractor.getJoinReturnTypes} derives for a
 * {@code JoinWithCustomTuple2WithArray} function when both inputs are described as
 * {@code CustomTuple2WithArray<Long>}: the result must be a tuple type whose field 0
 * is an object array with {@code Long} components and whose field 1 is {@code Long}.
 */
@Test
public void testInputInferenceWithCustomTupleAndRichFunction() {
    JoinFunction<CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>, CustomTuple2WithArray<Long>> function =
            new JoinWithCustomTuple2WithArray<>();

    TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
            function,
            new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo(),
            new TypeHint<CustomTuple2WithArray<Long>>(){}.getTypeInfo());

    // The extracted type has to be a tuple before it can be inspected field by field.
    Assert.assertTrue(ti.isTupleType());
    TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) ti;

    // Field 1 is the plain Long field.
    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleInfo.getTypeAt(1));

    // Field 0 is the array field; its component type must be Long as well.
    TypeInformation<?> arrayField = tupleInfo.getTypeAt(0);
    Assert.assertTrue(arrayField instanceof ObjectArrayTypeInfo<?, ?>);
    ObjectArrayTypeInfo<?, ?> arrayInfo = (ObjectArrayTypeInfo<?, ?>) arrayField;
    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, arrayInfo.getComponentInfo());
}
示例14
/**
 * Creates an {@code ArtificialListStateBuilder} whose list-level generator applies the
 * given per-element function to every element of the old state.
 *
 * @param inputAndOldStateToNewState maps (input, old element) to the new element
 * @param listStateDescriptor descriptor that supplies the builder's state name and is
 *                            passed on to the builder
 * @return a builder that receives the same list generator for both generator arguments
 */
private static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
        JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
        ListStateDescriptor<STATE> listStateDescriptor) {

    // Lifts the per-element function to a function over the whole list state.
    final JoinFunction<IN, Iterable<STATE>, List<STATE>> elementWiseUpdater = (input, oldElements) -> {
        final List<STATE> updated = new ArrayList<>();
        for (STATE oldElement : oldElements) {
            final STATE newElement = inputAndOldStateToNewState.join(input, oldElement);
            updated.add(newElement);
        }
        return updated;
    };

    final String stateName = listStateDescriptor.getName();
    return new ArtificialListStateBuilder<>(
            stateName,
            elementWiseUpdater,
            elementWiseUpdater,
            listStateDescriptor);
}
示例15
/**
 * Finishes the join by supplying the user function that is invoked for every
 * combination of elements sharing a key within a window.
 *
 * <p>The result type is obtained from {@code TypeExtractor.getBinaryOperatorReturnType}
 * and then handed, together with the function, to the two-argument {@code apply}.
 *
 * <p>Note: The return type of this method does not allow setting an operator-specific
 * parallelism, and it cannot be changed because of binary backwards compatibility.
 * Use the {@link #with(JoinFunction)} method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
    final TypeInformation<T> extractedType = TypeExtractor.getBinaryOperatorReturnType(
            function, JoinFunction.class,
            0, 1, 2, TypeExtractor.NO_INDEX,
            input1.getType(), input2.getType(),
            "Join", false);

    return apply(function, extractedType);
}
示例16
/**
 * Joins the two streams on the key extracted by {@code NameKeySelector} within
 * tumbling event-time windows of {@code windowSize} milliseconds.
 *
 * @param grades first stream; its {@code f0} and {@code f1} become the first two result fields
 * @param salaries second stream; its {@code f1} becomes the third result field
 * @param windowSize window length in milliseconds
 * @return stream of {@code (grades.f0, grades.f1, salaries.f1)} triples
 */
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
        DataStream<Tuple2<String, Integer>> grades,
        DataStream<Tuple2<String, Integer>> salaries,
        long windowSize) {

    // Kept as an anonymous class (not a lambda) exactly like the original.
    final JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> combine =
            new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> join(
                        Tuple2<String, Integer> grade,
                        Tuple2<String, Integer> salary) {
                    return new Tuple3<>(grade.f0, grade.f1, salary.f1);
                }
            };

    return grades.join(salaries)
            .where(new NameKeySelector())
            .equalTo(new NameKeySelector())
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(combine);
}
示例17
/**
 * Defines a repartition sort-merge join of two generated tuple inputs on field 0,
 * applies the given join function, discards the output, and hands the resulting
 * plan to {@code runAndCancelJob}.
 *
 * @param joiner join function applied via {@code with(...)}
 * @param keys first constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of keys; confirm against that class)
 * @param vals second constructor argument of {@code UniformIntTupleGeneratorInputFormat}
 *             (presumably the number of values per key; confirm against that class)
 * @param msecsTillCanceling forwarded unchanged to {@code runAndCancelJob}
 * @param maxTimeTillCanceled forwarded unchanged to {@code runAndCancelJob}
 * @throws Exception propagated from plan creation or {@code runAndCancelJob}
 */
private void executeTaskWithGenerator(
        JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner,
        int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Two separate input format instances, both built from the same (keys, vals) pair.
    final DataSet<Tuple2<Integer, Integer>> left =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));
    final DataSet<Tuple2<Integer, Integer>> right =
            env.createInput(new UniformIntTupleGeneratorInputFormat(keys, vals));

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(joiner)
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    // Parallelism is set after the data flow is defined but before the plan is created.
    env.setParallelism(PARALLELISM);

    runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
示例18
/**
 * Defines a repartition sort-merge join whose two inputs are created from the same
 * {@code HeavyCompareGeneratorInputFormat(100)} instance and passes the plan to
 * {@code runAndCancelJob} with the arguments {@code 30 * 1000} and {@code 60 * 1000}.
 *
 * <p>The join function throws as soon as it is invoked; per its message, the job is
 * expected to be canceled before any pair reaches it.
 */
@Test
public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // One input format instance backs both sides of the join.
    final HeavyCompareGeneratorInputFormat format = new HeavyCompareGeneratorInputFormat(100);
    final DataSet<Tuple2<HeavyCompare, Integer>> left = env.createInput(format);
    final DataSet<Tuple2<HeavyCompare, Integer>> right = env.createInput(format);

    left.join(right, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>() {
                @Override
                public Tuple2<HeavyCompare, Integer> join(
                        Tuple2<HeavyCompare, Integer> leftRecord,
                        Tuple2<HeavyCompare, Integer> rightRecord) throws Exception {
                    // Reaching this point means the cancellation came too late.
                    throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
                }
            })
            .output(new DiscardingOutputFormat<Tuple2<HeavyCompare, Integer>>());

    runAndCancelJob(env.createProgramPlan(), 30 * 1000, 60 * 1000);
}
示例19
/**
 * Builds and executes a delta-iteration program named "Workset Connected Components":
 * vertices are read from {@code verticesPath}, edges from {@code edgesPath}, every
 * vertex starts with its own id as second field, and each iteration joins the workset
 * with the edges and co-groups the candidates with the solution set through
 * {@code MinIdAndUpdate}. The result is written as CSV to {@code resultPath}.
 */
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Sources: single-column vertex file and two-column edge file, both space-delimited.
    final DataSet<Tuple1<Long>> vertexIds = env.readCsvFile(verticesPath)
            .fieldDelimiter(" ")
            .types(Long.class)
            .name("Vertices");
    final DataSet<Tuple2<Long, Long>> edgeSet = env.readCsvFile(edgesPath)
            .fieldDelimiter(" ")
            .types(Long.class, Long.class)
            .name("Edges");

    // Pair every vertex id with itself.
    final DataSet<Tuple2<Long, Long>> labeledVertices = vertexIds
            .map(new MapFunction<Tuple1<Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple1<Long> vertex) throws Exception {
                    return new Tuple2<>(vertex.f0, vertex.f0);
                }
            })
            .name("Assign Vertex Ids");

    // The labeled vertices serve as both the initial solution set and the initial workset.
    final DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration =
            labeledVertices.iterateDelta(labeledVertices, MAX_ITERATIONS, 0);

    // For each workset entry and matching edge, emit (edge.f1, workset.f1).
    final JoinOperator<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> candidates =
            deltaIteration.getWorkset()
                    .join(edgeSet)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> join(Tuple2<Long, Long> worksetEntry, Tuple2<Long, Long> edge) throws Exception {
                            return new Tuple2<>(edge.f1, worksetEntry.f1);
                        }
                    })
                    .name("Join Candidate Id With Neighbor");

    final CoGroupOperator<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> updates =
            candidates
                    .coGroup(deltaIteration.getSolutionSet())
                    .where(0)
                    .equalTo(0)
                    .with(new MinIdAndUpdate())
                    .name("min Id and Update");

    // The same data set closes the iteration as solution-set delta and as next workset.
    final DataSet<Tuple2<Long, Long>> components = deltaIteration.closeWith(updates, updates);
    components.writeAsCsv(resultPath, "\n", " ").name("Result");

    env.execute("Workset Connected Components");
}
示例20
/**
 * Checks the type that {@code TypeExtractor.getJoinReturnTypes} derives for a
 * {@link JoinFunction} written as a lambda.
 *
 * <p>The call passes {@code true} for the final (allow-missing) argument, so the
 * extractor may return a {@code MissingTypeInfo}; in that case no assertion runs.
 * Otherwise the result must be a 2-arity tuple whose field 0 is itself a tuple and
 * whose field 1 is {@code String}.
 */
@Test
public void testJoinLambda() {
    JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f =
            (i1, i2) -> null;

    TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
            f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);

    if (!(ti instanceof MissingTypeInfo)) {
        assertTrue(ti.isTupleType());
        assertEquals(2, ti.getArity());

        TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) ti;
        assertTrue(tupleInfo.getTypeAt(0).isTupleType());
        // Expected value first, actual second, so a failure message reads correctly.
        assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(1));
    }
}
示例21
/**
 * Creates an equi-join that executes {@code generatedFunction} while analyzing the
 * original user-supplied {@code function}.
 *
 * <p>{@code generatedFunction} is the value stored as this operator's function;
 * {@code function} is only handed to {@code UdfOperatorUtils.analyzeDualInputUdf}.
 * Both are required to be non-null.
 *
 * @param input1 first join input, passed to the super constructor
 * @param input2 second join input, passed to the super constructor
 * @param keys1 keys of the first input
 * @param keys2 keys of the second input
 * @param generatedFunction flat-join function stored as this operator's function
 * @param function user join function handed to the UDF analysis
 * @param returnType result type, passed to the super constructor
 * @param hint join hint, passed to the super constructor
 * @param joinLocationName location name stored on the operator and passed to the UDF analysis
 * @param type join type, passed to the super constructor
 * @throws NullPointerException if {@code function} or {@code generatedFunction} is null
 */
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
        Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
        TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
    super(input1, input2, keys1, keys2, returnType, hint, type);

    this.joinLocationName = joinLocationName;

    if (function == null) {
        throw new NullPointerException("Join function must not be null.");
    }
    // The generated wrapper is what actually gets stored, so validate it as well
    // instead of letting a null surface later at an unrelated call site.
    if (generatedFunction == null) {
        throw new NullPointerException("Generated flat join function must not be null.");
    }

    this.function = generatedFunction;

    UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
}
示例22
/**
 * Finalizes the join with a user-defined {@link JoinFunction}.
 *
 * <p>The cleaned function is wrapped in a {@code WrappingFlatJoinFunction}, the
 * return type is obtained from {@code TypeExtractor.getJoinReturnTypes}, and both are
 * passed, together with the original function, to a new {@code EquiJoin}.
 *
 * @param function the join function; must not be null
 * @return the resulting {@code EquiJoin} operator
 * @throws NullPointerException if {@code function} is null
 */
public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
    if (function == null) {
        throw new NullPointerException("Join function must not be null.");
    }

    final FlatJoinFunction<I1, I2, R> wrapped = new WrappingFlatJoinFunction<>(clean(function));

    // NOTE(review): Utils.getCallLocationName() presumably inspects the call stack, so
    // both invocations are kept directly in this method as in the original; confirm
    // before hoisting or moving them into a helper.
    final TypeInformation<R> resultType = TypeExtractor.getJoinReturnTypes(
            function,
            getInput1Type(),
            getInput2Type(),
            Utils.getCallLocationName(),
            true);

    return new EquiJoin<>(
            getInput1(),
            getInput2(),
            getKeys1(),
            getKeys2(),
            wrapped,
            function,
            resultType,
            getJoinHint(),
            Utils.getCallLocationName(),
            joinType);
}
示例23
/**
 * Runs {@code compareAnalyzerResultWithAnnotationsDualInput} for
 * {@code NeighborWithComponentIDJoin} as a {@link JoinFunction}, passing three
 * {@code Tuple2<Long, Long>} type descriptions.
 */
@Test
public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
    // NOTE(review): the three descriptions are presumably first input, second input
    // and output, in that order; confirm against the helper's signature.
    compareAnalyzerResultWithAnnotationsDualInput(
            JoinFunction.class,
            NeighborWithComponentIDJoin.class,
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
示例24
/**
 * Creates a state-update function that first runs {@code verifyState} on the old
 * payload and then keeps the old payload when it is non-null and its event time is
 * not earlier than the incoming event's; otherwise it builds a new
 * {@code ComplexPayload} from the event and {@code strPayload}.
 *
 * @param strPayload value passed to {@code verifyState} and to newly created payloads
 * @return the update function described above
 */
private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
    return (Event event, ComplexPayload previous) -> {
        verifyState(strPayload, previous);

        // Keep the stored payload if it is at least as recent as the incoming event.
        if (previous != null && event.getEventTime() <= previous.getEventTime()) {
            return previous;
        }
        return new ComplexPayload(event, strPayload);
    };
}
示例25
/**
 * Checks the type that {@code TypeExtractor.getJoinReturnTypes} derives for a
 * {@link JoinFunction} written as a lambda.
 *
 * <p>The call passes {@code true} for the final (allow-missing) argument, so the
 * extractor may return a {@code MissingTypeInfo}; in that case no assertion runs.
 * Otherwise the result must be a 2-arity tuple whose field 0 is itself a tuple and
 * whose field 1 is {@code String}.
 */
@Test
public void testJoinLambda() {
    JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f =
            (i1, i2) -> null;

    TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(
            f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);

    if (!(ti instanceof MissingTypeInfo)) {
        assertTrue(ti.isTupleType());
        assertEquals(2, ti.getArity());

        TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) ti;
        assertTrue(tupleInfo.getTypeAt(0).isTupleType());
        // Expected value first, actual second, so a failure message reads correctly.
        assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(1));
    }
}
示例26
/**
 * Intersects this graph's edge set with the given edge set, keeping a single edge
 * for every group of matching pairs.
 *
 * <p>Edges match when fields 0, 1 and 2 are equal; the join emits the edge from this
 * graph's side and the result is de-duplicated with {@code distinct()}.
 *
 * @param edges edges to compute the intersection with
 * @return edge set containing one edge for all matching pairs of the same edge
 */
private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K, EV>> edges) {
    final DataSet<Edge<K, EV>> ownEdges = this.getEdges();

    // Always emits the edge coming from the first (own) input.
    final JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> keepOwnEdge =
            new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>() {
                @Override
                public Edge<K, EV> join(Edge<K, EV> own, Edge<K, EV> other) throws Exception {
                    return own;
                }
            };

    return ownEdges
            .join(edges)
            .where(0, 1, 2)
            .equalTo(0, 1, 2)
            .with(keepOwnEdge)
            .withForwardedFieldsFirst("*")
            .name("Intersect edges")
            .distinct()
            .name("Edges");
}
示例27
/**
 * Creates an {@code ArtificialValueStateBuilder} named after the given descriptor.
 *
 * @param inputAndOldStateToNewState function passed to the builder as its state value generator
 * @param valueStateDescriptor descriptor that supplies the state name and is passed to the builder
 * @return the new value-state builder
 */
private static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
        JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
        ValueStateDescriptor<STATE> valueStateDescriptor) {
    final String stateName = valueStateDescriptor.getName();
    return new ArtificialValueStateBuilder<>(stateName, inputAndOldStateToNewState, valueStateDescriptor);
}
示例28
/**
 * Creates a map-state builder.
 *
 * <p>None of the arguments is null-checked here; they are stored as given.
 *
 * @param stateName name passed to the super constructor
 * @param stateValueGenerator function from (input, iterator over current entries) to an
 *                            iterator over entries; stored for later use
 * @param keySerializer serializer for map keys
 * @param valueSerializer serializer for map values
 */
public ArtificialMapStateBuilder(
        String stateName,
        JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
        TypeSerializer<K> keySerializer,
        TypeSerializer<V> valueSerializer) {
    super(stateName);

    this.stateValueGenerator = stateValueGenerator;
    this.keySerializer = keySerializer;
    this.valueSerializer = valueSerializer;
}
示例29
/**
 * Creates a list-state builder.
 *
 * @param stateName name passed to the super constructor
 * @param keyedStateGenerator generator stored in {@code keyedStateGenerator}; must not be null
 * @param operatorStateGenerator generator stored in {@code operatorStateGenerator}; must not be null
 * @param listStateDescriptor descriptor stored on the builder; must not be null
 */
public ArtificialListStateBuilder(
        String stateName,
        JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
        JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
        ListStateDescriptor<STATE> listStateDescriptor) {
    super(stateName);

    // Validation order is kept: descriptor, keyed generator, operator generator.
    Preconditions.checkNotNull(listStateDescriptor);
    this.listStateDescriptor = listStateDescriptor;

    Preconditions.checkNotNull(keyedStateGenerator);
    this.keyedStateGenerator = keyedStateGenerator;

    Preconditions.checkNotNull(operatorStateGenerator);
    this.operatorStateGenerator = operatorStateGenerator;
}
示例30
/**
 * Creates a value-state builder.
 *
 * @param stateName name passed to the super constructor
 * @param stateValueGenerator function from (input, old state) to new state; must not be null
 * @param valueStateDescriptor descriptor stored on the builder; must not be null
 */
public ArtificialValueStateBuilder(
        String stateName,
        JoinFunction<IN, STATE, STATE> stateValueGenerator,
        ValueStateDescriptor<STATE> valueStateDescriptor) {
    super(stateName);

    // Validation order is kept: descriptor first, then generator.
    Preconditions.checkNotNull(valueStateDescriptor);
    this.valueStateDescriptor = valueStateDescriptor;

    Preconditions.checkNotNull(stateValueGenerator);
    this.stateValueGenerator = stateValueGenerator;
}