Java源码示例:org.apache.flink.optimizer.plan.NamedChannel
示例1
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
List<PlanNode> target, CostEstimator estimator)
{
for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
final Channel in = template.clone();
ilp.parameterizeChannel(in);
// instantiate a candidate, if the instantiated local properties meet one possible local property set
outer:
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
in.setRequiredLocalProps(ilps);
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
break outer;
}
}
}
}
}
示例2
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
List<PlanNode> target, CostEstimator estimator)
{
for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
final Channel in = template.clone();
ilp.parameterizeChannel(in);
// instantiate a candidate, if the instantiated local properties meet one possible local property set
outer:
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
in.setRequiredLocalProps(ilps);
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
break outer;
}
}
}
}
}
示例3
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
List<PlanNode> target, CostEstimator estimator)
{
for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
final Channel in = template.clone();
ilp.parameterizeChannel(in);
// instantiate a candidate, if the instantiated local properties meet one possible local property set
outer:
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
in.setRequiredLocalProps(ilps);
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
break outer;
}
}
}
}
}
示例4
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
final PlanNode inputSource = in.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
boolean requiresPipelinebreaker = false;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
// check branch compatibility against input
if (!areBranchCompatible(bcSource, inputSource)) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
for (OptimizerNode brancher : this.hereJoinedBranches) {
PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
if (candAtBrancher == null) {
// closed branch between two broadcast variables
continue;
}
SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
if (res == NOT_FOUND) {
throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
} else if (res == FOUND_SOURCE) {
requiresPipelinebreaker = true;
break;
} else if (res == FOUND_SOURCE_AND_DAM) {
// good
} else {
throw new CompilerException();
}
}
}
}
if (!validCombination) {
continue;
}
if (requiresPipelinebreaker) {
in.setTempMode(in.getTempMode().makePipelineBreaker());
}
final SingleInputPlanNode node = dps.instantiate(in, this);
node.setBroadcastInputs(broadcastChannelsCombination);
// compute how the strategy affects the properties
GlobalProperties gProps = in.getGlobalProperties().clone();
LocalProperties lProps = in.getLocalProperties().clone();
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);
// filter by the user code field copies
gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
// apply
node.initProperties(gProps, lProps);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}
示例5
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels,
RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
final Channel in1 = template1.clone();
ilp1.parameterizeChannel(in1);
for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
final Channel in2 = template2.clone();
ilp2.parameterizeChannel(in2);
for (OperatorDescriptorDual dps: getProperties()) {
for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
{
// valid combination
// for non trivial local properties, we need to check that they are co compatible
// (such as when some sort order is requested, that both are the same sort order
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
// copy, because setting required properties and instantiation may
// change the channels and should not affect prior candidates
Channel in1Copy = in1.clone();
in1Copy.setRequiredLocalProps(lpp.getProperties1());
Channel in2Copy = in2.clone();
in2Copy.setRequiredLocalProps(lpp.getProperties2());
// all right, co compatible
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
break;
}
// else cannot use this pair, fall through the loop and try the next one
}
}
}
}
}
}
示例6
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
final PlanNode inputSource1 = in1.getSource();
final PlanNode inputSource2 = in2.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
}
if (!validCombination) {
continue;
}
placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);
SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
GlobalProperties gp1 = in1.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
LocalProperties lp1 = in1.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 0);
LocalProperties lp2 = in2.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
node.initProperties(combined, locals);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}
示例7
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
final PlanNode inputSource = in.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
boolean requiresPipelinebreaker = false;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
// check branch compatibility against input
if (!areBranchCompatible(bcSource, inputSource)) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
for (OptimizerNode brancher : this.hereJoinedBranches) {
PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
if (candAtBrancher == null) {
// closed branch between two broadcast variables
continue;
}
SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
if (res == NOT_FOUND) {
throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
} else if (res == FOUND_SOURCE) {
requiresPipelinebreaker = true;
break;
} else if (res == FOUND_SOURCE_AND_DAM) {
// good
} else {
throw new CompilerException();
}
}
}
}
if (!validCombination) {
continue;
}
if (requiresPipelinebreaker) {
in.setTempMode(in.getTempMode().makePipelineBreaker());
}
final SingleInputPlanNode node = dps.instantiate(in, this);
node.setBroadcastInputs(broadcastChannelsCombination);
// compute how the strategy affects the properties
GlobalProperties gProps = in.getGlobalProperties().clone();
LocalProperties lProps = in.getLocalProperties().clone();
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);
// filter by the user code field copies
gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
// apply
node.initProperties(gProps, lProps);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}
示例8
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels,
RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
final Channel in1 = template1.clone();
ilp1.parameterizeChannel(in1);
for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
final Channel in2 = template2.clone();
ilp2.parameterizeChannel(in2);
for (OperatorDescriptorDual dps: getProperties()) {
for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
{
// valid combination
// for non trivial local properties, we need to check that they are co compatible
// (such as when some sort order is requested, that both are the same sort order
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
// copy, because setting required properties and instantiation may
// change the channels and should not affect prior candidates
Channel in1Copy = in1.clone();
in1Copy.setRequiredLocalProps(lpp.getProperties1());
Channel in2Copy = in2.clone();
in2Copy.setRequiredLocalProps(lpp.getProperties2());
// all right, co compatible
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
break;
}
// else cannot use this pair, fall through the loop and try the next one
}
}
}
}
}
}
示例9
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
final PlanNode inputSource1 = in1.getSource();
final PlanNode inputSource2 = in2.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
}
if (!validCombination) {
continue;
}
placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);
SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
GlobalProperties gp1 = in1.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
LocalProperties lp1 = in1.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 0);
LocalProperties lp2 = in2.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
node.initProperties(combined, locals);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}
示例10
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
{
final PlanNode inputSource = in.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
boolean requiresPipelinebreaker = false;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
// check branch compatibility against input
if (!areBranchCompatible(bcSource, inputSource)) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
if (in.isOnDynamicPath() && this.hereJoinedBranches != null) {
for (OptimizerNode brancher : this.hereJoinedBranches) {
PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
if (candAtBrancher == null) {
// closed branch between two broadcast variables
continue;
}
SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
if (res == NOT_FOUND) {
throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
} else if (res == FOUND_SOURCE) {
requiresPipelinebreaker = true;
break;
} else if (res == FOUND_SOURCE_AND_DAM) {
// good
} else {
throw new CompilerException();
}
}
}
}
if (!validCombination) {
continue;
}
if (requiresPipelinebreaker) {
in.setTempMode(in.getTempMode().makePipelineBreaker());
}
final SingleInputPlanNode node = dps.instantiate(in, this);
node.setBroadcastInputs(broadcastChannelsCombination);
// compute how the strategy affects the properties
GlobalProperties gProps = in.getGlobalProperties().clone();
LocalProperties lProps = in.getLocalProperties().clone();
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);
// filter by the user code field copies
gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
// apply
node.initProperties(gProps, lProps);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}
示例11
protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels,
RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
final Channel in1 = template1.clone();
ilp1.parameterizeChannel(in1);
for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
final Channel in2 = template2.clone();
ilp2.parameterizeChannel(in2);
for (OperatorDescriptorDual dps: getProperties()) {
for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
{
// valid combination
// for non trivial local properties, we need to check that they are co compatible
// (such as when some sort order is requested, that both are the same sort order
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
// copy, because setting required properties and instantiation may
// change the channels and should not affect prior candidates
Channel in1Copy = in1.clone();
in1Copy.setRequiredLocalProps(lpp.getProperties1());
Channel in2Copy = in2.clone();
in2Copy.setRequiredLocalProps(lpp.getProperties2());
// all right, co compatible
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
break;
}
// else cannot use this pair, fall through the loop and try the next one
}
}
}
}
}
}
示例12
protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
{
final PlanNode inputSource1 = in1.getSource();
final PlanNode inputSource2 = in2.getSource();
for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
boolean validCombination = true;
// check whether the broadcast inputs use the same plan candidate at the branching point
for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
NamedChannel nc = broadcastChannelsCombination.get(i);
PlanNode bcSource = nc.getSource();
if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
validCombination = false;
break;
}
// check branch compatibility against all other broadcast variables
for (int k = 0; k < i; k++) {
PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
if (!areBranchCompatible(bcSource, otherBcSource)) {
validCombination = false;
break;
}
}
}
if (!validCombination) {
continue;
}
placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);
SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
GlobalProperties gp1 = in1.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone()
.filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
LocalProperties lp1 = in1.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 0);
LocalProperties lp2 = in2.getLocalProperties().clone()
.filterBySemanticProperties(semPropsLocalPropFiltering, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
node.initProperties(combined, locals);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}