[scala] 직렬화 할 수없는 작업 : 객체가 아닌 클래스에서만 클로저 외부에서 함수를 호출 할 때 java.io.NotSerializableException

클로저 외부에서 함수를 호출 할 때 이상한 동작이 발생합니다.

  • 함수가 객체에있을 때 모든 것이 작동합니다.
  • 함수가 클래스에있을 때 get :

직렬화 할 수없는 태스크 : java.io.NotSerializableException : testing

문제는 클래스가 아닌 객체의 코드가 필요하다는 것입니다. 왜 이런 일이 일어나는지 아십니까? 스칼라 개체가 직렬화되어 있습니까 (기본값)?

이것은 작동 코드 예입니다.

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

이것은 작동하지 않는 예입니다.

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}



답변

RDD는 직렬화 가능 인터페이스를 확장 하므로 이로 인해 작업이 실패하지 않습니다. 이제 이것이 RDDSpark로 직렬화 하고 피할 수 있음을 의미하지는 않습니다.NotSerializableException

Spark는 분산 컴퓨팅 엔진이며 주요 추상화는 RDD (Resilient Distributed Dataset )이며 분산 컬렉션으로 볼 수 있습니다. 기본적으로 RDD의 요소는 클러스터의 노드에 분할되어 있지만 Spark는이를 사용자로부터 추상화하여 사용자가 마치 RDD (컬렉션)와 마치 로컬 요소처럼 상호 작용할 수 있도록합니다.

너무 많은 세부 사항으로 얻을 수 있지만, 당신은 RDD (에 다른 변환을 실행할 때하지 않기 map, flatMap, filter등), 당신의 변환 코드 (폐쇄)입니다 :

  1. 드라이버 노드에서 직렬화
  2. 클러스터의 해당 노드로 배송
  3. 역 직렬화
  4. 마지막으로 노드에서 실행

물론 로컬에서 (예와 같이) 로컬로 실행할 수 있지만 모든 단계 (네트워크를 통한 배송 제외)는 계속 발생합니다. [이를 통해 프로덕션 환경에 배포하기 전에 버그를 발견 할 수 있습니다]

두 번째 경우 testing에는 map 함수 내부 에서 클래스에 정의 된 메소드를 호출하는 것 입니다. Spark는 메소드를 자체적으로 직렬화 할 수 없으므로 Spark는 전체 testing 클래스 를 직렬화하려고 시도 하므로 다른 JVM에서 실행될 때 코드가 계속 작동합니다. 두 가지 가능성이 있습니다.

클래스 테스트를 직렬화 할 수 있도록 Spark에서 전체 클래스를 직렬화 할 수 있습니다.

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

또는 someFuncSpark가 메소드를 직렬화 할 수 있도록 메소드 대신 함수 를 만듭니다 (함수는 스칼라의 객체입니다).

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

비슷하지만 클래스 직렬화와 동일한 문제는 관심이 될 수 있으며이 Spark Summit 2013 프레젠테이션에서 읽을 수 있습니다 .

보조 노트로서, 당신은 다시 작성할 수 있습니다 rddList.map(someFunc(_))rddList.map(someFunc)그들이 정확히 동일합니다. 일반적으로 두 번째는 읽기가 덜 장황하고 깨끗하므로 선호됩니다.

편집 (2015-03-15) : SPARK-5307SerializationDebugger를 도입 했으며 Spark 1.3.0은 그것을 사용하는 첫 번째 버전입니다. NotSerializableException에 직렬화 경로를 추가합니다 . NotSerializableException이 발생하면 디버거는 개체 그래프를 방문하여 직렬화 할 수없는 개체의 경로를 찾고 사용자가 개체를 쉽게 찾을 수 있도록 정보를 구성합니다.

OP의 경우 stdout으로 인쇄됩니다.

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)


답변

Grega의 답변 은 원래 코드가 작동하지 않는 이유와 문제를 해결하는 두 가지 방법을 설명하는 데 유용합니다. 그러나이 솔루션은 매우 유연하지 않습니다. 클로저에 Serializable제어 할 수없는 비 클래스 에 대한 메소드 호출이 포함 된 경우를 고려하십시오 . Serializable이 클래스에 태그를 추가 하거나 기본 구현을 변경하여 메소드를 함수로 변경할 수 없습니다.

Nilesh 는이를위한 훌륭한 해결 방법을 제시하지만 솔루션을보다 간결하고 일반적으로 만들 수 있습니다.

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

그런 다음이 함수 직렬화기를 사용하여 클로저 및 메소드 호출을 자동으로 랩핑 할 수 있습니다.

rdd map genMapper(someFunc)

KryoSerializationWrapperTwitter의 Chill은 이미 핵심 Spark에 의해 도입되었으므로이 기술은 액세스하기 위해 추가 상어 종속성이 필요하지 않은 이점이 있습니다 .


답변

