Java source code examples: org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction
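These snippets are drawn from Apache Flink's optimizer test suite; helpers such as compileNoStats(...) and DEFAULT_PARALLELISM belong to the surrounding compiler test base class and are not shown here. The tests only build and compile execution plans, so DummyFlatJoinFunction merely has to satisfy the FlatJoinFunction contract. The class itself does not appear on this page; a minimal sketch of what it can look like follows (hypothetical, the body in Flink's test sources may differ):

import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the test helper; the real class in
// org.apache.flink.optimizer.testfunctions may differ in detail.
public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {

    private static final long serialVersionUID = 1L;

    @Override
    public void join(T first, T second, Collector<T> out) {
        // The plan-translation tests below never execute this body,
        // so any trivial implementation is sufficient.
        out.collect(first);
    }
}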
Example 1

@Test
public void testTempInIterationTest() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // the input path is never read; the test only builds and compiles the plan
    DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);

    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            input.iterateDelta(input, 1, 0);

    DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
            .join(iteration.getSolutionSet()).where(0).equalTo(0)
            .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

    iteration.closeWith(update, update)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan plan = env.createProgramPlan();
    OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan);

    JobGraphGenerator jgg = new JobGraphGenerator();
    JobGraph jg = jgg.compileJobGraph(oPlan);

    boolean solutionSetUpdateChecked = false;
    for (JobVertex v : jg.getVertices()) {
        if (v.getName().equals("SolutionSet Delta")) {
            // check that the input of the solution set delta is materialized
            // through a temp barrier ("temped")
            TaskConfig tc = new TaskConfig(v.getConfiguration());
            assertTrue(tc.isInputAsynchronouslyMaterialized(0));
            solutionSetUpdateChecked = true;
        }
    }
    assertTrue(solutionSetUpdateChecked);
}
Example 2

@Test
public void testBulkIterationInClosure() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> data1 = env.generateSequence(1, 100);
        DataSet<Long> data2 = env.generateSequence(1, 100);

        IterativeDataSet<Long> firstIteration = data1.iterate(100);
        DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));

        IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
        DataSet<Long> joined = mainIteration.join(firstResult)
                .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
                .with(new DummyFlatJoinFunction<Long>());

        DataSet<Long> mainResult = mainIteration.closeWith(joined);
        mainResult.output(new DiscardingOutputFormat<Long>());

        Plan p = env.createProgramPlan();

        // the optimizer should be able to translate this
        OptimizedPlan op = compileNoStats(p);

        // the job graph generator should be able to translate this
        new JobGraphGenerator().compileJobGraph(op);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
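The examples also reference other small helpers from the same testfunctions package, such as IdentityMapper and IdentityKeyExtractor above. Plausible sketches, hedged the same way (the real classes may differ):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical sketches; each class would live in its own file.
public class IdentityMapper<T> implements MapFunction<T, T> {
    @Override
    public T map(T value) {
        return value; // pass the element through unchanged
    }
}

public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
    @Override
    public T getKey(T value) {
        return value; // the element is its own key
    }
}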
Example 3

/**
 * <pre>
 *                  +---------Iteration-------+
 *                  |                         |
 *         /--map--< >----\                   |
 *        /         |      \         /-------< >---sink
 * src-map          |     join------/         |
 *        \         |      /                  |
 *         \        +-----/-------------------+
 *          \            /
 *           \--reduce--/
 * </pre>
 */
@Test
public void testIterationWithStaticInput() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);

        DataSet<Long> source = env.generateSequence(1, 1000000);

        DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
        DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());

        IterativeDataSet<Long> iteration = mapped.iterate(10);
        iteration.closeWith(
                iteration.join(reduced)
                        .where(new IdentityKeyExtractor<Long>())
                        .equalTo(new IdentityKeyExtractor<Long>())
                        .with(new DummyFlatJoinFunction<Long>()))
                .output(new DiscardingOutputFormat<Long>());

        compileNoStats(env.createProgramPlan());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
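Example 3 additionally uses SelectOneReducer to collapse each group to a single element. A hedged sketch (the actual test class may differ):

import org.apache.flink.api.common.functions.ReduceFunction;

// Hypothetical sketch: which of the two values survives is irrelevant
// for these plan-translation tests.
public class SelectOneReducer<T> implements ReduceFunction<T> {
    @Override
    public T reduce(T value1, T value2) {
        return value1;
    }
}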
Example 4

@Test
public void testBranchesOnlyInBCVariables2() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);

        DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");

        DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
        DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");

        DataSet<Tuple2<Long, Long>> joinInput1 =
                input.map(new IdentityMapper<Tuple2<Long, Long>>())
                        .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
                        .withBroadcastSet(bc_input2, "bc2");

        DataSet<Tuple2<Long, Long>> joinInput2 =
                input.map(new IdentityMapper<Tuple2<Long, Long>>())
                        .withBroadcastSet(bc_input1, "bc1")
                        .withBroadcastSet(bc_input2, "bc2");

        DataSet<Tuple2<Long, Long>> joinResult = joinInput1
                .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        input
                .map(new IdentityMapper<Tuple2<Long, Long>>())
                .withBroadcastSet(bc_input1, "bc1")
                .union(joinResult)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        Plan plan = env.createProgramPlan();
        compileNoStats(plan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
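The Duplicator helper referenced above turns each element into a Tuple2 of itself, giving the tests a keyed record to join on. A hedged sketch (the real class may differ):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical sketch of the Duplicator test helper.
public class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
    @Override
    public Tuple2<T, T> map(T value) {
        return new Tuple2<T, T>(value, value);
    }
}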
Example 5

@Test
public void testIncompatibleHashAndCustomPartitioning() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));

        DataSet<Tuple3<Long, Long, Long>> partitioned = input
                .partitionCustom(new Partitioner<Long>() {
                    @Override
                    public int partition(Long key, int numPartitions) { return 0; }
                }, 0)
                .map(new IdentityMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2");

        DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
                .distinct(0, 1)
                .groupBy(1)
                .sortGroup(0, Order.ASCENDING)
                .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1");

        grouped
                .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple3<Long, Long, Long>>())
                .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

        Plan p = env.createProgramPlan();
        OptimizedPlan op = compileNoStats(p);

        SinkPlanNode sink = op.getDataSinks().iterator().next();
        DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();

        assertEquals(ShipStrategyType.PARTITION_HASH, join.getInput1().getShipStrategy());
        assertTrue(join.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH ||
                join.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
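IdentityGroupReducerCombinable, used above, stands for a group reduce that the optimizer may also apply as a pre-combiner. A hedged sketch of such a function (the real test class may differ):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: an identity group reduce that also implements the
// combine contract, making it eligible for a combiner step in the plan.
public class IdentityGroupReducerCombinable<T>
        implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {

    @Override
    public void reduce(Iterable<T> values, Collector<T> out) {
        for (T value : values) {
            out.collect(value);
        }
    }

    @Override
    public void combine(Iterable<T> values, Collector<T> out) {
        reduce(values, out);
    }
}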
Example 6

private static JoinNode getJoinNode() {
    return new JoinNode(
            new InnerJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(
                    new DummyFlatJoinFunction<String>(),
                    new BinaryOperatorInformation<String, String, String>(
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO),
                    new int[] {1},
                    new int[] {2},
                    "join op"));
}
Example 7

/**
 * <pre>
 *                      SINK
 *                        |
 *                     COGROUP
 *                  +---/      \----+
 *                 /                 \
 *                /                   MATCH10
 *               /                     |    \
 *              /                      | MATCH9
 *        MATCH5                       |  |    \
 *         |    \                      |  | MATCH8
 *         |     MATCH4                |  |  |     \
 *         |      |    \               |  |  | MATCH7
 *         |      |     MATCH3         |  |  |  |     \
 *         |      |      |    \        |  |  |  | MATCH6
 *         |      |      |     MATCH2  |  |  |  |  |  |
 *         |      |      |      |    \ +--+--+--+--+--+
 *         |      |      |      |     MATCH1           MAP
 *          \     |      |      |      |  | /-----------/
 *             (DATA SOURCE ONE)
 * </pre>
 */
@Test
public void testBranchingSourceMultipleTimes() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);

        DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
                .map(new Duplicator<Long>());

        DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> mapped = source.map(
                new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                        return null;
                    }
                });

        DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        joined5.coGroup(joined10)
                .where(1).equalTo(1)
                .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
                .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = compileNoStats(plan);
        new JobGraphGenerator().compileJobGraph(oPlan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
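The co-group at the sink of Example 7 uses DummyCoGroupFunction, which, like DummyFlatJoinFunction, only has to type-check. A hedged sketch (the real class may differ):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical sketch: emits nothing; the tests only build the plan.
public class DummyCoGroupFunction<A, B> implements CoGroupFunction<A, B, Tuple2<A, B>> {
    @Override
    public void coGroup(Iterable<A> first, Iterable<B> second, Collector<Tuple2<A, B>> out) {
        // intentionally empty
    }
}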
Example 8

/**
 * <pre>
 *                 (SINK A)
 *                     |    (SINK B)   (SINK C)
 *                   CROSS    /          /
 *                  /     \   |  +------+
 *                 /       \  | /
 *             REDUCE      MATCH2
 *                |   +---/      \
 *                 \ /            |
 *                  MAP           |
 *                   |            |
 *                COGROUP      MATCH1
 *                 /   \      /      \
 *         (SRC A)    (SRC B)        (SRC C)
 * </pre>
 */
@Test
public void testBranchingWithMultipleDataSinks() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);

        DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
                .map(new Duplicator<Long>());
        DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
                .map(new Duplicator<Long>());
        DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
                .map(new Duplicator<Long>());

        DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
                .where(0).equalTo(1)
                .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, Long>> first,
                            Iterable<Tuple2<Long, Long>> second,
                            Collector<Tuple2<Long, Long>> out) {
                    }
                })
                .map(new IdentityMapper<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
                .where(0).equalTo(1)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
                .where(1).equalTo(1)
                .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

        DataSet<Tuple2<Long, Long>> reduced = mapped
                .groupBy(1)
                .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());

        reduced.cross(joined2)
                .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
        joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
        joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = compileNoStats(plan);
        new JobGraphGenerator().compileJobGraph(oPlan);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
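Top1GroupReducer in Example 8 stands for a group reduce that emits a single record per group. A hedged sketch (the real test class may differ):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: forward only the first element of each group.
public class Top1GroupReducer<T> implements GroupReduceFunction<T, T> {
    @Override
    public void reduce(Iterable<T> values, Collector<T> out) {
        out.collect(values.iterator().next());
    }
}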