Java源码示例:org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair

示例1
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	// s, t, (d(s), d(t))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
		.flatMap(new EdgeStats<>())
			.setParallelism(parallelism)
			.name("Edge stats")
		.groupBy(0, 1)
		.reduceGroup(new ReduceEdgeStats<>())
			.setParallelism(parallelism)
			.name("Reduce edge stats")
		.groupBy(0)
		.reduce(new SumEdgeStats<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Sum edge stats");

	edgeMetricsHelper = new EdgeMetricsHelper<>();

	edgeStats
		.output(edgeMetricsHelper)
			.setParallelism(parallelism)
			.name("Edge metrics");

	return this;
}
 
示例2
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	// s, t, (d(s), d(t))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
		.flatMap(new EdgeStats<>())
			.setParallelism(parallelism)
			.name("Edge stats")
		.groupBy(0, 1)
		.reduceGroup(new ReduceEdgeStats<>())
			.setParallelism(parallelism)
			.name("Reduce edge stats")
		.groupBy(0)
		.reduce(new SumEdgeStats<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Sum edge stats");

	edgeMetricsHelper = new EdgeMetricsHelper<>();

	edgeStats
		.output(edgeMetricsHelper)
			.setParallelism(parallelism)
			.name("Edge metrics");

	return this;
}
 
示例3
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	// s, t, (d(s), d(t))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
		.flatMap(new EdgeStats<>())
			.setParallelism(parallelism)
			.name("Edge stats")
		.groupBy(0, 1)
		.reduceGroup(new ReduceEdgeStats<>())
			.setParallelism(parallelism)
			.name("Reduce edge stats")
		.groupBy(0)
		.reduce(new SumEdgeStats<>())
		.setCombineHint(CombineHint.HASH)
			.setParallelism(parallelism)
			.name("Sum edge stats");

	edgeMetricsHelper = new EdgeMetricsHelper<>();

	edgeStats
		.output(edgeMetricsHelper)
			.setParallelism(parallelism)
			.name("Edge metrics");

	return this;
}
 
示例4
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// u, v, bitmask where u < v
	DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
		.getEdges()
		.map(new OrderByID<>())
			.setParallelism(parallelism)
			.name("Order by ID")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by ID");

	// u, v, (deg(u), deg(v))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
		.map(new OrderByDegree<>())
			.setParallelism(parallelism)
			.name("Order by degree")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by degree");

	// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
	DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
		.groupBy(0)
		.sortGroup(1, Order.ASCENDING)
		.reduceGroup(new GenerateTriplets<>())
			.name("Generate triplets");

	// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
	DataSet<Result<K>> triangles = triplets
		.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
		.where(1, 2)
		.equalTo(0, 1)
		.with(new ProjectTriangles<>())
			.name("Triangle listing");

	if (permuteResults) {
		triangles = triangles
			.flatMap(new PermuteResult<>())
				.name("Permute triangle vertices");
	} else if (sortTriangleVertices.get()) {
		triangles = triangles
			.map(new SortTriangleVertices<>())
				.name("Sort triangle vertices");
	}

	return triangles;
}
 
示例5
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// u, v, bitmask where u < v
	DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
		.getEdges()
		.map(new OrderByID<>())
			.setParallelism(parallelism)
			.name("Order by ID")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by ID");

	// u, v, (deg(u), deg(v))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
		.map(new OrderByDegree<>())
			.setParallelism(parallelism)
			.name("Order by degree")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by degree");

	// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
	DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
		.groupBy(0)
		.sortGroup(1, Order.ASCENDING)
		.reduceGroup(new GenerateTriplets<>())
			.name("Generate triplets");

	// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
	DataSet<Result<K>> triangles = triplets
		.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
		.where(1, 2)
		.equalTo(0, 1)
		.with(new ProjectTriangles<>())
			.name("Triangle listing");

	if (permuteResults) {
		triangles = triangles
			.flatMap(new PermuteResult<>())
				.name("Permute triangle vertices");
	} else if (sortTriangleVertices.get()) {
		triangles = triangles
			.map(new SortTriangleVertices<>())
				.name("Sort triangle vertices");
	}

	return triangles;
}
 
示例6
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// u, v, bitmask where u < v
	DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
		.getEdges()
		.map(new OrderByID<>())
			.setParallelism(parallelism)
			.name("Order by ID")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by ID");

	// u, v, (deg(u), deg(v))
	DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
		.run(new EdgeDegreesPair<K, VV, EV>()
			.setParallelism(parallelism));

	// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
		.map(new OrderByDegree<>())
			.setParallelism(parallelism)
			.name("Order by degree")
		.groupBy(0, 1)
		.reduceGroup(new ReduceBitmask<>())
			.setParallelism(parallelism)
			.name("Flatten by degree");

	// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
	DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
		.groupBy(0)
		.sortGroup(1, Order.ASCENDING)
		.reduceGroup(new GenerateTriplets<>())
			.name("Generate triplets");

	// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
	DataSet<Result<K>> triangles = triplets
		.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
		.where(1, 2)
		.equalTo(0, 1)
		.with(new ProjectTriangles<>())
			.name("Triangle listing");

	if (permuteResults) {
		triangles = triangles
			.flatMap(new PermuteResult<>())
				.name("Permute triangle vertices");
	} else if (sortTriangleVertices.get()) {
		triangles = triangles
			.map(new SortTriangleVertices<>())
				.name("Sort triangle vertices");
	}

	return triangles;
}