클로저 외부에서 함수를 호출 할 때 이상한 동작이 발생합니다.
- 함수가 객체에있을 때 모든 것이 작동합니다.
- 함수가 클래스에있을 때 get :
직렬화 할 수없는 태스크 : java.io.NotSerializableException : testing
문제는 클래스가 아닌 객체의 코드가 필요하다는 것입니다. 왜 이런 일이 일어나는지 아십니까? 스칼라 개체가 직렬화되어 있습니까 (기본값)?
이것은 작동 코드 예입니다.
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
이것은 작동하지 않는 예입니다.
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
답변
RDD는 직렬화 가능 인터페이스를 확장 하므로 이로 인해 작업이 실패하지 않습니다. 이제 이것이 RDD
Spark로 직렬화 하고 피할 수 있음을 의미하지는 않습니다.NotSerializableException
Spark는 분산 컴퓨팅 엔진이며 주요 추상화는 RDD (Resilient Distributed Dataset )이며 분산 컬렉션으로 볼 수 있습니다. 기본적으로 RDD의 요소는 클러스터의 노드에 분할되어 있지만 Spark는이를 사용자로부터 추상화하여 사용자가 마치 RDD (컬렉션)와 마치 로컬 요소처럼 상호 작용할 수 있도록합니다.
너무 많은 세부 사항으로 얻을 수 있지만, 당신은 RDD (에 다른 변환을 실행할 때하지 않기 map
, flatMap
, filter
등), 당신의 변환 코드 (폐쇄)입니다 :
- 드라이버 노드에서 직렬화
- 클러스터의 해당 노드로 배송
- 역 직렬화
- 마지막으로 노드에서 실행
물론 로컬에서 (예와 같이) 로컬로 실행할 수 있지만 모든 단계 (네트워크를 통한 배송 제외)는 계속 발생합니다. [이를 통해 프로덕션 환경에 배포하기 전에 버그를 발견 할 수 있습니다]
두 번째 경우 testing
에는 map 함수 내부 에서 클래스에 정의 된 메소드를 호출하는 것 입니다. Spark는 메소드를 자체적으로 직렬화 할 수 없으므로 Spark는 전체 testing
클래스 를 직렬화하려고 시도 하므로 다른 JVM에서 실행될 때 코드가 계속 작동합니다. 두 가지 가능성이 있습니다.
클래스 테스트를 직렬화 할 수 있도록 Spark에서 전체 클래스를 직렬화 할 수 있습니다.
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
또는 someFunc
Spark가 메소드를 직렬화 할 수 있도록 메소드 대신 함수 를 만듭니다 (함수는 스칼라의 객체입니다).
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
비슷하지만 클래스 직렬화와 동일한 문제는 관심이 될 수 있으며이 Spark Summit 2013 프레젠테이션에서 읽을 수 있습니다 .
보조 노트로서, 당신은 다시 작성할 수 있습니다 rddList.map(someFunc(_))
에 rddList.map(someFunc)
그들이 정확히 동일합니다. 일반적으로 두 번째는 읽기가 덜 장황하고 깨끗하므로 선호됩니다.
편집 (2015-03-15) : SPARK-5307 은 SerializationDebugger를 도입 했으며 Spark 1.3.0은 그것을 사용하는 첫 번째 버전입니다. NotSerializableException에 직렬화 경로를 추가합니다 . NotSerializableException이 발생하면 디버거는 개체 그래프를 방문하여 직렬화 할 수없는 개체의 경로를 찾고 사용자가 개체를 쉽게 찾을 수 있도록 정보를 구성합니다.
OP의 경우 stdout으로 인쇄됩니다.
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
답변
Grega의 답변 은 원래 코드가 작동하지 않는 이유와 문제를 해결하는 두 가지 방법을 설명하는 데 유용합니다. 그러나이 솔루션은 매우 유연하지 않습니다. 클로저에 Serializable
제어 할 수없는 비 클래스 에 대한 메소드 호출이 포함 된 경우를 고려하십시오 . Serializable
이 클래스에 태그를 추가 하거나 기본 구현을 변경하여 메소드를 함수로 변경할 수 없습니다.
Nilesh 는이를위한 훌륭한 해결 방법을 제시하지만 솔루션을보다 간결하고 일반적으로 만들 수 있습니다.
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
그런 다음이 함수 직렬화기를 사용하여 클로저 및 메소드 호출을 자동으로 랩핑 할 수 있습니다.
rdd map genMapper(someFunc)
KryoSerializationWrapper
Twitter의 Chill은 이미 핵심 Spark에 의해 도입되었으므로이 기술은 액세스하기 위해 추가 상어 종속성이 필요하지 않은 이점이 있습니다 .
답변
https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- 를 피하는 패러다임 전환 방법을 제안하는 문제를 완전히 설명하는 완전한 대화 leaks-no-ws.md
가장 많이 투표 된 답변은 기본적으로 전체 언어 기능을 버리는 것입니다. 더 이상 메서드를 사용하지 않고 함수 만 사용합니다. 실제로 클래스의 함수형 프로그래밍 메소드는 피해야하지만 함수로 변환한다고해서 여기에서 설계 문제가 해결되지는 않습니다 (위 링크 참조).
이 특정 상황에서 빠른 수정으로 @transient
주석을 사용 하여 위반 값을 직렬화하지 않도록 지시 할 수 있습니다 (여기서는 Spark.ctx
OP의 이름을 따르는 Spark의 클래스가 아닌 사용자 정의 클래스입니다).
@transient
val rddList = Spark.ctx.parallelize(list)
rddList가 다른 곳에 살도록 코드를 재구성 할 수도 있지만, 이는 또한 불분명합니다.
미래는 아마도 포자 일 것이다
앞으로 Scala는 “포자”라고 불리는 것들을 포함시킬 것입니다. 이것은 “폐기물”에 의해 정확하게 끌려 가지 않는 것들을 미세하게 제어 할 수있게합니다. 또한 이것은 실수로 직렬화 할 수없는 유형 (또는 원치 않는 값)을 실수로 가져 오는 모든 실수를 지금보다 오히려 컴파일 오류로 바꿔야합니다. 이는 런타임 예외 / 메모리 누수입니다.
http://docs.scala-lang.org/sips/pending/spores.html
Kryo 직렬화에 대한 팁
kyro를 사용할 때는 등록이 필요하도록 메모리 누수 대신 오류가 발생합니다.
“마지막으로, kryo에 kryo.setRegistrationOptional (true)이 있다는 것을 알고 있지만 사용 방법을 알아내는 데 매우 어려운 시간이 있습니다.이 옵션을 설정하면 kryo가 등록되지 않은 경우 여전히 예외가 발생합니다. 클래스.”
물론 이것은 값 수준 제어가 아닌 유형 수준 제어 만 제공합니다.
… 더 많은 아이디어가 올 것입니다.
답변
다른 접근법을 사용 하여이 문제를 해결했습니다. 클로저를 통과하기 전에 객체를 직렬화하고 나중에 직렬화를 해제하면됩니다. 이 접근 방식은 클래스가 직렬화 가능하지 않은 경우에도 작동하지만 뒤에서 Kryo를 사용하기 때문입니다. 카레 만 있으면됩니다. 😉
내가 한 일의 예는 다음과 같습니다.
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
클래스, 컴패니언 객체, 중첩 클래스, 여러 타사 라이브러리에 대한 참조를 원하는만큼 Blah를 자유롭게 만들 수 있습니다.
KryoSerializationWrapper는 다음을 참조합니다 : https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
답변
나는 비슷한 문제에 직면했고 Grega의 대답 에서 이해하는 것은
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
당신의 작은 동전의 방법은 직렬화하려고 someFunc (_) 방법을하지만, 방법은 직렬화되지 않습니다, 그것은 직렬화 클래스에 시도 테스트 다시 직렬화하지 않습니다.
따라서 코드를 작동 시키려면 doIT 메소드 안에 someFunc 를 정의해야합니다 . 예를 들면 다음과 같습니다.
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
그리고 여러 기능이 그림으로 나오면 모든 기능을 부모 컨텍스트에서 사용할 수 있어야합니다.
답변
이것이 스칼라에 적용되는지는 확실하지 않지만 Java에서는 NotSerializableException
클로저가 직렬화 할 수없는 final
필드에 액세스하지 않도록 코드를 리팩토링 하여 문제를 해결했습니다 .
답변
참고로 Spark 2.4에서는 많은 사람들 이이 문제를 겪을 것입니다. Kryo 직렬화가 향상되었지만 대부분의 경우 spark.kryo.unsafe = true 또는 순진한 kry 직렬 변환기를 사용할 수 없습니다.
빠른 수정을 위해 Spark 구성에서 다음을 변경하십시오.
spark.kryo.unsafe="false"
또는
spark.serializer="org.apache.spark.serializer.JavaSerializer"
명시 적 브로드 캐스트 변수를 사용하고 새로운 내장 된 Twitter-chill API를 사용 rdd.map(row =>
하여 rdd.mapPartitions(partition => {
함수 에서 함수 로 변환함으로써 발생하거나 개인적으로 작성하는 사용자 정의 RDD 변환을 수정 합니다.
예
오래된 (좋지 않은) 방법
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
대체 (더 나은) 방법
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row => {
val value = sampleMap.get(row._1)
value
}).toIterator
})
이 새로운 방법은 파티션 당 한 번만 브로드 캐스트 변수를 호출하는 것이 좋습니다. 클래스를 등록하지 않으면 여전히 Java Serialization을 사용해야합니다.