https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- 를 피하는 패러다임 전환 방법을 제안하는 문제를 완전히 설명하는 완전한 대화 leaks-no-ws.md

가장 많이 투표 된 답변은 기본적으로 전체 언어 기능을 버리는 것입니다. 더 이상 메서드를 사용하지 않고 함수 만 사용합니다. 실제로 클래스의 함수형 프로그래밍 메소드는 피해야하지만 함수로 변환한다고해서 여기에서 설계 문제가 해결되지는 않습니다 (위 링크 참조).

이 특정 상황에서 빠른 수정으로 @transient주석을 사용 하여 위반 값을 직렬화하지 않도록 지시 할 수 있습니다 (여기서는 Spark.ctxOP의 이름을 따르는 Spark의 클래스가 아닌 사용자 정의 클래스입니다).

@transient
val rddList = Spark.ctx.parallelize(list)

rddList가 다른 곳에 살도록 코드를 재구성 할 수도 있지만, 이는 또한 불분명합니다.

미래는 아마도 포자 일 것이다

앞으로 Scala는 “포자”라고 불리는 것들을 포함시킬 것입니다. 이것은 “폐기물”에 의해 정확하게 끌려 가지 않는 것들을 미세하게 제어 할 수있게합니다. 또한 이것은 실수로 직렬화 할 수없는 유형 (또는 원치 않는 값)을 실수로 가져 오는 모든 실수를 지금보다 오히려 컴파일 오류로 바꿔야합니다. 이는 런타임 예외 / 메모리 누수입니다.

http://docs.scala-lang.org/sips/pending/spores.html

Kryo 직렬화에 대한 팁

kyro를 사용할 때는 등록이 필요하도록 메모리 누수 대신 오류가 발생합니다.

“마지막으로, kryo에 kryo.setRegistrationOptional (true)이 있다는 것을 알고 있지만 사용 방법을 알아내는 데 매우 어려운 시간이 있습니다.이 옵션을 설정하면 kryo가 등록되지 않은 경우 여전히 예외가 발생합니다. 클래스.”

kryo로 수업 등록 전략

물론 이것은 값 수준 제어가 아닌 유형 수준 제어 만 제공합니다.

… 더 많은 아이디어가 올 것입니다.


답변

다른 접근법을 사용 하여이 문제를 해결했습니다. 클로저를 통과하기 전에 객체를 직렬화하고 나중에 직렬화를 해제하면됩니다. 이 접근 방식은 클래스가 직렬화 가능하지 않은 경우에도 작동하지만 뒤에서 Kryo를 사용하기 때문입니다. 카레 만 있으면됩니다. 😉

내가 한 일의 예는 다음과 같습니다.

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

클래스, 컴패니언 객체, 중첩 클래스, 여러 타사 라이브러리에 대한 참조를 원하는만큼 Blah를 자유롭게 만들 수 있습니다.

KryoSerializationWrapper는 다음을 참조합니다 : https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala


답변

나는 비슷한 문제에 직면했고 Grega의 대답 에서 이해하는 것은

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

당신의 작은 동전의 방법은 직렬화하려고 someFunc (_) 방법을하지만, 방법은 직렬화되지 않습니다, 그것은 직렬화 클래스에 시도 테스트 다시 직렬화하지 않습니다.

따라서 코드를 작동 시키려면 doIT 메소드 안에 someFunc 를 정의해야합니다 . 예를 들면 다음과 같습니다.

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

그리고 여러 기능이 그림으로 나오면 모든 기능을 부모 컨텍스트에서 사용할 수 있어야합니다.


답변

이것이 스칼라에 적용되는지는 확실하지 않지만 Java에서는 NotSerializableException클로저가 직렬화 할 수없는 final필드에 액세스하지 않도록 코드를 리팩토링 하여 문제를 해결했습니다 .


답변

참고로 Spark 2.4에서는 많은 사람들 이이 문제를 겪을 것입니다. Kryo 직렬화가 향상되었지만 대부분의 경우 spark.kryo.unsafe = true 또는 순진한 kry 직렬 변환기를 사용할 수 없습니다.

빠른 수정을 위해 Spark 구성에서 다음을 변경하십시오.

spark.kryo.unsafe="false"

또는

spark.serializer="org.apache.spark.serializer.JavaSerializer"

명시 적 브로드 캐스트 변수를 사용하고 새로운 내장 된 Twitter-chill API를 사용 rdd.map(row =>하여 rdd.mapPartitions(partition => {함수 에서 함수 로 변환함으로써 발생하거나 개인적으로 작성하는 사용자 정의 RDD 변환을 수정 합니다.

오래된 (좋지 않은) 방법

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

대체 (더 나은) 방법

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

이 새로운 방법은 파티션 당 한 번만 브로드 캐스트 변수를 호출하는 것이 좋습니다. 클래스를 등록하지 않으면 여전히 Java Serialization을 사용해야합니다.