Java源码示例:org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree

示例1
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// s
	DataSet<Vertex<K, LongValue>> sourceIds = input
		.getEdges()
		.map(new MapEdgeToSourceId<>())
			.setParallelism(parallelism)
			.name("Edge to source ID");

	// s, d(s)
	DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		sourceDegree = input.getVertices()
			.leftOuterJoin(sourceDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return sourceDegree;
}
 
示例2
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// t
	DataSet<Vertex<K, LongValue>> targetIds = input
		.getEdges()
		.map(new MapEdgeToTargetId<>())
			.setParallelism(parallelism)
			.name("Edge to target ID");

	// t, d(t)
	DataSet<Vertex<K, LongValue>> targetDegree = targetIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		targetDegree = input.getVertices()
			.leftOuterJoin(targetDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return targetDegree;
}
 
示例3
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
		new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>();

	// v
	DataSet<Vertex<K, LongValue>> vertexIds = input
		.getEdges()
		.map(mapEdgeToId)
			.setParallelism(parallelism)
			.name("Edge to vertex ID");

	// v, deg(v)
	DataSet<Vertex<K, LongValue>> degree = vertexIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		degree = input
			.getVertices()
			.leftOuterJoin(degree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return degree;
}
 
示例4
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// s
	DataSet<Vertex<K, LongValue>> sourceIds = input
		.getEdges()
		.map(new MapEdgeToSourceId<>())
			.setParallelism(parallelism)
			.name("Edge to source ID");

	// s, d(s)
	DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		sourceDegree = input.getVertices()
			.leftOuterJoin(sourceDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return sourceDegree;
}
 
示例5
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// t
	DataSet<Vertex<K, LongValue>> targetIds = input
		.getEdges()
		.map(new MapEdgeToTargetId<>())
			.setParallelism(parallelism)
			.name("Edge to target ID");

	// t, d(t)
	DataSet<Vertex<K, LongValue>> targetDegree = targetIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		targetDegree = input.getVertices()
			.leftOuterJoin(targetDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return targetDegree;
}
 
示例6
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
		new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>();

	// v
	DataSet<Vertex<K, LongValue>> vertexIds = input
		.getEdges()
		.map(mapEdgeToId)
			.setParallelism(parallelism)
			.name("Edge to vertex ID");

	// v, deg(v)
	DataSet<Vertex<K, LongValue>> degree = vertexIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		degree = input
			.getVertices()
			.leftOuterJoin(degree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return degree;
}
 
示例7
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// s
	DataSet<Vertex<K, LongValue>> sourceIds = input
		.getEdges()
		.map(new MapEdgeToSourceId<>())
			.setParallelism(parallelism)
			.name("Edge to source ID");

	// s, d(s)
	DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		sourceDegree = input.getVertices()
			.leftOuterJoin(sourceDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return sourceDegree;
}
 
示例8
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// t
	DataSet<Vertex<K, LongValue>> targetIds = input
		.getEdges()
		.map(new MapEdgeToTargetId<>())
			.setParallelism(parallelism)
			.name("Edge to target ID");

	// t, d(t)
	DataSet<Vertex<K, LongValue>> targetDegree = targetIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		targetDegree = input.getVertices()
			.leftOuterJoin(targetDegree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return targetDegree;
}
 
示例9
@Override
public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
		new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>();

	// v
	DataSet<Vertex<K, LongValue>> vertexIds = input
		.getEdges()
		.map(mapEdgeToId)
			.setParallelism(parallelism)
			.name("Edge to vertex ID");

	// v, deg(v)
	DataSet<Vertex<K, LongValue>> degree = vertexIds
		.groupBy(0)
		.reduce(new DegreeCount<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Degree count");

	if (includeZeroDegreeVertices.get()) {
		degree = input
			.getVertices()
			.leftOuterJoin(degree)
			.where(0)
			.equalTo(0)
			.with(new JoinVertexWithVertexDegree<>())
				.setParallelism(parallelism)
				.name("Zero degree vertices");
	}

	return degree;
}