Java源码示例:org.apache.flink.optimizer.plan.NamedChannel

示例1
/**
 * Creates candidates for a single input channel and appends them to {@code target}.
 *
 * <p>For each interesting local property set of the input connection, a clone of
 * {@code template} is parameterized with that set. The operator descriptors are then
 * scanned in order; the first requested local property set that is met by the clone's
 * local properties is recorded on the clone as its required local properties and the
 * clone is passed to {@code instantiateCandidate}. At most one such call is made per
 * interesting property set.
 *
 * @param template channel that is cloned for every interesting local property set
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiateCandidate}
 * @param rgps forwarded unchanged to {@code instantiateCandidate}
 * @param target list forwarded to {@code instantiateCandidate}
 * @param estimator forwarded unchanged to {@code instantiateCandidate}
 */
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
		List<PlanNode> target, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting : this.inConn.getInterestingProperties().getLocalProperties()) {
		final Channel candidateChannel = template.clone();
		interesting.parameterizeChannel(candidateChannel);

		// stop at the first descriptor / requested-properties pair that the channel satisfies
		boolean instantiated = false;
		for (OperatorDescriptorSingle descriptor : getPossibleProperties()) {
			for (RequestedLocalProperties requested : descriptor.getPossibleLocalProperties()) {
				if (!requested.isMetBy(candidateChannel.getLocalProperties())) {
					continue;
				}

				candidateChannel.setRequiredLocalProps(requested);
				instantiateCandidate(descriptor, candidateChannel, broadcastPlanChannels, target, estimator, rgps, interesting);
				instantiated = true;
				break;
			}

			if (instantiated) {
				break;
			}
		}
	}
}
 
示例2
/**
 * Creates candidates for a single input channel and appends them to {@code target}.
 *
 * <p>For each interesting local property set of the input connection, a clone of
 * {@code template} is parameterized with that set. The operator descriptors are then
 * scanned in order; the first requested local property set that is met by the clone's
 * local properties is recorded on the clone as its required local properties and the
 * clone is passed to {@code instantiateCandidate}. At most one such call is made per
 * interesting property set.
 *
 * @param template channel that is cloned for every interesting local property set
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiateCandidate}
 * @param rgps forwarded unchanged to {@code instantiateCandidate}
 * @param target list forwarded to {@code instantiateCandidate}
 * @param estimator forwarded unchanged to {@code instantiateCandidate}
 */
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
		List<PlanNode> target, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting : this.inConn.getInterestingProperties().getLocalProperties()) {
		final Channel candidateChannel = template.clone();
		interesting.parameterizeChannel(candidateChannel);

		// stop at the first descriptor / requested-properties pair that the channel satisfies
		boolean instantiated = false;
		for (OperatorDescriptorSingle descriptor : getPossibleProperties()) {
			for (RequestedLocalProperties requested : descriptor.getPossibleLocalProperties()) {
				if (!requested.isMetBy(candidateChannel.getLocalProperties())) {
					continue;
				}

				candidateChannel.setRequiredLocalProps(requested);
				instantiateCandidate(descriptor, candidateChannel, broadcastPlanChannels, target, estimator, rgps, interesting);
				instantiated = true;
				break;
			}

			if (instantiated) {
				break;
			}
		}
	}
}
 
示例3
/**
 * Creates candidates for a single input channel and appends them to {@code target}.
 *
 * <p>For each interesting local property set of the input connection, a clone of
 * {@code template} is parameterized with that set. The operator descriptors are then
 * scanned in order; the first requested local property set that is met by the clone's
 * local properties is recorded on the clone as its required local properties and the
 * clone is passed to {@code instantiateCandidate}. At most one such call is made per
 * interesting property set.
 *
 * @param template channel that is cloned for every interesting local property set
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiateCandidate}
 * @param rgps forwarded unchanged to {@code instantiateCandidate}
 * @param target list forwarded to {@code instantiateCandidate}
 * @param estimator forwarded unchanged to {@code instantiateCandidate}
 */
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
		List<PlanNode> target, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting : this.inConn.getInterestingProperties().getLocalProperties()) {
		final Channel candidateChannel = template.clone();
		interesting.parameterizeChannel(candidateChannel);

		// stop at the first descriptor / requested-properties pair that the channel satisfies
		boolean instantiated = false;
		for (OperatorDescriptorSingle descriptor : getPossibleProperties()) {
			for (RequestedLocalProperties requested : descriptor.getPossibleLocalProperties()) {
				if (!requested.isMetBy(candidateChannel.getLocalProperties())) {
					continue;
				}

				candidateChannel.setRequiredLocalProps(requested);
				instantiateCandidate(descriptor, candidateChannel, broadcastPlanChannels, target, estimator, rgps, interesting);
				instantiated = true;
				break;
			}

			if (instantiated) {
				break;
			}
		}
	}
}
 
