[performance] 아파치 스파크 : map vs mapPartitions?

RDD mapmapPartitions방법 차이점은 무엇입니까 ? 그리고 flatMap좋아 map하거나 좋아 mapPartitions합니까? 감사.

(편집) 즉, 의미 적으로 또는 실행 측면에서 차이점은 무엇입니까?

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

과:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }



답변

RDD의 map과 mapPartitions 메소드의 차이점은 무엇입니까?

메소드 은 함수를 적용하여 소스 RDD의 각 요소 를 결과 RDD의 단일 요소로 변환 합니다. mapPartitions 는 소스 RDD의 각 파티션 을 결과의 여러 요소 (아마도 없음)로 변환합니다.

그리고 flatMap은 map 또는 mapPartitions처럼 동작합니까?

flatMap 은 단일 요소 (as map) 에서 작동 하지 않으며 결과의 여러 요소 (as )를 생성합니다 mapPartitions.


답변

꼬마 도깨비. 팁 :

RDD요소 당 한 번이 아니라 여러 요소에 대해 한 번만 수행해야하는 강력한 초기화가있을 때 RDD, 타사 라이브러리에서 객체 생성과 같은이 초기화를 직렬화 할 수없는 경우 (Spark에서 클러스터로 전송할 수 있도록) 작업자 노드) mapPartitions()대신을
사용하십시오 map(). 들어 데이터 요소 mapPartitions()당 한 번이 아닌 작업자 작업 / 스레드 / 파티션 당 한 번 초기화를 수행 할 수 있습니다 ( 예 : 아래 참조).RDD

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. 않는 flatMap행동하라지도처럼 나처럼 mapPartitions?

예. flatmap.. 자체 설명 의 예 2를 참조하십시오 .

Q1. RDD의 차이 무엇 mapmapPartitions

mapmapPartitions파티션 레벨에서 기능 을 수행하는 동안 요소 별 레벨에서 사용되는 기능을 작동시킵니다
.

시나리오 예 : 특정RDD파티션에 100K 요소가있는경우 사용할 때 매핑 변환에 사용되는 함수를 100K 회 실행합니다map.

반대로 사용 mapPartitions하면 특정 함수를 한 번만 호출하지만 모든 100K 레코드를 전달하고 한 번의 함수 호출로 모든 응답을 다시 가져옵니다.

이후 성능 향상이있을 것입니다 map함수가 뭔가 우리가 (의 경우 한 번에 모든 요소에 전달 된 경우가 수행 할 필요가 없습니다 것이라고 비싼마다하고있다 특히, 특정 기능을 너무 여러 번에 작품 mappartitions).

지도

RDD의 각 항목에 변환 함수를 적용하고 결과를 새 RDD로 리턴합니다.

변형 나열

데프 맵 [U : ClassTag] (f : T => U) : RDD [U]

예 :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

이것은 각 파티션에 대해 한 번만 호출되는 특수 맵입니다. 각 파티션의 전체 내용은 입력 인수 (Iterarator [T])를 통해 순차적 인 값 스트림으로 제공됩니다. 사용자 정의 함수는 또 다른 Iterator [U]를 리턴해야합니다. 결합 된 결과 반복자는 자동으로 새로운 RDD로 변환됩니다. 선택한 분할로 인해 다음 결과에서 튜플 (3,4) 및 (6,7)이 누락되었습니다.

preservesPartitioning입력 함수가 파티 false셔 너를 유지하는지 여부를 나타냅니다. 이는 RDD 쌍이 아니고 입력 함수가 키를 수정하지 않는 한 없어야 합니다.

변형 나열

def mapPartitions [U : ClassTag] (f : 반복자 [T] => 반복자 [U], 보존 분할 : 부울 = 거짓) : RDD [U]

실시 예 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

실시 예 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

위의 프로그램은 다음과 같이 flatMap을 사용하여 작성할 수도 있습니다.

플랫 맵을 사용하는 예 2

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

결론 :

mapPartitionsmap한 번 / 요소가 아닌 한 번 / 파티션으로 함수를 호출하기 때문에 변환이 더 빠릅니다 .

추가 자료 : foreach 대 foreachPartitions 언제 사용 하는가?


답변

지도 :

  1. MapReduce의 map () 메소드와 매우 유사한 한 번에 하나의 행을 처리합니다.
  2. 매 행마다 변환에서 돌아옵니다.

맵 파티션

  1. 한 번에 전체 파티션을 처리합니다.
  2. 전체 파티션을 처리 한 후 함수에서 한 번만 리턴 할 수 있습니다.
  3. 전체 파티션을 처리 할 때까지 모든 중간 결과를 메모리에 보관해야합니다.
  4. MapReduce의 setup () map () 및 cleanup () 함수를 제공합니다

Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/


답변