[apache-spark] 스파크-repartition () vs coalesce ()

Learning Spark에 따르면

데이터를 다시 파티션하는 것은 비용이 많이 드는 작업입니다. 또한 Spark에는 최적화 된 버전의 repartition()호출 coalesce()이있어 데이터 이동을 피할 수 있지만 RDD 파티션 수를 줄이는 경우에만 가능합니다.

내가 얻는 한 가지 차이점 repartition()은 파티션 수를 늘리거나 줄일 coalesce()수 있지만 파티션 수를 줄이면 줄일 수 있다는 것입니다.

파티션이 여러 시스템에 분산되어 coalesce()실행되는 경우 어떻게 데이터 이동을 피할 수 있습니까?



답변

전체 셔플을 피합니다 . 숫자가 줄어드는 것으로 알려진 경우 실행 프로그램은 최소 수의 파티션에 데이터를 안전하게 보관할 수 있으며 추가 노드에서 데이터를 우리가 유지 한 노드로만 이동할 수 있습니다.

따라서 다음과 같이 진행됩니다.

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

그런 다음 coalesce2 개의 파티션으로 줄입니다.

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

노드 1과 노드 3은 원래 데이터를 이동할 필요가 없었습니다.


답변

저스틴의 대답은 대단하고이 반응은 더 깊이 들어가게됩니다.

repartition알고리즘은 전체 셔플을 수행하고 고르게 분산 된 데이터로 새 파티션을 만듭니다. 1에서 12까지의 숫자로 DataFrame을 만듭니다.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf 내 컴퓨터에 4 개의 파티션이 있습니다.

numbersDf.rdd.partitions.size // => 4

파티션에서 데이터를 나누는 방법은 다음과 같습니다.

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

repartition방법 으로 전체 셔플을 수행하고 두 노드에서이 데이터를 가져 오 겠습니다 .

val numbersDfR = numbersDf.repartition(2)

numbersDfR내 컴퓨터 에서 데이터가 분할되는 방법은 다음과 같습니다 .

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition방법을 사용하면 새 파티션을 만들고 새 파티션에 데이터를 균등하게 분배 할 수 있습니다 (데이터 분배는 더 큰 데이터 세트에 대해 더 균일합니다).

차이 사이 coalescerepartition

coalesce기존 파티션을 사용하여 섞은 데이터 양을 최소화합니다. repartition새 파티션을 만들고 전체 셔플을 수행합니다. coalesce데이터 양이 다른 파티션 (때로는 크기가 다른 파티션)을 생성하고 크기 repartition가 거의 같은 파티션을 생성합니다.

인가 coalesce또는 repartition빠른?

coalesce보다 빠른 실행이 가능 repartition하지만 크기가 다른 파티션은 일반적으로 같은 크기의 파티션보다 작동 속도가 느립니다. 일반적으로 큰 데이터 세트를 필터링 한 후 데이터 세트를 다시 파티셔닝해야합니다. 내가 발견 한 repartition스파크가 동일한 크기의 파티션 작업에 내장되어 있기 때문에 빠른 전체로.

주의 : 재 파티션으로 인해 디스크의 데이터 크기가 증가 할 수 있다는 것이 궁금 합니다. 대규모 데이터 세트에서 재 파티셔닝 / 연합을 사용할 때 테스트를 실행하십시오.

더 자세한 내용을 원하시면 이 블로그 게시물을 읽으십시오 .

실제로 통합 및 재 파티셔닝을 사용할 때


답변

여기서 주목할 점은 Spark RDD의 기본 원칙이 불변성이라는 것입니다. 재 파티션 또는 병합으로 새로운 RDD가 생성됩니다. 기본 RDD는 원래 파티션 수로 계속 존재합니다. 사용 사례가 캐시에서 RDD를 유지해야하는 경우 새로 작성된 RDD에 대해서도 동일하게 수행해야합니다.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2


답변

repartition -모든 데이터를 섞기 때문에 파티션 수를 늘리면서 사용하는 것이 좋습니다.

coalesce-파티션 수를 줄이면서 사용하는 것이 좋습니다. 예를 들어 3 개의 파티션이 있고이를 2로 줄이려면 coalesce세 번째 파티션 데이터를 파티션 1과 2로 이동합니다. 파티션 1과 2는 동일한 컨테이너에 남아 있습니다. 반면, repartition모든 파티션에서 데이터를 섞으면 실행기 간의 네트워크 사용량이 높아져 성능에 영향을 미칩니다.

coalescerepartition파티션 수를 줄이는 동안 보다 성능이 우수 합니다.


답변

무엇에서 다음 코드 와 코드 워드 프로세서 것은 즉 coalesce(n)과 동일 coalesce(n, shuffle = false)repartition(n)동일하다coalesce(n, shuffle = true)

따라서 coalescerepartition파티션 수를 늘리는 데 사용할 수 있습니다.

을 사용하면 shuffle = true실제로 더 많은 파티션으로 통합 할 수 있습니다. 이것은 소수의 파티션이 비정상적으로 클 수있는 소수의 파티션 (예 : 100)이있는 경우에 유용합니다.

강조해야 할 또 다른 중요한 참고 사항 은 파티션 수 를 크게 줄이면 셔플 버전 coalesce( repartition이 경우 와 동일) 을 사용하는 것이 좋습니다. 이렇게하면 부모 파티션 (여러 작업) 에서 계산 을 병렬 로 수행 할 수 있습니다 .

그러나 과도하게 병합을 수행하는 경우 numPartitions = 1(예 :의 경우 하나의 노드) 원하는 노드보다 적은 수의 노드에서 계산이 수행 될 수 있습니다 numPartitions = 1. 이를 피하기 위해을 전달할 수 있습니다 shuffle = true. 이렇게하면 셔플 단계가 추가되지만 현재 업스트림 파티션이 병렬로 실행됩니다 (현재 파티션이 무엇이든간에).

여기 에서 관련 답변을 참조 하십시오


답변

모든 답변은이 자주 묻는 질문에 훌륭한 지식을 추가하고 있습니다.

이 질문의 타임 라인의 전통에 따라 여기 2 센트가 있습니다.

매우 특별한 경우에, repartition이 coalesce보다 빠르다는 것을 알았습니다 .

내 응용 프로그램에서 우리가 추정하는 파일 수가 특정 임계 값보다 적을 때 재 파티션이 더 빨리 작동합니다.

여기 내가 무슨 뜻인지

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

위의 스 니펫에서 파일이 20 미만인 경우 병합이 끝나는 데 시간이 오래 걸리고 파티션이 훨씬 빠르며 위의 코드가되었습니다.

물론이 숫자 (20)는 작업자 수와 데이터 양에 따라 다릅니다.

희망이 도움이됩니다.


답변

다시 파티션 : 데이터를 새로운 수의 파티션으로 섞습니다.

예 : 초기 데이터 프레임은 200 개의 파티션으로 분할됩니다.

df.repartition(500): 200 개의 파티션에서 새로운 500 개의 파티션으로 데이터가 섞입니다.

Coalesce : 기존 파티션 수로 데이터를 섞습니다 .

df.coalesce(5): 나머지 195 개의 파티션에서 5 개의 기존 파티션으로 데이터가 섞입니다.