示例4
/**
 * Instantiates single-input plan node candidates for the given operator descriptor and
 * input channel, one per branch-compatible combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is skipped when any broadcast source is
 * not branch compatible with the input source or with a broadcast source at a lower
 * index of the same combination.
 *
 * @param dps descriptor used to instantiate the node and to compute its properties
 * @param in input channel of the candidate; its temp mode may be changed by this method
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq not used by this method
 * @param locPropsReq not used by this method
 * @throws CompilerException if tracing dams reports {@code NOT_FOUND} or a value that
 *         is none of the handled ones
 */
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
		List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
	final PlanNode inputSource = in.getSource();
	
	for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
		
		boolean validCombination = true;
		boolean requiresPipelinebreaker = false;
		
		// check whether the broadcast inputs use the same plan candidate at the branching point
		for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
			NamedChannel nc = broadcastChannelsCombination.get(i);
			PlanNode bcSource = nc.getSource();
			
			// check branch compatibility against input
			if (!areBranchCompatible(bcSource, inputSource)) {
				validCombination = false;
				break;
			}
			
			// check branch compatibility against all other broadcast variables
			// (this break only leaves the pairwise loop; the flag stays false and the
			// combination is rejected after the enclosing loop finishes)
			for (int k = 0; k < i; k++) {
				PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
				
				if (!areBranchCompatible(bcSource, otherBcSource)) {
					validCombination = false;
					break;
				}
			}
			
			// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
			// NOTE(review): this check reads only 'in' and 'hereJoinedBranches', not the
			// broadcast channel at index i, and it runs only when the combination has at
			// least one broadcast channel - confirm that this placement is intended
			if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
				for (OptimizerNode brancher : this.hereJoinedBranches) {
					PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
					
					if (candAtBrancher == null) {
						// closed branch between two broadcast variables
						continue;
					}
					
					SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
					if (res == NOT_FOUND) {
						throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
					} else if (res == FOUND_SOURCE) {
						requiresPipelinebreaker = true;
						break;
					} else if (res == FOUND_SOURCE_AND_DAM) {
						// good
					} else {
						// report the unexpected value instead of failing without any message
						throw new CompilerException(
							"Bug: Unexpected result while tracing dams for deadlock detection: " + res);
					}
				}
			}
		}
		
		if (!validCombination) {
			continue;
		}
		
		// NOTE(review): 'in' is the same object for every combination, so this change
		// is also visible to later iterations and to nodes created from 'in' earlier
		if (requiresPipelinebreaker) {
			in.setTempMode(in.getTempMode().makePipelineBreaker());
		}
		
		final SingleInputPlanNode node = dps.instantiate(in, this);
		node.setBroadcastInputs(broadcastChannelsCombination);
		
		// compute how the strategy affects the properties
		GlobalProperties gProps = in.getGlobalProperties().clone();
		LocalProperties lProps = in.getLocalProperties().clone();
		gProps = dps.computeGlobalProperties(gProps);
		lProps = dps.computeLocalProperties(lProps);

		// filter by the user code field copies
		gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
		lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
		
		// apply
		node.initProperties(gProps, lProps);
		node.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(node);
	}
}
 
