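I want to add a new partitioning strategy that uses graph topology information. However, PartitionStrategy has only one function, shown below, and I can't find any function that receives the graph data.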
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  println("partitioning!")
  numParts // placeholder; a real strategy should return an ID in [0, numParts)
}
This function only receives the src and dst IDs of a single edge.
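To make that constraint concrete, here is a minimal Spark-free sketch. It mirrors GraphX's aliases (VertexId = Long, PartitionID = Int); EdgeStrategy is a hypothetical stand-in with the same getPartition signature, and SrcHashStrategy is an illustrative rule, not a GraphX built-in. Each call sees exactly one edge plus numParts, so any rule written this way is a pure function of (src, dst, numParts).

```scala
// Same aliases GraphX defines in its package object.
type VertexId = Long
type PartitionID = Int

// Hypothetical stand-in with the same method signature as GraphX's PartitionStrategy.
trait EdgeStrategy extends Serializable {
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

// Illustrative rule: place an edge by its source ID only.
// The method can see (src, dst, numParts) and nothing else about the graph.
object SrcHashStrategy extends EdgeStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val raw = src.hashCode % numParts
    if (raw < 0) raw + numParts else raw // keep the result in [0, numParts)
  }
}

val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (4L, 1L))
val assignment = edges.map { case (s, d) => (s, d) -> SrcHashStrategy.getPartition(s, d, 3) }
```

All edges that share a source land in the same partition, but there is no way for this method to ask, for example, how many edges point at dst.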
In the Spark GraphX source, org.apache.spark.graphx.impl.GraphImpl, I found the following code:
override def partitionBy(
    partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
  val edTag = classTag[ED]
  val vdTag = classTag[VD]
  val newEdges = edges.withPartitionsRDD(edges.map { e =>
    val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
    (part, (e.srcId, e.dstId, e.attr))
  }
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitionsWithIndex(
      { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
        iter.foreach { message =>
          val data = message._2
          builder.add(data._1, data._2, data._3)
        }
        val edgePartition = builder.toEdgePartition
        Iterator((pid, edgePartition))
      }, preservesPartitioning = true)).cache()
  GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}
Here, .partitionBy(new HashPartitioner(numPartitions)) is the partitionBy method of the PairRDDFunctions class:
/**
 * Return a copy of the RDD partitioned using the specified partitioner.
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
HashPartitioner is defined as follows:
/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
But none of these functions has access to the graph data.
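The routing in GraphImpl.partitionBy can be approximated locally. The sketch below is a Spark-free simulation, not Spark's code: nonNegativeMod re-implements the modulo that HashPartitioner delegates to, and hashPartition and simulatePartitionBy are hypothetical helpers. Because an Int key's hashCode is the Int itself, the ID returned by getPartition effectively becomes the final partition index (wrapped modulo numPartitions), and at no point does the strategy see more than one edge.

```scala
type VertexId = Long
type PartitionID = Int

// Local re-implementation of the modulo HashPartitioner relies on.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  raw + (if (raw < 0) mod else 0)
}

// Mirrors HashPartitioner.getPartition: null goes to 0, anything else by hashCode.
def hashPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)
}

// Rough local analogue of GraphImpl.partitionBy: tag each edge with the strategy's ID,
// then route the (part, edge) pair by hashing the Int key.
def simulatePartitionBy(
    edges: Seq[(VertexId, VertexId)],
    numPartitions: Int,
    strategy: (VertexId, VertexId, PartitionID) => PartitionID
): Map[Int, Seq[(VertexId, VertexId)]] =
  edges
    .map { case (s, d) => (strategy(s, d, numPartitions), (s, d)) }
    .groupBy { case (part, _) => hashPartition(part, numPartitions) }
    .map { case (pid, tagged) => pid -> tagged.map(_._2) }

val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L))
// The question's placeholder strategy returns numParts, which is out of range;
// hashing the Int key wraps it to partition 0.
val layout = simulatePartitionBy(edges, 4, (_, _, n) => n)
```

This is also why the placeholder strategy in the question does not crash: every edge simply ends up in partition 0.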
I read the PowerGraph distributed_constrained_random_ingress.hpp and PowerLyra distributed_hybrid_ingress.hpp code. In their preprocessing stage they have access to the whole graph, so they can use graph topology information.
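I want to use graph topology information, but I don't know how to add a new function in Spark that receives the graph data and then assigns each edge a new partition ID.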
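Here is one approach: compute the graph data you need up front and capture it inside your own PartitionStrategy.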
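As a dummy example, here is a snippet that partitions the graph with the following rule: if an edge's destination is also a source somewhere in the graph, assign the edge to partition 0; otherwise assign it to partition 1.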
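import org.apache.spark.graphx.{Graph, PartitionID, PartitionStrategy, VertexId}

val graph: Graph[_, _] = ??? // your existing graph
val partitioned = graph.partitionBy(
  new PartitionStrategy {
    // select distinct sources only; this runs once, on the driver, when the strategy is created
    val capturedGraphData: Set[VertexId] = graph
      .edges
      .map(e => e.srcId)
      .distinct()
      .collect()
      .toSet

    // called once per edge on the executors, with capturedGraphData shipped along
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      if (capturedGraphData.contains(dst)) 0
      else 1
    }
  }
)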
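The trick splits the work into two phases: one global pass over the graph when the strategy is constructed, and a cheap per-edge lookup inside getPartition. A Spark-free sketch of the same dummy rule, with hypothetical helper names distinctSources and dummyRule standing in for the collect step and the anonymous strategy:

```scala
type VertexId = Long
type PartitionID = Int

// Phase 1 (driver side in Spark): one global pass over the edges, keeping only distinct sources.
def distinctSources(edges: Seq[(VertexId, VertexId)]): Set[VertexId] =
  edges.map(_._1).toSet

// Phase 2 (per edge): the same rule as the anonymous PartitionStrategy,
// reduced to a set lookup on the captured data.
def dummyRule(captured: Set[VertexId])(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
  if (captured.contains(dst)) 0 else 1

val edges = Seq((1L, 2L), (2L, 3L), (3L, 9L))
val captured = distinctSources(edges)
val parts = edges.map { case (s, d) => dummyRule(captured)(s, d, 2) }
```

Vertices 2 and 3 also appear as sources, so the first two edges go to partition 0, while vertex 9 never does, so (3, 9) goes to partition 1.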
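A note on scalability: if your use case needs a capturedGraphData that is too large, it will strain memory on both the driver and the executors. That is why it matters to select only the minimum necessary information from the graph: it is collected on the driver and shipped to every executor along with the strategy.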
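Following that advice, a rule closer to PowerLyra's hybrid ingress only needs the IDs of high-degree vertices. The sketch below is a simplified, Spark-free take on the hybrid-cut idea, not PowerLyra's actual code: an edge whose target has in-degree at or below a threshold is placed by hashing the target (keeping a low-degree vertex's in-edges together), while edges into a high-degree target are spread by hashing the source. HybridCutStrategy, highDegreeTargets, and the threshold value are hypothetical.

```scala
type VertexId = Long
type PartitionID = Int

// Hypothetical strategy: only the (ideally small) set of high-degree targets is captured.
final class HybridCutStrategy(highDegree: Set[VertexId]) extends Serializable {
  private def hashTo(id: VertexId, numParts: PartitionID): PartitionID = {
    val raw = java.lang.Long.hashCode(id) % numParts
    if (raw < 0) raw + numParts else raw
  }

  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    if (highDegree.contains(dst)) hashTo(src, numParts) // high-cut: spread in-edges by source
    else hashTo(dst, numParts)                          // low-cut: keep in-edges with the target
}

// Local preprocessing: count in-degrees and keep only targets above the threshold.
def highDegreeTargets(edges: Seq[(VertexId, VertexId)], threshold: Int): Set[VertexId] =
  edges.groupBy(_._2).collect { case (v, inEdges) if inEdges.size > threshold => v }.toSet

val edges = Seq((1L, 9L), (2L, 9L), (3L, 9L), (4L, 9L), (1L, 2L), (3L, 2L))
val strategy = new HybridCutStrategy(highDegreeTargets(edges, threshold = 2))
val placement = edges.map { case (s, d) => (s, d) -> strategy.getPartition(s, d, 4) }.toMap
```

In Spark, the captured set could plausibly be built with graph.inDegrees.filter(_._2 > threshold).map(_._1).collect().toSet, and getPartition would move into an anonymous PartitionStrategy as in the dummy example; only the heavy vertices are collected, which keeps the shipped data small.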