[scala] (왜) 캐시를 호출하거나 RDD를 유지해야합니까?

RDD (Resilient Distributed Dataset)가 텍스트 파일 또는 컬렉션 (또는 다른 RDD)에서 생성 될 때 RDD 데이터를 메모리에 저장하려면 “캐시”또는 “지속”을 명시 적으로 호출해야합니까? 또는 RDD 데이터가 기본적으로 메모리에 분산 방식으로 저장됩니까?

val textFile = sc.textFile("/user/emp.txt")

내 이해에 따라 위의 단계 후에 textFile은 RDD이며 노드의 모든 / 일부 메모리에서 사용할 수 있습니다.

그렇다면 왜 textFile RDD에서 “캐시”또는 “지속”을 호출해야합니까?



답변

대부분의 RDD 작업은 게으르다. RDD를 일련의 작업에 대한 설명으로 생각하십시오. RDD는 데이터가 아닙니다. 따라서이 줄 :

val textFile = sc.textFile("/user/emp.txt")

아무것도하지 않습니다. “이 파일을로드해야합니다”라는 RDD를 작성합니다. 이 시점에서 파일이로드되지 않습니다.

데이터의 내용을 관찰해야하는 RDD 작업은 지연 될 수 없습니다. (이것은 actions 이라고합니다 .) 예는 RDD.count— 파일의 줄 수를 알려면 파일을 읽어야합니다. 따라서을 쓰면 textFile.count이 시점에서 파일을 읽고 행을 계산하고 카운트를 반환합니다.

textFile.count다시 전화하면 어떻게 되나요? 같은 것 : 파일을 읽고 다시 계산합니다. 아무것도 저장되지 않습니다. RDD는 데이터가 아닙니다.

그래서 무엇을 RDD.cache합니까? textFile.cache위 코드에 추가 하면

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

아무것도하지 않습니다. RDD.cache또한 게으른 작업입니다. 파일을 여전히 읽지 못했습니다. 그러나 이제 RDD는 “이 파일을 읽고 내용을 캐시합니다”라고 말합니다. 그런 다음 textFile.count처음 실행 하면 파일이로드, 캐시 및 계산됩니다. textFile.count두 번째로 호출 하면 작업이 캐시를 사용합니다. 캐시에서 데이터를 가져 와서 줄을 계산합니다.

캐시 동작은 사용 가능한 메모리에 따라 다릅니다. 예를 들어 파일이 메모리에 맞지 않으면 textFile.count일반적인 동작으로 돌아가 파일을 다시 읽습니다.


답변

나는 그 질문이 다음과 같이 더 잘 공식화 될 것이라고 생각합니다.

언제 캐시를 호출하거나 RDD를 유지해야합니까?

스파크 프로세스는 게으르다. 즉, 필요할 때까지 아무 일도 일어나지 않을 것이다. 질문에 신속하게 답변하기 위해 val textFile = sc.textFile("/user/emp.txt")발행 된 후에 는 데이터가 아무 것도 발생하지 않으며 HadoopRDD파일을 소스로 사용하여 a 만 구성됩니다.

해당 데이터를 약간 변환한다고 가정 해 보겠습니다.

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

다시 말하지만 데이터에는 아무런 변화가 없습니다. 이제 필요한 경우 적용 할 wordsRDD참조 testFile및 함수 가 포함 된 새로운 RDD 가 있습니다 .

과 같은 RDD에 대해 조치가 호출 될 때만 계보wordsRDD.count 라고하는 RDD 체인 이 실행됩니다. 즉, 파티션으로 분류 된 데이터는 Spark 클러스터의 실행 프로그램에 의해로드되고 함수가 적용되고 결과가 계산됩니다.flatMap

이 예의 선형 계보에서는 cache()필요하지 않습니다. 데이터가 실행기에로드되고 모든 변환이 적용되고 count데이터가 메모리에 맞는 경우 메모리에 모두 계산됩니다.