示例5
/**
 * Creates candidates for a pair of input channels and hands them to {@code instantiate}.
 *
 * <p>Every interesting local property set of the first input is combined with every
 * interesting local property set of the second input; for each combination the two
 * templates are cloned and parameterized. For each operator descriptor, the first local
 * property pair that is met by both channels and is reported as co-fulfilled by the
 * descriptor is used: fresh clones of the two channels get the pair's properties set as
 * required local properties and are passed to {@code instantiate}. The remaining pairs
 * of that descriptor are then skipped, while the remaining descriptors are still tried.
 *
 * @param template1 channel cloned for the first input
 * @param template2 channel cloned for the second input
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiate}
 * @param rgps1 forwarded unchanged to {@code instantiate}
 * @param rgps2 forwarded unchanged to {@code instantiate}
 * @param target list forwarded to {@code instantiate}
 * @param validLocalCombinations not used by this method
 * @param estimator forwarded unchanged to {@code instantiate}
 */
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
		RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
		List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting1 : this.input1.getInterestingProperties().getLocalProperties()) {
		final Channel channel1 = template1.clone();
		interesting1.parameterizeChannel(channel1);

		for (RequestedLocalProperties interesting2 : this.input2.getInterestingProperties().getLocalProperties()) {
			final Channel channel2 = template2.clone();
			interesting2.parameterizeChannel(channel2);

			for (OperatorDescriptorDual descriptor : getProperties()) {
				for (LocalPropertiesPair pair : descriptor.getPossibleLocalProperties()) {
					// both sides must individually satisfy the requested properties
					if (!pair.getProperties1().isMetBy(channel1.getLocalProperties())) {
						continue;
					}
					if (!pair.getProperties2().isMetBy(channel2.getLocalProperties())) {
						continue;
					}

					// non-trivial local properties additionally have to fit together
					// (for example, a requested sort order must be the same on both sides)
					if (!descriptor.areCoFulfilled(pair.getProperties1(), pair.getProperties2(),
							channel1.getLocalProperties(), channel2.getLocalProperties())) {
						// this pair cannot be used, try the next one
						continue;
					}

					// work on copies: setting the required properties and instantiating
					// may modify the channels and must not leak into earlier candidates
					final Channel channel1Copy = channel1.clone();
					channel1Copy.setRequiredLocalProps(pair.getProperties1());

					final Channel channel2Copy = channel2.clone();
					channel2Copy.setRequiredLocalProps(pair.getProperties2());

					instantiate(descriptor, channel1Copy, channel2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, interesting1, interesting2);
					break;
				}
			}
		}
	}
}
 
示例6
/**
 * Instantiates dual-input plan node candidates for the given operator descriptor and
 * pair of input channels, one per accepted combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is rejected when some broadcast source is
 * branch compatible with neither input source, or is not branch compatible with a
 * broadcast source at a lower index of the same combination.
 *
 * @param operator descriptor used to instantiate the node and to combine the properties
 * @param in1 first input channel
 * @param in2 second input channel
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq1 not used by this method
 * @param globPropsReq2 not used by this method
 * @param locPropsReq1 not used by this method
 * @param locPropsReq2 not used by this method
 */
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
		List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
		RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
		RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
	final PlanNode firstSource = in1.getSource();
	final PlanNode secondSource = in2.getSource();

	for (List<NamedChannel> combination : Sets.cartesianProduct(broadcastPlanChannels)) {

		// verify that the broadcast inputs agree on the plan candidate at the branching point
		boolean compatible = true;

		for (int pos = 0; pos < combination.size(); pos++) {
			final PlanNode currentSource = combination.get(pos).getSource();

			// the broadcast source has to fit at least one of the two regular inputs
			if (!areBranchCompatible(currentSource, firstSource) && !areBranchCompatible(currentSource, secondSource)) {
				compatible = false;
				break;
			}

			// compare against every broadcast source that precedes it in the combination
			for (int prev = 0; prev < pos; prev++) {
				final PlanNode earlierSource = combination.get(prev).getSource();

				if (!areBranchCompatible(currentSource, earlierSource)) {
					compatible = false;
					break;
				}
			}
		}

		if (!compatible) {
			continue;
		}

		placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);

		final DualInputPlanNode candidate = operator.instantiate(in1, in2, this);
		candidate.setBroadcastInputs(combination);

		// global properties: filter each side (input index 0 and 1), then combine
		final SemanticProperties globalFilter = getSemanticPropertiesForGlobalPropertyFiltering();
		final GlobalProperties global1 = in1.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 0);
		final GlobalProperties global2 = in2.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 1);
		final GlobalProperties combinedGlobal = operator.computeGlobalProperties(global1, global2);

		// local properties: same procedure
		final SemanticProperties localFilter = getSemanticPropertiesForLocalPropertyFiltering();
		final LocalProperties local1 = in1.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 0);
		final LocalProperties local2 = in2.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 1);
		final LocalProperties combinedLocal = operator.computeLocalProperties(local1, local2);

		candidate.initProperties(combinedGlobal, combinedLocal);
		candidate.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(candidate);
	}
}
 
