Java源码示例:org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair
示例1
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
// s, t, (d(s), d(t))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
.flatMap(new EdgeStats<>())
.setParallelism(parallelism)
.name("Edge stats")
.groupBy(0, 1)
.reduceGroup(new ReduceEdgeStats<>())
.setParallelism(parallelism)
.name("Reduce edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");
edgeMetricsHelper = new EdgeMetricsHelper<>();
edgeStats
.output(edgeMetricsHelper)
.setParallelism(parallelism)
.name("Edge metrics");
return this;
}
示例2
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
// s, t, (d(s), d(t))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
.flatMap(new EdgeStats<>())
.setParallelism(parallelism)
.name("Edge stats")
.groupBy(0, 1)
.reduceGroup(new ReduceEdgeStats<>())
.setParallelism(parallelism)
.name("Reduce edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");
edgeMetricsHelper = new EdgeMetricsHelper<>();
edgeStats
.output(edgeMetricsHelper)
.setParallelism(parallelism)
.name("Edge metrics");
return this;
}
示例3
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
// s, t, (d(s), d(t))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
.flatMap(new EdgeStats<>())
.setParallelism(parallelism)
.name("Edge stats")
.groupBy(0, 1)
.reduceGroup(new ReduceEdgeStats<>())
.setParallelism(parallelism)
.name("Reduce edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");
edgeMetricsHelper = new EdgeMetricsHelper<>();
edgeStats
.output(edgeMetricsHelper)
.setParallelism(parallelism)
.name("Edge metrics");
return this;
}
示例4
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, bitmask where u < v
DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
.getEdges()
.map(new OrderByID<>())
.setParallelism(parallelism)
.name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by ID");
// u, v, (deg(u), deg(v))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
.map(new OrderByDegree<>())
.setParallelism(parallelism)
.name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by degree");
// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<>())
.name("Generate triplets");
// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
DataSet<Result<K>> triangles = triplets
.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<>())
.name("Triangle listing");
if (permuteResults) {
triangles = triangles
.flatMap(new PermuteResult<>())
.name("Permute triangle vertices");
} else if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<>())
.name("Sort triangle vertices");
}
return triangles;
}
示例5
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, bitmask where u < v
DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
.getEdges()
.map(new OrderByID<>())
.setParallelism(parallelism)
.name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by ID");
// u, v, (deg(u), deg(v))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
.map(new OrderByDegree<>())
.setParallelism(parallelism)
.name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by degree");
// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<>())
.name("Generate triplets");
// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
DataSet<Result<K>> triangles = triplets
.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<>())
.name("Triangle listing");
if (permuteResults) {
triangles = triangles
.flatMap(new PermuteResult<>())
.name("Permute triangle vertices");
} else if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<>())
.name("Sort triangle vertices");
}
return triangles;
}
示例6
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
// u, v, bitmask where u < v
DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
.getEdges()
.map(new OrderByID<>())
.setParallelism(parallelism)
.name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by ID");
// u, v, (deg(u), deg(v))
DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
.run(new EdgeDegreesPair<K, VV, EV>()
.setParallelism(parallelism));
// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
.map(new OrderByDegree<>())
.setParallelism(parallelism)
.name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.name("Flatten by degree");
// u, v, w, bitmask where (u, v) and (u, w) are edges in graph
DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<>())
.name("Generate triplets");
// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
DataSet<Result<K>> triangles = triplets
.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<>())
.name("Triangle listing");
if (permuteResults) {
triangles = triangles
.flatMap(new PermuteResult<>())
.name("Permute triangle vertices");
} else if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<>())
.name("Sort triangle vertices");
}
return triangles;
}