cacheRDD 계보가 분기 될 때 유용합니다. 이전 예제의 단어를 긍정적 인 단어와 부정적인 단어의 개수로 필터링한다고 가정 해 봅시다. 당신은 이렇게 할 수 있습니다 :

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

여기서 각 지점은 데이터를 다시로드합니다. 명시 적 cache명령문을 추가하면 이전에 수행 된 처리가 보존되고 재사용됩니다. 작업은 다음과 같습니다.

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

이러한 이유로 cache추가 처리를 위해 재사용 할 수있는 체크 포인트를 생성하므로 ‘계보를 깰’것으로 알려져 있습니다.

경험 법칙 : cacheRDD 계보가 분기 되거나 RDD가 루프 에서처럼 여러 번 사용될 때 사용합니다.


답변

RDD 데이터를 메모리에 저장하려면 명시 적으로 “캐시”또는 “지속적”을 호출해야합니까?

예, 필요한 경우에만 가능합니다.

RDD 데이터는 기본적으로 메모리에 분산 방식으로 저장됩니까?

아니!

그리고 그 이유는 다음과 같습니다.

  • Spark는 두 가지 유형의 공유 변수, 즉 모든 노드에서 메모리에 값을 캐시하는 데 사용할 수있는 브로드 캐스트 변수와 카운터 및 합계와 같이 “추가 된”변수 인 누산기의 두 가지 공유 변수를 지원합니다.

  • RDD는 기존의 데이터 세트에서 새 데이터 세트를 작성하는 변환과 데이터 세트에서 계산을 실행 한 후 드라이버 프로그램에 값을 리턴하는 조치의 두 가지 유형의 조작을 지원합니다. 예를 들어 map은 함수를 통해 각 데이터 집합 요소를 전달하고 결과를 나타내는 새 RDD를 반환하는 변환입니다. 반면에 reduce는 일부 함수를 사용하여 RDD의 모든 요소를 ​​집계하고 최종 결과를 드라이버 프로그램으로 리턴하는 조치입니다 (분산 데이터 세트를 리턴하는 병렬 reduceByKey도 있음).

  • Spark의 모든 변환은 결과를 즉시 계산하지 않기 때문에 게으 릅니다. 대신, 일부 기본 데이터 세트 (예 : 파일)에 적용된 변환 만 기억합니다. 변환은 조치가 드라이버 프로그램으로 결과를 리턴해야하는 경우에만 계산됩니다. 이 디자인을 통해 Spark를보다 효율적으로 실행할 수 있습니다. 예를 들어 맵을 통해 생성 된 데이터 세트가 축소에 사용되며 축소 된 결과 만 더 큰 매핑 된 데이터 세트가 아닌 드라이버로 반환한다는 것을 알 수 있습니다.

  • 기본적으로 변환 된 각 RDD는 작업을 실행할 때마다 다시 계산 될 수 있습니다. 그러나 persist (또는 캐시) 방법을 사용하여 메모리에 RDD를 유지하는 경우도 있습니다.이 경우 Spark는 다음에 쿼리 할 때 훨씬 빠르게 액세스 할 수 있도록 요소를 클러스터에 유지합니다. 디스크에 RDD를 유지하거나 여러 노드에 복제 할 수도 있습니다.

자세한 내용은 Spark 프로그래밍 안내서 를 확인하십시오 .


답변

다음은 RDD를 캐시해야하는 세 가지 상황입니다.

RDD를 여러 번 사용

동일한 RDD에서 여러 작업 수행

긴 체인 (또는 매우 비싼) 변환의 경우


답변

cache메소드 호출 을 추가 (또는 임시로 추가) 하는 다른 이유 추가

디버그 메모리 문제

cache있어서, 스파크는 RDD의 크기에 대한 디버깅 정보를 제공한다. spark 통합 UI에서는 RDD 메모리 소비 정보를 얻을 수 있습니다. 그리고 이것은 메모리 문제를 진단하는데 매우 도움이되었습니다.


답변