示例7
/**
 * Instantiates single-input plan node candidates for the given operator descriptor and
 * input channel, one per branch-compatible combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is skipped when any broadcast source is
 * not branch compatible with the input source or with a broadcast source at a lower
 * index of the same combination.
 *
 * @param dps descriptor used to instantiate the node and to compute its properties
 * @param in input channel of the candidate; its temp mode may be changed by this method
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq not used by this method
 * @param locPropsReq not used by this method
 * @throws CompilerException if tracing dams reports {@code NOT_FOUND} or a value that
 *         is none of the handled ones
 */
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
		List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
	final PlanNode inputSource = in.getSource();
	
	for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
		
		boolean validCombination = true;
		boolean requiresPipelinebreaker = false;
		
		// check whether the broadcast inputs use the same plan candidate at the branching point
		for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
			NamedChannel nc = broadcastChannelsCombination.get(i);
			PlanNode bcSource = nc.getSource();
			
			// check branch compatibility against input
			if (!areBranchCompatible(bcSource, inputSource)) {
				validCombination = false;
				break;
			}
			
			// check branch compatibility against all other broadcast variables
			// (this break only leaves the pairwise loop; the flag stays false and the
			// combination is rejected after the enclosing loop finishes)
			for (int k = 0; k < i; k++) {
				PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
				
				if (!areBranchCompatible(bcSource, otherBcSource)) {
					validCombination = false;
					break;
				}
			}
			
			// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
			// NOTE(review): this check reads only 'in' and 'hereJoinedBranches', not the
			// broadcast channel at index i, and it runs only when the combination has at
			// least one broadcast channel - confirm that this placement is intended
			if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
				for (OptimizerNode brancher : this.hereJoinedBranches) {
					PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
					
					if (candAtBrancher == null) {
						// closed branch between two broadcast variables
						continue;
					}
					
					SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
					if (res == NOT_FOUND) {
						throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
					} else if (res == FOUND_SOURCE) {
						requiresPipelinebreaker = true;
						break;
					} else if (res == FOUND_SOURCE_AND_DAM) {
						// good
					} else {
						// report the unexpected value instead of failing without any message
						throw new CompilerException(
							"Bug: Unexpected result while tracing dams for deadlock detection: " + res);
					}
				}
			}
		}
		
		if (!validCombination) {
			continue;
		}
		
		// NOTE(review): 'in' is the same object for every combination, so this change
		// is also visible to later iterations and to nodes created from 'in' earlier
		if (requiresPipelinebreaker) {
			in.setTempMode(in.getTempMode().makePipelineBreaker());
		}
		
		final SingleInputPlanNode node = dps.instantiate(in, this);
		node.setBroadcastInputs(broadcastChannelsCombination);
		
		// compute how the strategy affects the properties
		GlobalProperties gProps = in.getGlobalProperties().clone();
		LocalProperties lProps = in.getLocalProperties().clone();
		gProps = dps.computeGlobalProperties(gProps);
		lProps = dps.computeLocalProperties(lProps);

		// filter by the user code field copies
		gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
		lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
		
		// apply
		node.initProperties(gProps, lProps);
		node.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(node);
	}
}
 
