[scala] HashPartitioner는 어떻게 작동합니까?

의 문서를 읽었습니다 HashPartitioner. 불행히도 API 호출을 제외하고는 많은 설명이 없습니다. 나는 HashPartitioner키의 해시를 기반으로 분산 세트 를 분할 한다는 가정하에 있습니다. 예를 들어 내 데이터가

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

따라서 파티 셔 너는 동일한 키가 동일한 파티션에있는 다른 파티션에 이것을 넣습니다. 그러나 생성자 인수의 중요성을 이해하지 못합니다.

new HashPartitoner(numPartitions) //What does numPartitions do?

위의 데이터 세트의 경우 결과가 어떻게 다를까요?

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

그렇다면 HashPartitioner실제로 어떻게 작동합니까?



답변

음, 데이터 세트를 약간 더 흥미롭게 만들 수 있습니다.

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

6 가지 요소가 있습니다.

rdd.count
Long = 6

파티 셔너 없음 :

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

8 개의 파티션 :

rdd.partitions.length
Int = 8

이제 파티션 당 요소 수를 세는 작은 도우미를 정의 해 보겠습니다.

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

파티 셔 너가 없기 때문에 데이터 세트가 파티션간에 균일하게 배포됩니다 ( Spark의 기본 파티셔닝 체계 ).

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

초기 배포

이제 데이터 세트를 다시 분할하겠습니다.

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

에 전달 된 매개 변수 HashPartitioner는 파티션 수 를 정의 하므로 하나의 파티션이 필요합니다.

rddOneP.partitions.length
Int = 1

파티션이 하나뿐이므로 모든 요소가 포함됩니다.

countByPartition(rddOneP).collect
Array[Int] = Array(6)

해시 파티션 -1

셔플 후 값의 순서는 결정적이지 않습니다.

우리가 사용한다면 같은 방법 HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

우리는 2 개의 파티션을 얻을 것입니다 :

rddTwoP.partitions.length
Int = 2

rdd키 데이터에 의해 분할 되므로 더 이상 균일하게 배포되지 않습니다.

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

세 개의 키와 두 개의 다른 hashCodemod 값만 있기 때문에 numPartitions여기서 예상치 못한 것은 없습니다.

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

위 사항을 확인하기 위해 :

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

해시 파티션 -2

마지막으로 HashPartitioner(7)우리는 각각 2 개의 요소가있는 비어 있지 않은 3 개의 파티션, 7 개의 파티션을 얻습니다.

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

해시 파티션 -7

요약 및 참고 사항

  • HashPartitioner 파티션 수를 정의하는 단일 인수를 사용합니다.
  • 값은 hash키를 사용하여 파티션에 할당됩니다 . hash기능은 언어에 따라 다를 수 있습니다 (Scala RDD는 hashCode, DataSetsMurmurHash 3, PySpark, 사용 portable_hash).

    이와 같은 간단한 경우 key가 작은 정수인 hash경우 ID ( i = hash(i)) 라고 가정 할 수 있습니다 .

    Scala API는 nonNegativeMod계산 된 해시를 기반으로 파티션을 결정 하는 데 사용 합니다.

  • 키 배포가 균일하지 않은 경우 클러스터의 일부가 유휴 상태 일 수 있습니다.

  • 키는 해시 가능해야합니다. PySpark의 reduceByKey대한 키로 A list에 대한 내 대답을 확인 하여 PySpark 특정 문제에 대해 읽을 수 있습니다. 또 다른 가능한 문제는 HashPartitioner 문서에 강조되어 있습니다 .

    Java 배열에는 내용이 아닌 배열의 ID를 기반으로하는 hashCode가 있으므로 HashPartitioner를 사용하여 RDD [Array [ ]] 또는 RDD [(Array [ ], _)] 를 분할하려고 하면 예기치 않거나 잘못된 결과가 생성됩니다.

  • Python 3에서는 해싱이 일관 적인지 확인해야합니다. 참조 예외 기능 : 문자열의 해시의 임의성이 PYTHONHASHSEED을 통해 비활성화해야 pyspark에 의미?

  • 해시 파티 셔 너는 주입 형도 아니고 대체 형도 아닙니다. 단일 파티션에 여러 키를 할당 할 수 있으며 일부 파티션은 비어있을 수 있습니다.

  • 현재 해시 기반 메서드는 REPL 정의 케이스 클래스 ( Apache Spark의 Case 클래스 동등성) 와 결합 될 때 Scala에서 작동하지 않습니다 .

  • HashPartitioner(또는 기타 Partitioner) 데이터를 섞습니다. 분할이 여러 작업간에 재사용되지 않는 한 셔플 할 데이터 양을 줄이지 않습니다.


답변

RDD이것은 여러 부분으로 나뉘어져 있음을 의미합니다. 이 파티션 각각은 잠재적으로 다른 시스템에 있습니다. 인수가있는 해시 파티 셔 너는 다음과 같은 방식으로 numPartitions쌍을 배치 할 파티션을 선택합니다 (key, value).

  1. 정확히 numPartitions파티션을 만듭니다 .
  2. (key, value)번호가있는 파티션의 장소Hash(key) % numPartitions


답변

HashPartitioner.getPartition메서드는 를 인수로 사용하고 키가 속한 파티션 의 인덱스 를 반환합니다 . 파티 셔 너는 유효한 인덱스가 무엇인지 알아야하므로 올바른 범위의 숫자를 반환합니다. 파티션 수는 numPartitions생성자 인수를 통해 지정됩니다 .

구현은 대략 key.hashCode() % numPartitions. 자세한 내용은 Partitioner.scala 를 참조하세요.


답변