[apache-spark] Apache Spark : 재 파티셔닝, 정렬 및 캐싱이 조인에 미치는 영향

테이블을 자체에 조인 할 때 Spark의 동작을 탐색 중입니다. Databricks를 사용하고 있습니다.

내 더미 시나리오는 다음과 같습니다

  1. 외부 테이블을 데이터 프레임 A로 읽습니다 (기본 파일은 델타 형식 임)

  2. 특정 열만 선택한 상태에서 데이터 프레임 B를 데이터 프레임 A로 정의

  3. column1 및 column2의 데이터 프레임 A 및 B 조인

(예, 이해가되지 않습니다. Spark의 기본 메커니즘을 이해하기 위해 실험하고 있습니다)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

첫 번째 시도는 코드를 그대로 실행하는 것입니다 (시도 1). 그런 다음 다시 파티션하고 캐시하려고했습니다 (시도 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

마지막으로 파티션을 나누고 분류하고 캐시했습니다.

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

생성 된 각 dags는 첨부 된대로입니다.

내 질문은 :

  1. 시도 1에서 캐싱이 명시 적으로 지정되지 않은 경우에도 테이블이 캐시 된 것처럼 보입니다.

  2. InMemoreTableScan 뒤에 항상이 유형의 다른 노드가 오는 이유는 무엇입니까?

  3. 시도 3에서 캐싱이 두 단계로 진행되는 이유는 무엇입니까?

  4. 시도 3 왜 WholeStageCodegen은 하나의 InMemoreTableScan을 따릅니다.

시도 1

시도 2

여기에 이미지 설명을 입력하십시오



답변

이 3 가지 계획에서 관찰하고있는 것은 DataBricks 런타임과 Spark의 혼합입니다.

우선 DataBricks 런타임 3.3 이상을 실행하는 동안 모든 쪽모이 세공 파일에 대해 캐싱이 자동으로 활성화됩니다. 해당 설정 :
spark.databricks.io.cache.enabled true

두 번째 쿼리의 경우 조인이 호출 될 때 spark가 데이터 세트 A와 데이터 세트 B를 병렬로 계산하려고했기 때문에 InMemoryTableScan 이 두 번 발생했습니다. 다른 실행자가 위의 작업을 할당 받았다고 가정하면 둘 다 (DataBricks) 캐시에서 테이블을 스캔해야합니다.

세 번째는 InMemoryTableScan 이 캐싱 자체를 참조하지 않습니다. 이는 계획된 촉매가 무엇이든 캐시 된 테이블을 여러 번 스캔하는 것을 의미합니다.

추신 : 나는 포인트 4를 시각화 할 수 없습니다 🙂


답변