示例8
/**
 * Creates candidates for a pair of input channels and hands them to {@code instantiate}.
 *
 * <p>Every interesting local property set of the first input is combined with every
 * interesting local property set of the second input; for each combination the two
 * templates are cloned and parameterized. For each operator descriptor, the first local
 * property pair that is met by both channels and is reported as co-fulfilled by the
 * descriptor is used: fresh clones of the two channels get the pair's properties set as
 * required local properties and are passed to {@code instantiate}. The remaining pairs
 * of that descriptor are then skipped, while the remaining descriptors are still tried.
 *
 * @param template1 channel cloned for the first input
 * @param template2 channel cloned for the second input
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiate}
 * @param rgps1 forwarded unchanged to {@code instantiate}
 * @param rgps2 forwarded unchanged to {@code instantiate}
 * @param target list forwarded to {@code instantiate}
 * @param validLocalCombinations not used by this method
 * @param estimator forwarded unchanged to {@code instantiate}
 */
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
		RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
		List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting1 : this.input1.getInterestingProperties().getLocalProperties()) {
		final Channel channel1 = template1.clone();
		interesting1.parameterizeChannel(channel1);

		for (RequestedLocalProperties interesting2 : this.input2.getInterestingProperties().getLocalProperties()) {
			final Channel channel2 = template2.clone();
			interesting2.parameterizeChannel(channel2);

			for (OperatorDescriptorDual descriptor : getProperties()) {
				for (LocalPropertiesPair pair : descriptor.getPossibleLocalProperties()) {
					// both sides must individually satisfy the requested properties
					if (!pair.getProperties1().isMetBy(channel1.getLocalProperties())) {
						continue;
					}
					if (!pair.getProperties2().isMetBy(channel2.getLocalProperties())) {
						continue;
					}

					// non-trivial local properties additionally have to fit together
					// (for example, a requested sort order must be the same on both sides)
					if (!descriptor.areCoFulfilled(pair.getProperties1(), pair.getProperties2(),
							channel1.getLocalProperties(), channel2.getLocalProperties())) {
						// this pair cannot be used, try the next one
						continue;
					}

					// work on copies: setting the required properties and instantiating
					// may modify the channels and must not leak into earlier candidates
					final Channel channel1Copy = channel1.clone();
					channel1Copy.setRequiredLocalProps(pair.getProperties1());

					final Channel channel2Copy = channel2.clone();
					channel2Copy.setRequiredLocalProps(pair.getProperties2());

					instantiate(descriptor, channel1Copy, channel2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, interesting1, interesting2);
					break;
				}
			}
		}
	}
}
 
示例9
/**
 * Instantiates dual-input plan node candidates for the given operator descriptor and
 * pair of input channels, one per accepted combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is rejected when some broadcast source is
 * branch compatible with neither input source, or is not branch compatible with a
 * broadcast source at a lower index of the same combination.
 *
 * @param operator descriptor used to instantiate the node and to combine the properties
 * @param in1 first input channel
 * @param in2 second input channel
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq1 not used by this method
 * @param globPropsReq2 not used by this method
 * @param locPropsReq1 not used by this method
 * @param locPropsReq2 not used by this method
 */
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
		List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
		RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
		RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
	final PlanNode firstSource = in1.getSource();
	final PlanNode secondSource = in2.getSource();

	for (List<NamedChannel> combination : Sets.cartesianProduct(broadcastPlanChannels)) {

		// verify that the broadcast inputs agree on the plan candidate at the branching point
		boolean compatible = true;

		for (int pos = 0; pos < combination.size(); pos++) {
			final PlanNode currentSource = combination.get(pos).getSource();

			// the broadcast source has to fit at least one of the two regular inputs
			if (!areBranchCompatible(currentSource, firstSource) && !areBranchCompatible(currentSource, secondSource)) {
				compatible = false;
				break;
			}

			// compare against every broadcast source that precedes it in the combination
			for (int prev = 0; prev < pos; prev++) {
				final PlanNode earlierSource = combination.get(prev).getSource();

				if (!areBranchCompatible(currentSource, earlierSource)) {
					compatible = false;
					break;
				}
			}
		}

		if (!compatible) {
			continue;
		}

		placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);

		final DualInputPlanNode candidate = operator.instantiate(in1, in2, this);
		candidate.setBroadcastInputs(combination);

		// global properties: filter each side (input index 0 and 1), then combine
		final SemanticProperties globalFilter = getSemanticPropertiesForGlobalPropertyFiltering();
		final GlobalProperties global1 = in1.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 0);
		final GlobalProperties global2 = in2.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 1);
		final GlobalProperties combinedGlobal = operator.computeGlobalProperties(global1, global2);

		// local properties: same procedure
		final SemanticProperties localFilter = getSemanticPropertiesForLocalPropertyFiltering();
		final LocalProperties local1 = in1.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 0);
		final LocalProperties local2 = in2.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 1);
		final LocalProperties combinedLocal = operator.computeLocalProperties(local1, local2);

		candidate.initProperties(combinedGlobal, combinedLocal);
		candidate.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(candidate);
	}
}
 
