提问者:小点点

在 graphX 中,如何使用利用其拓扑的自定义分区策略对图形进行分区?


我想添加一个利用图拓扑信息的新分区策略。不过,我发现分区策略只有一个功能,如下所示。我找不到任何可以接收图形数据的函数。

  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    println("partitioning!")
    numParts
  }

,此函数只能获取一条 src-DST 信息。

在 spark graphx source org.apache.spark.graphx.impl.GraphImpl 中,我发现代码如下:

  override def partitionBy(
      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
    val edTag = classTag[ED]
    val vdTag = classTag[VD]
    val newEdges = edges.withPartitionsRDD(edges.map { e =>
      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
      (part, (e.srcId, e.dstId, e.attr))
    }
      .partitionBy(new HashPartitioner(numPartitions))
      .mapPartitionsWithIndex(
        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
          iter.foreach { message =>
            val data = message._2
            builder.add(data._1, data._2, data._3)
          }
          val edgePartition = builder.toEdgePartition
          Iterator((pid, edgePartition))
        }, preservesPartitioning = true)).cache()
    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
  }

.partitionBy(new HashPartitioner(numPartitions)) 如下,partitionBy 来自 PairRDDFunctions 类如下,

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

哈希分区程序如下,

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

,但这些函数无法获取图形数据。

我阅读了PowerGraph distributed_constrained_random_ingress.hpp和powerlyra distributed_hybrid_ingress.hpp代码,在预处理阶段,他们可以获取图形,因此可以使用图形拓扑信息。

我想利用图形拓扑信息,但我不知道如何在 Spark 中添加一个新函数来获取图形数据,然后为每个边缘提供一个新的分区 ID


共1个答案

匿名用户

这是一种方法:

  • 从图表中收集最少的必要信息
  • 实例化捕获此信息的分区策略

举个虚拟示例,下面是使用以下规则对图形进行分区的代码片段:如果目标也是图形中的源,则将其分配给分区 0,否则分配给分区 1

val graph: Graph[_, _] = [...]

graph.partitionBy(
  new PartitionStrategy {
    // select distinct sources only
    val capturedGraphData: Set[Long] = graph
      .edges
      .map(e => e.srcId)
      .distinct()
      .collect()
      .toSet

    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      if(capturedGraphData.contains(dst)) 0
      else 1
    }
  }
)

关于可伸缩性的注意事项:如果用例需要大小太大的捕获 GraphData,驱动程序和执行程序的内存将很痛苦,这就是为什么只从图形中选择最少的必要信息很重要的原因,因为它将在驱动程序上收集并广播给每个执行程序。