Java source code examples: org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction
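
All of the snippets below are plan-compilation tests from the Flink optimizer that use DummyFlatJoinFunction as a placeholder join UDF. The class itself is not shown on this page. A minimal sketch with the same shape (two inputs of type T joined into outputs of type T, as the FlatJoinFunction<String, String, String> type arguments in Example 6 confirm) might look as follows; the body is an assumption, and the real test utility may differ:

import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.util.Collector;

// Sketch of a placeholder join UDF with signature (T, T) -> T.
public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {

	private static final long serialVersionUID = 1L;

	@Override
	public void join(T first, T second, Collector<T> out) {
		// forward one side unchanged; these tests only exercise plan compilation,
		// so the produced records are never inspected
		out.collect(first);
	}
}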

Example 1
@Test
public void testTempInIterationTest() throws Exception {

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);

	DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
			input.iterateDelta(input, 1, 0);

	DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
			.join(iteration.getSolutionSet()).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

	iteration.closeWith(update, update)
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());


	Plan plan = env.createProgramPlan();
	OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan);

	JobGraphGenerator jgg = new JobGraphGenerator();
	JobGraph jg = jgg.compileJobGraph(oPlan);

	boolean solutionSetUpdateChecked = false;
	for(JobVertex v : jg.getVertices()) {
		if(v.getName().equals("SolutionSet Delta")) {

			// check if input of solution set delta is temped
			TaskConfig tc = new TaskConfig(v.getConfiguration());
			assertTrue(tc.isInputAsynchronouslyMaterialized(0));
			solutionSetUpdateChecked = true;
		}
	}
	assertTrue(solutionSetUpdateChecked);

}
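
Example 1 builds a delta iteration in which the same join result feeds both the next workset and the solution-set delta, then walks the generated JobGraph and asserts, via TaskConfig.isInputAsynchronouslyMaterialized(0), that the input of the "SolutionSet Delta" vertex is materialized ("temped"). The solutionSetUpdateChecked flag guards against the assertion being skipped silently when no vertex with that name exists. The CSV path never has to exist, because the plan is only compiled, not executed.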
 
Example 2
@Test
public void testBulkIterationInClosure() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Long> data1 = env.generateSequence(1, 100);
		DataSet<Long> data2 = env.generateSequence(1, 100);
		
		IterativeDataSet<Long> firstIteration = data1.iterate(100);
		
		DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
		
		
		IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
		
		DataSet<Long> joined = mainIteration.join(firstResult)
				.where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
				.with(new DummyFlatJoinFunction<Long>());
		
		DataSet<Long> mainResult = mainIteration.closeWith(joined);
		
		mainResult.output(new DiscardingOutputFormat<Long>());
		
		Plan p = env.createProgramPlan();
		
		// optimizer should be able to translate this
		OptimizedPlan op = compileNoStats(p);
		
		// job graph generator should be able to translate this
		new JobGraphGenerator().compileJobGraph(op);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
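
Most of the remaining examples call compileNoStats(...), a helper inherited from Flink's CompilerTestBase rather than defined here. Judging from Example 1, which performs the same step inline, a stand-in would be roughly the following (an assumption, not the test base's exact code):

// Hypothetical stand-in: compile the plan with a default-configured
// optimizer and no statistics, as Example 1 does inline.
private static OptimizedPlan compileNoStats(Plan plan) {
	return new Optimizer(new Configuration()).compile(plan);
}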
 
Example 3
/**
 * <pre>
 *             +---------Iteration-------+
 *             |                         |
 *    /--map--< >----\                   |
 *   /         |      \         /-------< >---sink
 * src-map     |     join------/         |
 *   \         |      /                  |
 *    \        +-----/-------------------+
 *     \            /
 *      \--reduce--/
 * </pre>
 */
@Test
public void testIterationWithStaticInput() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(100);

		DataSet<Long> source = env.generateSequence(1, 1000000);

		DataSet<Long> mapped = source.map(new IdentityMapper<Long>());

		DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());

		IterativeDataSet<Long> iteration = mapped.iterate(10);
		iteration.closeWith(
				iteration.join(reduced)
						.where(new IdentityKeyExtractor<Long>())
						.equalTo(new IdentityKeyExtractor<Long>())
						.with(new DummyFlatJoinFunction<Long>()))
				.output(new DiscardingOutputFormat<Long>());

		compileNoStats(env.createProgramPlan());
	}
	catch(Exception e){
		e.printStackTrace();
		fail(e.getMessage());
	}
}
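
Several examples also rely on small test utilities (IdentityMapper, IdentityKeyExtractor, SelectOneReducer, Duplicator) whose sources are not shown. Hedged sketches, assuming the simplest implementations consistent with the types at the call sites (each class would live in its own file):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// map(T) -> T, unchanged
class IdentityMapper<T> implements MapFunction<T, T> {
	@Override
	public T map(T value) { return value; }
}

// use the whole record as its own key
class IdentityKeyExtractor<T> implements KeySelector<T, T> {
	@Override
	public T getKey(T value) { return value; }
}

// keep one of the two values per group
class SelectOneReducer<T> implements ReduceFunction<T> {
	@Override
	public T reduce(T value1, T value2) { return value1; }
}

// duplicate a value into a pair, e.g. Long -> Tuple2<Long, Long>
class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
	@Override
	public Tuple2<T, T> map(T value) { return new Tuple2<T, T>(value, value); }
}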
 
Example 4
@Test
public void testBranchesOnlyInBCVariables2() {
	try{
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(100);

		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
		
		DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
		DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
		
		DataSet<Tuple2<Long, Long>> joinInput1 =
				input.map(new IdentityMapper<Tuple2<Long,Long>>())
					.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
					.withBroadcastSet(bc_input2, "bc2");
		
		DataSet<Tuple2<Long, Long>> joinInput2 =
				input.map(new IdentityMapper<Tuple2<Long,Long>>())
					.withBroadcastSet(bc_input1, "bc1")
					.withBroadcastSet(bc_input2, "bc2");
		
		DataSet<Tuple2<Long, Long>> joinResult = joinInput1
			.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
			.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
		
		input
			.map(new IdentityMapper<Tuple2<Long,Long>>())
				.withBroadcastSet(bc_input1, "bc1")
			.union(joinResult)
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		
		Plan plan = env.createProgramPlan();
		compileNoStats(plan);
	}
	catch(Exception e){
		e.printStackTrace();
		fail(e.getMessage());
	}
}
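
Example 4 checks that the optimizer handles plans whose branches occur only in broadcast-variable inputs: bc_input1 reaches one mapper directly and another through an extra map, while the proper input fans out to three mappers whose results are joined and unioned back into a single sink.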
 
Example 5
@Test
public void testIncompatibleHashAndCustomPartitioning() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
		
		DataSet<Tuple3<Long, Long, Long>> partitioned = input
			.partitionCustom(new Partitioner<Long>() {
				@Override
				public int partition(Long key, int numPartitions) { return 0; }
			}, 0)
			.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2");
			
		
		DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
			.distinct(0, 1)
			.groupBy(1)
			.sortGroup(0, Order.ASCENDING)
			.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
		
		grouped
			.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
			.with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
			.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		SinkPlanNode sink = op.getDataSinks().iterator().next();
		DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();

		assertEquals(ShipStrategyType.PARTITION_HASH, join.getInput1().getShipStrategy());
		assertTrue(join.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH ||
				join.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
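
Example 5 joins a custom-partitioned data set with the result of grouping and reducing that same set, using the REPARTITION_HASH_FIRST hint. Custom partitioning is not compatible with hash partitioning, so the first join input must be freshly hash-partitioned; for the second input the assertions accept either a hash partitioning or plain forwarding.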
 
Example 6
private static JoinNode getJoinNode() {
	return new JoinNode(
			new InnerJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(
					new DummyFlatJoinFunction<String>(),
					new BinaryOperatorInformation<String, String, String>(
							BasicTypeInfo.STRING_TYPE_INFO,
							BasicTypeInfo.STRING_TYPE_INFO,
							BasicTypeInfo.STRING_TYPE_INFO),
					new int[] {1},
					new int[] {2},
					"join op"));
}
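
Example 6 is a factory for node-level optimizer tests: it wraps an InnerJoinOperatorBase that joins two String-typed inputs on key positions 1 and 2, with all three type slots set to BasicTypeInfo.STRING_TYPE_INFO, into a JoinNode named "join op".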
 
Example 7
/**
 * <pre>
 *                              SINK
 *                               |
 *                            COGROUP
 *                        +---/    \----+
 *                       /               \
 *                      /             MATCH10
 *                     /               |    \
 *                    /                |  MATCH9
 *                MATCH5               |  |   \
 *                |   \                |  | MATCH8
 *                | MATCH4             |  |  |   \
 *                |  |   \             |  |  | MATCH7
 *                |  | MATCH3          |  |  |  |   \
 *                |  |  |   \          |  |  |  | MATCH6
 *                |  |  | MATCH2       |  |  |  |  |  |
 *                |  |  |  |   \       +--+--+--+--+--+
 *                |  |  |  | MATCH1            MAP 
 *                \  |  |  |  |  | /-----------/
 *                (DATA SOURCE ONE)
 * </pre>
 */
@Test
public void testBranchingSourceMultipleTimes() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);

		DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
			.map(new Duplicator<Long>());

		DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
													.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> mapped = source.map(
				new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
					@Override
					public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
						return null;
					}
		});

		DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());


		joined5.coGroup(joined10)
				.where(1).equalTo(1)
				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());

		Plan plan = env.createProgramPlan();
		OptimizedPlan oPlan = compileNoStats(plan);
		new JobGraphGenerator().compileJobGraph(oPlan);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
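
Example 7 stresses plan enumeration when a single source branches many times: the source feeds five cascaded self-joins (MATCH1 to MATCH5 in the diagram), a mapped variant of the same source feeds five more (MATCH6 to MATCH10), and a final coGroup merges both chains. The test only asserts that the optimizer and the JobGraphGenerator can translate the plan.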
 
Example 8
/**
 * <pre>
 *              (SINK A)
 *                  |    (SINK B)    (SINK C)
 *                CROSS    /          /
 *               /     \   |  +------+
 *              /       \  | /
 *          REDUCE      MATCH2
 *             |    +---/    \
 *              \  /          |
 *               MAP          |
 *                |           |
 *             COGROUP      MATCH1
 *             /     \     /     \
 *        (SRC A)    (SRC B)    (SRC C)
 * </pre>
 */
@Test
public void testBranchingWithMultipleDataSinks() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);

		DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
				.map(new Duplicator<Long>());

		DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
				.map(new Duplicator<Long>());

		DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
				.map(new Duplicator<Long>());

		DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
				.where(0).equalTo(1)
				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
						@Override
						public void coGroup(Iterable<Tuple2<Long, Long>> first,
												Iterable<Tuple2<Long, Long>> second,
												Collector<Tuple2<Long, Long>> out) {
						  }
				})
				.map(new IdentityMapper<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
				.where(0).equalTo(1)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
				.where(1).equalTo(1)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		DataSet<Tuple2<Long, Long>> reduced = mapped
				.groupBy(1)
				.reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());

		reduced.cross(joined2)
				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());

		joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
		joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		Plan plan = env.createProgramPlan();
		OptimizedPlan oPlan = compileNoStats(plan);
		new JobGraphGenerator().compileJobGraph(oPlan);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
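
Example 8 combines branching with multiple data sinks: joined2 is consumed by the cross as well as by two direct sinks, so the optimizer must correctly share intermediate results across three sink pipelines. As in Example 7, passing simply means that compilation and JobGraph generation do not throw.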
 