示例10
/**
 * Instantiates single-input plan node candidates for the given operator descriptor and
 * input channel, one per branch-compatible combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is skipped when any broadcast source is
 * not branch compatible with the input source or with a broadcast source at a lower
 * index of the same combination.
 *
 * @param dps descriptor used to instantiate the node and to compute its properties
 * @param in input channel of the candidate; its temp mode may be changed by this method
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq not used by this method
 * @param locPropsReq not used by this method
 * @throws CompilerException if tracing dams reports {@code NOT_FOUND} or a value that
 *         is none of the handled ones
 */
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
		List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
	final PlanNode inputSource = in.getSource();
	
	for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
		
		boolean validCombination = true;
		boolean requiresPipelinebreaker = false;
		
		// check whether the broadcast inputs use the same plan candidate at the branching point
		for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
			NamedChannel nc = broadcastChannelsCombination.get(i);
			PlanNode bcSource = nc.getSource();
			
			// check branch compatibility against input
			if (!areBranchCompatible(bcSource, inputSource)) {
				validCombination = false;
				break;
			}
			
			// check branch compatibility against all other broadcast variables
			// (this break only leaves the pairwise loop; the flag stays false and the
			// combination is rejected after the enclosing loop finishes)
			for (int k = 0; k < i; k++) {
				PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
				
				if (!areBranchCompatible(bcSource, otherBcSource)) {
					validCombination = false;
					break;
				}
			}
			
			// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
			// NOTE(review): this check reads only 'in' and 'hereJoinedBranches', not the
			// broadcast channel at index i, and it runs only when the combination has at
			// least one broadcast channel - confirm that this placement is intended
			if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
				for (OptimizerNode brancher : this.hereJoinedBranches) {
					PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
					
					if (candAtBrancher == null) {
						// closed branch between two broadcast variables
						continue;
					}
					
					SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
					if (res == NOT_FOUND) {
						throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
					} else if (res == FOUND_SOURCE) {
						requiresPipelinebreaker = true;
						break;
					} else if (res == FOUND_SOURCE_AND_DAM) {
						// good
					} else {
						// report the unexpected value instead of failing without any message
						throw new CompilerException(
							"Bug: Unexpected result while tracing dams for deadlock detection: " + res);
					}
				}
			}
		}
		
		if (!validCombination) {
			continue;
		}
		
		// NOTE(review): 'in' is the same object for every combination, so this change
		// is also visible to later iterations and to nodes created from 'in' earlier
		if (requiresPipelinebreaker) {
			in.setTempMode(in.getTempMode().makePipelineBreaker());
		}
		
		final SingleInputPlanNode node = dps.instantiate(in, this);
		node.setBroadcastInputs(broadcastChannelsCombination);
		
		// compute how the strategy affects the properties
		GlobalProperties gProps = in.getGlobalProperties().clone();
		LocalProperties lProps = in.getLocalProperties().clone();
		gProps = dps.computeGlobalProperties(gProps);
		lProps = dps.computeLocalProperties(lProps);

		// filter by the user code field copies
		gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
		lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
		
		// apply
		node.initProperties(gProps, lProps);
		node.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(node);
	}
}
 
示例11
/**
 * Creates candidates for a pair of input channels and hands them to {@code instantiate}.
 *
 * <p>Every interesting local property set of the first input is combined with every
 * interesting local property set of the second input; for each combination the two
 * templates are cloned and parameterized. For each operator descriptor, the first local
 * property pair that is met by both channels and is reported as co-fulfilled by the
 * descriptor is used: fresh clones of the two channels get the pair's properties set as
 * required local properties and are passed to {@code instantiate}. The remaining pairs
 * of that descriptor are then skipped, while the remaining descriptors are still tried.
 *
 * @param template1 channel cloned for the first input
 * @param template2 channel cloned for the second input
 * @param broadcastPlanChannels forwarded unchanged to {@code instantiate}
 * @param rgps1 forwarded unchanged to {@code instantiate}
 * @param rgps2 forwarded unchanged to {@code instantiate}
 * @param target list forwarded to {@code instantiate}
 * @param validLocalCombinations not used by this method
 * @param estimator forwarded unchanged to {@code instantiate}
 */
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
		RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
		List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
	for (RequestedLocalProperties interesting1 : this.input1.getInterestingProperties().getLocalProperties()) {
		final Channel channel1 = template1.clone();
		interesting1.parameterizeChannel(channel1);

		for (RequestedLocalProperties interesting2 : this.input2.getInterestingProperties().getLocalProperties()) {
			final Channel channel2 = template2.clone();
			interesting2.parameterizeChannel(channel2);

			for (OperatorDescriptorDual descriptor : getProperties()) {
				for (LocalPropertiesPair pair : descriptor.getPossibleLocalProperties()) {
					// both sides must individually satisfy the requested properties
					if (!pair.getProperties1().isMetBy(channel1.getLocalProperties())) {
						continue;
					}
					if (!pair.getProperties2().isMetBy(channel2.getLocalProperties())) {
						continue;
					}

					// non-trivial local properties additionally have to fit together
					// (for example, a requested sort order must be the same on both sides)
					if (!descriptor.areCoFulfilled(pair.getProperties1(), pair.getProperties2(),
							channel1.getLocalProperties(), channel2.getLocalProperties())) {
						// this pair cannot be used, try the next one
						continue;
					}

					// work on copies: setting the required properties and instantiating
					// may modify the channels and must not leak into earlier candidates
					final Channel channel1Copy = channel1.clone();
					channel1Copy.setRequiredLocalProps(pair.getProperties1());

					final Channel channel2Copy = channel2.clone();
					channel2Copy.setRequiredLocalProps(pair.getProperties2());

					instantiate(descriptor, channel1Copy, channel2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, interesting1, interesting2);
					break;
				}
			}
		}
	}
}
 
示例12
/**
 * Instantiates dual-input plan node candidates for the given operator descriptor and
 * pair of input channels, one per accepted combination of broadcast channels, and adds
 * them to {@code target}.
 *
 * <p>The combinations are the elements of the cartesian product of
 * {@code broadcastPlanChannels}. A combination is rejected when some broadcast source is
 * branch compatible with neither input source, or is not branch compatible with a
 * broadcast source at a lower index of the same combination.
 *
 * @param operator descriptor used to instantiate the node and to combine the properties
 * @param in1 first input channel
 * @param in2 second input channel
 * @param broadcastPlanChannels one set of alternative channels per broadcast input
 * @param target list that receives the created candidates
 * @param estimator not used by this method
 * @param globPropsReq1 not used by this method
 * @param globPropsReq2 not used by this method
 * @param locPropsReq1 not used by this method
 * @param locPropsReq2 not used by this method
 */
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
		List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
		RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
		RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
	final PlanNode firstSource = in1.getSource();
	final PlanNode secondSource = in2.getSource();

	for (List<NamedChannel> combination : Sets.cartesianProduct(broadcastPlanChannels)) {

		// verify that the broadcast inputs agree on the plan candidate at the branching point
		boolean compatible = true;

		for (int pos = 0; pos < combination.size(); pos++) {
			final PlanNode currentSource = combination.get(pos).getSource();

			// the broadcast source has to fit at least one of the two regular inputs
			if (!areBranchCompatible(currentSource, firstSource) && !areBranchCompatible(currentSource, secondSource)) {
				compatible = false;
				break;
			}

			// compare against every broadcast source that precedes it in the combination
			for (int prev = 0; prev < pos; prev++) {
				final PlanNode earlierSource = combination.get(prev).getSource();

				if (!areBranchCompatible(currentSource, earlierSource)) {
					compatible = false;
					break;
				}
			}
		}

		if (!compatible) {
			continue;
		}

		placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);

		final DualInputPlanNode candidate = operator.instantiate(in1, in2, this);
		candidate.setBroadcastInputs(combination);

		// global properties: filter each side (input index 0 and 1), then combine
		final SemanticProperties globalFilter = getSemanticPropertiesForGlobalPropertyFiltering();
		final GlobalProperties global1 = in1.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 0);
		final GlobalProperties global2 = in2.getGlobalProperties().clone()
				.filterBySemanticProperties(globalFilter, 1);
		final GlobalProperties combinedGlobal = operator.computeGlobalProperties(global1, global2);

		// local properties: same procedure
		final SemanticProperties localFilter = getSemanticPropertiesForLocalPropertyFiltering();
		final LocalProperties local1 = in1.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 0);
		final LocalProperties local2 = in2.getLocalProperties().clone()
				.filterBySemanticProperties(localFilter, 1);
		final LocalProperties combinedLocal = operator.computeLocalProperties(local1, local2);

		candidate.initProperties(combinedGlobal, combinedLocal);
		candidate.updatePropertiesWithUniqueSets(getUniqueFields());
		target.add(candidate);
	}
}